cala_ledger/account_set/
mod.rs

1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use std::collections::HashMap;
9use tracing::instrument;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{
14    account::*,
15    balance::*,
16    entry::*,
17    ledger_operation::*,
18    outbox::*,
19    primitives::{DataSource, DebitOrCredit, JournalId, Layer},
20};
21
22pub use cursor::*;
23pub use entity::*;
24use error::*;
25use repo::*;
26pub use repo::{account_set_cursor::*, members_cursor::*};
27
/// Nil UUID used as the `transaction_id` of the entries this module builds in
/// `entries_for_add_balance` and `entries_for_remove_balance`, i.e. the
/// entries generated when a member is added to or removed from an account set.
const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
29
/// Service handle for creating, updating and querying account sets and their
/// members.
#[derive(Clone)]
pub struct AccountSets {
    // Persistence for account-set entities and their member links.
    repo: AccountSetRepo,
    // Used to create the account that backs every account set.
    accounts: Accounts,
    // Used to record the entries generated when membership changes.
    entries: Entries,
    // Used to read member balances and to apply the generated entries.
    balances: Balances,
    // Passed to `LedgerOperation::init` and used directly by the import
    // (`sync_*`) methods to persist outbox events.
    outbox: Outbox,
    // Connection pool from which new `LedgerOperation`s are started.
    pool: PgPool,
}
39
40impl AccountSets {
41    pub(crate) fn new(
42        pool: &PgPool,
43        outbox: Outbox,
44        accounts: &Accounts,
45        entries: &Entries,
46        balances: &Balances,
47    ) -> Self {
48        Self {
49            repo: AccountSetRepo::new(pool),
50            outbox,
51            accounts: accounts.clone(),
52            entries: entries.clone(),
53            balances: balances.clone(),
54            pool: pool.clone(),
55        }
56    }
57    #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
58    pub async fn create(
59        &self,
60        new_account_set: NewAccountSet,
61    ) -> Result<AccountSet, AccountSetError> {
62        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
63        let account_set = self.create_in_op(&mut op, new_account_set).await?;
64        op.commit().await?;
65        Ok(account_set)
66    }
67
68    #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
69    pub async fn create_in_op(
70        &self,
71        db: &mut LedgerOperation<'_>,
72        new_account_set: NewAccountSet,
73    ) -> Result<AccountSet, AccountSetError> {
74        let new_account = NewAccount::builder()
75            .id(uuid::Uuid::from(new_account_set.id))
76            .name(String::new())
77            .code(new_account_set.id.to_string())
78            .normal_balance_type(new_account_set.normal_balance_type)
79            .is_account_set(true)
80            .build()
81            .expect("Failed to build account");
82        self.accounts.create_in_op(db, new_account).await?;
83        let account_set = self.repo.create_in_op(db.op(), new_account_set).await?;
84        db.accumulate(account_set.last_persisted(1).map(|p| &p.event));
85        Ok(account_set)
86    }
87
88    pub async fn create_all(
89        &self,
90        new_account_sets: Vec<NewAccountSet>,
91    ) -> Result<Vec<AccountSet>, AccountSetError> {
92        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
93        let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
94        op.commit().await?;
95        Ok(account_sets)
96    }
97
98    #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, db))]
99    pub async fn create_all_in_op(
100        &self,
101        db: &mut LedgerOperation<'_>,
102        new_account_sets: Vec<NewAccountSet>,
103    ) -> Result<Vec<AccountSet>, AccountSetError> {
104        let mut new_accounts = Vec::new();
105        for new_account_set in new_account_sets.iter() {
106            let new_account = NewAccount::builder()
107                .id(uuid::Uuid::from(new_account_set.id))
108                .name(String::new())
109                .code(new_account_set.id.to_string())
110                .normal_balance_type(new_account_set.normal_balance_type)
111                .is_account_set(true)
112                .build()
113                .expect("Failed to build account");
114            new_accounts.push(new_account);
115        }
116        self.accounts.create_all_in_op(db, new_accounts).await?;
117        let account_sets = self
118            .repo
119            .create_all_in_op(db.op(), new_account_sets)
120            .await?;
121        db.accumulate(
122            account_sets
123                .iter()
124                .flat_map(|account| account.last_persisted(1).map(|p| &p.event)),
125        );
126        Ok(account_sets)
127    }
128
129    #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
130    pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
131        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
132        self.persist_in_op(&mut op, account_set).await?;
133        op.commit().await?;
134        Ok(())
135    }
136
137    #[instrument(
138        name = "cala_ledger.account_sets.persist_in_op",
139        skip(self, db, account_set)
140    )]
141    pub async fn persist_in_op(
142        &self,
143        db: &mut LedgerOperation<'_>,
144        account_set: &mut AccountSet,
145    ) -> Result<(), AccountSetError> {
146        let n_events = self.repo.update_in_op(db.op(), account_set).await?;
147        db.accumulate(account_set.last_persisted(n_events).map(|p| &p.event));
148        Ok(())
149    }
150
151    pub async fn add_member(
152        &self,
153        account_set_id: AccountSetId,
154        member: impl Into<AccountSetMemberId>,
155    ) -> Result<AccountSet, AccountSetError> {
156        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
157        let account_set = self
158            .add_member_in_op(&mut op, account_set_id, member)
159            .await?;
160        op.commit().await?;
161        Ok(account_set)
162    }
163
164    pub async fn add_member_in_op(
165        &self,
166        op: &mut LedgerOperation<'_>,
167        account_set_id: AccountSetId,
168        member: impl Into<AccountSetMemberId>,
169    ) -> Result<AccountSet, AccountSetError> {
170        let member = member.into();
171        let (time, parents, account_set, member_id) = match member {
172            AccountSetMemberId::Account(id) => {
173                let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
174                let (time, parents) = self
175                    .repo
176                    .add_member_account_and_return_parents(op.tx(), account_set_id, id)
177                    .await?;
178                (time, parents, set, id)
179            }
180            AccountSetMemberId::AccountSet(id) => {
181                let mut accounts = self
182                    .repo
183                    .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
184                    .await?;
185                let target = accounts
186                    .remove(&account_set_id)
187                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
188                let member = accounts
189                    .remove(&id)
190                    .ok_or(AccountSetError::CouldNotFindById(id))?;
191
192                if target.values().journal_id != member.values().journal_id {
193                    return Err(AccountSetError::JournalIdMismatch);
194                }
195
196                let (time, parents) = self
197                    .repo
198                    .add_member_set_and_return_parents(op.tx(), account_set_id, id)
199                    .await?;
200                (time, parents, target, AccountId::from(id))
201            }
202        };
203
204        op.accumulate(std::iter::once(
205            OutboxEventPayload::AccountSetMemberCreated {
206                source: DataSource::Local,
207                account_set_id,
208                member_id: member,
209            },
210        ));
211
212        let balances = self
213            .balances
214            .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
215            .await?;
216
217        let target_account_id = AccountId::from(&account_set.id());
218        let mut entries = Vec::new();
219        for balance in balances.into_values() {
220            entries_for_add_balance(&mut entries, target_account_id, balance);
221        }
222
223        if entries.is_empty() {
224            return Ok(account_set);
225        }
226        let entries = self.entries.create_all_in_op(op, entries).await?;
227        let mappings = std::iter::once((target_account_id, parents)).collect();
228        self.balances
229            .update_balances_in_op(
230                op,
231                account_set.values().journal_id,
232                entries,
233                time.date_naive(),
234                time,
235                mappings,
236            )
237            .await?;
238
239        Ok(account_set)
240    }
241
242    pub async fn remove_member(
243        &self,
244        account_set_id: AccountSetId,
245        member: impl Into<AccountSetMemberId>,
246    ) -> Result<AccountSet, AccountSetError> {
247        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
248        let account_set = self
249            .remove_member_in_op(&mut op, account_set_id, member)
250            .await?;
251        op.commit().await?;
252        Ok(account_set)
253    }
254
255    pub async fn remove_member_in_op(
256        &self,
257        op: &mut LedgerOperation<'_>,
258        account_set_id: AccountSetId,
259        member: impl Into<AccountSetMemberId>,
260    ) -> Result<AccountSet, AccountSetError> {
261        let member = member.into();
262        let (time, parents, account_set, member_id) = match member {
263            AccountSetMemberId::Account(id) => {
264                let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
265                let (time, parents) = self
266                    .repo
267                    .remove_member_account_and_return_parents(op.tx(), account_set_id, id)
268                    .await?;
269                (time, parents, set, id)
270            }
271            AccountSetMemberId::AccountSet(id) => {
272                let mut accounts = self
273                    .repo
274                    .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
275                    .await?;
276                let target = accounts
277                    .remove(&account_set_id)
278                    .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
279                let member = accounts
280                    .remove(&id)
281                    .ok_or(AccountSetError::CouldNotFindById(id))?;
282
283                if target.values().journal_id != member.values().journal_id {
284                    return Err(AccountSetError::JournalIdMismatch);
285                }
286
287                let (time, parents) = self
288                    .repo
289                    .remove_member_set_and_return_parents(op.tx(), account_set_id, id)
290                    .await?;
291                (time, parents, target, AccountId::from(id))
292            }
293        };
294
295        op.accumulate(std::iter::once(
296            OutboxEventPayload::AccountSetMemberRemoved {
297                source: DataSource::Local,
298                account_set_id,
299                member_id: member,
300            },
301        ));
302
303        let balances = self
304            .balances
305            .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
306            .await?;
307
308        let target_account_id = AccountId::from(&account_set.id());
309        let mut entries = Vec::new();
310        for balance in balances.into_values() {
311            entries_for_remove_balance(&mut entries, target_account_id, balance);
312        }
313
314        if entries.is_empty() {
315            return Ok(account_set);
316        }
317        let entries = self.entries.create_all_in_op(op, entries).await?;
318        let mappings = std::iter::once((target_account_id, parents)).collect();
319        self.balances
320            .update_balances_in_op(
321                op,
322                account_set.values().journal_id,
323                entries,
324                time.date_naive(),
325                time,
326                mappings,
327            )
328            .await?;
329
330        Ok(account_set)
331    }
332
333    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
334    pub async fn find_all<T: From<AccountSet>>(
335        &self,
336        account_set_ids: &[AccountSetId],
337    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
338        self.repo.find_all(account_set_ids).await
339    }
340
341    #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
342    pub async fn find_all_in_op<T: From<AccountSet>>(
343        &self,
344        op: &mut LedgerOperation<'_>,
345        account_set_ids: &[AccountSetId],
346    ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
347        self.repo.find_all_in_tx(op.tx(), account_set_ids).await
348    }
349
350    #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
351    pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
352        self.repo.find_by_id(account_set_id).await
353    }
354
355    #[instrument(
356        name = "cala_ledger.accounts_sets.find_by_external_id",
357        skip(self),
358        err
359    )]
360    pub async fn find_by_external_id(
361        &self,
362        external_id: String,
363    ) -> Result<AccountSet, AccountSetError> {
364        self.repo.find_by_external_id(Some(external_id)).await
365    }
366
367    #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
368    pub async fn find_where_member(
369        &self,
370        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
371        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
372    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
373    {
374        match member.into() {
375            AccountSetMemberId::Account(account_id) => {
376                self.repo
377                    .find_where_account_is_member(account_id, query)
378                    .await
379            }
380            AccountSetMemberId::AccountSet(account_set_id) => {
381                self.repo
382                    .find_where_account_set_is_member(account_set_id, query)
383                    .await
384            }
385        }
386    }
387
388    #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
389    pub async fn list_for_name(
390        &self,
391        name: String,
392        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
393    ) -> Result<
394        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
395        AccountSetError,
396    > {
397        self.repo
398            .list_for_name_by_created_at(name, args, Default::default())
399            .await
400    }
401
402    #[instrument(
403        name = "cala_ledger.account_sets.list_for_name_in_op",
404        skip(self, op),
405        err
406    )]
407    pub async fn list_for_name_in_op(
408        &self,
409        op: &mut LedgerOperation<'_>,
410        name: String,
411        args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
412    ) -> Result<
413        es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
414        AccountSetError,
415    > {
416        self.repo
417            .list_for_name_by_created_at_in_tx(op.tx(), name, args, Default::default())
418            .await
419    }
420
421    #[instrument(
422        name = "cala_ledger.account_sets.find_where_member_in_op",
423        skip(self, op),
424        err
425    )]
426    pub async fn find_where_member_in_op(
427        &self,
428        op: &mut LedgerOperation<'_>,
429        member: impl Into<AccountSetMemberId> + std::fmt::Debug,
430        query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
431    ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
432    {
433        match member.into() {
434            AccountSetMemberId::Account(account_id) => {
435                self.repo
436                    .find_where_account_is_member_in_tx(op.tx(), account_id, query)
437                    .await
438            }
439            AccountSetMemberId::AccountSet(account_set_id) => {
440                self.repo
441                    .find_where_account_set_is_member_in_tx(op.tx(), account_set_id, query)
442                    .await
443            }
444        }
445    }
446
447    pub async fn list_members_by_created_at(
448        &self,
449        id: AccountSetId,
450        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
451    ) -> Result<
452        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
453        AccountSetError,
454    > {
455        self.repo.list_children_by_created_at(id, args).await
456    }
457
458    pub async fn list_members_by_created_at_in_op(
459        &self,
460        op: &mut LedgerOperation<'_>,
461        id: AccountSetId,
462        args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
463    ) -> Result<
464        es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
465        AccountSetError,
466    > {
467        self.repo
468            .list_children_by_created_at_in_tx(op.tx(), id, args)
469            .await
470    }
471
472    pub async fn list_members_by_external_id(
473        &self,
474        id: AccountSetId,
475        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
476    ) -> Result<
477        es_entity::PaginatedQueryRet<
478            AccountSetMemberByExternalId,
479            AccountSetMembersByExternalIdCursor,
480        >,
481        AccountSetError,
482    > {
483        self.repo.list_children_by_external_id(id, args).await
484    }
485
486    pub async fn list_members_by_external_id_in_op(
487        &self,
488        op: &mut LedgerOperation<'_>,
489        id: AccountSetId,
490        args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
491    ) -> Result<
492        es_entity::PaginatedQueryRet<
493            AccountSetMemberByExternalId,
494            AccountSetMembersByExternalIdCursor,
495        >,
496        AccountSetError,
497    > {
498        self.repo
499            .list_children_by_external_id_in_tx(op.tx(), id, args)
500            .await
501    }
502
503    pub(crate) async fn fetch_mappings_in_op(
504        &self,
505        op: &mut LedgerOperation<'_>,
506        journal_id: JournalId,
507        account_ids: &[AccountId],
508    ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
509        self.repo
510            .fetch_mappings_in_tx(op.tx(), journal_id, account_ids)
511            .await
512    }
513
514    #[cfg(feature = "import")]
515    pub(crate) async fn sync_account_set_creation(
516        &self,
517        mut db: es_entity::DbOp<'_>,
518        origin: DataSourceId,
519        values: AccountSetValues,
520    ) -> Result<(), AccountSetError> {
521        let mut account_set = AccountSet::import(origin, values);
522        self.repo
523            .import_in_op(&mut db, origin, &mut account_set)
524            .await?;
525        let recorded_at = db.now();
526        let outbox_events: Vec<_> = account_set
527            .last_persisted(1)
528            .map(|p| OutboxEventPayload::from(&p.event))
529            .collect();
530        self.outbox
531            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
532            .await?;
533        Ok(())
534    }
535
536    #[cfg(feature = "import")]
537    pub(crate) async fn sync_account_set_update(
538        &self,
539        mut db: es_entity::DbOp<'_>,
540        values: AccountSetValues,
541        fields: Vec<String>,
542    ) -> Result<(), AccountSetError> {
543        let mut account_set = self.repo.find_by_id(values.id).await?;
544        account_set.update((values, fields));
545        let n_events = self.repo.update_in_op(&mut db, &mut account_set).await?;
546        let recorded_at = db.now();
547        let outbox_events: Vec<_> = account_set
548            .last_persisted(n_events)
549            .map(|p| OutboxEventPayload::from(&p.event))
550            .collect();
551        self.outbox
552            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
553            .await?;
554        Ok(())
555    }
556
557    #[cfg(feature = "import")]
558    pub(crate) async fn sync_account_set_member_creation(
559        &self,
560        mut db: es_entity::DbOp<'_>,
561        origin: DataSourceId,
562        account_set_id: AccountSetId,
563        member_id: AccountSetMemberId,
564    ) -> Result<(), AccountSetError> {
565        match member_id {
566            AccountSetMemberId::Account(account_id) => {
567                self.repo
568                    .import_member_account_in_op(&mut db, account_set_id, account_id)
569                    .await?;
570            }
571            AccountSetMemberId::AccountSet(account_set_id) => {
572                self.repo
573                    .import_member_set_in_op(&mut db, account_set_id, account_set_id)
574                    .await?;
575            }
576        }
577        let recorded_at = db.now();
578        self.outbox
579            .persist_events_at(
580                db.into_tx(),
581                std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
582                    source: DataSource::Remote { id: origin },
583                    account_set_id,
584                    member_id,
585                }),
586                recorded_at,
587            )
588            .await?;
589        Ok(())
590    }
591
592    #[cfg(feature = "import")]
593    pub(crate) async fn sync_account_set_member_removal(
594        &self,
595        mut db: es_entity::DbOp<'_>,
596        origin: DataSourceId,
597        account_set_id: AccountSetId,
598        member_id: AccountSetMemberId,
599    ) -> Result<(), AccountSetError> {
600        match member_id {
601            AccountSetMemberId::Account(account_id) => {
602                self.repo
603                    .import_remove_member_account(db.tx(), account_set_id, account_id)
604                    .await?;
605            }
606            AccountSetMemberId::AccountSet(account_set_id) => {
607                self.repo
608                    .import_remove_member_set(db.tx(), account_set_id, account_set_id)
609                    .await?;
610            }
611        }
612        let recorded_at = db.now();
613        self.outbox
614            .persist_events_at(
615                db.into_tx(),
616                std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
617                    source: DataSource::Remote { id: origin },
618                    account_set_id,
619                    member_id,
620                }),
621                recorded_at,
622            )
623            .await?;
624        Ok(())
625    }
626}
627
628fn entries_for_add_balance(
629    entries: &mut Vec<NewEntry>,
630    target_account_id: AccountId,
631    balance: BalanceSnapshot,
632) {
633    use rust_decimal::Decimal;
634
635    if balance.settled.cr_balance != Decimal::ZERO {
636        let entry = NewEntry::builder()
637            .id(EntryId::new())
638            .journal_id(balance.journal_id)
639            .account_id(target_account_id)
640            .currency(balance.currency)
641            .sequence(1u32)
642            .layer(Layer::Settled)
643            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
644            .direction(DebitOrCredit::Credit)
645            .units(balance.settled.cr_balance)
646            .transaction_id(UNASSIGNED_TRANSACTION_ID)
647            .build()
648            .expect("Couldn't build entry");
649        entries.push(entry);
650    }
651    if balance.settled.dr_balance != Decimal::ZERO {
652        let entry = NewEntry::builder()
653            .id(EntryId::new())
654            .journal_id(balance.journal_id)
655            .account_id(target_account_id)
656            .currency(balance.currency)
657            .sequence(1u32)
658            .layer(Layer::Settled)
659            .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
660            .direction(DebitOrCredit::Debit)
661            .units(balance.settled.dr_balance)
662            .transaction_id(UNASSIGNED_TRANSACTION_ID)
663            .build()
664            .expect("Couldn't build entry");
665        entries.push(entry);
666    }
667    if balance.pending.cr_balance != Decimal::ZERO {
668        let entry = NewEntry::builder()
669            .id(EntryId::new())
670            .journal_id(balance.journal_id)
671            .account_id(target_account_id)
672            .currency(balance.currency)
673            .sequence(1u32)
674            .layer(Layer::Pending)
675            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
676            .direction(DebitOrCredit::Credit)
677            .units(balance.pending.cr_balance)
678            .transaction_id(UNASSIGNED_TRANSACTION_ID)
679            .build()
680            .expect("Couldn't build entry");
681        entries.push(entry);
682    }
683    if balance.pending.dr_balance != Decimal::ZERO {
684        let entry = NewEntry::builder()
685            .id(EntryId::new())
686            .journal_id(balance.journal_id)
687            .account_id(target_account_id)
688            .currency(balance.currency)
689            .sequence(1u32)
690            .layer(Layer::Pending)
691            .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
692            .direction(DebitOrCredit::Debit)
693            .units(balance.pending.dr_balance)
694            .transaction_id(UNASSIGNED_TRANSACTION_ID)
695            .build()
696            .expect("Couldn't build entry");
697        entries.push(entry);
698    }
699    if balance.encumbrance.cr_balance != Decimal::ZERO {
700        let entry = NewEntry::builder()
701            .id(EntryId::new())
702            .journal_id(balance.journal_id)
703            .account_id(target_account_id)
704            .currency(balance.currency)
705            .sequence(1u32)
706            .layer(Layer::Encumbrance)
707            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
708            .direction(DebitOrCredit::Credit)
709            .units(balance.encumbrance.cr_balance)
710            .transaction_id(UNASSIGNED_TRANSACTION_ID)
711            .build()
712            .expect("Couldn't build entry");
713        entries.push(entry);
714    }
715    if balance.encumbrance.dr_balance != Decimal::ZERO {
716        let entry = NewEntry::builder()
717            .id(EntryId::new())
718            .journal_id(balance.journal_id)
719            .account_id(target_account_id)
720            .currency(balance.currency)
721            .sequence(1u32)
722            .layer(Layer::Encumbrance)
723            .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
724            .direction(DebitOrCredit::Debit)
725            .units(balance.encumbrance.dr_balance)
726            .transaction_id(UNASSIGNED_TRANSACTION_ID)
727            .build()
728            .expect("Couldn't build entry");
729        entries.push(entry);
730    }
731}
732
733fn entries_for_remove_balance(
734    entries: &mut Vec<NewEntry>,
735    target_account_id: AccountId,
736    balance: BalanceSnapshot,
737) {
738    use rust_decimal::Decimal;
739
740    if balance.settled.cr_balance != Decimal::ZERO {
741        let entry = NewEntry::builder()
742            .id(EntryId::new())
743            .journal_id(balance.journal_id)
744            .account_id(target_account_id)
745            .currency(balance.currency)
746            .sequence(1u32)
747            .layer(Layer::Settled)
748            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
749            .direction(DebitOrCredit::Debit)
750            .units(balance.settled.cr_balance)
751            .transaction_id(UNASSIGNED_TRANSACTION_ID)
752            .build()
753            .expect("Couldn't build entry");
754        entries.push(entry);
755    }
756    if balance.settled.dr_balance != Decimal::ZERO {
757        let entry = NewEntry::builder()
758            .id(EntryId::new())
759            .journal_id(balance.journal_id)
760            .account_id(target_account_id)
761            .currency(balance.currency)
762            .sequence(1u32)
763            .layer(Layer::Settled)
764            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
765            .direction(DebitOrCredit::Credit)
766            .units(balance.settled.dr_balance)
767            .transaction_id(UNASSIGNED_TRANSACTION_ID)
768            .build()
769            .expect("Couldn't build entry");
770        entries.push(entry);
771    }
772    if balance.pending.cr_balance != Decimal::ZERO {
773        let entry = NewEntry::builder()
774            .id(EntryId::new())
775            .journal_id(balance.journal_id)
776            .account_id(target_account_id)
777            .currency(balance.currency)
778            .sequence(1u32)
779            .layer(Layer::Pending)
780            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
781            .direction(DebitOrCredit::Debit)
782            .units(balance.pending.cr_balance)
783            .transaction_id(UNASSIGNED_TRANSACTION_ID)
784            .build()
785            .expect("Couldn't build entry");
786        entries.push(entry);
787    }
788    if balance.pending.dr_balance != Decimal::ZERO {
789        let entry = NewEntry::builder()
790            .id(EntryId::new())
791            .journal_id(balance.journal_id)
792            .account_id(target_account_id)
793            .currency(balance.currency)
794            .sequence(1u32)
795            .layer(Layer::Pending)
796            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
797            .direction(DebitOrCredit::Credit)
798            .units(balance.pending.dr_balance)
799            .transaction_id(UNASSIGNED_TRANSACTION_ID)
800            .build()
801            .expect("Couldn't build entry");
802        entries.push(entry);
803    }
804    if balance.encumbrance.cr_balance != Decimal::ZERO {
805        let entry = NewEntry::builder()
806            .id(EntryId::new())
807            .journal_id(balance.journal_id)
808            .account_id(target_account_id)
809            .currency(balance.currency)
810            .sequence(1u32)
811            .layer(Layer::Encumbrance)
812            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
813            .direction(DebitOrCredit::Debit)
814            .units(balance.encumbrance.cr_balance)
815            .transaction_id(UNASSIGNED_TRANSACTION_ID)
816            .build()
817            .expect("Couldn't build entry");
818        entries.push(entry);
819    }
820    if balance.encumbrance.dr_balance != Decimal::ZERO {
821        let entry = NewEntry::builder()
822            .id(EntryId::new())
823            .journal_id(balance.journal_id)
824            .account_id(target_account_id)
825            .currency(balance.currency)
826            .sequence(1u32)
827            .layer(Layer::Encumbrance)
828            .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
829            .direction(DebitOrCredit::Credit)
830            .units(balance.encumbrance.dr_balance)
831            .transaction_id(UNASSIGNED_TRANSACTION_ID)
832            .build()
833            .expect("Couldn't build entry");
834        entries.push(entry);
835    }
836}
837
838impl From<&AccountSetEvent> for OutboxEventPayload {
839    fn from(event: &AccountSetEvent) -> Self {
840        match event {
841            #[cfg(feature = "import")]
842            AccountSetEvent::Imported {
843                source,
844                values: account_set,
845            } => OutboxEventPayload::AccountSetCreated {
846                source: *source,
847                account_set: account_set.clone(),
848            },
849            AccountSetEvent::Initialized {
850                values: account_set,
851            } => OutboxEventPayload::AccountSetCreated {
852                source: DataSource::Local,
853                account_set: account_set.clone(),
854            },
855            AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
856                source: DataSource::Local,
857                account_set: values.clone(),
858                fields: fields.clone(),
859            },
860        }
861    }
862}