cala_ledger/account_set/
mod.rs

1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use std::collections::HashMap;
8use tracing::instrument;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{
13    account::*,
14    balance::*,
15    entry::*,
16    ledger_operation::*,
17    outbox::*,
18    primitives::{DataSource, DebitOrCredit, JournalId, Layer},
19};
20
21pub use cursor::*;
22pub use entity::*;
23use error::*;
24use repo::*;
25pub use repo::{account_set_cursor::*, members_cursor::*};
26
// Nil UUID stamped as the `transaction_id` of the entries this module
// synthesizes in `entries_for_add_balance` / `entries_for_remove_balance`;
// those entries are created from balance snapshots, not from a transaction.
// NOTE(review): the constant is referenced unconditionally below, so the
// `dead_code` allowance looks unnecessary — confirm before removing.
#[allow(dead_code)]
const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
29
/// Service handle for account sets.
///
/// Bundles the account-set repository with the sibling services it calls
/// (`Accounts`, `Entries`, `Balances`), the `Outbox` used for event
/// publication, and the Postgres pool used to start new ledger operations.
#[derive(Clone)]
pub struct AccountSets {
    // Persistence layer for account sets and their membership.
    repo: AccountSetRepo,
    // Used to create the account that shadows each account set.
    accounts: Accounts,
    // Used to persist the entries generated on membership changes.
    entries: Entries,
    // Used to read member balances and update set/parent balances.
    balances: Balances,
    // Passed to `LedgerOperation::init` and used directly by the import sync methods.
    outbox: Outbox,
    // Passed to `LedgerOperation::init` when a method opens its own operation.
    pool: PgPool,
}
39
40impl AccountSets {
41    pub(crate) fn new(
42        pool: &PgPool,
43        outbox: Outbox,
44        accounts: &Accounts,
45        entries: &Entries,
46        balances: &Balances,
47    ) -> Self {
48        Self {
49            repo: AccountSetRepo::new(pool),
50            outbox,
51            accounts: accounts.clone(),
52            entries: entries.clone(),
53            balances: balances.clone(),
54            pool: pool.clone(),
55        }
56    }
57    #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
58    pub async fn create(
59        &self,
60        new_account_set: NewAccountSet,
61    ) -> Result<AccountSet, AccountSetError> {
62        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
63        let account_set = self.create_in_op(&mut op, new_account_set).await?;
64        op.commit().await?;
65        Ok(account_set)
66    }
67
68    #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
69    pub async fn create_in_op(
70        &self,
71        db: &mut LedgerOperation<'_>,
72        new_account_set: NewAccountSet,
73    ) -> Result<AccountSet, AccountSetError> {
74        let new_account = NewAccount::builder()
75            .id(uuid::Uuid::from(new_account_set.id))
76            .name(String::new())
77            .code(new_account_set.id.to_string())
78            .normal_balance_type(new_account_set.normal_balance_type)
79            .is_account_set(true)
80            .build()
81            .expect("Failed to build account");
82        self.accounts.create_in_op(db, new_account).await?;
83        let account_set = self.repo.create_in_op(db.op(), new_account_set).await?;
84        db.accumulate(account_set.events.last_persisted(1).map(|p| &p.event));
85        Ok(account_set)
86    }
87
88    #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
89    pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
90        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
91        self.persist_in_op(&mut op, account_set).await?;
92        op.commit().await?;
93        Ok(())
94    }
95
96    #[instrument(
97        name = "cala_ledger.account_sets.persist_in_op",
98        skip(self, db, account_set)
99    )]
100    pub async fn persist_in_op(
101        &self,
102        db: &mut LedgerOperation<'_>,
103        account_set: &mut AccountSet,
104    ) -> Result<(), AccountSetError> {
105        let n_events = self.repo.update_in_op(db.op(), account_set).await?;
106        db.accumulate(
107            account_set
108                .events
109                .last_persisted(n_events)
110                .map(|p| &p.event),
111        );
112        Ok(())
113    }
114
115    pub async fn add_member(
116        &self,
117        account_set_id: AccountSetId,
118        member: impl Into<AccountSetMemberId>,
119    ) -> Result<AccountSet, AccountSetError> {
120        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
121        let account_set = self
122            .add_member_in_op(&mut op, account_set_id, member)
123            .await?;
124        op.commit().await?;
125        Ok(account_set)
126    }
127
    /// Adds `member` (an account or another account set) to the set
    /// `account_set_id` within the caller's ledger operation.
    ///
    /// After the membership is recorded, an `AccountSetMemberCreated` outbox
    /// event is accumulated on `op`, the member's balances are read, and one
    /// entry per non-zero balance side is created against the set's own account
    /// so that the set (and the parents returned by the repo) reflect the
    /// member's existing balances.
    ///
    /// Returns the account set that received the member.
    ///
    /// # Errors
    /// * `AccountSetError::CouldNotFindById` when, for a set member, either the
    ///   target set or the member set is not returned by the lookup.
    /// * `AccountSetError::JournalIdMismatch` when a member set belongs to a
    ///   different journal than the target set.
    /// * Any error propagated from the repo, entries or balances services.
    pub async fn add_member_in_op(
        &self,
        op: &mut LedgerOperation<'_>,
        account_set_id: AccountSetId,
        member: impl Into<AccountSetMemberId>,
    ) -> Result<AccountSet, AccountSetError> {
        let member = member.into();
        let (time, parents, account_set, member_id) = match member {
            AccountSetMemberId::Account(id) => {
                // Plain account: load the target set, then record the membership.
                // No journal comparison is made in this branch.
                let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
                let (time, parents) = self
                    .repo
                    .add_member_account_and_return_parents(op.tx(), account_set_id, id)
                    .await?;
                (time, parents, set, id)
            }
            AccountSetMemberId::AccountSet(id) => {
                // Nested set: load both sets in one query so their journals can be compared.
                let mut accounts = self
                    .repo
                    .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
                    .await?;
                let target = accounts
                    .remove(&account_set_id)
                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
                let member = accounts
                    .remove(&id)
                    .ok_or(AccountSetError::CouldNotFindById(id))?;

                if target.values().journal_id != member.values().journal_id {
                    return Err(AccountSetError::JournalIdMismatch);
                }

                let (time, parents) = self
                    .repo
                    .add_member_set_and_return_parents(op.tx(), account_set_id, id)
                    .await?;
                // From here on the member set is addressed through its account id.
                (time, parents, target, AccountId::from(id))
            }
        };

        op.accumulate(std::iter::once(
            OutboxEventPayload::AccountSetMemberCreated {
                source: DataSource::Local,
                account_set_id,
                member_id: member,
            },
        ));

        // NOTE(review): the `_for_update` suffix suggests a locking read — confirm in `Balances`.
        let balances = self
            .balances
            .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
            .await?;

        let target_account_id = AccountId::from(&account_set.id());
        let mut entries = Vec::new();
        for balance in balances.into_values() {
            entries_for_add_balance(&mut entries, target_account_id, balance);
        }

        // Member has no non-zero balances: nothing to propagate.
        if entries.is_empty() {
            return Ok(account_set);
        }
        let entries = self.entries.create_all_in_op(op, entries).await?;
        // Map the set's account to the parents returned by the repo for the balance update.
        let mappings = std::iter::once((target_account_id, parents)).collect();
        self.balances
            .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
            .await?;

        Ok(account_set)
    }
198
199    pub async fn remove_member(
200        &self,
201        account_set_id: AccountSetId,
202        member: impl Into<AccountSetMemberId>,
203    ) -> Result<AccountSet, AccountSetError> {
204        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
205        let account_set = self
206            .remove_member_in_op(&mut op, account_set_id, member)
207            .await?;
208        op.commit().await?;
209        Ok(account_set)
210    }
211
    /// Removes `member` (an account or another account set) from the set
    /// `account_set_id` within the caller's ledger operation.
    ///
    /// After the membership is removed, an `AccountSetMemberRemoved` outbox
    /// event is accumulated on `op`, the member's balances are read, and one
    /// offsetting entry per non-zero balance side is created against the set's
    /// own account (see `entries_for_remove_balance`) before balances are
    /// updated for the set and the parents returned by the repo.
    ///
    /// Returns the account set the member was removed from.
    ///
    /// # Errors
    /// * `AccountSetError::CouldNotFindById` when, for a set member, either the
    ///   target set or the member set is not returned by the lookup.
    /// * `AccountSetError::JournalIdMismatch` when a member set belongs to a
    ///   different journal than the target set.
    /// * Any error propagated from the repo, entries or balances services.
    pub async fn remove_member_in_op(
        &self,
        op: &mut LedgerOperation<'_>,
        account_set_id: AccountSetId,
        member: impl Into<AccountSetMemberId>,
    ) -> Result<AccountSet, AccountSetError> {
        let member = member.into();
        let (time, parents, account_set, member_id) = match member {
            AccountSetMemberId::Account(id) => {
                // Plain account: load the target set, then drop the membership.
                // No journal comparison is made in this branch.
                let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
                let (time, parents) = self
                    .repo
                    .remove_member_account_and_return_parents(op.tx(), account_set_id, id)
                    .await?;
                (time, parents, set, id)
            }
            AccountSetMemberId::AccountSet(id) => {
                // Nested set: load both sets in one query so their journals can be compared.
                let mut accounts = self
                    .repo
                    .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
                    .await?;
                let target = accounts
                    .remove(&account_set_id)
                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
                let member = accounts
                    .remove(&id)
                    .ok_or(AccountSetError::CouldNotFindById(id))?;

                if target.values().journal_id != member.values().journal_id {
                    return Err(AccountSetError::JournalIdMismatch);
                }

                let (time, parents) = self
                    .repo
                    .remove_member_set_and_return_parents(op.tx(), account_set_id, id)
                    .await?;
                // From here on the member set is addressed through its account id.
                (time, parents, target, AccountId::from(id))
            }
        };

        op.accumulate(std::iter::once(
            OutboxEventPayload::AccountSetMemberRemoved {
                source: DataSource::Local,
                account_set_id,
                member_id: member,
            },
        ));

        // NOTE(review): the `_for_update` suffix suggests a locking read — confirm in `Balances`.
        let balances = self
            .balances
            .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
            .await?;

        let target_account_id = AccountId::from(&account_set.id());
        let mut entries = Vec::new();
        for balance in balances.into_values() {
            entries_for_remove_balance(&mut entries, target_account_id, balance);
        }

        // Member has no non-zero balances: nothing to reverse.
        if entries.is_empty() {
            return Ok(account_set);
        }
        let entries = self.entries.create_all_in_op(op, entries).await?;
        // Map the set's account to the parents returned by the repo for the balance update.
        let mappings = std::iter::once((target_account_id, parents)).collect();
        self.balances
            .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
            .await?;

        Ok(account_set)
    }
282
    /// Looks up the given account sets and returns them keyed by id, each
    /// converted into the caller-chosen type `T`.
    ///
    /// Delegates to the repository.
    // NOTE(review): behaviour for ids with no matching set is decided by the repo — confirm there.
    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
    pub async fn find_all<T: From<AccountSet>>(
        &self,
        account_set_ids: &[AccountSetId],
    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
        self.repo.find_all(account_set_ids).await
    }
290
291    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
292    pub async fn find_all_in_op<T: From<AccountSet>>(
293        &self,
294        op: &mut LedgerOperation<'_>,
295        account_set_ids: &[AccountSetId],
296    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
297        self.repo.find_all_in_tx(op.tx(), account_set_ids).await
298    }
299
    /// Loads a single account set by its id; delegates to the repository.
    ///
    /// # Errors
    /// Propagates the repository error, including the one raised when no set matches.
    #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
    pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
        self.repo.find_by_id(account_set_id).await
    }
304
305    #[instrument(
306        name = "cala_ledger.accounts_sets.find_by_external_id",
307        skip(self),
308        err
309    )]
310    pub async fn find_by_external_id(
311        &self,
312        external_id: String,
313    ) -> Result<AccountSet, AccountSetError> {
314        self.repo.find_by_external_id(Some(external_id)).await
315    }
316
317    #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
318    pub async fn find_where_member(
319        &self,
320        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
321        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
322    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
323    {
324        match member.into() {
325            AccountSetMemberId::Account(account_id) => {
326                self.repo
327                    .find_where_account_is_member(account_id, query)
328                    .await
329            }
330            AccountSetMemberId::AccountSet(account_set_id) => {
331                self.repo
332                    .find_where_account_set_is_member(account_set_id, query)
333                    .await
334            }
335        }
336    }
337
    /// Pages through the account sets named `name` using the created-at cursor.
    ///
    /// Delegates to the repository, passing `Default::default()` as its third
    /// argument.
    // NOTE(review): the third argument is presumably the listing direction — confirm in the repo.
    #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
    pub async fn list_for_name(
        &self,
        name: String,
        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
    ) -> Result<
        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
        AccountSetError,
    > {
        self.repo
            .list_for_name_by_created_at(name, args, Default::default())
            .await
    }
351
    /// Pages through the account sets named `name` using the created-at cursor,
    /// reading through the transaction of the caller's ledger operation.
    ///
    /// Delegates to the repository, passing `Default::default()` as its last
    /// argument.
    // NOTE(review): the last argument is presumably the listing direction — confirm in the repo.
    #[instrument(
        name = "cala_ledger.account_sets.list_for_name_in_op",
        skip(self, op),
        err
    )]
    pub async fn list_for_name_in_op(
        &self,
        op: &mut LedgerOperation<'_>,
        name: String,
        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
    ) -> Result<
        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
        AccountSetError,
    > {
        self.repo
            .list_for_name_by_created_at_in_tx(op.tx(), name, args, Default::default())
            .await
    }
370
371    #[instrument(
372        name = "cala_ledger.account_sets.find_where_member_in_op",
373        skip(self, op),
374        err
375    )]
376    pub async fn find_where_member_in_op(
377        &self,
378        op: &mut LedgerOperation<'_>,
379        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
380        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
381    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
382    {
383        match member.into() {
384            AccountSetMemberId::Account(account_id) => {
385                self.repo
386                    .find_where_account_is_member_in_tx(op.tx(), account_id, query)
387                    .await
388            }
389            AccountSetMemberId::AccountSet(account_set_id) => {
390                self.repo
391                    .find_where_account_set_is_member_in_tx(op.tx(), account_set_id, query)
392                    .await
393            }
394        }
395    }
396
    /// Pages through the members of the account set `id`.
    ///
    /// Delegates to the repository's `list_children`.
    // NOTE(review): presumably lists direct children only — confirm in the repo.
    pub async fn list_members(
        &self,
        id: AccountSetId,
        args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
    ) -> Result<
        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
        AccountSetError,
    > {
        self.repo.list_children(id, args).await
    }
407
    /// Pages through the members of the account set `id`, reading through the
    /// transaction of the caller's ledger operation.
    ///
    /// Delegates to the repository's `list_children_in_tx`.
    // NOTE(review): presumably lists direct children only — confirm in the repo.
    pub async fn list_members_in_op(
        &self,
        op: &mut LedgerOperation<'_>,
        id: AccountSetId,
        args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
    ) -> Result<
        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
        AccountSetError,
    > {
        self.repo.list_children_in_tx(op.tx(), id, args).await
    }
419
    /// Returns, for the given accounts in `journal_id`, a map from account id
    /// to a list of account set ids; delegates to the repository.
    // NOTE(review): presumably the listed sets are those each account belongs to — confirm in the repo.
    pub(crate) async fn fetch_mappings(
        &self,
        journal_id: JournalId,
        account_ids: &[AccountId],
    ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
        self.repo.fetch_mappings(journal_id, account_ids).await
    }
427
428    #[cfg(feature = "import")]
429    pub async fn sync_account_set_creation(
430        &self,
431        mut db: es_entity::DbOp<'_>,
432        origin: DataSourceId,
433        values: AccountSetValues,
434    ) -> Result<(), AccountSetError> {
435        let mut account_set = AccountSet::import(origin, values);
436        self.repo
437            .import_in_op(&mut db, origin, &mut account_set)
438            .await?;
439        let recorded_at = db.now();
440        let outbox_events: Vec<_> = account_set
441            .events
442            .last_persisted(1)
443            .map(|p| OutboxEventPayload::from(&p.event))
444            .collect();
445        self.outbox
446            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
447            .await?;
448        Ok(())
449    }
450
451    #[cfg(feature = "import")]
452    pub async fn sync_account_set_update(
453        &self,
454        mut db: es_entity::DbOp<'_>,
455        values: AccountSetValues,
456        fields: Vec<String>,
457    ) -> Result<(), AccountSetError> {
458        let mut account_set = self.repo.find_by_id(values.id).await?;
459        account_set.update((values, fields));
460        let n_events = self.repo.update_in_op(&mut db, &mut account_set).await?;
461        let recorded_at = db.now();
462        let outbox_events: Vec<_> = account_set
463            .events
464            .last_persisted(n_events)
465            .map(|p| OutboxEventPayload::from(&p.event))
466            .collect();
467        self.outbox
468            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
469            .await?;
470        Ok(())
471    }
472
473    #[cfg(feature = "import")]
474    pub async fn sync_account_set_member_creation(
475        &self,
476        mut db: es_entity::DbOp<'_>,
477        origin: DataSourceId,
478        account_set_id: AccountSetId,
479        member_id: AccountSetMemberId,
480    ) -> Result<(), AccountSetError> {
481        match member_id {
482            AccountSetMemberId::Account(account_id) => {
483                self.repo
484                    .import_member_account_in_op(&mut db, account_set_id, account_id)
485                    .await?;
486            }
487            AccountSetMemberId::AccountSet(account_set_id) => {
488                self.repo
489                    .import_member_set_in_op(&mut db, account_set_id, account_set_id)
490                    .await?;
491            }
492        }
493        let recorded_at = db.now();
494        self.outbox
495            .persist_events_at(
496                db.into_tx(),
497                std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
498                    source: DataSource::Remote { id: origin },
499                    account_set_id,
500                    member_id,
501                }),
502                recorded_at,
503            )
504            .await?;
505        Ok(())
506    }
507
508    #[cfg(feature = "import")]
509    pub async fn sync_account_set_member_removal(
510        &self,
511        mut db: es_entity::DbOp<'_>,
512        origin: DataSourceId,
513        account_set_id: AccountSetId,
514        member_id: AccountSetMemberId,
515    ) -> Result<(), AccountSetError> {
516        match member_id {
517            AccountSetMemberId::Account(account_id) => {
518                self.repo
519                    .import_remove_member_account(db.tx(), account_set_id, account_id)
520                    .await?;
521            }
522            AccountSetMemberId::AccountSet(account_set_id) => {
523                self.repo
524                    .import_remove_member_set(db.tx(), account_set_id, account_set_id)
525                    .await?;
526            }
527        }
528        let recorded_at = db.now();
529        self.outbox
530            .persist_events_at(
531                db.into_tx(),
532                std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
533                    source: DataSource::Remote { id: origin },
534                    account_set_id,
535                    member_id,
536                }),
537                recorded_at,
538            )
539            .await?;
540        Ok(())
541    }
542}
543
544fn entries_for_add_balance(
545    entries: &mut Vec<NewEntry>,
546    target_account_id: AccountId,
547    balance: BalanceSnapshot,
548) {
549    use rust_decimal::Decimal;
550
551    if balance.settled.cr_balance != Decimal::ZERO {
552        let entry = NewEntry::builder()
553            .id(EntryId::new())
554            .journal_id(balance.journal_id)
555            .account_id(target_account_id)
556            .currency(balance.currency)
557            .sequence(1u32)
558            .layer(Layer::Settled)
559            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
560            .direction(DebitOrCredit::Credit)
561            .units(balance.settled.cr_balance)
562            .transaction_id(UNASSIGNED_TRANSACTION_ID)
563            .build()
564            .expect("Couldn't build entry");
565        entries.push(entry);
566    }
567    if balance.settled.dr_balance != Decimal::ZERO {
568        let entry = NewEntry::builder()
569            .id(EntryId::new())
570            .journal_id(balance.journal_id)
571            .account_id(target_account_id)
572            .currency(balance.currency)
573            .sequence(1u32)
574            .layer(Layer::Settled)
575            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
576            .direction(DebitOrCredit::Debit)
577            .units(balance.settled.dr_balance)
578            .transaction_id(UNASSIGNED_TRANSACTION_ID)
579            .build()
580            .expect("Couldn't build entry");
581        entries.push(entry);
582    }
583    if balance.pending.cr_balance != Decimal::ZERO {
584        let entry = NewEntry::builder()
585            .id(EntryId::new())
586            .journal_id(balance.journal_id)
587            .account_id(target_account_id)
588            .currency(balance.currency)
589            .sequence(1u32)
590            .layer(Layer::Pending)
591            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
592            .direction(DebitOrCredit::Credit)
593            .units(balance.pending.cr_balance)
594            .transaction_id(UNASSIGNED_TRANSACTION_ID)
595            .build()
596            .expect("Couldn't build entry");
597        entries.push(entry);
598    }
599    if balance.pending.dr_balance != Decimal::ZERO {
600        let entry = NewEntry::builder()
601            .id(EntryId::new())
602            .journal_id(balance.journal_id)
603            .account_id(target_account_id)
604            .currency(balance.currency)
605            .sequence(1u32)
606            .layer(Layer::Pending)
607            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
608            .direction(DebitOrCredit::Debit)
609            .units(balance.pending.dr_balance)
610            .transaction_id(UNASSIGNED_TRANSACTION_ID)
611            .build()
612            .expect("Couldn't build entry");
613        entries.push(entry);
614    }
615    if balance.encumbrance.cr_balance != Decimal::ZERO {
616        let entry = NewEntry::builder()
617            .id(EntryId::new())
618            .journal_id(balance.journal_id)
619            .account_id(target_account_id)
620            .currency(balance.currency)
621            .sequence(1u32)
622            .layer(Layer::Encumbrance)
623            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
624            .direction(DebitOrCredit::Credit)
625            .units(balance.encumbrance.cr_balance)
626            .transaction_id(UNASSIGNED_TRANSACTION_ID)
627            .build()
628            .expect("Couldn't build entry");
629        entries.push(entry);
630    }
631    if balance.encumbrance.dr_balance != Decimal::ZERO {
632        let entry = NewEntry::builder()
633            .id(EntryId::new())
634            .journal_id(balance.journal_id)
635            .account_id(target_account_id)
636            .currency(balance.currency)
637            .sequence(1u32)
638            .layer(Layer::Encumbrance)
639            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
640            .direction(DebitOrCredit::Debit)
641            .units(balance.encumbrance.dr_balance)
642            .transaction_id(UNASSIGNED_TRANSACTION_ID)
643            .build()
644            .expect("Couldn't build entry");
645        entries.push(entry);
646    }
647}
648
649fn entries_for_remove_balance(
650    entries: &mut Vec<NewEntry>,
651    target_account_id: AccountId,
652    balance: BalanceSnapshot,
653) {
654    use rust_decimal::Decimal;
655
656    if balance.settled.cr_balance != Decimal::ZERO {
657        let entry = NewEntry::builder()
658            .id(EntryId::new())
659            .journal_id(balance.journal_id)
660            .account_id(target_account_id)
661            .currency(balance.currency)
662            .sequence(1u32)
663            .layer(Layer::Settled)
664            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
665            .direction(DebitOrCredit::Debit)
666            .units(balance.settled.cr_balance)
667            .transaction_id(UNASSIGNED_TRANSACTION_ID)
668            .build()
669            .expect("Couldn't build entry");
670        entries.push(entry);
671    }
672    if balance.settled.dr_balance != Decimal::ZERO {
673        let entry = NewEntry::builder()
674            .id(EntryId::new())
675            .journal_id(balance.journal_id)
676            .account_id(target_account_id)
677            .currency(balance.currency)
678            .sequence(1u32)
679            .layer(Layer::Settled)
680            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
681            .direction(DebitOrCredit::Credit)
682            .units(balance.settled.dr_balance)
683            .transaction_id(UNASSIGNED_TRANSACTION_ID)
684            .build()
685            .expect("Couldn't build entry");
686        entries.push(entry);
687    }
688    if balance.pending.cr_balance != Decimal::ZERO {
689        let entry = NewEntry::builder()
690            .id(EntryId::new())
691            .journal_id(balance.journal_id)
692            .account_id(target_account_id)
693            .currency(balance.currency)
694            .sequence(1u32)
695            .layer(Layer::Pending)
696            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
697            .direction(DebitOrCredit::Debit)
698            .units(balance.pending.cr_balance)
699            .transaction_id(UNASSIGNED_TRANSACTION_ID)
700            .build()
701            .expect("Couldn't build entry");
702        entries.push(entry);
703    }
704    if balance.pending.dr_balance != Decimal::ZERO {
705        let entry = NewEntry::builder()
706            .id(EntryId::new())
707            .journal_id(balance.journal_id)
708            .account_id(target_account_id)
709            .currency(balance.currency)
710            .sequence(1u32)
711            .layer(Layer::Pending)
712            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
713            .direction(DebitOrCredit::Credit)
714            .units(balance.pending.dr_balance)
715            .transaction_id(UNASSIGNED_TRANSACTION_ID)
716            .build()
717            .expect("Couldn't build entry");
718        entries.push(entry);
719    }
720    if balance.encumbrance.cr_balance != Decimal::ZERO {
721        let entry = NewEntry::builder()
722            .id(EntryId::new())
723            .journal_id(balance.journal_id)
724            .account_id(target_account_id)
725            .currency(balance.currency)
726            .sequence(1u32)
727            .layer(Layer::Encumbrance)
728            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
729            .direction(DebitOrCredit::Debit)
730            .units(balance.encumbrance.cr_balance)
731            .transaction_id(UNASSIGNED_TRANSACTION_ID)
732            .build()
733            .expect("Couldn't build entry");
734        entries.push(entry);
735    }
736    if balance.encumbrance.dr_balance != Decimal::ZERO {
737        let entry = NewEntry::builder()
738            .id(EntryId::new())
739            .journal_id(balance.journal_id)
740            .account_id(target_account_id)
741            .currency(balance.currency)
742            .sequence(1u32)
743            .layer(Layer::Encumbrance)
744            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
745            .direction(DebitOrCredit::Credit)
746            .units(balance.encumbrance.dr_balance)
747            .transaction_id(UNASSIGNED_TRANSACTION_ID)
748            .build()
749            .expect("Couldn't build entry");
750        entries.push(entry);
751    }
752}
753
754impl From<&AccountSetEvent> for OutboxEventPayload {
755    fn from(event: &AccountSetEvent) -> Self {
756        match event {
757            #[cfg(feature = "import")]
758            AccountSetEvent::Imported {
759                source,
760                values: account_set,
761            } => OutboxEventPayload::AccountSetCreated {
762                source: *source,
763                account_set: account_set.clone(),
764            },
765            AccountSetEvent::Initialized {
766                values: account_set,
767            } => OutboxEventPayload::AccountSetCreated {
768                source: DataSource::Local,
769                account_set: account_set.clone(),
770            },
771            AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
772                source: DataSource::Local,
773                account_set: values.clone(),
774                fields: fields.clone(),
775            },
776        }
777    }
778}