cala_ledger/account_set/
mod.rs

1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use std::collections::HashMap;
8use tracing::instrument;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{
13    account::*,
14    balance::*,
15    entry::*,
16    ledger_operation::*,
17    outbox::*,
18    primitives::{DataSource, DebitOrCredit, JournalId, Layer},
19};
20
21pub use cursor::*;
22pub use entity::*;
23use error::*;
24use repo::*;
25pub use repo::{account_set_cursor::*, members_cursor::*};
26
27const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
28
29#[derive(Clone)]
30pub struct AccountSets {
31    repo: AccountSetRepo,
32    accounts: Accounts,
33    entries: Entries,
34    balances: Balances,
35    outbox: Outbox,
36    pool: PgPool,
37}
38
39impl AccountSets {
40    pub(crate) fn new(
41        pool: &PgPool,
42        outbox: Outbox,
43        accounts: &Accounts,
44        entries: &Entries,
45        balances: &Balances,
46    ) -> Self {
47        Self {
48            repo: AccountSetRepo::new(pool),
49            outbox,
50            accounts: accounts.clone(),
51            entries: entries.clone(),
52            balances: balances.clone(),
53            pool: pool.clone(),
54        }
55    }
56    #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
57    pub async fn create(
58        &self,
59        new_account_set: NewAccountSet,
60    ) -> Result<AccountSet, AccountSetError> {
61        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
62        let account_set = self.create_in_op(&mut op, new_account_set).await?;
63        op.commit().await?;
64        Ok(account_set)
65    }
66
67    #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
68    pub async fn create_in_op(
69        &self,
70        db: &mut LedgerOperation<'_>,
71        new_account_set: NewAccountSet,
72    ) -> Result<AccountSet, AccountSetError> {
73        let new_account = NewAccount::builder()
74            .id(uuid::Uuid::from(new_account_set.id))
75            .name(String::new())
76            .code(new_account_set.id.to_string())
77            .normal_balance_type(new_account_set.normal_balance_type)
78            .is_account_set(true)
79            .build()
80            .expect("Failed to build account");
81        self.accounts.create_in_op(db, new_account).await?;
82        let account_set = self.repo.create_in_op(db.op(), new_account_set).await?;
83        db.accumulate(account_set.events.last_persisted(1).map(|p| &p.event));
84        Ok(account_set)
85    }
86
87    pub async fn create_all(
88        &self,
89        new_account_sets: Vec<NewAccountSet>,
90    ) -> Result<Vec<AccountSet>, AccountSetError> {
91        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
92        let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
93        op.commit().await?;
94        Ok(account_sets)
95    }
96
97    #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, db))]
98    pub async fn create_all_in_op(
99        &self,
100        db: &mut LedgerOperation<'_>,
101        new_account_sets: Vec<NewAccountSet>,
102    ) -> Result<Vec<AccountSet>, AccountSetError> {
103        let mut new_accounts = Vec::new();
104        for new_account_set in new_account_sets.iter() {
105            let new_account = NewAccount::builder()
106                .id(uuid::Uuid::from(new_account_set.id))
107                .name(String::new())
108                .code(new_account_set.id.to_string())
109                .normal_balance_type(new_account_set.normal_balance_type)
110                .is_account_set(true)
111                .build()
112                .expect("Failed to build account");
113            new_accounts.push(new_account);
114        }
115        self.accounts.create_all_in_op(db, new_accounts).await?;
116        let account_sets = self
117            .repo
118            .create_all_in_op(db.op(), new_account_sets)
119            .await?;
120        db.accumulate(
121            account_sets
122                .iter()
123                .flat_map(|account| account.events.last_persisted(1).map(|p| &p.event)),
124        );
125        Ok(account_sets)
126    }
127
128    #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
129    pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
130        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
131        self.persist_in_op(&mut op, account_set).await?;
132        op.commit().await?;
133        Ok(())
134    }
135
136    #[instrument(
137        name = "cala_ledger.account_sets.persist_in_op",
138        skip(self, db, account_set)
139    )]
140    pub async fn persist_in_op(
141        &self,
142        db: &mut LedgerOperation<'_>,
143        account_set: &mut AccountSet,
144    ) -> Result<(), AccountSetError> {
145        let n_events = self.repo.update_in_op(db.op(), account_set).await?;
146        db.accumulate(
147            account_set
148                .events
149                .last_persisted(n_events)
150                .map(|p| &p.event),
151        );
152        Ok(())
153    }
154
155    pub async fn add_member(
156        &self,
157        account_set_id: AccountSetId,
158        member: impl Into<AccountSetMemberId>,
159    ) -> Result<AccountSet, AccountSetError> {
160        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
161        let account_set = self
162            .add_member_in_op(&mut op, account_set_id, member)
163            .await?;
164        op.commit().await?;
165        Ok(account_set)
166    }
167
168    pub async fn add_member_in_op(
169        &self,
170        op: &mut LedgerOperation<'_>,
171        account_set_id: AccountSetId,
172        member: impl Into<AccountSetMemberId>,
173    ) -> Result<AccountSet, AccountSetError> {
174        let member = member.into();
175        let (time, parents, account_set, member_id) = match member {
176            AccountSetMemberId::Account(id) => {
177                let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
178                let (time, parents) = self
179                    .repo
180                    .add_member_account_and_return_parents(op.tx(), account_set_id, id)
181                    .await?;
182                (time, parents, set, id)
183            }
184            AccountSetMemberId::AccountSet(id) => {
185                let mut accounts = self
186                    .repo
187                    .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
188                    .await?;
189                let target = accounts
190                    .remove(&account_set_id)
191                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
192                let member = accounts
193                    .remove(&id)
194                    .ok_or(AccountSetError::CouldNotFindById(id))?;
195
196                if target.values().journal_id != member.values().journal_id {
197                    return Err(AccountSetError::JournalIdMismatch);
198                }
199
200                let (time, parents) = self
201                    .repo
202                    .add_member_set_and_return_parents(op.tx(), account_set_id, id)
203                    .await?;
204                (time, parents, target, AccountId::from(id))
205            }
206        };
207
208        op.accumulate(std::iter::once(
209            OutboxEventPayload::AccountSetMemberCreated {
210                source: DataSource::Local,
211                account_set_id,
212                member_id: member,
213            },
214        ));
215
216        let balances = self
217            .balances
218            .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
219            .await?;
220
221        let target_account_id = AccountId::from(&account_set.id());
222        let mut entries = Vec::new();
223        for balance in balances.into_values() {
224            entries_for_add_balance(&mut entries, target_account_id, balance);
225        }
226
227        if entries.is_empty() {
228            return Ok(account_set);
229        }
230        let entries = self.entries.create_all_in_op(op, entries).await?;
231        let mappings = std::iter::once((target_account_id, parents)).collect();
232        self.balances
233            .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
234            .await?;
235
236        Ok(account_set)
237    }
238
239    pub async fn remove_member(
240        &self,
241        account_set_id: AccountSetId,
242        member: impl Into<AccountSetMemberId>,
243    ) -> Result<AccountSet, AccountSetError> {
244        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
245        let account_set = self
246            .remove_member_in_op(&mut op, account_set_id, member)
247            .await?;
248        op.commit().await?;
249        Ok(account_set)
250    }
251
252    pub async fn remove_member_in_op(
253        &self,
254        op: &mut LedgerOperation<'_>,
255        account_set_id: AccountSetId,
256        member: impl Into<AccountSetMemberId>,
257    ) -> Result<AccountSet, AccountSetError> {
258        let member = member.into();
259        let (time, parents, account_set, member_id) = match member {
260            AccountSetMemberId::Account(id) => {
261                let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
262                let (time, parents) = self
263                    .repo
264                    .remove_member_account_and_return_parents(op.tx(), account_set_id, id)
265                    .await?;
266                (time, parents, set, id)
267            }
268            AccountSetMemberId::AccountSet(id) => {
269                let mut accounts = self
270                    .repo
271                    .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
272                    .await?;
273                let target = accounts
274                    .remove(&account_set_id)
275                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
276                let member = accounts
277                    .remove(&id)
278                    .ok_or(AccountSetError::CouldNotFindById(id))?;
279
280                if target.values().journal_id != member.values().journal_id {
281                    return Err(AccountSetError::JournalIdMismatch);
282                }
283
284                let (time, parents) = self
285                    .repo
286                    .remove_member_set_and_return_parents(op.tx(), account_set_id, id)
287                    .await?;
288                (time, parents, target, AccountId::from(id))
289            }
290        };
291
292        op.accumulate(std::iter::once(
293            OutboxEventPayload::AccountSetMemberRemoved {
294                source: DataSource::Local,
295                account_set_id,
296                member_id: member,
297            },
298        ));
299
300        let balances = self
301            .balances
302            .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
303            .await?;
304
305        let target_account_id = AccountId::from(&account_set.id());
306        let mut entries = Vec::new();
307        for balance in balances.into_values() {
308            entries_for_remove_balance(&mut entries, target_account_id, balance);
309        }
310
311        if entries.is_empty() {
312            return Ok(account_set);
313        }
314        let entries = self.entries.create_all_in_op(op, entries).await?;
315        let mappings = std::iter::once((target_account_id, parents)).collect();
316        self.balances
317            .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
318            .await?;
319
320        Ok(account_set)
321    }
322
323    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
324    pub async fn find_all<T: From<AccountSet>>(
325        &self,
326        account_set_ids: &[AccountSetId],
327    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
328        self.repo.find_all(account_set_ids).await
329    }
330
331    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
332    pub async fn find_all_in_op<T: From<AccountSet>>(
333        &self,
334        op: &mut LedgerOperation<'_>,
335        account_set_ids: &[AccountSetId],
336    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
337        self.repo.find_all_in_tx(op.tx(), account_set_ids).await
338    }
339
340    #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
341    pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
342        self.repo.find_by_id(account_set_id).await
343    }
344
345    #[instrument(
346        name = "cala_ledger.accounts_sets.find_by_external_id",
347        skip(self),
348        err
349    )]
350    pub async fn find_by_external_id(
351        &self,
352        external_id: String,
353    ) -> Result<AccountSet, AccountSetError> {
354        self.repo.find_by_external_id(Some(external_id)).await
355    }
356
357    #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
358    pub async fn find_where_member(
359        &self,
360        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
361        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
362    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
363    {
364        match member.into() {
365            AccountSetMemberId::Account(account_id) => {
366                self.repo
367                    .find_where_account_is_member(account_id, query)
368                    .await
369            }
370            AccountSetMemberId::AccountSet(account_set_id) => {
371                self.repo
372                    .find_where_account_set_is_member(account_set_id, query)
373                    .await
374            }
375        }
376    }
377
378    #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
379    pub async fn list_for_name(
380        &self,
381        name: String,
382        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
383    ) -> Result<
384        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
385        AccountSetError,
386    > {
387        self.repo
388            .list_for_name_by_created_at(name, args, Default::default())
389            .await
390    }
391
392    #[instrument(
393        name = "cala_ledger.account_sets.list_for_name_in_op",
394        skip(self, op),
395        err
396    )]
397    pub async fn list_for_name_in_op(
398        &self,
399        op: &mut LedgerOperation<'_>,
400        name: String,
401        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
402    ) -> Result<
403        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
404        AccountSetError,
405    > {
406        self.repo
407            .list_for_name_by_created_at_in_tx(op.tx(), name, args, Default::default())
408            .await
409    }
410
411    #[instrument(
412        name = "cala_ledger.account_sets.find_where_member_in_op",
413        skip(self, op),
414        err
415    )]
416    pub async fn find_where_member_in_op(
417        &self,
418        op: &mut LedgerOperation<'_>,
419        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
420        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
421    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
422    {
423        match member.into() {
424            AccountSetMemberId::Account(account_id) => {
425                self.repo
426                    .find_where_account_is_member_in_tx(op.tx(), account_id, query)
427                    .await
428            }
429            AccountSetMemberId::AccountSet(account_set_id) => {
430                self.repo
431                    .find_where_account_set_is_member_in_tx(op.tx(), account_set_id, query)
432                    .await
433            }
434        }
435    }
436
437    pub async fn list_members_by_created_at(
438        &self,
439        id: AccountSetId,
440        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
441    ) -> Result<
442        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
443        AccountSetError,
444    > {
445        self.repo.list_children_by_created_at(id, args).await
446    }
447
448    pub async fn list_members_created_at_in_op(
449        &self,
450        op: &mut LedgerOperation<'_>,
451        id: AccountSetId,
452        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
453    ) -> Result<
454        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
455        AccountSetError,
456    > {
457        self.repo
458            .list_children_by_created_at_in_tx(op.tx(), id, args)
459            .await
460    }
461
462    pub async fn list_members_by_external_id(
463        &self,
464        id: AccountSetId,
465        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
466    ) -> Result<
467        es_entity::PaginatedQueryRet<
468            AccountSetMemberByExternalId,
469            AccountSetMembersByExternalIdCursor,
470        >,
471        AccountSetError,
472    > {
473        self.repo.list_children_by_external_id(id, args).await
474    }
475
476    pub async fn list_members_by_external_id_in_op(
477        &self,
478        op: &mut LedgerOperation<'_>,
479        id: AccountSetId,
480        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
481    ) -> Result<
482        es_entity::PaginatedQueryRet<
483            AccountSetMemberByExternalId,
484            AccountSetMembersByExternalIdCursor,
485        >,
486        AccountSetError,
487    > {
488        self.repo
489            .list_children_by_external_id_in_tx(op.tx(), id, args)
490            .await
491    }
492
493    pub(crate) async fn fetch_mappings(
494        &self,
495        journal_id: JournalId,
496        account_ids: &[AccountId],
497    ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
498        self.repo.fetch_mappings(journal_id, account_ids).await
499    }
500
501    #[cfg(feature = "import")]
502    pub(crate) async fn sync_account_set_creation(
503        &self,
504        mut db: es_entity::DbOp<'_>,
505        origin: DataSourceId,
506        values: AccountSetValues,
507    ) -> Result<(), AccountSetError> {
508        let mut account_set = AccountSet::import(origin, values);
509        self.repo
510            .import_in_op(&mut db, origin, &mut account_set)
511            .await?;
512        let recorded_at = db.now();
513        let outbox_events: Vec<_> = account_set
514            .events
515            .last_persisted(1)
516            .map(|p| OutboxEventPayload::from(&p.event))
517            .collect();
518        self.outbox
519            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
520            .await?;
521        Ok(())
522    }
523
524    #[cfg(feature = "import")]
525    pub(crate) async fn sync_account_set_update(
526        &self,
527        mut db: es_entity::DbOp<'_>,
528        values: AccountSetValues,
529        fields: Vec<String>,
530    ) -> Result<(), AccountSetError> {
531        let mut account_set = self.repo.find_by_id(values.id).await?;
532        account_set.update((values, fields));
533        let n_events = self.repo.update_in_op(&mut db, &mut account_set).await?;
534        let recorded_at = db.now();
535        let outbox_events: Vec<_> = account_set
536            .events
537            .last_persisted(n_events)
538            .map(|p| OutboxEventPayload::from(&p.event))
539            .collect();
540        self.outbox
541            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
542            .await?;
543        Ok(())
544    }
545
546    #[cfg(feature = "import")]
547    pub(crate) async fn sync_account_set_member_creation(
548        &self,
549        mut db: es_entity::DbOp<'_>,
550        origin: DataSourceId,
551        account_set_id: AccountSetId,
552        member_id: AccountSetMemberId,
553    ) -> Result<(), AccountSetError> {
554        match member_id {
555            AccountSetMemberId::Account(account_id) => {
556                self.repo
557                    .import_member_account_in_op(&mut db, account_set_id, account_id)
558                    .await?;
559            }
560            AccountSetMemberId::AccountSet(account_set_id) => {
561                self.repo
562                    .import_member_set_in_op(&mut db, account_set_id, account_set_id)
563                    .await?;
564            }
565        }
566        let recorded_at = db.now();
567        self.outbox
568            .persist_events_at(
569                db.into_tx(),
570                std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
571                    source: DataSource::Remote { id: origin },
572                    account_set_id,
573                    member_id,
574                }),
575                recorded_at,
576            )
577            .await?;
578        Ok(())
579    }
580
581    #[cfg(feature = "import")]
582    pub(crate) async fn sync_account_set_member_removal(
583        &self,
584        mut db: es_entity::DbOp<'_>,
585        origin: DataSourceId,
586        account_set_id: AccountSetId,
587        member_id: AccountSetMemberId,
588    ) -> Result<(), AccountSetError> {
589        match member_id {
590            AccountSetMemberId::Account(account_id) => {
591                self.repo
592                    .import_remove_member_account(db.tx(), account_set_id, account_id)
593                    .await?;
594            }
595            AccountSetMemberId::AccountSet(account_set_id) => {
596                self.repo
597                    .import_remove_member_set(db.tx(), account_set_id, account_set_id)
598                    .await?;
599            }
600        }
601        let recorded_at = db.now();
602        self.outbox
603            .persist_events_at(
604                db.into_tx(),
605                std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
606                    source: DataSource::Remote { id: origin },
607                    account_set_id,
608                    member_id,
609                }),
610                recorded_at,
611            )
612            .await?;
613        Ok(())
614    }
615}
616
617fn entries_for_add_balance(
618    entries: &mut Vec<NewEntry>,
619    target_account_id: AccountId,
620    balance: BalanceSnapshot,
621) {
622    use rust_decimal::Decimal;
623
624    if balance.settled.cr_balance != Decimal::ZERO {
625        let entry = NewEntry::builder()
626            .id(EntryId::new())
627            .journal_id(balance.journal_id)
628            .account_id(target_account_id)
629            .currency(balance.currency)
630            .sequence(1u32)
631            .layer(Layer::Settled)
632            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
633            .direction(DebitOrCredit::Credit)
634            .units(balance.settled.cr_balance)
635            .transaction_id(UNASSIGNED_TRANSACTION_ID)
636            .build()
637            .expect("Couldn't build entry");
638        entries.push(entry);
639    }
640    if balance.settled.dr_balance != Decimal::ZERO {
641        let entry = NewEntry::builder()
642            .id(EntryId::new())
643            .journal_id(balance.journal_id)
644            .account_id(target_account_id)
645            .currency(balance.currency)
646            .sequence(1u32)
647            .layer(Layer::Settled)
648            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
649            .direction(DebitOrCredit::Debit)
650            .units(balance.settled.dr_balance)
651            .transaction_id(UNASSIGNED_TRANSACTION_ID)
652            .build()
653            .expect("Couldn't build entry");
654        entries.push(entry);
655    }
656    if balance.pending.cr_balance != Decimal::ZERO {
657        let entry = NewEntry::builder()
658            .id(EntryId::new())
659            .journal_id(balance.journal_id)
660            .account_id(target_account_id)
661            .currency(balance.currency)
662            .sequence(1u32)
663            .layer(Layer::Pending)
664            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
665            .direction(DebitOrCredit::Credit)
666            .units(balance.pending.cr_balance)
667            .transaction_id(UNASSIGNED_TRANSACTION_ID)
668            .build()
669            .expect("Couldn't build entry");
670        entries.push(entry);
671    }
672    if balance.pending.dr_balance != Decimal::ZERO {
673        let entry = NewEntry::builder()
674            .id(EntryId::new())
675            .journal_id(balance.journal_id)
676            .account_id(target_account_id)
677            .currency(balance.currency)
678            .sequence(1u32)
679            .layer(Layer::Pending)
680            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
681            .direction(DebitOrCredit::Debit)
682            .units(balance.pending.dr_balance)
683            .transaction_id(UNASSIGNED_TRANSACTION_ID)
684            .build()
685            .expect("Couldn't build entry");
686        entries.push(entry);
687    }
688    if balance.encumbrance.cr_balance != Decimal::ZERO {
689        let entry = NewEntry::builder()
690            .id(EntryId::new())
691            .journal_id(balance.journal_id)
692            .account_id(target_account_id)
693            .currency(balance.currency)
694            .sequence(1u32)
695            .layer(Layer::Encumbrance)
696            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
697            .direction(DebitOrCredit::Credit)
698            .units(balance.encumbrance.cr_balance)
699            .transaction_id(UNASSIGNED_TRANSACTION_ID)
700            .build()
701            .expect("Couldn't build entry");
702        entries.push(entry);
703    }
704    if balance.encumbrance.dr_balance != Decimal::ZERO {
705        let entry = NewEntry::builder()
706            .id(EntryId::new())
707            .journal_id(balance.journal_id)
708            .account_id(target_account_id)
709            .currency(balance.currency)
710            .sequence(1u32)
711            .layer(Layer::Encumbrance)
712            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
713            .direction(DebitOrCredit::Debit)
714            .units(balance.encumbrance.dr_balance)
715            .transaction_id(UNASSIGNED_TRANSACTION_ID)
716            .build()
717            .expect("Couldn't build entry");
718        entries.push(entry);
719    }
720}
721
722fn entries_for_remove_balance(
723    entries: &mut Vec<NewEntry>,
724    target_account_id: AccountId,
725    balance: BalanceSnapshot,
726) {
727    use rust_decimal::Decimal;
728
729    if balance.settled.cr_balance != Decimal::ZERO {
730        let entry = NewEntry::builder()
731            .id(EntryId::new())
732            .journal_id(balance.journal_id)
733            .account_id(target_account_id)
734            .currency(balance.currency)
735            .sequence(1u32)
736            .layer(Layer::Settled)
737            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
738            .direction(DebitOrCredit::Debit)
739            .units(balance.settled.cr_balance)
740            .transaction_id(UNASSIGNED_TRANSACTION_ID)
741            .build()
742            .expect("Couldn't build entry");
743        entries.push(entry);
744    }
745    if balance.settled.dr_balance != Decimal::ZERO {
746        let entry = NewEntry::builder()
747            .id(EntryId::new())
748            .journal_id(balance.journal_id)
749            .account_id(target_account_id)
750            .currency(balance.currency)
751            .sequence(1u32)
752            .layer(Layer::Settled)
753            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
754            .direction(DebitOrCredit::Credit)
755            .units(balance.settled.dr_balance)
756            .transaction_id(UNASSIGNED_TRANSACTION_ID)
757            .build()
758            .expect("Couldn't build entry");
759        entries.push(entry);
760    }
761    if balance.pending.cr_balance != Decimal::ZERO {
762        let entry = NewEntry::builder()
763            .id(EntryId::new())
764            .journal_id(balance.journal_id)
765            .account_id(target_account_id)
766            .currency(balance.currency)
767            .sequence(1u32)
768            .layer(Layer::Pending)
769            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
770            .direction(DebitOrCredit::Debit)
771            .units(balance.pending.cr_balance)
772            .transaction_id(UNASSIGNED_TRANSACTION_ID)
773            .build()
774            .expect("Couldn't build entry");
775        entries.push(entry);
776    }
777    if balance.pending.dr_balance != Decimal::ZERO {
778        let entry = NewEntry::builder()
779            .id(EntryId::new())
780            .journal_id(balance.journal_id)
781            .account_id(target_account_id)
782            .currency(balance.currency)
783            .sequence(1u32)
784            .layer(Layer::Pending)
785            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
786            .direction(DebitOrCredit::Credit)
787            .units(balance.pending.dr_balance)
788            .transaction_id(UNASSIGNED_TRANSACTION_ID)
789            .build()
790            .expect("Couldn't build entry");
791        entries.push(entry);
792    }
793    if balance.encumbrance.cr_balance != Decimal::ZERO {
794        let entry = NewEntry::builder()
795            .id(EntryId::new())
796            .journal_id(balance.journal_id)
797            .account_id(target_account_id)
798            .currency(balance.currency)
799            .sequence(1u32)
800            .layer(Layer::Encumbrance)
801            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
802            .direction(DebitOrCredit::Debit)
803            .units(balance.encumbrance.cr_balance)
804            .transaction_id(UNASSIGNED_TRANSACTION_ID)
805            .build()
806            .expect("Couldn't build entry");
807        entries.push(entry);
808    }
809    if balance.encumbrance.dr_balance != Decimal::ZERO {
810        let entry = NewEntry::builder()
811            .id(EntryId::new())
812            .journal_id(balance.journal_id)
813            .account_id(target_account_id)
814            .currency(balance.currency)
815            .sequence(1u32)
816            .layer(Layer::Encumbrance)
817            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
818            .direction(DebitOrCredit::Credit)
819            .units(balance.encumbrance.dr_balance)
820            .transaction_id(UNASSIGNED_TRANSACTION_ID)
821            .build()
822            .expect("Couldn't build entry");
823        entries.push(entry);
824    }
825}
826
827impl From<&AccountSetEvent> for OutboxEventPayload {
828    fn from(event: &AccountSetEvent) -> Self {
829        match event {
830            #[cfg(feature = "import")]
831            AccountSetEvent::Imported {
832                source,
833                values: account_set,
834            } => OutboxEventPayload::AccountSetCreated {
835                source: *source,
836                account_set: account_set.clone(),
837            },
838            AccountSetEvent::Initialized {
839                values: account_set,
840            } => OutboxEventPayload::AccountSetCreated {
841                source: DataSource::Local,
842                account_set: account_set.clone(),
843            },
844            AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
845                source: DataSource::Local,
846                account_set: values.clone(),
847                fields: fields.clone(),
848            },
849        }
850    }
851}