cala_ledger/entry/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use std::collections::HashMap;
7use tracing::instrument;
8
9#[cfg(feature = "import")]
10use crate::primitives::DataSourceId;
11use crate::{
12    outbox::*,
13    primitives::{AccountId, AccountSetId, DataSource, JournalId, TransactionId},
14};
15
16pub use entity::*;
17use error::*;
18pub use repo::entry_cursor::EntriesByCreatedAtCursor;
19use repo::*;
20
21#[derive(Clone)]
22pub struct Entries {
23    repo: EntryRepo,
24}
25
26impl Entries {
27    pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
28        Self {
29            repo: EntryRepo::new(pool, publisher),
30        }
31    }
32
33    #[instrument(name = "cala_ledger.entries.find_all", skip_all)]
34    pub async fn find_all(
35        &self,
36        entry_ids: &[EntryId],
37    ) -> Result<HashMap<EntryId, Entry>, EntryError> {
38        self.repo.find_all(entry_ids).await
39    }
40
41    #[instrument(name = "cala_ledger.entries.list_for_account_id", skip_all)]
42    pub async fn list_for_account_id(
43        &self,
44        account_id: AccountId,
45        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
46        direction: es_entity::ListDirection,
47    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
48        self.repo
49            .list_for_account_id_by_created_at(account_id, query, direction)
50            .await
51    }
52
53    #[instrument(name = "cala_ledger.entries.list_for_account_set_id", skip_all)]
54    pub async fn list_for_account_set_id(
55        &self,
56        account_id: AccountSetId,
57        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
58        direction: es_entity::ListDirection,
59    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
60        self.repo
61            .list_for_account_set_id_by_created_at(account_id, query, direction)
62            .await
63    }
64
65    #[instrument(name = "cala_ledger.entries.list_for_journal_id", skip_all)]
66    pub async fn list_for_journal_id(
67        &self,
68        journal_id: JournalId,
69        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
70        direction: es_entity::ListDirection,
71    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
72        self.repo
73            .list_for_journal_id_by_created_at(journal_id, query, direction)
74            .await
75    }
76
77    #[instrument(name = "cala_ledger.entries.list_for_transaction_id", skip_all)]
78    pub async fn list_for_transaction_id(
79        &self,
80        transaction_id: TransactionId,
81    ) -> Result<Vec<Entry>, EntryError> {
82        let mut entries = self
83            .repo
84            .list_for_transaction_id_by_created_at(
85                transaction_id,
86                Default::default(),
87                Default::default(),
88            )
89            .await?
90            .entities;
91        entries.sort_by(|a, b| {
92            let a_sequence = a.values().sequence;
93            let b_sequence = b.values().sequence;
94            a_sequence.cmp(&b_sequence)
95        });
96        Ok(entries)
97    }
98
99    #[instrument(name = "cala_ledger.entries.new_entries_for_voided_tx", skip_all)]
100    pub async fn new_entries_for_voided_tx(
101        &self,
102        voiding_tx_id: TransactionId,
103        existing_tx_id: TransactionId,
104    ) -> Result<Vec<NewEntry>, EntryError> {
105        let entries = self.list_for_transaction_id(existing_tx_id).await?;
106
107        let new_entries = entries
108            .into_iter()
109            .map(|entry| {
110                let value = entry.into_values();
111
112                let mut builder = NewEntry::builder();
113                builder
114                    .id(EntryId::new())
115                    .transaction_id(voiding_tx_id)
116                    .journal_id(value.journal_id)
117                    .sequence(value.sequence)
118                    .account_id(value.account_id)
119                    .entry_type(format!("{}_VOID", value.entry_type))
120                    .layer(value.layer)
121                    .currency(value.currency)
122                    .units(-value.units)
123                    .direction(value.direction);
124
125                if let Some(description) = value.description {
126                    builder.description(description);
127                }
128                if let Some(metadata) = value.metadata {
129                    builder.metadata(metadata);
130                }
131
132                builder.build().expect("Couldn't build voided entry")
133            })
134            .collect();
135
136        Ok(new_entries)
137    }
138
139    #[instrument(name = "cala_ledger.entries.create_all_in_op", skip_all)]
140    pub(crate) async fn create_all_in_op(
141        &self,
142        db: &mut impl es_entity::AtomicOperation,
143        entries: Vec<NewEntry>,
144    ) -> Result<Vec<EntryValues>, EntryError> {
145        let entries = self.repo.create_all_in_op(db, entries).await?;
146        Ok(entries
147            .into_iter()
148            .map(|entry| entry.into_values())
149            .collect())
150    }
151
152    #[cfg(feature = "import")]
153    #[instrument(name = "cala_ledger.entries.sync_entry_creation", skip_all)]
154    pub(crate) async fn sync_entry_creation(
155        &self,
156        mut db: es_entity::DbOpWithTime<'_>,
157        origin: DataSourceId,
158        values: EntryValues,
159    ) -> Result<(), EntryError> {
160        let mut entry = Entry::import(origin, values);
161        self.repo.import(&mut db, origin, &mut entry).await?;
162        db.commit().await?;
163        Ok(())
164    }
165}
166
167impl From<&EntryEvent> for OutboxEventPayload {
168    fn from(event: &EntryEvent) -> Self {
169        let source = es_entity::context::EventContext::current()
170            .data()
171            .lookup("data_source")
172            .ok()
173            .flatten()
174            .unwrap_or(DataSource::Local);
175
176        match event {
177            #[cfg(feature = "import")]
178            EntryEvent::Imported {
179                source,
180                values: entry,
181            } => OutboxEventPayload::EntryCreated {
182                source: *source,
183                entry: entry.clone(),
184            },
185            EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
186                source,
187                entry: entry.clone(),
188            },
189        }
190    }
191}