1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use std::collections::HashMap;
9use tracing::instrument;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{
14 account::*,
15 balance::*,
16 entry::*,
17 ledger_operation::*,
18 outbox::*,
19 primitives::{DataSource, DebitOrCredit, JournalId, Layer},
20};
21
22pub use cursor::*;
23pub use entity::*;
24use error::*;
25use repo::*;
26pub use repo::{account_set_cursor::*, members_cursor::*};
27
/// Placeholder (nil UUID) used as the `transaction_id` of the entries built by
/// `entries_for_add_balance` and `entries_for_remove_balance`.
const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
29
/// Service for creating, updating and querying account sets and their members.
///
/// Bundles the account-set repository with the sibling services it delegates
/// to; the handle itself is `Clone`.
#[derive(Clone)]
pub struct AccountSets {
    /// Persistence for account sets and their membership relations.
    repo: AccountSetRepo,
    /// Used to create the account that shares its id with each new account set.
    accounts: Accounts,
    /// Used to persist the entries generated when members are added or removed.
    entries: Entries,
    /// Used to read a member's balances and to apply the generated entries.
    balances: Balances,
    /// Passed to `LedgerOperation::init` and used directly by the import/sync paths.
    outbox: Outbox,
    /// Connection pool passed to `LedgerOperation::init`.
    pool: PgPool,
}
39
40impl AccountSets {
41 pub(crate) fn new(
42 pool: &PgPool,
43 outbox: Outbox,
44 accounts: &Accounts,
45 entries: &Entries,
46 balances: &Balances,
47 ) -> Self {
48 Self {
49 repo: AccountSetRepo::new(pool),
50 outbox,
51 accounts: accounts.clone(),
52 entries: entries.clone(),
53 balances: balances.clone(),
54 pool: pool.clone(),
55 }
56 }
57 #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
58 pub async fn create(
59 &self,
60 new_account_set: NewAccountSet,
61 ) -> Result<AccountSet, AccountSetError> {
62 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
63 let account_set = self.create_in_op(&mut op, new_account_set).await?;
64 op.commit().await?;
65 Ok(account_set)
66 }
67
68 #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
69 pub async fn create_in_op(
70 &self,
71 db: &mut LedgerOperation<'_>,
72 new_account_set: NewAccountSet,
73 ) -> Result<AccountSet, AccountSetError> {
74 let new_account = NewAccount::builder()
75 .id(uuid::Uuid::from(new_account_set.id))
76 .name(String::new())
77 .code(new_account_set.id.to_string())
78 .normal_balance_type(new_account_set.normal_balance_type)
79 .is_account_set(true)
80 .build()
81 .expect("Failed to build account");
82 self.accounts.create_in_op(db, new_account).await?;
83 let account_set = self.repo.create_in_op(db, new_account_set).await?;
84 db.accumulate(account_set.last_persisted(1).map(|p| &p.event));
85 Ok(account_set)
86 }
87
88 pub async fn create_all(
89 &self,
90 new_account_sets: Vec<NewAccountSet>,
91 ) -> Result<Vec<AccountSet>, AccountSetError> {
92 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
93 let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
94 op.commit().await?;
95 Ok(account_sets)
96 }
97
98 #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, db))]
99 pub async fn create_all_in_op(
100 &self,
101 db: &mut LedgerOperation<'_>,
102 new_account_sets: Vec<NewAccountSet>,
103 ) -> Result<Vec<AccountSet>, AccountSetError> {
104 let mut new_accounts = Vec::new();
105 for new_account_set in new_account_sets.iter() {
106 let new_account = NewAccount::builder()
107 .id(uuid::Uuid::from(new_account_set.id))
108 .name(String::new())
109 .code(new_account_set.id.to_string())
110 .normal_balance_type(new_account_set.normal_balance_type)
111 .is_account_set(true)
112 .build()
113 .expect("Failed to build account");
114 new_accounts.push(new_account);
115 }
116 self.accounts.create_all_in_op(db, new_accounts).await?;
117 let account_sets = self.repo.create_all_in_op(db, new_account_sets).await?;
118 db.accumulate(
119 account_sets
120 .iter()
121 .flat_map(|account| account.last_persisted(1).map(|p| &p.event)),
122 );
123 Ok(account_sets)
124 }
125
126 #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
127 pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
128 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
129 self.persist_in_op(&mut op, account_set).await?;
130 op.commit().await?;
131 Ok(())
132 }
133
134 #[instrument(
135 name = "cala_ledger.account_sets.persist_in_op",
136 skip(self, db, account_set)
137 )]
138 pub async fn persist_in_op(
139 &self,
140 db: &mut LedgerOperation<'_>,
141 account_set: &mut AccountSet,
142 ) -> Result<(), AccountSetError> {
143 let n_events = self.repo.update_in_op(db, account_set).await?;
144 db.accumulate(account_set.last_persisted(n_events).map(|p| &p.event));
145 Ok(())
146 }
147
148 pub async fn add_member(
149 &self,
150 account_set_id: AccountSetId,
151 member: impl Into<AccountSetMemberId>,
152 ) -> Result<AccountSet, AccountSetError> {
153 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
154 let account_set = self
155 .add_member_in_op(&mut op, account_set_id, member)
156 .await?;
157 op.commit().await?;
158 Ok(account_set)
159 }
160
161 pub async fn add_member_in_op(
162 &self,
163 op: &mut LedgerOperation<'_>,
164 account_set_id: AccountSetId,
165 member: impl Into<AccountSetMemberId>,
166 ) -> Result<AccountSet, AccountSetError> {
167 let member = member.into();
168 let (time, parents, account_set, member_id) = match member {
169 AccountSetMemberId::Account(id) => {
170 let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
171 let (time, parents) = self
172 .repo
173 .add_member_account_and_return_parents(&mut *op, account_set_id, id)
174 .await?;
175 (time, parents, set, id)
176 }
177 AccountSetMemberId::AccountSet(id) => {
178 let mut accounts = self
179 .repo
180 .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
181 .await?;
182 let target = accounts
183 .remove(&account_set_id)
184 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
185 let member = accounts
186 .remove(&id)
187 .ok_or(AccountSetError::CouldNotFindById(id))?;
188
189 if target.values().journal_id != member.values().journal_id {
190 return Err(AccountSetError::JournalIdMismatch);
191 }
192
193 let (time, parents) = self
194 .repo
195 .add_member_set_and_return_parents(op, account_set_id, id)
196 .await?;
197 (time, parents, target, AccountId::from(id))
198 }
199 };
200
201 op.accumulate(std::iter::once(
202 OutboxEventPayload::AccountSetMemberCreated {
203 source: DataSource::Local,
204 account_set_id,
205 member_id: member,
206 },
207 ));
208
209 let balances = self
210 .balances
211 .find_balances_for_update(op, account_set.values().journal_id, member_id)
212 .await?;
213
214 let target_account_id = AccountId::from(&account_set.id());
215 let mut entries = Vec::new();
216 for balance in balances.into_values() {
217 entries_for_add_balance(&mut entries, target_account_id, balance);
218 }
219
220 if entries.is_empty() {
221 return Ok(account_set);
222 }
223 let entries = self.entries.create_all_in_op(op, entries).await?;
224 let mappings = std::iter::once((target_account_id, parents)).collect();
225 self.balances
226 .update_balances_in_op(
227 op,
228 account_set.values().journal_id,
229 entries,
230 time.date_naive(),
231 time,
232 mappings,
233 )
234 .await?;
235
236 Ok(account_set)
237 }
238
239 pub async fn remove_member(
240 &self,
241 account_set_id: AccountSetId,
242 member: impl Into<AccountSetMemberId>,
243 ) -> Result<AccountSet, AccountSetError> {
244 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
245 let account_set = self
246 .remove_member_in_op(&mut op, account_set_id, member)
247 .await?;
248 op.commit().await?;
249 Ok(account_set)
250 }
251
252 pub async fn remove_member_in_op(
253 &self,
254 op: &mut LedgerOperation<'_>,
255 account_set_id: AccountSetId,
256 member: impl Into<AccountSetMemberId>,
257 ) -> Result<AccountSet, AccountSetError> {
258 let member = member.into();
259 let (time, parents, account_set, member_id) = match member {
260 AccountSetMemberId::Account(id) => {
261 let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
262 let (time, parents) = self
263 .repo
264 .remove_member_account_and_return_parents(op, account_set_id, id)
265 .await?;
266 (time, parents, set, id)
267 }
268 AccountSetMemberId::AccountSet(id) => {
269 let mut accounts = self
270 .repo
271 .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
272 .await?;
273 let target = accounts
274 .remove(&account_set_id)
275 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
276 let member = accounts
277 .remove(&id)
278 .ok_or(AccountSetError::CouldNotFindById(id))?;
279
280 if target.values().journal_id != member.values().journal_id {
281 return Err(AccountSetError::JournalIdMismatch);
282 }
283
284 let (time, parents) = self
285 .repo
286 .remove_member_set_and_return_parents(op, account_set_id, id)
287 .await?;
288 (time, parents, target, AccountId::from(id))
289 }
290 };
291
292 op.accumulate(std::iter::once(
293 OutboxEventPayload::AccountSetMemberRemoved {
294 source: DataSource::Local,
295 account_set_id,
296 member_id: member,
297 },
298 ));
299
300 let balances = self
301 .balances
302 .find_balances_for_update(op, account_set.values().journal_id, member_id)
303 .await?;
304
305 let target_account_id = AccountId::from(&account_set.id());
306 let mut entries = Vec::new();
307 for balance in balances.into_values() {
308 entries_for_remove_balance(&mut entries, target_account_id, balance);
309 }
310
311 if entries.is_empty() {
312 return Ok(account_set);
313 }
314 let entries = self.entries.create_all_in_op(op, entries).await?;
315 let mappings = std::iter::once((target_account_id, parents)).collect();
316 self.balances
317 .update_balances_in_op(
318 op,
319 account_set.values().journal_id,
320 entries,
321 time.date_naive(),
322 time,
323 mappings,
324 )
325 .await?;
326
327 Ok(account_set)
328 }
329
330 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
331 pub async fn find_all<T: From<AccountSet>>(
332 &self,
333 account_set_ids: &[AccountSetId],
334 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
335 self.repo.find_all(account_set_ids).await
336 }
337
338 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
339 pub async fn find_all_in_op<T: From<AccountSet>>(
340 &self,
341 op: &mut LedgerOperation<'_>,
342 account_set_ids: &[AccountSetId],
343 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
344 self.repo.find_all_in_op(op, account_set_ids).await
345 }
346
347 #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
348 pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
349 self.repo.find_by_id(account_set_id).await
350 }
351
352 #[instrument(
353 name = "cala_ledger.accounts_sets.find_by_external_id",
354 skip(self),
355 err
356 )]
357 pub async fn find_by_external_id(
358 &self,
359 external_id: String,
360 ) -> Result<AccountSet, AccountSetError> {
361 self.repo.find_by_external_id(Some(external_id)).await
362 }
363
364 #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
365 pub async fn find_where_member(
366 &self,
367 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
368 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
369 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
370 {
371 match member.into() {
372 AccountSetMemberId::Account(account_id) => {
373 self.repo
374 .find_where_account_is_member(account_id, query)
375 .await
376 }
377 AccountSetMemberId::AccountSet(account_set_id) => {
378 self.repo
379 .find_where_account_set_is_member(account_set_id, query)
380 .await
381 }
382 }
383 }
384
385 #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
386 pub async fn list_for_name(
387 &self,
388 name: String,
389 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
390 ) -> Result<
391 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
392 AccountSetError,
393 > {
394 self.repo
395 .list_for_name_by_created_at(name, args, Default::default())
396 .await
397 }
398
399 #[instrument(
400 name = "cala_ledger.account_sets.list_for_name_in_op",
401 skip(self, op),
402 err
403 )]
404 pub async fn list_for_name_in_op(
405 &self,
406 op: &mut LedgerOperation<'_>,
407 name: String,
408 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
409 ) -> Result<
410 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
411 AccountSetError,
412 > {
413 self.repo
414 .list_for_name_by_created_at_in_op(op, name, args, Default::default())
415 .await
416 }
417
418 #[instrument(
419 name = "cala_ledger.account_sets.find_where_member_in_op",
420 skip(self, op),
421 err
422 )]
423 pub async fn find_where_member_in_op(
424 &self,
425 op: &mut LedgerOperation<'_>,
426 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
427 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
428 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
429 {
430 match member.into() {
431 AccountSetMemberId::Account(account_id) => {
432 self.repo
433 .find_where_account_is_member_in_op(op, account_id, query)
434 .await
435 }
436 AccountSetMemberId::AccountSet(account_set_id) => {
437 self.repo
438 .find_where_account_set_is_member_in_op(op, account_set_id, query)
439 .await
440 }
441 }
442 }
443
444 pub async fn list_members_by_created_at(
445 &self,
446 id: AccountSetId,
447 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
448 ) -> Result<
449 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
450 AccountSetError,
451 > {
452 self.repo.list_children_by_created_at(id, args).await
453 }
454
455 pub async fn list_members_by_created_at_in_op(
456 &self,
457 op: &mut LedgerOperation<'_>,
458 id: AccountSetId,
459 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
460 ) -> Result<
461 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
462 AccountSetError,
463 > {
464 self.repo
465 .list_children_by_created_at_in_op(op, id, args)
466 .await
467 }
468
469 pub async fn list_members_by_external_id(
470 &self,
471 id: AccountSetId,
472 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
473 ) -> Result<
474 es_entity::PaginatedQueryRet<
475 AccountSetMemberByExternalId,
476 AccountSetMembersByExternalIdCursor,
477 >,
478 AccountSetError,
479 > {
480 self.repo.list_children_by_external_id(id, args).await
481 }
482
483 pub async fn list_members_by_external_id_in_op(
484 &self,
485 op: &mut LedgerOperation<'_>,
486 id: AccountSetId,
487 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
488 ) -> Result<
489 es_entity::PaginatedQueryRet<
490 AccountSetMemberByExternalId,
491 AccountSetMembersByExternalIdCursor,
492 >,
493 AccountSetError,
494 > {
495 self.repo
496 .list_children_by_external_id_in_op(op, id, args)
497 .await
498 }
499
500 pub(crate) async fn fetch_mappings_in_op(
501 &self,
502 op: &mut LedgerOperation<'_>,
503 journal_id: JournalId,
504 account_ids: &[AccountId],
505 ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
506 self.repo
507 .fetch_mappings_in_op(op, journal_id, account_ids)
508 .await
509 }
510
511 #[cfg(feature = "import")]
512 pub(crate) async fn sync_account_set_creation(
513 &self,
514 mut db: es_entity::DbOpWithTime<'_>,
515 origin: DataSourceId,
516 values: AccountSetValues,
517 ) -> Result<(), AccountSetError> {
518 let mut account_set = AccountSet::import(origin, values);
519 self.repo
520 .import_in_op(&mut db, origin, &mut account_set)
521 .await?;
522 let outbox_events: Vec<_> = account_set
523 .last_persisted(1)
524 .map(|p| OutboxEventPayload::from(&p.event))
525 .collect();
526 let time = db.now();
527 self.outbox
528 .persist_events_at(db, outbox_events, time)
529 .await?;
530 Ok(())
531 }
532
533 #[cfg(feature = "import")]
534 pub(crate) async fn sync_account_set_update(
535 &self,
536 mut db: es_entity::DbOpWithTime<'_>,
537 values: AccountSetValues,
538 fields: Vec<String>,
539 ) -> Result<(), AccountSetError> {
540 let mut account_set = self.repo.find_by_id(values.id).await?;
541 account_set.update((values, fields));
542 let n_events = self.repo.update_in_op(&mut db, &mut account_set).await?;
543 let outbox_events: Vec<_> = account_set
544 .last_persisted(n_events)
545 .map(|p| OutboxEventPayload::from(&p.event))
546 .collect();
547 let time = db.now();
548 self.outbox
549 .persist_events_at(db, outbox_events, time)
550 .await?;
551 Ok(())
552 }
553
554 #[cfg(feature = "import")]
555 pub(crate) async fn sync_account_set_member_creation(
556 &self,
557 mut db: es_entity::DbOpWithTime<'_>,
558 origin: DataSourceId,
559 account_set_id: AccountSetId,
560 member_id: AccountSetMemberId,
561 ) -> Result<(), AccountSetError> {
562 match member_id {
563 AccountSetMemberId::Account(account_id) => {
564 self.repo
565 .import_member_account_in_op(&mut db, account_set_id, account_id)
566 .await?;
567 }
568 AccountSetMemberId::AccountSet(account_set_id) => {
569 self.repo
570 .import_member_set_in_op(&mut db, account_set_id, account_set_id)
571 .await?;
572 }
573 }
574 let time = db.now();
575 self.outbox
576 .persist_events_at(
577 db,
578 std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
579 source: DataSource::Remote { id: origin },
580 account_set_id,
581 member_id,
582 }),
583 time,
584 )
585 .await?;
586 Ok(())
587 }
588
589 #[cfg(feature = "import")]
590 pub(crate) async fn sync_account_set_member_removal(
591 &self,
592 mut db: es_entity::DbOpWithTime<'_>,
593 origin: DataSourceId,
594 account_set_id: AccountSetId,
595 member_id: AccountSetMemberId,
596 ) -> Result<(), AccountSetError> {
597 match member_id {
598 AccountSetMemberId::Account(account_id) => {
599 self.repo
600 .import_remove_member_account(&mut db, account_set_id, account_id)
601 .await?;
602 }
603 AccountSetMemberId::AccountSet(account_set_id) => {
604 self.repo
605 .import_remove_member_set(&mut db, account_set_id, account_set_id)
606 .await?;
607 }
608 }
609 let time = db.now();
610 self.outbox
611 .persist_events_at(
612 db,
613 std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
614 source: DataSource::Remote { id: origin },
615 account_set_id,
616 member_id,
617 }),
618 time,
619 )
620 .await?;
621 Ok(())
622 }
623}
624
625fn entries_for_add_balance(
626 entries: &mut Vec<NewEntry>,
627 target_account_id: AccountId,
628 balance: BalanceSnapshot,
629) {
630 use rust_decimal::Decimal;
631
632 if balance.settled.cr_balance != Decimal::ZERO {
633 let entry = NewEntry::builder()
634 .id(EntryId::new())
635 .journal_id(balance.journal_id)
636 .account_id(target_account_id)
637 .currency(balance.currency)
638 .sequence(1u32)
639 .layer(Layer::Settled)
640 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
641 .direction(DebitOrCredit::Credit)
642 .units(balance.settled.cr_balance)
643 .transaction_id(UNASSIGNED_TRANSACTION_ID)
644 .build()
645 .expect("Couldn't build entry");
646 entries.push(entry);
647 }
648 if balance.settled.dr_balance != Decimal::ZERO {
649 let entry = NewEntry::builder()
650 .id(EntryId::new())
651 .journal_id(balance.journal_id)
652 .account_id(target_account_id)
653 .currency(balance.currency)
654 .sequence(1u32)
655 .layer(Layer::Settled)
656 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
657 .direction(DebitOrCredit::Debit)
658 .units(balance.settled.dr_balance)
659 .transaction_id(UNASSIGNED_TRANSACTION_ID)
660 .build()
661 .expect("Couldn't build entry");
662 entries.push(entry);
663 }
664 if balance.pending.cr_balance != Decimal::ZERO {
665 let entry = NewEntry::builder()
666 .id(EntryId::new())
667 .journal_id(balance.journal_id)
668 .account_id(target_account_id)
669 .currency(balance.currency)
670 .sequence(1u32)
671 .layer(Layer::Pending)
672 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
673 .direction(DebitOrCredit::Credit)
674 .units(balance.pending.cr_balance)
675 .transaction_id(UNASSIGNED_TRANSACTION_ID)
676 .build()
677 .expect("Couldn't build entry");
678 entries.push(entry);
679 }
680 if balance.pending.dr_balance != Decimal::ZERO {
681 let entry = NewEntry::builder()
682 .id(EntryId::new())
683 .journal_id(balance.journal_id)
684 .account_id(target_account_id)
685 .currency(balance.currency)
686 .sequence(1u32)
687 .layer(Layer::Pending)
688 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
689 .direction(DebitOrCredit::Debit)
690 .units(balance.pending.dr_balance)
691 .transaction_id(UNASSIGNED_TRANSACTION_ID)
692 .build()
693 .expect("Couldn't build entry");
694 entries.push(entry);
695 }
696 if balance.encumbrance.cr_balance != Decimal::ZERO {
697 let entry = NewEntry::builder()
698 .id(EntryId::new())
699 .journal_id(balance.journal_id)
700 .account_id(target_account_id)
701 .currency(balance.currency)
702 .sequence(1u32)
703 .layer(Layer::Encumbrance)
704 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
705 .direction(DebitOrCredit::Credit)
706 .units(balance.encumbrance.cr_balance)
707 .transaction_id(UNASSIGNED_TRANSACTION_ID)
708 .build()
709 .expect("Couldn't build entry");
710 entries.push(entry);
711 }
712 if balance.encumbrance.dr_balance != Decimal::ZERO {
713 let entry = NewEntry::builder()
714 .id(EntryId::new())
715 .journal_id(balance.journal_id)
716 .account_id(target_account_id)
717 .currency(balance.currency)
718 .sequence(1u32)
719 .layer(Layer::Encumbrance)
720 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
721 .direction(DebitOrCredit::Debit)
722 .units(balance.encumbrance.dr_balance)
723 .transaction_id(UNASSIGNED_TRANSACTION_ID)
724 .build()
725 .expect("Couldn't build entry");
726 entries.push(entry);
727 }
728}
729
730fn entries_for_remove_balance(
731 entries: &mut Vec<NewEntry>,
732 target_account_id: AccountId,
733 balance: BalanceSnapshot,
734) {
735 use rust_decimal::Decimal;
736
737 if balance.settled.cr_balance != Decimal::ZERO {
738 let entry = NewEntry::builder()
739 .id(EntryId::new())
740 .journal_id(balance.journal_id)
741 .account_id(target_account_id)
742 .currency(balance.currency)
743 .sequence(1u32)
744 .layer(Layer::Settled)
745 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
746 .direction(DebitOrCredit::Debit)
747 .units(balance.settled.cr_balance)
748 .transaction_id(UNASSIGNED_TRANSACTION_ID)
749 .build()
750 .expect("Couldn't build entry");
751 entries.push(entry);
752 }
753 if balance.settled.dr_balance != Decimal::ZERO {
754 let entry = NewEntry::builder()
755 .id(EntryId::new())
756 .journal_id(balance.journal_id)
757 .account_id(target_account_id)
758 .currency(balance.currency)
759 .sequence(1u32)
760 .layer(Layer::Settled)
761 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
762 .direction(DebitOrCredit::Credit)
763 .units(balance.settled.dr_balance)
764 .transaction_id(UNASSIGNED_TRANSACTION_ID)
765 .build()
766 .expect("Couldn't build entry");
767 entries.push(entry);
768 }
769 if balance.pending.cr_balance != Decimal::ZERO {
770 let entry = NewEntry::builder()
771 .id(EntryId::new())
772 .journal_id(balance.journal_id)
773 .account_id(target_account_id)
774 .currency(balance.currency)
775 .sequence(1u32)
776 .layer(Layer::Pending)
777 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
778 .direction(DebitOrCredit::Debit)
779 .units(balance.pending.cr_balance)
780 .transaction_id(UNASSIGNED_TRANSACTION_ID)
781 .build()
782 .expect("Couldn't build entry");
783 entries.push(entry);
784 }
785 if balance.pending.dr_balance != Decimal::ZERO {
786 let entry = NewEntry::builder()
787 .id(EntryId::new())
788 .journal_id(balance.journal_id)
789 .account_id(target_account_id)
790 .currency(balance.currency)
791 .sequence(1u32)
792 .layer(Layer::Pending)
793 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
794 .direction(DebitOrCredit::Credit)
795 .units(balance.pending.dr_balance)
796 .transaction_id(UNASSIGNED_TRANSACTION_ID)
797 .build()
798 .expect("Couldn't build entry");
799 entries.push(entry);
800 }
801 if balance.encumbrance.cr_balance != Decimal::ZERO {
802 let entry = NewEntry::builder()
803 .id(EntryId::new())
804 .journal_id(balance.journal_id)
805 .account_id(target_account_id)
806 .currency(balance.currency)
807 .sequence(1u32)
808 .layer(Layer::Encumbrance)
809 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
810 .direction(DebitOrCredit::Debit)
811 .units(balance.encumbrance.cr_balance)
812 .transaction_id(UNASSIGNED_TRANSACTION_ID)
813 .build()
814 .expect("Couldn't build entry");
815 entries.push(entry);
816 }
817 if balance.encumbrance.dr_balance != Decimal::ZERO {
818 let entry = NewEntry::builder()
819 .id(EntryId::new())
820 .journal_id(balance.journal_id)
821 .account_id(target_account_id)
822 .currency(balance.currency)
823 .sequence(1u32)
824 .layer(Layer::Encumbrance)
825 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
826 .direction(DebitOrCredit::Credit)
827 .units(balance.encumbrance.dr_balance)
828 .transaction_id(UNASSIGNED_TRANSACTION_ID)
829 .build()
830 .expect("Couldn't build entry");
831 entries.push(entry);
832 }
833}
834
835impl From<&AccountSetEvent> for OutboxEventPayload {
836 fn from(event: &AccountSetEvent) -> Self {
837 match event {
838 #[cfg(feature = "import")]
839 AccountSetEvent::Imported {
840 source,
841 values: account_set,
842 } => OutboxEventPayload::AccountSetCreated {
843 source: *source,
844 account_set: account_set.clone(),
845 },
846 AccountSetEvent::Initialized {
847 values: account_set,
848 } => OutboxEventPayload::AccountSetCreated {
849 source: DataSource::Local,
850 account_set: account_set.clone(),
851 },
852 AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
853 source: DataSource::Local,
854 account_set: values.clone(),
855 fields: fields.clone(),
856 },
857 }
858 }
859}