1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use std::collections::HashMap;
8use tracing::instrument;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{
13 account::*,
14 balance::*,
15 entry::*,
16 outbox::*,
17 primitives::{DataSource, DebitOrCredit, JournalId, Layer},
18};
19
20pub use cursor::*;
21pub use entity::*;
22use error::*;
23use repo::*;
24pub use repo::{account_set_cursor::*, members_cursor::*};
25
26const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
27
28#[derive(Clone)]
29pub struct AccountSets {
30 repo: AccountSetRepo,
31 accounts: Accounts,
32 entries: Entries,
33 balances: Balances,
34}
35
36impl AccountSets {
37 pub(crate) fn new(
38 pool: &PgPool,
39 publisher: &OutboxPublisher,
40 accounts: &Accounts,
41 entries: &Entries,
42 balances: &Balances,
43 ) -> Self {
44 Self {
45 repo: AccountSetRepo::new(pool, publisher),
46 accounts: accounts.clone(),
47 entries: entries.clone(),
48 balances: balances.clone(),
49 }
50 }
51 #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
52 pub async fn create(
53 &self,
54 new_account_set: NewAccountSet,
55 ) -> Result<AccountSet, AccountSetError> {
56 let mut op = self.repo.begin_op().await?;
57 let account_set = self.create_in_op(&mut op, new_account_set).await?;
58 op.commit().await?;
59 Ok(account_set)
60 }
61
62 #[instrument(name = "cala_ledger.account_sets.create_in_op", skip(self, db))]
63 pub async fn create_in_op(
64 &self,
65 db: &mut impl es_entity::AtomicOperation,
66 new_account_set: NewAccountSet,
67 ) -> Result<AccountSet, AccountSetError> {
68 let new_account = NewAccount::builder()
69 .id(new_account_set.id)
70 .name(String::new())
71 .code(new_account_set.id.to_string())
72 .normal_balance_type(new_account_set.normal_balance_type)
73 .is_account_set(true)
74 .velocity_context_values(new_account_set.context_values())
75 .build()
76 .expect("Failed to build account");
77 self.accounts.create_in_op(db, new_account).await?;
78
79 let account_set = self.repo.create_in_op(db, new_account_set).await?;
80
81 Ok(account_set)
82 }
83
84 #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, new_account_sets), fields(count = new_account_sets.len()))]
85 pub async fn create_all(
86 &self,
87 new_account_sets: Vec<NewAccountSet>,
88 ) -> Result<Vec<AccountSet>, AccountSetError> {
89 let mut op = self.repo.begin_op().await?;
90 let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
91 op.commit().await?;
92 Ok(account_sets)
93 }
94
95 #[instrument(name = "cala_ledger.account_sets.create_all_in_op", skip(self, db))]
96 pub async fn create_all_in_op(
97 &self,
98 db: &mut impl es_entity::AtomicOperation,
99 new_account_sets: Vec<NewAccountSet>,
100 ) -> Result<Vec<AccountSet>, AccountSetError> {
101 let mut new_accounts = Vec::new();
102 for new_account_set in new_account_sets.iter() {
103 let new_account = NewAccount::builder()
104 .id(new_account_set.id)
105 .name(String::new())
106 .code(new_account_set.id.to_string())
107 .normal_balance_type(new_account_set.normal_balance_type)
108 .is_account_set(true)
109 .velocity_context_values(new_account_set.context_values())
110 .build()
111 .expect("Failed to build account");
112 new_accounts.push(new_account);
113 }
114 self.accounts.create_all_in_op(db, new_accounts).await?;
115
116 let account_sets = self.repo.create_all_in_op(db, new_account_sets).await?;
117
118 Ok(account_sets)
119 }
120
121 #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
122 pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
123 let mut op = self.repo.begin_op().await?;
124 self.persist_in_op(&mut op, account_set).await?;
125 op.commit().await?;
126 Ok(())
127 }
128
129 #[instrument(
130 name = "cala_ledger.account_sets.persist_in_op",
131 skip(self, db, account_set)
132 )]
133 pub async fn persist_in_op(
134 &self,
135 db: &mut impl es_entity::AtomicOperation,
136 account_set: &mut AccountSet,
137 ) -> Result<(), AccountSetError> {
138 self.repo.update_in_op(db, account_set).await?;
139
140 self.accounts
141 .update_velocity_context_values_in_op(db, account_set.values())
142 .await?;
143
144 Ok(())
145 }
146
147 #[instrument(name = "cala_ledger.account_sets.add_member", skip(self, member), fields(account_set_id = %account_set_id))]
148 pub async fn add_member(
149 &self,
150 account_set_id: AccountSetId,
151 member: impl Into<AccountSetMemberId>,
152 ) -> Result<AccountSet, AccountSetError> {
153 let mut op = self.repo.begin_op().await?;
154 let account_set = self
155 .add_member_in_op(&mut op, account_set_id, member)
156 .await?;
157 op.commit().await?;
158 Ok(account_set)
159 }
160
161 #[instrument(name = "cala_ledger.account_sets.add_member_in_op", skip(self, op, member), fields(account_set_id = %account_set_id, is_account = tracing::field::Empty, is_account_set = tracing::field::Empty, member_id = tracing::field::Empty))]
162 pub async fn add_member_in_op(
163 &self,
164 op: &mut impl es_entity::AtomicOperation,
165 account_set_id: AccountSetId,
166 member: impl Into<AccountSetMemberId>,
167 ) -> Result<AccountSet, AccountSetError> {
168 let member = member.into();
169 let (time, parents, account_set, member_id) = match member {
170 AccountSetMemberId::Account(id) => {
171 tracing::Span::current().record("is_account", true);
172 tracing::Span::current().record("is_account_set", false);
173 tracing::Span::current().record("member_id", tracing::field::display(&id));
174 let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
175 let (time, parents) = self
176 .repo
177 .add_member_account_and_return_parents(&mut *op, account_set_id, id)
178 .await?;
179 (time, parents, set, id)
180 }
181 AccountSetMemberId::AccountSet(id) => {
182 tracing::Span::current().record("is_account", false);
183 tracing::Span::current().record("is_account_set", true);
184 tracing::Span::current().record("member_id", tracing::field::display(&id));
185 let mut accounts = self
186 .repo
187 .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
188 .await?;
189 let target = accounts
190 .remove(&account_set_id)
191 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
192 let member = accounts
193 .remove(&id)
194 .ok_or(AccountSetError::CouldNotFindById(id))?;
195
196 if target.values().journal_id != member.values().journal_id {
197 return Err(AccountSetError::JournalIdMismatch);
198 }
199
200 let (time, parents) = self
201 .repo
202 .add_member_set_and_return_parents(op, account_set_id, id)
203 .await?;
204 (time, parents, target, AccountId::from(id))
205 }
206 };
207
208 let balances = self
209 .balances
210 .find_balances_for_update(op, account_set.values().journal_id, member_id)
211 .await?;
212
213 let target_account_id = AccountId::from(&account_set.id());
214 let mut entries = Vec::new();
215 for balance in balances.into_values() {
216 entries_for_add_balance(&mut entries, target_account_id, balance);
217 }
218
219 if entries.is_empty() {
220 return Ok(account_set);
221 }
222 let entries = self.entries.create_all_in_op(op, entries).await?;
223 let mappings = std::iter::once((target_account_id, parents)).collect();
224 self.balances
225 .update_balances_in_op(
226 op,
227 account_set.values().journal_id,
228 entries,
229 time.date_naive(),
230 time,
231 mappings,
232 )
233 .await?;
234
235 Ok(account_set)
236 }
237
238 #[instrument(name = "cala_ledger.account_sets.remove_member", skip(self, member), fields(account_set_id = %account_set_id))]
239 pub async fn remove_member(
240 &self,
241 account_set_id: AccountSetId,
242 member: impl Into<AccountSetMemberId>,
243 ) -> Result<AccountSet, AccountSetError> {
244 let mut op = self.repo.begin_op().await?;
245 let account_set = self
246 .remove_member_in_op(&mut op, account_set_id, member)
247 .await?;
248 op.commit().await?;
249 Ok(account_set)
250 }
251
252 #[instrument(name = "cala_ledger.account_sets.remove_member_in_op", skip(self, op, member), fields(account_set_id = %account_set_id))]
253 pub async fn remove_member_in_op(
254 &self,
255 op: &mut impl es_entity::AtomicOperation,
256 account_set_id: AccountSetId,
257 member: impl Into<AccountSetMemberId>,
258 ) -> Result<AccountSet, AccountSetError> {
259 let member = member.into();
260 let (time, parents, account_set, member_id) = match member {
261 AccountSetMemberId::Account(id) => {
262 let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
263 let (time, parents) = self
264 .repo
265 .remove_member_account_and_return_parents(op, account_set_id, id)
266 .await?;
267 (time, parents, set, id)
268 }
269 AccountSetMemberId::AccountSet(id) => {
270 let mut accounts = self
271 .repo
272 .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
273 .await?;
274 let target = accounts
275 .remove(&account_set_id)
276 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
277 let member = accounts
278 .remove(&id)
279 .ok_or(AccountSetError::CouldNotFindById(id))?;
280
281 if target.values().journal_id != member.values().journal_id {
282 return Err(AccountSetError::JournalIdMismatch);
283 }
284
285 let (time, parents) = self
286 .repo
287 .remove_member_set_and_return_parents(op, account_set_id, id)
288 .await?;
289 (time, parents, target, AccountId::from(id))
290 }
291 };
292
293 let balances = self
294 .balances
295 .find_balances_for_update(op, account_set.values().journal_id, member_id)
296 .await?;
297
298 let target_account_id = AccountId::from(&account_set.id());
299 let mut entries = Vec::new();
300 for balance in balances.into_values() {
301 entries_for_remove_balance(&mut entries, target_account_id, balance);
302 }
303
304 if entries.is_empty() {
305 return Ok(account_set);
306 }
307 let entries = self.entries.create_all_in_op(op, entries).await?;
308 let mappings = std::iter::once((target_account_id, parents)).collect();
309 self.balances
310 .update_balances_in_op(
311 op,
312 account_set.values().journal_id,
313 entries,
314 time.date_naive(),
315 time,
316 mappings,
317 )
318 .await?;
319
320 Ok(account_set)
321 }
322
323 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self))]
324 pub async fn find_all<T: From<AccountSet>>(
325 &self,
326 account_set_ids: &[AccountSetId],
327 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
328 self.repo.find_all(account_set_ids).await
329 }
330
331 #[instrument(name = "cala_ledger.account_sets.find_all_in_op", skip(self, op))]
332 pub async fn find_all_in_op<T: From<AccountSet>>(
333 &self,
334 op: &mut impl es_entity::AtomicOperation,
335 account_set_ids: &[AccountSetId],
336 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
337 self.repo.find_all_in_op(op, account_set_ids).await
338 }
339
340 #[instrument(name = "cala_ledger.account_sets.find", skip(self))]
341 pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
342 self.repo.find_by_id(account_set_id).await
343 }
344
345 #[instrument(name = "cala_ledger.account_sets.find_in_op", skip(self, op))]
346 pub async fn find_in_op(
347 &self,
348 op: &mut impl es_entity::AtomicOperation,
349 account_set_id: AccountSetId,
350 ) -> Result<AccountSet, AccountSetError> {
351 self.repo.find_by_id_in_op(op, account_set_id).await
352 }
353
354 #[instrument(name = "cala_ledger.accounts_sets.find_by_external_id", skip(self))]
355 pub async fn find_by_external_id(
356 &self,
357 external_id: String,
358 ) -> Result<AccountSet, AccountSetError> {
359 self.repo.find_by_external_id(Some(external_id)).await
360 }
361
362 #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self))]
363 pub async fn find_where_member(
364 &self,
365 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
366 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
367 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
368 {
369 match member.into() {
370 AccountSetMemberId::Account(account_id) => {
371 self.repo
372 .find_where_account_is_member(account_id, query)
373 .await
374 }
375 AccountSetMemberId::AccountSet(account_set_id) => {
376 self.repo
377 .find_where_account_set_is_member(account_set_id, query)
378 .await
379 }
380 }
381 }
382
383 #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self))]
384 pub async fn list_for_name(
385 &self,
386 name: String,
387 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
388 ) -> Result<
389 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
390 AccountSetError,
391 > {
392 self.repo
393 .list_for_name_by_created_at(name, args, Default::default())
394 .await
395 }
396
397 #[instrument(name = "cala_ledger.account_sets.list_for_name_in_op", skip(self, op))]
398 pub async fn list_for_name_in_op(
399 &self,
400 op: &mut impl es_entity::AtomicOperation,
401 name: String,
402 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
403 ) -> Result<
404 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
405 AccountSetError,
406 > {
407 self.repo
408 .list_for_name_by_created_at_in_op(op, name, args, Default::default())
409 .await
410 }
411
412 #[instrument(
413 name = "cala_ledger.account_sets.find_where_member_in_op",
414 skip(self, op)
415 )]
416 pub async fn find_where_member_in_op(
417 &self,
418 op: &mut impl es_entity::AtomicOperation,
419 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
420 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
421 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
422 {
423 match member.into() {
424 AccountSetMemberId::Account(account_id) => {
425 self.repo
426 .find_where_account_is_member_in_op(op, account_id, query)
427 .await
428 }
429 AccountSetMemberId::AccountSet(account_set_id) => {
430 self.repo
431 .find_where_account_set_is_member_in_op(op, account_set_id, query)
432 .await
433 }
434 }
435 }
436
437 pub async fn list_members_by_created_at(
438 &self,
439 id: AccountSetId,
440 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
441 ) -> Result<
442 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
443 AccountSetError,
444 > {
445 self.repo.list_children_by_created_at(id, args).await
446 }
447
448 pub async fn list_members_by_created_at_in_op(
449 &self,
450 op: &mut impl es_entity::AtomicOperation,
451 id: AccountSetId,
452 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
453 ) -> Result<
454 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
455 AccountSetError,
456 > {
457 self.repo
458 .list_children_by_created_at_in_op(op, id, args)
459 .await
460 }
461
462 pub async fn list_members_by_external_id(
463 &self,
464 id: AccountSetId,
465 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
466 ) -> Result<
467 es_entity::PaginatedQueryRet<
468 AccountSetMemberByExternalId,
469 AccountSetMembersByExternalIdCursor,
470 >,
471 AccountSetError,
472 > {
473 self.repo.list_children_by_external_id(id, args).await
474 }
475
476 pub async fn list_members_by_external_id_in_op(
477 &self,
478 op: &mut impl es_entity::AtomicOperation,
479 id: AccountSetId,
480 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
481 ) -> Result<
482 es_entity::PaginatedQueryRet<
483 AccountSetMemberByExternalId,
484 AccountSetMembersByExternalIdCursor,
485 >,
486 AccountSetError,
487 > {
488 self.repo
489 .list_children_by_external_id_in_op(op, id, args)
490 .await
491 }
492
493 pub(crate) async fn fetch_mappings_in_op(
494 &self,
495 op: &mut impl es_entity::AtomicOperation,
496 journal_id: JournalId,
497 account_ids: &[AccountId],
498 ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
499 self.repo
500 .fetch_mappings_in_op(op, journal_id, account_ids)
501 .await
502 }
503
504 #[cfg(feature = "import")]
505 pub(crate) async fn sync_account_set_creation(
506 &self,
507 mut db: es_entity::DbOpWithTime<'_>,
508 origin: DataSourceId,
509 values: AccountSetValues,
510 ) -> Result<(), AccountSetError> {
511 let mut account_set = AccountSet::import(origin, values);
512 self.repo
513 .import_in_op(&mut db, origin, &mut account_set)
514 .await?;
515 db.commit().await?;
516 Ok(())
517 }
518
519 #[cfg(feature = "import")]
520 pub(crate) async fn sync_account_set_update(
521 &self,
522 mut db: es_entity::DbOpWithTime<'_>,
523 values: AccountSetValues,
524 fields: Vec<String>,
525 ) -> Result<(), AccountSetError> {
526 let mut account_set = self.repo.find_by_id_in_op(&mut db, values.id).await?;
527 let _ = account_set.update((values, fields));
528 self.repo.update_in_op(&mut db, &mut account_set).await?;
529 db.commit().await?;
530 Ok(())
531 }
532
533 #[cfg(feature = "import")]
534 pub(crate) async fn sync_account_set_member_creation(
535 &self,
536 mut db: es_entity::DbOpWithTime<'_>,
537 origin: DataSourceId,
538 account_set_id: AccountSetId,
539 member_id: AccountSetMemberId,
540 ) -> Result<(), AccountSetError> {
541 match member_id {
542 AccountSetMemberId::Account(account_id) => {
543 self.repo
544 .import_member_account_in_op(&mut db, origin, account_set_id, account_id)
545 .await?;
546 }
547 AccountSetMemberId::AccountSet(member_account_set_id) => {
548 self.repo
549 .import_member_set_in_op(&mut db, origin, account_set_id, member_account_set_id)
550 .await?;
551 }
552 }
553 db.commit().await?;
554 Ok(())
555 }
556
557 #[cfg(feature = "import")]
558 pub(crate) async fn sync_account_set_member_removal(
559 &self,
560 mut db: es_entity::DbOpWithTime<'_>,
561 origin: DataSourceId,
562 account_set_id: AccountSetId,
563 member_id: AccountSetMemberId,
564 ) -> Result<(), AccountSetError> {
565 match member_id {
566 AccountSetMemberId::Account(account_id) => {
567 self.repo
568 .import_remove_member_account(&mut db, origin, account_set_id, account_id)
569 .await?;
570 }
571 AccountSetMemberId::AccountSet(member_account_set_id) => {
572 self.repo
573 .import_remove_member_set(
574 &mut db,
575 origin,
576 account_set_id,
577 member_account_set_id,
578 )
579 .await?;
580 }
581 }
582 db.commit().await?;
583 Ok(())
584 }
585}
586
587fn entries_for_add_balance(
588 entries: &mut Vec<NewEntry>,
589 target_account_id: AccountId,
590 balance: BalanceSnapshot,
591) {
592 use rust_decimal::Decimal;
593
594 if balance.settled.cr_balance != Decimal::ZERO {
595 let entry = NewEntry::builder()
596 .id(EntryId::new())
597 .journal_id(balance.journal_id)
598 .account_id(target_account_id)
599 .currency(balance.currency)
600 .sequence(1u32)
601 .layer(Layer::Settled)
602 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
603 .direction(DebitOrCredit::Credit)
604 .units(balance.settled.cr_balance)
605 .transaction_id(UNASSIGNED_TRANSACTION_ID)
606 .build()
607 .expect("Couldn't build entry");
608 entries.push(entry);
609 }
610 if balance.settled.dr_balance != Decimal::ZERO {
611 let entry = NewEntry::builder()
612 .id(EntryId::new())
613 .journal_id(balance.journal_id)
614 .account_id(target_account_id)
615 .currency(balance.currency)
616 .sequence(1u32)
617 .layer(Layer::Settled)
618 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
619 .direction(DebitOrCredit::Debit)
620 .units(balance.settled.dr_balance)
621 .transaction_id(UNASSIGNED_TRANSACTION_ID)
622 .build()
623 .expect("Couldn't build entry");
624 entries.push(entry);
625 }
626 if balance.pending.cr_balance != Decimal::ZERO {
627 let entry = NewEntry::builder()
628 .id(EntryId::new())
629 .journal_id(balance.journal_id)
630 .account_id(target_account_id)
631 .currency(balance.currency)
632 .sequence(1u32)
633 .layer(Layer::Pending)
634 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
635 .direction(DebitOrCredit::Credit)
636 .units(balance.pending.cr_balance)
637 .transaction_id(UNASSIGNED_TRANSACTION_ID)
638 .build()
639 .expect("Couldn't build entry");
640 entries.push(entry);
641 }
642 if balance.pending.dr_balance != Decimal::ZERO {
643 let entry = NewEntry::builder()
644 .id(EntryId::new())
645 .journal_id(balance.journal_id)
646 .account_id(target_account_id)
647 .currency(balance.currency)
648 .sequence(1u32)
649 .layer(Layer::Pending)
650 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
651 .direction(DebitOrCredit::Debit)
652 .units(balance.pending.dr_balance)
653 .transaction_id(UNASSIGNED_TRANSACTION_ID)
654 .build()
655 .expect("Couldn't build entry");
656 entries.push(entry);
657 }
658 if balance.encumbrance.cr_balance != Decimal::ZERO {
659 let entry = NewEntry::builder()
660 .id(EntryId::new())
661 .journal_id(balance.journal_id)
662 .account_id(target_account_id)
663 .currency(balance.currency)
664 .sequence(1u32)
665 .layer(Layer::Encumbrance)
666 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
667 .direction(DebitOrCredit::Credit)
668 .units(balance.encumbrance.cr_balance)
669 .transaction_id(UNASSIGNED_TRANSACTION_ID)
670 .build()
671 .expect("Couldn't build entry");
672 entries.push(entry);
673 }
674 if balance.encumbrance.dr_balance != Decimal::ZERO {
675 let entry = NewEntry::builder()
676 .id(EntryId::new())
677 .journal_id(balance.journal_id)
678 .account_id(target_account_id)
679 .currency(balance.currency)
680 .sequence(1u32)
681 .layer(Layer::Encumbrance)
682 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
683 .direction(DebitOrCredit::Debit)
684 .units(balance.encumbrance.dr_balance)
685 .transaction_id(UNASSIGNED_TRANSACTION_ID)
686 .build()
687 .expect("Couldn't build entry");
688 entries.push(entry);
689 }
690}
691
692fn entries_for_remove_balance(
693 entries: &mut Vec<NewEntry>,
694 target_account_id: AccountId,
695 balance: BalanceSnapshot,
696) {
697 use rust_decimal::Decimal;
698
699 if balance.settled.cr_balance != Decimal::ZERO {
700 let entry = NewEntry::builder()
701 .id(EntryId::new())
702 .journal_id(balance.journal_id)
703 .account_id(target_account_id)
704 .currency(balance.currency)
705 .sequence(1u32)
706 .layer(Layer::Settled)
707 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
708 .direction(DebitOrCredit::Debit)
709 .units(balance.settled.cr_balance)
710 .transaction_id(UNASSIGNED_TRANSACTION_ID)
711 .build()
712 .expect("Couldn't build entry");
713 entries.push(entry);
714 }
715 if balance.settled.dr_balance != Decimal::ZERO {
716 let entry = NewEntry::builder()
717 .id(EntryId::new())
718 .journal_id(balance.journal_id)
719 .account_id(target_account_id)
720 .currency(balance.currency)
721 .sequence(1u32)
722 .layer(Layer::Settled)
723 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
724 .direction(DebitOrCredit::Credit)
725 .units(balance.settled.dr_balance)
726 .transaction_id(UNASSIGNED_TRANSACTION_ID)
727 .build()
728 .expect("Couldn't build entry");
729 entries.push(entry);
730 }
731 if balance.pending.cr_balance != Decimal::ZERO {
732 let entry = NewEntry::builder()
733 .id(EntryId::new())
734 .journal_id(balance.journal_id)
735 .account_id(target_account_id)
736 .currency(balance.currency)
737 .sequence(1u32)
738 .layer(Layer::Pending)
739 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
740 .direction(DebitOrCredit::Debit)
741 .units(balance.pending.cr_balance)
742 .transaction_id(UNASSIGNED_TRANSACTION_ID)
743 .build()
744 .expect("Couldn't build entry");
745 entries.push(entry);
746 }
747 if balance.pending.dr_balance != Decimal::ZERO {
748 let entry = NewEntry::builder()
749 .id(EntryId::new())
750 .journal_id(balance.journal_id)
751 .account_id(target_account_id)
752 .currency(balance.currency)
753 .sequence(1u32)
754 .layer(Layer::Pending)
755 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
756 .direction(DebitOrCredit::Credit)
757 .units(balance.pending.dr_balance)
758 .transaction_id(UNASSIGNED_TRANSACTION_ID)
759 .build()
760 .expect("Couldn't build entry");
761 entries.push(entry);
762 }
763 if balance.encumbrance.cr_balance != Decimal::ZERO {
764 let entry = NewEntry::builder()
765 .id(EntryId::new())
766 .journal_id(balance.journal_id)
767 .account_id(target_account_id)
768 .currency(balance.currency)
769 .sequence(1u32)
770 .layer(Layer::Encumbrance)
771 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
772 .direction(DebitOrCredit::Debit)
773 .units(balance.encumbrance.cr_balance)
774 .transaction_id(UNASSIGNED_TRANSACTION_ID)
775 .build()
776 .expect("Couldn't build entry");
777 entries.push(entry);
778 }
779 if balance.encumbrance.dr_balance != Decimal::ZERO {
780 let entry = NewEntry::builder()
781 .id(EntryId::new())
782 .journal_id(balance.journal_id)
783 .account_id(target_account_id)
784 .currency(balance.currency)
785 .sequence(1u32)
786 .layer(Layer::Encumbrance)
787 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
788 .direction(DebitOrCredit::Credit)
789 .units(balance.encumbrance.dr_balance)
790 .transaction_id(UNASSIGNED_TRANSACTION_ID)
791 .build()
792 .expect("Couldn't build entry");
793 entries.push(entry);
794 }
795}
796
797impl From<&AccountSetEvent> for OutboxEventPayload {
798 fn from(event: &AccountSetEvent) -> Self {
799 let source = es_entity::context::EventContext::current()
800 .data()
801 .lookup("data_source")
802 .ok()
803 .flatten()
804 .unwrap_or(DataSource::Local);
805
806 match event {
807 #[cfg(feature = "import")]
808 AccountSetEvent::Imported {
809 source,
810 values: account_set,
811 } => OutboxEventPayload::AccountSetCreated {
812 source: *source,
813 account_set: account_set.clone(),
814 },
815 AccountSetEvent::Initialized {
816 values: account_set,
817 } => OutboxEventPayload::AccountSetCreated {
818 source,
819 account_set: account_set.clone(),
820 },
821 AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
822 source,
823 account_set: values.clone(),
824 fields: fields.clone(),
825 },
826 }
827 }
828}