cala_ledger/account_set/
mod.rs

1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use std::collections::HashMap;
9use tracing::instrument;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{
14    account::*,
15    balance::*,
16    entry::*,
17    ledger_operation::*,
18    outbox::*,
19    primitives::{DataSource, DebitOrCredit, JournalId, Layer},
20};
21
22pub use cursor::*;
23pub use entity::*;
24use error::*;
25use repo::*;
26pub use repo::{account_set_cursor::*, members_cursor::*};
27
28const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
29
30#[derive(Clone)]
31pub struct AccountSets {
32    repo: AccountSetRepo,
33    accounts: Accounts,
34    entries: Entries,
35    balances: Balances,
36    outbox: Outbox,
37    pool: PgPool,
38}
39
40impl AccountSets {
41    pub(crate) fn new(
42        pool: &PgPool,
43        outbox: Outbox,
44        accounts: &Accounts,
45        entries: &Entries,
46        balances: &Balances,
47    ) -> Self {
48        Self {
49            repo: AccountSetRepo::new(pool),
50            outbox,
51            accounts: accounts.clone(),
52            entries: entries.clone(),
53            balances: balances.clone(),
54            pool: pool.clone(),
55        }
56    }
57    #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
58    pub async fn create(
59        &self,
60        new_account_set: NewAccountSet,
61    ) -> Result<AccountSet, AccountSetError> {
62        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
63        let account_set = self.create_in_op(&mut op, new_account_set).await?;
64        op.commit().await?;
65        Ok(account_set)
66    }
67
68    #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
69    pub async fn create_in_op(
70        &self,
71        db: &mut LedgerOperation<'_>,
72        new_account_set: NewAccountSet,
73    ) -> Result<AccountSet, AccountSetError> {
74        let new_account = NewAccount::builder()
75            .id(new_account_set.id)
76            .name(String::new())
77            .code(new_account_set.id.to_string())
78            .normal_balance_type(new_account_set.normal_balance_type)
79            .is_account_set(true)
80            .velocity_context_values(new_account_set.context_values())
81            .build()
82            .expect("Failed to build account");
83        self.accounts.create_in_op(db, new_account).await?;
84
85        let account_set = self.repo.create_in_op(db, new_account_set).await?;
86        db.accumulate(account_set.last_persisted(1).map(|p| &p.event));
87
88        Ok(account_set)
89    }
90
91    pub async fn create_all(
92        &self,
93        new_account_sets: Vec<NewAccountSet>,
94    ) -> Result<Vec<AccountSet>, AccountSetError> {
95        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
96        let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
97        op.commit().await?;
98        Ok(account_sets)
99    }
100
101    #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, db))]
102    pub async fn create_all_in_op(
103        &self,
104        db: &mut LedgerOperation<'_>,
105        new_account_sets: Vec<NewAccountSet>,
106    ) -> Result<Vec<AccountSet>, AccountSetError> {
107        let mut new_accounts = Vec::new();
108        for new_account_set in new_account_sets.iter() {
109            let new_account = NewAccount::builder()
110                .id(new_account_set.id)
111                .name(String::new())
112                .code(new_account_set.id.to_string())
113                .normal_balance_type(new_account_set.normal_balance_type)
114                .is_account_set(true)
115                .velocity_context_values(new_account_set.context_values())
116                .build()
117                .expect("Failed to build account");
118            new_accounts.push(new_account);
119        }
120        self.accounts.create_all_in_op(db, new_accounts).await?;
121
122        let account_sets = self.repo.create_all_in_op(db, new_account_sets).await?;
123        db.accumulate(
124            account_sets
125                .iter()
126                .flat_map(|account| account.last_persisted(1).map(|p| &p.event)),
127        );
128
129        Ok(account_sets)
130    }
131
132    #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
133    pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
134        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
135        self.persist_in_op(&mut op, account_set).await?;
136        op.commit().await?;
137        Ok(())
138    }
139
140    #[instrument(
141        name = "cala_ledger.account_sets.persist_in_op",
142        skip(self, db, account_set)
143    )]
144    pub async fn persist_in_op(
145        &self,
146        db: &mut LedgerOperation<'_>,
147        account_set: &mut AccountSet,
148    ) -> Result<(), AccountSetError> {
149        let n_events = self.repo.update_in_op(db, account_set).await?;
150        db.accumulate(account_set.last_persisted(n_events).map(|p| &p.event));
151
152        self.accounts
153            .update_velocity_context_values_in_op(db, account_set.values())
154            .await?;
155
156        Ok(())
157    }
158
159    pub async fn add_member(
160        &self,
161        account_set_id: AccountSetId,
162        member: impl Into<AccountSetMemberId>,
163    ) -> Result<AccountSet, AccountSetError> {
164        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
165        let account_set = self
166            .add_member_in_op(&mut op, account_set_id, member)
167            .await?;
168        op.commit().await?;
169        Ok(account_set)
170    }
171
172    pub async fn add_member_in_op(
173        &self,
174        op: &mut LedgerOperation<'_>,
175        account_set_id: AccountSetId,
176        member: impl Into<AccountSetMemberId>,
177    ) -> Result<AccountSet, AccountSetError> {
178        let member = member.into();
179        let (time, parents, account_set, member_id) = match member {
180            AccountSetMemberId::Account(id) => {
181                let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
182                let (time, parents) = self
183                    .repo
184                    .add_member_account_and_return_parents(&mut *op, account_set_id, id)
185                    .await?;
186                (time, parents, set, id)
187            }
188            AccountSetMemberId::AccountSet(id) => {
189                let mut accounts = self
190                    .repo
191                    .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
192                    .await?;
193                let target = accounts
194                    .remove(&account_set_id)
195                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
196                let member = accounts
197                    .remove(&id)
198                    .ok_or(AccountSetError::CouldNotFindById(id))?;
199
200                if target.values().journal_id != member.values().journal_id {
201                    return Err(AccountSetError::JournalIdMismatch);
202                }
203
204                let (time, parents) = self
205                    .repo
206                    .add_member_set_and_return_parents(op, account_set_id, id)
207                    .await?;
208                (time, parents, target, AccountId::from(id))
209            }
210        };
211
212        op.accumulate(std::iter::once(
213            OutboxEventPayload::AccountSetMemberCreated {
214                source: DataSource::Local,
215                account_set_id,
216                member_id: member,
217            },
218        ));
219
220        let balances = self
221            .balances
222            .find_balances_for_update(op, account_set.values().journal_id, member_id)
223            .await?;
224
225        let target_account_id = AccountId::from(&account_set.id());
226        let mut entries = Vec::new();
227        for balance in balances.into_values() {
228            entries_for_add_balance(&mut entries, target_account_id, balance);
229        }
230
231        if entries.is_empty() {
232            return Ok(account_set);
233        }
234        let entries = self.entries.create_all_in_op(op, entries).await?;
235        let mappings = std::iter::once((target_account_id, parents)).collect();
236        self.balances
237            .update_balances_in_op(
238                op,
239                account_set.values().journal_id,
240                entries,
241                time.date_naive(),
242                time,
243                mappings,
244            )
245            .await?;
246
247        Ok(account_set)
248    }
249
250    pub async fn remove_member(
251        &self,
252        account_set_id: AccountSetId,
253        member: impl Into<AccountSetMemberId>,
254    ) -> Result<AccountSet, AccountSetError> {
255        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
256        let account_set = self
257            .remove_member_in_op(&mut op, account_set_id, member)
258            .await?;
259        op.commit().await?;
260        Ok(account_set)
261    }
262
263    pub async fn remove_member_in_op(
264        &self,
265        op: &mut LedgerOperation<'_>,
266        account_set_id: AccountSetId,
267        member: impl Into<AccountSetMemberId>,
268    ) -> Result<AccountSet, AccountSetError> {
269        let member = member.into();
270        let (time, parents, account_set, member_id) = match member {
271            AccountSetMemberId::Account(id) => {
272                let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
273                let (time, parents) = self
274                    .repo
275                    .remove_member_account_and_return_parents(op, account_set_id, id)
276                    .await?;
277                (time, parents, set, id)
278            }
279            AccountSetMemberId::AccountSet(id) => {
280                let mut accounts = self
281                    .repo
282                    .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
283                    .await?;
284                let target = accounts
285                    .remove(&account_set_id)
286                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
287                let member = accounts
288                    .remove(&id)
289                    .ok_or(AccountSetError::CouldNotFindById(id))?;
290
291                if target.values().journal_id != member.values().journal_id {
292                    return Err(AccountSetError::JournalIdMismatch);
293                }
294
295                let (time, parents) = self
296                    .repo
297                    .remove_member_set_and_return_parents(op, account_set_id, id)
298                    .await?;
299                (time, parents, target, AccountId::from(id))
300            }
301        };
302
303        op.accumulate(std::iter::once(
304            OutboxEventPayload::AccountSetMemberRemoved {
305                source: DataSource::Local,
306                account_set_id,
307                member_id: member,
308            },
309        ));
310
311        let balances = self
312            .balances
313            .find_balances_for_update(op, account_set.values().journal_id, member_id)
314            .await?;
315
316        let target_account_id = AccountId::from(&account_set.id());
317        let mut entries = Vec::new();
318        for balance in balances.into_values() {
319            entries_for_remove_balance(&mut entries, target_account_id, balance);
320        }
321
322        if entries.is_empty() {
323            return Ok(account_set);
324        }
325        let entries = self.entries.create_all_in_op(op, entries).await?;
326        let mappings = std::iter::once((target_account_id, parents)).collect();
327        self.balances
328            .update_balances_in_op(
329                op,
330                account_set.values().journal_id,
331                entries,
332                time.date_naive(),
333                time,
334                mappings,
335            )
336            .await?;
337
338        Ok(account_set)
339    }
340
341    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
342    pub async fn find_all<T: From<AccountSet>>(
343        &self,
344        account_set_ids: &[AccountSetId],
345    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
346        self.repo.find_all(account_set_ids).await
347    }
348
349    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
350    pub async fn find_all_in_op<T: From<AccountSet>>(
351        &self,
352        op: &mut LedgerOperation<'_>,
353        account_set_ids: &[AccountSetId],
354    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
355        self.repo.find_all_in_op(op, account_set_ids).await
356    }
357
358    #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
359    pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
360        self.repo.find_by_id(account_set_id).await
361    }
362
363    #[instrument(
364        name = "cala_ledger.accounts_sets.find_by_external_id",
365        skip(self),
366        err
367    )]
368    pub async fn find_by_external_id(
369        &self,
370        external_id: String,
371    ) -> Result<AccountSet, AccountSetError> {
372        self.repo.find_by_external_id(Some(external_id)).await
373    }
374
375    #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
376    pub async fn find_where_member(
377        &self,
378        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
379        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
380    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
381    {
382        match member.into() {
383            AccountSetMemberId::Account(account_id) => {
384                self.repo
385                    .find_where_account_is_member(account_id, query)
386                    .await
387            }
388            AccountSetMemberId::AccountSet(account_set_id) => {
389                self.repo
390                    .find_where_account_set_is_member(account_set_id, query)
391                    .await
392            }
393        }
394    }
395
396    #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
397    pub async fn list_for_name(
398        &self,
399        name: String,
400        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
401    ) -> Result<
402        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
403        AccountSetError,
404    > {
405        self.repo
406            .list_for_name_by_created_at(name, args, Default::default())
407            .await
408    }
409
410    #[instrument(
411        name = "cala_ledger.account_sets.list_for_name_in_op",
412        skip(self, op),
413        err
414    )]
415    pub async fn list_for_name_in_op(
416        &self,
417        op: &mut LedgerOperation<'_>,
418        name: String,
419        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
420    ) -> Result<
421        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
422        AccountSetError,
423    > {
424        self.repo
425            .list_for_name_by_created_at_in_op(op, name, args, Default::default())
426            .await
427    }
428
429    #[instrument(
430        name = "cala_ledger.account_sets.find_where_member_in_op",
431        skip(self, op),
432        err
433    )]
434    pub async fn find_where_member_in_op(
435        &self,
436        op: &mut LedgerOperation<'_>,
437        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
438        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
439    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
440    {
441        match member.into() {
442            AccountSetMemberId::Account(account_id) => {
443                self.repo
444                    .find_where_account_is_member_in_op(op, account_id, query)
445                    .await
446            }
447            AccountSetMemberId::AccountSet(account_set_id) => {
448                self.repo
449                    .find_where_account_set_is_member_in_op(op, account_set_id, query)
450                    .await
451            }
452        }
453    }
454
455    pub async fn list_members_by_created_at(
456        &self,
457        id: AccountSetId,
458        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
459    ) -> Result<
460        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
461        AccountSetError,
462    > {
463        self.repo.list_children_by_created_at(id, args).await
464    }
465
466    pub async fn list_members_by_created_at_in_op(
467        &self,
468        op: &mut LedgerOperation<'_>,
469        id: AccountSetId,
470        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
471    ) -> Result<
472        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
473        AccountSetError,
474    > {
475        self.repo
476            .list_children_by_created_at_in_op(op, id, args)
477            .await
478    }
479
480    pub async fn list_members_by_external_id(
481        &self,
482        id: AccountSetId,
483        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
484    ) -> Result<
485        es_entity::PaginatedQueryRet<
486            AccountSetMemberByExternalId,
487            AccountSetMembersByExternalIdCursor,
488        >,
489        AccountSetError,
490    > {
491        self.repo.list_children_by_external_id(id, args).await
492    }
493
494    pub async fn list_members_by_external_id_in_op(
495        &self,
496        op: &mut LedgerOperation<'_>,
497        id: AccountSetId,
498        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
499    ) -> Result<
500        es_entity::PaginatedQueryRet<
501            AccountSetMemberByExternalId,
502            AccountSetMembersByExternalIdCursor,
503        >,
504        AccountSetError,
505    > {
506        self.repo
507            .list_children_by_external_id_in_op(op, id, args)
508            .await
509    }
510
511    pub(crate) async fn fetch_mappings_in_op(
512        &self,
513        op: &mut LedgerOperation<'_>,
514        journal_id: JournalId,
515        account_ids: &[AccountId],
516    ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
517        self.repo
518            .fetch_mappings_in_op(op, journal_id, account_ids)
519            .await
520    }
521
522    #[cfg(feature = "import")]
523    pub(crate) async fn sync_account_set_creation(
524        &self,
525        mut db: es_entity::DbOpWithTime<'_>,
526        origin: DataSourceId,
527        values: AccountSetValues,
528    ) -> Result<(), AccountSetError> {
529        let mut account_set = AccountSet::import(origin, values);
530        self.repo
531            .import_in_op(&mut db, origin, &mut account_set)
532            .await?;
533        let outbox_events: Vec<_> = account_set
534            .last_persisted(1)
535            .map(|p| OutboxEventPayload::from(&p.event))
536            .collect();
537        let time = db.now();
538        self.outbox
539            .persist_events_at(db, outbox_events, time)
540            .await?;
541        Ok(())
542    }
543
544    #[cfg(feature = "import")]
545    pub(crate) async fn sync_account_set_update(
546        &self,
547        mut db: es_entity::DbOpWithTime<'_>,
548        values: AccountSetValues,
549        fields: Vec<String>,
550    ) -> Result<(), AccountSetError> {
551        let mut account_set = self.repo.find_by_id(values.id).await?;
552        account_set.update((values, fields));
553        let n_events = self.repo.update_in_op(&mut db, &mut account_set).await?;
554        let outbox_events: Vec<_> = account_set
555            .last_persisted(n_events)
556            .map(|p| OutboxEventPayload::from(&p.event))
557            .collect();
558        let time = db.now();
559        self.outbox
560            .persist_events_at(db, outbox_events, time)
561            .await?;
562        Ok(())
563    }
564
565    #[cfg(feature = "import")]
566    pub(crate) async fn sync_account_set_member_creation(
567        &self,
568        mut db: es_entity::DbOpWithTime<'_>,
569        origin: DataSourceId,
570        account_set_id: AccountSetId,
571        member_id: AccountSetMemberId,
572    ) -> Result<(), AccountSetError> {
573        match member_id {
574            AccountSetMemberId::Account(account_id) => {
575                self.repo
576                    .import_member_account_in_op(&mut db, account_set_id, account_id)
577                    .await?;
578            }
579            AccountSetMemberId::AccountSet(account_set_id) => {
580                self.repo
581                    .import_member_set_in_op(&mut db, account_set_id, account_set_id)
582                    .await?;
583            }
584        }
585        let time = db.now();
586        self.outbox
587            .persist_events_at(
588                db,
589                std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
590                    source: DataSource::Remote { id: origin },
591                    account_set_id,
592                    member_id,
593                }),
594                time,
595            )
596            .await?;
597        Ok(())
598    }
599
600    #[cfg(feature = "import")]
601    pub(crate) async fn sync_account_set_member_removal(
602        &self,
603        mut db: es_entity::DbOpWithTime<'_>,
604        origin: DataSourceId,
605        account_set_id: AccountSetId,
606        member_id: AccountSetMemberId,
607    ) -> Result<(), AccountSetError> {
608        match member_id {
609            AccountSetMemberId::Account(account_id) => {
610                self.repo
611                    .import_remove_member_account(&mut db, account_set_id, account_id)
612                    .await?;
613            }
614            AccountSetMemberId::AccountSet(account_set_id) => {
615                self.repo
616                    .import_remove_member_set(&mut db, account_set_id, account_set_id)
617                    .await?;
618            }
619        }
620        let time = db.now();
621        self.outbox
622            .persist_events_at(
623                db,
624                std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
625                    source: DataSource::Remote { id: origin },
626                    account_set_id,
627                    member_id,
628                }),
629                time,
630            )
631            .await?;
632        Ok(())
633    }
634}
635
636fn entries_for_add_balance(
637    entries: &mut Vec<NewEntry>,
638    target_account_id: AccountId,
639    balance: BalanceSnapshot,
640) {
641    use rust_decimal::Decimal;
642
643    if balance.settled.cr_balance != Decimal::ZERO {
644        let entry = NewEntry::builder()
645            .id(EntryId::new())
646            .journal_id(balance.journal_id)
647            .account_id(target_account_id)
648            .currency(balance.currency)
649            .sequence(1u32)
650            .layer(Layer::Settled)
651            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
652            .direction(DebitOrCredit::Credit)
653            .units(balance.settled.cr_balance)
654            .transaction_id(UNASSIGNED_TRANSACTION_ID)
655            .build()
656            .expect("Couldn't build entry");
657        entries.push(entry);
658    }
659    if balance.settled.dr_balance != Decimal::ZERO {
660        let entry = NewEntry::builder()
661            .id(EntryId::new())
662            .journal_id(balance.journal_id)
663            .account_id(target_account_id)
664            .currency(balance.currency)
665            .sequence(1u32)
666            .layer(Layer::Settled)
667            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
668            .direction(DebitOrCredit::Debit)
669            .units(balance.settled.dr_balance)
670            .transaction_id(UNASSIGNED_TRANSACTION_ID)
671            .build()
672            .expect("Couldn't build entry");
673        entries.push(entry);
674    }
675    if balance.pending.cr_balance != Decimal::ZERO {
676        let entry = NewEntry::builder()
677            .id(EntryId::new())
678            .journal_id(balance.journal_id)
679            .account_id(target_account_id)
680            .currency(balance.currency)
681            .sequence(1u32)
682            .layer(Layer::Pending)
683            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
684            .direction(DebitOrCredit::Credit)
685            .units(balance.pending.cr_balance)
686            .transaction_id(UNASSIGNED_TRANSACTION_ID)
687            .build()
688            .expect("Couldn't build entry");
689        entries.push(entry);
690    }
691    if balance.pending.dr_balance != Decimal::ZERO {
692        let entry = NewEntry::builder()
693            .id(EntryId::new())
694            .journal_id(balance.journal_id)
695            .account_id(target_account_id)
696            .currency(balance.currency)
697            .sequence(1u32)
698            .layer(Layer::Pending)
699            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
700            .direction(DebitOrCredit::Debit)
701            .units(balance.pending.dr_balance)
702            .transaction_id(UNASSIGNED_TRANSACTION_ID)
703            .build()
704            .expect("Couldn't build entry");
705        entries.push(entry);
706    }
707    if balance.encumbrance.cr_balance != Decimal::ZERO {
708        let entry = NewEntry::builder()
709            .id(EntryId::new())
710            .journal_id(balance.journal_id)
711            .account_id(target_account_id)
712            .currency(balance.currency)
713            .sequence(1u32)
714            .layer(Layer::Encumbrance)
715            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
716            .direction(DebitOrCredit::Credit)
717            .units(balance.encumbrance.cr_balance)
718            .transaction_id(UNASSIGNED_TRANSACTION_ID)
719            .build()
720            .expect("Couldn't build entry");
721        entries.push(entry);
722    }
723    if balance.encumbrance.dr_balance != Decimal::ZERO {
724        let entry = NewEntry::builder()
725            .id(EntryId::new())
726            .journal_id(balance.journal_id)
727            .account_id(target_account_id)
728            .currency(balance.currency)
729            .sequence(1u32)
730            .layer(Layer::Encumbrance)
731            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
732            .direction(DebitOrCredit::Debit)
733            .units(balance.encumbrance.dr_balance)
734            .transaction_id(UNASSIGNED_TRANSACTION_ID)
735            .build()
736            .expect("Couldn't build entry");
737        entries.push(entry);
738    }
739}
740
741fn entries_for_remove_balance(
742    entries: &mut Vec<NewEntry>,
743    target_account_id: AccountId,
744    balance: BalanceSnapshot,
745) {
746    use rust_decimal::Decimal;
747
748    if balance.settled.cr_balance != Decimal::ZERO {
749        let entry = NewEntry::builder()
750            .id(EntryId::new())
751            .journal_id(balance.journal_id)
752            .account_id(target_account_id)
753            .currency(balance.currency)
754            .sequence(1u32)
755            .layer(Layer::Settled)
756            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
757            .direction(DebitOrCredit::Debit)
758            .units(balance.settled.cr_balance)
759            .transaction_id(UNASSIGNED_TRANSACTION_ID)
760            .build()
761            .expect("Couldn't build entry");
762        entries.push(entry);
763    }
764    if balance.settled.dr_balance != Decimal::ZERO {
765        let entry = NewEntry::builder()
766            .id(EntryId::new())
767            .journal_id(balance.journal_id)
768            .account_id(target_account_id)
769            .currency(balance.currency)
770            .sequence(1u32)
771            .layer(Layer::Settled)
772            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
773            .direction(DebitOrCredit::Credit)
774            .units(balance.settled.dr_balance)
775            .transaction_id(UNASSIGNED_TRANSACTION_ID)
776            .build()
777            .expect("Couldn't build entry");
778        entries.push(entry);
779    }
780    if balance.pending.cr_balance != Decimal::ZERO {
781        let entry = NewEntry::builder()
782            .id(EntryId::new())
783            .journal_id(balance.journal_id)
784            .account_id(target_account_id)
785            .currency(balance.currency)
786            .sequence(1u32)
787            .layer(Layer::Pending)
788            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
789            .direction(DebitOrCredit::Debit)
790            .units(balance.pending.cr_balance)
791            .transaction_id(UNASSIGNED_TRANSACTION_ID)
792            .build()
793            .expect("Couldn't build entry");
794        entries.push(entry);
795    }
796    if balance.pending.dr_balance != Decimal::ZERO {
797        let entry = NewEntry::builder()
798            .id(EntryId::new())
799            .journal_id(balance.journal_id)
800            .account_id(target_account_id)
801            .currency(balance.currency)
802            .sequence(1u32)
803            .layer(Layer::Pending)
804            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
805            .direction(DebitOrCredit::Credit)
806            .units(balance.pending.dr_balance)
807            .transaction_id(UNASSIGNED_TRANSACTION_ID)
808            .build()
809            .expect("Couldn't build entry");
810        entries.push(entry);
811    }
812    if balance.encumbrance.cr_balance != Decimal::ZERO {
813        let entry = NewEntry::builder()
814            .id(EntryId::new())
815            .journal_id(balance.journal_id)
816            .account_id(target_account_id)
817            .currency(balance.currency)
818            .sequence(1u32)
819            .layer(Layer::Encumbrance)
820            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
821            .direction(DebitOrCredit::Debit)
822            .units(balance.encumbrance.cr_balance)
823            .transaction_id(UNASSIGNED_TRANSACTION_ID)
824            .build()
825            .expect("Couldn't build entry");
826        entries.push(entry);
827    }
828    if balance.encumbrance.dr_balance != Decimal::ZERO {
829        let entry = NewEntry::builder()
830            .id(EntryId::new())
831            .journal_id(balance.journal_id)
832            .account_id(target_account_id)
833            .currency(balance.currency)
834            .sequence(1u32)
835            .layer(Layer::Encumbrance)
836            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
837            .direction(DebitOrCredit::Credit)
838            .units(balance.encumbrance.dr_balance)
839            .transaction_id(UNASSIGNED_TRANSACTION_ID)
840            .build()
841            .expect("Couldn't build entry");
842        entries.push(entry);
843    }
844}
845
846impl From<&AccountSetEvent> for OutboxEventPayload {
847    fn from(event: &AccountSetEvent) -> Self {
848        match event {
849            #[cfg(feature = "import")]
850            AccountSetEvent::Imported {
851                source,
852                values: account_set,
853            } => OutboxEventPayload::AccountSetCreated {
854                source: *source,
855                account_set: account_set.clone(),
856            },
857            AccountSetEvent::Initialized {
858                values: account_set,
859            } => OutboxEventPayload::AccountSetCreated {
860                source: DataSource::Local,
861                account_set: account_set.clone(),
862            },
863            AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
864                source: DataSource::Local,
865                account_set: values.clone(),
866                fields: fields.clone(),
867            },
868        }
869    }
870}