1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6
7#[cfg(feature = "import")]
8use crate::primitives::DataSourceId;
9use crate::{
10 ledger_operation::*,
11 outbox::*,
12 primitives::{AccountId, AccountSetId, DataSource, JournalId},
13};
14
15pub use entity::*;
16use error::*;
17pub use repo::entry_cursor::EntriesByCreatedAtCursor;
18use repo::*;
19
20#[derive(Clone)]
21pub struct Entries {
22 repo: EntryRepo,
23 outbox: Outbox,
24 _pool: PgPool,
25}
26
27impl Entries {
28 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
29 Self {
30 repo: EntryRepo::new(pool),
31 outbox,
32 _pool: pool.clone(),
33 }
34 }
35
36 pub async fn list_for_account_id(
37 &self,
38 account_id: AccountId,
39 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
40 direction: es_entity::ListDirection,
41 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
42 self.repo
43 .list_for_account_id_by_created_at(account_id, query, direction)
44 .await
45 }
46
47 pub async fn list_for_account_set_id(
48 &self,
49 account_id: AccountSetId,
50 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
51 direction: es_entity::ListDirection,
52 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
53 self.repo
54 .list_for_account_set_id_by_created_at(account_id, query, direction)
55 .await
56 }
57
58 pub async fn list_for_journal_id(
59 &self,
60 journal_id: JournalId,
61 query: es_entity::PaginatedQueryArgs<EntriesByCreatedAtCursor>,
62 direction: es_entity::ListDirection,
63 ) -> Result<es_entity::PaginatedQueryRet<Entry, EntriesByCreatedAtCursor>, EntryError> {
64 self.repo
65 .list_for_journal_id_by_created_at(journal_id, query, direction)
66 .await
67 }
68
69 pub(crate) async fn create_all_in_op(
70 &self,
71 db: &mut LedgerOperation<'_>,
72 entries: Vec<NewEntry>,
73 ) -> Result<Vec<EntryValues>, EntryError> {
74 let entries = self.repo.create_all_in_op(db.op(), entries).await?;
75 db.accumulate(
76 entries
77 .iter()
78 .map(|entry| OutboxEventPayload::EntryCreated {
79 source: DataSource::Local,
80 entry: entry.values().clone(),
81 }),
82 );
83 Ok(entries
84 .into_iter()
85 .map(|entry| entry.into_values())
86 .collect())
87 }
88
89 #[cfg(feature = "import")]
90 pub(crate) async fn sync_entry_creation(
91 &self,
92 mut db: es_entity::DbOp<'_>,
93 origin: DataSourceId,
94 values: EntryValues,
95 ) -> Result<(), EntryError> {
96 let mut entry = Entry::import(origin, values);
97 self.repo.import(&mut db, origin, &mut entry).await?;
98 let recorded_at = db.now();
99 let outbox_events: Vec<_> = entry
100 .events
101 .last_persisted(1)
102 .map(|p| OutboxEventPayload::from(&p.event))
103 .collect();
104 self.outbox
105 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
106 .await?;
107 Ok(())
108 }
109}
110
111impl From<&EntryEvent> for OutboxEventPayload {
112 fn from(event: &EntryEvent) -> Self {
113 match event {
114 #[cfg(feature = "import")]
115 EntryEvent::Imported {
116 source,
117 values: entry,
118 } => OutboxEventPayload::EntryCreated {
119 source: *source,
120 entry: entry.clone(),
121 },
122 EntryEvent::Initialized { values: entry } => OutboxEventPayload::EntryCreated {
123 source: DataSource::Local,
124 entry: entry.clone(),
125 },
126 }
127 }
128}