1mod cursor;
2mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::clock::ClockHandle;
7use sqlx::PgPool;
8use std::collections::HashMap;
9use tracing::instrument;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{
14 account::*,
15 balance::*,
16 entry::*,
17 outbox::*,
18 primitives::{DataSource, DebitOrCredit, JournalId, Layer},
19};
20
21pub use cursor::*;
22pub use entity::*;
23use error::*;
24use repo::*;
25pub use repo::{account_set_cursor::*, members_cursor::*};
26
27const UNASSIGNED_TRANSACTION_ID: uuid::Uuid = uuid::Uuid::nil();
28
29#[derive(Clone)]
30pub struct AccountSets {
31 repo: AccountSetRepo,
32 accounts: Accounts,
33 entries: Entries,
34 balances: Balances,
35 clock: ClockHandle,
36}
37
38impl AccountSets {
39 pub(crate) fn new(
40 pool: &PgPool,
41 publisher: &OutboxPublisher,
42 accounts: &Accounts,
43 entries: &Entries,
44 balances: &Balances,
45 clock: &ClockHandle,
46 ) -> Self {
47 Self {
48 repo: AccountSetRepo::new(pool, publisher),
49 accounts: accounts.clone(),
50 entries: entries.clone(),
51 balances: balances.clone(),
52 clock: clock.clone(),
53 }
54 }
55 #[instrument(name = "cala_ledger.account_sets.create", skip(self))]
56 pub async fn create(
57 &self,
58 new_account_set: NewAccountSet,
59 ) -> Result<AccountSet, AccountSetError> {
60 let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
61 let account_set = self.create_in_op(&mut op, new_account_set).await?;
62 op.commit().await?;
63 Ok(account_set)
64 }
65
66 #[instrument(name = "cala_ledger.account_sets.create_in_op", skip(self, db))]
67 pub async fn create_in_op(
68 &self,
69 db: &mut impl es_entity::AtomicOperation,
70 new_account_set: NewAccountSet,
71 ) -> Result<AccountSet, AccountSetError> {
72 let new_account = NewAccount::builder()
73 .id(new_account_set.id)
74 .name(String::new())
75 .code(new_account_set.id.to_string())
76 .normal_balance_type(new_account_set.normal_balance_type)
77 .is_account_set(true)
78 .velocity_context_values(new_account_set.context_values())
79 .build()
80 .expect("Failed to build account");
81 self.accounts.create_in_op(db, new_account).await?;
82
83 let account_set = self.repo.create_in_op(db, new_account_set).await?;
84
85 Ok(account_set)
86 }
87
88 #[instrument(name = "cala_ledger.account_sets.create_all", skip(self, new_account_sets), fields(count = new_account_sets.len()))]
89 pub async fn create_all(
90 &self,
91 new_account_sets: Vec<NewAccountSet>,
92 ) -> Result<Vec<AccountSet>, AccountSetError> {
93 let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
94 let account_sets = self.create_all_in_op(&mut op, new_account_sets).await?;
95 op.commit().await?;
96 Ok(account_sets)
97 }
98
99 #[instrument(name = "cala_ledger.account_sets.create_all_in_op", skip(self, db))]
100 pub async fn create_all_in_op(
101 &self,
102 db: &mut impl es_entity::AtomicOperation,
103 new_account_sets: Vec<NewAccountSet>,
104 ) -> Result<Vec<AccountSet>, AccountSetError> {
105 let mut new_accounts = Vec::new();
106 for new_account_set in new_account_sets.iter() {
107 let new_account = NewAccount::builder()
108 .id(new_account_set.id)
109 .name(String::new())
110 .code(new_account_set.id.to_string())
111 .normal_balance_type(new_account_set.normal_balance_type)
112 .is_account_set(true)
113 .velocity_context_values(new_account_set.context_values())
114 .build()
115 .expect("Failed to build account");
116 new_accounts.push(new_account);
117 }
118 self.accounts.create_all_in_op(db, new_accounts).await?;
119
120 let account_sets = self.repo.create_all_in_op(db, new_account_sets).await?;
121
122 Ok(account_sets)
123 }
124
125 #[instrument(name = "cala_ledger.account_sets.persist", skip(self, account_set))]
126 pub async fn persist(&self, account_set: &mut AccountSet) -> Result<(), AccountSetError> {
127 let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
128 self.persist_in_op(&mut op, account_set).await?;
129 op.commit().await?;
130 Ok(())
131 }
132
133 #[instrument(
134 name = "cala_ledger.account_sets.persist_in_op",
135 skip(self, db, account_set)
136 )]
137 pub async fn persist_in_op(
138 &self,
139 db: &mut impl es_entity::AtomicOperation,
140 account_set: &mut AccountSet,
141 ) -> Result<(), AccountSetError> {
142 self.repo.update_in_op(db, account_set).await?;
143
144 self.accounts
145 .update_velocity_context_values_in_op(db, account_set.values())
146 .await?;
147
148 Ok(())
149 }
150
151 #[instrument(name = "cala_ledger.account_sets.add_member", skip(self, member), fields(account_set_id = %account_set_id))]
152 pub async fn add_member(
153 &self,
154 account_set_id: AccountSetId,
155 member: impl Into<AccountSetMemberId>,
156 ) -> Result<AccountSet, AccountSetError> {
157 let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
158 let account_set = self
159 .add_member_in_op(&mut op, account_set_id, member)
160 .await?;
161 op.commit().await?;
162 Ok(account_set)
163 }
164
165 #[instrument(name = "cala_ledger.account_sets.add_member_in_op", skip(self, op, member), fields(account_set_id = %account_set_id, is_account = tracing::field::Empty, is_account_set = tracing::field::Empty, member_id = tracing::field::Empty))]
166 pub async fn add_member_in_op(
167 &self,
168 op: &mut impl es_entity::AtomicOperation,
169 account_set_id: AccountSetId,
170 member: impl Into<AccountSetMemberId>,
171 ) -> Result<AccountSet, AccountSetError> {
172 let member = member.into();
173 let (time, parents, account_set, member_id) = match member {
174 AccountSetMemberId::Account(id) => {
175 tracing::Span::current().record("is_account", true);
176 tracing::Span::current().record("is_account_set", false);
177 tracing::Span::current().record("member_id", tracing::field::display(&id));
178 let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
179 let (time, parents) = self
180 .repo
181 .add_member_account_and_return_parents(&mut *op, account_set_id, id)
182 .await?;
183 (time, parents, set, id)
184 }
185 AccountSetMemberId::AccountSet(id) => {
186 tracing::Span::current().record("is_account", false);
187 tracing::Span::current().record("is_account_set", true);
188 tracing::Span::current().record("member_id", tracing::field::display(&id));
189 let mut accounts = self
190 .repo
191 .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
192 .await?;
193 let target = accounts
194 .remove(&account_set_id)
195 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
196 let member = accounts
197 .remove(&id)
198 .ok_or(AccountSetError::CouldNotFindById(id))?;
199
200 if target.values().journal_id != member.values().journal_id {
201 return Err(AccountSetError::JournalIdMismatch);
202 }
203
204 let (time, parents) = self
205 .repo
206 .add_member_set_and_return_parents(op, account_set_id, id)
207 .await?;
208 (time, parents, target, AccountId::from(id))
209 }
210 };
211
212 let balances = self
213 .balances
214 .find_balances_for_update(op, account_set.values().journal_id, member_id)
215 .await?;
216
217 let target_account_id = AccountId::from(&account_set.id());
218 let mut entries = Vec::new();
219 for balance in balances.into_values() {
220 entries_for_add_balance(&mut entries, target_account_id, balance);
221 }
222
223 if entries.is_empty() {
224 return Ok(account_set);
225 }
226 let entries = self.entries.create_all_in_op(op, entries).await?;
227 let mappings = std::iter::once((target_account_id, parents)).collect();
228 self.balances
229 .update_balances_in_op(
230 op,
231 account_set.values().journal_id,
232 entries,
233 time.date_naive(),
234 time,
235 mappings,
236 )
237 .await?;
238
239 Ok(account_set)
240 }
241
242 #[instrument(name = "cala_ledger.account_sets.remove_member", skip(self, member), fields(account_set_id = %account_set_id))]
243 pub async fn remove_member(
244 &self,
245 account_set_id: AccountSetId,
246 member: impl Into<AccountSetMemberId>,
247 ) -> Result<AccountSet, AccountSetError> {
248 let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
249 let account_set = self
250 .remove_member_in_op(&mut op, account_set_id, member)
251 .await?;
252 op.commit().await?;
253 Ok(account_set)
254 }
255
256 #[instrument(name = "cala_ledger.account_sets.remove_member_in_op", skip(self, op, member), fields(account_set_id = %account_set_id))]
257 pub async fn remove_member_in_op(
258 &self,
259 op: &mut impl es_entity::AtomicOperation,
260 account_set_id: AccountSetId,
261 member: impl Into<AccountSetMemberId>,
262 ) -> Result<AccountSet, AccountSetError> {
263 let member = member.into();
264 let (time, parents, account_set, member_id) = match member {
265 AccountSetMemberId::Account(id) => {
266 let set = self.repo.find_by_id_in_op(&mut *op, account_set_id).await?;
267 let (time, parents) = self
268 .repo
269 .remove_member_account_and_return_parents(op, account_set_id, id)
270 .await?;
271 (time, parents, set, id)
272 }
273 AccountSetMemberId::AccountSet(id) => {
274 let mut accounts = self
275 .repo
276 .find_all_in_op::<AccountSet>(&mut *op, &[account_set_id, id])
277 .await?;
278 let target = accounts
279 .remove(&account_set_id)
280 .ok_or(AccountSetError::CouldNotFindById(account_set_id))?;
281 let member = accounts
282 .remove(&id)
283 .ok_or(AccountSetError::CouldNotFindById(id))?;
284
285 if target.values().journal_id != member.values().journal_id {
286 return Err(AccountSetError::JournalIdMismatch);
287 }
288
289 let (time, parents) = self
290 .repo
291 .remove_member_set_and_return_parents(op, account_set_id, id)
292 .await?;
293 (time, parents, target, AccountId::from(id))
294 }
295 };
296
297 let balances = self
298 .balances
299 .find_balances_for_update(op, account_set.values().journal_id, member_id)
300 .await?;
301
302 let target_account_id = AccountId::from(&account_set.id());
303 let mut entries = Vec::new();
304 for balance in balances.into_values() {
305 entries_for_remove_balance(&mut entries, target_account_id, balance);
306 }
307
308 if entries.is_empty() {
309 return Ok(account_set);
310 }
311 let entries = self.entries.create_all_in_op(op, entries).await?;
312 let mappings = std::iter::once((target_account_id, parents)).collect();
313 self.balances
314 .update_balances_in_op(
315 op,
316 account_set.values().journal_id,
317 entries,
318 time.date_naive(),
319 time,
320 mappings,
321 )
322 .await?;
323
324 Ok(account_set)
325 }
326
327 #[instrument(name = "cala_ledger.account_sets.find_all", skip(self))]
328 pub async fn find_all<T: From<AccountSet>>(
329 &self,
330 account_set_ids: &[AccountSetId],
331 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
332 self.repo.find_all(account_set_ids).await
333 }
334
335 #[instrument(name = "cala_ledger.account_sets.find_all_in_op", skip(self, op))]
336 pub async fn find_all_in_op<T: From<AccountSet>>(
337 &self,
338 op: &mut impl es_entity::AtomicOperation,
339 account_set_ids: &[AccountSetId],
340 ) -> Result<HashMap<AccountSetId, T>, AccountSetError> {
341 self.repo.find_all_in_op(op, account_set_ids).await
342 }
343
344 #[instrument(name = "cala_ledger.account_sets.find", skip(self))]
345 pub async fn find(&self, account_set_id: AccountSetId) -> Result<AccountSet, AccountSetError> {
346 self.repo.find_by_id(account_set_id).await
347 }
348
349 #[instrument(name = "cala_ledger.account_sets.find_in_op", skip(self, op))]
350 pub async fn find_in_op(
351 &self,
352 op: &mut impl es_entity::AtomicOperation,
353 account_set_id: AccountSetId,
354 ) -> Result<AccountSet, AccountSetError> {
355 self.repo.find_by_id_in_op(op, account_set_id).await
356 }
357
358 #[instrument(name = "cala_ledger.accounts_sets.find_by_external_id", skip(self))]
359 pub async fn find_by_external_id(
360 &self,
361 external_id: String,
362 ) -> Result<AccountSet, AccountSetError> {
363 self.repo.find_by_external_id(Some(external_id)).await
364 }
365
366 #[instrument(name = "cala_ledger.account_sets.find_where_member", skip(self))]
367 pub async fn find_where_member(
368 &self,
369 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
370 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
371 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
372 {
373 match member.into() {
374 AccountSetMemberId::Account(account_id) => {
375 self.repo
376 .find_where_account_is_member(account_id, query)
377 .await
378 }
379 AccountSetMemberId::AccountSet(account_set_id) => {
380 self.repo
381 .find_where_account_set_is_member(account_set_id, query)
382 .await
383 }
384 }
385 }
386
387 #[instrument(name = "cala_ledger.account_sets.list_for_name", skip(self))]
388 pub async fn list_for_name(
389 &self,
390 name: String,
391 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
392 ) -> Result<
393 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
394 AccountSetError,
395 > {
396 self.repo
397 .list_for_name_by_created_at(name, args, Default::default())
398 .await
399 }
400
401 #[instrument(name = "cala_ledger.account_sets.list_for_name_in_op", skip(self, op))]
402 pub async fn list_for_name_in_op(
403 &self,
404 op: &mut impl es_entity::AtomicOperation,
405 name: String,
406 args: es_entity::PaginatedQueryArgs<AccountSetsByCreatedAtCursor>,
407 ) -> Result<
408 es_entity::PaginatedQueryRet<AccountSet, AccountSetsByCreatedAtCursor>,
409 AccountSetError,
410 > {
411 self.repo
412 .list_for_name_by_created_at_in_op(op, name, args, Default::default())
413 .await
414 }
415
416 #[instrument(
417 name = "cala_ledger.account_sets.find_where_member_in_op",
418 skip(self, op)
419 )]
420 pub async fn find_where_member_in_op(
421 &self,
422 op: &mut impl es_entity::AtomicOperation,
423 member: impl Into<AccountSetMemberId> + std::fmt::Debug,
424 query: es_entity::PaginatedQueryArgs<AccountSetsByNameCursor>,
425 ) -> Result<es_entity::PaginatedQueryRet<AccountSet, AccountSetsByNameCursor>, AccountSetError>
426 {
427 match member.into() {
428 AccountSetMemberId::Account(account_id) => {
429 self.repo
430 .find_where_account_is_member_in_op(op, account_id, query)
431 .await
432 }
433 AccountSetMemberId::AccountSet(account_set_id) => {
434 self.repo
435 .find_where_account_set_is_member_in_op(op, account_set_id, query)
436 .await
437 }
438 }
439 }
440
441 pub async fn list_members_by_created_at(
442 &self,
443 id: AccountSetId,
444 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
445 ) -> Result<
446 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
447 AccountSetError,
448 > {
449 self.repo.list_children_by_created_at(id, args).await
450 }
451
452 pub async fn list_members_by_created_at_in_op(
453 &self,
454 op: &mut impl es_entity::AtomicOperation,
455 id: AccountSetId,
456 args: es_entity::PaginatedQueryArgs<AccountSetMembersByCreatedAtCursor>,
457 ) -> Result<
458 es_entity::PaginatedQueryRet<AccountSetMember, AccountSetMembersByCreatedAtCursor>,
459 AccountSetError,
460 > {
461 self.repo
462 .list_children_by_created_at_in_op(op, id, args)
463 .await
464 }
465
466 pub async fn list_members_by_external_id(
467 &self,
468 id: AccountSetId,
469 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
470 ) -> Result<
471 es_entity::PaginatedQueryRet<
472 AccountSetMemberByExternalId,
473 AccountSetMembersByExternalIdCursor,
474 >,
475 AccountSetError,
476 > {
477 self.repo.list_children_by_external_id(id, args).await
478 }
479
480 pub async fn list_members_by_external_id_in_op(
481 &self,
482 op: &mut impl es_entity::AtomicOperation,
483 id: AccountSetId,
484 args: es_entity::PaginatedQueryArgs<AccountSetMembersByExternalIdCursor>,
485 ) -> Result<
486 es_entity::PaginatedQueryRet<
487 AccountSetMemberByExternalId,
488 AccountSetMembersByExternalIdCursor,
489 >,
490 AccountSetError,
491 > {
492 self.repo
493 .list_children_by_external_id_in_op(op, id, args)
494 .await
495 }
496
497 pub(crate) async fn fetch_mappings_in_op(
498 &self,
499 op: &mut impl es_entity::AtomicOperation,
500 journal_id: JournalId,
501 account_ids: &[AccountId],
502 ) -> Result<HashMap<AccountId, Vec<AccountSetId>>, AccountSetError> {
503 self.repo
504 .fetch_mappings_in_op(op, journal_id, account_ids)
505 .await
506 }
507
508 #[cfg(feature = "import")]
509 pub(crate) async fn sync_account_set_creation(
510 &self,
511 mut db: es_entity::DbOpWithTime<'_>,
512 origin: DataSourceId,
513 values: AccountSetValues,
514 ) -> Result<(), AccountSetError> {
515 let mut account_set = AccountSet::import(origin, values);
516 self.repo
517 .import_in_op(&mut db, origin, &mut account_set)
518 .await?;
519 db.commit().await?;
520 Ok(())
521 }
522
523 #[cfg(feature = "import")]
524 pub(crate) async fn sync_account_set_update(
525 &self,
526 mut db: es_entity::DbOpWithTime<'_>,
527 values: AccountSetValues,
528 fields: Vec<String>,
529 ) -> Result<(), AccountSetError> {
530 let mut account_set = self.repo.find_by_id_in_op(&mut db, values.id).await?;
531 let _ = account_set.update((values, fields));
532 self.repo.update_in_op(&mut db, &mut account_set).await?;
533 db.commit().await?;
534 Ok(())
535 }
536
537 #[cfg(feature = "import")]
538 pub(crate) async fn sync_account_set_member_creation(
539 &self,
540 mut db: es_entity::DbOpWithTime<'_>,
541 origin: DataSourceId,
542 account_set_id: AccountSetId,
543 member_id: AccountSetMemberId,
544 ) -> Result<(), AccountSetError> {
545 match member_id {
546 AccountSetMemberId::Account(account_id) => {
547 self.repo
548 .import_member_account_in_op(&mut db, origin, account_set_id, account_id)
549 .await?;
550 }
551 AccountSetMemberId::AccountSet(member_account_set_id) => {
552 self.repo
553 .import_member_set_in_op(&mut db, origin, account_set_id, member_account_set_id)
554 .await?;
555 }
556 }
557 db.commit().await?;
558 Ok(())
559 }
560
561 #[cfg(feature = "import")]
562 pub(crate) async fn sync_account_set_member_removal(
563 &self,
564 mut db: es_entity::DbOpWithTime<'_>,
565 origin: DataSourceId,
566 account_set_id: AccountSetId,
567 member_id: AccountSetMemberId,
568 ) -> Result<(), AccountSetError> {
569 match member_id {
570 AccountSetMemberId::Account(account_id) => {
571 self.repo
572 .import_remove_member_account(&mut db, origin, account_set_id, account_id)
573 .await?;
574 }
575 AccountSetMemberId::AccountSet(member_account_set_id) => {
576 self.repo
577 .import_remove_member_set(
578 &mut db,
579 origin,
580 account_set_id,
581 member_account_set_id,
582 )
583 .await?;
584 }
585 }
586 db.commit().await?;
587 Ok(())
588 }
589}
590
591fn entries_for_add_balance(
592 entries: &mut Vec<NewEntry>,
593 target_account_id: AccountId,
594 balance: BalanceSnapshot,
595) {
596 use rust_decimal::Decimal;
597
598 if balance.settled.cr_balance != Decimal::ZERO {
599 let entry = NewEntry::builder()
600 .id(EntryId::new())
601 .journal_id(balance.journal_id)
602 .account_id(target_account_id)
603 .currency(balance.currency)
604 .sequence(1u32)
605 .layer(Layer::Settled)
606 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_CR")
607 .direction(DebitOrCredit::Credit)
608 .units(balance.settled.cr_balance)
609 .transaction_id(UNASSIGNED_TRANSACTION_ID)
610 .build()
611 .expect("Couldn't build entry");
612 entries.push(entry);
613 }
614 if balance.settled.dr_balance != Decimal::ZERO {
615 let entry = NewEntry::builder()
616 .id(EntryId::new())
617 .journal_id(balance.journal_id)
618 .account_id(target_account_id)
619 .currency(balance.currency)
620 .sequence(1u32)
621 .layer(Layer::Settled)
622 .entry_type("ACCOUNT_SET_ADD_MEMBER_SETTLED_DR")
623 .direction(DebitOrCredit::Debit)
624 .units(balance.settled.dr_balance)
625 .transaction_id(UNASSIGNED_TRANSACTION_ID)
626 .build()
627 .expect("Couldn't build entry");
628 entries.push(entry);
629 }
630 if balance.pending.cr_balance != Decimal::ZERO {
631 let entry = NewEntry::builder()
632 .id(EntryId::new())
633 .journal_id(balance.journal_id)
634 .account_id(target_account_id)
635 .currency(balance.currency)
636 .sequence(1u32)
637 .layer(Layer::Pending)
638 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_CR")
639 .direction(DebitOrCredit::Credit)
640 .units(balance.pending.cr_balance)
641 .transaction_id(UNASSIGNED_TRANSACTION_ID)
642 .build()
643 .expect("Couldn't build entry");
644 entries.push(entry);
645 }
646 if balance.pending.dr_balance != Decimal::ZERO {
647 let entry = NewEntry::builder()
648 .id(EntryId::new())
649 .journal_id(balance.journal_id)
650 .account_id(target_account_id)
651 .currency(balance.currency)
652 .sequence(1u32)
653 .layer(Layer::Pending)
654 .entry_type("ACCOUNT_SET_ADD_MEMBER_PENDING_DR")
655 .direction(DebitOrCredit::Debit)
656 .units(balance.pending.dr_balance)
657 .transaction_id(UNASSIGNED_TRANSACTION_ID)
658 .build()
659 .expect("Couldn't build entry");
660 entries.push(entry);
661 }
662 if balance.encumbrance.cr_balance != Decimal::ZERO {
663 let entry = NewEntry::builder()
664 .id(EntryId::new())
665 .journal_id(balance.journal_id)
666 .account_id(target_account_id)
667 .currency(balance.currency)
668 .sequence(1u32)
669 .layer(Layer::Encumbrance)
670 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_CR")
671 .direction(DebitOrCredit::Credit)
672 .units(balance.encumbrance.cr_balance)
673 .transaction_id(UNASSIGNED_TRANSACTION_ID)
674 .build()
675 .expect("Couldn't build entry");
676 entries.push(entry);
677 }
678 if balance.encumbrance.dr_balance != Decimal::ZERO {
679 let entry = NewEntry::builder()
680 .id(EntryId::new())
681 .journal_id(balance.journal_id)
682 .account_id(target_account_id)
683 .currency(balance.currency)
684 .sequence(1u32)
685 .layer(Layer::Encumbrance)
686 .entry_type("ACCOUNT_SET_ADD_MEMBER_ENCUMBRANCE_DR")
687 .direction(DebitOrCredit::Debit)
688 .units(balance.encumbrance.dr_balance)
689 .transaction_id(UNASSIGNED_TRANSACTION_ID)
690 .build()
691 .expect("Couldn't build entry");
692 entries.push(entry);
693 }
694}
695
696fn entries_for_remove_balance(
697 entries: &mut Vec<NewEntry>,
698 target_account_id: AccountId,
699 balance: BalanceSnapshot,
700) {
701 use rust_decimal::Decimal;
702
703 if balance.settled.cr_balance != Decimal::ZERO {
704 let entry = NewEntry::builder()
705 .id(EntryId::new())
706 .journal_id(balance.journal_id)
707 .account_id(target_account_id)
708 .currency(balance.currency)
709 .sequence(1u32)
710 .layer(Layer::Settled)
711 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_DR")
712 .direction(DebitOrCredit::Debit)
713 .units(balance.settled.cr_balance)
714 .transaction_id(UNASSIGNED_TRANSACTION_ID)
715 .build()
716 .expect("Couldn't build entry");
717 entries.push(entry);
718 }
719 if balance.settled.dr_balance != Decimal::ZERO {
720 let entry = NewEntry::builder()
721 .id(EntryId::new())
722 .journal_id(balance.journal_id)
723 .account_id(target_account_id)
724 .currency(balance.currency)
725 .sequence(1u32)
726 .layer(Layer::Settled)
727 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_SETTLED_CR")
728 .direction(DebitOrCredit::Credit)
729 .units(balance.settled.dr_balance)
730 .transaction_id(UNASSIGNED_TRANSACTION_ID)
731 .build()
732 .expect("Couldn't build entry");
733 entries.push(entry);
734 }
735 if balance.pending.cr_balance != Decimal::ZERO {
736 let entry = NewEntry::builder()
737 .id(EntryId::new())
738 .journal_id(balance.journal_id)
739 .account_id(target_account_id)
740 .currency(balance.currency)
741 .sequence(1u32)
742 .layer(Layer::Pending)
743 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_DR")
744 .direction(DebitOrCredit::Debit)
745 .units(balance.pending.cr_balance)
746 .transaction_id(UNASSIGNED_TRANSACTION_ID)
747 .build()
748 .expect("Couldn't build entry");
749 entries.push(entry);
750 }
751 if balance.pending.dr_balance != Decimal::ZERO {
752 let entry = NewEntry::builder()
753 .id(EntryId::new())
754 .journal_id(balance.journal_id)
755 .account_id(target_account_id)
756 .currency(balance.currency)
757 .sequence(1u32)
758 .layer(Layer::Pending)
759 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_PENDING_CR")
760 .direction(DebitOrCredit::Credit)
761 .units(balance.pending.dr_balance)
762 .transaction_id(UNASSIGNED_TRANSACTION_ID)
763 .build()
764 .expect("Couldn't build entry");
765 entries.push(entry);
766 }
767 if balance.encumbrance.cr_balance != Decimal::ZERO {
768 let entry = NewEntry::builder()
769 .id(EntryId::new())
770 .journal_id(balance.journal_id)
771 .account_id(target_account_id)
772 .currency(balance.currency)
773 .sequence(1u32)
774 .layer(Layer::Encumbrance)
775 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_DR")
776 .direction(DebitOrCredit::Debit)
777 .units(balance.encumbrance.cr_balance)
778 .transaction_id(UNASSIGNED_TRANSACTION_ID)
779 .build()
780 .expect("Couldn't build entry");
781 entries.push(entry);
782 }
783 if balance.encumbrance.dr_balance != Decimal::ZERO {
784 let entry = NewEntry::builder()
785 .id(EntryId::new())
786 .journal_id(balance.journal_id)
787 .account_id(target_account_id)
788 .currency(balance.currency)
789 .sequence(1u32)
790 .layer(Layer::Encumbrance)
791 .entry_type("ACCOUNT_SET_REMOVE_MEMBER_ENCUMBRANCE_CR")
792 .direction(DebitOrCredit::Credit)
793 .units(balance.encumbrance.dr_balance)
794 .transaction_id(UNASSIGNED_TRANSACTION_ID)
795 .build()
796 .expect("Couldn't build entry");
797 entries.push(entry);
798 }
799}
800
801impl From<&AccountSetEvent> for OutboxEventPayload {
802 fn from(event: &AccountSetEvent) -> Self {
803 let source = es_entity::context::EventContext::current()
804 .data()
805 .lookup("data_source")
806 .ok()
807 .flatten()
808 .unwrap_or(DataSource::Local);
809
810 match event {
811 #[cfg(feature = "import")]
812 AccountSetEvent::Imported {
813 source,
814 values: account_set,
815 } => OutboxEventPayload::AccountSetCreated {
816 source: *source,
817 account_set: account_set.clone(),
818 },
819 AccountSetEvent::Initialized {
820 values: account_set,
821 } => OutboxEventPayload::AccountSetCreated {
822 source,
823 account_set: account_set.clone(),
824 },
825 AccountSetEvent::Updated { values, fields } => OutboxEventPayload::AccountSetUpdated {
826 source,
827 account_set: values.clone(),
828 fields: fields.clone(),
829 },
830 }
831 }
832}