1mod entity;
2pub mod error;
3mod repo;
4
5use es_entity::EsEntity;
6use sqlx::PgPool;
7use std::collections::HashMap;
8
9#[cfg(feature = "import")]
10use crate::primitives::DataSourceId;
11use crate::{
12 ledger_operation::*,
13 outbox::*,
14 primitives::{AccountId, AccountSetId, DataSource, JournalId, TransactionId},
15};
16
17pub use entity::*;
18use error::*;
19pub use repo::entry_cursor::EntriesByCreatedAtCursor;
20use repo::*;
21
22#[derive(Clone)]
23pub struct Entries {
24 repo: EntryRepo,
25 outbox: Outbox,
26 _pool: PgPool,
27}
28
29impl Entries {
30 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
31 Self {
32 repo: EntryRepo::new(pool),
33 outbox,
34 _pool: pool.clone(),
35 }
36 }
37
38 pub async fn find_all(
39 &self,
40 entry_ids: &[EntryId],
41 ) -> Result<HashMap<EntryId, Entry>, EntryError> {
42 self.repo.find_all(entry_ids).await
43 }
44
45 pub async fn list_for_account_id(
46 &self,
47 account_id: AccountId,
48 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
49 direction: es_entity::ListDirection,
50 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
51 self.repo
52 .list_for_account_id_by_created_at(account_id, query, direction)
53 .await
54 }
55
56 pub async fn list_for_account_set_id(
57 &self,
58 account_id: AccountSetId,
59 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
60 direction: es_entity::ListDirection,
61 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
62 self.repo
63 .list_for_account_set_id_by_created_at(account_id, query, direction)
64 .await
65 }
66
67 pub async fn list_for_journal_id(
68 &self,
69 journal_id: JournalId,
70 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
71 direction: es_entity::ListDirection,
72 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
73 self.repo
74 .list_for_journal_id_by_created_at(journal_id, query, direction)
75 .await
76 }
77
78 pub async fn list_for_transaction_id(
79 &self,
80 transaction_id: TransactionId,
81 ) -> Result<Vec<Entry>, EntryError> {
82 let mut entries = self
83 .repo
84 .list_for_transaction_id_by_created_at(
85 transaction_id,
86 Default::default(),
87 Default::default(),
88 )
89 .await?
90 .entities;
91 entries.sort_by(|a, b| {
92 let a_sequence = a.values().sequence;
93 let b_sequence = b.values().sequence;
94 a_sequence.cmp(&b_sequence)
95 });
96 Ok(entries)
97 }
98
99 pub(crate) async fn create_all_in_op(
100 &self,
101 db: &mut LedgerOperation<'_>,
102 entries: Vec<NewEntry>,
103 ) -> Result<Vec<EntryValues>, EntryError> {
104 let entries = self.repo.create_all_in_op(db.op(), entries).await?;
105 db.accumulate(
106 entries
107 .iter()
108 .map(|entry| OutboxEventPayload::EntryCreated {
109 source: DataSource::Local,
110 entry: entry.values().clone(),
111 }),
112 );
113 Ok(entries
114 .into_iter()
115 .map(|entry| entry.into_values())
116 .collect())
117 }
118
119 #[cfg(feature = "import")]
120 pub(crate) async fn sync_entry_creation(
121 &self,
122 mut db: es_entity::DbOp<'_>,
123 origin: DataSourceId,
124 values: EntryValues,
125 ) -> Result<(), EntryError> {
126 let mut entry = Entry::import(origin, values);
127 self.repo.import(&mut db, origin, &mut entry).await?;
128 let recorded_at = db.now();
129 let outbox_events: Vec<_> = entry
130 .last_persisted(1)
131 .map(|p| OutboxEventPayload::from(&p.event))
132 .collect();
133 self.outbox
134 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
135 .await?;
136 Ok(())
137 }
138}
139
140impl From<&EntryEvent> for OutboxEventPayload {
141 fn from(event: &EntryEvent) -> Self {
142 match event {
143 #[cfg(feature = "import")]
144 EntryEvent::Imported {
145 source,
146 values: entry,
147 } => OutboxEventPayload::EntryCreated {
148 source: *source,
149 entry: entry.clone(),
150 },
151 EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
152 source: DataSource::Local,
153 entry: entry.clone(),
154 },
155 }
156 }
157}