1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use std::collections::HashMap;
8use tracing::instrument;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{
13 account::*,
14 balance::*,
15 entry::*,
16 ledger_operation::*,
17 outbox::*,
18 primitives::{DataSource, DebitOrCredit, JournalId, Layer},
19};
20
21pub use cursor::*;
22pub use entity::*;
23use error::*;
24use repo::*;
25pub use repo::{account_set_cursor::*, members_cursor::*};
26
/// Nil UUID used as the `transaction_id` of the entries built by
/// `entries_for_add_balance` and `entries_for_remove_balance` in this module.
// NOTE(review): presumably nil because membership changes are not posted
// through a regular transaction — confirm against the entries module.
const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
28
/// Service object exposing the account-set operations of the ledger:
/// creation, updates, membership changes, lookups and (with the `import`
/// feature) replication of remote events.
#[derive(Clone)]
pub struct AccountSets {
 /// Repository that every account-set read and write in this module goes through.
 repo: AccountSetRepo,
 /// Used to create the account that shares an id with each new account set.
 accounts: Accounts,
 /// Used to persist the entries generated when members are added or removed.
 entries: Entries,
 /// Used to load member balances and to apply the generated entries.
 balances: Balances,
 /// Handed to `LedgerOperation::init` and used directly by the `sync_*` methods.
 outbox: Outbox,
 /// Connection pool handed to `LedgerOperation::init`.
 pool: PgPool,
}
38
39impl AccountSets {
40 pub(crate) fn new(
41 pool: &PgPool,
42 outbox: Outbox,
43 accounts: &Accounts,
44 entries: &Entries,
45 balances: &Balances,
46 ) -> Self {
47 Self {
48 repo: AccountSetRepo::new(pool),
49 outbox,
50 accounts: accounts.clone(),
51 entries: entries.clone(),
52 balances: balances.clone(),
53 pool: pool.clone(),
54 }
55 }
56 #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
57 pub async fn create(
58 &self,
59 new_account_set: NewAccountSet,
60 ) -> Result<AccountSet, AccountSetError> {
61 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
62 let account_set = self.create_in_op(&mut op, new_account_set).await?;
63 op.commit().await?;
64 Ok(account_set)
65 }
66
67 #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
68 pub async fn create_in_op(
69 &self,
70 db: &mut LedgerOperation<'_>,
71 new_account_set: NewAccountSet,
72 ) -> Result<AccountSet, AccountSetError> {
73 let new_account = NewAccount::builder()
74 .id(uuid::Uuid::from(new_account_set.id))
75 .name(String::new())
76 .code(new_account_set.id.to_string())
77 .normal_balance_type(new_account_set.normal_balance_type)
78 .is_account_set(true)
79 .build()
80 .expect("Failed to build account");
81 self.accounts.create_in_op(db, new_account).await?;
82 let account_set = self.repo.create_in_op(db.op(), new_account_set).await?;
83 db.accumulate(account_set.events.last_persisted(1).map(|p| &p.event));
84 Ok(account_set)
85 }
86
87 pub async fn create_all(
88 &self,
89 new_account_sets: Vec<NewAccountSet>,
90 ) -> Result<Vec<AccountSet>, AccountSetError> {
91 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
92 let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
93 op.commit().await?;
94 Ok(account_sets)
95 }
96
97 #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, db))]
98 pub async fn create_all_in_op(
99 &self,
100 db: &mut LedgerOperation<'_>,
101 new_account_sets: Vec<NewAccountSet>,
102 ) -> Result<Vec<AccountSet>, AccountSetError> {
103 let mut new_accounts = Vec::new();
104 for new_account_set in new_account_sets.iter() {
105 let new_account = NewAccount::builder()
106 .id(uuid::Uuid::from(new_account_set.id))
107 .name(String::new())
108 .code(new_account_set.id.to_string())
109 .normal_balance_type(new_account_set.normal_balance_type)
110 .is_account_set(true)
111 .build()
112 .expect("Failed to build account");
113 new_accounts.push(new_account);
114 }
115 self.accounts.create_all_in_op(db, new_accounts).await?;
116 let account_sets = self
117 .repo
118 .create_all_in_op(db.op(), new_account_sets)
119 .await?;
120 db.accumulate(
121 account_sets
122 .iter()
123 .flat_map(|account| account.events.last_persisted(1).map(|p| &p.event)),
124 );
125 Ok(account_sets)
126 }
127
128 #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
129 pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
130 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
131 self.persist_in_op(&mut op, account_set).await?;
132 op.commit().await?;
133 Ok(())
134 }
135
136 #[instrument(
137 name = "cala_ledger.account_sets.persist_in_op",
138 skip(self, db, account_set)
139 )]
140 pub async fn persist_in_op(
141 &self,
142 db: &mut LedgerOperation<'_>,
143 account_set: &mut AccountSet,
144 ) -> Result<(), AccountSetError> {
145 let n_events = self.repo.update_in_op(db.op(), account_set).await?;
146 db.accumulate(
147 account_set
148 .events
149 .last_persisted(n_events)
150 .map(|p| &p.event),
151 );
152 Ok(())
153 }
154
155 pub async fn add_member(
156 &self,
157 account_set_id: AccountSetId,
158 member: impl Into<AccountSetMemberId>,
159 ) -> Result<AccountSet, AccountSetError> {
160 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
161 let account_set = self
162 .add_member_in_op(&mut op, account_set_id, member)
163 .await?;
164 op.commit().await?;
165 Ok(account_set)
166 }
167
168 pub async fn add_member_in_op(
169 &self,
170 op: &mut LedgerOperation<'_>,
171 account_set_id: AccountSetId,
172 member: impl Into<AccountSetMemberId>,
173 ) -> Result<AccountSet, AccountSetError> {
174 let member = member.into();
175 let (time, parents, account_set, member_id) = match member {
176 AccountSetMemberId::Account(id) => {
177 let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
178 let (time, parents) = self
179 .repo
180 .add_member_account_and_return_parents(op.tx(), account_set_id, id)
181 .await?;
182 (time, parents, set, id)
183 }
184 AccountSetMemberId::AccountSet(id) => {
185 let mut accounts = self
186 .repo
187 .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
188 .await?;
189 let target = accounts
190 .remove(&account_set_id)
191 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
192 let member = accounts
193 .remove(&id)
194 .ok_or(AccountSetError::CouldNotFindById(id))?;
195
196 if target.values().journal_id != member.values().journal_id {
197 return Err(AccountSetError::JournalIdMismatch);
198 }
199
200 let (time, parents) = self
201 .repo
202 .add_member_set_and_return_parents(op.tx(), account_set_id, id)
203 .await?;
204 (time, parents, target, AccountId::from(id))
205 }
206 };
207
208 op.accumulate(std::iter::once(
209 OutboxEventPayload::AccountSetMemberCreated {
210 source: DataSource::Local,
211 account_set_id,
212 member_id: member,
213 },
214 ));
215
216 let balances = self
217 .balances
218 .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
219 .await?;
220
221 let target_account_id = AccountId::from(&account_set.id());
222 let mut entries = Vec::new();
223 for balance in balances.into_values() {
224 entries_for_add_balance(&mut entries, target_account_id, balance);
225 }
226
227 if entries.is_empty() {
228 return Ok(account_set);
229 }
230 let entries = self.entries.create_all_in_op(op, entries).await?;
231 let mappings = std::iter::once((target_account_id, parents)).collect();
232 self.balances
233 .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
234 .await?;
235
236 Ok(account_set)
237 }
238
239 pub async fn remove_member(
240 &self,
241 account_set_id: AccountSetId,
242 member: impl Into<AccountSetMemberId>,
243 ) -> Result<AccountSet, AccountSetError> {
244 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
245 let account_set = self
246 .remove_member_in_op(&mut op, account_set_id, member)
247 .await?;
248 op.commit().await?;
249 Ok(account_set)
250 }
251
252 pub async fn remove_member_in_op(
253 &self,
254 op: &mut LedgerOperation<'_>,
255 account_set_id: AccountSetId,
256 member: impl Into<AccountSetMemberId>,
257 ) -> Result<AccountSet, AccountSetError> {
258 let member = member.into();
259 let (time, parents, account_set, member_id) = match member {
260 AccountSetMemberId::Account(id) => {
261 let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
262 let (time, parents) = self
263 .repo
264 .remove_member_account_and_return_parents(op.tx(), account_set_id, id)
265 .await?;
266 (time, parents, set, id)
267 }
268 AccountSetMemberId::AccountSet(id) => {
269 let mut accounts = self
270 .repo
271 .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
272 .await?;
273 let target = accounts
274 .remove(&account_set_id)
275 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
276 let member = accounts
277 .remove(&id)
278 .ok_or(AccountSetError::CouldNotFindById(id))?;
279
280 if target.values().journal_id != member.values().journal_id {
281 return Err(AccountSetError::JournalIdMismatch);
282 }
283
284 let (time, parents) = self
285 .repo
286 .remove_member_set_and_return_parents(op.tx(), account_set_id, id)
287 .await?;
288 (time, parents, target, AccountId::from(id))
289 }
290 };
291
292 op.accumulate(std::iter::once(
293 OutboxEventPayload::AccountSetMemberRemoved {
294 source: DataSource::Local,
295 account_set_id,
296 member_id: member,
297 },
298 ));
299
300 let balances = self
301 .balances
302 .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
303 .await?;
304
305 let target_account_id = AccountId::from(&account_set.id());
306 let mut entries = Vec::new();
307 for balance in balances.into_values() {
308 entries_for_remove_balance(&mut entries, target_account_id, balance);
309 }
310
311 if entries.is_empty() {
312 return Ok(account_set);
313 }
314 let entries = self.entries.create_all_in_op(op, entries).await?;
315 let mappings = std::iter::once((target_account_id, parents)).collect();
316 self.balances
317 .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
318 .await?;
319
320 Ok(account_set)
321 }
322
323 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
324 pub async fn find_all<T: From<AccountSet>>(
325 &self,
326 account_set_ids: &[AccountSetId],
327 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
328 self.repo.find_all(account_set_ids).await
329 }
330
331 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
332 pub async fn find_all_in_op<T: From<AccountSet>>(
333 &self,
334 op: &mut LedgerOperation<'_>,
335 account_set_ids: &[AccountSetId],
336 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
337 self.repo.find_all_in_tx(op.tx(), account_set_ids).await
338 }
339
340 #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
341 pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
342 self.repo.find_by_id(account_set_id).await
343 }
344
345 #[instrument(
346 name = "cala_ledger.accounts_sets.find_by_external_id",
347 skip(self),
348 err
349 )]
350 pub async fn find_by_external_id(
351 &self,
352 external_id: String,
353 ) -> Result<AccountSet, AccountSetError> {
354 self.repo.find_by_external_id(Some(external_id)).await
355 }
356
357 #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
358 pub async fn find_where_member(
359 &self,
360 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
361 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
362 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
363 {
364 match member.into() {
365 AccountSetMemberId::Account(account_id) => {
366 self.repo
367 .find_where_account_is_member(account_id, query)
368 .await
369 }
370 AccountSetMemberId::AccountSet(account_set_id) => {
371 self.repo
372 .find_where_account_set_is_member(account_set_id, query)
373 .await
374 }
375 }
376 }
377
378 #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
379 pub async fn list_for_name(
380 &self,
381 name: String,
382 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
383 ) -> Result<
384 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
385 AccountSetError,
386 > {
387 self.repo
388 .list_for_name_by_created_at(name, args, Default::default())
389 .await
390 }
391
392 #[instrument(
393 name = "cala_ledger.account_sets.list_for_name_in_op",
394 skip(self, op),
395 err
396 )]
397 pub async fn list_for_name_in_op(
398 &self,
399 op: &mut LedgerOperation<'_>,
400 name: String,
401 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
402 ) -> Result<
403 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
404 AccountSetError,
405 > {
406 self.repo
407 .list_for_name_by_created_at_in_tx(op.tx(), name, args, Default::default())
408 .await
409 }
410
411 #[instrument(
412 name = "cala_ledger.account_sets.find_where_member_in_op",
413 skip(self, op),
414 err
415 )]
416 pub async fn find_where_member_in_op(
417 &self,
418 op: &mut LedgerOperation<'_>,
419 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
420 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
421 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
422 {
423 match member.into() {
424 AccountSetMemberId::Account(account_id) => {
425 self.repo
426 .find_where_account_is_member_in_tx(op.tx(), account_id, query)
427 .await
428 }
429 AccountSetMemberId::AccountSet(account_set_id) => {
430 self.repo
431 .find_where_account_set_is_member_in_tx(op.tx(), account_set_id, query)
432 .await
433 }
434 }
435 }
436
437 pub async fn list_members_by_created_at(
438 &self,
439 id: AccountSetId,
440 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
441 ) -> Result<
442 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
443 AccountSetError,
444 > {
445 self.repo.list_children_by_created_at(id, args).await
446 }
447
448 pub async fn list_members_created_at_in_op(
449 &self,
450 op: &mut LedgerOperation<'_>,
451 id: AccountSetId,
452 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
453 ) -> Result<
454 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
455 AccountSetError,
456 > {
457 self.repo
458 .list_children_by_created_at_in_tx(op.tx(), id, args)
459 .await
460 }
461
462 pub async fn list_members_by_external_id(
463 &self,
464 id: AccountSetId,
465 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
466 ) -> Result<
467 es_entity::PaginatedQueryRet<
468 AccountSetMemberByExternalId,
469 AccountSetMembersByExternalIdCursor,
470 >,
471 AccountSetError,
472 > {
473 self.repo.list_children_by_external_id(id, args).await
474 }
475
476 pub async fn list_members_by_external_id_in_op(
477 &self,
478 op: &mut LedgerOperation<'_>,
479 id: AccountSetId,
480 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
481 ) -> Result<
482 es_entity::PaginatedQueryRet<
483 AccountSetMemberByExternalId,
484 AccountSetMembersByExternalIdCursor,
485 >,
486 AccountSetError,
487 > {
488 self.repo
489 .list_children_by_external_id_in_tx(op.tx(), id, args)
490 .await
491 }
492
493 pub(crate) async fn fetch_mappings(
494 &self,
495 journal_id: JournalId,
496 account_ids: &[AccountId],
497 ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
498 self.repo.fetch_mappings(journal_id, account_ids).await
499 }
500
501 #[cfg(feature = "import")]
502 pub(crate) async fn sync_account_set_creation(
503 &self,
504 mut db: es_entity::DbOp<'_>,
505 origin: DataSourceId,
506 values: AccountSetValues,
507 ) -> Result<(), AccountSetError> {
508 let mut account_set = AccountSet::import(origin, values);
509 self.repo
510 .import_in_op(&mut db, origin, &mut account_set)
511 .await?;
512 let recorded_at = db.now();
513 let outbox_events: Vec<_> = account_set
514 .events
515 .last_persisted(1)
516 .map(|p| OutboxEventPayload::from(&p.event))
517 .collect();
518 self.outbox
519 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
520 .await?;
521 Ok(())
522 }
523
524 #[cfg(feature = "import")]
525 pub(crate) async fn sync_account_set_update(
526 &self,
527 mut db: es_entity::DbOp<'_>,
528 values: AccountSetValues,
529 fields: Vec<String>,
530 ) -> Result<(), AccountSetError> {
531 let mut account_set = self.repo.find_by_id(values.id).await?;
532 account_set.update((values, fields));
533 let n_events = self.repo.update_in_op(&mut db, &mut account_set).await?;
534 let recorded_at = db.now();
535 let outbox_events: Vec<_> = account_set
536 .events
537 .last_persisted(n_events)
538 .map(|p| OutboxEventPayload::from(&p.event))
539 .collect();
540 self.outbox
541 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
542 .await?;
543 Ok(())
544 }
545
546 #[cfg(feature = "import")]
547 pub(crate) async fn sync_account_set_member_creation(
548 &self,
549 mut db: es_entity::DbOp<'_>,
550 origin: DataSourceId,
551 account_set_id: AccountSetId,
552 member_id: AccountSetMemberId,
553 ) -> Result<(), AccountSetError> {
554 match member_id {
555 AccountSetMemberId::Account(account_id) => {
556 self.repo
557 .import_member_account_in_op(&mut db, account_set_id, account_id)
558 .await?;
559 }
560 AccountSetMemberId::AccountSet(account_set_id) => {
561 self.repo
562 .import_member_set_in_op(&mut db, account_set_id, account_set_id)
563 .await?;
564 }
565 }
566 let recorded_at = db.now();
567 self.outbox
568 .persist_events_at(
569 db.into_tx(),
570 std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
571 source: DataSource::Remote { id: origin },
572 account_set_id,
573 member_id,
574 }),
575 recorded_at,
576 )
577 .await?;
578 Ok(())
579 }
580
581 #[cfg(feature = "import")]
582 pub(crate) async fn sync_account_set_member_removal(
583 &self,
584 mut db: es_entity::DbOp<'_>,
585 origin: DataSourceId,
586 account_set_id: AccountSetId,
587 member_id: AccountSetMemberId,
588 ) -> Result<(), AccountSetError> {
589 match member_id {
590 AccountSetMemberId::Account(account_id) => {
591 self.repo
592 .import_remove_member_account(db.tx(), account_set_id, account_id)
593 .await?;
594 }
595 AccountSetMemberId::AccountSet(account_set_id) => {
596 self.repo
597 .import_remove_member_set(db.tx(), account_set_id, account_set_id)
598 .await?;
599 }
600 }
601 let recorded_at = db.now();
602 self.outbox
603 .persist_events_at(
604 db.into_tx(),
605 std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
606 source: DataSource::Remote { id: origin },
607 account_set_id,
608 member_id,
609 }),
610 recorded_at,
611 )
612 .await?;
613 Ok(())
614 }
615}
616
617fn entries_for_add_balance(
618 entries: &mut Vec<NewEntry>,
619 target_account_id: AccountId,
620 balance: BalanceSnapshot,
621) {
622 use rust_decimal::Decimal;
623
624 if balance.settled.cr_balance != Decimal::ZERO {
625 let entry = NewEntry::builder()
626 .id(EntryId::new())
627 .journal_id(balance.journal_id)
628 .account_id(target_account_id)
629 .currency(balance.currency)
630 .sequence(1u32)
631 .layer(Layer::Settled)
632 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
633 .direction(DebitOrCredit::Credit)
634 .units(balance.settled.cr_balance)
635 .transaction_id(UNASSIGNED_TRANSACTION_ID)
636 .build()
637 .expect("Couldn't build entry");
638 entries.push(entry);
639 }
640 if balance.settled.dr_balance != Decimal::ZERO {
641 let entry = NewEntry::builder()
642 .id(EntryId::new())
643 .journal_id(balance.journal_id)
644 .account_id(target_account_id)
645 .currency(balance.currency)
646 .sequence(1u32)
647 .layer(Layer::Settled)
648 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
649 .direction(DebitOrCredit::Debit)
650 .units(balance.settled.dr_balance)
651 .transaction_id(UNASSIGNED_TRANSACTION_ID)
652 .build()
653 .expect("Couldn't build entry");
654 entries.push(entry);
655 }
656 if balance.pending.cr_balance != Decimal::ZERO {
657 let entry = NewEntry::builder()
658 .id(EntryId::new())
659 .journal_id(balance.journal_id)
660 .account_id(target_account_id)
661 .currency(balance.currency)
662 .sequence(1u32)
663 .layer(Layer::Pending)
664 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
665 .direction(DebitOrCredit::Credit)
666 .units(balance.pending.cr_balance)
667 .transaction_id(UNASSIGNED_TRANSACTION_ID)
668 .build()
669 .expect("Couldn't build entry");
670 entries.push(entry);
671 }
672 if balance.pending.dr_balance != Decimal::ZERO {
673 let entry = NewEntry::builder()
674 .id(EntryId::new())
675 .journal_id(balance.journal_id)
676 .account_id(target_account_id)
677 .currency(balance.currency)
678 .sequence(1u32)
679 .layer(Layer::Pending)
680 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
681 .direction(DebitOrCredit::Debit)
682 .units(balance.pending.dr_balance)
683 .transaction_id(UNASSIGNED_TRANSACTION_ID)
684 .build()
685 .expect("Couldn't build entry");
686 entries.push(entry);
687 }
688 if balance.encumbrance.cr_balance != Decimal::ZERO {
689 let entry = NewEntry::builder()
690 .id(EntryId::new())
691 .journal_id(balance.journal_id)
692 .account_id(target_account_id)
693 .currency(balance.currency)
694 .sequence(1u32)
695 .layer(Layer::Encumbrance)
696 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
697 .direction(DebitOrCredit::Credit)
698 .units(balance.encumbrance.cr_balance)
699 .transaction_id(UNASSIGNED_TRANSACTION_ID)
700 .build()
701 .expect("Couldn't build entry");
702 entries.push(entry);
703 }
704 if balance.encumbrance.dr_balance != Decimal::ZERO {
705 let entry = NewEntry::builder()
706 .id(EntryId::new())
707 .journal_id(balance.journal_id)
708 .account_id(target_account_id)
709 .currency(balance.currency)
710 .sequence(1u32)
711 .layer(Layer::Encumbrance)
712 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
713 .direction(DebitOrCredit::Debit)
714 .units(balance.encumbrance.dr_balance)
715 .transaction_id(UNASSIGNED_TRANSACTION_ID)
716 .build()
717 .expect("Couldn't build entry");
718 entries.push(entry);
719 }
720}
721
722fn entries_for_remove_balance(
723 entries: &mut Vec<NewEntry>,
724 target_account_id: AccountId,
725 balance: BalanceSnapshot,
726) {
727 use rust_decimal::Decimal;
728
729 if balance.settled.cr_balance != Decimal::ZERO {
730 let entry = NewEntry::builder()
731 .id(EntryId::new())
732 .journal_id(balance.journal_id)
733 .account_id(target_account_id)
734 .currency(balance.currency)
735 .sequence(1u32)
736 .layer(Layer::Settled)
737 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
738 .direction(DebitOrCredit::Debit)
739 .units(balance.settled.cr_balance)
740 .transaction_id(UNASSIGNED_TRANSACTION_ID)
741 .build()
742 .expect("Couldn't build entry");
743 entries.push(entry);
744 }
745 if balance.settled.dr_balance != Decimal::ZERO {
746 let entry = NewEntry::builder()
747 .id(EntryId::new())
748 .journal_id(balance.journal_id)
749 .account_id(target_account_id)
750 .currency(balance.currency)
751 .sequence(1u32)
752 .layer(Layer::Settled)
753 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
754 .direction(DebitOrCredit::Credit)
755 .units(balance.settled.dr_balance)
756 .transaction_id(UNASSIGNED_TRANSACTION_ID)
757 .build()
758 .expect("Couldn't build entry");
759 entries.push(entry);
760 }
761 if balance.pending.cr_balance != Decimal::ZERO {
762 let entry = NewEntry::builder()
763 .id(EntryId::new())
764 .journal_id(balance.journal_id)
765 .account_id(target_account_id)
766 .currency(balance.currency)
767 .sequence(1u32)
768 .layer(Layer::Pending)
769 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
770 .direction(DebitOrCredit::Debit)
771 .units(balance.pending.cr_balance)
772 .transaction_id(UNASSIGNED_TRANSACTION_ID)
773 .build()
774 .expect("Couldn't build entry");
775 entries.push(entry);
776 }
777 if balance.pending.dr_balance != Decimal::ZERO {
778 let entry = NewEntry::builder()
779 .id(EntryId::new())
780 .journal_id(balance.journal_id)
781 .account_id(target_account_id)
782 .currency(balance.currency)
783 .sequence(1u32)
784 .layer(Layer::Pending)
785 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
786 .direction(DebitOrCredit::Credit)
787 .units(balance.pending.dr_balance)
788 .transaction_id(UNASSIGNED_TRANSACTION_ID)
789 .build()
790 .expect("Couldn't build entry");
791 entries.push(entry);
792 }
793 if balance.encumbrance.cr_balance != Decimal::ZERO {
794 let entry = NewEntry::builder()
795 .id(EntryId::new())
796 .journal_id(balance.journal_id)
797 .account_id(target_account_id)
798 .currency(balance.currency)
799 .sequence(1u32)
800 .layer(Layer::Encumbrance)
801 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
802 .direction(DebitOrCredit::Debit)
803 .units(balance.encumbrance.cr_balance)
804 .transaction_id(UNASSIGNED_TRANSACTION_ID)
805 .build()
806 .expect("Couldn't build entry");
807 entries.push(entry);
808 }
809 if balance.encumbrance.dr_balance != Decimal::ZERO {
810 let entry = NewEntry::builder()
811 .id(EntryId::new())
812 .journal_id(balance.journal_id)
813 .account_id(target_account_id)
814 .currency(balance.currency)
815 .sequence(1u32)
816 .layer(Layer::Encumbrance)
817 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
818 .direction(DebitOrCredit::Credit)
819 .units(balance.encumbrance.dr_balance)
820 .transaction_id(UNASSIGNED_TRANSACTION_ID)
821 .build()
822 .expect("Couldn't build entry");
823 entries.push(entry);
824 }
825}
826
827impl From<&AccountSetEvent> for OutboxEventPayload {
828 fn from(event: &AccountSetEvent) -> Self {
829 match event {
830 #[cfg(feature = "import")]
831 AccountSetEvent::Imported {
832 source,
833 values: account_set,
834 } => OutboxEventPayload::AccountSetCreated {
835 source: *source,
836 account_set: account_set.clone(),
837 },
838 AccountSetEvent::Initialized {
839 values: account_set,
840 } => OutboxEventPayload::AccountSetCreated {
841 source: DataSource::Local,
842 account_set: account_set.clone(),
843 },
844 AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
845 source: DataSource::Local,
846 account_set: values.clone(),
847 fields: fields.clone(),
848 },
849 }
850 }
851}