cala_ledger/entry/
repo.rs

1use crate::primitives::{AccountId, AccountSetId, DataSourceId, EntryId, JournalId, TransactionId};
2use es_entity::*;
3use sqlx::PgPool;
4
5use super::{entity::*, error::*};
6
7#[derive(EsRepo, Debug, Clone)]
8#[es_repo(
9    entity = "Entry",
10    err = "EntryError",
11    columns(
12        account_id(ty = "AccountId", list_for, update(persist = false)),
13        journal_id(ty = "JournalId", list_for, update(persist = false)),
14        transaction_id(ty = "TransactionId", list_for, update(persist = false)),
15        data_source_id(
16            ty = "DataSourceId",
17            create(accessor = "data_source().into()"),
18            update(persist = false),
19        ),
20    ),
21    tbl_prefix = "cala"
22)]
23pub(crate) struct EntryRepo {
24    #[allow(dead_code)]
25    pool: PgPool,
26}
27
28impl EntryRepo {
29    pub(crate) fn new(pool: &PgPool) -> Self {
30        Self { pool: pool.clone() }
31    }
32
33    pub(super) async fn list_for_account_set_id_by_created_at(
34        &self,
35        account_set_id: AccountSetId,
36        query: es_entity::PaginatedQueryArgs<entry_cursor::EntriesByCreatedAtCursor>,
37        direction: es_entity::ListDirection,
38    ) -> Result<
39        es_entity::PaginatedQueryRet<Entry, entry_cursor::EntriesByCreatedAtCursor>,
40        EntryError,
41    > {
42        let es_entity::PaginatedQueryArgs { first, after } = query;
43        let (id, created_at) = if let Some(after) = after {
44            (Some(after.id), Some(after.created_at))
45        } else {
46            (None, None)
47        };
48
49        let executor = &self.pool;
50
51        let (entities, has_next_page) = match direction {
52                    es_entity::ListDirection::Ascending => {
53                        es_entity::es_query!(
54                            entity_ty = Entry,
55                            id_ty = EntryId,
56                            "cala",
57                            executor,
58                            r#"
59                            SELECT created_at, id
60                            FROM cala_entries
61                            JOIN cala_balance_history ON cala_entries.id = cala_balance_history.latest_entry_id
62                            WHERE cala_balance_history.account_id = $4
63                              AND (COALESCE((created_at, id) > ($3, $2), $2 IS NULL))
64                            ORDER BY created_at ASC, id ASC
65                            LIMIT $1"#,
66                            (first + 1) as i64,
67                            id as Option<EntryId>,
68                            created_at as Option<chrono::DateTime<chrono::Utc>>,
69                            account_set_id as AccountSetId,
70                        )
71                            .fetch_n(first)
72                            .await?
73                    },
74                    es_entity::ListDirection::Descending => {
75                        es_entity::es_query!(
76                            entity_ty = Entry,
77                            id_ty = EntryId,
78                            "cala",
79                            executor,
80                            r#"
81                            SELECT created_at, id
82                            FROM cala_entries
83                            JOIN cala_balance_history ON cala_entries.id = cala_balance_history.latest_entry_id
84                            WHERE cala_balance_history.account_id = $4
85                              AND (COALESCE((created_at, id) < ($3, $2), $2 IS NULL))
86                            ORDER BY created_at DESC, id DESC
87                            LIMIT $1"#,
88                            (first + 1) as i64,
89                            id as Option<EntryId>,
90                            created_at as Option<chrono::DateTime<chrono::Utc>>,
91                            account_set_id as AccountSetId,
92                        )
93                            .fetch_n(first)
94                            .await?
95                    },
96                };
97
98        let end_cursor = entities
99            .last()
100            .map(entry_cursor::EntriesByCreatedAtCursor::from);
101
102        Ok(es_entity::PaginatedQueryRet {
103            entities,
104            has_next_page,
105            end_cursor,
106        })
107    }
108
109    #[cfg(feature = "import")]
110    pub(super) async fn import(
111        &self,
112        op: &mut DbOp<'_>,
113        origin: DataSourceId,
114        entry: &mut Entry,
115    ) -> Result<(), EntryError> {
116        let recorded_at = op.now();
117        sqlx::query!(
118            r#"INSERT INTO cala_entries (data_source_id, id, journal_id, account_id, created_at)
119            VALUES ($1, $2, $3, $4, $5)"#,
120            origin as DataSourceId,
121            entry.values().id as EntryId,
122            entry.values().journal_id as JournalId,
123            entry.values().account_id as AccountId,
124            recorded_at,
125        )
126        .execute(&mut **op.tx())
127        .await?;
128        self.persist_events(op, entry.events_mut()).await?;
129        Ok(())
130    }
131}