cala_ledger/account_set/
mod.rs

1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::clock::ClockHandle;
7use sqlx::PgPool;
8use std::collections::HashMap;
9use tracing::instrument;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{
14    account::*,
15    balance::*,
16    entry::*,
17    outbox::*,
18    primitives::{DataSource, DebitOrCredit, JournalId, Layer},
19};
20
21pub use cursor::*;
22pub use entity::*;
23use error::*;
24use repo::*;
25pub use repo::{account_set_cursor::*, members_cursor::*};
26
27const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
28
29#[derive(Clone)]
30pub struct AccountSets {
31    repo: AccountSetRepo,
32    accounts: Accounts,
33    entries: Entries,
34    balances: Balances,
35    clock: ClockHandle,
36}
37
38impl AccountSets {
39    pub(crate) fn new(
40        pool: &PgPool,
41        publisher: &OutboxPublisher,
42        accounts: &Accounts,
43        entries: &Entries,
44        balances: &Balances,
45        clock: &ClockHandle,
46    ) -> Self {
47        Self {
48            repo: AccountSetRepo::new(pool, publisher),
49            accounts: accounts.clone(),
50            entries: entries.clone(),
51            balances: balances.clone(),
52            clock: clock.clone(),
53        }
54    }
55    #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
56    pub async fn create(
57        &self,
58        new_account_set: NewAccountSet,
59    ) -> Result<AccountSet, AccountSetError> {
60        let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
61        let account_set = self.create_in_op(&mut op, new_account_set).await?;
62        op.commit().await?;
63        Ok(account_set)
64    }
65
66    #[instrument(name = "cala_ledger.account_sets.create_in_op", skip(self, db))]
67    pub async fn create_in_op(
68        &self,
69        db: &mut impl es_entity::AtomicOperation,
70        new_account_set: NewAccountSet,
71    ) -> Result<AccountSet, AccountSetError> {
72        let new_account = NewAccount::builder()
73            .id(new_account_set.id)
74            .name(String::new())
75            .code(new_account_set.id.to_string())
76            .normal_balance_type(new_account_set.normal_balance_type)
77            .is_account_set(true)
78            .velocity_context_values(new_account_set.context_values())
79            .build()
80            .expect("Failed to build account");
81        self.accounts.create_in_op(db, new_account).await?;
82
83        let account_set = self.repo.create_in_op(db, new_account_set).await?;
84
85        Ok(account_set)
86    }
87
88    #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, new_account_sets), fields(count = new_account_sets.len()))]
89    pub async fn create_all(
90        &self,
91        new_account_sets: Vec<NewAccountSet>,
92    ) -> Result<Vec<AccountSet>, AccountSetError> {
93        let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
94        let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
95        op.commit().await?;
96        Ok(account_sets)
97    }
98
99    #[instrument(name = "cala_ledger.account_sets.create_all_in_op", skip(self, db))]
100    pub async fn create_all_in_op(
101        &self,
102        db: &mut impl es_entity::AtomicOperation,
103        new_account_sets: Vec<NewAccountSet>,
104    ) -> Result<Vec<AccountSet>, AccountSetError> {
105        let mut new_accounts = Vec::new();
106        for new_account_set in new_account_sets.iter() {
107            let new_account = NewAccount::builder()
108                .id(new_account_set.id)
109                .name(String::new())
110                .code(new_account_set.id.to_string())
111                .normal_balance_type(new_account_set.normal_balance_type)
112                .is_account_set(true)
113                .velocity_context_values(new_account_set.context_values())
114                .build()
115                .expect("Failed to build account");
116            new_accounts.push(new_account);
117        }
118        self.accounts.create_all_in_op(db, new_accounts).await?;
119
120        let account_sets = self.repo.create_all_in_op(db, new_account_sets).await?;
121
122        Ok(account_sets)
123    }
124
125    #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
126    pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
127        let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
128        self.persist_in_op(&mut op, account_set).await?;
129        op.commit().await?;
130        Ok(())
131    }
132
133    #[instrument(
134        name = "cala_ledger.account_sets.persist_in_op",
135        skip(self, db, account_set)
136    )]
137    pub async fn persist_in_op(
138        &self,
139        db: &mut impl es_entity::AtomicOperation,
140        account_set: &mut AccountSet,
141    ) -> Result<(), AccountSetError> {
142        self.repo.update_in_op(db, account_set).await?;
143
144        self.accounts
145            .update_velocity_context_values_in_op(db, account_set.values())
146            .await?;
147
148        Ok(())
149    }
150
151    #[instrument(name = "cala_ledger.account_sets.add_member", skip(self, member), fields(account_set_id = %account_set_id))]
152    pub async fn add_member(
153        &self,
154        account_set_id: AccountSetId,
155        member: impl Into<AccountSetMemberId>,
156    ) -> Result<AccountSet, AccountSetError> {
157        let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
158        let account_set = self
159            .add_member_in_op(&mut op, account_set_id, member)
160            .await?;
161        op.commit().await?;
162        Ok(account_set)
163    }
164
165    #[instrument(name = "cala_ledger.account_sets.add_member_in_op", skip(self, op, member), fields(account_set_id = %account_set_id, is_account = tracing::field::Empty, is_account_set = tracing::field::Empty, member_id = tracing::field::Empty))]
166    pub async fn add_member_in_op(
167        &self,
168        op: &mut impl es_entity::AtomicOperation,
169        account_set_id: AccountSetId,
170        member: impl Into<AccountSetMemberId>,
171    ) -> Result<AccountSet, AccountSetError> {
172        let member = member.into();
173        let (time, parents, account_set, member_id) = match member {
174            AccountSetMemberId::Account(id) => {
175                tracing::Span::current().record("is_account", true);
176                tracing::Span::current().record("is_account_set", false);
177                tracing::Span::current().record("member_id", tracing::field::display(&id));
178                let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
179                let (time, parents) = self
180                    .repo
181                    .add_member_account_and_return_parents(&mut *op, account_set_id, id)
182                    .await?;
183                (time, parents, set, id)
184            }
185            AccountSetMemberId::AccountSet(id) => {
186                tracing::Span::current().record("is_account", false);
187                tracing::Span::current().record("is_account_set", true);
188                tracing::Span::current().record("member_id", tracing::field::display(&id));
189                let mut accounts = self
190                    .repo
191                    .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
192                    .await?;
193                let target = accounts
194                    .remove(&account_set_id)
195                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
196                let member = accounts
197                    .remove(&id)
198                    .ok_or(AccountSetError::CouldNotFindById(id))?;
199
200                if target.values().journal_id != member.values().journal_id {
201                    return Err(AccountSetError::JournalIdMismatch);
202                }
203
204                let (time, parents) = self
205                    .repo
206                    .add_member_set_and_return_parents(op, account_set_id, id)
207                    .await?;
208                (time, parents, target, AccountId::from(id))
209            }
210        };
211
212        let balances = self
213            .balances
214            .find_balances_for_update(op, account_set.values().journal_id, member_id)
215            .await?;
216
217        let target_account_id = AccountId::from(&account_set.id());
218        let mut entries = Vec::new();
219        for balance in balances.into_values() {
220            entries_for_add_balance(&mut entries, target_account_id, balance);
221        }
222
223        if entries.is_empty() {
224            return Ok(account_set);
225        }
226        let entries = self.entries.create_all_in_op(op, entries).await?;
227        let mappings = std::iter::once((target_account_id, parents)).collect();
228        self.balances
229            .update_balances_in_op(
230                op,
231                account_set.values().journal_id,
232                entries,
233                time.date_naive(),
234                time,
235                mappings,
236            )
237            .await?;
238
239        Ok(account_set)
240    }
241
242    #[instrument(name = "cala_ledger.account_sets.remove_member", skip(self, member), fields(account_set_id = %account_set_id))]
243    pub async fn remove_member(
244        &self,
245        account_set_id: AccountSetId,
246        member: impl Into<AccountSetMemberId>,
247    ) -> Result<AccountSet, AccountSetError> {
248        let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
249        let account_set = self
250            .remove_member_in_op(&mut op, account_set_id, member)
251            .await?;
252        op.commit().await?;
253        Ok(account_set)
254    }
255
256    #[instrument(name = "cala_ledger.account_sets.remove_member_in_op", skip(self, op, member), fields(account_set_id = %account_set_id))]
257    pub async fn remove_member_in_op(
258        &self,
259        op: &mut impl es_entity::AtomicOperation,
260        account_set_id: AccountSetId,
261        member: impl Into<AccountSetMemberId>,
262    ) -> Result<AccountSet, AccountSetError> {
263        let member = member.into();
264        let (time, parents, account_set, member_id) = match member {
265            AccountSetMemberId::Account(id) => {
266                let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
267                let (time, parents) = self
268                    .repo
269                    .remove_member_account_and_return_parents(op, account_set_id, id)
270                    .await?;
271                (time, parents, set, id)
272            }
273            AccountSetMemberId::AccountSet(id) => {
274                let mut accounts = self
275                    .repo
276                    .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
277                    .await?;
278                let target = accounts
279                    .remove(&account_set_id)
280                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
281                let member = accounts
282                    .remove(&id)
283                    .ok_or(AccountSetError::CouldNotFindById(id))?;
284
285                if target.values().journal_id != member.values().journal_id {
286                    return Err(AccountSetError::JournalIdMismatch);
287                }
288
289                let (time, parents) = self
290                    .repo
291                    .remove_member_set_and_return_parents(op, account_set_id, id)
292                    .await?;
293                (time, parents, target, AccountId::from(id))
294            }
295        };
296
297        let balances = self
298            .balances
299            .find_balances_for_update(op, account_set.values().journal_id, member_id)
300            .await?;
301
302        let target_account_id = AccountId::from(&account_set.id());
303        let mut entries = Vec::new();
304        for balance in balances.into_values() {
305            entries_for_remove_balance(&mut entries, target_account_id, balance);
306        }
307
308        if entries.is_empty() {
309            return Ok(account_set);
310        }
311        let entries = self.entries.create_all_in_op(op, entries).await?;
312        let mappings = std::iter::once((target_account_id, parents)).collect();
313        self.balances
314            .update_balances_in_op(
315                op,
316                account_set.values().journal_id,
317                entries,
318                time.date_naive(),
319                time,
320                mappings,
321            )
322            .await?;
323
324        Ok(account_set)
325    }
326
327    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self))]
328    pub async fn find_all<T: From<AccountSet>>(
329        &self,
330        account_set_ids: &[AccountSetId],
331    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
332        self.repo.find_all(account_set_ids).await
333    }
334
335    #[instrument(name = "cala_ledger.account_sets.find_all_in_op", skip(self, op))]
336    pub async fn find_all_in_op<T: From<AccountSet>>(
337        &self,
338        op: &mut impl es_entity::AtomicOperation,
339        account_set_ids: &[AccountSetId],
340    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
341        self.repo.find_all_in_op(op, account_set_ids).await
342    }
343
344    #[instrument(name = "cala_ledger.account_sets.find", skip(self))]
345    pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
346        self.repo.find_by_id(account_set_id).await
347    }
348
349    #[instrument(name = "cala_ledger.account_sets.find_in_op", skip(self, op))]
350    pub async fn find_in_op(
351        &self,
352        op: &mut impl es_entity::AtomicOperation,
353        account_set_id: AccountSetId,
354    ) -> Result<AccountSet, AccountSetError> {
355        self.repo.find_by_id_in_op(op, account_set_id).await
356    }
357
358    #[instrument(name = "cala_ledger.accounts_sets.find_by_external_id", skip(self))]
359    pub async fn find_by_external_id(
360        &self,
361        external_id: String,
362    ) -> Result<AccountSet, AccountSetError> {
363        self.repo.find_by_external_id(Some(external_id)).await
364    }
365
366    #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self))]
367    pub async fn find_where_member(
368        &self,
369        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
370        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
371    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
372    {
373        match member.into() {
374            AccountSetMemberId::Account(account_id) => {
375                self.repo
376                    .find_where_account_is_member(account_id, query)
377                    .await
378            }
379            AccountSetMemberId::AccountSet(account_set_id) => {
380                self.repo
381                    .find_where_account_set_is_member(account_set_id, query)
382                    .await
383            }
384        }
385    }
386
387    #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self))]
388    pub async fn list_for_name(
389        &self,
390        name: String,
391        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
392    ) -> Result<
393        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
394        AccountSetError,
395    > {
396        self.repo
397            .list_for_name_by_created_at(name, args, Default::default())
398            .await
399    }
400
401    #[instrument(name = "cala_ledger.account_sets.list_for_name_in_op", skip(self, op))]
402    pub async fn list_for_name_in_op(
403        &self,
404        op: &mut impl es_entity::AtomicOperation,
405        name: String,
406        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
407    ) -> Result<
408        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
409        AccountSetError,
410    > {
411        self.repo
412            .list_for_name_by_created_at_in_op(op, name, args, Default::default())
413            .await
414    }
415
416    #[instrument(
417        name = "cala_ledger.account_sets.find_where_member_in_op",
418        skip(self, op)
419    )]
420    pub async fn find_where_member_in_op(
421        &self,
422        op: &mut impl es_entity::AtomicOperation,
423        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
424        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
425    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
426    {
427        match member.into() {
428            AccountSetMemberId::Account(account_id) => {
429                self.repo
430                    .find_where_account_is_member_in_op(op, account_id, query)
431                    .await
432            }
433            AccountSetMemberId::AccountSet(account_set_id) => {
434                self.repo
435                    .find_where_account_set_is_member_in_op(op, account_set_id, query)
436                    .await
437            }
438        }
439    }
440
441    pub async fn list_members_by_created_at(
442        &self,
443        id: AccountSetId,
444        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
445    ) -> Result<
446        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
447        AccountSetError,
448    > {
449        self.repo.list_children_by_created_at(id, args).await
450    }
451
452    pub async fn list_members_by_created_at_in_op(
453        &self,
454        op: &mut impl es_entity::AtomicOperation,
455        id: AccountSetId,
456        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
457    ) -> Result<
458        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
459        AccountSetError,
460    > {
461        self.repo
462            .list_children_by_created_at_in_op(op, id, args)
463            .await
464    }
465
466    pub async fn list_members_by_external_id(
467        &self,
468        id: AccountSetId,
469        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
470    ) -> Result<
471        es_entity::PaginatedQueryRet<
472            AccountSetMemberByExternalId,
473            AccountSetMembersByExternalIdCursor,
474        >,
475        AccountSetError,
476    > {
477        self.repo.list_children_by_external_id(id, args).await
478    }
479
480    pub async fn list_members_by_external_id_in_op(
481        &self,
482        op: &mut impl es_entity::AtomicOperation,
483        id: AccountSetId,
484        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
485    ) -> Result<
486        es_entity::PaginatedQueryRet<
487            AccountSetMemberByExternalId,
488            AccountSetMembersByExternalIdCursor,
489        >,
490        AccountSetError,
491    > {
492        self.repo
493            .list_children_by_external_id_in_op(op, id, args)
494            .await
495    }
496
497    pub(crate) async fn fetch_mappings_in_op(
498        &self,
499        op: &mut impl es_entity::AtomicOperation,
500        journal_id: JournalId,
501        account_ids: &[AccountId],
502    ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
503        self.repo
504            .fetch_mappings_in_op(op, journal_id, account_ids)
505            .await
506    }
507
508    #[cfg(feature = "import")]
509    pub(crate) async fn sync_account_set_creation(
510        &self,
511        mut db: es_entity::DbOpWithTime<'_>,
512        origin: DataSourceId,
513        values: AccountSetValues,
514    ) -> Result<(), AccountSetError> {
515        let mut account_set = AccountSet::import(origin, values);
516        self.repo
517            .import_in_op(&mut db, origin, &mut account_set)
518            .await?;
519        db.commit().await?;
520        Ok(())
521    }
522
523    #[cfg(feature = "import")]
524    pub(crate) async fn sync_account_set_update(
525        &self,
526        mut db: es_entity::DbOpWithTime<'_>,
527        values: AccountSetValues,
528        fields: Vec<String>,
529    ) -> Result<(), AccountSetError> {
530        let mut account_set = self.repo.find_by_id_in_op(&mut db, values.id).await?;
531        let _ = account_set.update((values, fields));
532        self.repo.update_in_op(&mut db, &mut account_set).await?;
533        db.commit().await?;
534        Ok(())
535    }
536
537    #[cfg(feature = "import")]
538    pub(crate) async fn sync_account_set_member_creation(
539        &self,
540        mut db: es_entity::DbOpWithTime<'_>,
541        origin: DataSourceId,
542        account_set_id: AccountSetId,
543        member_id: AccountSetMemberId,
544    ) -> Result<(), AccountSetError> {
545        match member_id {
546            AccountSetMemberId::Account(account_id) => {
547                self.repo
548                    .import_member_account_in_op(&mut db, origin, account_set_id, account_id)
549                    .await?;
550            }
551            AccountSetMemberId::AccountSet(member_account_set_id) => {
552                self.repo
553                    .import_member_set_in_op(&mut db, origin, account_set_id, member_account_set_id)
554                    .await?;
555            }
556        }
557        db.commit().await?;
558        Ok(())
559    }
560
561    #[cfg(feature = "import")]
562    pub(crate) async fn sync_account_set_member_removal(
563        &self,
564        mut db: es_entity::DbOpWithTime<'_>,
565        origin: DataSourceId,
566        account_set_id: AccountSetId,
567        member_id: AccountSetMemberId,
568    ) -> Result<(), AccountSetError> {
569        match member_id {
570            AccountSetMemberId::Account(account_id) => {
571                self.repo
572                    .import_remove_member_account(&mut db, origin, account_set_id, account_id)
573                    .await?;
574            }
575            AccountSetMemberId::AccountSet(member_account_set_id) => {
576                self.repo
577                    .import_remove_member_set(
578                        &mut db,
579                        origin,
580                        account_set_id,
581                        member_account_set_id,
582                    )
583                    .await?;
584            }
585        }
586        db.commit().await?;
587        Ok(())
588    }
589}
590
591fn entries_for_add_balance(
592    entries: &mut Vec<NewEntry>,
593    target_account_id: AccountId,
594    balance: BalanceSnapshot,
595) {
596    use rust_decimal::Decimal;
597
598    if balance.settled.cr_balance != Decimal::ZERO {
599        let entry = NewEntry::builder()
600            .id(EntryId::new())
601            .journal_id(balance.journal_id)
602            .account_id(target_account_id)
603            .currency(balance.currency)
604            .sequence(1u32)
605            .layer(Layer::Settled)
606            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
607            .direction(DebitOrCredit::Credit)
608            .units(balance.settled.cr_balance)
609            .transaction_id(UNASSIGNED_TRANSACTION_ID)
610            .build()
611            .expect("Couldn't build entry");
612        entries.push(entry);
613    }
614    if balance.settled.dr_balance != Decimal::ZERO {
615        let entry = NewEntry::builder()
616            .id(EntryId::new())
617            .journal_id(balance.journal_id)
618            .account_id(target_account_id)
619            .currency(balance.currency)
620            .sequence(1u32)
621            .layer(Layer::Settled)
622            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
623            .direction(DebitOrCredit::Debit)
624            .units(balance.settled.dr_balance)
625            .transaction_id(UNASSIGNED_TRANSACTION_ID)
626            .build()
627            .expect("Couldn't build entry");
628        entries.push(entry);
629    }
630    if balance.pending.cr_balance != Decimal::ZERO {
631        let entry = NewEntry::builder()
632            .id(EntryId::new())
633            .journal_id(balance.journal_id)
634            .account_id(target_account_id)
635            .currency(balance.currency)
636            .sequence(1u32)
637            .layer(Layer::Pending)
638            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
639            .direction(DebitOrCredit::Credit)
640            .units(balance.pending.cr_balance)
641            .transaction_id(UNASSIGNED_TRANSACTION_ID)
642            .build()
643            .expect("Couldn't build entry");
644        entries.push(entry);
645    }
646    if balance.pending.dr_balance != Decimal::ZERO {
647        let entry = NewEntry::builder()
648            .id(EntryId::new())
649            .journal_id(balance.journal_id)
650            .account_id(target_account_id)
651            .currency(balance.currency)
652            .sequence(1u32)
653            .layer(Layer::Pending)
654            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
655            .direction(DebitOrCredit::Debit)
656            .units(balance.pending.dr_balance)
657            .transaction_id(UNASSIGNED_TRANSACTION_ID)
658            .build()
659            .expect("Couldn't build entry");
660        entries.push(entry);
661    }
662    if balance.encumbrance.cr_balance != Decimal::ZERO {
663        let entry = NewEntry::builder()
664            .id(EntryId::new())
665            .journal_id(balance.journal_id)
666            .account_id(target_account_id)
667            .currency(balance.currency)
668            .sequence(1u32)
669            .layer(Layer::Encumbrance)
670            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
671            .direction(DebitOrCredit::Credit)
672            .units(balance.encumbrance.cr_balance)
673            .transaction_id(UNASSIGNED_TRANSACTION_ID)
674            .build()
675            .expect("Couldn't build entry");
676        entries.push(entry);
677    }
678    if balance.encumbrance.dr_balance != Decimal::ZERO {
679        let entry = NewEntry::builder()
680            .id(EntryId::new())
681            .journal_id(balance.journal_id)
682            .account_id(target_account_id)
683            .currency(balance.currency)
684            .sequence(1u32)
685            .layer(Layer::Encumbrance)
686            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
687            .direction(DebitOrCredit::Debit)
688            .units(balance.encumbrance.dr_balance)
689            .transaction_id(UNASSIGNED_TRANSACTION_ID)
690            .build()
691            .expect("Couldn't build entry");
692        entries.push(entry);
693    }
694}
695
696fn entries_for_remove_balance(
697    entries: &mut Vec<NewEntry>,
698    target_account_id: AccountId,
699    balance: BalanceSnapshot,
700) {
701    use rust_decimal::Decimal;
702
703    if balance.settled.cr_balance != Decimal::ZERO {
704        let entry = NewEntry::builder()
705            .id(EntryId::new())
706            .journal_id(balance.journal_id)
707            .account_id(target_account_id)
708            .currency(balance.currency)
709            .sequence(1u32)
710            .layer(Layer::Settled)
711            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
712            .direction(DebitOrCredit::Debit)
713            .units(balance.settled.cr_balance)
714            .transaction_id(UNASSIGNED_TRANSACTION_ID)
715            .build()
716            .expect("Couldn't build entry");
717        entries.push(entry);
718    }
719    if balance.settled.dr_balance != Decimal::ZERO {
720        let entry = NewEntry::builder()
721            .id(EntryId::new())
722            .journal_id(balance.journal_id)
723            .account_id(target_account_id)
724            .currency(balance.currency)
725            .sequence(1u32)
726            .layer(Layer::Settled)
727            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
728            .direction(DebitOrCredit::Credit)
729            .units(balance.settled.dr_balance)
730            .transaction_id(UNASSIGNED_TRANSACTION_ID)
731            .build()
732            .expect("Couldn't build entry");
733        entries.push(entry);
734    }
735    if balance.pending.cr_balance != Decimal::ZERO {
736        let entry = NewEntry::builder()
737            .id(EntryId::new())
738            .journal_id(balance.journal_id)
739            .account_id(target_account_id)
740            .currency(balance.currency)
741            .sequence(1u32)
742            .layer(Layer::Pending)
743            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
744            .direction(DebitOrCredit::Debit)
745            .units(balance.pending.cr_balance)
746            .transaction_id(UNASSIGNED_TRANSACTION_ID)
747            .build()
748            .expect("Couldn't build entry");
749        entries.push(entry);
750    }
751    if balance.pending.dr_balance != Decimal::ZERO {
752        let entry = NewEntry::builder()
753            .id(EntryId::new())
754            .journal_id(balance.journal_id)
755            .account_id(target_account_id)
756            .currency(balance.currency)
757            .sequence(1u32)
758            .layer(Layer::Pending)
759            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
760            .direction(DebitOrCredit::Credit)
761            .units(balance.pending.dr_balance)
762            .transaction_id(UNASSIGNED_TRANSACTION_ID)
763            .build()
764            .expect("Couldn't build entry");
765        entries.push(entry);
766    }
767    if balance.encumbrance.cr_balance != Decimal::ZERO {
768        let entry = NewEntry::builder()
769            .id(EntryId::new())
770            .journal_id(balance.journal_id)
771            .account_id(target_account_id)
772            .currency(balance.currency)
773            .sequence(1u32)
774            .layer(Layer::Encumbrance)
775            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
776            .direction(DebitOrCredit::Debit)
777            .units(balance.encumbrance.cr_balance)
778            .transaction_id(UNASSIGNED_TRANSACTION_ID)
779            .build()
780            .expect("Couldn't build entry");
781        entries.push(entry);
782    }
783    if balance.encumbrance.dr_balance != Decimal::ZERO {
784        let entry = NewEntry::builder()
785            .id(EntryId::new())
786            .journal_id(balance.journal_id)
787            .account_id(target_account_id)
788            .currency(balance.currency)
789            .sequence(1u32)
790            .layer(Layer::Encumbrance)
791            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
792            .direction(DebitOrCredit::Credit)
793            .units(balance.encumbrance.dr_balance)
794            .transaction_id(UNASSIGNED_TRANSACTION_ID)
795            .build()
796            .expect("Couldn't build entry");
797        entries.push(entry);
798    }
799}
800
801impl From<&AccountSetEvent> for OutboxEventPayload {
802    fn from(event: &AccountSetEvent) -> Self {
803        let source = es_entity::context::EventContext::current()
804            .data()
805            .lookup("data_source")
806            .ok()
807            .flatten()
808            .unwrap_or(DataSource::Local);
809
810        match event {
811            #[cfg(feature = "import")]
812            AccountSetEvent::Imported {
813                source,
814                values: account_set,
815            } => OutboxEventPayload::AccountSetCreated {
816                source: *source,
817                account_set: account_set.clone(),
818            },
819            AccountSetEvent::Initialized {
820                values: account_set,
821            } => OutboxEventPayload::AccountSetCreated {
822                source,
823                account_set: account_set.clone(),
824            },
825            AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
826                source,
827                account_set: values.clone(),
828                fields: fields.clone(),
829            },
830        }
831    }
832}