1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use std::collections::HashMap;
9use tracing::instrument;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{
14 account::*,
15 balance::*,
16 entry::*,
17 ledger_operation::*,
18 outbox::*,
19 primitives::{DataSource, DebitOrCredit, JournalId, Layer},
20};
21
22pub use cursor::*;
23pub use entity::*;
24use error::*;
25use repo::*;
26pub use repo::{account_set_cursor::*, members_cursor::*};
27
28const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
29
30#[derive(Clone)]
31pub struct AccountSets {
32 repo: AccountSetRepo,
33 accounts: Accounts,
34 entries: Entries,
35 balances: Balances,
36 outbox: Outbox,
37 pool: PgPool,
38}
39
40impl AccountSets {
41 pub(crate) fn new(
42 pool: &PgPool,
43 outbox: Outbox,
44 accounts: &Accounts,
45 entries: &Entries,
46 balances: &Balances,
47 ) -> Self {
48 Self {
49 repo: AccountSetRepo::new(pool),
50 outbox,
51 accounts: accounts.clone(),
52 entries: entries.clone(),
53 balances: balances.clone(),
54 pool: pool.clone(),
55 }
56 }
57 #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
58 pub async fn create(
59 &self,
60 new_account_set: NewAccountSet,
61 ) -> Result<AccountSet, AccountSetError> {
62 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
63 let account_set = self.create_in_op(&mut op, new_account_set).await?;
64 op.commit().await?;
65 Ok(account_set)
66 }
67
68 #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
69 pub async fn create_in_op(
70 &self,
71 db: &mut LedgerOperation<'_>,
72 new_account_set: NewAccountSet,
73 ) -> Result<AccountSet, AccountSetError> {
74 let new_account = NewAccount::builder()
75 .id(uuid::Uuid::from(new_account_set.id))
76 .name(String::new())
77 .code(new_account_set.id.to_string())
78 .normal_balance_type(new_account_set.normal_balance_type)
79 .is_account_set(true)
80 .build()
81 .expect("Failed to build account");
82 self.accounts.create_in_op(db, new_account).await?;
83 let account_set = self.repo.create_in_op(db.op(), new_account_set).await?;
84 db.accumulate(account_set.last_persisted(1).map(|p| &p.event));
85 Ok(account_set)
86 }
87
88 pub async fn create_all(
89 &self,
90 new_account_sets: Vec<NewAccountSet>,
91 ) -> Result<Vec<AccountSet>, AccountSetError> {
92 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
93 let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
94 op.commit().await?;
95 Ok(account_sets)
96 }
97
98 #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, db))]
99 pub async fn create_all_in_op(
100 &self,
101 db: &mut LedgerOperation<'_>,
102 new_account_sets: Vec<NewAccountSet>,
103 ) -> Result<Vec<AccountSet>, AccountSetError> {
104 let mut new_accounts = Vec::new();
105 for new_account_set in new_account_sets.iter() {
106 let new_account = NewAccount::builder()
107 .id(uuid::Uuid::from(new_account_set.id))
108 .name(String::new())
109 .code(new_account_set.id.to_string())
110 .normal_balance_type(new_account_set.normal_balance_type)
111 .is_account_set(true)
112 .build()
113 .expect("Failed to build account");
114 new_accounts.push(new_account);
115 }
116 self.accounts.create_all_in_op(db, new_accounts).await?;
117 let account_sets = self
118 .repo
119 .create_all_in_op(db.op(), new_account_sets)
120 .await?;
121 db.accumulate(
122 account_sets
123 .iter()
124 .flat_map(|account| account.last_persisted(1).map(|p| &p.event)),
125 );
126 Ok(account_sets)
127 }
128
129 #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
130 pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
131 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
132 self.persist_in_op(&mut op, account_set).await?;
133 op.commit().await?;
134 Ok(())
135 }
136
137 #[instrument(
138 name = "cala_ledger.account_sets.persist_in_op",
139 skip(self, db, account_set)
140 )]
141 pub async fn persist_in_op(
142 &self,
143 db: &mut LedgerOperation<'_>,
144 account_set: &mut AccountSet,
145 ) -> Result<(), AccountSetError> {
146 let n_events = self.repo.update_in_op(db.op(), account_set).await?;
147 db.accumulate(account_set.last_persisted(n_events).map(|p| &p.event));
148 Ok(())
149 }
150
151 pub async fn add_member(
152 &self,
153 account_set_id: AccountSetId,
154 member: impl Into<AccountSetMemberId>,
155 ) -> Result<AccountSet, AccountSetError> {
156 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
157 let account_set = self
158 .add_member_in_op(&mut op, account_set_id, member)
159 .await?;
160 op.commit().await?;
161 Ok(account_set)
162 }
163
164 pub async fn add_member_in_op(
165 &self,
166 op: &mut LedgerOperation<'_>,
167 account_set_id: AccountSetId,
168 member: impl Into<AccountSetMemberId>,
169 ) -> Result<AccountSet, AccountSetError> {
170 let member = member.into();
171 let (time, parents, account_set, member_id) = match member {
172 AccountSetMemberId::Account(id) => {
173 let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
174 let (time, parents) = self
175 .repo
176 .add_member_account_and_return_parents(op.tx(), account_set_id, id)
177 .await?;
178 (time, parents, set, id)
179 }
180 AccountSetMemberId::AccountSet(id) => {
181 let mut accounts = self
182 .repo
183 .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
184 .await?;
185 let target = accounts
186 .remove(&account_set_id)
187 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
188 let member = accounts
189 .remove(&id)
190 .ok_or(AccountSetError::CouldNotFindById(id))?;
191
192 if target.values().journal_id != member.values().journal_id {
193 return Err(AccountSetError::JournalIdMismatch);
194 }
195
196 let (time, parents) = self
197 .repo
198 .add_member_set_and_return_parents(op.tx(), account_set_id, id)
199 .await?;
200 (time, parents, target, AccountId::from(id))
201 }
202 };
203
204 op.accumulate(std::iter::once(
205 OutboxEventPayload::AccountSetMemberCreated {
206 source: DataSource::Local,
207 account_set_id,
208 member_id: member,
209 },
210 ));
211
212 let balances = self
213 .balances
214 .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
215 .await?;
216
217 let target_account_id = AccountId::from(&account_set.id());
218 let mut entries = Vec::new();
219 for balance in balances.into_values() {
220 entries_for_add_balance(&mut entries, target_account_id, balance);
221 }
222
223 if entries.is_empty() {
224 return Ok(account_set);
225 }
226 let entries = self.entries.create_all_in_op(op, entries).await?;
227 let mappings = std::iter::once((target_account_id, parents)).collect();
228 self.balances
229 .update_balances_in_op(
230 op,
231 account_set.values().journal_id,
232 entries,
233 time.date_naive(),
234 time,
235 mappings,
236 )
237 .await?;
238
239 Ok(account_set)
240 }
241
242 pub async fn remove_member(
243 &self,
244 account_set_id: AccountSetId,
245 member: impl Into<AccountSetMemberId>,
246 ) -> Result<AccountSet, AccountSetError> {
247 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
248 let account_set = self
249 .remove_member_in_op(&mut op, account_set_id, member)
250 .await?;
251 op.commit().await?;
252 Ok(account_set)
253 }
254
255 pub async fn remove_member_in_op(
256 &self,
257 op: &mut LedgerOperation<'_>,
258 account_set_id: AccountSetId,
259 member: impl Into<AccountSetMemberId>,
260 ) -> Result<AccountSet, AccountSetError> {
261 let member = member.into();
262 let (time, parents, account_set, member_id) = match member {
263 AccountSetMemberId::Account(id) => {
264 let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
265 let (time, parents) = self
266 .repo
267 .remove_member_account_and_return_parents(op.tx(), account_set_id, id)
268 .await?;
269 (time, parents, set, id)
270 }
271 AccountSetMemberId::AccountSet(id) => {
272 let mut accounts = self
273 .repo
274 .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
275 .await?;
276 let target = accounts
277 .remove(&account_set_id)
278 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
279 let member = accounts
280 .remove(&id)
281 .ok_or(AccountSetError::CouldNotFindById(id))?;
282
283 if target.values().journal_id != member.values().journal_id {
284 return Err(AccountSetError::JournalIdMismatch);
285 }
286
287 let (time, parents) = self
288 .repo
289 .remove_member_set_and_return_parents(op.tx(), account_set_id, id)
290 .await?;
291 (time, parents, target, AccountId::from(id))
292 }
293 };
294
295 op.accumulate(std::iter::once(
296 OutboxEventPayload::AccountSetMemberRemoved {
297 source: DataSource::Local,
298 account_set_id,
299 member_id: member,
300 },
301 ));
302
303 let balances = self
304 .balances
305 .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
306 .await?;
307
308 let target_account_id = AccountId::from(&account_set.id());
309 let mut entries = Vec::new();
310 for balance in balances.into_values() {
311 entries_for_remove_balance(&mut entries, target_account_id, balance);
312 }
313
314 if entries.is_empty() {
315 return Ok(account_set);
316 }
317 let entries = self.entries.create_all_in_op(op, entries).await?;
318 let mappings = std::iter::once((target_account_id, parents)).collect();
319 self.balances
320 .update_balances_in_op(
321 op,
322 account_set.values().journal_id,
323 entries,
324 time.date_naive(),
325 time,
326 mappings,
327 )
328 .await?;
329
330 Ok(account_set)
331 }
332
333 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
334 pub async fn find_all<T: From<AccountSet>>(
335 &self,
336 account_set_ids: &[AccountSetId],
337 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
338 self.repo.find_all(account_set_ids).await
339 }
340
341 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
342 pub async fn find_all_in_op<T: From<AccountSet>>(
343 &self,
344 op: &mut LedgerOperation<'_>,
345 account_set_ids: &[AccountSetId],
346 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
347 self.repo.find_all_in_tx(op.tx(), account_set_ids).await
348 }
349
350 #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
351 pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
352 self.repo.find_by_id(account_set_id).await
353 }
354
355 #[instrument(
356 name = "cala_ledger.accounts_sets.find_by_external_id",
357 skip(self),
358 err
359 )]
360 pub async fn find_by_external_id(
361 &self,
362 external_id: String,
363 ) -> Result<AccountSet, AccountSetError> {
364 self.repo.find_by_external_id(Some(external_id)).await
365 }
366
367 #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
368 pub async fn find_where_member(
369 &self,
370 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
371 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
372 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
373 {
374 match member.into() {
375 AccountSetMemberId::Account(account_id) => {
376 self.repo
377 .find_where_account_is_member(account_id, query)
378 .await
379 }
380 AccountSetMemberId::AccountSet(account_set_id) => {
381 self.repo
382 .find_where_account_set_is_member(account_set_id, query)
383 .await
384 }
385 }
386 }
387
388 #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
389 pub async fn list_for_name(
390 &self,
391 name: String,
392 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
393 ) -> Result<
394 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
395 AccountSetError,
396 > {
397 self.repo
398 .list_for_name_by_created_at(name, args, Default::default())
399 .await
400 }
401
402 #[instrument(
403 name = "cala_ledger.account_sets.list_for_name_in_op",
404 skip(self, op),
405 err
406 )]
407 pub async fn list_for_name_in_op(
408 &self,
409 op: &mut LedgerOperation<'_>,
410 name: String,
411 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
412 ) -> Result<
413 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
414 AccountSetError,
415 > {
416 self.repo
417 .list_for_name_by_created_at_in_tx(op.tx(), name, args, Default::default())
418 .await
419 }
420
421 #[instrument(
422 name = "cala_ledger.account_sets.find_where_member_in_op",
423 skip(self, op),
424 err
425 )]
426 pub async fn find_where_member_in_op(
427 &self,
428 op: &mut LedgerOperation<'_>,
429 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
430 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
431 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
432 {
433 match member.into() {
434 AccountSetMemberId::Account(account_id) => {
435 self.repo
436 .find_where_account_is_member_in_tx(op.tx(), account_id, query)
437 .await
438 }
439 AccountSetMemberId::AccountSet(account_set_id) => {
440 self.repo
441 .find_where_account_set_is_member_in_tx(op.tx(), account_set_id, query)
442 .await
443 }
444 }
445 }
446
447 pub async fn list_members_by_created_at(
448 &self,
449 id: AccountSetId,
450 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
451 ) -> Result<
452 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
453 AccountSetError,
454 > {
455 self.repo.list_children_by_created_at(id, args).await
456 }
457
458 pub async fn list_members_by_created_at_in_op(
459 &self,
460 op: &mut LedgerOperation<'_>,
461 id: AccountSetId,
462 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
463 ) -> Result<
464 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
465 AccountSetError,
466 > {
467 self.repo
468 .list_children_by_created_at_in_tx(op.tx(), id, args)
469 .await
470 }
471
472 pub async fn list_members_by_external_id(
473 &self,
474 id: AccountSetId,
475 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
476 ) -> Result<
477 es_entity::PaginatedQueryRet<
478 AccountSetMemberByExternalId,
479 AccountSetMembersByExternalIdCursor,
480 >,
481 AccountSetError,
482 > {
483 self.repo.list_children_by_external_id(id, args).await
484 }
485
486 pub async fn list_members_by_external_id_in_op(
487 &self,
488 op: &mut LedgerOperation<'_>,
489 id: AccountSetId,
490 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
491 ) -> Result<
492 es_entity::PaginatedQueryRet<
493 AccountSetMemberByExternalId,
494 AccountSetMembersByExternalIdCursor,
495 >,
496 AccountSetError,
497 > {
498 self.repo
499 .list_children_by_external_id_in_tx(op.tx(), id, args)
500 .await
501 }
502
503 pub(crate) async fn fetch_mappings_in_op(
504 &self,
505 op: &mut LedgerOperation<'_>,
506 journal_id: JournalId,
507 account_ids: &[AccountId],
508 ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
509 self.repo
510 .fetch_mappings_in_tx(op.tx(), journal_id, account_ids)
511 .await
512 }
513
/// Imports an account set that was created on another data source.
///
/// The set is built from `values`, stored through the repository, and its
/// last persisted event is written to the outbox at the operation's
/// timestamp. `db` is consumed by the outbox write.
#[cfg(feature = "import")]
pub(crate) async fn sync_account_set_creation(
    &self,
    mut db: es_entity::DbOp<'_>,
    origin: DataSourceId,
    values: AccountSetValues,
) -> Result<(), AccountSetError> {
    let mut imported = AccountSet::import(origin, values);
    self.repo
        .import_in_op(&mut db, origin, &mut imported)
        .await?;

    // Capture the timestamp before `db` is turned into a raw transaction.
    let recorded_at = db.now();
    let payloads = imported
        .last_persisted(1)
        .map(|persisted| OutboxEventPayload::from(&persisted.event))
        .collect::<Vec<_>>();
    self.outbox
        .persist_events_at(db.into_tx(), payloads, recorded_at)
        .await?;
    Ok(())
}
535
/// Applies an update that originated on another data source to the local
/// copy of the account set identified by `values.id`.
///
/// The events written by the update are forwarded to the outbox at the
/// operation's timestamp. `db` is consumed by the outbox write.
#[cfg(feature = "import")]
pub(crate) async fn sync_account_set_update(
    &self,
    mut db: es_entity::DbOp<'_>,
    values: AccountSetValues,
    fields: Vec<String>,
) -> Result<(), AccountSetError> {
    // NOTE(review): this lookup does not receive `db`, while the update below
    // does; confirm that reading outside the operation is intended.
    let mut existing = self.repo.find_by_id(values.id).await?;
    existing.update((values, fields));
    let written = self.repo.update_in_op(&mut db, &mut existing).await?;

    // Capture the timestamp before `db` is turned into a raw transaction.
    let recorded_at = db.now();
    let payloads = existing
        .last_persisted(written)
        .map(|persisted| OutboxEventPayload::from(&persisted.event))
        .collect::<Vec<_>>();
    self.outbox
        .persist_events_at(db.into_tx(), payloads, recorded_at)
        .await?;
    Ok(())
}
556
/// Imports a membership that was created on another data source.
///
/// Records `member_id` as a member of `account_set_id` through the
/// repository and writes an `AccountSetMemberCreated` event, attributed to
/// `origin`, to the outbox at the operation's timestamp. `db` is consumed by
/// the outbox write.
#[cfg(feature = "import")]
pub(crate) async fn sync_account_set_member_creation(
    &self,
    mut db: es_entity::DbOp<'_>,
    origin: DataSourceId,
    account_set_id: AccountSetId,
    member_id: AccountSetMemberId,
) -> Result<(), AccountSetError> {
    match member_id {
        AccountSetMemberId::Account(member_account_id) => {
            self.repo
                .import_member_account_in_op(&mut db, account_set_id, member_account_id)
                .await?;
        }
        // The binding must not be called `account_set_id`: that would shadow
        // the parent id and register the member set as a member of itself.
        AccountSetMemberId::AccountSet(member_set_id) => {
            self.repo
                .import_member_set_in_op(&mut db, account_set_id, member_set_id)
                .await?;
        }
    }

    // Capture the timestamp before `db` is turned into a raw transaction.
    let recorded_at = db.now();
    self.outbox
        .persist_events_at(
            db.into_tx(),
            std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
                source: DataSource::Remote { id: origin },
                account_set_id,
                member_id,
            }),
            recorded_at,
        )
        .await?;
    Ok(())
}
591
/// Imports a membership removal that happened on another data source.
///
/// Removes `member_id` from `account_set_id` through the repository and
/// writes an `AccountSetMemberRemoved` event, attributed to `origin`, to the
/// outbox at the operation's timestamp. `db` is consumed by the outbox write.
#[cfg(feature = "import")]
pub(crate) async fn sync_account_set_member_removal(
    &self,
    mut db: es_entity::DbOp<'_>,
    origin: DataSourceId,
    account_set_id: AccountSetId,
    member_id: AccountSetMemberId,
) -> Result<(), AccountSetError> {
    match member_id {
        AccountSetMemberId::Account(member_account_id) => {
            self.repo
                .import_remove_member_account(db.tx(), account_set_id, member_account_id)
                .await?;
        }
        // The binding must not be called `account_set_id`: that would shadow
        // the parent id and pass the member set's id for both arguments.
        AccountSetMemberId::AccountSet(member_set_id) => {
            self.repo
                .import_remove_member_set(db.tx(), account_set_id, member_set_id)
                .await?;
        }
    }

    // Capture the timestamp before `db` is turned into a raw transaction.
    let recorded_at = db.now();
    self.outbox
        .persist_events_at(
            db.into_tx(),
            std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
                source: DataSource::Remote { id: origin },
                account_set_id,
                member_id,
            }),
            recorded_at,
        )
        .await?;
    Ok(())
}
626}
627
628fn entries_for_add_balance(
629 entries: &mut Vec<NewEntry>,
630 target_account_id: AccountId,
631 balance: BalanceSnapshot,
632) {
633 use rust_decimal::Decimal;
634
635 if balance.settled.cr_balance != Decimal::ZERO {
636 let entry = NewEntry::builder()
637 .id(EntryId::new())
638 .journal_id(balance.journal_id)
639 .account_id(target_account_id)
640 .currency(balance.currency)
641 .sequence(1u32)
642 .layer(Layer::Settled)
643 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
644 .direction(DebitOrCredit::Credit)
645 .units(balance.settled.cr_balance)
646 .transaction_id(UNASSIGNED_TRANSACTION_ID)
647 .build()
648 .expect("Couldn't build entry");
649 entries.push(entry);
650 }
651 if balance.settled.dr_balance != Decimal::ZERO {
652 let entry = NewEntry::builder()
653 .id(EntryId::new())
654 .journal_id(balance.journal_id)
655 .account_id(target_account_id)
656 .currency(balance.currency)
657 .sequence(1u32)
658 .layer(Layer::Settled)
659 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
660 .direction(DebitOrCredit::Debit)
661 .units(balance.settled.dr_balance)
662 .transaction_id(UNASSIGNED_TRANSACTION_ID)
663 .build()
664 .expect("Couldn't build entry");
665 entries.push(entry);
666 }
667 if balance.pending.cr_balance != Decimal::ZERO {
668 let entry = NewEntry::builder()
669 .id(EntryId::new())
670 .journal_id(balance.journal_id)
671 .account_id(target_account_id)
672 .currency(balance.currency)
673 .sequence(1u32)
674 .layer(Layer::Pending)
675 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
676 .direction(DebitOrCredit::Credit)
677 .units(balance.pending.cr_balance)
678 .transaction_id(UNASSIGNED_TRANSACTION_ID)
679 .build()
680 .expect("Couldn't build entry");
681 entries.push(entry);
682 }
683 if balance.pending.dr_balance != Decimal::ZERO {
684 let entry = NewEntry::builder()
685 .id(EntryId::new())
686 .journal_id(balance.journal_id)
687 .account_id(target_account_id)
688 .currency(balance.currency)
689 .sequence(1u32)
690 .layer(Layer::Pending)
691 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
692 .direction(DebitOrCredit::Debit)
693 .units(balance.pending.dr_balance)
694 .transaction_id(UNASSIGNED_TRANSACTION_ID)
695 .build()
696 .expect("Couldn't build entry");
697 entries.push(entry);
698 }
699 if balance.encumbrance.cr_balance != Decimal::ZERO {
700 let entry = NewEntry::builder()
701 .id(EntryId::new())
702 .journal_id(balance.journal_id)
703 .account_id(target_account_id)
704 .currency(balance.currency)
705 .sequence(1u32)
706 .layer(Layer::Encumbrance)
707 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
708 .direction(DebitOrCredit::Credit)
709 .units(balance.encumbrance.cr_balance)
710 .transaction_id(UNASSIGNED_TRANSACTION_ID)
711 .build()
712 .expect("Couldn't build entry");
713 entries.push(entry);
714 }
715 if balance.encumbrance.dr_balance != Decimal::ZERO {
716 let entry = NewEntry::builder()
717 .id(EntryId::new())
718 .journal_id(balance.journal_id)
719 .account_id(target_account_id)
720 .currency(balance.currency)
721 .sequence(1u32)
722 .layer(Layer::Encumbrance)
723 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
724 .direction(DebitOrCredit::Debit)
725 .units(balance.encumbrance.dr_balance)
726 .transaction_id(UNASSIGNED_TRANSACTION_ID)
727 .build()
728 .expect("Couldn't build entry");
729 entries.push(entry);
730 }
731}
732
733fn entries_for_remove_balance(
734 entries: &mut Vec<NewEntry>,
735 target_account_id: AccountId,
736 balance: BalanceSnapshot,
737) {
738 use rust_decimal::Decimal;
739
740 if balance.settled.cr_balance != Decimal::ZERO {
741 let entry = NewEntry::builder()
742 .id(EntryId::new())
743 .journal_id(balance.journal_id)
744 .account_id(target_account_id)
745 .currency(balance.currency)
746 .sequence(1u32)
747 .layer(Layer::Settled)
748 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
749 .direction(DebitOrCredit::Debit)
750 .units(balance.settled.cr_balance)
751 .transaction_id(UNASSIGNED_TRANSACTION_ID)
752 .build()
753 .expect("Couldn't build entry");
754 entries.push(entry);
755 }
756 if balance.settled.dr_balance != Decimal::ZERO {
757 let entry = NewEntry::builder()
758 .id(EntryId::new())
759 .journal_id(balance.journal_id)
760 .account_id(target_account_id)
761 .currency(balance.currency)
762 .sequence(1u32)
763 .layer(Layer::Settled)
764 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
765 .direction(DebitOrCredit::Credit)
766 .units(balance.settled.dr_balance)
767 .transaction_id(UNASSIGNED_TRANSACTION_ID)
768 .build()
769 .expect("Couldn't build entry");
770 entries.push(entry);
771 }
772 if balance.pending.cr_balance != Decimal::ZERO {
773 let entry = NewEntry::builder()
774 .id(EntryId::new())
775 .journal_id(balance.journal_id)
776 .account_id(target_account_id)
777 .currency(balance.currency)
778 .sequence(1u32)
779 .layer(Layer::Pending)
780 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
781 .direction(DebitOrCredit::Debit)
782 .units(balance.pending.cr_balance)
783 .transaction_id(UNASSIGNED_TRANSACTION_ID)
784 .build()
785 .expect("Couldn't build entry");
786 entries.push(entry);
787 }
788 if balance.pending.dr_balance != Decimal::ZERO {
789 let entry = NewEntry::builder()
790 .id(EntryId::new())
791 .journal_id(balance.journal_id)
792 .account_id(target_account_id)
793 .currency(balance.currency)
794 .sequence(1u32)
795 .layer(Layer::Pending)
796 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
797 .direction(DebitOrCredit::Credit)
798 .units(balance.pending.dr_balance)
799 .transaction_id(UNASSIGNED_TRANSACTION_ID)
800 .build()
801 .expect("Couldn't build entry");
802 entries.push(entry);
803 }
804 if balance.encumbrance.cr_balance != Decimal::ZERO {
805 let entry = NewEntry::builder()
806 .id(EntryId::new())
807 .journal_id(balance.journal_id)
808 .account_id(target_account_id)
809 .currency(balance.currency)
810 .sequence(1u32)
811 .layer(Layer::Encumbrance)
812 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
813 .direction(DebitOrCredit::Debit)
814 .units(balance.encumbrance.cr_balance)
815 .transaction_id(UNASSIGNED_TRANSACTION_ID)
816 .build()
817 .expect("Couldn't build entry");
818 entries.push(entry);
819 }
820 if balance.encumbrance.dr_balance != Decimal::ZERO {
821 let entry = NewEntry::builder()
822 .id(EntryId::new())
823 .journal_id(balance.journal_id)
824 .account_id(target_account_id)
825 .currency(balance.currency)
826 .sequence(1u32)
827 .layer(Layer::Encumbrance)
828 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
829 .direction(DebitOrCredit::Credit)
830 .units(balance.encumbrance.dr_balance)
831 .transaction_id(UNASSIGNED_TRANSACTION_ID)
832 .build()
833 .expect("Couldn't build entry");
834 entries.push(entry);
835 }
836}
837
838impl From<&AccountSetEvent> for OutboxEventPayload {
839 fn from(event: &AccountSetEvent) -> Self {
840 match event {
841 #[cfg(feature = "import")]
842 AccountSetEvent::Imported {
843 source,
844 values: account_set,
845 } => OutboxEventPayload::AccountSetCreated {
846 source: *source,
847 account_set: account_set.clone(),
848 },
849 AccountSetEvent::Initialized {
850 values: account_set,
851 } => OutboxEventPayload::AccountSetCreated {
852 source: DataSource::Local,
853 account_set: account_set.clone(),
854 },
855 AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
856 source: DataSource::Local,
857 account_set: values.clone(),
858 fields: fields.clone(),
859 },
860 }
861 }
862}