cala_ledger/entry/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6
7#[cfg(feature = "import")]
8use crate::primitives::DataSourceId;
9use crate::{
10    ledger_operation::*,
11    outbox::*,
12    primitives::{AccountId, AccountSetId, DataSource},
13};
14
15pub use entity::*;
16use error::*;
17pub use repo::entry_cursor::EntriesByCreatedAtCursor;
18use repo::*;
19
20#[derive(Clone)]
21pub struct Entries {
22    repo: EntryRepo,
23    outbox: Outbox,
24    _pool: PgPool,
25}
26
27impl Entries {
28    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
29        Self {
30            repo: EntryRepo::new(pool),
31            outbox,
32            _pool: pool.clone(),
33        }
34    }
35
36    pub async fn list_for_account_id(
37        &self,
38        account_id: AccountId,
39        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
40        direction: es_entity::ListDirection,
41    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
42        self.repo
43            .list_for_account_id_by_created_at(account_id, query, direction)
44            .await
45    }
46
47    pub async fn list_for_account_set_id(
48        &self,
49        account_id: AccountSetId,
50        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
51        direction: es_entity::ListDirection,
52    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
53        self.repo
54            .list_for_account_set_id_by_created_at(account_id, query, direction)
55            .await
56    }
57
58    pub(crate) async fn create_all_in_op(
59        &self,
60        db: &mut LedgerOperation<'_>,
61        entries: Vec<NewEntry>,
62    ) -> Result<Vec<EntryValues>, EntryError> {
63        let entries = self.repo.create_all_in_op(db.op(), entries).await?;
64        db.accumulate(
65            entries
66                .iter()
67                .map(|entry| OutboxEventPayload::EntryCreated {
68                    source: DataSource::Local,
69                    entry: entry.values().clone(),
70                }),
71        );
72        Ok(entries
73            .into_iter()
74            .map(|entry| entry.into_values())
75            .collect())
76    }
77
78    #[cfg(feature = "import")]
79    pub(crate) async fn sync_entry_creation(
80        &self,
81        mut db: es_entity::DbOp<'_>,
82        origin: DataSourceId,
83        values: EntryValues,
84    ) -> Result<(), EntryError> {
85        let mut entry = Entry::import(origin, values);
86        self.repo.import(&mut db, origin, &mut entry).await?;
87        let recorded_at = db.now();
88        let outbox_events: Vec<_> = entry
89            .events
90            .last_persisted(1)
91            .map(|p| OutboxEventPayload::from(&p.event))
92            .collect();
93        self.outbox
94            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
95            .await?;
96        Ok(())
97    }
98}
99
100impl From<&EntryEvent> for OutboxEventPayload {
101    fn from(event: &EntryEvent) -> Self {
102        match event {
103            #[cfg(feature = "import")]
104            EntryEvent::Imported {
105                source,
106                values: entry,
107            } => OutboxEventPayload::EntryCreated {
108                source: *source,
109                entry: entry.clone(),
110            },
111            EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
112                source: DataSource::Local,
113                entry: entry.clone(),
114            },
115        }
116    }
117}