cala_ledger/account_set/
mod.rs

1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use std::collections::HashMap;
8use tracing::instrument;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{
13    account::*,
14    balance::*,
15    entry::*,
16    outbox::*,
17    primitives::{DataSource, DebitOrCredit, JournalId, Layer},
18};
19
20pub use cursor::*;
21pub use entity::*;
22use error::*;
23use repo::*;
24pub use repo::{account_set_cursor::*, members_cursor::*};
25
26const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
27
28#[derive(Clone)]
29pub struct AccountSets {
30    repo: AccountSetRepo,
31    accounts: Accounts,
32    entries: Entries,
33    balances: Balances,
34}
35
36impl AccountSets {
37    pub(crate) fn new(
38        pool: &PgPool,
39        publisher: &OutboxPublisher,
40        accounts: &Accounts,
41        entries: &Entries,
42        balances: &Balances,
43    ) -> Self {
44        Self {
45            repo: AccountSetRepo::new(pool, publisher),
46            accounts: accounts.clone(),
47            entries: entries.clone(),
48            balances: balances.clone(),
49        }
50    }
51    #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
52    pub async fn create(
53        &self,
54        new_account_set: NewAccountSet,
55    ) -> Result<AccountSet, AccountSetError> {
56        let mut op = self.repo.begin_op().await?;
57        let account_set = self.create_in_op(&mut op, new_account_set).await?;
58        op.commit().await?;
59        Ok(account_set)
60    }
61
62    #[instrument(name = "cala_ledger.account_sets.create_in_op", skip(self, db))]
63    pub async fn create_in_op(
64        &self,
65        db: &mut impl es_entity::AtomicOperation,
66        new_account_set: NewAccountSet,
67    ) -> Result<AccountSet, AccountSetError> {
68        let new_account = NewAccount::builder()
69            .id(new_account_set.id)
70            .name(String::new())
71            .code(new_account_set.id.to_string())
72            .normal_balance_type(new_account_set.normal_balance_type)
73            .is_account_set(true)
74            .velocity_context_values(new_account_set.context_values())
75            .build()
76            .expect("Failed to build account");
77        self.accounts.create_in_op(db, new_account).await?;
78
79        let account_set = self.repo.create_in_op(db, new_account_set).await?;
80
81        Ok(account_set)
82    }
83
84    #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, new_account_sets), fields(count = new_account_sets.len()))]
85    pub async fn create_all(
86        &self,
87        new_account_sets: Vec<NewAccountSet>,
88    ) -> Result<Vec<AccountSet>, AccountSetError> {
89        let mut op = self.repo.begin_op().await?;
90        let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
91        op.commit().await?;
92        Ok(account_sets)
93    }
94
95    #[instrument(name = "cala_ledger.account_sets.create_all_in_op", skip(self, db))]
96    pub async fn create_all_in_op(
97        &self,
98        db: &mut impl es_entity::AtomicOperation,
99        new_account_sets: Vec<NewAccountSet>,
100    ) -> Result<Vec<AccountSet>, AccountSetError> {
101        let mut new_accounts = Vec::new();
102        for new_account_set in new_account_sets.iter() {
103            let new_account = NewAccount::builder()
104                .id(new_account_set.id)
105                .name(String::new())
106                .code(new_account_set.id.to_string())
107                .normal_balance_type(new_account_set.normal_balance_type)
108                .is_account_set(true)
109                .velocity_context_values(new_account_set.context_values())
110                .build()
111                .expect("Failed to build account");
112            new_accounts.push(new_account);
113        }
114        self.accounts.create_all_in_op(db, new_accounts).await?;
115
116        let account_sets = self.repo.create_all_in_op(db, new_account_sets).await?;
117
118        Ok(account_sets)
119    }
120
121    #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
122    pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
123        let mut op = self.repo.begin_op().await?;
124        self.persist_in_op(&mut op, account_set).await?;
125        op.commit().await?;
126        Ok(())
127    }
128
129    #[instrument(
130        name = "cala_ledger.account_sets.persist_in_op",
131        skip(self, db, account_set)
132    )]
133    pub async fn persist_in_op(
134        &self,
135        db: &mut impl es_entity::AtomicOperation,
136        account_set: &mut AccountSet,
137    ) -> Result<(), AccountSetError> {
138        self.repo.update_in_op(db, account_set).await?;
139
140        self.accounts
141            .update_velocity_context_values_in_op(db, account_set.values())
142            .await?;
143
144        Ok(())
145    }
146
147    #[instrument(name = "cala_ledger.account_sets.add_member", skip(self, member), fields(account_set_id = %account_set_id))]
148    pub async fn add_member(
149        &self,
150        account_set_id: AccountSetId,
151        member: impl Into<AccountSetMemberId>,
152    ) -> Result<AccountSet, AccountSetError> {
153        let mut op = self.repo.begin_op().await?;
154        let account_set = self
155            .add_member_in_op(&mut op, account_set_id, member)
156            .await?;
157        op.commit().await?;
158        Ok(account_set)
159    }
160
161    #[instrument(name = "cala_ledger.account_sets.add_member_in_op", skip(self, op, member), fields(account_set_id = %account_set_id, is_account = tracing::field::Empty, is_account_set = tracing::field::Empty, member_id = tracing::field::Empty))]
162    pub async fn add_member_in_op(
163        &self,
164        op: &mut impl es_entity::AtomicOperation,
165        account_set_id: AccountSetId,
166        member: impl Into<AccountSetMemberId>,
167    ) -> Result<AccountSet, AccountSetError> {
168        let member = member.into();
169        let (time, parents, account_set, member_id) = match member {
170            AccountSetMemberId::Account(id) => {
171                tracing::Span::current().record("is_account", true);
172                tracing::Span::current().record("is_account_set", false);
173                tracing::Span::current().record("member_id", tracing::field::display(&id));
174                let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
175                let (time, parents) = self
176                    .repo
177                    .add_member_account_and_return_parents(&mut *op, account_set_id, id)
178                    .await?;
179                (time, parents, set, id)
180            }
181            AccountSetMemberId::AccountSet(id) => {
182                tracing::Span::current().record("is_account", false);
183                tracing::Span::current().record("is_account_set", true);
184                tracing::Span::current().record("member_id", tracing::field::display(&id));
185                let mut accounts = self
186                    .repo
187                    .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
188                    .await?;
189                let target = accounts
190                    .remove(&account_set_id)
191                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
192                let member = accounts
193                    .remove(&id)
194                    .ok_or(AccountSetError::CouldNotFindById(id))?;
195
196                if target.values().journal_id != member.values().journal_id {
197                    return Err(AccountSetError::JournalIdMismatch);
198                }
199
200                let (time, parents) = self
201                    .repo
202                    .add_member_set_and_return_parents(op, account_set_id, id)
203                    .await?;
204                (time, parents, target, AccountId::from(id))
205            }
206        };
207
208        let balances = self
209            .balances
210            .find_balances_for_update(op, account_set.values().journal_id, member_id)
211            .await?;
212
213        let target_account_id = AccountId::from(&account_set.id());
214        let mut entries = Vec::new();
215        for balance in balances.into_values() {
216            entries_for_add_balance(&mut entries, target_account_id, balance);
217        }
218
219        if entries.is_empty() {
220            return Ok(account_set);
221        }
222        let entries = self.entries.create_all_in_op(op, entries).await?;
223        let mappings = std::iter::once((target_account_id, parents)).collect();
224        self.balances
225            .update_balances_in_op(
226                op,
227                account_set.values().journal_id,
228                entries,
229                time.date_naive(),
230                time,
231                mappings,
232            )
233            .await?;
234
235        Ok(account_set)
236    }
237
238    #[instrument(name = "cala_ledger.account_sets.remove_member", skip(self, member), fields(account_set_id = %account_set_id))]
239    pub async fn remove_member(
240        &self,
241        account_set_id: AccountSetId,
242        member: impl Into<AccountSetMemberId>,
243    ) -> Result<AccountSet, AccountSetError> {
244        let mut op = self.repo.begin_op().await?;
245        let account_set = self
246            .remove_member_in_op(&mut op, account_set_id, member)
247            .await?;
248        op.commit().await?;
249        Ok(account_set)
250    }
251
252    #[instrument(name = "cala_ledger.account_sets.remove_member_in_op", skip(self, op, member), fields(account_set_id = %account_set_id))]
253    pub async fn remove_member_in_op(
254        &self,
255        op: &mut impl es_entity::AtomicOperation,
256        account_set_id: AccountSetId,
257        member: impl Into<AccountSetMemberId>,
258    ) -> Result<AccountSet, AccountSetError> {
259        let member = member.into();
260        let (time, parents, account_set, member_id) = match member {
261            AccountSetMemberId::Account(id) => {
262                let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
263                let (time, parents) = self
264                    .repo
265                    .remove_member_account_and_return_parents(op, account_set_id, id)
266                    .await?;
267                (time, parents, set, id)
268            }
269            AccountSetMemberId::AccountSet(id) => {
270                let mut accounts = self
271                    .repo
272                    .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
273                    .await?;
274                let target = accounts
275                    .remove(&account_set_id)
276                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
277                let member = accounts
278                    .remove(&id)
279                    .ok_or(AccountSetError::CouldNotFindById(id))?;
280
281                if target.values().journal_id != member.values().journal_id {
282                    return Err(AccountSetError::JournalIdMismatch);
283                }
284
285                let (time, parents) = self
286                    .repo
287                    .remove_member_set_and_return_parents(op, account_set_id, id)
288                    .await?;
289                (time, parents, target, AccountId::from(id))
290            }
291        };
292
293        let balances = self
294            .balances
295            .find_balances_for_update(op, account_set.values().journal_id, member_id)
296            .await?;
297
298        let target_account_id = AccountId::from(&account_set.id());
299        let mut entries = Vec::new();
300        for balance in balances.into_values() {
301            entries_for_remove_balance(&mut entries, target_account_id, balance);
302        }
303
304        if entries.is_empty() {
305            return Ok(account_set);
306        }
307        let entries = self.entries.create_all_in_op(op, entries).await?;
308        let mappings = std::iter::once((target_account_id, parents)).collect();
309        self.balances
310            .update_balances_in_op(
311                op,
312                account_set.values().journal_id,
313                entries,
314                time.date_naive(),
315                time,
316                mappings,
317            )
318            .await?;
319
320        Ok(account_set)
321    }
322
323    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self))]
324    pub async fn find_all<T: From<AccountSet>>(
325        &self,
326        account_set_ids: &[AccountSetId],
327    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
328        self.repo.find_all(account_set_ids).await
329    }
330
331    #[instrument(name = "cala_ledger.account_sets.find_all_in_op", skip(self, op))]
332    pub async fn find_all_in_op<T: From<AccountSet>>(
333        &self,
334        op: &mut impl es_entity::AtomicOperation,
335        account_set_ids: &[AccountSetId],
336    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
337        self.repo.find_all_in_op(op, account_set_ids).await
338    }
339
340    #[instrument(name = "cala_ledger.account_sets.find", skip(self))]
341    pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
342        self.repo.find_by_id(account_set_id).await
343    }
344
345    #[instrument(name = "cala_ledger.account_sets.find_in_op", skip(self, op))]
346    pub async fn find_in_op(
347        &self,
348        op: &mut impl es_entity::AtomicOperation,
349        account_set_id: AccountSetId,
350    ) -> Result<AccountSet, AccountSetError> {
351        self.repo.find_by_id_in_op(op, account_set_id).await
352    }
353
354    #[instrument(name = "cala_ledger.accounts_sets.find_by_external_id", skip(self))]
355    pub async fn find_by_external_id(
356        &self,
357        external_id: String,
358    ) -> Result<AccountSet, AccountSetError> {
359        self.repo.find_by_external_id(Some(external_id)).await
360    }
361
362    #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self))]
363    pub async fn find_where_member(
364        &self,
365        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
366        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
367    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
368    {
369        match member.into() {
370            AccountSetMemberId::Account(account_id) => {
371                self.repo
372                    .find_where_account_is_member(account_id, query)
373                    .await
374            }
375            AccountSetMemberId::AccountSet(account_set_id) => {
376                self.repo
377                    .find_where_account_set_is_member(account_set_id, query)
378                    .await
379            }
380        }
381    }
382
383    #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self))]
384    pub async fn list_for_name(
385        &self,
386        name: String,
387        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
388    ) -> Result<
389        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
390        AccountSetError,
391    > {
392        self.repo
393            .list_for_name_by_created_at(name, args, Default::default())
394            .await
395    }
396
397    #[instrument(name = "cala_ledger.account_sets.list_for_name_in_op", skip(self, op))]
398    pub async fn list_for_name_in_op(
399        &self,
400        op: &mut impl es_entity::AtomicOperation,
401        name: String,
402        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
403    ) -> Result<
404        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
405        AccountSetError,
406    > {
407        self.repo
408            .list_for_name_by_created_at_in_op(op, name, args, Default::default())
409            .await
410    }
411
412    #[instrument(
413        name = "cala_ledger.account_sets.find_where_member_in_op",
414        skip(self, op)
415    )]
416    pub async fn find_where_member_in_op(
417        &self,
418        op: &mut impl es_entity::AtomicOperation,
419        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
420        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
421    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
422    {
423        match member.into() {
424            AccountSetMemberId::Account(account_id) => {
425                self.repo
426                    .find_where_account_is_member_in_op(op, account_id, query)
427                    .await
428            }
429            AccountSetMemberId::AccountSet(account_set_id) => {
430                self.repo
431                    .find_where_account_set_is_member_in_op(op, account_set_id, query)
432                    .await
433            }
434        }
435    }
436
437    pub async fn list_members_by_created_at(
438        &self,
439        id: AccountSetId,
440        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
441    ) -> Result<
442        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
443        AccountSetError,
444    > {
445        self.repo.list_children_by_created_at(id, args).await
446    }
447
448    pub async fn list_members_by_created_at_in_op(
449        &self,
450        op: &mut impl es_entity::AtomicOperation,
451        id: AccountSetId,
452        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
453    ) -> Result<
454        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
455        AccountSetError,
456    > {
457        self.repo
458            .list_children_by_created_at_in_op(op, id, args)
459            .await
460    }
461
462    pub async fn list_members_by_external_id(
463        &self,
464        id: AccountSetId,
465        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
466    ) -> Result<
467        es_entity::PaginatedQueryRet<
468            AccountSetMemberByExternalId,
469            AccountSetMembersByExternalIdCursor,
470        >,
471        AccountSetError,
472    > {
473        self.repo.list_children_by_external_id(id, args).await
474    }
475
476    pub async fn list_members_by_external_id_in_op(
477        &self,
478        op: &mut impl es_entity::AtomicOperation,
479        id: AccountSetId,
480        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
481    ) -> Result<
482        es_entity::PaginatedQueryRet<
483            AccountSetMemberByExternalId,
484            AccountSetMembersByExternalIdCursor,
485        >,
486        AccountSetError,
487    > {
488        self.repo
489            .list_children_by_external_id_in_op(op, id, args)
490            .await
491    }
492
493    pub(crate) async fn fetch_mappings_in_op(
494        &self,
495        op: &mut impl es_entity::AtomicOperation,
496        journal_id: JournalId,
497        account_ids: &[AccountId],
498    ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
499        self.repo
500            .fetch_mappings_in_op(op, journal_id, account_ids)
501            .await
502    }
503
504    #[cfg(feature = "import")]
505    pub(crate) async fn sync_account_set_creation(
506        &self,
507        mut db: es_entity::DbOpWithTime<'_>,
508        origin: DataSourceId,
509        values: AccountSetValues,
510    ) -> Result<(), AccountSetError> {
511        let mut account_set = AccountSet::import(origin, values);
512        self.repo
513            .import_in_op(&mut db, origin, &mut account_set)
514            .await?;
515        db.commit().await?;
516        Ok(())
517    }
518
519    #[cfg(feature = "import")]
520    pub(crate) async fn sync_account_set_update(
521        &self,
522        mut db: es_entity::DbOpWithTime<'_>,
523        values: AccountSetValues,
524        fields: Vec<String>,
525    ) -> Result<(), AccountSetError> {
526        let mut account_set = self.repo.find_by_id_in_op(&mut db, values.id).await?;
527        let _ = account_set.update((values, fields));
528        self.repo.update_in_op(&mut db, &mut account_set).await?;
529        db.commit().await?;
530        Ok(())
531    }
532
533    #[cfg(feature = "import")]
534    pub(crate) async fn sync_account_set_member_creation(
535        &self,
536        mut db: es_entity::DbOpWithTime<'_>,
537        origin: DataSourceId,
538        account_set_id: AccountSetId,
539        member_id: AccountSetMemberId,
540    ) -> Result<(), AccountSetError> {
541        match member_id {
542            AccountSetMemberId::Account(account_id) => {
543                self.repo
544                    .import_member_account_in_op(&mut db, origin, account_set_id, account_id)
545                    .await?;
546            }
547            AccountSetMemberId::AccountSet(member_account_set_id) => {
548                self.repo
549                    .import_member_set_in_op(&mut db, origin, account_set_id, member_account_set_id)
550                    .await?;
551            }
552        }
553        db.commit().await?;
554        Ok(())
555    }
556
557    #[cfg(feature = "import")]
558    pub(crate) async fn sync_account_set_member_removal(
559        &self,
560        mut db: es_entity::DbOpWithTime<'_>,
561        origin: DataSourceId,
562        account_set_id: AccountSetId,
563        member_id: AccountSetMemberId,
564    ) -> Result<(), AccountSetError> {
565        match member_id {
566            AccountSetMemberId::Account(account_id) => {
567                self.repo
568                    .import_remove_member_account(&mut db, origin, account_set_id, account_id)
569                    .await?;
570            }
571            AccountSetMemberId::AccountSet(member_account_set_id) => {
572                self.repo
573                    .import_remove_member_set(
574                        &mut db,
575                        origin,
576                        account_set_id,
577                        member_account_set_id,
578                    )
579                    .await?;
580            }
581        }
582        db.commit().await?;
583        Ok(())
584    }
585}
586
587fn entries_for_add_balance(
588    entries: &mut Vec<NewEntry>,
589    target_account_id: AccountId,
590    balance: BalanceSnapshot,
591) {
592    use rust_decimal::Decimal;
593
594    if balance.settled.cr_balance != Decimal::ZERO {
595        let entry = NewEntry::builder()
596            .id(EntryId::new())
597            .journal_id(balance.journal_id)
598            .account_id(target_account_id)
599            .currency(balance.currency)
600            .sequence(1u32)
601            .layer(Layer::Settled)
602            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
603            .direction(DebitOrCredit::Credit)
604            .units(balance.settled.cr_balance)
605            .transaction_id(UNASSIGNED_TRANSACTION_ID)
606            .build()
607            .expect("Couldn't build entry");
608        entries.push(entry);
609    }
610    if balance.settled.dr_balance != Decimal::ZERO {
611        let entry = NewEntry::builder()
612            .id(EntryId::new())
613            .journal_id(balance.journal_id)
614            .account_id(target_account_id)
615            .currency(balance.currency)
616            .sequence(1u32)
617            .layer(Layer::Settled)
618            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
619            .direction(DebitOrCredit::Debit)
620            .units(balance.settled.dr_balance)
621            .transaction_id(UNASSIGNED_TRANSACTION_ID)
622            .build()
623            .expect("Couldn't build entry");
624        entries.push(entry);
625    }
626    if balance.pending.cr_balance != Decimal::ZERO {
627        let entry = NewEntry::builder()
628            .id(EntryId::new())
629            .journal_id(balance.journal_id)
630            .account_id(target_account_id)
631            .currency(balance.currency)
632            .sequence(1u32)
633            .layer(Layer::Pending)
634            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
635            .direction(DebitOrCredit::Credit)
636            .units(balance.pending.cr_balance)
637            .transaction_id(UNASSIGNED_TRANSACTION_ID)
638            .build()
639            .expect("Couldn't build entry");
640        entries.push(entry);
641    }
642    if balance.pending.dr_balance != Decimal::ZERO {
643        let entry = NewEntry::builder()
644            .id(EntryId::new())
645            .journal_id(balance.journal_id)
646            .account_id(target_account_id)
647            .currency(balance.currency)
648            .sequence(1u32)
649            .layer(Layer::Pending)
650            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
651            .direction(DebitOrCredit::Debit)
652            .units(balance.pending.dr_balance)
653            .transaction_id(UNASSIGNED_TRANSACTION_ID)
654            .build()
655            .expect("Couldn't build entry");
656        entries.push(entry);
657    }
658    if balance.encumbrance.cr_balance != Decimal::ZERO {
659        let entry = NewEntry::builder()
660            .id(EntryId::new())
661            .journal_id(balance.journal_id)
662            .account_id(target_account_id)
663            .currency(balance.currency)
664            .sequence(1u32)
665            .layer(Layer::Encumbrance)
666            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
667            .direction(DebitOrCredit::Credit)
668            .units(balance.encumbrance.cr_balance)
669            .transaction_id(UNASSIGNED_TRANSACTION_ID)
670            .build()
671            .expect("Couldn't build entry");
672        entries.push(entry);
673    }
674    if balance.encumbrance.dr_balance != Decimal::ZERO {
675        let entry = NewEntry::builder()
676            .id(EntryId::new())
677            .journal_id(balance.journal_id)
678            .account_id(target_account_id)
679            .currency(balance.currency)
680            .sequence(1u32)
681            .layer(Layer::Encumbrance)
682            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
683            .direction(DebitOrCredit::Debit)
684            .units(balance.encumbrance.dr_balance)
685            .transaction_id(UNASSIGNED_TRANSACTION_ID)
686            .build()
687            .expect("Couldn't build entry");
688        entries.push(entry);
689    }
690}
691
692fn entries_for_remove_balance(
693    entries: &mut Vec<NewEntry>,
694    target_account_id: AccountId,
695    balance: BalanceSnapshot,
696) {
697    use rust_decimal::Decimal;
698
699    if balance.settled.cr_balance != Decimal::ZERO {
700        let entry = NewEntry::builder()
701            .id(EntryId::new())
702            .journal_id(balance.journal_id)
703            .account_id(target_account_id)
704            .currency(balance.currency)
705            .sequence(1u32)
706            .layer(Layer::Settled)
707            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
708            .direction(DebitOrCredit::Debit)
709            .units(balance.settled.cr_balance)
710            .transaction_id(UNASSIGNED_TRANSACTION_ID)
711            .build()
712            .expect("Couldn't build entry");
713        entries.push(entry);
714    }
715    if balance.settled.dr_balance != Decimal::ZERO {
716        let entry = NewEntry::builder()
717            .id(EntryId::new())
718            .journal_id(balance.journal_id)
719            .account_id(target_account_id)
720            .currency(balance.currency)
721            .sequence(1u32)
722            .layer(Layer::Settled)
723            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
724            .direction(DebitOrCredit::Credit)
725            .units(balance.settled.dr_balance)
726            .transaction_id(UNASSIGNED_TRANSACTION_ID)
727            .build()
728            .expect("Couldn't build entry");
729        entries.push(entry);
730    }
731    if balance.pending.cr_balance != Decimal::ZERO {
732        let entry = NewEntry::builder()
733            .id(EntryId::new())
734            .journal_id(balance.journal_id)
735            .account_id(target_account_id)
736            .currency(balance.currency)
737            .sequence(1u32)
738            .layer(Layer::Pending)
739            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
740            .direction(DebitOrCredit::Debit)
741            .units(balance.pending.cr_balance)
742            .transaction_id(UNASSIGNED_TRANSACTION_ID)
743            .build()
744            .expect("Couldn't build entry");
745        entries.push(entry);
746    }
747    if balance.pending.dr_balance != Decimal::ZERO {
748        let entry = NewEntry::builder()
749            .id(EntryId::new())
750            .journal_id(balance.journal_id)
751            .account_id(target_account_id)
752            .currency(balance.currency)
753            .sequence(1u32)
754            .layer(Layer::Pending)
755            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
756            .direction(DebitOrCredit::Credit)
757            .units(balance.pending.dr_balance)
758            .transaction_id(UNASSIGNED_TRANSACTION_ID)
759            .build()
760            .expect("Couldn't build entry");
761        entries.push(entry);
762    }
763    if balance.encumbrance.cr_balance != Decimal::ZERO {
764        let entry = NewEntry::builder()
765            .id(EntryId::new())
766            .journal_id(balance.journal_id)
767            .account_id(target_account_id)
768            .currency(balance.currency)
769            .sequence(1u32)
770            .layer(Layer::Encumbrance)
771            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
772            .direction(DebitOrCredit::Debit)
773            .units(balance.encumbrance.cr_balance)
774            .transaction_id(UNASSIGNED_TRANSACTION_ID)
775            .build()
776            .expect("Couldn't build entry");
777        entries.push(entry);
778    }
779    if balance.encumbrance.dr_balance != Decimal::ZERO {
780        let entry = NewEntry::builder()
781            .id(EntryId::new())
782            .journal_id(balance.journal_id)
783            .account_id(target_account_id)
784            .currency(balance.currency)
785            .sequence(1u32)
786            .layer(Layer::Encumbrance)
787            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
788            .direction(DebitOrCredit::Credit)
789            .units(balance.encumbrance.dr_balance)
790            .transaction_id(UNASSIGNED_TRANSACTION_ID)
791            .build()
792            .expect("Couldn't build entry");
793        entries.push(entry);
794    }
795}
796
797impl From<&AccountSetEvent> for OutboxEventPayload {
798    fn from(event: &AccountSetEvent) -> Self {
799        let source = es_entity::context::EventContext::current()
800            .data()
801            .lookup("data_source")
802            .ok()
803            .flatten()
804            .unwrap_or(DataSource::Local);
805
806        match event {
807            #[cfg(feature = "import")]
808            AccountSetEvent::Imported {
809                source,
810                values: account_set,
811            } => OutboxEventPayload::AccountSetCreated {
812                source: *source,
813                account_set: account_set.clone(),
814            },
815            AccountSetEvent::Initialized {
816                values: account_set,
817            } => OutboxEventPayload::AccountSetCreated {
818                source,
819                account_set: account_set.clone(),
820            },
821            AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
822                source,
823                account_set: values.clone(),
824                fields: fields.clone(),
825            },
826        }
827    }
828}