1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use std::collections::HashMap;
7
8#[cfg(feature = "import")]
9use crate::primitives::DataSourceId;
10use crate::{
11 ledger_operation::*,
12 outbox::*,
13 primitives::{AccountId, AccountSetId, DataSource, JournalId, TransactionId},
14};
15
16pub use entity::*;
17use error::*;
18pub use repo::entry_cursor::EntriesByCreatedAtCursor;
19use repo::*;
20
21#[derive(Clone)]
22pub struct Entries {
23 repo: EntryRepo,
24 #[allow(dead_code)]
26 outbox: Outbox,
27 _pool: PgPool,
28}
29
30impl Entries {
31 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
32 Self {
33 repo: EntryRepo::new(pool),
34 outbox,
35 _pool: pool.clone(),
36 }
37 }
38
39 pub async fn find_all(
40 &self,
41 entry_ids: &[EntryId],
42 ) -> Result<HashMap<EntryId, Entry>, EntryError> {
43 self.repo.find_all(entry_ids).await
44 }
45
46 pub async fn list_for_account_id(
47 &self,
48 account_id: AccountId,
49 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
50 direction: es_entity::ListDirection,
51 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
52 self.repo
53 .list_for_account_id_by_created_at(account_id, query, direction)
54 .await
55 }
56
57 pub async fn list_for_account_set_id(
58 &self,
59 account_id: AccountSetId,
60 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
61 direction: es_entity::ListDirection,
62 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
63 self.repo
64 .list_for_account_set_id_by_created_at(account_id, query, direction)
65 .await
66 }
67
68 pub async fn list_for_journal_id(
69 &self,
70 journal_id: JournalId,
71 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
72 direction: es_entity::ListDirection,
73 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
74 self.repo
75 .list_for_journal_id_by_created_at(journal_id, query, direction)
76 .await
77 }
78
79 pub async fn list_for_transaction_id(
80 &self,
81 transaction_id: TransactionId,
82 ) -> Result<Vec<Entry>, EntryError> {
83 let mut entries = self
84 .repo
85 .list_for_transaction_id_by_created_at(
86 transaction_id,
87 Default::default(),
88 Default::default(),
89 )
90 .await?
91 .entities;
92 entries.sort_by(|a, b| {
93 let a_sequence = a.values().sequence;
94 let b_sequence = b.values().sequence;
95 a_sequence.cmp(&b_sequence)
96 });
97 Ok(entries)
98 }
99
100 pub async fn new_entries_for_voided_tx(
101 &self,
102 voiding_tx_id: TransactionId,
103 existing_tx_id: TransactionId,
104 ) -> Result<Vec<NewEntry>, EntryError> {
105 let entries = self.list_for_transaction_id(existing_tx_id).await?;
106
107 let new_entries = entries
108 .into_iter()
109 .map(|entry| {
110 let value = entry.into_values();
111
112 let mut builder = NewEntry::builder();
113 builder
114 .id(EntryId::new())
115 .transaction_id(voiding_tx_id)
116 .journal_id(value.journal_id)
117 .sequence(value.sequence)
118 .account_id(value.account_id)
119 .entry_type(format!("{}_VOID", value.entry_type))
120 .layer(value.layer)
121 .currency(value.currency)
122 .units(-value.units)
123 .direction(value.direction);
124
125 if let Some(description) = value.description {
126 builder.description(description);
127 }
128 if let Some(metadata) = value.metadata {
129 builder.metadata(metadata);
130 }
131
132 builder.build().expect("Couldn't build voided entry")
133 })
134 .collect();
135
136 Ok(new_entries)
137 }
138
139 pub(crate) async fn create_all_in_op(
140 &self,
141 db: &mut LedgerOperation<'_>,
142 entries: Vec<NewEntry>,
143 ) -> Result<Vec<EntryValues>, EntryError> {
144 let entries = self.repo.create_all_in_op(db, entries).await?;
145 db.accumulate(
146 entries
147 .iter()
148 .map(|entry| OutboxEventPayload::EntryCreated {
149 source: DataSource::Local,
150 entry: entry.values().clone(),
151 }),
152 );
153 Ok(entries
154 .into_iter()
155 .map(|entry| entry.into_values())
156 .collect())
157 }
158
159 #[cfg(feature = "import")]
160 pub(crate) async fn sync_entry_creation(
161 &self,
162 mut db: es_entity::DbOpWithTime<'_>,
163 origin: DataSourceId,
164 values: EntryValues,
165 ) -> Result<(), EntryError> {
166 use es_entity::EsEntity;
167
168 let mut entry = Entry::import(origin, values);
169 self.repo.import(&mut db, origin, &mut entry).await?;
170 let outbox_events: Vec<_> = entry
171 .last_persisted(1)
172 .map(|p| OutboxEventPayload::from(&p.event))
173 .collect();
174 let time = db.now();
175 self.outbox
176 .persist_events_at(db, outbox_events, time)
177 .await?;
178 Ok(())
179 }
180}
181
182impl From<&EntryEvent> for OutboxEventPayload {
183 fn from(event: &EntryEvent) -> Self {
184 match event {
185 #[cfg(feature = "import")]
186 EntryEvent::Imported {
187 source,
188 values: entry,
189 } => OutboxEventPayload::EntryCreated {
190 source: *source,
191 entry: entry.clone(),
192 },
193 EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
194 source: DataSource::Local,
195 entry: entry.clone(),
196 },
197 }
198 }
199}