cala_ledger/account_set/
mod.rs

1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use std::collections::HashMap;
8use tracing::instrument;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{
13    account::*,
14    balance::*,
15    entry::*,
16    ledger_operation::*,
17    outbox::*,
18    primitives::{DataSource, DebitOrCredit, JournalId, Layer},
19};
20
21pub use cursor::*;
22pub use entity::*;
23use error::*;
24use repo::*;
25pub use repo::{account_set_cursor::*, members_cursor::*};
26
/// Nil UUID used as the `transaction_id` of the entries this module builds
/// when a member is added to or removed from an account set.
#[allow(dead_code)]
const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
29
/// Service handle for account sets: creation, updates, membership changes
/// and lookups.
#[derive(Clone)]
pub struct AccountSets {
    /// Repository used for every account-set read and write in this module.
    repo: AccountSetRepo,
    /// Used to create the account that accompanies each account set.
    accounts: Accounts,
    /// Used to create the entries generated by membership changes.
    entries: Entries,
    /// Used to read member balances and apply balance updates.
    balances: Balances,
    /// Outbox handed to each `LedgerOperation` and used directly by the import paths.
    outbox: Outbox,
    /// Connection pool from which ledger operations are started.
    pool: PgPool,
}
39
40impl AccountSets {
41    pub(crate) fn new(
42        pool: &PgPool,
43        outbox: Outbox,
44        accounts: &Accounts,
45        entries: &Entries,
46        balances: &Balances,
47    ) -> Self {
48        Self {
49            repo: AccountSetRepo::new(pool),
50            outbox,
51            accounts: accounts.clone(),
52            entries: entries.clone(),
53            balances: balances.clone(),
54            pool: pool.clone(),
55        }
56    }
57    #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
58    pub async fn create(
59        &self,
60        new_account_set: NewAccountSet,
61    ) -> Result<AccountSet, AccountSetError> {
62        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
63        let account_set = self.create_in_op(&mut op, new_account_set).await?;
64        op.commit().await?;
65        Ok(account_set)
66    }
67
68    #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
69    pub async fn create_in_op(
70        &self,
71        db: &mut LedgerOperation<'_>,
72        new_account_set: NewAccountSet,
73    ) -> Result<AccountSet, AccountSetError> {
74        let new_account = NewAccount::builder()
75            .id(uuid::Uuid::from(new_account_set.id))
76            .name(String::new())
77            .code(new_account_set.id.to_string())
78            .normal_balance_type(new_account_set.normal_balance_type)
79            .is_account_set(true)
80            .build()
81            .expect("Failed to build account");
82        self.accounts.create_in_op(db, new_account).await?;
83        let account_set = self.repo.create_in_op(db.op(), new_account_set).await?;
84        db.accumulate(account_set.events.last_persisted(1).map(|p| &p.event));
85        Ok(account_set)
86    }
87
88    pub async fn create_all(
89        &self,
90        new_account_sets: Vec<NewAccountSet>,
91    ) -> Result<Vec<AccountSet>, AccountSetError> {
92        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
93        let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
94        op.commit().await?;
95        Ok(account_sets)
96    }
97
98    #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, db))]
99    pub async fn create_all_in_op(
100        &self,
101        db: &mut LedgerOperation<'_>,
102        new_account_sets: Vec<NewAccountSet>,
103    ) -> Result<Vec<AccountSet>, AccountSetError> {
104        let mut new_accounts = Vec::new();
105        for new_account_set in new_account_sets.iter() {
106            let new_account = NewAccount::builder()
107                .id(uuid::Uuid::from(new_account_set.id))
108                .name(String::new())
109                .code(new_account_set.id.to_string())
110                .normal_balance_type(new_account_set.normal_balance_type)
111                .is_account_set(true)
112                .build()
113                .expect("Failed to build account");
114            new_accounts.push(new_account);
115        }
116        self.accounts.create_all_in_op(db, new_accounts).await?;
117        let account_sets = self
118            .repo
119            .create_all_in_op(db.op(), new_account_sets)
120            .await?;
121        db.accumulate(
122            account_sets
123                .iter()
124                .flat_map(|account| account.events.last_persisted(1).map(|p| &p.event)),
125        );
126        Ok(account_sets)
127    }
128
129    #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
130    pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
131        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
132        self.persist_in_op(&mut op, account_set).await?;
133        op.commit().await?;
134        Ok(())
135    }
136
137    #[instrument(
138        name = "cala_ledger.account_sets.persist_in_op",
139        skip(self, db, account_set)
140    )]
141    pub async fn persist_in_op(
142        &self,
143        db: &mut LedgerOperation<'_>,
144        account_set: &mut AccountSet,
145    ) -> Result<(), AccountSetError> {
146        let n_events = self.repo.update_in_op(db.op(), account_set).await?;
147        db.accumulate(
148            account_set
149                .events
150                .last_persisted(n_events)
151                .map(|p| &p.event),
152        );
153        Ok(())
154    }
155
156    pub async fn add_member(
157        &self,
158        account_set_id: AccountSetId,
159        member: impl Into<AccountSetMemberId>,
160    ) -> Result<AccountSet, AccountSetError> {
161        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
162        let account_set = self
163            .add_member_in_op(&mut op, account_set_id, member)
164            .await?;
165        op.commit().await?;
166        Ok(account_set)
167    }
168
169    pub async fn add_member_in_op(
170        &self,
171        op: &mut LedgerOperation<'_>,
172        account_set_id: AccountSetId,
173        member: impl Into<AccountSetMemberId>,
174    ) -> Result<AccountSet, AccountSetError> {
175        let member = member.into();
176        let (time, parents, account_set, member_id) = match member {
177            AccountSetMemberId::Account(id) => {
178                let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
179                let (time, parents) = self
180                    .repo
181                    .add_member_account_and_return_parents(op.tx(), account_set_id, id)
182                    .await?;
183                (time, parents, set, id)
184            }
185            AccountSetMemberId::AccountSet(id) => {
186                let mut accounts = self
187                    .repo
188                    .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
189                    .await?;
190                let target = accounts
191                    .remove(&account_set_id)
192                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
193                let member = accounts
194                    .remove(&id)
195                    .ok_or(AccountSetError::CouldNotFindById(id))?;
196
197                if target.values().journal_id != member.values().journal_id {
198                    return Err(AccountSetError::JournalIdMismatch);
199                }
200
201                let (time, parents) = self
202                    .repo
203                    .add_member_set_and_return_parents(op.tx(), account_set_id, id)
204                    .await?;
205                (time, parents, target, AccountId::from(id))
206            }
207        };
208
209        op.accumulate(std::iter::once(
210            OutboxEventPayload::AccountSetMemberCreated {
211                source: DataSource::Local,
212                account_set_id,
213                member_id: member,
214            },
215        ));
216
217        let balances = self
218            .balances
219            .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
220            .await?;
221
222        let target_account_id = AccountId::from(&account_set.id());
223        let mut entries = Vec::new();
224        for balance in balances.into_values() {
225            entries_for_add_balance(&mut entries, target_account_id, balance);
226        }
227
228        if entries.is_empty() {
229            return Ok(account_set);
230        }
231        let entries = self.entries.create_all_in_op(op, entries).await?;
232        let mappings = std::iter::once((target_account_id, parents)).collect();
233        self.balances
234            .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
235            .await?;
236
237        Ok(account_set)
238    }
239
240    pub async fn remove_member(
241        &self,
242        account_set_id: AccountSetId,
243        member: impl Into<AccountSetMemberId>,
244    ) -> Result<AccountSet, AccountSetError> {
245        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
246        let account_set = self
247            .remove_member_in_op(&mut op, account_set_id, member)
248            .await?;
249        op.commit().await?;
250        Ok(account_set)
251    }
252
253    pub async fn remove_member_in_op(
254        &self,
255        op: &mut LedgerOperation<'_>,
256        account_set_id: AccountSetId,
257        member: impl Into<AccountSetMemberId>,
258    ) -> Result<AccountSet, AccountSetError> {
259        let member = member.into();
260        let (time, parents, account_set, member_id) = match member {
261            AccountSetMemberId::Account(id) => {
262                let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
263                let (time, parents) = self
264                    .repo
265                    .remove_member_account_and_return_parents(op.tx(), account_set_id, id)
266                    .await?;
267                (time, parents, set, id)
268            }
269            AccountSetMemberId::AccountSet(id) => {
270                let mut accounts = self
271                    .repo
272                    .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
273                    .await?;
274                let target = accounts
275                    .remove(&account_set_id)
276                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
277                let member = accounts
278                    .remove(&id)
279                    .ok_or(AccountSetError::CouldNotFindById(id))?;
280
281                if target.values().journal_id != member.values().journal_id {
282                    return Err(AccountSetError::JournalIdMismatch);
283                }
284
285                let (time, parents) = self
286                    .repo
287                    .remove_member_set_and_return_parents(op.tx(), account_set_id, id)
288                    .await?;
289                (time, parents, target, AccountId::from(id))
290            }
291        };
292
293        op.accumulate(std::iter::once(
294            OutboxEventPayload::AccountSetMemberRemoved {
295                source: DataSource::Local,
296                account_set_id,
297                member_id: member,
298            },
299        ));
300
301        let balances = self
302            .balances
303            .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
304            .await?;
305
306        let target_account_id = AccountId::from(&account_set.id());
307        let mut entries = Vec::new();
308        for balance in balances.into_values() {
309            entries_for_remove_balance(&mut entries, target_account_id, balance);
310        }
311
312        if entries.is_empty() {
313            return Ok(account_set);
314        }
315        let entries = self.entries.create_all_in_op(op, entries).await?;
316        let mappings = std::iter::once((target_account_id, parents)).collect();
317        self.balances
318            .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
319            .await?;
320
321        Ok(account_set)
322    }
323
324    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
325    pub async fn find_all<T: From<AccountSet>>(
326        &self,
327        account_set_ids: &[AccountSetId],
328    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
329        self.repo.find_all(account_set_ids).await
330    }
331
332    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
333    pub async fn find_all_in_op<T: From<AccountSet>>(
334        &self,
335        op: &mut LedgerOperation<'_>,
336        account_set_ids: &[AccountSetId],
337    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
338        self.repo.find_all_in_tx(op.tx(), account_set_ids).await
339    }
340
341    #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
342    pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
343        self.repo.find_by_id(account_set_id).await
344    }
345
346    #[instrument(
347        name = "cala_ledger.accounts_sets.find_by_external_id",
348        skip(self),
349        err
350    )]
351    pub async fn find_by_external_id(
352        &self,
353        external_id: String,
354    ) -> Result<AccountSet, AccountSetError> {
355        self.repo.find_by_external_id(Some(external_id)).await
356    }
357
358    #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
359    pub async fn find_where_member(
360        &self,
361        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
362        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
363    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
364    {
365        match member.into() {
366            AccountSetMemberId::Account(account_id) => {
367                self.repo
368                    .find_where_account_is_member(account_id, query)
369                    .await
370            }
371            AccountSetMemberId::AccountSet(account_set_id) => {
372                self.repo
373                    .find_where_account_set_is_member(account_set_id, query)
374                    .await
375            }
376        }
377    }
378
379    #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
380    pub async fn list_for_name(
381        &self,
382        name: String,
383        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
384    ) -> Result<
385        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
386        AccountSetError,
387    > {
388        self.repo
389            .list_for_name_by_created_at(name, args, Default::default())
390            .await
391    }
392
393    #[instrument(
394        name = "cala_ledger.account_sets.list_for_name_in_op",
395        skip(self, op),
396        err
397    )]
398    pub async fn list_for_name_in_op(
399        &self,
400        op: &mut LedgerOperation<'_>,
401        name: String,
402        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
403    ) -> Result<
404        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
405        AccountSetError,
406    > {
407        self.repo
408            .list_for_name_by_created_at_in_tx(op.tx(), name, args, Default::default())
409            .await
410    }
411
412    #[instrument(
413        name = "cala_ledger.account_sets.find_where_member_in_op",
414        skip(self, op),
415        err
416    )]
417    pub async fn find_where_member_in_op(
418        &self,
419        op: &mut LedgerOperation<'_>,
420        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
421        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
422    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
423    {
424        match member.into() {
425            AccountSetMemberId::Account(account_id) => {
426                self.repo
427                    .find_where_account_is_member_in_tx(op.tx(), account_id, query)
428                    .await
429            }
430            AccountSetMemberId::AccountSet(account_set_id) => {
431                self.repo
432                    .find_where_account_set_is_member_in_tx(op.tx(), account_set_id, query)
433                    .await
434            }
435        }
436    }
437
438    pub async fn list_members_by_created_at(
439        &self,
440        id: AccountSetId,
441        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
442    ) -> Result<
443        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
444        AccountSetError,
445    > {
446        self.repo.list_children_by_created_at(id, args).await
447    }
448
449    pub async fn list_members_created_at_in_op(
450        &self,
451        op: &mut LedgerOperation<'_>,
452        id: AccountSetId,
453        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
454    ) -> Result<
455        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
456        AccountSetError,
457    > {
458        self.repo
459            .list_children_by_created_at_in_tx(op.tx(), id, args)
460            .await
461    }
462
463    pub async fn list_members_by_external_id(
464        &self,
465        id: AccountSetId,
466        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
467    ) -> Result<
468        es_entity::PaginatedQueryRet<
469            AccountSetMemberByExternalId,
470            AccountSetMembersByExternalIdCursor,
471        >,
472        AccountSetError,
473    > {
474        self.repo.list_children_by_external_id(id, args).await
475    }
476
477    pub async fn list_members_by_external_id_in_op(
478        &self,
479        op: &mut LedgerOperation<'_>,
480        id: AccountSetId,
481        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
482    ) -> Result<
483        es_entity::PaginatedQueryRet<
484            AccountSetMemberByExternalId,
485            AccountSetMembersByExternalIdCursor,
486        >,
487        AccountSetError,
488    > {
489        self.repo
490            .list_children_by_external_id_in_tx(op.tx(), id, args)
491            .await
492    }
493
494    pub(crate) async fn fetch_mappings(
495        &self,
496        journal_id: JournalId,
497        account_ids: &[AccountId],
498    ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
499        self.repo.fetch_mappings(journal_id, account_ids).await
500    }
501
502    #[cfg(feature = "import")]
503    pub async fn sync_account_set_creation(
504        &self,
505        mut db: es_entity::DbOp<'_>,
506        origin: DataSourceId,
507        values: AccountSetValues,
508    ) -> Result<(), AccountSetError> {
509        let mut account_set = AccountSet::import(origin, values);
510        self.repo
511            .import_in_op(&mut db, origin, &mut account_set)
512            .await?;
513        let recorded_at = db.now();
514        let outbox_events: Vec<_> = account_set
515            .events
516            .last_persisted(1)
517            .map(|p| OutboxEventPayload::from(&p.event))
518            .collect();
519        self.outbox
520            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
521            .await?;
522        Ok(())
523    }
524
525    #[cfg(feature = "import")]
526    pub async fn sync_account_set_update(
527        &self,
528        mut db: es_entity::DbOp<'_>,
529        values: AccountSetValues,
530        fields: Vec<String>,
531    ) -> Result<(), AccountSetError> {
532        let mut account_set = self.repo.find_by_id(values.id).await?;
533        account_set.update((values, fields));
534        let n_events = self.repo.update_in_op(&mut db, &mut account_set).await?;
535        let recorded_at = db.now();
536        let outbox_events: Vec<_> = account_set
537            .events
538            .last_persisted(n_events)
539            .map(|p| OutboxEventPayload::from(&p.event))
540            .collect();
541        self.outbox
542            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
543            .await?;
544        Ok(())
545    }
546
547    #[cfg(feature = "import")]
548    pub async fn sync_account_set_member_creation(
549        &self,
550        mut db: es_entity::DbOp<'_>,
551        origin: DataSourceId,
552        account_set_id: AccountSetId,
553        member_id: AccountSetMemberId,
554    ) -> Result<(), AccountSetError> {
555        match member_id {
556            AccountSetMemberId::Account(account_id) => {
557                self.repo
558                    .import_member_account_in_op(&mut db, account_set_id, account_id)
559                    .await?;
560            }
561            AccountSetMemberId::AccountSet(account_set_id) => {
562                self.repo
563                    .import_member_set_in_op(&mut db, account_set_id, account_set_id)
564                    .await?;
565            }
566        }
567        let recorded_at = db.now();
568        self.outbox
569            .persist_events_at(
570                db.into_tx(),
571                std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
572                    source: DataSource::Remote { id: origin },
573                    account_set_id,
574                    member_id,
575                }),
576                recorded_at,
577            )
578            .await?;
579        Ok(())
580    }
581
582    #[cfg(feature = "import")]
583    pub async fn sync_account_set_member_removal(
584        &self,
585        mut db: es_entity::DbOp<'_>,
586        origin: DataSourceId,
587        account_set_id: AccountSetId,
588        member_id: AccountSetMemberId,
589    ) -> Result<(), AccountSetError> {
590        match member_id {
591            AccountSetMemberId::Account(account_id) => {
592                self.repo
593                    .import_remove_member_account(db.tx(), account_set_id, account_id)
594                    .await?;
595            }
596            AccountSetMemberId::AccountSet(account_set_id) => {
597                self.repo
598                    .import_remove_member_set(db.tx(), account_set_id, account_set_id)
599                    .await?;
600            }
601        }
602        let recorded_at = db.now();
603        self.outbox
604            .persist_events_at(
605                db.into_tx(),
606                std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
607                    source: DataSource::Remote { id: origin },
608                    account_set_id,
609                    member_id,
610                }),
611                recorded_at,
612            )
613            .await?;
614        Ok(())
615    }
616}
617
618fn entries_for_add_balance(
619    entries: &mut Vec<NewEntry>,
620    target_account_id: AccountId,
621    balance: BalanceSnapshot,
622) {
623    use rust_decimal::Decimal;
624
625    if balance.settled.cr_balance != Decimal::ZERO {
626        let entry = NewEntry::builder()
627            .id(EntryId::new())
628            .journal_id(balance.journal_id)
629            .account_id(target_account_id)
630            .currency(balance.currency)
631            .sequence(1u32)
632            .layer(Layer::Settled)
633            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
634            .direction(DebitOrCredit::Credit)
635            .units(balance.settled.cr_balance)
636            .transaction_id(UNASSIGNED_TRANSACTION_ID)
637            .build()
638            .expect("Couldn't build entry");
639        entries.push(entry);
640    }
641    if balance.settled.dr_balance != Decimal::ZERO {
642        let entry = NewEntry::builder()
643            .id(EntryId::new())
644            .journal_id(balance.journal_id)
645            .account_id(target_account_id)
646            .currency(balance.currency)
647            .sequence(1u32)
648            .layer(Layer::Settled)
649            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
650            .direction(DebitOrCredit::Debit)
651            .units(balance.settled.dr_balance)
652            .transaction_id(UNASSIGNED_TRANSACTION_ID)
653            .build()
654            .expect("Couldn't build entry");
655        entries.push(entry);
656    }
657    if balance.pending.cr_balance != Decimal::ZERO {
658        let entry = NewEntry::builder()
659            .id(EntryId::new())
660            .journal_id(balance.journal_id)
661            .account_id(target_account_id)
662            .currency(balance.currency)
663            .sequence(1u32)
664            .layer(Layer::Pending)
665            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
666            .direction(DebitOrCredit::Credit)
667            .units(balance.pending.cr_balance)
668            .transaction_id(UNASSIGNED_TRANSACTION_ID)
669            .build()
670            .expect("Couldn't build entry");
671        entries.push(entry);
672    }
673    if balance.pending.dr_balance != Decimal::ZERO {
674        let entry = NewEntry::builder()
675            .id(EntryId::new())
676            .journal_id(balance.journal_id)
677            .account_id(target_account_id)
678            .currency(balance.currency)
679            .sequence(1u32)
680            .layer(Layer::Pending)
681            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
682            .direction(DebitOrCredit::Debit)
683            .units(balance.pending.dr_balance)
684            .transaction_id(UNASSIGNED_TRANSACTION_ID)
685            .build()
686            .expect("Couldn't build entry");
687        entries.push(entry);
688    }
689    if balance.encumbrance.cr_balance != Decimal::ZERO {
690        let entry = NewEntry::builder()
691            .id(EntryId::new())
692            .journal_id(balance.journal_id)
693            .account_id(target_account_id)
694            .currency(balance.currency)
695            .sequence(1u32)
696            .layer(Layer::Encumbrance)
697            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
698            .direction(DebitOrCredit::Credit)
699            .units(balance.encumbrance.cr_balance)
700            .transaction_id(UNASSIGNED_TRANSACTION_ID)
701            .build()
702            .expect("Couldn't build entry");
703        entries.push(entry);
704    }
705    if balance.encumbrance.dr_balance != Decimal::ZERO {
706        let entry = NewEntry::builder()
707            .id(EntryId::new())
708            .journal_id(balance.journal_id)
709            .account_id(target_account_id)
710            .currency(balance.currency)
711            .sequence(1u32)
712            .layer(Layer::Encumbrance)
713            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
714            .direction(DebitOrCredit::Debit)
715            .units(balance.encumbrance.dr_balance)
716            .transaction_id(UNASSIGNED_TRANSACTION_ID)
717            .build()
718            .expect("Couldn't build entry");
719        entries.push(entry);
720    }
721}
722
723fn entries_for_remove_balance(
724    entries: &mut Vec<NewEntry>,
725    target_account_id: AccountId,
726    balance: BalanceSnapshot,
727) {
728    use rust_decimal::Decimal;
729
730    if balance.settled.cr_balance != Decimal::ZERO {
731        let entry = NewEntry::builder()
732            .id(EntryId::new())
733            .journal_id(balance.journal_id)
734            .account_id(target_account_id)
735            .currency(balance.currency)
736            .sequence(1u32)
737            .layer(Layer::Settled)
738            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
739            .direction(DebitOrCredit::Debit)
740            .units(balance.settled.cr_balance)
741            .transaction_id(UNASSIGNED_TRANSACTION_ID)
742            .build()
743            .expect("Couldn't build entry");
744        entries.push(entry);
745    }
746    if balance.settled.dr_balance != Decimal::ZERO {
747        let entry = NewEntry::builder()
748            .id(EntryId::new())
749            .journal_id(balance.journal_id)
750            .account_id(target_account_id)
751            .currency(balance.currency)
752            .sequence(1u32)
753            .layer(Layer::Settled)
754            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
755            .direction(DebitOrCredit::Credit)
756            .units(balance.settled.dr_balance)
757            .transaction_id(UNASSIGNED_TRANSACTION_ID)
758            .build()
759            .expect("Couldn't build entry");
760        entries.push(entry);
761    }
762    if balance.pending.cr_balance != Decimal::ZERO {
763        let entry = NewEntry::builder()
764            .id(EntryId::new())
765            .journal_id(balance.journal_id)
766            .account_id(target_account_id)
767            .currency(balance.currency)
768            .sequence(1u32)
769            .layer(Layer::Pending)
770            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
771            .direction(DebitOrCredit::Debit)
772            .units(balance.pending.cr_balance)
773            .transaction_id(UNASSIGNED_TRANSACTION_ID)
774            .build()
775            .expect("Couldn't build entry");
776        entries.push(entry);
777    }
778    if balance.pending.dr_balance != Decimal::ZERO {
779        let entry = NewEntry::builder()
780            .id(EntryId::new())
781            .journal_id(balance.journal_id)
782            .account_id(target_account_id)
783            .currency(balance.currency)
784            .sequence(1u32)
785            .layer(Layer::Pending)
786            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
787            .direction(DebitOrCredit::Credit)
788            .units(balance.pending.dr_balance)
789            .transaction_id(UNASSIGNED_TRANSACTION_ID)
790            .build()
791            .expect("Couldn't build entry");
792        entries.push(entry);
793    }
794    if balance.encumbrance.cr_balance != Decimal::ZERO {
795        let entry = NewEntry::builder()
796            .id(EntryId::new())
797            .journal_id(balance.journal_id)
798            .account_id(target_account_id)
799            .currency(balance.currency)
800            .sequence(1u32)
801            .layer(Layer::Encumbrance)
802            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
803            .direction(DebitOrCredit::Debit)
804            .units(balance.encumbrance.cr_balance)
805            .transaction_id(UNASSIGNED_TRANSACTION_ID)
806            .build()
807            .expect("Couldn't build entry");
808        entries.push(entry);
809    }
810    if balance.encumbrance.dr_balance != Decimal::ZERO {
811        let entry = NewEntry::builder()
812            .id(EntryId::new())
813            .journal_id(balance.journal_id)
814            .account_id(target_account_id)
815            .currency(balance.currency)
816            .sequence(1u32)
817            .layer(Layer::Encumbrance)
818            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
819            .direction(DebitOrCredit::Credit)
820            .units(balance.encumbrance.dr_balance)
821            .transaction_id(UNASSIGNED_TRANSACTION_ID)
822            .build()
823            .expect("Couldn't build entry");
824        entries.push(entry);
825    }
826}
827
828impl From<&AccountSetEvent> for OutboxEventPayload {
829    fn from(event: &AccountSetEvent) -> Self {
830        match event {
831            #[cfg(feature = "import")]
832            AccountSetEvent::Imported {
833                source,
834                values: account_set,
835            } => OutboxEventPayload::AccountSetCreated {
836                source: *source,
837                account_set: account_set.clone(),
838            },
839            AccountSetEvent::Initialized {
840                values: account_set,
841            } => OutboxEventPayload::AccountSetCreated {
842                source: DataSource::Local,
843                account_set: account_set.clone(),
844            },
845            AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
846                source: DataSource::Local,
847                account_set: values.clone(),
848                fields: fields.clone(),
849            },
850        }
851    }
852}