cala_ledger/entry/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use es_entity::EsEntity;
6use sqlx::PgPool;
7use std::collections::HashMap;
8
9#[cfg(feature = "import")]
10use crate::primitives::DataSourceId;
11use crate::{
12    ledger_operation::*,
13    outbox::*,
14    primitives::{AccountId, AccountSetId, DataSource, JournalId, TransactionId},
15};
16
17pub use entity::*;
18use error::*;
19pub use repo::entry_cursor::EntriesByCreatedAtCursor;
20use repo::*;
21
22#[derive(Clone)]
23pub struct Entries {
24    repo: EntryRepo,
25    outbox: Outbox,
26    _pool: PgPool,
27}
28
29impl Entries {
30    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
31        Self {
32            repo: EntryRepo::new(pool),
33            outbox,
34            _pool: pool.clone(),
35        }
36    }
37
38    pub async fn find_all(
39        &self,
40        entry_ids: &[EntryId],
41    ) -> Result<HashMap<EntryId, Entry>, EntryError> {
42        self.repo.find_all(entry_ids).await
43    }
44
45    pub async fn list_for_account_id(
46        &self,
47        account_id: AccountId,
48        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
49        direction: es_entity::ListDirection,
50    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
51        self.repo
52            .list_for_account_id_by_created_at(account_id, query, direction)
53            .await
54    }
55
56    pub async fn list_for_account_set_id(
57        &self,
58        account_id: AccountSetId,
59        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
60        direction: es_entity::ListDirection,
61    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
62        self.repo
63            .list_for_account_set_id_by_created_at(account_id, query, direction)
64            .await
65    }
66
67    pub async fn list_for_journal_id(
68        &self,
69        journal_id: JournalId,
70        query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
71        direction: es_entity::ListDirection,
72    ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
73        self.repo
74            .list_for_journal_id_by_created_at(journal_id, query, direction)
75            .await
76    }
77
78    pub async fn list_for_transaction_id(
79        &self,
80        transaction_id: TransactionId,
81    ) -> Result<Vec<Entry>, EntryError> {
82        let mut entries = self
83            .repo
84            .list_for_transaction_id_by_created_at(
85                transaction_id,
86                Default::default(),
87                Default::default(),
88            )
89            .await?
90            .entities;
91        entries.sort_by(|a, b| {
92            let a_sequence = a.values().sequence;
93            let b_sequence = b.values().sequence;
94            a_sequence.cmp(&b_sequence)
95        });
96        Ok(entries)
97    }
98
99    pub(crate) async fn create_all_in_op(
100        &self,
101        db: &mut LedgerOperation<'_>,
102        entries: Vec<NewEntry>,
103    ) -> Result<Vec<EntryValues>, EntryError> {
104        let entries = self.repo.create_all_in_op(db.op(), entries).await?;
105        db.accumulate(
106            entries
107                .iter()
108                .map(|entry| OutboxEventPayload::EntryCreated {
109                    source: DataSource::Local,
110                    entry: entry.values().clone(),
111                }),
112        );
113        Ok(entries
114            .into_iter()
115            .map(|entry| entry.into_values())
116            .collect())
117    }
118
119    #[cfg(feature = "import")]
120    pub(crate) async fn sync_entry_creation(
121        &self,
122        mut db: es_entity::DbOp<'_>,
123        origin: DataSourceId,
124        values: EntryValues,
125    ) -> Result<(), EntryError> {
126        let mut entry = Entry::import(origin, values);
127        self.repo.import(&mut db, origin, &mut entry).await?;
128        let recorded_at = db.now();
129        let outbox_events: Vec<_> = entry
130            .last_persisted(1)
131            .map(|p| OutboxEventPayload::from(&p.event))
132            .collect();
133        self.outbox
134            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
135            .await?;
136        Ok(())
137    }
138}
139
140impl From<&EntryEvent> for OutboxEventPayload {
141    fn from(event: &EntryEvent) -> Self {
142        match event {
143            #[cfg(feature = "import")]
144            EntryEvent::Imported {
145                source,
146                values: entry,
147            } => OutboxEventPayload::EntryCreated {
148                source: *source,
149                entry: entry.clone(),
150            },
151            EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
152                source: DataSource::Local,
153                entry: entry.clone(),
154            },
155        }
156    }
157}