Skip to main content

cala_ledger/entry/
repo.rs

1use crate::{
2    outbox::OutboxPublisher,
3    primitives::{AccountId, AccountSetId, EntryId, JournalId, TransactionId},
4};
5use es_entity::*;
6use sqlx::PgPool;
7use tracing::instrument;
8
9use super::{entity::*, error::*};
10
11#[derive(EsRepo, Debug, Clone)]
12#[es_repo(
13    entity = "Entry",
14    columns(
15        account_id(ty = "AccountId", list_for(by(created_at)), update(persist = false)),
16        journal_id(ty = "JournalId", list_for(by(created_at)), update(persist = false)),
17        transaction_id(
18            ty = "TransactionId",
19            list_for(by(created_at)),
20            update(persist = false)
21        ),
22    ),
23    tbl_prefix = "cala",
24    post_persist_hook = "publish",
25    persist_event_context = false
26)]
27pub(crate) struct EntryRepo {
28    #[allow(dead_code)]
29    pool: PgPool,
30    publisher: OutboxPublisher,
31}
32
33impl EntryRepo {
34    pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
35        Self {
36            pool: pool.clone(),
37            publisher: publisher.clone(),
38        }
39    }
40
41    #[instrument(
42        name = "entry.list_for_account_set_id_by_created_at",
43        skip_all,
44        err(level = "warn")
45    )]
46    pub(super) async fn list_for_account_set_id_by_created_at(
47        &self,
48        account_set_id: AccountSetId,
49        query: es_entity::PaginatedQueryArgs<entry_cursor::EntryByCreatedAtCursor>,
50        direction: es_entity::ListDirection,
51    ) -> Result<es_entity::PaginatedQueryRet<Entry, entry_cursor::EntryByCreatedAtCursor>, EntryError>
52    {
53        let es_entity::PaginatedQueryArgs { first, after } = query;
54        let (id, created_at) = if let Some(after) = after {
55            (Some(after.id), Some(after.created_at))
56        } else {
57            (None, None)
58        };
59
60        let executor = &self.pool;
61
62        let (entities, has_next_page) = match direction {
63                    es_entity::ListDirection::Ascending => {
64                        es_entity::es_query!(
65                            entity = Entry,
66                            r#"
67                            SELECT created_at, id
68                            FROM cala_entries
69                            JOIN cala_balance_history ON cala_entries.id = cala_balance_history.latest_entry_id
70                            WHERE cala_balance_history.account_id = $4
71                              AND (COALESCE((created_at, id) > ($3, $2), $2 IS NULL))
72                            ORDER BY created_at ASC, id ASC
73                            LIMIT $1"#,
74                            (first + 1) as i64,
75                            id as Option<EntryId>,
76                            created_at as Option<chrono::DateTime<chrono::Utc>>,
77                            account_set_id as AccountSetId,
78                        )
79                            .fetch_n(executor, first)
80                            .await?
81                    },
82                    es_entity::ListDirection::Descending => {
83                        es_entity::es_query!(
84                            entity = Entry,
85                            r#"
86                            SELECT created_at, id
87                            FROM cala_entries
88                            JOIN cala_balance_history ON cala_entries.id = cala_balance_history.latest_entry_id
89                            WHERE cala_balance_history.account_id = $4
90                              AND (COALESCE((created_at, id) < ($3, $2), $2 IS NULL))
91                            ORDER BY created_at DESC, id DESC
92                            LIMIT $1"#,
93                            (first + 1) as i64,
94                            id as Option<EntryId>,
95                            created_at as Option<chrono::DateTime<chrono::Utc>>,
96                            account_set_id as AccountSetId,
97                        )
98                            .fetch_n(executor, first)
99                            .await?
100                    },
101                };
102
103        let end_cursor = entities
104            .last()
105            .map(entry_cursor::EntryByCreatedAtCursor::from);
106
107        Ok(es_entity::PaginatedQueryRet {
108            entities,
109            has_next_page,
110            end_cursor,
111        })
112    }
113
114    async fn publish(
115        &self,
116        op: &mut impl es_entity::AtomicOperation,
117        entity: &Entry,
118        new_events: es_entity::LastPersisted<'_, EntryEvent>,
119    ) -> Result<(), sqlx::Error> {
120        self.publisher
121            .publish_entity_events(op, entity, new_events)
122            .await?;
123        Ok(())
124    }
125}