1use crate::primitives::{AccountId, AccountSetId, DataSourceId, EntryId, JournalId, TransactionId};
2use es_entity::*;
3use sqlx::PgPool;
4
5use super::{entity::*, error::*};
6
7#[derive(EsRepo, Debug, Clone)]
8#[es_repo(
9 entity = "Entry",
10 err = "EntryError",
11 columns(
12 account_id(ty = "AccountId", list_for, update(persist = false)),
13 journal_id(ty = "JournalId", list_for, update(persist = false)),
14 transaction_id(ty = "TransactionId", list_for, update(persist = false)),
15 data_source_id(
16 ty = "DataSourceId",
17 create(accessor = "data_source().into()"),
18 update(persist = false),
19 ),
20 ),
21 tbl_prefix = "cala"
22)]
23pub(crate) struct EntryRepo {
24 #[allow(dead_code)]
25 pool: PgPool,
26}
27
28impl EntryRepo {
29 pub(crate) fn new(pool: &PgPool) -> Self {
30 Self { pool: pool.clone() }
31 }
32
33 pub(super) async fn list_for_account_set_id_by_created_at(
34 &self,
35 account_set_id: AccountSetId,
36 query: es_entity::PaginatedQueryArgs<entry_cursor::EntriesByCreatedAtCursor>,
37 direction: es_entity::ListDirection,
38 ) -> Result<
39 es_entity::PaginatedQueryRet<Entry, entry_cursor::EntriesByCreatedAtCursor>,
40 EntryError,
41 > {
42 let es_entity::PaginatedQueryArgs { first, after } = query;
43 let (id, created_at) = if let Some(after) = after {
44 (Some(after.id), Some(after.created_at))
45 } else {
46 (None, None)
47 };
48
49 let executor = &self.pool;
50
51 let (entities, has_next_page) = match direction {
52 es_entity::ListDirection::Ascending => {
53 es_entity::es_query!(
54 entity_ty = Entry,
55 id_ty = EntryId,
56 "cala",
57 executor,
58 r#"
59 SELECT created_at, id
60 FROM cala_entries
61 JOIN cala_balance_history ON cala_entries.id = cala_balance_history.latest_entry_id
62 WHERE cala_balance_history.account_id = $4
63 AND (COALESCE((created_at, id) > ($3, $2), $2 IS NULL))
64 ORDER BY created_at ASC, id ASC
65 LIMIT $1"#,
66 (first + 1) as i64,
67 id as Option<EntryId>,
68 created_at as Option<chrono::DateTime<chrono::Utc>>,
69 account_set_id as AccountSetId,
70 )
71 .fetch_n(first)
72 .await?
73 },
74 es_entity::ListDirection::Descending => {
75 es_entity::es_query!(
76 entity_ty = Entry,
77 id_ty = EntryId,
78 "cala",
79 executor,
80 r#"
81 SELECT created_at, id
82 FROM cala_entries
83 JOIN cala_balance_history ON cala_entries.id = cala_balance_history.latest_entry_id
84 WHERE cala_balance_history.account_id = $4
85 AND (COALESCE((created_at, id) < ($3, $2), $2 IS NULL))
86 ORDER BY created_at DESC, id DESC
87 LIMIT $1"#,
88 (first + 1) as i64,
89 id as Option<EntryId>,
90 created_at as Option<chrono::DateTime<chrono::Utc>>,
91 account_set_id as AccountSetId,
92 )
93 .fetch_n(first)
94 .await?
95 },
96 };
97
98 let end_cursor = entities
99 .last()
100 .map(entry_cursor::EntriesByCreatedAtCursor::from);
101
102 Ok(es_entity::PaginatedQueryRet {
103 entities,
104 has_next_page,
105 end_cursor,
106 })
107 }
108
109 #[cfg(feature = "import")]
110 pub(super) async fn import(
111 &self,
112 op: &mut DbOp<'_>,
113 origin: DataSourceId,
114 entry: &mut Entry,
115 ) -> Result<(), EntryError> {
116 let recorded_at = op.now();
117 sqlx::query!(
118 r#"INSERT INTO cala_entries (data_source_id, id, journal_id, account_id, created_at)
119 VALUES ($1, $2, $3, $4, $5)"#,
120 origin as DataSourceId,
121 entry.values().id as EntryId,
122 entry.values().journal_id as JournalId,
123 entry.values().account_id as AccountId,
124 recorded_at,
125 )
126 .execute(&mut **op.tx())
127 .await?;
128 self.persist_events(op, entry.events_mut()).await?;
129 Ok(())
130 }
131}