1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use std::collections::HashMap;
8use tracing::instrument;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{
13 account::*,
14 balance::*,
15 entry::*,
16 ledger_operation::*,
17 outbox::*,
18 primitives::{DataSource, DebitOrCredit, JournalId, Layer},
19};
20
21pub use cursor::*;
22pub use entity::*;
23use error::*;
24use repo::*;
25pub use repo::{account_set_cursor::*, members_cursor::*};
26
27#[allow(dead_code)]
28const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
29
30#[derive(Clone)]
31pub struct AccountSets {
32 repo: AccountSetRepo,
33 accounts: Accounts,
34 entries: Entries,
35 balances: Balances,
36 outbox: Outbox,
37 pool: PgPool,
38}
39
40impl AccountSets {
41 pub(crate) fn new(
42 pool: &PgPool,
43 outbox: Outbox,
44 accounts: &Accounts,
45 entries: &Entries,
46 balances: &Balances,
47 ) -> Self {
48 Self {
49 repo: AccountSetRepo::new(pool),
50 outbox,
51 accounts: accounts.clone(),
52 entries: entries.clone(),
53 balances: balances.clone(),
54 pool: pool.clone(),
55 }
56 }
57 #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
58 pub async fn create(
59 &self,
60 new_account_set: NewAccountSet,
61 ) -> Result<AccountSet, AccountSetError> {
62 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
63 let account_set = self.create_in_op(&mut op, new_account_set).await?;
64 op.commit().await?;
65 Ok(account_set)
66 }
67
68 #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
69 pub async fn create_in_op(
70 &self,
71 db: &mut LedgerOperation<'_>,
72 new_account_set: NewAccountSet,
73 ) -> Result<AccountSet, AccountSetError> {
74 let new_account = NewAccount::builder()
75 .id(uuid::Uuid::from(new_account_set.id))
76 .name(String::new())
77 .code(new_account_set.id.to_string())
78 .normal_balance_type(new_account_set.normal_balance_type)
79 .is_account_set(true)
80 .build()
81 .expect("Failed to build account");
82 self.accounts.create_in_op(db, new_account).await?;
83 let account_set = self.repo.create_in_op(db.op(), new_account_set).await?;
84 db.accumulate(account_set.events.last_persisted(1).map(|p| &p.event));
85 Ok(account_set)
86 }
87
88 pub async fn create_all(
89 &self,
90 new_account_sets: Vec<NewAccountSet>,
91 ) -> Result<Vec<AccountSet>, AccountSetError> {
92 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
93 let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
94 op.commit().await?;
95 Ok(account_sets)
96 }
97
98 #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, db))]
99 pub async fn create_all_in_op(
100 &self,
101 db: &mut LedgerOperation<'_>,
102 new_account_sets: Vec<NewAccountSet>,
103 ) -> Result<Vec<AccountSet>, AccountSetError> {
104 let mut new_accounts = Vec::new();
105 for new_account_set in new_account_sets.iter() {
106 let new_account = NewAccount::builder()
107 .id(uuid::Uuid::from(new_account_set.id))
108 .name(String::new())
109 .code(new_account_set.id.to_string())
110 .normal_balance_type(new_account_set.normal_balance_type)
111 .is_account_set(true)
112 .build()
113 .expect("Failed to build account");
114 new_accounts.push(new_account);
115 }
116 self.accounts.create_all_in_op(db, new_accounts).await?;
117 let account_sets = self
118 .repo
119 .create_all_in_op(db.op(), new_account_sets)
120 .await?;
121 db.accumulate(
122 account_sets
123 .iter()
124 .flat_map(|account| account.events.last_persisted(1).map(|p| &p.event)),
125 );
126 Ok(account_sets)
127 }
128
129 #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
130 pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
131 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
132 self.persist_in_op(&mut op, account_set).await?;
133 op.commit().await?;
134 Ok(())
135 }
136
137 #[instrument(
138 name = "cala_ledger.account_sets.persist_in_op",
139 skip(self, db, account_set)
140 )]
141 pub async fn persist_in_op(
142 &self,
143 db: &mut LedgerOperation<'_>,
144 account_set: &mut AccountSet,
145 ) -> Result<(), AccountSetError> {
146 let n_events = self.repo.update_in_op(db.op(), account_set).await?;
147 db.accumulate(
148 account_set
149 .events
150 .last_persisted(n_events)
151 .map(|p| &p.event),
152 );
153 Ok(())
154 }
155
156 pub async fn add_member(
157 &self,
158 account_set_id: AccountSetId,
159 member: impl Into<AccountSetMemberId>,
160 ) -> Result<AccountSet, AccountSetError> {
161 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
162 let account_set = self
163 .add_member_in_op(&mut op, account_set_id, member)
164 .await?;
165 op.commit().await?;
166 Ok(account_set)
167 }
168
169 pub async fn add_member_in_op(
170 &self,
171 op: &mut LedgerOperation<'_>,
172 account_set_id: AccountSetId,
173 member: impl Into<AccountSetMemberId>,
174 ) -> Result<AccountSet, AccountSetError> {
175 let member = member.into();
176 let (time, parents, account_set, member_id) = match member {
177 AccountSetMemberId::Account(id) => {
178 let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
179 let (time, parents) = self
180 .repo
181 .add_member_account_and_return_parents(op.tx(), account_set_id, id)
182 .await?;
183 (time, parents, set, id)
184 }
185 AccountSetMemberId::AccountSet(id) => {
186 let mut accounts = self
187 .repo
188 .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
189 .await?;
190 let target = accounts
191 .remove(&account_set_id)
192 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
193 let member = accounts
194 .remove(&id)
195 .ok_or(AccountSetError::CouldNotFindById(id))?;
196
197 if target.values().journal_id != member.values().journal_id {
198 return Err(AccountSetError::JournalIdMismatch);
199 }
200
201 let (time, parents) = self
202 .repo
203 .add_member_set_and_return_parents(op.tx(), account_set_id, id)
204 .await?;
205 (time, parents, target, AccountId::from(id))
206 }
207 };
208
209 op.accumulate(std::iter::once(
210 OutboxEventPayload::AccountSetMemberCreated {
211 source: DataSource::Local,
212 account_set_id,
213 member_id: member,
214 },
215 ));
216
217 let balances = self
218 .balances
219 .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
220 .await?;
221
222 let target_account_id = AccountId::from(&account_set.id());
223 let mut entries = Vec::new();
224 for balance in balances.into_values() {
225 entries_for_add_balance(&mut entries, target_account_id, balance);
226 }
227
228 if entries.is_empty() {
229 return Ok(account_set);
230 }
231 let entries = self.entries.create_all_in_op(op, entries).await?;
232 let mappings = std::iter::once((target_account_id, parents)).collect();
233 self.balances
234 .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
235 .await?;
236
237 Ok(account_set)
238 }
239
240 pub async fn remove_member(
241 &self,
242 account_set_id: AccountSetId,
243 member: impl Into<AccountSetMemberId>,
244 ) -> Result<AccountSet, AccountSetError> {
245 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
246 let account_set = self
247 .remove_member_in_op(&mut op, account_set_id, member)
248 .await?;
249 op.commit().await?;
250 Ok(account_set)
251 }
252
253 pub async fn remove_member_in_op(
254 &self,
255 op: &mut LedgerOperation<'_>,
256 account_set_id: AccountSetId,
257 member: impl Into<AccountSetMemberId>,
258 ) -> Result<AccountSet, AccountSetError> {
259 let member = member.into();
260 let (time, parents, account_set, member_id) = match member {
261 AccountSetMemberId::Account(id) => {
262 let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
263 let (time, parents) = self
264 .repo
265 .remove_member_account_and_return_parents(op.tx(), account_set_id, id)
266 .await?;
267 (time, parents, set, id)
268 }
269 AccountSetMemberId::AccountSet(id) => {
270 let mut accounts = self
271 .repo
272 .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
273 .await?;
274 let target = accounts
275 .remove(&account_set_id)
276 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
277 let member = accounts
278 .remove(&id)
279 .ok_or(AccountSetError::CouldNotFindById(id))?;
280
281 if target.values().journal_id != member.values().journal_id {
282 return Err(AccountSetError::JournalIdMismatch);
283 }
284
285 let (time, parents) = self
286 .repo
287 .remove_member_set_and_return_parents(op.tx(), account_set_id, id)
288 .await?;
289 (time, parents, target, AccountId::from(id))
290 }
291 };
292
293 op.accumulate(std::iter::once(
294 OutboxEventPayload::AccountSetMemberRemoved {
295 source: DataSource::Local,
296 account_set_id,
297 member_id: member,
298 },
299 ));
300
301 let balances = self
302 .balances
303 .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
304 .await?;
305
306 let target_account_id = AccountId::from(&account_set.id());
307 let mut entries = Vec::new();
308 for balance in balances.into_values() {
309 entries_for_remove_balance(&mut entries, target_account_id, balance);
310 }
311
312 if entries.is_empty() {
313 return Ok(account_set);
314 }
315 let entries = self.entries.create_all_in_op(op, entries).await?;
316 let mappings = std::iter::once((target_account_id, parents)).collect();
317 self.balances
318 .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
319 .await?;
320
321 Ok(account_set)
322 }
323
324 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
325 pub async fn find_all<T: From<AccountSet>>(
326 &self,
327 account_set_ids: &[AccountSetId],
328 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
329 self.repo.find_all(account_set_ids).await
330 }
331
332 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
333 pub async fn find_all_in_op<T: From<AccountSet>>(
334 &self,
335 op: &mut LedgerOperation<'_>,
336 account_set_ids: &[AccountSetId],
337 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
338 self.repo.find_all_in_tx(op.tx(), account_set_ids).await
339 }
340
341 #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
342 pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
343 self.repo.find_by_id(account_set_id).await
344 }
345
346 #[instrument(
347 name = "cala_ledger.accounts_sets.find_by_external_id",
348 skip(self),
349 err
350 )]
351 pub async fn find_by_external_id(
352 &self,
353 external_id: String,
354 ) -> Result<AccountSet, AccountSetError> {
355 self.repo.find_by_external_id(Some(external_id)).await
356 }
357
358 #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
359 pub async fn find_where_member(
360 &self,
361 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
362 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
363 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
364 {
365 match member.into() {
366 AccountSetMemberId::Account(account_id) => {
367 self.repo
368 .find_where_account_is_member(account_id, query)
369 .await
370 }
371 AccountSetMemberId::AccountSet(account_set_id) => {
372 self.repo
373 .find_where_account_set_is_member(account_set_id, query)
374 .await
375 }
376 }
377 }
378
379 #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
380 pub async fn list_for_name(
381 &self,
382 name: String,
383 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
384 ) -> Result<
385 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
386 AccountSetError,
387 > {
388 self.repo
389 .list_for_name_by_created_at(name, args, Default::default())
390 .await
391 }
392
393 #[instrument(
394 name = "cala_ledger.account_sets.list_for_name_in_op",
395 skip(self, op),
396 err
397 )]
398 pub async fn list_for_name_in_op(
399 &self,
400 op: &mut LedgerOperation<'_>,
401 name: String,
402 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
403 ) -> Result<
404 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
405 AccountSetError,
406 > {
407 self.repo
408 .list_for_name_by_created_at_in_tx(op.tx(), name, args, Default::default())
409 .await
410 }
411
412 #[instrument(
413 name = "cala_ledger.account_sets.find_where_member_in_op",
414 skip(self, op),
415 err
416 )]
417 pub async fn find_where_member_in_op(
418 &self,
419 op: &mut LedgerOperation<'_>,
420 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
421 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
422 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
423 {
424 match member.into() {
425 AccountSetMemberId::Account(account_id) => {
426 self.repo
427 .find_where_account_is_member_in_tx(op.tx(), account_id, query)
428 .await
429 }
430 AccountSetMemberId::AccountSet(account_set_id) => {
431 self.repo
432 .find_where_account_set_is_member_in_tx(op.tx(), account_set_id, query)
433 .await
434 }
435 }
436 }
437
438 pub async fn list_members(
439 &self,
440 id: AccountSetId,
441 args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
442 ) -> Result<
443 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
444 AccountSetError,
445 > {
446 self.repo.list_children(id, args).await
447 }
448
449 pub async fn list_members_in_op(
450 &self,
451 op: &mut LedgerOperation<'_>,
452 id: AccountSetId,
453 args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
454 ) -> Result<
455 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
456 AccountSetError,
457 > {
458 self.repo.list_children_in_tx(op.tx(), id, args).await
459 }
460
461 pub(crate) async fn fetch_mappings(
462 &self,
463 journal_id: JournalId,
464 account_ids: &[AccountId],
465 ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
466 self.repo.fetch_mappings(journal_id, account_ids).await
467 }
468
469 #[cfg(feature = "import")]
470 pub async fn sync_account_set_creation(
471 &self,
472 mut db: es_entity::DbOp<'_>,
473 origin: DataSourceId,
474 values: AccountSetValues,
475 ) -> Result<(), AccountSetError> {
476 let mut account_set = AccountSet::import(origin, values);
477 self.repo
478 .import_in_op(&mut db, origin, &mut account_set)
479 .await?;
480 let recorded_at = db.now();
481 let outbox_events: Vec<_> = account_set
482 .events
483 .last_persisted(1)
484 .map(|p| OutboxEventPayload::from(&p.event))
485 .collect();
486 self.outbox
487 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
488 .await?;
489 Ok(())
490 }
491
492 #[cfg(feature = "import")]
493 pub async fn sync_account_set_update(
494 &self,
495 mut db: es_entity::DbOp<'_>,
496 values: AccountSetValues,
497 fields: Vec<String>,
498 ) -> Result<(), AccountSetError> {
499 let mut account_set = self.repo.find_by_id(values.id).await?;
500 account_set.update((values, fields));
501 let n_events = self.repo.update_in_op(&mut db, &mut account_set).await?;
502 let recorded_at = db.now();
503 let outbox_events: Vec<_> = account_set
504 .events
505 .last_persisted(n_events)
506 .map(|p| OutboxEventPayload::from(&p.event))
507 .collect();
508 self.outbox
509 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
510 .await?;
511 Ok(())
512 }
513
514 #[cfg(feature = "import")]
515 pub async fn sync_account_set_member_creation(
516 &self,
517 mut db: es_entity::DbOp<'_>,
518 origin: DataSourceId,
519 account_set_id: AccountSetId,
520 member_id: AccountSetMemberId,
521 ) -> Result<(), AccountSetError> {
522 match member_id {
523 AccountSetMemberId::Account(account_id) => {
524 self.repo
525 .import_member_account_in_op(&mut db, account_set_id, account_id)
526 .await?;
527 }
528 AccountSetMemberId::AccountSet(account_set_id) => {
529 self.repo
530 .import_member_set_in_op(&mut db, account_set_id, account_set_id)
531 .await?;
532 }
533 }
534 let recorded_at = db.now();
535 self.outbox
536 .persist_events_at(
537 db.into_tx(),
538 std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
539 source: DataSource::Remote { id: origin },
540 account_set_id,
541 member_id,
542 }),
543 recorded_at,
544 )
545 .await?;
546 Ok(())
547 }
548
549 #[cfg(feature = "import")]
550 pub async fn sync_account_set_member_removal(
551 &self,
552 mut db: es_entity::DbOp<'_>,
553 origin: DataSourceId,
554 account_set_id: AccountSetId,
555 member_id: AccountSetMemberId,
556 ) -> Result<(), AccountSetError> {
557 match member_id {
558 AccountSetMemberId::Account(account_id) => {
559 self.repo
560 .import_remove_member_account(db.tx(), account_set_id, account_id)
561 .await?;
562 }
563 AccountSetMemberId::AccountSet(account_set_id) => {
564 self.repo
565 .import_remove_member_set(db.tx(), account_set_id, account_set_id)
566 .await?;
567 }
568 }
569 let recorded_at = db.now();
570 self.outbox
571 .persist_events_at(
572 db.into_tx(),
573 std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
574 source: DataSource::Remote { id: origin },
575 account_set_id,
576 member_id,
577 }),
578 recorded_at,
579 )
580 .await?;
581 Ok(())
582 }
583}
584
585fn entries_for_add_balance(
586 entries: &mut Vec<NewEntry>,
587 target_account_id: AccountId,
588 balance: BalanceSnapshot,
589) {
590 use rust_decimal::Decimal;
591
592 if balance.settled.cr_balance != Decimal::ZERO {
593 let entry = NewEntry::builder()
594 .id(EntryId::new())
595 .journal_id(balance.journal_id)
596 .account_id(target_account_id)
597 .currency(balance.currency)
598 .sequence(1u32)
599 .layer(Layer::Settled)
600 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
601 .direction(DebitOrCredit::Credit)
602 .units(balance.settled.cr_balance)
603 .transaction_id(UNASSIGNED_TRANSACTION_ID)
604 .build()
605 .expect("Couldn't build entry");
606 entries.push(entry);
607 }
608 if balance.settled.dr_balance != Decimal::ZERO {
609 let entry = NewEntry::builder()
610 .id(EntryId::new())
611 .journal_id(balance.journal_id)
612 .account_id(target_account_id)
613 .currency(balance.currency)
614 .sequence(1u32)
615 .layer(Layer::Settled)
616 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
617 .direction(DebitOrCredit::Debit)
618 .units(balance.settled.dr_balance)
619 .transaction_id(UNASSIGNED_TRANSACTION_ID)
620 .build()
621 .expect("Couldn't build entry");
622 entries.push(entry);
623 }
624 if balance.pending.cr_balance != Decimal::ZERO {
625 let entry = NewEntry::builder()
626 .id(EntryId::new())
627 .journal_id(balance.journal_id)
628 .account_id(target_account_id)
629 .currency(balance.currency)
630 .sequence(1u32)
631 .layer(Layer::Pending)
632 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
633 .direction(DebitOrCredit::Credit)
634 .units(balance.pending.cr_balance)
635 .transaction_id(UNASSIGNED_TRANSACTION_ID)
636 .build()
637 .expect("Couldn't build entry");
638 entries.push(entry);
639 }
640 if balance.pending.dr_balance != Decimal::ZERO {
641 let entry = NewEntry::builder()
642 .id(EntryId::new())
643 .journal_id(balance.journal_id)
644 .account_id(target_account_id)
645 .currency(balance.currency)
646 .sequence(1u32)
647 .layer(Layer::Pending)
648 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
649 .direction(DebitOrCredit::Debit)
650 .units(balance.pending.dr_balance)
651 .transaction_id(UNASSIGNED_TRANSACTION_ID)
652 .build()
653 .expect("Couldn't build entry");
654 entries.push(entry);
655 }
656 if balance.encumbrance.cr_balance != Decimal::ZERO {
657 let entry = NewEntry::builder()
658 .id(EntryId::new())
659 .journal_id(balance.journal_id)
660 .account_id(target_account_id)
661 .currency(balance.currency)
662 .sequence(1u32)
663 .layer(Layer::Encumbrance)
664 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
665 .direction(DebitOrCredit::Credit)
666 .units(balance.encumbrance.cr_balance)
667 .transaction_id(UNASSIGNED_TRANSACTION_ID)
668 .build()
669 .expect("Couldn't build entry");
670 entries.push(entry);
671 }
672 if balance.encumbrance.dr_balance != Decimal::ZERO {
673 let entry = NewEntry::builder()
674 .id(EntryId::new())
675 .journal_id(balance.journal_id)
676 .account_id(target_account_id)
677 .currency(balance.currency)
678 .sequence(1u32)
679 .layer(Layer::Encumbrance)
680 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
681 .direction(DebitOrCredit::Debit)
682 .units(balance.encumbrance.dr_balance)
683 .transaction_id(UNASSIGNED_TRANSACTION_ID)
684 .build()
685 .expect("Couldn't build entry");
686 entries.push(entry);
687 }
688}
689
690fn entries_for_remove_balance(
691 entries: &mut Vec<NewEntry>,
692 target_account_id: AccountId,
693 balance: BalanceSnapshot,
694) {
695 use rust_decimal::Decimal;
696
697 if balance.settled.cr_balance != Decimal::ZERO {
698 let entry = NewEntry::builder()
699 .id(EntryId::new())
700 .journal_id(balance.journal_id)
701 .account_id(target_account_id)
702 .currency(balance.currency)
703 .sequence(1u32)
704 .layer(Layer::Settled)
705 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
706 .direction(DebitOrCredit::Debit)
707 .units(balance.settled.cr_balance)
708 .transaction_id(UNASSIGNED_TRANSACTION_ID)
709 .build()
710 .expect("Couldn't build entry");
711 entries.push(entry);
712 }
713 if balance.settled.dr_balance != Decimal::ZERO {
714 let entry = NewEntry::builder()
715 .id(EntryId::new())
716 .journal_id(balance.journal_id)
717 .account_id(target_account_id)
718 .currency(balance.currency)
719 .sequence(1u32)
720 .layer(Layer::Settled)
721 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
722 .direction(DebitOrCredit::Credit)
723 .units(balance.settled.dr_balance)
724 .transaction_id(UNASSIGNED_TRANSACTION_ID)
725 .build()
726 .expect("Couldn't build entry");
727 entries.push(entry);
728 }
729 if balance.pending.cr_balance != Decimal::ZERO {
730 let entry = NewEntry::builder()
731 .id(EntryId::new())
732 .journal_id(balance.journal_id)
733 .account_id(target_account_id)
734 .currency(balance.currency)
735 .sequence(1u32)
736 .layer(Layer::Pending)
737 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
738 .direction(DebitOrCredit::Debit)
739 .units(balance.pending.cr_balance)
740 .transaction_id(UNASSIGNED_TRANSACTION_ID)
741 .build()
742 .expect("Couldn't build entry");
743 entries.push(entry);
744 }
745 if balance.pending.dr_balance != Decimal::ZERO {
746 let entry = NewEntry::builder()
747 .id(EntryId::new())
748 .journal_id(balance.journal_id)
749 .account_id(target_account_id)
750 .currency(balance.currency)
751 .sequence(1u32)
752 .layer(Layer::Pending)
753 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
754 .direction(DebitOrCredit::Credit)
755 .units(balance.pending.dr_balance)
756 .transaction_id(UNASSIGNED_TRANSACTION_ID)
757 .build()
758 .expect("Couldn't build entry");
759 entries.push(entry);
760 }
761 if balance.encumbrance.cr_balance != Decimal::ZERO {
762 let entry = NewEntry::builder()
763 .id(EntryId::new())
764 .journal_id(balance.journal_id)
765 .account_id(target_account_id)
766 .currency(balance.currency)
767 .sequence(1u32)
768 .layer(Layer::Encumbrance)
769 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
770 .direction(DebitOrCredit::Debit)
771 .units(balance.encumbrance.cr_balance)
772 .transaction_id(UNASSIGNED_TRANSACTION_ID)
773 .build()
774 .expect("Couldn't build entry");
775 entries.push(entry);
776 }
777 if balance.encumbrance.dr_balance != Decimal::ZERO {
778 let entry = NewEntry::builder()
779 .id(EntryId::new())
780 .journal_id(balance.journal_id)
781 .account_id(target_account_id)
782 .currency(balance.currency)
783 .sequence(1u32)
784 .layer(Layer::Encumbrance)
785 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
786 .direction(DebitOrCredit::Credit)
787 .units(balance.encumbrance.dr_balance)
788 .transaction_id(UNASSIGNED_TRANSACTION_ID)
789 .build()
790 .expect("Couldn't build entry");
791 entries.push(entry);
792 }
793}
794
795impl From<&AccountSetEvent> for OutboxEventPayload {
796 fn from(event: &AccountSetEvent) -> Self {
797 match event {
798 #[cfg(feature = "import")]
799 AccountSetEvent::Imported {
800 source,
801 values: account_set,
802 } => OutboxEventPayload::AccountSetCreated {
803 source: *source,
804 account_set: account_set.clone(),
805 },
806 AccountSetEvent::Initialized {
807 values: account_set,
808 } => OutboxEventPayload::AccountSetCreated {
809 source: DataSource::Local,
810 account_set: account_set.clone(),
811 },
812 AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
813 source: DataSource::Local,
814 account_set: values.clone(),
815 fields: fields.clone(),
816 },
817 }
818 }
819}