1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use std::collections::HashMap;
7
8#[cfg(feature = "import")]
9use crate::primitives::DataSourceId;
10use crate::{
11 ledger_operation::*,
12 outbox::*,
13 primitives::{AccountId, AccountSetId, DataSource, JournalId},
14};
15
16pub use entity::*;
17use error::*;
18pub use repo::entry_cursor::EntriesByCreatedAtCursor;
19use repo::*;
20
21#[derive(Clone)]
22pub struct Entries {
23 repo: EntryRepo,
24 outbox: Outbox,
25 _pool: PgPool,
26}
27
28impl Entries {
29 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
30 Self {
31 repo: EntryRepo::new(pool),
32 outbox,
33 _pool: pool.clone(),
34 }
35 }
36
37 pub async fn find_all(
38 &self,
39 entry_ids: &[EntryId],
40 ) -> Result<HashMap<EntryId, Entry>, EntryError> {
41 self.repo.find_all(entry_ids).await
42 }
43
44 pub async fn list_for_account_id(
45 &self,
46 account_id: AccountId,
47 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
48 direction: es_entity::ListDirection,
49 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
50 self.repo
51 .list_for_account_id_by_created_at(account_id, query, direction)
52 .await
53 }
54
55 pub async fn list_for_account_set_id(
56 &self,
57 account_id: AccountSetId,
58 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
59 direction: es_entity::ListDirection,
60 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
61 self.repo
62 .list_for_account_set_id_by_created_at(account_id, query, direction)
63 .await
64 }
65
66 pub async fn list_for_journal_id(
67 &self,
68 journal_id: JournalId,
69 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
70 direction: es_entity::ListDirection,
71 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
72 self.repo
73 .list_for_journal_id_by_created_at(journal_id, query, direction)
74 .await
75 }
76
77 pub(crate) async fn create_all_in_op(
78 &self,
79 db: &mut LedgerOperation<'_>,
80 entries: Vec<NewEntry>,
81 ) -> Result<Vec<EntryValues>, EntryError> {
82 let entries = self.repo.create_all_in_op(db.op(), entries).await?;
83 db.accumulate(
84 entries
85 .iter()
86 .map(|entry| OutboxEventPayload::EntryCreated {
87 source: DataSource::Local,
88 entry: entry.values().clone(),
89 }),
90 );
91 Ok(entries
92 .into_iter()
93 .map(|entry| entry.into_values())
94 .collect())
95 }
96
97 #[cfg(feature = "import")]
98 pub(crate) async fn sync_entry_creation(
99 &self,
100 mut db: es_entity::DbOp<'_>,
101 origin: DataSourceId,
102 values: EntryValues,
103 ) -> Result<(), EntryError> {
104 let mut entry = Entry::import(origin, values);
105 self.repo.import(&mut db, origin, &mut entry).await?;
106 let recorded_at = db.now();
107 let outbox_events: Vec<_> = entry
108 .events
109 .last_persisted(1)
110 .map(|p| OutboxEventPayload::from(&p.event))
111 .collect();
112 self.outbox
113 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
114 .await?;
115 Ok(())
116 }
117}
118
119impl From<&EntryEvent> for OutboxEventPayload {
120 fn from(event: &EntryEvent) -> Self {
121 match event {
122 #[cfg(feature = "import")]
123 EntryEvent::Imported {
124 source,
125 values: entry,
126 } => OutboxEventPayload::EntryCreated {
127 source: *source,
128 entry: entry.clone(),
129 },
130 EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
131 source: DataSource::Local,
132 entry: entry.clone(),
133 },
134 }
135 }
136}