cala_ledger/journal/
mod.rs1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use tracing::instrument;
7
8use std::collections::HashMap;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
13
14pub use entity::*;
15use error::*;
16use repo::*;
17
18#[derive(Clone)]
20pub struct Journals {
21 repo: JournalRepo,
22 outbox: Outbox,
23 pool: PgPool,
24}
25
26impl Journals {
27 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
28 Self {
29 repo: JournalRepo::new(pool),
30 outbox,
31 pool: pool.clone(),
32 }
33 }
34
35 #[instrument(name = "cala_ledger.journals.create", skip(self))]
36 pub async fn create(&self, new_journal: NewJournal) -> Result<Journal, JournalError> {
37 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
38 let journal = self.create_in_op(&mut op, new_journal).await?;
39 op.commit().await?;
40 Ok(journal)
41 }
42
43 pub async fn create_in_op(
44 &self,
45 db: &mut LedgerOperation<'_>,
46 new_journal: NewJournal,
47 ) -> Result<Journal, JournalError> {
48 let journal = self.repo.create_in_op(db.op(), new_journal).await?;
49 db.accumulate(journal.events.last_persisted(1).map(|p| &p.event));
50 Ok(journal)
51 }
52
53 #[instrument(name = "cala_ledger.journals.find_all", skip(self), err)]
54 pub async fn find_all<T: From<Journal>>(
55 &self,
56 journal_ids: &[JournalId],
57 ) -> Result<HashMap<JournalId, T>, JournalError> {
58 self.repo.find_all(journal_ids).await
59 }
60
61 #[instrument(name = "cala_ledger.journals.find_by_id", skip(self), err)]
62 pub async fn find(&self, journal_id: JournalId) -> Result<Journal, JournalError> {
63 self.repo.find_by_id(journal_id).await
64 }
65
66 #[instrument(name = "cala_ledger.journals.persist", skip(self, journal))]
67 pub async fn persist(&self, journal: &mut Journal) -> Result<(), JournalError> {
68 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
69 self.persist_in_op(&mut op, journal).await?;
70 op.commit().await?;
71 Ok(())
72 }
73
74 pub async fn persist_in_op(
75 &self,
76 db: &mut LedgerOperation<'_>,
77 journal: &mut Journal,
78 ) -> Result<(), JournalError> {
79 let n_events = self.repo.update_in_op(db.op(), journal).await?;
80 db.accumulate(journal.events.last_persisted(n_events).map(|p| &p.event));
81 Ok(())
82 }
83
84 #[instrument(name = "cala_ledger.journal.find_by_code", skip(self), err)]
85 pub async fn find_by_code(&self, code: String) -> Result<Journal, JournalError> {
86 self.repo.find_by_code(Some(code)).await
87 }
88
89 #[cfg(feature = "import")]
90 pub async fn sync_journal_creation(
91 &self,
92 mut db: es_entity::DbOp<'_>,
93 origin: DataSourceId,
94 values: JournalValues,
95 ) -> Result<(), JournalError> {
96 let mut journal = Journal::import(origin, values);
97 self.repo
98 .import_in_op(&mut db, origin, &mut journal)
99 .await?;
100 let recorded_at = db.now();
101 let outbox_events: Vec<_> = journal
102 .events
103 .last_persisted(1)
104 .map(|p| OutboxEventPayload::from(&p.event))
105 .collect();
106 self.outbox
107 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
108 .await?;
109 Ok(())
110 }
111
112 #[cfg(feature = "import")]
113 pub async fn sync_journal_update(
114 &self,
115 mut db: es_entity::DbOp<'_>,
116 values: JournalValues,
117 fields: Vec<String>,
118 ) -> Result<(), JournalError> {
119 let mut journal = self.repo.find_by_id(values.id).await?;
120 journal.update((values, fields));
121 let n_events = self.repo.update_in_op(&mut db, &mut journal).await?;
122 let recorded_at = db.now();
123 let outbox_events: Vec<_> = journal
124 .events
125 .last_persisted(n_events)
126 .map(|p| OutboxEventPayload::from(&p.event))
127 .collect();
128 self.outbox
129 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
130 .await?;
131 Ok(())
132 }
133}
134
135impl From<&JournalEvent> for OutboxEventPayload {
136 fn from(event: &JournalEvent) -> Self {
137 match event {
138 #[cfg(feature = "import")]
139 JournalEvent::Imported { source, values } => OutboxEventPayload::JournalCreated {
140 source: *source,
141 journal: values.clone(),
142 },
143 JournalEvent::Initialized { values } => OutboxEventPayload::JournalCreated {
144 source: DataSource::Local,
145 journal: values.clone(),
146 },
147 JournalEvent::Updated { values, fields } => OutboxEventPayload::JournalUpdated {
148 source: DataSource::Local,
149 journal: values.clone(),
150 fields: fields.clone(),
151 },
152 }
153 }
154}