cala_ledger/entry/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use std::collections::HashMap;
7
8#[cfg(feature = "import")]
9use crate::primitives::DataSourceId;
10use crate::{
11    ledger_operation::*,
12    outbox::*,
13    primitives::{AccountId, AccountSetId, DataSource, JournalId, TransactionId},
14};
15
16pub use entity::*;
17use error::*;
18pub use repo::entry_cursor::EntriesByCreatedAtCursor;
19use repo::*;
20
21#[derive(Clone)]
22pub struct Entries {
23    repo: EntryRepo,
24    outbox: Outbox,
25    _pool: PgPool,
26}
27
28impl Entries {
29    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
30        Self {
31            repo: EntryRepo::new(pool),
32            outbox,
33            _pool: pool.clone(),
34        }
35    }
36
37    pub async fn find_all(
38        &self,
39        entry_ids: &[EntryId],
40    ) -> Result<HashMap<EntryId, Entry>, EntryError> {
41        self.repo.find_all(entry_ids).await
42    }
43
44    pub async fn list_for_account_id(
45        &self,
46        account_id: AccountId,
47        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
48        direction: es_entity::ListDirection,
49    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
50        self.repo
51            .list_for_account_id_by_created_at(account_id, query, direction)
52            .await
53    }
54
55    pub async fn list_for_account_set_id(
56        &self,
57        account_id: AccountSetId,
58        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
59        direction: es_entity::ListDirection,
60    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
61        self.repo
62            .list_for_account_set_id_by_created_at(account_id, query, direction)
63            .await
64    }
65
66    pub async fn list_for_journal_id(
67        &self,
68        journal_id: JournalId,
69        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
70        direction: es_entity::ListDirection,
71    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
72        self.repo
73            .list_for_journal_id_by_created_at(journal_id, query, direction)
74            .await
75    }
76
77    pub async fn list_for_transaction_id(
78        &self,
79        transaction_id: TransactionId,
80    ) -> Result<Vec<Entry>, EntryError> {
81        let mut entries = self
82            .repo
83            .list_for_transaction_id_by_created_at(
84                transaction_id,
85                Default::default(),
86                Default::default(),
87            )
88            .await?
89            .entities;
90        entries.sort_by(|a, b| {
91            let a_sequence = a.values().sequence;
92            let b_sequence = b.values().sequence;
93            a_sequence.cmp(&b_sequence)
94        });
95        Ok(entries)
96    }
97
98    pub(crate) async fn create_all_in_op(
99        &self,
100        db: &mut LedgerOperation<'_>,
101        entries: Vec<NewEntry>,
102    ) -> Result<Vec<EntryValues>, EntryError> {
103        let entries = self.repo.create_all_in_op(db.op(), entries).await?;
104        db.accumulate(
105            entries
106                .iter()
107                .map(|entry| OutboxEventPayload::EntryCreated {
108                    source: DataSource::Local,
109                    entry: entry.values().clone(),
110                }),
111        );
112        Ok(entries
113            .into_iter()
114            .map(|entry| entry.into_values())
115            .collect())
116    }
117
118    #[cfg(feature = "import")]
119    pub(crate) async fn sync_entry_creation(
120        &self,
121        mut db: es_entity::DbOp<'_>,
122        origin: DataSourceId,
123        values: EntryValues,
124    ) -> Result<(), EntryError> {
125        let mut entry = Entry::import(origin, values);
126        self.repo.import(&mut db, origin, &mut entry).await?;
127        let recorded_at = db.now();
128        let outbox_events: Vec<_> = entry
129            .events
130            .last_persisted(1)
131            .map(|p| OutboxEventPayload::from(&p.event))
132            .collect();
133        self.outbox
134            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
135            .await?;
136        Ok(())
137    }
138}
139
140impl From<&EntryEvent> for OutboxEventPayload {
141    fn from(event: &EntryEvent) -> Self {
142        match event {
143            #[cfg(feature = "import")]
144            EntryEvent::Imported {
145                source,
146                values: entry,
147            } => OutboxEventPayload::EntryCreated {
148                source: *source,
149                entry: entry.clone(),
150            },
151            EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
152                source: DataSource::Local,
153                entry: entry.clone(),
154            },
155        }
156    }
157}