cala_ledger/journal/
mod.rs1mod entity;
2pub mod error;
3mod repo;
4
5use es_entity::EsEntity;
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
14
15pub use entity::*;
16use error::*;
17use repo::*;
18
19#[derive(Clone)]
21pub struct Journals {
22 repo: JournalRepo,
23 outbox: Outbox,
24 pool: PgPool,
25}
26
27impl Journals {
28 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
29 Self {
30 repo: JournalRepo::new(pool),
31 outbox,
32 pool: pool.clone(),
33 }
34 }
35
36 #[instrument(name = "cala_ledger.journals.create", skip(self))]
37 pub async fn create(&self, new_journal: NewJournal) -> Result<Journal, JournalError> {
38 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
39 let journal = self.create_in_op(&mut op, new_journal).await?;
40 op.commit().await?;
41 Ok(journal)
42 }
43
44 pub async fn create_in_op(
45 &self,
46 db: &mut LedgerOperation<'_>,
47 new_journal: NewJournal,
48 ) -> Result<Journal, JournalError> {
49 let journal = self.repo.create_in_op(db.op(), new_journal).await?;
50 db.accumulate(journal.last_persisted(1).map(|p| &p.event));
51 Ok(journal)
52 }
53
54 #[instrument(name = "cala_ledger.journals.find_all", skip(self), err)]
55 pub async fn find_all<T: From<Journal>>(
56 &self,
57 journal_ids: &[JournalId],
58 ) -> Result<HashMap<JournalId, T>, JournalError> {
59 self.repo.find_all(journal_ids).await
60 }
61
62 #[instrument(name = "cala_ledger.journals.find_by_id", skip(self), err)]
63 pub async fn find(&self, journal_id: JournalId) -> Result<Journal, JournalError> {
64 self.repo.find_by_id(journal_id).await
65 }
66
67 #[instrument(name = "cala_ledger.journals.persist", skip(self, journal))]
68 pub async fn persist(&self, journal: &mut Journal) -> Result<(), JournalError> {
69 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
70 self.persist_in_op(&mut op, journal).await?;
71 op.commit().await?;
72 Ok(())
73 }
74
75 pub async fn persist_in_op(
76 &self,
77 db: &mut LedgerOperation<'_>,
78 journal: &mut Journal,
79 ) -> Result<(), JournalError> {
80 let n_events = self.repo.update_in_op(db.op(), journal).await?;
81 db.accumulate(journal.last_persisted(n_events).map(|p| &p.event));
82 Ok(())
83 }
84
85 #[instrument(name = "cala_ledger.journal.find_by_code", skip(self), err)]
86 pub async fn find_by_code(&self, code: String) -> Result<Journal, JournalError> {
87 self.repo.find_by_code(Some(code)).await
88 }
89
90 #[cfg(feature = "import")]
91 pub async fn sync_journal_creation(
92 &self,
93 mut db: es_entity::DbOp<'_>,
94 origin: DataSourceId,
95 values: JournalValues,
96 ) -> Result<(), JournalError> {
97 let mut journal = Journal::import(origin, values);
98 self.repo
99 .import_in_op(&mut db, origin, &mut journal)
100 .await?;
101 let recorded_at = db.now();
102 let outbox_events: Vec<_> = journal
103 .last_persisted(1)
104 .map(|p| OutboxEventPayload::from(&p.event))
105 .collect();
106 self.outbox
107 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
108 .await?;
109 Ok(())
110 }
111
112 #[cfg(feature = "import")]
113 pub async fn sync_journal_update(
114 &self,
115 mut db: es_entity::DbOp<'_>,
116 values: JournalValues,
117 fields: Vec<String>,
118 ) -> Result<(), JournalError> {
119 let mut journal = self.repo.find_by_id(values.id).await?;
120 journal.update((values, fields));
121 let n_events = self.repo.update_in_op(&mut db, &mut journal).await?;
122 let recorded_at = db.now();
123 let outbox_events: Vec<_> = journal
124 .last_persisted(n_events)
125 .map(|p| OutboxEventPayload::from(&p.event))
126 .collect();
127 self.outbox
128 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
129 .await?;
130 Ok(())
131 }
132}
133
134impl From<&JournalEvent> for OutboxEventPayload {
135 fn from(event: &JournalEvent) -> Self {
136 match event {
137 #[cfg(feature = "import")]
138 JournalEvent::Imported { source, values } => OutboxEventPayload::JournalCreated {
139 source: *source,
140 journal: values.clone(),
141 },
142 JournalEvent::Initialized { values } => OutboxEventPayload::JournalCreated {
143 source: DataSource::Local,
144 journal: values.clone(),
145 },
146 JournalEvent::Updated { values, fields } => OutboxEventPayload::JournalUpdated {
147 source: DataSource::Local,
148 journal: values.clone(),
149 fields: fields.clone(),
150 },
151 }
152 }
153}