Skip to main content

cala_ledger/entry/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use std::collections::HashMap;
7use tracing::instrument;
8
9use crate::{
10    outbox::*,
11    primitives::{AccountId, AccountSetId, JournalId, TransactionId},
12};
13
14pub use entity::*;
15use error::*;
16pub use repo::entry_cursor::EntryByCreatedAtCursor;
17use repo::*;
18
19#[derive(Clone)]
20pub struct Entries {
21    repo: EntryRepo,
22}
23
24impl Entries {
25    pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
26        Self {
27            repo: EntryRepo::new(pool, publisher),
28        }
29    }
30
31    #[instrument(name = "cala_ledger.entries.find_all", skip_all)]
32    pub async fn find_all(
33        &self,
34        entry_ids: &[EntryId],
35    ) -> Result<HashMap<EntryId, Entry>, EntryError> {
36        Ok(self.repo.find_all(entry_ids).await?)
37    }
38
39    #[instrument(name = "cala_ledger.entries.list_for_account_id", skip_all)]
40    pub async fn list_for_account_id(
41        &self,
42        account_id: AccountId,
43        query: es_entity::PaginatedQueryArgs<EntryByCreatedAtCursor>,
44        direction: es_entity::ListDirection,
45    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntryByCreatedAtCursor>, EntryError> {
46        Ok(self
47            .repo
48            .list_for_account_id_by_created_at(account_id, query, direction)
49            .await?)
50    }
51
52    #[instrument(name = "cala_ledger.entries.list_for_account_set_id", skip_all)]
53    pub async fn list_for_account_set_id(
54        &self,
55        account_id: AccountSetId,
56        query: es_entity::PaginatedQueryArgs<EntryByCreatedAtCursor>,
57        direction: es_entity::ListDirection,
58    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntryByCreatedAtCursor>, EntryError> {
59        self.repo
60            .list_for_account_set_id_by_created_at(account_id, query, direction)
61            .await
62    }
63
64    #[instrument(name = "cala_ledger.entries.list_for_journal_id", skip_all)]
65    pub async fn list_for_journal_id(
66        &self,
67        journal_id: JournalId,
68        query: es_entity::PaginatedQueryArgs<EntryByCreatedAtCursor>,
69        direction: es_entity::ListDirection,
70    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntryByCreatedAtCursor>, EntryError> {
71        Ok(self
72            .repo
73            .list_for_journal_id_by_created_at(journal_id, query, direction)
74            .await?)
75    }
76
77    #[instrument(name = "cala_ledger.entries.list_for_transaction_id", skip_all)]
78    pub async fn list_for_transaction_id(
79        &self,
80        transaction_id: TransactionId,
81    ) -> Result<Vec<Entry>, EntryError> {
82        let mut entries = self
83            .repo
84            .list_for_transaction_id_by_created_at(
85                transaction_id,
86                Default::default(),
87                Default::default(),
88            )
89            .await?
90            .entities;
91        entries.sort_by(|a, b| {
92            let a_sequence = a.values().sequence;
93            let b_sequence = b.values().sequence;
94            a_sequence.cmp(&b_sequence)
95        });
96        Ok(entries)
97    }
98
99    #[instrument(name = "cala_ledger.entries.new_entries_for_voided_tx", skip_all)]
100    pub async fn new_entries_for_voided_tx(
101        &self,
102        voiding_tx_id: TransactionId,
103        existing_tx_id: TransactionId,
104    ) -> Result<Vec<NewEntry>, EntryError> {
105        let entries = self.list_for_transaction_id(existing_tx_id).await?;
106
107        let new_entries = entries
108            .into_iter()
109            .map(|entry| {
110                let value = entry.into_values();
111
112                let mut builder = NewEntry::builder();
113                builder
114                    .id(EntryId::new())
115                    .transaction_id(voiding_tx_id)
116                    .journal_id(value.journal_id)
117                    .sequence(value.sequence)
118                    .account_id(value.account_id)
119                    .entry_type(format!("{}_VOID", value.entry_type))
120                    .layer(value.layer)
121                    .currency(value.currency)
122                    .units(-value.units)
123                    .direction(value.direction);
124
125                if let Some(description) = value.description {
126                    builder.description(description);
127                }
128                if let Some(metadata) = value.metadata {
129                    builder.metadata(metadata);
130                }
131
132                builder.build().expect("Couldn't build voided entry")
133            })
134            .collect();
135
136        Ok(new_entries)
137    }
138
139    #[instrument(name = "cala_ledger.entries.create_all_in_op", skip_all)]
140    pub(crate) async fn create_all_in_op(
141        &self,
142        db: &mut impl es_entity::AtomicOperation,
143        entries: Vec<NewEntry>,
144    ) -> Result<Vec<EntryValues>, EntryError> {
145        let entries = self.repo.create_all_in_op(db, entries).await?;
146        Ok(entries
147            .into_iter()
148            .map(|entry| entry.into_values())
149            .collect())
150    }
151}
152
153impl From<&EntryEvent> for OutboxEventPayload {
154    fn from(event: &EntryEvent) -> Self {
155        match event {
156            EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
157                entry: entry.clone(),
158            },
159        }
160    }
161}