cala_ledger/entry/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use std::collections::HashMap;
7
8#[cfg(feature = "import")]
9use crate::primitives::DataSourceId;
10use crate::{
11    ledger_operation::*,
12    outbox::*,
13    primitives::{AccountId, AccountSetId, DataSource, JournalId},
14};
15
16pub use entity::*;
17use error::*;
18pub use repo::entry_cursor::EntriesByCreatedAtCursor;
19use repo::*;
20
21#[derive(Clone)]
22pub struct Entries {
23    repo: EntryRepo,
24    outbox: Outbox,
25    _pool: PgPool,
26}
27
28impl Entries {
29    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
30        Self {
31            repo: EntryRepo::new(pool),
32            outbox,
33            _pool: pool.clone(),
34        }
35    }
36
37    pub async fn find_all(
38        &self,
39        entry_ids: &[EntryId],
40    ) -> Result<HashMap<EntryId, Entry>, EntryError> {
41        self.repo.find_all(entry_ids).await
42    }
43
44    pub async fn list_for_account_id(
45        &self,
46        account_id: AccountId,
47        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
48        direction: es_entity::ListDirection,
49    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
50        self.repo
51            .list_for_account_id_by_created_at(account_id, query, direction)
52            .await
53    }
54
55    pub async fn list_for_account_set_id(
56        &self,
57        account_id: AccountSetId,
58        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
59        direction: es_entity::ListDirection,
60    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
61        self.repo
62            .list_for_account_set_id_by_created_at(account_id, query, direction)
63            .await
64    }
65
66    pub async fn list_for_journal_id(
67        &self,
68        journal_id: JournalId,
69        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
70        direction: es_entity::ListDirection,
71    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
72        self.repo
73            .list_for_journal_id_by_created_at(journal_id, query, direction)
74            .await
75    }
76
77    pub(crate) async fn create_all_in_op(
78        &self,
79        db: &mut LedgerOperation<'_>,
80        entries: Vec<NewEntry>,
81    ) -> Result<Vec<EntryValues>, EntryError> {
82        let entries = self.repo.create_all_in_op(db.op(), entries).await?;
83        db.accumulate(
84            entries
85                .iter()
86                .map(|entry| OutboxEventPayload::EntryCreated {
87                    source: DataSource::Local,
88                    entry: entry.values().clone(),
89                }),
90        );
91        Ok(entries
92            .into_iter()
93            .map(|entry| entry.into_values())
94            .collect())
95    }
96
97    #[cfg(feature = "import")]
98    pub(crate) async fn sync_entry_creation(
99        &self,
100        mut db: es_entity::DbOp<'_>,
101        origin: DataSourceId,
102        values: EntryValues,
103    ) -> Result<(), EntryError> {
104        let mut entry = Entry::import(origin, values);
105        self.repo.import(&mut db, origin, &mut entry).await?;
106        let recorded_at = db.now();
107        let outbox_events: Vec<_> = entry
108            .events
109            .last_persisted(1)
110            .map(|p| OutboxEventPayload::from(&p.event))
111            .collect();
112        self.outbox
113            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
114            .await?;
115        Ok(())
116    }
117}
118
119impl From<&EntryEvent> for OutboxEventPayload {
120    fn from(event: &EntryEvent) -> Self {
121        match event {
122            #[cfg(feature = "import")]
123            EntryEvent::Imported {
124                source,
125                values: entry,
126            } => OutboxEventPayload::EntryCreated {
127                source: *source,
128                entry: entry.clone(),
129            },
130            EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
131                source: DataSource::Local,
132                entry: entry.clone(),
133            },
134        }
135    }
136}