1use crate::{
2 outbox::OutboxPublisher,
3 primitives::{AccountId, AccountSetId, DataSourceId, EntryId, JournalId, TransactionId},
4};
5use es_entity::*;
6use sqlx::PgPool;
7use tracing::instrument;
8
9use super::{entity::*, error::*};
10
11#[derive(EsRepo, Debug, Clone)]
12#[es_repo(
13 entity = "Entry",
14 err = "EntryError",
15 columns(
16 account_id(ty = "AccountId", list_for, update(persist = false)),
17 journal_id(ty = "JournalId", list_for, update(persist = false)),
18 transaction_id(ty = "TransactionId", list_for, update(persist = false)),
19 data_source_id(
20 ty = "DataSourceId",
21 create(accessor = "data_source().into()"),
22 update(persist = false),
23 ),
24 ),
25 tbl_prefix = "cala",
26 post_persist_hook = "publish",
27 persist_event_context = false
28)]
29pub(crate) struct EntryRepo {
30 #[allow(dead_code)]
31 pool: PgPool,
32 publisher: OutboxPublisher,
33}
34
35impl EntryRepo {
36 pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
37 Self {
38 pool: pool.clone(),
39 publisher: publisher.clone(),
40 }
41 }
42
43 #[instrument(
44 name = "entry.list_for_account_set_id_by_created_at",
45 skip_all,
46 err(level = "warn")
47 )]
48 pub(super) async fn list_for_account_set_id_by_created_at(
49 &self,
50 account_set_id: AccountSetId,
51 query: es_entity::PaginatedQueryArgs<entry_cursor::EntriesByCreatedAtCursor>,
52 direction: es_entity::ListDirection,
53 ) -> Result<
54 es_entity::PaginatedQueryRet<Entry, entry_cursor::EntriesByCreatedAtCursor>,
55 EntryError,
56 > {
57 let es_entity::PaginatedQueryArgs { first, after } = query;
58 let (id, created_at) = if let Some(after) = after {
59 (Some(after.id), Some(after.created_at))
60 } else {
61 (None, None)
62 };
63
64 let executor = &self.pool;
65
66 let (entities, has_next_page) = match direction {
67 es_entity::ListDirection::Ascending => {
68 es_entity::es_query!(
69 entity = Entry,
70 r#"
71 SELECT created_at, id
72 FROM cala_entries
73 JOIN cala_balance_history ON cala_entries.id = cala_balance_history.latest_entry_id
74 WHERE cala_balance_history.account_id = $4
75 AND (COALESCE((created_at, id) > ($3, $2), $2 IS NULL))
76 ORDER BY created_at ASC, id ASC
77 LIMIT $1"#,
78 (first + 1) as i64,
79 id as Option<EntryId>,
80 created_at as Option<chrono::DateTime<chrono::Utc>>,
81 account_set_id as AccountSetId,
82 )
83 .fetch_n(executor, first)
84 .await?
85 },
86 es_entity::ListDirection::Descending => {
87 es_entity::es_query!(
88 entity = Entry,
89 r#"
90 SELECT created_at, id
91 FROM cala_entries
92 JOIN cala_balance_history ON cala_entries.id = cala_balance_history.latest_entry_id
93 WHERE cala_balance_history.account_id = $4
94 AND (COALESCE((created_at, id) < ($3, $2), $2 IS NULL))
95 ORDER BY created_at DESC, id DESC
96 LIMIT $1"#,
97 (first + 1) as i64,
98 id as Option<EntryId>,
99 created_at as Option<chrono::DateTime<chrono::Utc>>,
100 account_set_id as AccountSetId,
101 )
102 .fetch_n(executor, first)
103 .await?
104 },
105 };
106
107 let end_cursor = entities
108 .last()
109 .map(entry_cursor::EntriesByCreatedAtCursor::from);
110
111 Ok(es_entity::PaginatedQueryRet {
112 entities,
113 has_next_page,
114 end_cursor,
115 })
116 }
117
118 #[cfg(feature = "import")]
119 pub(super) async fn import(
120 &self,
121 op: &mut impl es_entity::AtomicOperationWithTime,
122 origin: DataSourceId,
123 entry: &mut Entry,
124 ) -> Result<(), EntryError> {
125 let recorded_at = op.now();
126 sqlx::query!(
127 r#"INSERT INTO cala_entries (data_source_id, id, journal_id, account_id, transaction_id, created_at)
128 VALUES ($1, $2, $3, $4, $5, $6)"#,
129 origin as DataSourceId,
130 entry.values().id as EntryId,
131 entry.values().journal_id as JournalId,
132 entry.values().account_id as AccountId,
133 entry.values().transaction_id as TransactionId,
134 recorded_at,
135 )
136 .execute(op.as_executor())
137 .await?;
138 self.persist_events(op, entry.events_mut()).await?;
139 Ok(())
140 }
141
142 async fn publish(
143 &self,
144 op: &mut impl es_entity::AtomicOperation,
145 entity: &Entry,
146 new_events: es_entity::LastPersisted<'_, EntryEvent>,
147 ) -> Result<(), EntryError> {
148 self.publisher
149 .publish_entity_events(op, entity, new_events)
150 .await?;
151 Ok(())
152 }
153}