cala_ledger/entry/
repo.rs

1use crate::{
2    outbox::OutboxPublisher,
3    primitives::{AccountId, AccountSetId, DataSourceId, EntryId, JournalId, TransactionId},
4};
5use es_entity::*;
6use sqlx::PgPool;
7use tracing::instrument;
8
9use super::{entity::*, error::*};
10
/// Repository for [`Entry`] entities.
///
/// The persistence code is derived by `EsRepo` from the `es_repo` options
/// below. What those options visibly declare:
///
/// - `entity` / `err`: the entity type is `Entry` and the error type is
///   `EntryError`.
/// - `columns`: four typed columns. `account_id`, `journal_id` and
///   `transaction_id` are flagged `list_for`; all four are marked
///   `update(persist = false)`. `data_source_id` is read on create through the
///   accessor expression `data_source().into()`.
/// - `tbl_prefix = "cala"`: the hand-written queries in this file address the
///   table as `cala_entries`.
/// - `post_persist_hook = "publish"`: names the `publish` method on this type.
/// - `persist_event_context = false`.
///
/// NOTE(review): the exact set of generated methods (for example the
/// `persist_events` call used by `import`) comes from the `es_entity` derive
/// and is not visible here — confirm against the es-entity documentation.
#[derive(EsRepo, Debug, Clone)]
#[es_repo(
    entity = "Entry",
    err = "EntryError",
    columns(
        account_id(ty = "AccountId", list_for, update(persist = false)),
        journal_id(ty = "JournalId", list_for, update(persist = false)),
        transaction_id(ty = "TransactionId", list_for, update(persist = false)),
        data_source_id(
            ty = "DataSourceId",
            create(accessor = "data_source().into()"),
            update(persist = false),
        ),
    ),
    tbl_prefix = "cala",
    post_persist_hook = "publish",
    persist_event_context = false
)]
pub(crate) struct EntryRepo {
    /// Postgres connection pool; read directly by
    /// `list_for_account_set_id_by_created_at`.
    // NOTE(review): because the field is read by that method, this
    // `allow(dead_code)` may no longer be needed — confirm before removing.
    #[allow(dead_code)]
    pool: PgPool,
    /// Outbox publisher that the `publish` hook forwards entity events to.
    publisher: OutboxPublisher,
}
34
35impl EntryRepo {
36    pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
37        Self {
38            pool: pool.clone(),
39            publisher: publisher.clone(),
40        }
41    }
42
43    #[instrument(
44        name = "entry.list_for_account_set_id_by_created_at",
45        skip_all,
46        err(level = "warn")
47    )]
48    pub(super) async fn list_for_account_set_id_by_created_at(
49        &self,
50        account_set_id: AccountSetId,
51        query: es_entity::PaginatedQueryArgs<entry_cursor::EntriesByCreatedAtCursor>,
52        direction: es_entity::ListDirection,
53    ) -> Result<
54        es_entity::PaginatedQueryRet<Entry, entry_cursor::EntriesByCreatedAtCursor>,
55        EntryError,
56    > {
57        let es_entity::PaginatedQueryArgs { first, after } = query;
58        let (id, created_at) = if let Some(after) = after {
59            (Some(after.id), Some(after.created_at))
60        } else {
61            (None, None)
62        };
63
64        let executor = &self.pool;
65
66        let (entities, has_next_page) = match direction {
67                    es_entity::ListDirection::Ascending => {
68                        es_entity::es_query!(
69                            entity = Entry,
70                            r#"
71                            SELECT created_at, id
72                            FROM cala_entries
73                            JOIN cala_balance_history ON cala_entries.id = cala_balance_history.latest_entry_id
74                            WHERE cala_balance_history.account_id = $4
75                              AND (COALESCE((created_at, id) > ($3, $2), $2 IS NULL))
76                            ORDER BY created_at ASC, id ASC
77                            LIMIT $1"#,
78                            (first + 1) as i64,
79                            id as Option<EntryId>,
80                            created_at as Option<chrono::DateTime<chrono::Utc>>,
81                            account_set_id as AccountSetId,
82                        )
83                            .fetch_n(executor, first)
84                            .await?
85                    },
86                    es_entity::ListDirection::Descending => {
87                        es_entity::es_query!(
88                            entity = Entry,
89                            r#"
90                            SELECT created_at, id
91                            FROM cala_entries
92                            JOIN cala_balance_history ON cala_entries.id = cala_balance_history.latest_entry_id
93                            WHERE cala_balance_history.account_id = $4
94                              AND (COALESCE((created_at, id) < ($3, $2), $2 IS NULL))
95                            ORDER BY created_at DESC, id DESC
96                            LIMIT $1"#,
97                            (first + 1) as i64,
98                            id as Option<EntryId>,
99                            created_at as Option<chrono::DateTime<chrono::Utc>>,
100                            account_set_id as AccountSetId,
101                        )
102                            .fetch_n(executor, first)
103                            .await?
104                    },
105                };
106
107        let end_cursor = entities
108            .last()
109            .map(entry_cursor::EntriesByCreatedAtCursor::from);
110
111        Ok(es_entity::PaginatedQueryRet {
112            entities,
113            has_next_page,
114            end_cursor,
115        })
116    }
117
118    #[cfg(feature = "import")]
119    pub(super) async fn import(
120        &self,
121        op: &mut impl es_entity::AtomicOperationWithTime,
122        origin: DataSourceId,
123        entry: &mut Entry,
124    ) -> Result<(), EntryError> {
125        let recorded_at = op.now();
126        sqlx::query!(
127            r#"INSERT INTO cala_entries (data_source_id, id, journal_id, account_id, transaction_id, created_at)
128            VALUES ($1, $2, $3, $4, $5, $6)"#,
129            origin as DataSourceId,
130            entry.values().id as EntryId,
131            entry.values().journal_id as JournalId,
132            entry.values().account_id as AccountId,
133            entry.values().transaction_id as TransactionId,
134            recorded_at,
135        )
136        .execute(op.as_executor())
137        .await?;
138        self.persist_events(op, entry.events_mut()).await?;
139        Ok(())
140    }
141
142    async fn publish(
143        &self,
144        op: &mut impl es_entity::AtomicOperation,
145        entity: &Entry,
146        new_events: es_entity::LastPersisted<'_, EntryEvent>,
147    ) -> Result<(), EntryError> {
148        self.publisher
149            .publish_entity_events(op, entity, new_events)
150            .await?;
151        Ok(())
152    }
153}