cala_ledger/entry/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6
7#[cfg(feature = "import")]
8use crate::primitives::DataSourceId;
9use crate::{
10    ledger_operation::*,
11    outbox::*,
12    primitives::{AccountId, AccountSetId, DataSource, JournalId},
13};
14
15pub use entity::*;
16use error::*;
17pub use repo::entry_cursor::EntriesByCreatedAtCursor;
18use repo::*;
19
20#[derive(Clone)]
21pub struct Entries {
22    repo: EntryRepo,
23    outbox: Outbox,
24    _pool: PgPool,
25}
26
27impl Entries {
28    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
29        Self {
30            repo: EntryRepo::new(pool),
31            outbox,
32            _pool: pool.clone(),
33        }
34    }
35
36    pub async fn list_for_account_id(
37        &self,
38        account_id: AccountId,
39        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
40        direction: es_entity::ListDirection,
41    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
42        self.repo
43            .list_for_account_id_by_created_at(account_id, query, direction)
44            .await
45    }
46
47    pub async fn list_for_account_set_id(
48        &self,
49        account_id: AccountSetId,
50        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
51        direction: es_entity::ListDirection,
52    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
53        self.repo
54            .list_for_account_set_id_by_created_at(account_id, query, direction)
55            .await
56    }
57
58    pub async fn list_for_journal_id(
59        &self,
60        journal_id: JournalId,
61        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
62        direction: es_entity::ListDirection,
63    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
64        self.repo
65            .list_for_journal_id_by_created_at(journal_id, query, direction)
66            .await
67    }
68
69    pub(crate) async fn create_all_in_op(
70        &self,
71        db: &mut LedgerOperation<'_>,
72        entries: Vec<NewEntry>,
73    ) -> Result<Vec<EntryValues>, EntryError> {
74        let entries = self.repo.create_all_in_op(db.op(), entries).await?;
75        db.accumulate(
76            entries
77                .iter()
78                .map(|entry| OutboxEventPayload::EntryCreated {
79                    source: DataSource::Local,
80                    entry: entry.values().clone(),
81                }),
82        );
83        Ok(entries
84            .into_iter()
85            .map(|entry| entry.into_values())
86            .collect())
87    }
88
89    #[cfg(feature = "import")]
90    pub(crate) async fn sync_entry_creation(
91        &self,
92        mut db: es_entity::DbOp<'_>,
93        origin: DataSourceId,
94        values: EntryValues,
95    ) -> Result<(), EntryError> {
96        let mut entry = Entry::import(origin, values);
97        self.repo.import(&mut db, origin, &mut entry).await?;
98        let recorded_at = db.now();
99        let outbox_events: Vec<_> = entry
100            .events
101            .last_persisted(1)
102            .map(|p| OutboxEventPayload::from(&p.event))
103            .collect();
104        self.outbox
105            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
106            .await?;
107        Ok(())
108    }
109}
110
111impl From<&EntryEvent> for OutboxEventPayload {
112    fn from(event: &EntryEvent) -> Self {
113        match event {
114            #[cfg(feature = "import")]
115            EntryEvent::Imported {
116                source,
117                values: entry,
118            } => OutboxEventPayload::EntryCreated {
119                source: *source,
120                entry: entry.clone(),
121            },
122            EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
123                source: DataSource::Local,
124                entry: entry.clone(),
125            },
126        }
127    }
128}