1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use std::collections::HashMap;
7use tracing::instrument;
8
9#[cfg(feature = "import")]
10use crate::primitives::DataSourceId;
11use crate::{
12 outbox::*,
13 primitives::{AccountId, AccountSetId, DataSource, JournalId, TransactionId},
14};
15
16pub use entity::*;
17use error::*;
18pub use repo::entry_cursor::EntriesByCreatedAtCursor;
19use repo::*;
20
21#[derive(Clone)]
22pub struct Entries {
23 repo: EntryRepo,
24}
25
26impl Entries {
27 pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
28 Self {
29 repo: EntryRepo::new(pool, publisher),
30 }
31 }
32
33 #[instrument(name = "cala_ledger.entries.find_all", skip_all)]
34 pub async fn find_all(
35 &self,
36 entry_ids: &[EntryId],
37 ) -> Result<HashMap<EntryId, Entry>, EntryError> {
38 self.repo.find_all(entry_ids).await
39 }
40
41 #[instrument(name = "cala_ledger.entries.list_for_account_id", skip_all)]
42 pub async fn list_for_account_id(
43 &self,
44 account_id: AccountId,
45 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
46 direction: es_entity::ListDirection,
47 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
48 self.repo
49 .list_for_account_id_by_created_at(account_id, query, direction)
50 .await
51 }
52
53 #[instrument(name = "cala_ledger.entries.list_for_account_set_id", skip_all)]
54 pub async fn list_for_account_set_id(
55 &self,
56 account_id: AccountSetId,
57 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
58 direction: es_entity::ListDirection,
59 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
60 self.repo
61 .list_for_account_set_id_by_created_at(account_id, query, direction)
62 .await
63 }
64
65 #[instrument(name = "cala_ledger.entries.list_for_journal_id", skip_all)]
66 pub async fn list_for_journal_id(
67 &self,
68 journal_id: JournalId,
69 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
70 direction: es_entity::ListDirection,
71 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
72 self.repo
73 .list_for_journal_id_by_created_at(journal_id, query, direction)
74 .await
75 }
76
77 #[instrument(name = "cala_ledger.entries.list_for_transaction_id", skip_all)]
78 pub async fn list_for_transaction_id(
79 &self,
80 transaction_id: TransactionId,
81 ) -> Result<Vec<Entry>, EntryError> {
82 let mut entries = self
83 .repo
84 .list_for_transaction_id_by_created_at(
85 transaction_id,
86 Default::default(),
87 Default::default(),
88 )
89 .await?
90 .entities;
91 entries.sort_by(|a, b| {
92 let a_sequence = a.values().sequence;
93 let b_sequence = b.values().sequence;
94 a_sequence.cmp(&b_sequence)
95 });
96 Ok(entries)
97 }
98
99 #[instrument(name = "cala_ledger.entries.new_entries_for_voided_tx", skip_all)]
100 pub async fn new_entries_for_voided_tx(
101 &self,
102 voiding_tx_id: TransactionId,
103 existing_tx_id: TransactionId,
104 ) -> Result<Vec<NewEntry>, EntryError> {
105 let entries = self.list_for_transaction_id(existing_tx_id).await?;
106
107 let new_entries = entries
108 .into_iter()
109 .map(|entry| {
110 let value = entry.into_values();
111
112 let mut builder = NewEntry::builder();
113 builder
114 .id(EntryId::new())
115 .transaction_id(voiding_tx_id)
116 .journal_id(value.journal_id)
117 .sequence(value.sequence)
118 .account_id(value.account_id)
119 .entry_type(format!("{}_VOID", value.entry_type))
120 .layer(value.layer)
121 .currency(value.currency)
122 .units(-value.units)
123 .direction(value.direction);
124
125 if let Some(description) = value.description {
126 builder.description(description);
127 }
128 if let Some(metadata) = value.metadata {
129 builder.metadata(metadata);
130 }
131
132 builder.build().expect("Couldn't build voided entry")
133 })
134 .collect();
135
136 Ok(new_entries)
137 }
138
139 #[instrument(name = "cala_ledger.entries.create_all_in_op", skip_all)]
140 pub(crate) async fn create_all_in_op(
141 &self,
142 db: &mut impl es_entity::AtomicOperation,
143 entries: Vec<NewEntry>,
144 ) -> Result<Vec<EntryValues>, EntryError> {
145 let entries = self.repo.create_all_in_op(db, entries).await?;
146 Ok(entries
147 .into_iter()
148 .map(|entry| entry.into_values())
149 .collect())
150 }
151
152 #[cfg(feature = "import")]
153 #[instrument(name = "cala_ledger.entries.sync_entry_creation", skip_all)]
154 pub(crate) async fn sync_entry_creation(
155 &self,
156 mut db: es_entity::DbOpWithTime<'_>,
157 origin: DataSourceId,
158 values: EntryValues,
159 ) -> Result<(), EntryError> {
160 let mut entry = Entry::import(origin, values);
161 self.repo.import(&mut db, origin, &mut entry).await?;
162 db.commit().await?;
163 Ok(())
164 }
165}
166
167impl From<&EntryEvent> for OutboxEventPayload {
168 fn from(event: &EntryEvent) -> Self {
169 let source = es_entity::context::EventContext::current()
170 .data()
171 .lookup("data_source")
172 .ok()
173 .flatten()
174 .unwrap_or(DataSource::Local);
175
176 match event {
177 #[cfg(feature = "import")]
178 EntryEvent::Imported {
179 source,
180 values: entry,
181 } => OutboxEventPayload::EntryCreated {
182 source: *source,
183 entry: entry.clone(),
184 },
185 EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
186 source,
187 entry: entry.clone(),
188 },
189 }
190 }
191}