cala_ledger/entry/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use std::collections::HashMap;
7
8#[cfg(feature = "import")]
9use crate::primitives::DataSourceId;
10use crate::{
11    ledger_operation::*,
12    outbox::*,
13    primitives::{AccountId, AccountSetId, DataSource, JournalId, TransactionId},
14};
15
16pub use entity::*;
17use error::*;
18pub use repo::entry_cursor::EntriesByCreatedAtCursor;
19use repo::*;
20
21#[derive(Clone)]
22pub struct Entries {
23    repo: EntryRepo,
24    // Used only for "import" feature
25    #[allow(dead_code)]
26    outbox: Outbox,
27    _pool: PgPool,
28}
29
30impl Entries {
31    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
32        Self {
33            repo: EntryRepo::new(pool),
34            outbox,
35            _pool: pool.clone(),
36        }
37    }
38
39    pub async fn find_all(
40        &self,
41        entry_ids: &[EntryId],
42    ) -> Result<HashMap<EntryId, Entry>, EntryError> {
43        self.repo.find_all(entry_ids).await
44    }
45
46    pub async fn list_for_account_id(
47        &self,
48        account_id: AccountId,
49        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
50        direction: es_entity::ListDirection,
51    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
52        self.repo
53            .list_for_account_id_by_created_at(account_id, query, direction)
54            .await
55    }
56
57    pub async fn list_for_account_set_id(
58        &self,
59        account_id: AccountSetId,
60        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
61        direction: es_entity::ListDirection,
62    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
63        self.repo
64            .list_for_account_set_id_by_created_at(account_id, query, direction)
65            .await
66    }
67
68    pub async fn list_for_journal_id(
69        &self,
70        journal_id: JournalId,
71        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
72        direction: es_entity::ListDirection,
73    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
74        self.repo
75            .list_for_journal_id_by_created_at(journal_id, query, direction)
76            .await
77    }
78
79    pub async fn list_for_transaction_id(
80        &self,
81        transaction_id: TransactionId,
82    ) -> Result<Vec<Entry>, EntryError> {
83        let mut entries = self
84            .repo
85            .list_for_transaction_id_by_created_at(
86                transaction_id,
87                Default::default(),
88                Default::default(),
89            )
90            .await?
91            .entities;
92        entries.sort_by(|a, b| {
93            let a_sequence = a.values().sequence;
94            let b_sequence = b.values().sequence;
95            a_sequence.cmp(&b_sequence)
96        });
97        Ok(entries)
98    }
99
100    pub async fn new_entries_for_voided_tx(
101        &self,
102        voiding_tx_id: TransactionId,
103        existing_tx_id: TransactionId,
104    ) -> Result<Vec<NewEntry>, EntryError> {
105        let entries = self.list_for_transaction_id(existing_tx_id).await?;
106
107        let new_entries = entries
108            .into_iter()
109            .map(|entry| {
110                let value = entry.into_values();
111
112                let mut builder = NewEntry::builder();
113                builder
114                    .id(EntryId::new())
115                    .transaction_id(voiding_tx_id)
116                    .journal_id(value.journal_id)
117                    .sequence(value.sequence)
118                    .account_id(value.account_id)
119                    .entry_type(format!("{}_VOID", value.entry_type))
120                    .layer(value.layer)
121                    .currency(value.currency)
122                    .units(-value.units)
123                    .direction(value.direction);
124
125                if let Some(description) = value.description {
126                    builder.description(description);
127                }
128                if let Some(metadata) = value.metadata {
129                    builder.metadata(metadata);
130                }
131
132                builder.build().expect("Couldn't build voided entry")
133            })
134            .collect();
135
136        Ok(new_entries)
137    }
138
139    pub(crate) async fn create_all_in_op(
140        &self,
141        db: &mut LedgerOperation<'_>,
142        entries: Vec<NewEntry>,
143    ) -> Result<Vec<EntryValues>, EntryError> {
144        let entries = self.repo.create_all_in_op(db, entries).await?;
145        db.accumulate(
146            entries
147                .iter()
148                .map(|entry| OutboxEventPayload::EntryCreated {
149                    source: DataSource::Local,
150                    entry: entry.values().clone(),
151                }),
152        );
153        Ok(entries
154            .into_iter()
155            .map(|entry| entry.into_values())
156            .collect())
157    }
158
159    #[cfg(feature = "import")]
160    pub(crate) async fn sync_entry_creation(
161        &self,
162        mut db: es_entity::DbOpWithTime<'_>,
163        origin: DataSourceId,
164        values: EntryValues,
165    ) -> Result<(), EntryError> {
166        use es_entity::EsEntity;
167
168        let mut entry = Entry::import(origin, values);
169        self.repo.import(&mut db, origin, &mut entry).await?;
170        let outbox_events: Vec<_> = entry
171            .last_persisted(1)
172            .map(|p| OutboxEventPayload::from(&p.event))
173            .collect();
174        let time = db.now();
175        self.outbox
176            .persist_events_at(db, outbox_events, time)
177            .await?;
178        Ok(())
179    }
180}
181
182impl From<&EntryEvent> for OutboxEventPayload {
183    fn from(event: &EntryEvent) -> Self {
184        match event {
185            #[cfg(feature = "import")]
186            EntryEvent::Imported {
187                source,
188                values: entry,
189            } => OutboxEventPayload::EntryCreated {
190                source: *source,
191                entry: entry.clone(),
192            },
193            EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
194                source: DataSource::Local,
195                entry: entry.clone(),
196            },
197        }
198    }
199}