1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use std::collections::HashMap;
8use tracing::instrument;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{
13 account::*,
14 balance::*,
15 entry::*,
16 ledger_operation::*,
17 outbox::*,
18 primitives::{DataSource, DebitOrCredit, JournalId, Layer},
19};
20
21pub use cursor::*;
22pub use entity::*;
23use error::*;
24use repo::*;
25pub use repo::{account_set_cursor::*, members_cursor::*};
26
27#[allow(dead_code)]
28const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
29
30#[derive(Clone)]
31pub struct AccountSets {
32 repo: AccountSetRepo,
33 accounts: Accounts,
34 entries: Entries,
35 balances: Balances,
36 outbox: Outbox,
37 pool: PgPool,
38}
39
40impl AccountSets {
41 pub(crate) fn new(
42 pool: &PgPool,
43 outbox: Outbox,
44 accounts: &Accounts,
45 entries: &Entries,
46 balances: &Balances,
47 ) -> Self {
48 Self {
49 repo: AccountSetRepo::new(pool),
50 outbox,
51 accounts: accounts.clone(),
52 entries: entries.clone(),
53 balances: balances.clone(),
54 pool: pool.clone(),
55 }
56 }
57 #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
58 pub async fn create(
59 &self,
60 new_account_set: NewAccountSet,
61 ) -> Result<AccountSet, AccountSetError> {
62 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
63 let account_set = self.create_in_op(&mut op, new_account_set).await?;
64 op.commit().await?;
65 Ok(account_set)
66 }
67
68 #[instrument(name = "cala_ledger.account_sets.create", skip(self, db))]
69 pub async fn create_in_op(
70 &self,
71 db: &mut LedgerOperation<'_>,
72 new_account_set: NewAccountSet,
73 ) -> Result<AccountSet, AccountSetError> {
74 let new_account = NewAccount::builder()
75 .id(uuid::Uuid::from(new_account_set.id))
76 .name(String::new())
77 .code(new_account_set.id.to_string())
78 .normal_balance_type(new_account_set.normal_balance_type)
79 .is_account_set(true)
80 .build()
81 .expect("Failed to build account");
82 self.accounts.create_in_op(db, new_account).await?;
83 let account_set = self.repo.create_in_op(db.op(), new_account_set).await?;
84 db.accumulate(account_set.events.last_persisted(1).map(|p| &p.event));
85 Ok(account_set)
86 }
87
88 #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
89 pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
90 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
91 self.persist_in_op(&mut op, account_set).await?;
92 op.commit().await?;
93 Ok(())
94 }
95
96 #[instrument(
97 name = "cala_ledger.account_sets.persist_in_op",
98 skip(self, db, account_set)
99 )]
100 pub async fn persist_in_op(
101 &self,
102 db: &mut LedgerOperation<'_>,
103 account_set: &mut AccountSet,
104 ) -> Result<(), AccountSetError> {
105 let n_events = self.repo.update_in_op(db.op(), account_set).await?;
106 db.accumulate(
107 account_set
108 .events
109 .last_persisted(n_events)
110 .map(|p| &p.event),
111 );
112 Ok(())
113 }
114
115 pub async fn add_member(
116 &self,
117 account_set_id: AccountSetId,
118 member: impl Into<AccountSetMemberId>,
119 ) -> Result<AccountSet, AccountSetError> {
120 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
121 let account_set = self
122 .add_member_in_op(&mut op, account_set_id, member)
123 .await?;
124 op.commit().await?;
125 Ok(account_set)
126 }
127
128 pub async fn add_member_in_op(
129 &self,
130 op: &mut LedgerOperation<'_>,
131 account_set_id: AccountSetId,
132 member: impl Into<AccountSetMemberId>,
133 ) -> Result<AccountSet, AccountSetError> {
134 let member = member.into();
135 let (time, parents, account_set, member_id) = match member {
136 AccountSetMemberId::Account(id) => {
137 let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
138 let (time, parents) = self
139 .repo
140 .add_member_account_and_return_parents(op.tx(), account_set_id, id)
141 .await?;
142 (time, parents, set, id)
143 }
144 AccountSetMemberId::AccountSet(id) => {
145 let mut accounts = self
146 .repo
147 .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
148 .await?;
149 let target = accounts
150 .remove(&account_set_id)
151 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
152 let member = accounts
153 .remove(&id)
154 .ok_or(AccountSetError::CouldNotFindById(id))?;
155
156 if target.values().journal_id != member.values().journal_id {
157 return Err(AccountSetError::JournalIdMismatch);
158 }
159
160 let (time, parents) = self
161 .repo
162 .add_member_set_and_return_parents(op.tx(), account_set_id, id)
163 .await?;
164 (time, parents, target, AccountId::from(id))
165 }
166 };
167
168 op.accumulate(std::iter::once(
169 OutboxEventPayload::AccountSetMemberCreated {
170 source: DataSource::Local,
171 account_set_id,
172 member_id: member,
173 },
174 ));
175
176 let balances = self
177 .balances
178 .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
179 .await?;
180
181 let target_account_id = AccountId::from(&account_set.id());
182 let mut entries = Vec::new();
183 for balance in balances.into_values() {
184 entries_for_add_balance(&mut entries, target_account_id, balance);
185 }
186
187 if entries.is_empty() {
188 return Ok(account_set);
189 }
190 let entries = self.entries.create_all_in_op(op, entries).await?;
191 let mappings = std::iter::once((target_account_id, parents)).collect();
192 self.balances
193 .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
194 .await?;
195
196 Ok(account_set)
197 }
198
199 pub async fn remove_member(
200 &self,
201 account_set_id: AccountSetId,
202 member: impl Into<AccountSetMemberId>,
203 ) -> Result<AccountSet, AccountSetError> {
204 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
205 let account_set = self
206 .remove_member_in_op(&mut op, account_set_id, member)
207 .await?;
208 op.commit().await?;
209 Ok(account_set)
210 }
211
212 pub async fn remove_member_in_op(
213 &self,
214 op: &mut LedgerOperation<'_>,
215 account_set_id: AccountSetId,
216 member: impl Into<AccountSetMemberId>,
217 ) -> Result<AccountSet, AccountSetError> {
218 let member = member.into();
219 let (time, parents, account_set, member_id) = match member {
220 AccountSetMemberId::Account(id) => {
221 let set = self.repo.find_by_id_in_tx(op.tx(), account_set_id).await?;
222 let (time, parents) = self
223 .repo
224 .remove_member_account_and_return_parents(op.tx(), account_set_id, id)
225 .await?;
226 (time, parents, set, id)
227 }
228 AccountSetMemberId::AccountSet(id) => {
229 let mut accounts = self
230 .repo
231 .find_all_in_tx::<AccountSet>(op.tx(), &[account_set_id, id])
232 .await?;
233 let target = accounts
234 .remove(&account_set_id)
235 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
236 let member = accounts
237 .remove(&id)
238 .ok_or(AccountSetError::CouldNotFindById(id))?;
239
240 if target.values().journal_id != member.values().journal_id {
241 return Err(AccountSetError::JournalIdMismatch);
242 }
243
244 let (time, parents) = self
245 .repo
246 .remove_member_set_and_return_parents(op.tx(), account_set_id, id)
247 .await?;
248 (time, parents, target, AccountId::from(id))
249 }
250 };
251
252 op.accumulate(std::iter::once(
253 OutboxEventPayload::AccountSetMemberRemoved {
254 source: DataSource::Local,
255 account_set_id,
256 member_id: member,
257 },
258 ));
259
260 let balances = self
261 .balances
262 .find_balances_for_update(op.tx(), account_set.values().journal_id, member_id)
263 .await?;
264
265 let target_account_id = AccountId::from(&account_set.id());
266 let mut entries = Vec::new();
267 for balance in balances.into_values() {
268 entries_for_remove_balance(&mut entries, target_account_id, balance);
269 }
270
271 if entries.is_empty() {
272 return Ok(account_set);
273 }
274 let entries = self.entries.create_all_in_op(op, entries).await?;
275 let mappings = std::iter::once((target_account_id, parents)).collect();
276 self.balances
277 .update_balances_in_op(op, time, account_set.values().journal_id, entries, mappings)
278 .await?;
279
280 Ok(account_set)
281 }
282
283 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self), err)]
284 pub async fn find_all<T: From<AccountSet>>(
285 &self,
286 account_set_ids: &[AccountSetId],
287 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
288 self.repo.find_all(account_set_ids).await
289 }
290
291 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self, op), err)]
292 pub async fn find_all_in_op<T: From<AccountSet>>(
293 &self,
294 op: &mut LedgerOperation<'_>,
295 account_set_ids: &[AccountSetId],
296 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
297 self.repo.find_all_in_tx(op.tx(), account_set_ids).await
298 }
299
300 #[instrument(name = "cala_ledger.account_sets.find", skip(self), err)]
301 pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
302 self.repo.find_by_id(account_set_id).await
303 }
304
305 #[instrument(
306 name = "cala_ledger.accounts_sets.find_by_external_id",
307 skip(self),
308 err
309 )]
310 pub async fn find_by_external_id(
311 &self,
312 external_id: String,
313 ) -> Result<AccountSet, AccountSetError> {
314 self.repo.find_by_external_id(Some(external_id)).await
315 }
316
317 #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self), err)]
318 pub async fn find_where_member(
319 &self,
320 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
321 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
322 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
323 {
324 match member.into() {
325 AccountSetMemberId::Account(account_id) => {
326 self.repo
327 .find_where_account_is_member(account_id, query)
328 .await
329 }
330 AccountSetMemberId::AccountSet(account_set_id) => {
331 self.repo
332 .find_where_account_set_is_member(account_set_id, query)
333 .await
334 }
335 }
336 }
337
338 #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self), err)]
339 pub async fn list_for_name(
340 &self,
341 name: String,
342 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
343 ) -> Result<
344 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
345 AccountSetError,
346 > {
347 self.repo
348 .list_for_name_by_created_at(name, args, Default::default())
349 .await
350 }
351
352 #[instrument(
353 name = "cala_ledger.account_sets.list_for_name_in_op",
354 skip(self, op),
355 err
356 )]
357 pub async fn list_for_name_in_op(
358 &self,
359 op: &mut LedgerOperation<'_>,
360 name: String,
361 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
362 ) -> Result<
363 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
364 AccountSetError,
365 > {
366 self.repo
367 .list_for_name_by_created_at_in_tx(op.tx(), name, args, Default::default())
368 .await
369 }
370
371 #[instrument(
372 name = "cala_ledger.account_sets.find_where_member_in_op",
373 skip(self, op),
374 err
375 )]
376 pub async fn find_where_member_in_op(
377 &self,
378 op: &mut LedgerOperation<'_>,
379 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
380 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
381 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
382 {
383 match member.into() {
384 AccountSetMemberId::Account(account_id) => {
385 self.repo
386 .find_where_account_is_member_in_tx(op.tx(), account_id, query)
387 .await
388 }
389 AccountSetMemberId::AccountSet(account_set_id) => {
390 self.repo
391 .find_where_account_set_is_member_in_tx(op.tx(), account_set_id, query)
392 .await
393 }
394 }
395 }
396
397 pub async fn list_members(
398 &self,
399 id: AccountSetId,
400 args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
401 ) -> Result<
402 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
403 AccountSetError,
404 > {
405 self.repo.list_children(id, args).await
406 }
407
408 pub async fn list_members_in_op(
409 &self,
410 op: &mut LedgerOperation<'_>,
411 id: AccountSetId,
412 args: es_entity::PaginatedQueryArgs<AccountSetMembersCursor>,
413 ) -> Result<
414 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersCursor>,
415 AccountSetError,
416 > {
417 self.repo.list_children_in_tx(op.tx(), id, args).await
418 }
419
420 pub(crate) async fn fetch_mappings(
421 &self,
422 journal_id: JournalId,
423 account_ids: &[AccountId],
424 ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
425 self.repo.fetch_mappings(journal_id, account_ids).await
426 }
427
428 #[cfg(feature = "import")]
429 pub async fn sync_account_set_creation(
430 &self,
431 mut db: es_entity::DbOp<'_>,
432 origin: DataSourceId,
433 values: AccountSetValues,
434 ) -> Result<(), AccountSetError> {
435 let mut account_set = AccountSet::import(origin, values);
436 self.repo
437 .import_in_op(&mut db, origin, &mut account_set)
438 .await?;
439 let recorded_at = db.now();
440 let outbox_events: Vec<_> = account_set
441 .events
442 .last_persisted(1)
443 .map(|p| OutboxEventPayload::from(&p.event))
444 .collect();
445 self.outbox
446 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
447 .await?;
448 Ok(())
449 }
450
451 #[cfg(feature = "import")]
452 pub async fn sync_account_set_update(
453 &self,
454 mut db: es_entity::DbOp<'_>,
455 values: AccountSetValues,
456 fields: Vec<String>,
457 ) -> Result<(), AccountSetError> {
458 let mut account_set = self.repo.find_by_id(values.id).await?;
459 account_set.update((values, fields));
460 let n_events = self.repo.update_in_op(&mut db, &mut account_set).await?;
461 let recorded_at = db.now();
462 let outbox_events: Vec<_> = account_set
463 .events
464 .last_persisted(n_events)
465 .map(|p| OutboxEventPayload::from(&p.event))
466 .collect();
467 self.outbox
468 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
469 .await?;
470 Ok(())
471 }
472
473 #[cfg(feature = "import")]
474 pub async fn sync_account_set_member_creation(
475 &self,
476 mut db: es_entity::DbOp<'_>,
477 origin: DataSourceId,
478 account_set_id: AccountSetId,
479 member_id: AccountSetMemberId,
480 ) -> Result<(), AccountSetError> {
481 match member_id {
482 AccountSetMemberId::Account(account_id) => {
483 self.repo
484 .import_member_account_in_op(&mut db, account_set_id, account_id)
485 .await?;
486 }
487 AccountSetMemberId::AccountSet(account_set_id) => {
488 self.repo
489 .import_member_set_in_op(&mut db, account_set_id, account_set_id)
490 .await?;
491 }
492 }
493 let recorded_at = db.now();
494 self.outbox
495 .persist_events_at(
496 db.into_tx(),
497 std::iter::once(OutboxEventPayload::AccountSetMemberCreated {
498 source: DataSource::Remote { id: origin },
499 account_set_id,
500 member_id,
501 }),
502 recorded_at,
503 )
504 .await?;
505 Ok(())
506 }
507
508 #[cfg(feature = "import")]
509 pub async fn sync_account_set_member_removal(
510 &self,
511 mut db: es_entity::DbOp<'_>,
512 origin: DataSourceId,
513 account_set_id: AccountSetId,
514 member_id: AccountSetMemberId,
515 ) -> Result<(), AccountSetError> {
516 match member_id {
517 AccountSetMemberId::Account(account_id) => {
518 self.repo
519 .import_remove_member_account(db.tx(), account_set_id, account_id)
520 .await?;
521 }
522 AccountSetMemberId::AccountSet(account_set_id) => {
523 self.repo
524 .import_remove_member_set(db.tx(), account_set_id, account_set_id)
525 .await?;
526 }
527 }
528 let recorded_at = db.now();
529 self.outbox
530 .persist_events_at(
531 db.into_tx(),
532 std::iter::once(OutboxEventPayload::AccountSetMemberRemoved {
533 source: DataSource::Remote { id: origin },
534 account_set_id,
535 member_id,
536 }),
537 recorded_at,
538 )
539 .await?;
540 Ok(())
541 }
542}
543
544fn entries_for_add_balance(
545 entries: &mut Vec<NewEntry>,
546 target_account_id: AccountId,
547 balance: BalanceSnapshot,
548) {
549 use rust_decimal::Decimal;
550
551 if balance.settled.cr_balance != Decimal::ZERO {
552 let entry = NewEntry::builder()
553 .id(EntryId::new())
554 .journal_id(balance.journal_id)
555 .account_id(target_account_id)
556 .currency(balance.currency)
557 .sequence(1u32)
558 .layer(Layer::Settled)
559 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
560 .direction(DebitOrCredit::Credit)
561 .units(balance.settled.cr_balance)
562 .transaction_id(UNASSIGNED_TRANSACTION_ID)
563 .build()
564 .expect("Couldn't build entry");
565 entries.push(entry);
566 }
567 if balance.settled.dr_balance != Decimal::ZERO {
568 let entry = NewEntry::builder()
569 .id(EntryId::new())
570 .journal_id(balance.journal_id)
571 .account_id(target_account_id)
572 .currency(balance.currency)
573 .sequence(1u32)
574 .layer(Layer::Settled)
575 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
576 .direction(DebitOrCredit::Debit)
577 .units(balance.settled.dr_balance)
578 .transaction_id(UNASSIGNED_TRANSACTION_ID)
579 .build()
580 .expect("Couldn't build entry");
581 entries.push(entry);
582 }
583 if balance.pending.cr_balance != Decimal::ZERO {
584 let entry = NewEntry::builder()
585 .id(EntryId::new())
586 .journal_id(balance.journal_id)
587 .account_id(target_account_id)
588 .currency(balance.currency)
589 .sequence(1u32)
590 .layer(Layer::Pending)
591 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
592 .direction(DebitOrCredit::Credit)
593 .units(balance.pending.cr_balance)
594 .transaction_id(UNASSIGNED_TRANSACTION_ID)
595 .build()
596 .expect("Couldn't build entry");
597 entries.push(entry);
598 }
599 if balance.pending.dr_balance != Decimal::ZERO {
600 let entry = NewEntry::builder()
601 .id(EntryId::new())
602 .journal_id(balance.journal_id)
603 .account_id(target_account_id)
604 .currency(balance.currency)
605 .sequence(1u32)
606 .layer(Layer::Pending)
607 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
608 .direction(DebitOrCredit::Debit)
609 .units(balance.pending.dr_balance)
610 .transaction_id(UNASSIGNED_TRANSACTION_ID)
611 .build()
612 .expect("Couldn't build entry");
613 entries.push(entry);
614 }
615 if balance.encumbrance.cr_balance != Decimal::ZERO {
616 let entry = NewEntry::builder()
617 .id(EntryId::new())
618 .journal_id(balance.journal_id)
619 .account_id(target_account_id)
620 .currency(balance.currency)
621 .sequence(1u32)
622 .layer(Layer::Encumbrance)
623 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
624 .direction(DebitOrCredit::Credit)
625 .units(balance.encumbrance.cr_balance)
626 .transaction_id(UNASSIGNED_TRANSACTION_ID)
627 .build()
628 .expect("Couldn't build entry");
629 entries.push(entry);
630 }
631 if balance.encumbrance.dr_balance != Decimal::ZERO {
632 let entry = NewEntry::builder()
633 .id(EntryId::new())
634 .journal_id(balance.journal_id)
635 .account_id(target_account_id)
636 .currency(balance.currency)
637 .sequence(1u32)
638 .layer(Layer::Encumbrance)
639 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
640 .direction(DebitOrCredit::Debit)
641 .units(balance.encumbrance.dr_balance)
642 .transaction_id(UNASSIGNED_TRANSACTION_ID)
643 .build()
644 .expect("Couldn't build entry");
645 entries.push(entry);
646 }
647}
648
649fn entries_for_remove_balance(
650 entries: &mut Vec<NewEntry>,
651 target_account_id: AccountId,
652 balance: BalanceSnapshot,
653) {
654 use rust_decimal::Decimal;
655
656 if balance.settled.cr_balance != Decimal::ZERO {
657 let entry = NewEntry::builder()
658 .id(EntryId::new())
659 .journal_id(balance.journal_id)
660 .account_id(target_account_id)
661 .currency(balance.currency)
662 .sequence(1u32)
663 .layer(Layer::Settled)
664 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
665 .direction(DebitOrCredit::Debit)
666 .units(balance.settled.cr_balance)
667 .transaction_id(UNASSIGNED_TRANSACTION_ID)
668 .build()
669 .expect("Couldn't build entry");
670 entries.push(entry);
671 }
672 if balance.settled.dr_balance != Decimal::ZERO {
673 let entry = NewEntry::builder()
674 .id(EntryId::new())
675 .journal_id(balance.journal_id)
676 .account_id(target_account_id)
677 .currency(balance.currency)
678 .sequence(1u32)
679 .layer(Layer::Settled)
680 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
681 .direction(DebitOrCredit::Credit)
682 .units(balance.settled.dr_balance)
683 .transaction_id(UNASSIGNED_TRANSACTION_ID)
684 .build()
685 .expect("Couldn't build entry");
686 entries.push(entry);
687 }
688 if balance.pending.cr_balance != Decimal::ZERO {
689 let entry = NewEntry::builder()
690 .id(EntryId::new())
691 .journal_id(balance.journal_id)
692 .account_id(target_account_id)
693 .currency(balance.currency)
694 .sequence(1u32)
695 .layer(Layer::Pending)
696 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
697 .direction(DebitOrCredit::Debit)
698 .units(balance.pending.cr_balance)
699 .transaction_id(UNASSIGNED_TRANSACTION_ID)
700 .build()
701 .expect("Couldn't build entry");
702 entries.push(entry);
703 }
704 if balance.pending.dr_balance != Decimal::ZERO {
705 let entry = NewEntry::builder()
706 .id(EntryId::new())
707 .journal_id(balance.journal_id)
708 .account_id(target_account_id)
709 .currency(balance.currency)
710 .sequence(1u32)
711 .layer(Layer::Pending)
712 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
713 .direction(DebitOrCredit::Credit)
714 .units(balance.pending.dr_balance)
715 .transaction_id(UNASSIGNED_TRANSACTION_ID)
716 .build()
717 .expect("Couldn't build entry");
718 entries.push(entry);
719 }
720 if balance.encumbrance.cr_balance != Decimal::ZERO {
721 let entry = NewEntry::builder()
722 .id(EntryId::new())
723 .journal_id(balance.journal_id)
724 .account_id(target_account_id)
725 .currency(balance.currency)
726 .sequence(1u32)
727 .layer(Layer::Encumbrance)
728 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
729 .direction(DebitOrCredit::Debit)
730 .units(balance.encumbrance.cr_balance)
731 .transaction_id(UNASSIGNED_TRANSACTION_ID)
732 .build()
733 .expect("Couldn't build entry");
734 entries.push(entry);
735 }
736 if balance.encumbrance.dr_balance != Decimal::ZERO {
737 let entry = NewEntry::builder()
738 .id(EntryId::new())
739 .journal_id(balance.journal_id)
740 .account_id(target_account_id)
741 .currency(balance.currency)
742 .sequence(1u32)
743 .layer(Layer::Encumbrance)
744 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
745 .direction(DebitOrCredit::Credit)
746 .units(balance.encumbrance.dr_balance)
747 .transaction_id(UNASSIGNED_TRANSACTION_ID)
748 .build()
749 .expect("Couldn't build entry");
750 entries.push(entry);
751 }
752}
753
754impl From<&AccountSetEvent> for OutboxEventPayload {
755 fn from(event: &AccountSetEvent) -> Self {
756 match event {
757 #[cfg(feature = "import")]
758 AccountSetEvent::Imported {
759 source,
760 values: account_set,
761 } => OutboxEventPayload::AccountSetCreated {
762 source: *source,
763 account_set: account_set.clone(),
764 },
765 AccountSetEvent::Initialized {
766 values: account_set,
767 } => OutboxEventPayload::AccountSetCreated {
768 source: DataSource::Local,
769 account_set: account_set.clone(),
770 },
771 AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
772 source: DataSource::Local,
773 account_set: values.clone(),
774 fields: fields.clone(),
775 },
776 }
777 }
778}