1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use std::collections::HashMap;
7use tracing::instrument;
8
9use crate::{
10 outbox::*,
11 primitives::{AccountId, AccountSetId, JournalId, TransactionId},
12};
13
14pub use entity::*;
15use error::*;
16pub use repo::entry_cursor::EntryByCreatedAtCursor;
17use repo::*;
18
/// Service handle for ledger entries.
///
/// Wraps an [`EntryRepo`]; every method on this type delegates its storage
/// access to that repository. The handle derives `Clone`.
#[derive(Clone)]
pub struct Entries {
    // Repository that performs all entry queries and inserts.
    repo: EntryRepo,
}
23
24impl Entries {
25 pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
26 Self {
27 repo: EntryRepo::new(pool, publisher),
28 }
29 }
30
31 #[instrument(name = "cala_ledger.entries.find_all", skip_all)]
32 pub async fn find_all(
33 &self,
34 entry_ids: &[EntryId],
35 ) -> Result<HashMap<EntryId, Entry>, EntryError> {
36 Ok(self.repo.find_all(entry_ids).await?)
37 }
38
39 #[instrument(name = "cala_ledger.entries.list_for_account_id", skip_all)]
40 pub async fn list_for_account_id(
41 &self,
42 account_id: AccountId,
43 query: es_entity::PaginatedQueryArgs<EntryByCreatedAtCursor>,
44 direction: es_entity::ListDirection,
45 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntryByCreatedAtCursor>, EntryError> {
46 Ok(self
47 .repo
48 .list_for_account_id_by_created_at(account_id, query, direction)
49 .await?)
50 }
51
52 #[instrument(name = "cala_ledger.entries.list_for_account_set_id", skip_all)]
53 pub async fn list_for_account_set_id(
54 &self,
55 account_id: AccountSetId,
56 query: es_entity::PaginatedQueryArgs<EntryByCreatedAtCursor>,
57 direction: es_entity::ListDirection,
58 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntryByCreatedAtCursor>, EntryError> {
59 self.repo
60 .list_for_account_set_id_by_created_at(account_id, query, direction)
61 .await
62 }
63
64 #[instrument(name = "cala_ledger.entries.list_for_journal_id", skip_all)]
65 pub async fn list_for_journal_id(
66 &self,
67 journal_id: JournalId,
68 query: es_entity::PaginatedQueryArgs<EntryByCreatedAtCursor>,
69 direction: es_entity::ListDirection,
70 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntryByCreatedAtCursor>, EntryError> {
71 Ok(self
72 .repo
73 .list_for_journal_id_by_created_at(journal_id, query, direction)
74 .await?)
75 }
76
77 #[instrument(name = "cala_ledger.entries.list_for_transaction_id", skip_all)]
78 pub async fn list_for_transaction_id(
79 &self,
80 transaction_id: TransactionId,
81 ) -> Result<Vec<Entry>, EntryError> {
82 let mut entries = self
83 .repo
84 .list_for_transaction_id_by_created_at(
85 transaction_id,
86 Default::default(),
87 Default::default(),
88 )
89 .await?
90 .entities;
91 entries.sort_by(|a, b| {
92 let a_sequence = a.values().sequence;
93 let b_sequence = b.values().sequence;
94 a_sequence.cmp(&b_sequence)
95 });
96 Ok(entries)
97 }
98
99 #[instrument(name = "cala_ledger.entries.new_entries_for_voided_tx", skip_all)]
100 pub async fn new_entries_for_voided_tx(
101 &self,
102 voiding_tx_id: TransactionId,
103 existing_tx_id: TransactionId,
104 ) -> Result<Vec<NewEntry>, EntryError> {
105 let entries = self.list_for_transaction_id(existing_tx_id).await?;
106
107 let new_entries = entries
108 .into_iter()
109 .map(|entry| {
110 let value = entry.into_values();
111
112 let mut builder = NewEntry::builder();
113 builder
114 .id(EntryId::new())
115 .transaction_id(voiding_tx_id)
116 .journal_id(value.journal_id)
117 .sequence(value.sequence)
118 .account_id(value.account_id)
119 .entry_type(format!("{}_VOID", value.entry_type))
120 .layer(value.layer)
121 .currency(value.currency)
122 .units(-value.units)
123 .direction(value.direction);
124
125 if let Some(description) = value.description {
126 builder.description(description);
127 }
128 if let Some(metadata) = value.metadata {
129 builder.metadata(metadata);
130 }
131
132 builder.build().expect("Couldn't build voided entry")
133 })
134 .collect();
135
136 Ok(new_entries)
137 }
138
139 #[instrument(name = "cala_ledger.entries.create_all_in_op", skip_all)]
140 pub(crate) async fn create_all_in_op(
141 &self,
142 db: &mut impl es_entity::AtomicOperation,
143 entries: Vec<NewEntry>,
144 ) -> Result<Vec<EntryValues>, EntryError> {
145 let entries = self.repo.create_all_in_op(db, entries).await?;
146 Ok(entries
147 .into_iter()
148 .map(|entry| entry.into_values())
149 .collect())
150 }
151}
152
153impl From<&EntryEvent> for OutboxEventPayload {
154 fn from(event: &EntryEvent) -> Self {
155 match event {
156 EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
157 entry: entry.clone(),
158 },
159 }
160 }
161}