1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use std::collections::HashMap;
9use tracing::instrument;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{
14 account::*,
15 balance::*,
16 entry::*,
17 ledger_operation::*,
18 outbox::*,
19 primitives::{DataSource, DebitOrCredit, JournalId, Layer},
20};
21
22pub use cursor::*;
23pub use entity::*;
24use error::*;
25use repo::*;
26pub use repo::{account_set_cursor::*, members_cursor::*};
27
28const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
29
30#[derive(Clone)]
31pub struct AccountSets {
32 repo: AccountSetRepo,
33 accounts: Accounts,
34 entries: Entries,
35 balances: Balances,
36 outbox: Outbox,
37 pool: PgPool,
38}
39
40impl AccountSets {
41 pub(crate) fn new(
42 pool: &PgPool,
43 outbox: Outbox,
44 accounts: &Accounts,
45 entries: &Entries,
46 balances: &Balances,
47 ) -> Self {
48 Self {
49 repo: AccountSetRepo::new(pool),
50 outbox,
51 accounts: accounts.clone(),
52 entries: entries.clone(),
53 balances: balances.clone(),
54 pool: pool.clone(),
55 }
56 }
57 #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
58 pub async fn create(
59 &self,
60 new_account_set: NewAccountSet,
61 ) -> Result<AccountSet, AccountSetError> {
62 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
63 let account_set = self.create_in_op(&mut op, new_account_set).await?;
64 op.commit().await?;
65 Ok(account_set)
66 }
67
68 #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
69 pub async fn create_in_op(
70 &self,
71 db: &mut LedgerOperation<'_>,
72 new_account_set: NewAccountSet,
73 ) -> Result<AccountSet, AccountSetError> {
74 let new_account = NewAccount::builder()
75 .id(new_account_set.id)
76 .name(String::new())
77 .code(new_account_set.id.to_string())
78 .normal_balance_type(new_account_set.normal_balance_type)
79 .is_account_set(true)
80 .velocity_context_values(new_account_set.context_values())
81 .build()
82 .expect("Failed to build account");
83 self.accounts.create_in_op(db, new_account).await?;
84
85 let account_set = self.repo.create_in_op(db, new_account_set).await?;
86 db.accumulate(account_set.last_persisted(1).map(|p| &p.event));
87
88 Ok(account_set)
89 }
90
91 pub async fn create_all(
92 &self,
93 new_account_sets: Vec<NewAccountSet>,
94 ) -> Result<Vec<AccountSet>, AccountSetError> {
95 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
96 let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
97 op.commit().await?;
98 Ok(account_sets)
99 }
100
101 #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, db))]
102 pub async fn create_all_in_op(
103 &self,
104 db: &mut LedgerOperation<'_>,
105 new_account_sets: Vec<NewAccountSet>,
106 ) -> Result<Vec<AccountSet>, AccountSetError> {
107 let mut new_accounts = Vec::new();
108 for new_account_set in new_account_sets.iter() {
109 let new_account = NewAccount::builder()
110 .id(new_account_set.id)
111 .name(String::new())
112 .code(new_account_set.id.to_string())
113 .normal_balance_type(new_account_set.normal_balance_type)
114 .is_account_set(true)
115 .velocity_context_values(new_account_set.context_values())
116 .build()
117 .expect("Failed to build account");
118 new_accounts.push(new_account);
119 }
120 self.accounts.create_all_in_op(db, new_accounts).await?;
121
122 let account_sets = self.repo.create_all_in_op(db, new_account_sets).await?;
123 db.accumulate(
124 account_sets
125 .iter()
126 .flat_map(|account| account.last_persisted(1).map(|p| &p.event)),
127 );
128
129 Ok(account_sets)
130 }
131
132 #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
133 pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
134 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
135 self.persist_in_op(&mut op, account_set).await?;
136 op.commit().await?;
137 Ok(())
138 }
139
140 #[instrument(
141 name = "cala_ledger.account_sets.persist_in_op",
142 skip(self, db, account_set)
143 )]
144 pub async fn persist_in_op(
145 &self,
146 db: &mut LedgerOperation<'_>,
147 account_set: &mut AccountSet,
148 ) -> Result<(), AccountSetError> {
149 let n_events = self.repo.update_in_op(db, account_set).await?;
150 db.accumulate(account_set.last_persisted(n_events).map(|p| &p.event));
151
152 self.accounts
153 .update_velocity_context_values_in_op(db, account_set.values())
154 .await?;
155
156 Ok(())
157 }
158
159 pub async fn add_member(
160 &self,
161 account_set_id: AccountSetId,
162 member: impl Into<AccountSetMemberId>,
163 ) -> Result<AccountSet, AccountSetError> {
164 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
165 let account_set = self
166 .add_member_in_op(&mut op, account_set_id, member)
167 .await?;
168 op.commit().await?;
169 Ok(account_set)
170 }
171
172 pub async fn add_member_in_op(
173 &self,
174 op: &mut LedgerOperation<'_>,
175 account_set_id: AccountSetId,
176 member: impl Into<AccountSetMemberId>,
177 ) -> Result<AccountSet, AccountSetError> {
178 let member = member.into();
179 let (time, parents, account_set, member_id) = match member {
180 AccountSetMemberId::Account(id) => {
181 let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
182 let (time, parents) = self
183 .repo
184 .add_member_account_and_return_parents(&mut *op, account_set_id, id)
185 .await?;
186 (time, parents, set, id)
187 }
188 AccountSetMemberId::AccountSet(id) => {
189 let mut accounts = self
190 .repo
191 .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
192 .await?;
193 let target = accounts
194 .remove(&account_set_id)
195 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
196 let member = accounts
197 .remove(&id)
198 .ok_or(AccountSetError::CouldNotFindById(id))?;
199
200 if target.values().journal_id != member.values().journal_id {
201 return Err(AccountSetError::JournalIdMismatch);
202 }
203
204 let (time, parents) = self
205 .repo
206 .add_member_set_and_return_parents(op, account_set_id, id)
207 .await?;
208 (time, parents, target, AccountId::from(id))
209 }
210 };
211
212 op.accumulate(std::iter::once(
213 OutboxEventPayload::AccountSetMemberCreated {
214 source: DataSource::Local,
215 account_set_id,
216 member_id: member,
217 },
218 ));
219
220 let balances = self
221 .balances
222 .find_balances_for_update(op, account_set.values().journal_id, member_id)
223 .await?;
224
225 let target_account_id = AccountId::from(&account_set.id());
226 let mut entries = Vec::new();
227 for balance in balances.into_values() {
228 entries_for_add_balance(&mut entries, target_account_id, balance);
229 }
230
231 if entries.is_empty() {
232 return Ok(account_set);
233 }
234 let entries = self.entries.create_all_in_op(op, entries).await?;
235 let mappings = std::iter::once((target_account_id, parents)).collect();
236 self.balances
237 .update_balances_in_op(
238 op,
239 account_set.values().journal_id,
240 entries,
241 time.date_naive(),
242 time,
243 mappings,
244 )
245 .await?;
246
247 Ok(account_set)
248 }
249
250 pub async fn remove_member(
251 &self,
252 account_set_id: AccountSetId,
253 member: impl Into<AccountSetMemberId>,
254 ) -> Result<AccountSet, AccountSetError> {
255 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
256 let account_set = self
257 .remove_member_in_op(&mut op, account_set_id, member)
258 .await?;
259 op.commit().await?;
260 Ok(account_set)
261 }
262
263 pub async fn remove_member_in_op(
264 &self,
265 op: &mut LedgerOperation<'_>,
266 account_set_id: AccountSetId,
267 member: impl Into<AccountSetMemberId>,
268 ) -> Result<AccountSet, AccountSetError> {
269 let member = member.into();
270 let (time, parents, account_set, member_id) = match member {
271 AccountSetMemberId::Account(id) => {
272 let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
273 let (time, parents) = self
274 .repo
275 .remove_member_account_and_return_parents(op, account_set_id, id)
276 .await?;
277 (time, parents, set, id)
278 }
279 AccountSetMemberId::AccountSet(id) => {
280 let mut accounts = self
281 .repo
282 .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
283 .await?;
284 let target = accounts
285 .remove(&account_set_id)
286 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
287 let member = accounts
288 .remove(&id)
289 .ok_or(AccountSetError::CouldNotFindById(id))?;
290
291 if target.values().journal_id != member.values().journal_id {
292 return Err(AccountSetError::JournalIdMismatch);
293 }
294
295 let (time, parents) = self
296 .repo
297 .remove_member_set_and_return_parents(op, account_set_id, id)
298 .await?;
299 (time, parents, target, AccountId::from(id))
300 }
301 };
302
303 op.accumulate(std::iter::once(
304 OutboxEventPayload::AccountSetMemberRemoved {
305 source: DataSource::Local,
306 account_set_id,
307 member_id: member,
308 },
309 ));
310
311 let balances = self
312 .balances
313 .find_balances_for_update(op, account_set.values().journal_id, member_id)
314 .await?;
315
316 let target_account_id = AccountId::from(&account_set.id());
317 let mut entries = Vec::new();
318 for balance in balances.into_values() {
319 entries_for_remove_balance(&mut entries, target_account_id, balance);
320 }
321
322 if entries.is_empty() {
323 return Ok(account_set);
324 }
325 let entries = self.entries.create_all_in_op(op, entries).await?;
326 let mappings = std::iter::once((target_account_id, parents)).collect();
327 self.balances
328 .update_balances_in_op(
329 op,
330 account_set.values().journal_id,
331 entries,
332 time.date_naive(),
333 time,
334 mappings,
335 )
336 .await?;
337
338 Ok(account_set)
339 }
340
341 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
342 pub async fn find_all<T: From<AccountSet>>(
343 &self,
344 account_set_ids: &[AccountSetId],
345 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
346 self.repo.find_all(account_set_ids).await
347 }
348
349 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
350 pub async fn find_all_in_op<T: From<AccountSet>>(
351 &self,
352 op: &mut LedgerOperation<'_>,
353 account_set_ids: &[AccountSetId],
354 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
355 self.repo.find_all_in_op(op, account_set_ids).await
356 }
357
358 #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
359 pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
360 self.repo.find_by_id(account_set_id).await
361 }
362
363 #[instrument(
364 name = "cala_ledger.accounts_sets.find_by_external_id",
365 skip(self),
366 err
367 )]
368 pub async fn find_by_external_id(
369 &self,
370 external_id: String,
371 ) -> Result<AccountSet, AccountSetError> {
372 self.repo.find_by_external_id(Some(external_id)).await
373 }
374
375 #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
376 pub async fn find_where_member(
377 &self,
378 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
379 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
380 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
381 {
382 match member.into() {
383 AccountSetMemberId::Account(account_id) => {
384 self.repo
385 .find_where_account_is_member(account_id, query)
386 .await
387 }
388 AccountSetMemberId::AccountSet(account_set_id) => {
389 self.repo
390 .find_where_account_set_is_member(account_set_id, query)
391 .await
392 }
393 }
394 }
395
396 #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
397 pub async fn list_for_name(
398 &self,
399 name: String,
400 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
401 ) -> Result<
402 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
403 AccountSetError,
404 > {
405 self.repo
406 .list_for_name_by_created_at(name, args, Default::default())
407 .await
408 }
409
410 #[instrument(
411 name = "cala_ledger.account_sets.list_for_name_in_op",
412 skip(self, op),
413 err
414 )]
415 pub async fn list_for_name_in_op(
416 &self,
417 op: &mut LedgerOperation<'_>,
418 name: String,
419 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
420 ) -> Result<
421 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
422 AccountSetError,
423 > {
424 self.repo
425 .list_for_name_by_created_at_in_op(op, name, args, Default::default())
426 .await
427 }
428
429 #[instrument(
430 name = "cala_ledger.account_sets.find_where_member_in_op",
431 skip(self, op),
432 err
433 )]
434 pub async fn find_where_member_in_op(
435 &self,
436 op: &mut LedgerOperation<'_>,
437 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
438 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
439 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
440 {
441 match member.into() {
442 AccountSetMemberId::Account(account_id) => {
443 self.repo
444 .find_where_account_is_member_in_op(op, account_id, query)
445 .await
446 }
447 AccountSetMemberId::AccountSet(account_set_id) => {
448 self.repo
449 .find_where_account_set_is_member_in_op(op, account_set_id, query)
450 .await
451 }
452 }
453 }
454
455 pub async fn list_members_by_created_at(
456 &self,
457 id: AccountSetId,
458 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
459 ) -> Result<
460 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
461 AccountSetError,
462 > {
463 self.repo.list_children_by_created_at(id, args).await
464 }
465
466 pub async fn list_members_by_created_at_in_op(
467 &self,
468 op: &mut LedgerOperation<'_>,
469 id: AccountSetId,
470 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
471 ) -> Result<
472 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
473 AccountSetError,
474 > {
475 self.repo
476 .list_children_by_created_at_in_op(op, id, args)
477 .await
478 }
479
480 pub async fn list_members_by_external_id(
481 &self,
482 id: AccountSetId,
483 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
484 ) -> Result<
485 es_entity::PaginatedQueryRet<
486 AccountSetMemberByExternalId,
487 AccountSetMembersByExternalIdCursor,
488 >,
489 AccountSetError,
490 > {
491 self.repo.list_children_by_external_id(id, args).await
492 }
493
494 pub async fn list_members_by_external_id_in_op(
495 &self,
496 op: &mut LedgerOperation<'_>,
497 id: AccountSetId,
498 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
499 ) -> Result<
500 es_entity::PaginatedQueryRet<
501 AccountSetMemberByExternalId,
502 AccountSetMembersByExternalIdCursor,
503 >,
504 AccountSetError,
505 > {
506 self.repo
507 .list_children_by_external_id_in_op(op, id, args)
508 .await
509 }
510
511 pub(crate) async fn fetch_mappings_in_op(
512 &self,
513 op: &mut LedgerOperation<'_>,
514 journal_id: JournalId,
515 account_ids: &[AccountId],
516 ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
517 self.repo
518 .fetch_mappings_in_op(op, journal_id, account_ids)
519 .await
520 }
521
522 #[cfg(feature = "import")]
523 pub(crate) async fn sync_account_set_creation(
524 &self,
525 mut db: es_entity::DbOpWithTime<'_>,
526 origin: DataSourceId,
527 values: AccountSetValues,
528 ) -> Result<(), AccountSetError> {
529 let mut account_set = AccountSet::import(origin, values);
530 self.repo
531 .import_in_op(&mut db, origin, &mut account_set)
532 .await?;
533 let outbox_events: Vec<_> = account_set
534 .last_persisted(1)
535 .map(|p| OutboxEventPayload::from(&p.event))
536 .collect();
537 let time = db.now();
538 self.outbox
539 .persist_events_at(db, outbox_events, time)
540 .await?;
541 Ok(())
542 }
543
544 #[cfg(feature = "import")]
545 pub(crate) async fn sync_account_set_update(
546 &self,
547 mut db: es_entity::DbOpWithTime<'_>,
548 values: AccountSetValues,
549 fields: Vec<String>,
550 ) -> Result<(), AccountSetError> {
551 let mut account_set = self.repo.find_by_id(values.id).await?;
552 account_set.update((values, fields));
553 let n_events = self.repo.update_in_op(&mut db, &mut account_set).await?;
554 let outbox_events: Vec<_> = account_set
555 .last_persisted(n_events)
556 .map(|p| OutboxEventPayload::from(&p.event))
557 .collect();
558 let time = db.now();
559 self.outbox
560 .persist_events_at(db, outbox_events, time)
561 .await?;
562 Ok(())
563 }
564
565 #[cfg(feature = "import")]
566 pub(crate) async fn sync_account_set_member_creation(
567 &self,
568 mut db: es_entity::DbOpWithTime<'_>,
569 origin: DataSourceId,
570 account_set_id: AccountSetId,
571 member_id: AccountSetMemberId,
572 ) -> Result<(), AccountSetError> {
573 match member_id {
574 AccountSetMemberId::Account(account_id) => {
575 self.repo
576 .import_member_account_in_op(&mut db, account_set_id, account_id)
577 .await?;
578 }
579 AccountSetMemberId::AccountSet(account_set_id) => {
580 self.repo
581 .import_member_set_in_op(&mut db, account_set_id, account_set_id)
582 .await?;
583 }
584 }
585 let time = db.now();
586 self.outbox
587 .persist_events_at(
588 db,
589 std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
590 source: DataSource::Remote { id: origin },
591 account_set_id,
592 member_id,
593 }),
594 time,
595 )
596 .await?;
597 Ok(())
598 }
599
600 #[cfg(feature = "import")]
601 pub(crate) async fn sync_account_set_member_removal(
602 &self,
603 mut db: es_entity::DbOpWithTime<'_>,
604 origin: DataSourceId,
605 account_set_id: AccountSetId,
606 member_id: AccountSetMemberId,
607 ) -> Result<(), AccountSetError> {
608 match member_id {
609 AccountSetMemberId::Account(account_id) => {
610 self.repo
611 .import_remove_member_account(&mut db, account_set_id, account_id)
612 .await?;
613 }
614 AccountSetMemberId::AccountSet(account_set_id) => {
615 self.repo
616 .import_remove_member_set(&mut db, account_set_id, account_set_id)
617 .await?;
618 }
619 }
620 let time = db.now();
621 self.outbox
622 .persist_events_at(
623 db,
624 std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
625 source: DataSource::Remote { id: origin },
626 account_set_id,
627 member_id,
628 }),
629 time,
630 )
631 .await?;
632 Ok(())
633 }
634}
635
636fn entries_for_add_balance(
637 entries: &mut Vec<NewEntry>,
638 target_account_id: AccountId,
639 balance: BalanceSnapshot,
640) {
641 use rust_decimal::Decimal;
642
643 if balance.settled.cr_balance != Decimal::ZERO {
644 let entry = NewEntry::builder()
645 .id(EntryId::new())
646 .journal_id(balance.journal_id)
647 .account_id(target_account_id)
648 .currency(balance.currency)
649 .sequence(1u32)
650 .layer(Layer::Settled)
651 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
652 .direction(DebitOrCredit::Credit)
653 .units(balance.settled.cr_balance)
654 .transaction_id(UNASSIGNED_TRANSACTION_ID)
655 .build()
656 .expect("Couldn't build entry");
657 entries.push(entry);
658 }
659 if balance.settled.dr_balance != Decimal::ZERO {
660 let entry = NewEntry::builder()
661 .id(EntryId::new())
662 .journal_id(balance.journal_id)
663 .account_id(target_account_id)
664 .currency(balance.currency)
665 .sequence(1u32)
666 .layer(Layer::Settled)
667 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
668 .direction(DebitOrCredit::Debit)
669 .units(balance.settled.dr_balance)
670 .transaction_id(UNASSIGNED_TRANSACTION_ID)
671 .build()
672 .expect("Couldn't build entry");
673 entries.push(entry);
674 }
675 if balance.pending.cr_balance != Decimal::ZERO {
676 let entry = NewEntry::builder()
677 .id(EntryId::new())
678 .journal_id(balance.journal_id)
679 .account_id(target_account_id)
680 .currency(balance.currency)
681 .sequence(1u32)
682 .layer(Layer::Pending)
683 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
684 .direction(DebitOrCredit::Credit)
685 .units(balance.pending.cr_balance)
686 .transaction_id(UNASSIGNED_TRANSACTION_ID)
687 .build()
688 .expect("Couldn't build entry");
689 entries.push(entry);
690 }
691 if balance.pending.dr_balance != Decimal::ZERO {
692 let entry = NewEntry::builder()
693 .id(EntryId::new())
694 .journal_id(balance.journal_id)
695 .account_id(target_account_id)
696 .currency(balance.currency)
697 .sequence(1u32)
698 .layer(Layer::Pending)
699 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
700 .direction(DebitOrCredit::Debit)
701 .units(balance.pending.dr_balance)
702 .transaction_id(UNASSIGNED_TRANSACTION_ID)
703 .build()
704 .expect("Couldn't build entry");
705 entries.push(entry);
706 }
707 if balance.encumbrance.cr_balance != Decimal::ZERO {
708 let entry = NewEntry::builder()
709 .id(EntryId::new())
710 .journal_id(balance.journal_id)
711 .account_id(target_account_id)
712 .currency(balance.currency)
713 .sequence(1u32)
714 .layer(Layer::Encumbrance)
715 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
716 .direction(DebitOrCredit::Credit)
717 .units(balance.encumbrance.cr_balance)
718 .transaction_id(UNASSIGNED_TRANSACTION_ID)
719 .build()
720 .expect("Couldn't build entry");
721 entries.push(entry);
722 }
723 if balance.encumbrance.dr_balance != Decimal::ZERO {
724 let entry = NewEntry::builder()
725 .id(EntryId::new())
726 .journal_id(balance.journal_id)
727 .account_id(target_account_id)
728 .currency(balance.currency)
729 .sequence(1u32)
730 .layer(Layer::Encumbrance)
731 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
732 .direction(DebitOrCredit::Debit)
733 .units(balance.encumbrance.dr_balance)
734 .transaction_id(UNASSIGNED_TRANSACTION_ID)
735 .build()
736 .expect("Couldn't build entry");
737 entries.push(entry);
738 }
739}
740
741fn entries_for_remove_balance(
742 entries: &mut Vec<NewEntry>,
743 target_account_id: AccountId,
744 balance: BalanceSnapshot,
745) {
746 use rust_decimal::Decimal;
747
748 if balance.settled.cr_balance != Decimal::ZERO {
749 let entry = NewEntry::builder()
750 .id(EntryId::new())
751 .journal_id(balance.journal_id)
752 .account_id(target_account_id)
753 .currency(balance.currency)
754 .sequence(1u32)
755 .layer(Layer::Settled)
756 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
757 .direction(DebitOrCredit::Debit)
758 .units(balance.settled.cr_balance)
759 .transaction_id(UNASSIGNED_TRANSACTION_ID)
760 .build()
761 .expect("Couldn't build entry");
762 entries.push(entry);
763 }
764 if balance.settled.dr_balance != Decimal::ZERO {
765 let entry = NewEntry::builder()
766 .id(EntryId::new())
767 .journal_id(balance.journal_id)
768 .account_id(target_account_id)
769 .currency(balance.currency)
770 .sequence(1u32)
771 .layer(Layer::Settled)
772 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
773 .direction(DebitOrCredit::Credit)
774 .units(balance.settled.dr_balance)
775 .transaction_id(UNASSIGNED_TRANSACTION_ID)
776 .build()
777 .expect("Couldn't build entry");
778 entries.push(entry);
779 }
780 if balance.pending.cr_balance != Decimal::ZERO {
781 let entry = NewEntry::builder()
782 .id(EntryId::new())
783 .journal_id(balance.journal_id)
784 .account_id(target_account_id)
785 .currency(balance.currency)
786 .sequence(1u32)
787 .layer(Layer::Pending)
788 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
789 .direction(DebitOrCredit::Debit)
790 .units(balance.pending.cr_balance)
791 .transaction_id(UNASSIGNED_TRANSACTION_ID)
792 .build()
793 .expect("Couldn't build entry");
794 entries.push(entry);
795 }
796 if balance.pending.dr_balance != Decimal::ZERO {
797 let entry = NewEntry::builder()
798 .id(EntryId::new())
799 .journal_id(balance.journal_id)
800 .account_id(target_account_id)
801 .currency(balance.currency)
802 .sequence(1u32)
803 .layer(Layer::Pending)
804 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
805 .direction(DebitOrCredit::Credit)
806 .units(balance.pending.dr_balance)
807 .transaction_id(UNASSIGNED_TRANSACTION_ID)
808 .build()
809 .expect("Couldn't build entry");
810 entries.push(entry);
811 }
812 if balance.encumbrance.cr_balance != Decimal::ZERO {
813 let entry = NewEntry::builder()
814 .id(EntryId::new())
815 .journal_id(balance.journal_id)
816 .account_id(target_account_id)
817 .currency(balance.currency)
818 .sequence(1u32)
819 .layer(Layer::Encumbrance)
820 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
821 .direction(DebitOrCredit::Debit)
822 .units(balance.encumbrance.cr_balance)
823 .transaction_id(UNASSIGNED_TRANSACTION_ID)
824 .build()
825 .expect("Couldn't build entry");
826 entries.push(entry);
827 }
828 if balance.encumbrance.dr_balance != Decimal::ZERO {
829 let entry = NewEntry::builder()
830 .id(EntryId::new())
831 .journal_id(balance.journal_id)
832 .account_id(target_account_id)
833 .currency(balance.currency)
834 .sequence(1u32)
835 .layer(Layer::Encumbrance)
836 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
837 .direction(DebitOrCredit::Credit)
838 .units(balance.encumbrance.dr_balance)
839 .transaction_id(UNASSIGNED_TRANSACTION_ID)
840 .build()
841 .expect("Couldn't build entry");
842 entries.push(entry);
843 }
844}
845
846impl From<&AccountSetEvent> for OutboxEventPayload {
847 fn from(event: &AccountSetEvent) -> Self {
848 match event {
849 #[cfg(feature = "import")]
850 AccountSetEvent::Imported {
851 source,
852 values: account_set,
853 } => OutboxEventPayload::AccountSetCreated {
854 source: *source,
855 account_set: account_set.clone(),
856 },
857 AccountSetEvent::Initialized {
858 values: account_set,
859 } => OutboxEventPayload::AccountSetCreated {
860 source: DataSource::Local,
861 account_set: account_set.clone(),
862 },
863 AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
864 source: DataSource::Local,
865 account_set: values.clone(),
866 fields: fields.clone(),
867 },
868 }
869 }
870}