cala_ledger/entry/
repo.rs1use crate::{
2 outbox::OutboxPublisher,
3 primitives::{AccountId, AccountSetId, EntryId, JournalId, TransactionId},
4};
5use es_entity::*;
6use sqlx::PgPool;
7use tracing::instrument;
8
9use super::{entity::*, error::*};
10
11#[derive(EsRepo, Debug, Clone)]
12#[es_repo(
13 entity = "Entry",
14 columns(
15 account_id(ty = "AccountId", list_for(by(created_at)), update(persist = false)),
16 journal_id(ty = "JournalId", list_for(by(created_at)), update(persist = false)),
17 transaction_id(
18 ty = "TransactionId",
19 list_for(by(created_at)),
20 update(persist = false)
21 ),
22 ),
23 tbl_prefix = "cala",
24 post_persist_hook = "publish",
25 persist_event_context = false
26)]
27pub(crate) struct EntryRepo {
28 #[allow(dead_code)]
29 pool: PgPool,
30 publisher: OutboxPublisher,
31}
32
33impl EntryRepo {
34 pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
35 Self {
36 pool: pool.clone(),
37 publisher: publisher.clone(),
38 }
39 }
40
41 #[instrument(
42 name = "entry.list_for_account_set_id_by_created_at",
43 skip_all,
44 err(level = "warn")
45 )]
46 pub(super) async fn list_for_account_set_id_by_created_at(
47 &self,
48 account_set_id: AccountSetId,
49 query: es_entity::PaginatedQueryArgs<entry_cursor::EntryByCreatedAtCursor>,
50 direction: es_entity::ListDirection,
51 ) -> Result<es_entity::PaginatedQueryRet<Entry, entry_cursor::EntryByCreatedAtCursor>, EntryError>
52 {
53 let es_entity::PaginatedQueryArgs { first, after } = query;
54 let (id, created_at) = if let Some(after) = after {
55 (Some(after.id), Some(after.created_at))
56 } else {
57 (None, None)
58 };
59
60 let executor = &self.pool;
61
62 let (entities, has_next_page) = match direction {
63 es_entity::ListDirection::Ascending => {
64 es_entity::es_query!(
65 entity = Entry,
66 r#"
67 SELECT created_at, id
68 FROM cala_entries
69 JOIN cala_balance_history ON cala_entries.id = cala_balance_history.latest_entry_id
70 WHERE cala_balance_history.account_id = $4
71 AND (COALESCE((created_at, id) > ($3, $2), $2 IS NULL))
72 ORDER BY created_at ASC, id ASC
73 LIMIT $1"#,
74 (first + 1) as i64,
75 id as Option<EntryId>,
76 created_at as Option<chrono::DateTime<chrono::Utc>>,
77 account_set_id as AccountSetId,
78 )
79 .fetch_n(executor, first)
80 .await?
81 },
82 es_entity::ListDirection::Descending => {
83 es_entity::es_query!(
84 entity = Entry,
85 r#"
86 SELECT created_at, id
87 FROM cala_entries
88 JOIN cala_balance_history ON cala_entries.id = cala_balance_history.latest_entry_id
89 WHERE cala_balance_history.account_id = $4
90 AND (COALESCE((created_at, id) < ($3, $2), $2 IS NULL))
91 ORDER BY created_at DESC, id DESC
92 LIMIT $1"#,
93 (first + 1) as i64,
94 id as Option<EntryId>,
95 created_at as Option<chrono::DateTime<chrono::Utc>>,
96 account_set_id as AccountSetId,
97 )
98 .fetch_n(executor, first)
99 .await?
100 },
101 };
102
103 let end_cursor = entities
104 .last()
105 .map(entry_cursor::EntryByCreatedAtCursor::from);
106
107 Ok(es_entity::PaginatedQueryRet {
108 entities,
109 has_next_page,
110 end_cursor,
111 })
112 }
113
114 async fn publish(
115 &self,
116 op: &mut impl es_entity::AtomicOperation,
117 entity: &Entry,
118 new_events: es_entity::LastPersisted<'_, EntryEvent>,
119 ) -> Result<(), sqlx::Error> {
120 self.publisher
121 .publish_entity_events(op, entity, new_events)
122 .await?;
123 Ok(())
124 }
125}