cala_ledger/entry/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6
7#[cfg(feature = "import")]
8use crate::primitives::DataSourceId;
9use crate::{
10    ledger_operation::*,
11    outbox::*,
12    primitives::{AccountId, DataSource},
13};
14
15pub use entity::*;
16use error::*;
17pub use repo::entry_cursor::EntriesByCreatedAtCursor;
18use repo::*;
19
20#[derive(Clone)]
21pub struct Entries {
22    repo: EntryRepo,
23    outbox: Outbox,
24    _pool: PgPool,
25}
26
27impl Entries {
28    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
29        Self {
30            repo: EntryRepo::new(pool),
31            outbox,
32            _pool: pool.clone(),
33        }
34    }
35
36    pub async fn list_for_account_id(
37        &self,
38        account_id: AccountId,
39        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
40        direction: es_entity::ListDirection,
41    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
42        self.repo
43            .list_for_account_id_by_created_at(account_id, query, direction)
44            .await
45    }
46
47    pub(crate) async fn create_all_in_op(
48        &self,
49        db: &mut LedgerOperation<'_>,
50        entries: Vec<NewEntry>,
51    ) -> Result<Vec<EntryValues>, EntryError> {
52        let entries = self.repo.create_all_in_op(db.op(), entries).await?;
53        db.accumulate(
54            entries
55                .iter()
56                .map(|entry| OutboxEventPayload::EntryCreated {
57                    source: DataSource::Local,
58                    entry: entry.values().clone(),
59                }),
60        );
61        Ok(entries
62            .into_iter()
63            .map(|entry| entry.into_values())
64            .collect())
65    }
66
67    #[cfg(feature = "import")]
68    pub(crate) async fn sync_entry_creation(
69        &self,
70        mut db: es_entity::DbOp<'_>,
71        origin: DataSourceId,
72        values: EntryValues,
73    ) -> Result<(), EntryError> {
74        let mut entry = Entry::import(origin, values);
75        self.repo.import(&mut db, origin, &mut entry).await?;
76        let recorded_at = db.now();
77        let outbox_events: Vec<_> = entry
78            .events
79            .last_persisted(1)
80            .map(|p| OutboxEventPayload::from(&p.event))
81            .collect();
82        self.outbox
83            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
84            .await?;
85        Ok(())
86    }
87}
88
89impl From<&EntryEvent> for OutboxEventPayload {
90    fn from(event: &EntryEvent) -> Self {
91        match event {
92            #[cfg(feature = "import")]
93            EntryEvent::Imported {
94                source,
95                values: entry,
96            } => OutboxEventPayload::EntryCreated {
97                source: *source,
98                entry: entry.clone(),
99            },
100            EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
101                source: DataSource::Local,
102                entry: entry.clone(),
103            },
104        }
105    }
106}