1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use std::collections::HashMap;
8use tracing::instrument;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{
13 account::*,
14 balance::*,
15 entry::*,
16 ledger_operation::*,
17 outbox::*,
18 primitives::{DataSource, DebitOrCredit, JournalId, Layer},
19};
20
21pub use cursor::*;
22pub use entity::*;
23use error::*;
24use repo::*;
25pub use repo::{account_set_cursor::*, members_cursor::*};
26
// Nil UUID used as the `transaction_id` of the entries this file builds when a
// member is added to or removed from an account set.
// NOTE(review): the constant is referenced by `entries_for_add_balance` and
// `entries_for_remove_balance`, so the `dead_code` allowance looks unnecessary — confirm.
#[allow(dead_code)]
const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
29
/// Service for creating, persisting, querying and managing the members of account sets.
#[derive(Clone)]
pub struct AccountSets {
    // Repository used for all account-set reads, writes and membership changes.
    repo: AccountSetRepo,
    // Used to create the backing account that accompanies every account set.
    accounts: Accounts,
    // Used to create the entries generated when a member is added or removed.
    entries: Entries,
    // Used to look up a member's balances and to apply the generated entries.
    balances: Balances,
    // Passed to `LedgerOperation::init` and used to persist events for imported changes.
    outbox: Outbox,
    // Connection pool passed to `LedgerOperation::init`.
    pool: PgPool,
}
39
40impl AccountSets {
41 pub(crate) fn new(
42 pool: &PgPool,
43 outbox: Outbox,
44 accounts: &Accounts,
45 entries: &Entries,
46 balances: &Balances,
47 ) -> Self {
48 Self {
49 repo: AccountSetRepo::new(pool),
50 outbox,
51 accounts: accounts.clone(),
52 entries: entries.clone(),
53 balances: balances.clone(),
54 pool: pool.clone(),
55 }
56 }
57 #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
58 pub async fn create(
59 &self,
60 new_account_set: NewAccountSet,
61 ) -> Result<AccountSet, AccountSetError> {
62 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
63 let account_set = self.create_in_op(&mut op, new_account_set).await?;
64 op.commit().await?;
65 Ok(account_set)
66 }
67
68 #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
69 pub async fn create_in_op(
70 &self,
71 db: &mut LedgerOperation<'_>,
72 new_account_set: NewAccountSet,
73 ) -> Result<AccountSet, AccountSetError> {
74 let new_account = NewAccount::builder()
75 .id(uuid::Uuid::from(new_account_set.id))
76 .name(String::new())
77 .code(new_account_set.id.to_string())
78 .normal_balance_type(new_account_set.normal_balance_type)
79 .is_account_set(true)
80 .build()
81 .expect("Failed to build account");
82 self.accounts.create_in_op(db, new_account).await?;
83 let account_set = self.repo.create_in_op(db.op(), new_account_set).await?;
84 db.accumulate(account_set.events.last_persisted(1).map(|p| &p.event));
85 Ok(account_set)
86 }
87
88 pub async fn create_all(
89 &self,
90 new_account_sets: Vec<NewAccountSet>,
91 ) -> Result<Vec<AccountSet>, AccountSetError> {
92 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
93 let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
94 op.commit().await?;
95 Ok(account_sets)
96 }
97
98 #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, db))]
99 pub async fn create_all_in_op(
100 &self,
101 db: &mut LedgerOperation<'_>,
102 new_account_sets: Vec<NewAccountSet>,
103 ) -> Result<Vec<AccountSet>, AccountSetError> {
104 let mut new_accounts = Vec::new();
105 for new_account_set in new_account_sets.iter() {
106 let new_account = NewAccount::builder()
107 .id(uuid::Uuid::from(new_account_set.id))
108 .name(String::new())
109 .code(new_account_set.id.to_string())
110 .normal_balance_type(new_account_set.normal_balance_type)
111 .is_account_set(true)
112 .build()
113 .expect("Failed to build account");
114 new_accounts.push(new_account);
115 }
116 self.accounts.create_all_in_op(db, new_accounts).await?;
117 let account_sets = self
118 .repo
119 .create_all_in_op(db.op(), new_account_sets)
120 .await?;
121 db.accumulate(
122 account_sets
123 .iter()
124 .flat_map(|account| account.events.last_persisted(1).map(|p| &p.event)),
125 );
126 Ok(account_sets)
127 }
128
129 #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
130 pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
131 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
132 self.persist_in_op(&mut op, account_set).await?;
133 op.commit().await?;
134 Ok(())
135 }
136
137 #[instrument(
138 name = "cala_ledger.account_sets.persist_in_op",
139 skip(self, db, account_set)
140 )]
141 pub async fn persist_in_op(
142 &self,
143 db: &mut LedgerOperation<'_>,
144 account_set: &mut AccountSet,
145 ) -> Result<(), AccountSetError> {
146 let n_events = self.repo.update_in_op(db.op(), account_set).await?;
147 db.accumulate(
148 account_set
149 .events
150 .last_persisted(n_events)
151 .map(|p| &p.event),
152 );
153 Ok(())
154 }
155
156 pub async fn add_member(
157 &self,
158 account_set_id: AccountSetId,
159 member: impl Into<AccountSetMemberId>,
160 ) -> Result<AccountSet, AccountSetError> {
161 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
162 let account_set = self
163 .add_member_in_op(&mut op, account_set_id, member)
164 .await?;
165 op.commit().await?;
166 Ok(account_set)
167 }
168
169 pub async fn add_member_in_op(
170 &self,
171 op: &mut LedgerOperation<'_>,
172 account_set_id: AccountSetId,
173 member: impl Into<AccountSetMemberId>,
174 ) -> Result<AccountSet, AccountSetError> {
175 let member = member.into();
176 let (time, parents, account_set, member_id) = match member {
177 AccountSetMemberId::Account(id) => {
178 let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
179 let (time, parents) = self
180 .repo
181 .add_member_account_and_return_parents(op.tx(), account_set_id, id)
182 .await?;
183 (time, parents, set, id)
184 }
185 AccountSetMemberId::AccountSet(id) => {
186 let mut accounts = self
187 .repo
188 .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
189 .await?;
190 let target = accounts
191 .remove(&account_set_id)
192 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
193 let member = accounts
194 .remove(&id)
195 .ok_or(AccountSetError::CouldNotFindById(id))?;
196
197 if target.values().journal_id != member.values().journal_id {
198 return Err(AccountSetError::JournalIdMismatch);
199 }
200
201 let (time, parents) = self
202 .repo
203 .add_member_set_and_return_parents(op.tx(), account_set_id, id)
204 .await?;
205 (time, parents, target, AccountId::from(id))
206 }
207 };
208
209 op.accumulate(std::iter::once(
210 OutboxEventPayload::AccountSetMemberCreated {
211 source: DataSource::Local,
212 account_set_id,
213 member_id: member,
214 },
215 ));
216
217 let balances = self
218 .balances
219 .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
220 .await?;
221
222 let target_account_id = AccountId::from(&account_set.id());
223 let mut entries = Vec::new();
224 for balance in balances.into_values() {
225 entries_for_add_balance(&mut entries, target_account_id, balance);
226 }
227
228 if entries.is_empty() {
229 return Ok(account_set);
230 }
231 let entries = self.entries.create_all_in_op(op, entries).await?;
232 let mappings = std::iter::once((target_account_id, parents)).collect();
233 self.balances
234 .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
235 .await?;
236
237 Ok(account_set)
238 }
239
240 pub async fn remove_member(
241 &self,
242 account_set_id: AccountSetId,
243 member: impl Into<AccountSetMemberId>,
244 ) -> Result<AccountSet, AccountSetError> {
245 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
246 let account_set = self
247 .remove_member_in_op(&mut op, account_set_id, member)
248 .await?;
249 op.commit().await?;
250 Ok(account_set)
251 }
252
253 pub async fn remove_member_in_op(
254 &self,
255 op: &mut LedgerOperation<'_>,
256 account_set_id: AccountSetId,
257 member: impl Into<AccountSetMemberId>,
258 ) -> Result<AccountSet, AccountSetError> {
259 let member = member.into();
260 let (time, parents, account_set, member_id) = match member {
261 AccountSetMemberId::Account(id) => {
262 let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
263 let (time, parents) = self
264 .repo
265 .remove_member_account_and_return_parents(op.tx(), account_set_id, id)
266 .await?;
267 (time, parents, set, id)
268 }
269 AccountSetMemberId::AccountSet(id) => {
270 let mut accounts = self
271 .repo
272 .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
273 .await?;
274 let target = accounts
275 .remove(&account_set_id)
276 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
277 let member = accounts
278 .remove(&id)
279 .ok_or(AccountSetError::CouldNotFindById(id))?;
280
281 if target.values().journal_id != member.values().journal_id {
282 return Err(AccountSetError::JournalIdMismatch);
283 }
284
285 let (time, parents) = self
286 .repo
287 .remove_member_set_and_return_parents(op.tx(), account_set_id, id)
288 .await?;
289 (time, parents, target, AccountId::from(id))
290 }
291 };
292
293 op.accumulate(std::iter::once(
294 OutboxEventPayload::AccountSetMemberRemoved {
295 source: DataSource::Local,
296 account_set_id,
297 member_id: member,
298 },
299 ));
300
301 let balances = self
302 .balances
303 .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
304 .await?;
305
306 let target_account_id = AccountId::from(&account_set.id());
307 let mut entries = Vec::new();
308 for balance in balances.into_values() {
309 entries_for_remove_balance(&mut entries, target_account_id, balance);
310 }
311
312 if entries.is_empty() {
313 return Ok(account_set);
314 }
315 let entries = self.entries.create_all_in_op(op, entries).await?;
316 let mappings = std::iter::once((target_account_id, parents)).collect();
317 self.balances
318 .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
319 .await?;
320
321 Ok(account_set)
322 }
323
324 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
325 pub async fn find_all<T: From<AccountSet>>(
326 &self,
327 account_set_ids: &[AccountSetId],
328 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
329 self.repo.find_all(account_set_ids).await
330 }
331
332 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
333 pub async fn find_all_in_op<T: From<AccountSet>>(
334 &self,
335 op: &mut LedgerOperation<'_>,
336 account_set_ids: &[AccountSetId],
337 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
338 self.repo.find_all_in_tx(op.tx(), account_set_ids).await
339 }
340
341 #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
342 pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
343 self.repo.find_by_id(account_set_id).await
344 }
345
346 #[instrument(
347 name = "cala_ledger.accounts_sets.find_by_external_id",
348 skip(self),
349 err
350 )]
351 pub async fn find_by_external_id(
352 &self,
353 external_id: String,
354 ) -> Result<AccountSet, AccountSetError> {
355 self.repo.find_by_external_id(Some(external_id)).await
356 }
357
358 #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
359 pub async fn find_where_member(
360 &self,
361 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
362 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
363 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
364 {
365 match member.into() {
366 AccountSetMemberId::Account(account_id) => {
367 self.repo
368 .find_where_account_is_member(account_id, query)
369 .await
370 }
371 AccountSetMemberId::AccountSet(account_set_id) => {
372 self.repo
373 .find_where_account_set_is_member(account_set_id, query)
374 .await
375 }
376 }
377 }
378
379 #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
380 pub async fn list_for_name(
381 &self,
382 name: String,
383 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
384 ) -> Result<
385 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
386 AccountSetError,
387 > {
388 self.repo
389 .list_for_name_by_created_at(name, args, Default::default())
390 .await
391 }
392
393 #[instrument(
394 name = "cala_ledger.account_sets.list_for_name_in_op",
395 skip(self, op),
396 err
397 )]
398 pub async fn list_for_name_in_op(
399 &self,
400 op: &mut LedgerOperation<'_>,
401 name: String,
402 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
403 ) -> Result<
404 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
405 AccountSetError,
406 > {
407 self.repo
408 .list_for_name_by_created_at_in_tx(op.tx(), name, args, Default::default())
409 .await
410 }
411
412 #[instrument(
413 name = "cala_ledger.account_sets.find_where_member_in_op",
414 skip(self, op),
415 err
416 )]
417 pub async fn find_where_member_in_op(
418 &self,
419 op: &mut LedgerOperation<'_>,
420 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
421 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
422 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
423 {
424 match member.into() {
425 AccountSetMemberId::Account(account_id) => {
426 self.repo
427 .find_where_account_is_member_in_tx(op.tx(), account_id, query)
428 .await
429 }
430 AccountSetMemberId::AccountSet(account_set_id) => {
431 self.repo
432 .find_where_account_set_is_member_in_tx(op.tx(), account_set_id, query)
433 .await
434 }
435 }
436 }
437
438 pub async fn list_members_by_created_at(
439 &self,
440 id: AccountSetId,
441 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
442 ) -> Result<
443 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
444 AccountSetError,
445 > {
446 self.repo.list_children_by_created_at(id, args).await
447 }
448
449 pub async fn list_members_created_at_in_op(
450 &self,
451 op: &mut LedgerOperation<'_>,
452 id: AccountSetId,
453 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
454 ) -> Result<
455 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
456 AccountSetError,
457 > {
458 self.repo
459 .list_children_by_created_at_in_tx(op.tx(), id, args)
460 .await
461 }
462
463 pub async fn list_members_by_external_id(
464 &self,
465 id: AccountSetId,
466 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
467 ) -> Result<
468 es_entity::PaginatedQueryRet<
469 AccountSetMemberByExternalId,
470 AccountSetMembersByExternalIdCursor,
471 >,
472 AccountSetError,
473 > {
474 self.repo.list_children_by_external_id(id, args).await
475 }
476
477 pub async fn list_members_by_external_id_in_op(
478 &self,
479 op: &mut LedgerOperation<'_>,
480 id: AccountSetId,
481 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
482 ) -> Result<
483 es_entity::PaginatedQueryRet<
484 AccountSetMemberByExternalId,
485 AccountSetMembersByExternalIdCursor,
486 >,
487 AccountSetError,
488 > {
489 self.repo
490 .list_children_by_external_id_in_tx(op.tx(), id, args)
491 .await
492 }
493
494 pub(crate) async fn fetch_mappings(
495 &self,
496 journal_id: JournalId,
497 account_ids: &[AccountId],
498 ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
499 self.repo.fetch_mappings(journal_id, account_ids).await
500 }
501
/// Imports an account set that was created by the data source `origin`.
///
/// The set is built with `AccountSet::import`, stored through the repo inside `db`, and its
/// most recently persisted event is written to the outbox with the operation's timestamp.
/// `db` is consumed: its transaction is handed to the outbox.
#[cfg(feature = "import")]
pub async fn sync_account_set_creation(
    &self,
    mut db: es_entity::DbOp<'_>,
    origin: DataSourceId,
    values: AccountSetValues,
) -> Result<(), AccountSetError> {
    let mut imported = AccountSet::import(origin, values);
    self.repo
        .import_in_op(&mut db, origin, &mut imported)
        .await?;

    let recorded_at = db.now();
    let mut outbox_events = Vec::new();
    for persisted in imported.events.last_persisted(1) {
        outbox_events.push(OutboxEventPayload::from(&persisted.event));
    }

    self.outbox
        .persist_events_at(db.into_tx(), outbox_events, recorded_at)
        .await?;
    Ok(())
}
524
/// Applies an imported update to an existing account set.
///
/// The set identified by `values.id` is loaded through the repo, updated with
/// `(values, fields)` and stored inside `db`. The events persisted by that update are
/// written to the outbox with the operation's timestamp. `db` is consumed: its transaction
/// is handed to the outbox.
#[cfg(feature = "import")]
pub async fn sync_account_set_update(
    &self,
    mut db: es_entity::DbOp<'_>,
    values: AccountSetValues,
    fields: Vec<String>,
) -> Result<(), AccountSetError> {
    let mut existing = self.repo.find_by_id(values.id).await?;
    existing.update((values, fields));
    let persisted_count = self.repo.update_in_op(&mut db, &mut existing).await?;

    let recorded_at = db.now();
    let mut outbox_events = Vec::new();
    for persisted in existing.events.last_persisted(persisted_count) {
        outbox_events.push(OutboxEventPayload::from(&persisted.event));
    }

    self.outbox
        .persist_events_at(db.into_tx(), outbox_events, recorded_at)
        .await?;
    Ok(())
}
546
/// Imports a membership that was created by the data source `origin`.
///
/// The member (an account or another account set) is added to `account_set_id` through the
/// repo inside `db`. An `AccountSetMemberCreated` event tagged with the remote source is
/// then written to the outbox with the operation's timestamp. `db` is consumed: its
/// transaction is handed to the outbox.
///
/// # Errors
/// Propagates any error from the repo or the outbox.
#[cfg(feature = "import")]
pub async fn sync_account_set_member_creation(
    &self,
    mut db: es_entity::DbOp<'_>,
    origin: DataSourceId,
    account_set_id: AccountSetId,
    member_id: AccountSetMemberId,
) -> Result<(), AccountSetError> {
    match member_id {
        AccountSetMemberId::Account(member_account_id) => {
            self.repo
                .import_member_account_in_op(&mut db, account_set_id, member_account_id)
                .await?;
        }
        // The binding must not be called `account_set_id`: that would shadow the parent
        // set's id and register the member set as a member of itself.
        AccountSetMemberId::AccountSet(member_set_id) => {
            self.repo
                .import_member_set_in_op(&mut db, account_set_id, member_set_id)
                .await?;
        }
    }
    let recorded_at = db.now();
    self.outbox
        .persist_events_at(
            db.into_tx(),
            std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
                source: DataSource::Remote { id: origin },
                account_set_id,
                member_id,
            }),
            recorded_at,
        )
        .await?;
    Ok(())
}
581
/// Imports a membership removal that was performed by the data source `origin`.
///
/// The member (an account or another account set) is removed from `account_set_id` through
/// the repo using the transaction of `db`. An `AccountSetMemberRemoved` event tagged with
/// the remote source is then written to the outbox with the operation's timestamp. `db` is
/// consumed: its transaction is handed to the outbox.
///
/// # Errors
/// Propagates any error from the repo or the outbox.
#[cfg(feature = "import")]
pub async fn sync_account_set_member_removal(
    &self,
    mut db: es_entity::DbOp<'_>,
    origin: DataSourceId,
    account_set_id: AccountSetId,
    member_id: AccountSetMemberId,
) -> Result<(), AccountSetError> {
    match member_id {
        AccountSetMemberId::Account(member_account_id) => {
            self.repo
                .import_remove_member_account(db.tx(), account_set_id, member_account_id)
                .await?;
        }
        // The binding must not be called `account_set_id`: that would shadow the parent
        // set's id and ask the repo to remove the member set from itself.
        AccountSetMemberId::AccountSet(member_set_id) => {
            self.repo
                .import_remove_member_set(db.tx(), account_set_id, member_set_id)
                .await?;
        }
    }
    let recorded_at = db.now();
    self.outbox
        .persist_events_at(
            db.into_tx(),
            std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
                source: DataSource::Remote { id: origin },
                account_set_id,
                member_id,
            }),
            recorded_at,
        )
        .await?;
    Ok(())
}
616}
617
618fn entries_for_add_balance(
619 entries: &mut Vec<NewEntry>,
620 target_account_id: AccountId,
621 balance: BalanceSnapshot,
622) {
623 use rust_decimal::Decimal;
624
625 if balance.settled.cr_balance != Decimal::ZERO {
626 let entry = NewEntry::builder()
627 .id(EntryId::new())
628 .journal_id(balance.journal_id)
629 .account_id(target_account_id)
630 .currency(balance.currency)
631 .sequence(1u32)
632 .layer(Layer::Settled)
633 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
634 .direction(DebitOrCredit::Credit)
635 .units(balance.settled.cr_balance)
636 .transaction_id(UNASSIGNED_TRANSACTION_ID)
637 .build()
638 .expect("Couldn't build entry");
639 entries.push(entry);
640 }
641 if balance.settled.dr_balance != Decimal::ZERO {
642 let entry = NewEntry::builder()
643 .id(EntryId::new())
644 .journal_id(balance.journal_id)
645 .account_id(target_account_id)
646 .currency(balance.currency)
647 .sequence(1u32)
648 .layer(Layer::Settled)
649 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
650 .direction(DebitOrCredit::Debit)
651 .units(balance.settled.dr_balance)
652 .transaction_id(UNASSIGNED_TRANSACTION_ID)
653 .build()
654 .expect("Couldn't build entry");
655 entries.push(entry);
656 }
657 if balance.pending.cr_balance != Decimal::ZERO {
658 let entry = NewEntry::builder()
659 .id(EntryId::new())
660 .journal_id(balance.journal_id)
661 .account_id(target_account_id)
662 .currency(balance.currency)
663 .sequence(1u32)
664 .layer(Layer::Pending)
665 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
666 .direction(DebitOrCredit::Credit)
667 .units(balance.pending.cr_balance)
668 .transaction_id(UNASSIGNED_TRANSACTION_ID)
669 .build()
670 .expect("Couldn't build entry");
671 entries.push(entry);
672 }
673 if balance.pending.dr_balance != Decimal::ZERO {
674 let entry = NewEntry::builder()
675 .id(EntryId::new())
676 .journal_id(balance.journal_id)
677 .account_id(target_account_id)
678 .currency(balance.currency)
679 .sequence(1u32)
680 .layer(Layer::Pending)
681 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
682 .direction(DebitOrCredit::Debit)
683 .units(balance.pending.dr_balance)
684 .transaction_id(UNASSIGNED_TRANSACTION_ID)
685 .build()
686 .expect("Couldn't build entry");
687 entries.push(entry);
688 }
689 if balance.encumbrance.cr_balance != Decimal::ZERO {
690 let entry = NewEntry::builder()
691 .id(EntryId::new())
692 .journal_id(balance.journal_id)
693 .account_id(target_account_id)
694 .currency(balance.currency)
695 .sequence(1u32)
696 .layer(Layer::Encumbrance)
697 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
698 .direction(DebitOrCredit::Credit)
699 .units(balance.encumbrance.cr_balance)
700 .transaction_id(UNASSIGNED_TRANSACTION_ID)
701 .build()
702 .expect("Couldn't build entry");
703 entries.push(entry);
704 }
705 if balance.encumbrance.dr_balance != Decimal::ZERO {
706 let entry = NewEntry::builder()
707 .id(EntryId::new())
708 .journal_id(balance.journal_id)
709 .account_id(target_account_id)
710 .currency(balance.currency)
711 .sequence(1u32)
712 .layer(Layer::Encumbrance)
713 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
714 .direction(DebitOrCredit::Debit)
715 .units(balance.encumbrance.dr_balance)
716 .transaction_id(UNASSIGNED_TRANSACTION_ID)
717 .build()
718 .expect("Couldn't build entry");
719 entries.push(entry);
720 }
721}
722
723fn entries_for_remove_balance(
724 entries: &mut Vec<NewEntry>,
725 target_account_id: AccountId,
726 balance: BalanceSnapshot,
727) {
728 use rust_decimal::Decimal;
729
730 if balance.settled.cr_balance != Decimal::ZERO {
731 let entry = NewEntry::builder()
732 .id(EntryId::new())
733 .journal_id(balance.journal_id)
734 .account_id(target_account_id)
735 .currency(balance.currency)
736 .sequence(1u32)
737 .layer(Layer::Settled)
738 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
739 .direction(DebitOrCredit::Debit)
740 .units(balance.settled.cr_balance)
741 .transaction_id(UNASSIGNED_TRANSACTION_ID)
742 .build()
743 .expect("Couldn't build entry");
744 entries.push(entry);
745 }
746 if balance.settled.dr_balance != Decimal::ZERO {
747 let entry = NewEntry::builder()
748 .id(EntryId::new())
749 .journal_id(balance.journal_id)
750 .account_id(target_account_id)
751 .currency(balance.currency)
752 .sequence(1u32)
753 .layer(Layer::Settled)
754 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
755 .direction(DebitOrCredit::Credit)
756 .units(balance.settled.dr_balance)
757 .transaction_id(UNASSIGNED_TRANSACTION_ID)
758 .build()
759 .expect("Couldn't build entry");
760 entries.push(entry);
761 }
762 if balance.pending.cr_balance != Decimal::ZERO {
763 let entry = NewEntry::builder()
764 .id(EntryId::new())
765 .journal_id(balance.journal_id)
766 .account_id(target_account_id)
767 .currency(balance.currency)
768 .sequence(1u32)
769 .layer(Layer::Pending)
770 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
771 .direction(DebitOrCredit::Debit)
772 .units(balance.pending.cr_balance)
773 .transaction_id(UNASSIGNED_TRANSACTION_ID)
774 .build()
775 .expect("Couldn't build entry");
776 entries.push(entry);
777 }
778 if balance.pending.dr_balance != Decimal::ZERO {
779 let entry = NewEntry::builder()
780 .id(EntryId::new())
781 .journal_id(balance.journal_id)
782 .account_id(target_account_id)
783 .currency(balance.currency)
784 .sequence(1u32)
785 .layer(Layer::Pending)
786 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
787 .direction(DebitOrCredit::Credit)
788 .units(balance.pending.dr_balance)
789 .transaction_id(UNASSIGNED_TRANSACTION_ID)
790 .build()
791 .expect("Couldn't build entry");
792 entries.push(entry);
793 }
794 if balance.encumbrance.cr_balance != Decimal::ZERO {
795 let entry = NewEntry::builder()
796 .id(EntryId::new())
797 .journal_id(balance.journal_id)
798 .account_id(target_account_id)
799 .currency(balance.currency)
800 .sequence(1u32)
801 .layer(Layer::Encumbrance)
802 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
803 .direction(DebitOrCredit::Debit)
804 .units(balance.encumbrance.cr_balance)
805 .transaction_id(UNASSIGNED_TRANSACTION_ID)
806 .build()
807 .expect("Couldn't build entry");
808 entries.push(entry);
809 }
810 if balance.encumbrance.dr_balance != Decimal::ZERO {
811 let entry = NewEntry::builder()
812 .id(EntryId::new())
813 .journal_id(balance.journal_id)
814 .account_id(target_account_id)
815 .currency(balance.currency)
816 .sequence(1u32)
817 .layer(Layer::Encumbrance)
818 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
819 .direction(DebitOrCredit::Credit)
820 .units(balance.encumbrance.dr_balance)
821 .transaction_id(UNASSIGNED_TRANSACTION_ID)
822 .build()
823 .expect("Couldn't build entry");
824 entries.push(entry);
825 }
826}
827
828impl From<&AccountSetEvent> for OutboxEventPayload {
829 fn from(event: &AccountSetEvent) -> Self {
830 match event {
831 #[cfg(feature = "import")]
832 AccountSetEvent::Imported {
833 source,
834 values: account_set,
835 } => OutboxEventPayload::AccountSetCreated {
836 source: *source,
837 account_set: account_set.clone(),
838 },
839 AccountSetEvent::Initialized {
840 values: account_set,
841 } => OutboxEventPayload::AccountSetCreated {
842 source: DataSource::Local,
843 account_set: account_set.clone(),
844 },
845 AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
846 source: DataSource::Local,
847 account_set: values.clone(),
848 fields: fields.clone(),
849 },
850 }
851 }
852}