cala_ledger/account_set/
mod.rs

1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use std::collections::HashMap;
9use tracing::instrument;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{
14    account::*,
15    balance::*,
16    entry::*,
17    ledger_operation::*,
18    outbox::*,
19    primitives::{DataSource, DebitOrCredit, JournalId, Layer},
20};
21
22pub use cursor::*;
23pub use entity::*;
24use error::*;
25use repo::*;
26pub use repo::{account_set_cursor::*, members_cursor::*};
27
28const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
29
30#[derive(Clone)]
31pub struct AccountSets {
32    repo: AccountSetRepo,
33    accounts: Accounts,
34    entries: Entries,
35    balances: Balances,
36    outbox: Outbox,
37    pool: PgPool,
38}
39
40impl AccountSets {
41    pub(crate) fn new(
42        pool: &PgPool,
43        outbox: Outbox,
44        accounts: &Accounts,
45        entries: &Entries,
46        balances: &Balances,
47    ) -> Self {
48        Self {
49            repo: AccountSetRepo::new(pool),
50            outbox,
51            accounts: accounts.clone(),
52            entries: entries.clone(),
53            balances: balances.clone(),
54            pool: pool.clone(),
55        }
56    }
57    #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
58    pub async fn create(
59        &self,
60        new_account_set: NewAccountSet,
61    ) -> Result<AccountSet, AccountSetError> {
62        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
63        let account_set = self.create_in_op(&mut op, new_account_set).await?;
64        op.commit().await?;
65        Ok(account_set)
66    }
67
68    #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
69    pub async fn create_in_op(
70        &self,
71        db: &mut LedgerOperation<'_>,
72        new_account_set: NewAccountSet,
73    ) -> Result<AccountSet, AccountSetError> {
74        let new_account = NewAccount::builder()
75            .id(uuid::Uuid::from(new_account_set.id))
76            .name(String::new())
77            .code(new_account_set.id.to_string())
78            .normal_balance_type(new_account_set.normal_balance_type)
79            .is_account_set(true)
80            .build()
81            .expect("Failed to build account");
82        self.accounts.create_in_op(db, new_account).await?;
83        let account_set = self.repo.create_in_op(db, new_account_set).await?;
84        db.accumulate(account_set.last_persisted(1).map(|p| &p.event));
85        Ok(account_set)
86    }
87
88    pub async fn create_all(
89        &self,
90        new_account_sets: Vec<NewAccountSet>,
91    ) -> Result<Vec<AccountSet>, AccountSetError> {
92        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
93        let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
94        op.commit().await?;
95        Ok(account_sets)
96    }
97
98    #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, db))]
99    pub async fn create_all_in_op(
100        &self,
101        db: &mut LedgerOperation<'_>,
102        new_account_sets: Vec<NewAccountSet>,
103    ) -> Result<Vec<AccountSet>, AccountSetError> {
104        let mut new_accounts = Vec::new();
105        for new_account_set in new_account_sets.iter() {
106            let new_account = NewAccount::builder()
107                .id(uuid::Uuid::from(new_account_set.id))
108                .name(String::new())
109                .code(new_account_set.id.to_string())
110                .normal_balance_type(new_account_set.normal_balance_type)
111                .is_account_set(true)
112                .build()
113                .expect("Failed to build account");
114            new_accounts.push(new_account);
115        }
116        self.accounts.create_all_in_op(db, new_accounts).await?;
117        let account_sets = self.repo.create_all_in_op(db, new_account_sets).await?;
118        db.accumulate(
119            account_sets
120                .iter()
121                .flat_map(|account| account.last_persisted(1).map(|p| &p.event)),
122        );
123        Ok(account_sets)
124    }
125
126    #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
127    pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
128        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
129        self.persist_in_op(&mut op, account_set).await?;
130        op.commit().await?;
131        Ok(())
132    }
133
134    #[instrument(
135        name = "cala_ledger.account_sets.persist_in_op",
136        skip(self, db, account_set)
137    )]
138    pub async fn persist_in_op(
139        &self,
140        db: &mut LedgerOperation<'_>,
141        account_set: &mut AccountSet,
142    ) -> Result<(), AccountSetError> {
143        let n_events = self.repo.update_in_op(db, account_set).await?;
144        db.accumulate(account_set.last_persisted(n_events).map(|p| &p.event));
145        Ok(())
146    }
147
148    pub async fn add_member(
149        &self,
150        account_set_id: AccountSetId,
151        member: impl Into<AccountSetMemberId>,
152    ) -> Result<AccountSet, AccountSetError> {
153        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
154        let account_set = self
155            .add_member_in_op(&mut op, account_set_id, member)
156            .await?;
157        op.commit().await?;
158        Ok(account_set)
159    }
160
161    pub async fn add_member_in_op(
162        &self,
163        op: &mut LedgerOperation<'_>,
164        account_set_id: AccountSetId,
165        member: impl Into<AccountSetMemberId>,
166    ) -> Result<AccountSet, AccountSetError> {
167        let member = member.into();
168        let (time, parents, account_set, member_id) = match member {
169            AccountSetMemberId::Account(id) => {
170                let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
171                let (time, parents) = self
172                    .repo
173                    .add_member_account_and_return_parents(&mut *op, account_set_id, id)
174                    .await?;
175                (time, parents, set, id)
176            }
177            AccountSetMemberId::AccountSet(id) => {
178                let mut accounts = self
179                    .repo
180                    .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
181                    .await?;
182                let target = accounts
183                    .remove(&account_set_id)
184                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
185                let member = accounts
186                    .remove(&id)
187                    .ok_or(AccountSetError::CouldNotFindById(id))?;
188
189                if target.values().journal_id != member.values().journal_id {
190                    return Err(AccountSetError::JournalIdMismatch);
191                }
192
193                let (time, parents) = self
194                    .repo
195                    .add_member_set_and_return_parents(op, account_set_id, id)
196                    .await?;
197                (time, parents, target, AccountId::from(id))
198            }
199        };
200
201        op.accumulate(std::iter::once(
202            OutboxEventPayload::AccountSetMemberCreated {
203                source: DataSource::Local,
204                account_set_id,
205                member_id: member,
206            },
207        ));
208
209        let balances = self
210            .balances
211            .find_balances_for_update(op, account_set.values().journal_id, member_id)
212            .await?;
213
214        let target_account_id = AccountId::from(&account_set.id());
215        let mut entries = Vec::new();
216        for balance in balances.into_values() {
217            entries_for_add_balance(&mut entries, target_account_id, balance);
218        }
219
220        if entries.is_empty() {
221            return Ok(account_set);
222        }
223        let entries = self.entries.create_all_in_op(op, entries).await?;
224        let mappings = std::iter::once((target_account_id, parents)).collect();
225        self.balances
226            .update_balances_in_op(
227                op,
228                account_set.values().journal_id,
229                entries,
230                time.date_naive(),
231                time,
232                mappings,
233            )
234            .await?;
235
236        Ok(account_set)
237    }
238
239    pub async fn remove_member(
240        &self,
241        account_set_id: AccountSetId,
242        member: impl Into<AccountSetMemberId>,
243    ) -> Result<AccountSet, AccountSetError> {
244        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
245        let account_set = self
246            .remove_member_in_op(&mut op, account_set_id, member)
247            .await?;
248        op.commit().await?;
249        Ok(account_set)
250    }
251
252    pub async fn remove_member_in_op(
253        &self,
254        op: &mut LedgerOperation<'_>,
255        account_set_id: AccountSetId,
256        member: impl Into<AccountSetMemberId>,
257    ) -> Result<AccountSet, AccountSetError> {
258        let member = member.into();
259        let (time, parents, account_set, member_id) = match member {
260            AccountSetMemberId::Account(id) => {
261                let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
262                let (time, parents) = self
263                    .repo
264                    .remove_member_account_and_return_parents(op, account_set_id, id)
265                    .await?;
266                (time, parents, set, id)
267            }
268            AccountSetMemberId::AccountSet(id) => {
269                let mut accounts = self
270                    .repo
271                    .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
272                    .await?;
273                let target = accounts
274                    .remove(&account_set_id)
275                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
276                let member = accounts
277                    .remove(&id)
278                    .ok_or(AccountSetError::CouldNotFindById(id))?;
279
280                if target.values().journal_id != member.values().journal_id {
281                    return Err(AccountSetError::JournalIdMismatch);
282                }
283
284                let (time, parents) = self
285                    .repo
286                    .remove_member_set_and_return_parents(op, account_set_id, id)
287                    .await?;
288                (time, parents, target, AccountId::from(id))
289            }
290        };
291
292        op.accumulate(std::iter::once(
293            OutboxEventPayload::AccountSetMemberRemoved {
294                source: DataSource::Local,
295                account_set_id,
296                member_id: member,
297            },
298        ));
299
300        let balances = self
301            .balances
302            .find_balances_for_update(op, account_set.values().journal_id, member_id)
303            .await?;
304
305        let target_account_id = AccountId::from(&account_set.id());
306        let mut entries = Vec::new();
307        for balance in balances.into_values() {
308            entries_for_remove_balance(&mut entries, target_account_id, balance);
309        }
310
311        if entries.is_empty() {
312            return Ok(account_set);
313        }
314        let entries = self.entries.create_all_in_op(op, entries).await?;
315        let mappings = std::iter::once((target_account_id, parents)).collect();
316        self.balances
317            .update_balances_in_op(
318                op,
319                account_set.values().journal_id,
320                entries,
321                time.date_naive(),
322                time,
323                mappings,
324            )
325            .await?;
326
327        Ok(account_set)
328    }
329
330    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
331    pub async fn find_all<T: From<AccountSet>>(
332        &self,
333        account_set_ids: &[AccountSetId],
334    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
335        self.repo.find_all(account_set_ids).await
336    }
337
338    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
339    pub async fn find_all_in_op<T: From<AccountSet>>(
340        &self,
341        op: &mut LedgerOperation<'_>,
342        account_set_ids: &[AccountSetId],
343    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
344        self.repo.find_all_in_op(op, account_set_ids).await
345    }
346
347    #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
348    pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
349        self.repo.find_by_id(account_set_id).await
350    }
351
352    #[instrument(
353        name = "cala_ledger.accounts_sets.find_by_external_id",
354        skip(self),
355        err
356    )]
357    pub async fn find_by_external_id(
358        &self,
359        external_id: String,
360    ) -> Result<AccountSet, AccountSetError> {
361        self.repo.find_by_external_id(Some(external_id)).await
362    }
363
364    #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
365    pub async fn find_where_member(
366        &self,
367        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
368        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
369    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
370    {
371        match member.into() {
372            AccountSetMemberId::Account(account_id) => {
373                self.repo
374                    .find_where_account_is_member(account_id, query)
375                    .await
376            }
377            AccountSetMemberId::AccountSet(account_set_id) => {
378                self.repo
379                    .find_where_account_set_is_member(account_set_id, query)
380                    .await
381            }
382        }
383    }
384
385    #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
386    pub async fn list_for_name(
387        &self,
388        name: String,
389        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
390    ) -> Result<
391        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
392        AccountSetError,
393    > {
394        self.repo
395            .list_for_name_by_created_at(name, args, Default::default())
396            .await
397    }
398
399    #[instrument(
400        name = "cala_ledger.account_sets.list_for_name_in_op",
401        skip(self, op),
402        err
403    )]
404    pub async fn list_for_name_in_op(
405        &self,
406        op: &mut LedgerOperation<'_>,
407        name: String,
408        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
409    ) -> Result<
410        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
411        AccountSetError,
412    > {
413        self.repo
414            .list_for_name_by_created_at_in_op(op, name, args, Default::default())
415            .await
416    }
417
418    #[instrument(
419        name = "cala_ledger.account_sets.find_where_member_in_op",
420        skip(self, op),
421        err
422    )]
423    pub async fn find_where_member_in_op(
424        &self,
425        op: &mut LedgerOperation<'_>,
426        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
427        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
428    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
429    {
430        match member.into() {
431            AccountSetMemberId::Account(account_id) => {
432                self.repo
433                    .find_where_account_is_member_in_op(op, account_id, query)
434                    .await
435            }
436            AccountSetMemberId::AccountSet(account_set_id) => {
437                self.repo
438                    .find_where_account_set_is_member_in_op(op, account_set_id, query)
439                    .await
440            }
441        }
442    }
443
444    pub async fn list_members_by_created_at(
445        &self,
446        id: AccountSetId,
447        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
448    ) -> Result<
449        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
450        AccountSetError,
451    > {
452        self.repo.list_children_by_created_at(id, args).await
453    }
454
455    pub async fn list_members_by_created_at_in_op(
456        &self,
457        op: &mut LedgerOperation<'_>,
458        id: AccountSetId,
459        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
460    ) -> Result<
461        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
462        AccountSetError,
463    > {
464        self.repo
465            .list_children_by_created_at_in_op(op, id, args)
466            .await
467    }
468
469    pub async fn list_members_by_external_id(
470        &self,
471        id: AccountSetId,
472        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
473    ) -> Result<
474        es_entity::PaginatedQueryRet<
475            AccountSetMemberByExternalId,
476            AccountSetMembersByExternalIdCursor,
477        >,
478        AccountSetError,
479    > {
480        self.repo.list_children_by_external_id(id, args).await
481    }
482
483    pub async fn list_members_by_external_id_in_op(
484        &self,
485        op: &mut LedgerOperation<'_>,
486        id: AccountSetId,
487        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
488    ) -> Result<
489        es_entity::PaginatedQueryRet<
490            AccountSetMemberByExternalId,
491            AccountSetMembersByExternalIdCursor,
492        >,
493        AccountSetError,
494    > {
495        self.repo
496            .list_children_by_external_id_in_op(op, id, args)
497            .await
498    }
499
500    pub(crate) async fn fetch_mappings_in_op(
501        &self,
502        op: &mut LedgerOperation<'_>,
503        journal_id: JournalId,
504        account_ids: &[AccountId],
505    ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
506        self.repo
507            .fetch_mappings_in_op(op, journal_id, account_ids)
508            .await
509    }
510
511    #[cfg(feature = "import")]
512    pub(crate) async fn sync_account_set_creation(
513        &self,
514        mut db: es_entity::DbOpWithTime<'_>,
515        origin: DataSourceId,
516        values: AccountSetValues,
517    ) -> Result<(), AccountSetError> {
518        let mut account_set = AccountSet::import(origin, values);
519        self.repo
520            .import_in_op(&mut db, origin, &mut account_set)
521            .await?;
522        let outbox_events: Vec<_> = account_set
523            .last_persisted(1)
524            .map(|p| OutboxEventPayload::from(&p.event))
525            .collect();
526        let time = db.now();
527        self.outbox
528            .persist_events_at(db, outbox_events, time)
529            .await?;
530        Ok(())
531    }
532
533    #[cfg(feature = "import")]
534    pub(crate) async fn sync_account_set_update(
535        &self,
536        mut db: es_entity::DbOpWithTime<'_>,
537        values: AccountSetValues,
538        fields: Vec<String>,
539    ) -> Result<(), AccountSetError> {
540        let mut account_set = self.repo.find_by_id(values.id).await?;
541        account_set.update((values, fields));
542        let n_events = self.repo.update_in_op(&mut db, &mut account_set).await?;
543        let outbox_events: Vec<_> = account_set
544            .last_persisted(n_events)
545            .map(|p| OutboxEventPayload::from(&p.event))
546            .collect();
547        let time = db.now();
548        self.outbox
549            .persist_events_at(db, outbox_events, time)
550            .await?;
551        Ok(())
552    }
553
554    #[cfg(feature = "import")]
555    pub(crate) async fn sync_account_set_member_creation(
556        &self,
557        mut db: es_entity::DbOpWithTime<'_>,
558        origin: DataSourceId,
559        account_set_id: AccountSetId,
560        member_id: AccountSetMemberId,
561    ) -> Result<(), AccountSetError> {
562        match member_id {
563            AccountSetMemberId::Account(account_id) => {
564                self.repo
565                    .import_member_account_in_op(&mut db, account_set_id, account_id)
566                    .await?;
567            }
568            AccountSetMemberId::AccountSet(account_set_id) => {
569                self.repo
570                    .import_member_set_in_op(&mut db, account_set_id, account_set_id)
571                    .await?;
572            }
573        }
574        let time = db.now();
575        self.outbox
576            .persist_events_at(
577                db,
578                std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
579                    source: DataSource::Remote { id: origin },
580                    account_set_id,
581                    member_id,
582                }),
583                time,
584            )
585            .await?;
586        Ok(())
587    }
588
589    #[cfg(feature = "import")]
590    pub(crate) async fn sync_account_set_member_removal(
591        &self,
592        mut db: es_entity::DbOpWithTime<'_>,
593        origin: DataSourceId,
594        account_set_id: AccountSetId,
595        member_id: AccountSetMemberId,
596    ) -> Result<(), AccountSetError> {
597        match member_id {
598            AccountSetMemberId::Account(account_id) => {
599                self.repo
600                    .import_remove_member_account(&mut db, account_set_id, account_id)
601                    .await?;
602            }
603            AccountSetMemberId::AccountSet(account_set_id) => {
604                self.repo
605                    .import_remove_member_set(&mut db, account_set_id, account_set_id)
606                    .await?;
607            }
608        }
609        let time = db.now();
610        self.outbox
611            .persist_events_at(
612                db,
613                std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
614                    source: DataSource::Remote { id: origin },
615                    account_set_id,
616                    member_id,
617                }),
618                time,
619            )
620            .await?;
621        Ok(())
622    }
623}
624
625fn entries_for_add_balance(
626    entries: &mut Vec<NewEntry>,
627    target_account_id: AccountId,
628    balance: BalanceSnapshot,
629) {
630    use rust_decimal::Decimal;
631
632    if balance.settled.cr_balance != Decimal::ZERO {
633        let entry = NewEntry::builder()
634            .id(EntryId::new())
635            .journal_id(balance.journal_id)
636            .account_id(target_account_id)
637            .currency(balance.currency)
638            .sequence(1u32)
639            .layer(Layer::Settled)
640            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
641            .direction(DebitOrCredit::Credit)
642            .units(balance.settled.cr_balance)
643            .transaction_id(UNASSIGNED_TRANSACTION_ID)
644            .build()
645            .expect("Couldn't build entry");
646        entries.push(entry);
647    }
648    if balance.settled.dr_balance != Decimal::ZERO {
649        let entry = NewEntry::builder()
650            .id(EntryId::new())
651            .journal_id(balance.journal_id)
652            .account_id(target_account_id)
653            .currency(balance.currency)
654            .sequence(1u32)
655            .layer(Layer::Settled)
656            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
657            .direction(DebitOrCredit::Debit)
658            .units(balance.settled.dr_balance)
659            .transaction_id(UNASSIGNED_TRANSACTION_ID)
660            .build()
661            .expect("Couldn't build entry");
662        entries.push(entry);
663    }
664    if balance.pending.cr_balance != Decimal::ZERO {
665        let entry = NewEntry::builder()
666            .id(EntryId::new())
667            .journal_id(balance.journal_id)
668            .account_id(target_account_id)
669            .currency(balance.currency)
670            .sequence(1u32)
671            .layer(Layer::Pending)
672            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
673            .direction(DebitOrCredit::Credit)
674            .units(balance.pending.cr_balance)
675            .transaction_id(UNASSIGNED_TRANSACTION_ID)
676            .build()
677            .expect("Couldn't build entry");
678        entries.push(entry);
679    }
680    if balance.pending.dr_balance != Decimal::ZERO {
681        let entry = NewEntry::builder()
682            .id(EntryId::new())
683            .journal_id(balance.journal_id)
684            .account_id(target_account_id)
685            .currency(balance.currency)
686            .sequence(1u32)
687            .layer(Layer::Pending)
688            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
689            .direction(DebitOrCredit::Debit)
690            .units(balance.pending.dr_balance)
691            .transaction_id(UNASSIGNED_TRANSACTION_ID)
692            .build()
693            .expect("Couldn't build entry");
694        entries.push(entry);
695    }
696    if balance.encumbrance.cr_balance != Decimal::ZERO {
697        let entry = NewEntry::builder()
698            .id(EntryId::new())
699            .journal_id(balance.journal_id)
700            .account_id(target_account_id)
701            .currency(balance.currency)
702            .sequence(1u32)
703            .layer(Layer::Encumbrance)
704            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
705            .direction(DebitOrCredit::Credit)
706            .units(balance.encumbrance.cr_balance)
707            .transaction_id(UNASSIGNED_TRANSACTION_ID)
708            .build()
709            .expect("Couldn't build entry");
710        entries.push(entry);
711    }
712    if balance.encumbrance.dr_balance != Decimal::ZERO {
713        let entry = NewEntry::builder()
714            .id(EntryId::new())
715            .journal_id(balance.journal_id)
716            .account_id(target_account_id)
717            .currency(balance.currency)
718            .sequence(1u32)
719            .layer(Layer::Encumbrance)
720            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
721            .direction(DebitOrCredit::Debit)
722            .units(balance.encumbrance.dr_balance)
723            .transaction_id(UNASSIGNED_TRANSACTION_ID)
724            .build()
725            .expect("Couldn't build entry");
726        entries.push(entry);
727    }
728}
729
730fn entries_for_remove_balance(
731    entries: &mut Vec<NewEntry>,
732    target_account_id: AccountId,
733    balance: BalanceSnapshot,
734) {
735    use rust_decimal::Decimal;
736
737    if balance.settled.cr_balance != Decimal::ZERO {
738        let entry = NewEntry::builder()
739            .id(EntryId::new())
740            .journal_id(balance.journal_id)
741            .account_id(target_account_id)
742            .currency(balance.currency)
743            .sequence(1u32)
744            .layer(Layer::Settled)
745            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
746            .direction(DebitOrCredit::Debit)
747            .units(balance.settled.cr_balance)
748            .transaction_id(UNASSIGNED_TRANSACTION_ID)
749            .build()
750            .expect("Couldn't build entry");
751        entries.push(entry);
752    }
753    if balance.settled.dr_balance != Decimal::ZERO {
754        let entry = NewEntry::builder()
755            .id(EntryId::new())
756            .journal_id(balance.journal_id)
757            .account_id(target_account_id)
758            .currency(balance.currency)
759            .sequence(1u32)
760            .layer(Layer::Settled)
761            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
762            .direction(DebitOrCredit::Credit)
763            .units(balance.settled.dr_balance)
764            .transaction_id(UNASSIGNED_TRANSACTION_ID)
765            .build()
766            .expect("Couldn't build entry");
767        entries.push(entry);
768    }
769    if balance.pending.cr_balance != Decimal::ZERO {
770        let entry = NewEntry::builder()
771            .id(EntryId::new())
772            .journal_id(balance.journal_id)
773            .account_id(target_account_id)
774            .currency(balance.currency)
775            .sequence(1u32)
776            .layer(Layer::Pending)
777            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
778            .direction(DebitOrCredit::Debit)
779            .units(balance.pending.cr_balance)
780            .transaction_id(UNASSIGNED_TRANSACTION_ID)
781            .build()
782            .expect("Couldn't build entry");
783        entries.push(entry);
784    }
785    if balance.pending.dr_balance != Decimal::ZERO {
786        let entry = NewEntry::builder()
787            .id(EntryId::new())
788            .journal_id(balance.journal_id)
789            .account_id(target_account_id)
790            .currency(balance.currency)
791            .sequence(1u32)
792            .layer(Layer::Pending)
793            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
794            .direction(DebitOrCredit::Credit)
795            .units(balance.pending.dr_balance)
796            .transaction_id(UNASSIGNED_TRANSACTION_ID)
797            .build()
798            .expect("Couldn't build entry");
799        entries.push(entry);
800    }
801    if balance.encumbrance.cr_balance != Decimal::ZERO {
802        let entry = NewEntry::builder()
803            .id(EntryId::new())
804            .journal_id(balance.journal_id)
805            .account_id(target_account_id)
806            .currency(balance.currency)
807            .sequence(1u32)
808            .layer(Layer::Encumbrance)
809            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
810            .direction(DebitOrCredit::Debit)
811            .units(balance.encumbrance.cr_balance)
812            .transaction_id(UNASSIGNED_TRANSACTION_ID)
813            .build()
814            .expect("Couldn't build entry");
815        entries.push(entry);
816    }
817    if balance.encumbrance.dr_balance != Decimal::ZERO {
818        let entry = NewEntry::builder()
819            .id(EntryId::new())
820            .journal_id(balance.journal_id)
821            .account_id(target_account_id)
822            .currency(balance.currency)
823            .sequence(1u32)
824            .layer(Layer::Encumbrance)
825            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
826            .direction(DebitOrCredit::Credit)
827            .units(balance.encumbrance.dr_balance)
828            .transaction_id(UNASSIGNED_TRANSACTION_ID)
829            .build()
830            .expect("Couldn't build entry");
831        entries.push(entry);
832    }
833}
834
835impl From<&AccountSetEvent> for OutboxEventPayload {
836    fn from(event: &AccountSetEvent) -> Self {
837        match event {
838            #[cfg(feature = "import")]
839            AccountSetEvent::Imported {
840                source,
841                values: account_set,
842            } => OutboxEventPayload::AccountSetCreated {
843                source: *source,
844                account_set: account_set.clone(),
845            },
846            AccountSetEvent::Initialized {
847                values: account_set,
848            } => OutboxEventPayload::AccountSetCreated {
849                source: DataSource::Local,
850                account_set: account_set.clone(),
851            },
852            AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
853                source: DataSource::Local,
854                account_set: values.clone(),
855                fields: fields.clone(),
856            },
857        }
858    }
859}