cala_ledger/account_set/
mod.rs

1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use std::collections::HashMap;
8use tracing::instrument;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{
13    account::*,
14    balance::*,
15    entry::*,
16    ledger_operation::*,
17    outbox::*,
18    primitives::{DataSource, DebitOrCredit, JournalId, Layer},
19};
20
21pub use cursor::*;
22pub use entity::*;
23use error::*;
24use repo::*;
25pub use repo::{account_set_cursor::*, members_cursor::*};
26
27#[allow(dead_code)]
28const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
29
30#[derive(Clone)]
31pub struct AccountSets {
32    repo: AccountSetRepo,
33    accounts: Accounts,
34    entries: Entries,
35    balances: Balances,
36    outbox: Outbox,
37    pool: PgPool,
38}
39
40impl AccountSets {
41    pub(crate) fn new(
42        pool: &PgPool,
43        outbox: Outbox,
44        accounts: &Accounts,
45        entries: &Entries,
46        balances: &Balances,
47    ) -> Self {
48        Self {
49            repo: AccountSetRepo::new(pool),
50            outbox,
51            accounts: accounts.clone(),
52            entries: entries.clone(),
53            balances: balances.clone(),
54            pool: pool.clone(),
55        }
56    }
57    #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
58    pub async fn create(
59        &self,
60        new_account_set: NewAccountSet,
61    ) -> Result<AccountSet, AccountSetError> {
62        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
63        let account_set = self.create_in_op(&mut op, new_account_set).await?;
64        op.commit().await?;
65        Ok(account_set)
66    }
67
68    #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
69    pub async fn create_in_op(
70        &self,
71        db: &mut LedgerOperation<'_>,
72        new_account_set: NewAccountSet,
73    ) -> Result<AccountSet, AccountSetError> {
74        let new_account = NewAccount::builder()
75            .id(uuid::Uuid::from(new_account_set.id))
76            .name(String::new())
77            .code(new_account_set.id.to_string())
78            .normal_balance_type(new_account_set.normal_balance_type)
79            .is_account_set(true)
80            .build()
81            .expect("Failed to build account");
82        self.accounts.create_in_op(db, new_account).await?;
83        let account_set = self.repo.create_in_op(db.op(), new_account_set).await?;
84        db.accumulate(account_set.events.last_persisted(1).map(|p| &p.event));
85        Ok(account_set)
86    }
87
88    pub async fn create_all(
89        &self,
90        new_account_sets: Vec<NewAccountSet>,
91    ) -> Result<Vec<AccountSet>, AccountSetError> {
92        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
93        let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
94        op.commit().await?;
95        Ok(account_sets)
96    }
97
98    #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, db))]
99    pub async fn create_all_in_op(
100        &self,
101        db: &mut LedgerOperation<'_>,
102        new_account_sets: Vec<NewAccountSet>,
103    ) -> Result<Vec<AccountSet>, AccountSetError> {
104        let mut new_accounts = Vec::new();
105        for new_account_set in new_account_sets.iter() {
106            let new_account = NewAccount::builder()
107                .id(uuid::Uuid::from(new_account_set.id))
108                .name(String::new())
109                .code(new_account_set.id.to_string())
110                .normal_balance_type(new_account_set.normal_balance_type)
111                .is_account_set(true)
112                .build()
113                .expect("Failed to build account");
114            new_accounts.push(new_account);
115        }
116        self.accounts.create_all_in_op(db, new_accounts).await?;
117        let account_sets = self
118            .repo
119            .create_all_in_op(db.op(), new_account_sets)
120            .await?;
121        db.accumulate(
122            account_sets
123                .iter()
124                .flat_map(|account| account.events.last_persisted(1).map(|p| &p.event)),
125        );
126        Ok(account_sets)
127    }
128
129    #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
130    pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
131        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
132        self.persist_in_op(&mut op, account_set).await?;
133        op.commit().await?;
134        Ok(())
135    }
136
137    #[instrument(
138        name = "cala_ledger.account_sets.persist_in_op",
139        skip(self, db, account_set)
140    )]
141    pub async fn persist_in_op(
142        &self,
143        db: &mut LedgerOperation<'_>,
144        account_set: &mut AccountSet,
145    ) -> Result<(), AccountSetError> {
146        let n_events = self.repo.update_in_op(db.op(), account_set).await?;
147        db.accumulate(
148            account_set
149                .events
150                .last_persisted(n_events)
151                .map(|p| &p.event),
152        );
153        Ok(())
154    }
155
156    pub async fn add_member(
157        &self,
158        account_set_id: AccountSetId,
159        member: impl Into<AccountSetMemberId>,
160    ) -> Result<AccountSet, AccountSetError> {
161        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
162        let account_set = self
163            .add_member_in_op(&mut op, account_set_id, member)
164            .await?;
165        op.commit().await?;
166        Ok(account_set)
167    }
168
169    pub async fn add_member_in_op(
170        &self,
171        op: &mut LedgerOperation<'_>,
172        account_set_id: AccountSetId,
173        member: impl Into<AccountSetMemberId>,
174    ) -> Result<AccountSet, AccountSetError> {
175        let member = member.into();
176        let (time, parents, account_set, member_id) = match member {
177            AccountSetMemberId::Account(id) => {
178                let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
179                let (time, parents) = self
180                    .repo
181                    .add_member_account_and_return_parents(op.tx(), account_set_id, id)
182                    .await?;
183                (time, parents, set, id)
184            }
185            AccountSetMemberId::AccountSet(id) => {
186                let mut accounts = self
187                    .repo
188                    .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
189                    .await?;
190                let target = accounts
191                    .remove(&account_set_id)
192                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
193                let member = accounts
194                    .remove(&id)
195                    .ok_or(AccountSetError::CouldNotFindById(id))?;
196
197                if target.values().journal_id != member.values().journal_id {
198                    return Err(AccountSetError::JournalIdMismatch);
199                }
200
201                let (time, parents) = self
202                    .repo
203                    .add_member_set_and_return_parents(op.tx(), account_set_id, id)
204                    .await?;
205                (time, parents, target, AccountId::from(id))
206            }
207        };
208
209        op.accumulate(std::iter::once(
210            OutboxEventPayload::AccountSetMemberCreated {
211                source: DataSource::Local,
212                account_set_id,
213                member_id: member,
214            },
215        ));
216
217        let balances = self
218            .balances
219            .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
220            .await?;
221
222        let target_account_id = AccountId::from(&account_set.id());
223        let mut entries = Vec::new();
224        for balance in balances.into_values() {
225            entries_for_add_balance(&mut entries, target_account_id, balance);
226        }
227
228        if entries.is_empty() {
229            return Ok(account_set);
230        }
231        let entries = self.entries.create_all_in_op(op, entries).await?;
232        let mappings = std::iter::once((target_account_id, parents)).collect();
233        self.balances
234            .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
235            .await?;
236
237        Ok(account_set)
238    }
239
240    pub async fn remove_member(
241        &self,
242        account_set_id: AccountSetId,
243        member: impl Into<AccountSetMemberId>,
244    ) -> Result<AccountSet, AccountSetError> {
245        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
246        let account_set = self
247            .remove_member_in_op(&mut op, account_set_id, member)
248            .await?;
249        op.commit().await?;
250        Ok(account_set)
251    }
252
253    pub async fn remove_member_in_op(
254        &self,
255        op: &mut LedgerOperation<'_>,
256        account_set_id: AccountSetId,
257        member: impl Into<AccountSetMemberId>,
258    ) -> Result<AccountSet, AccountSetError> {
259        let member = member.into();
260        let (time, parents, account_set, member_id) = match member {
261            AccountSetMemberId::Account(id) => {
262                let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
263                let (time, parents) = self
264                    .repo
265                    .remove_member_account_and_return_parents(op.tx(), account_set_id, id)
266                    .await?;
267                (time, parents, set, id)
268            }
269            AccountSetMemberId::AccountSet(id) => {
270                let mut accounts = self
271                    .repo
272                    .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
273                    .await?;
274                let target = accounts
275                    .remove(&account_set_id)
276                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
277                let member = accounts
278                    .remove(&id)
279                    .ok_or(AccountSetError::CouldNotFindById(id))?;
280
281                if target.values().journal_id != member.values().journal_id {
282                    return Err(AccountSetError::JournalIdMismatch);
283                }
284
285                let (time, parents) = self
286                    .repo
287                    .remove_member_set_and_return_parents(op.tx(), account_set_id, id)
288                    .await?;
289                (time, parents, target, AccountId::from(id))
290            }
291        };
292
293        op.accumulate(std::iter::once(
294            OutboxEventPayload::AccountSetMemberRemoved {
295                source: DataSource::Local,
296                account_set_id,
297                member_id: member,
298            },
299        ));
300
301        let balances = self
302            .balances
303            .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
304            .await?;
305
306        let target_account_id = AccountId::from(&account_set.id());
307        let mut entries = Vec::new();
308        for balance in balances.into_values() {
309            entries_for_remove_balance(&mut entries, target_account_id, balance);
310        }
311
312        if entries.is_empty() {
313            return Ok(account_set);
314        }
315        let entries = self.entries.create_all_in_op(op, entries).await?;
316        let mappings = std::iter::once((target_account_id, parents)).collect();
317        self.balances
318            .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
319            .await?;
320
321        Ok(account_set)
322    }
323
324    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
325    pub async fn find_all<T: From<AccountSet>>(
326        &self,
327        account_set_ids: &[AccountSetId],
328    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
329        self.repo.find_all(account_set_ids).await
330    }
331
332    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
333    pub async fn find_all_in_op<T: From<AccountSet>>(
334        &self,
335        op: &mut LedgerOperation<'_>,
336        account_set_ids: &[AccountSetId],
337    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
338        self.repo.find_all_in_tx(op.tx(), account_set_ids).await
339    }
340
341    #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
342    pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
343        self.repo.find_by_id(account_set_id).await
344    }
345
346    #[instrument(
347        name = "cala_ledger.accounts_sets.find_by_external_id",
348        skip(self),
349        err
350    )]
351    pub async fn find_by_external_id(
352        &self,
353        external_id: String,
354    ) -> Result<AccountSet, AccountSetError> {
355        self.repo.find_by_external_id(Some(external_id)).await
356    }
357
358    #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
359    pub async fn find_where_member(
360        &self,
361        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
362        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
363    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
364    {
365        match member.into() {
366            AccountSetMemberId::Account(account_id) => {
367                self.repo
368                    .find_where_account_is_member(account_id, query)
369                    .await
370            }
371            AccountSetMemberId::AccountSet(account_set_id) => {
372                self.repo
373                    .find_where_account_set_is_member(account_set_id, query)
374                    .await
375            }
376        }
377    }
378
379    #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
380    pub async fn list_for_name(
381        &self,
382        name: String,
383        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
384    ) -> Result<
385        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
386        AccountSetError,
387    > {
388        self.repo
389            .list_for_name_by_created_at(name, args, Default::default())
390            .await
391    }
392
393    #[instrument(
394        name = "cala_ledger.account_sets.list_for_name_in_op",
395        skip(self, op),
396        err
397    )]
398    pub async fn list_for_name_in_op(
399        &self,
400        op: &mut LedgerOperation<'_>,
401        name: String,
402        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
403    ) -> Result<
404        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
405        AccountSetError,
406    > {
407        self.repo
408            .list_for_name_by_created_at_in_tx(op.tx(), name, args, Default::default())
409            .await
410    }
411
412    #[instrument(
413        name = "cala_ledger.account_sets.find_where_member_in_op",
414        skip(self, op),
415        err
416    )]
417    pub async fn find_where_member_in_op(
418        &self,
419        op: &mut LedgerOperation<'_>,
420        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
421        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
422    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
423    {
424        match member.into() {
425            AccountSetMemberId::Account(account_id) => {
426                self.repo
427                    .find_where_account_is_member_in_tx(op.tx(), account_id, query)
428                    .await
429            }
430            AccountSetMemberId::AccountSet(account_set_id) => {
431                self.repo
432                    .find_where_account_set_is_member_in_tx(op.tx(), account_set_id, query)
433                    .await
434            }
435        }
436    }
437
438    pub async fn list_members(
439        &self,
440        id: AccountSetId,
441        args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
442    ) -> Result<
443        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
444        AccountSetError,
445    > {
446        self.repo.list_children(id, args).await
447    }
448
449    pub async fn list_members_in_op(
450        &self,
451        op: &mut LedgerOperation<'_>,
452        id: AccountSetId,
453        args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
454    ) -> Result<
455        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
456        AccountSetError,
457    > {
458        self.repo.list_children_in_tx(op.tx(), id, args).await
459    }
460
461    pub(crate) async fn fetch_mappings(
462        &self,
463        journal_id: JournalId,
464        account_ids: &[AccountId],
465    ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
466        self.repo.fetch_mappings(journal_id, account_ids).await
467    }
468
469    #[cfg(feature = "import")]
470    pub async fn sync_account_set_creation(
471        &self,
472        mut db: es_entity::DbOp<'_>,
473        origin: DataSourceId,
474        values: AccountSetValues,
475    ) -> Result<(), AccountSetError> {
476        let mut account_set = AccountSet::import(origin, values);
477        self.repo
478            .import_in_op(&mut db, origin, &mut account_set)
479            .await?;
480        let recorded_at = db.now();
481        let outbox_events: Vec<_> = account_set
482            .events
483            .last_persisted(1)
484            .map(|p| OutboxEventPayload::from(&p.event))
485            .collect();
486        self.outbox
487            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
488            .await?;
489        Ok(())
490    }
491
492    #[cfg(feature = "import")]
493    pub async fn sync_account_set_update(
494        &self,
495        mut db: es_entity::DbOp<'_>,
496        values: AccountSetValues,
497        fields: Vec<String>,
498    ) -> Result<(), AccountSetError> {
499        let mut account_set = self.repo.find_by_id(values.id).await?;
500        account_set.update((values, fields));
501        let n_events = self.repo.update_in_op(&mut db, &mut account_set).await?;
502        let recorded_at = db.now();
503        let outbox_events: Vec<_> = account_set
504            .events
505            .last_persisted(n_events)
506            .map(|p| OutboxEventPayload::from(&p.event))
507            .collect();
508        self.outbox
509            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
510            .await?;
511        Ok(())
512    }
513
514    #[cfg(feature = "import")]
515    pub async fn sync_account_set_member_creation(
516        &self,
517        mut db: es_entity::DbOp<'_>,
518        origin: DataSourceId,
519        account_set_id: AccountSetId,
520        member_id: AccountSetMemberId,
521    ) -> Result<(), AccountSetError> {
522        match member_id {
523            AccountSetMemberId::Account(account_id) => {
524                self.repo
525                    .import_member_account_in_op(&mut db, account_set_id, account_id)
526                    .await?;
527            }
528            AccountSetMemberId::AccountSet(account_set_id) => {
529                self.repo
530                    .import_member_set_in_op(&mut db, account_set_id, account_set_id)
531                    .await?;
532            }
533        }
534        let recorded_at = db.now();
535        self.outbox
536            .persist_events_at(
537                db.into_tx(),
538                std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
539                    source: DataSource::Remote { id: origin },
540                    account_set_id,
541                    member_id,
542                }),
543                recorded_at,
544            )
545            .await?;
546        Ok(())
547    }
548
549    #[cfg(feature = "import")]
550    pub async fn sync_account_set_member_removal(
551        &self,
552        mut db: es_entity::DbOp<'_>,
553        origin: DataSourceId,
554        account_set_id: AccountSetId,
555        member_id: AccountSetMemberId,
556    ) -> Result<(), AccountSetError> {
557        match member_id {
558            AccountSetMemberId::Account(account_id) => {
559                self.repo
560                    .import_remove_member_account(db.tx(), account_set_id, account_id)
561                    .await?;
562            }
563            AccountSetMemberId::AccountSet(account_set_id) => {
564                self.repo
565                    .import_remove_member_set(db.tx(), account_set_id, account_set_id)
566                    .await?;
567            }
568        }
569        let recorded_at = db.now();
570        self.outbox
571            .persist_events_at(
572                db.into_tx(),
573                std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
574                    source: DataSource::Remote { id: origin },
575                    account_set_id,
576                    member_id,
577                }),
578                recorded_at,
579            )
580            .await?;
581        Ok(())
582    }
583}
584
585fn entries_for_add_balance(
586    entries: &mut Vec<NewEntry>,
587    target_account_id: AccountId,
588    balance: BalanceSnapshot,
589) {
590    use rust_decimal::Decimal;
591
592    if balance.settled.cr_balance != Decimal::ZERO {
593        let entry = NewEntry::builder()
594            .id(EntryId::new())
595            .journal_id(balance.journal_id)
596            .account_id(target_account_id)
597            .currency(balance.currency)
598            .sequence(1u32)
599            .layer(Layer::Settled)
600            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
601            .direction(DebitOrCredit::Credit)
602            .units(balance.settled.cr_balance)
603            .transaction_id(UNASSIGNED_TRANSACTION_ID)
604            .build()
605            .expect("Couldn't build entry");
606        entries.push(entry);
607    }
608    if balance.settled.dr_balance != Decimal::ZERO {
609        let entry = NewEntry::builder()
610            .id(EntryId::new())
611            .journal_id(balance.journal_id)
612            .account_id(target_account_id)
613            .currency(balance.currency)
614            .sequence(1u32)
615            .layer(Layer::Settled)
616            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
617            .direction(DebitOrCredit::Debit)
618            .units(balance.settled.dr_balance)
619            .transaction_id(UNASSIGNED_TRANSACTION_ID)
620            .build()
621            .expect("Couldn't build entry");
622        entries.push(entry);
623    }
624    if balance.pending.cr_balance != Decimal::ZERO {
625        let entry = NewEntry::builder()
626            .id(EntryId::new())
627            .journal_id(balance.journal_id)
628            .account_id(target_account_id)
629            .currency(balance.currency)
630            .sequence(1u32)
631            .layer(Layer::Pending)
632            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
633            .direction(DebitOrCredit::Credit)
634            .units(balance.pending.cr_balance)
635            .transaction_id(UNASSIGNED_TRANSACTION_ID)
636            .build()
637            .expect("Couldn't build entry");
638        entries.push(entry);
639    }
640    if balance.pending.dr_balance != Decimal::ZERO {
641        let entry = NewEntry::builder()
642            .id(EntryId::new())
643            .journal_id(balance.journal_id)
644            .account_id(target_account_id)
645            .currency(balance.currency)
646            .sequence(1u32)
647            .layer(Layer::Pending)
648            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
649            .direction(DebitOrCredit::Debit)
650            .units(balance.pending.dr_balance)
651            .transaction_id(UNASSIGNED_TRANSACTION_ID)
652            .build()
653            .expect("Couldn't build entry");
654        entries.push(entry);
655    }
656    if balance.encumbrance.cr_balance != Decimal::ZERO {
657        let entry = NewEntry::builder()
658            .id(EntryId::new())
659            .journal_id(balance.journal_id)
660            .account_id(target_account_id)
661            .currency(balance.currency)
662            .sequence(1u32)
663            .layer(Layer::Encumbrance)
664            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
665            .direction(DebitOrCredit::Credit)
666            .units(balance.encumbrance.cr_balance)
667            .transaction_id(UNASSIGNED_TRANSACTION_ID)
668            .build()
669            .expect("Couldn't build entry");
670        entries.push(entry);
671    }
672    if balance.encumbrance.dr_balance != Decimal::ZERO {
673        let entry = NewEntry::builder()
674            .id(EntryId::new())
675            .journal_id(balance.journal_id)
676            .account_id(target_account_id)
677            .currency(balance.currency)
678            .sequence(1u32)
679            .layer(Layer::Encumbrance)
680            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
681            .direction(DebitOrCredit::Debit)
682            .units(balance.encumbrance.dr_balance)
683            .transaction_id(UNASSIGNED_TRANSACTION_ID)
684            .build()
685            .expect("Couldn't build entry");
686        entries.push(entry);
687    }
688}
689
690fn entries_for_remove_balance(
691    entries: &mut Vec<NewEntry>,
692    target_account_id: AccountId,
693    balance: BalanceSnapshot,
694) {
695    use rust_decimal::Decimal;
696
697    if balance.settled.cr_balance != Decimal::ZERO {
698        let entry = NewEntry::builder()
699            .id(EntryId::new())
700            .journal_id(balance.journal_id)
701            .account_id(target_account_id)
702            .currency(balance.currency)
703            .sequence(1u32)
704            .layer(Layer::Settled)
705            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
706            .direction(DebitOrCredit::Debit)
707            .units(balance.settled.cr_balance)
708            .transaction_id(UNASSIGNED_TRANSACTION_ID)
709            .build()
710            .expect("Couldn't build entry");
711        entries.push(entry);
712    }
713    if balance.settled.dr_balance != Decimal::ZERO {
714        let entry = NewEntry::builder()
715            .id(EntryId::new())
716            .journal_id(balance.journal_id)
717            .account_id(target_account_id)
718            .currency(balance.currency)
719            .sequence(1u32)
720            .layer(Layer::Settled)
721            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
722            .direction(DebitOrCredit::Credit)
723            .units(balance.settled.dr_balance)
724            .transaction_id(UNASSIGNED_TRANSACTION_ID)
725            .build()
726            .expect("Couldn't build entry");
727        entries.push(entry);
728    }
729    if balance.pending.cr_balance != Decimal::ZERO {
730        let entry = NewEntry::builder()
731            .id(EntryId::new())
732            .journal_id(balance.journal_id)
733            .account_id(target_account_id)
734            .currency(balance.currency)
735            .sequence(1u32)
736            .layer(Layer::Pending)
737            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
738            .direction(DebitOrCredit::Debit)
739            .units(balance.pending.cr_balance)
740            .transaction_id(UNASSIGNED_TRANSACTION_ID)
741            .build()
742            .expect("Couldn't build entry");
743        entries.push(entry);
744    }
745    if balance.pending.dr_balance != Decimal::ZERO {
746        let entry = NewEntry::builder()
747            .id(EntryId::new())
748            .journal_id(balance.journal_id)
749            .account_id(target_account_id)
750            .currency(balance.currency)
751            .sequence(1u32)
752            .layer(Layer::Pending)
753            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
754            .direction(DebitOrCredit::Credit)
755            .units(balance.pending.dr_balance)
756            .transaction_id(UNASSIGNED_TRANSACTION_ID)
757            .build()
758            .expect("Couldn't build entry");
759        entries.push(entry);
760    }
761    if balance.encumbrance.cr_balance != Decimal::ZERO {
762        let entry = NewEntry::builder()
763            .id(EntryId::new())
764            .journal_id(balance.journal_id)
765            .account_id(target_account_id)
766            .currency(balance.currency)
767            .sequence(1u32)
768            .layer(Layer::Encumbrance)
769            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
770            .direction(DebitOrCredit::Debit)
771            .units(balance.encumbrance.cr_balance)
772            .transaction_id(UNASSIGNED_TRANSACTION_ID)
773            .build()
774            .expect("Couldn't build entry");
775        entries.push(entry);
776    }
777    if balance.encumbrance.dr_balance != Decimal::ZERO {
778        let entry = NewEntry::builder()
779            .id(EntryId::new())
780            .journal_id(balance.journal_id)
781            .account_id(target_account_id)
782            .currency(balance.currency)
783            .sequence(1u32)
784            .layer(Layer::Encumbrance)
785            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
786            .direction(DebitOrCredit::Credit)
787            .units(balance.encumbrance.dr_balance)
788            .transaction_id(UNASSIGNED_TRANSACTION_ID)
789            .build()
790            .expect("Couldn't build entry");
791        entries.push(entry);
792    }
793}
794
795impl From<&AccountSetEvent> for OutboxEventPayload {
796    fn from(event: &AccountSetEvent) -> Self {
797        match event {
798            #[cfg(feature = "import")]
799            AccountSetEvent::Imported {
800                source,
801                values: account_set,
802            } => OutboxEventPayload::AccountSetCreated {
803                source: *source,
804                account_set: account_set.clone(),
805            },
806            AccountSetEvent::Initialized {
807                values: account_set,
808            } => OutboxEventPayload::AccountSetCreated {
809                source: DataSource::Local,
810                account_set: account_set.clone(),
811            },
812            AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
813                source: DataSource::Local,
814                account_set: values.clone(),
815                fields: fields.clone(),
816            },
817        }
818    }
819}