cala_ledger/journal/
mod.rs1mod entity;
2pub mod error;
3mod repo;
4
5use es_entity::clock::ClockHandle;
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{outbox::*, primitives::DataSource};
14
15pub use entity::*;
16use error::*;
17use repo::*;
18
19#[derive(Clone)]
21pub struct Journals {
22 repo: JournalRepo,
23 clock: ClockHandle,
24}
25
26impl Journals {
27 pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher, clock: &ClockHandle) -> Self {
28 Self {
29 repo: JournalRepo::new(pool, publisher),
30 clock: clock.clone(),
31 }
32 }
33
34 #[instrument(name = "cala_ledger.journals.create", skip(self))]
35 pub async fn create(&self, new_journal: NewJournal) -> Result<Journal, JournalError> {
36 let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
37 let journal = self.create_in_op(&mut op, new_journal).await?;
38 op.commit().await?;
39 Ok(journal)
40 }
41
42 pub async fn create_in_op(
43 &self,
44 db: &mut impl es_entity::AtomicOperation,
45 new_journal: NewJournal,
46 ) -> Result<Journal, JournalError> {
47 let journal = self.repo.create_in_op(db, new_journal).await?;
48 Ok(journal)
49 }
50
51 #[instrument(name = "cala_ledger.journals.find_all", skip(self))]
52 pub async fn find_all<T: From<Journal>>(
53 &self,
54 journal_ids: &[JournalId],
55 ) -> Result<HashMap<JournalId, T>, JournalError> {
56 self.repo.find_all(journal_ids).await
57 }
58
59 #[instrument(name = "cala_ledger.journals.find_by_id", skip(self))]
60 pub async fn find(&self, journal_id: JournalId) -> Result<Journal, JournalError> {
61 self.repo.find_by_id(journal_id).await
62 }
63
64 #[instrument(name = "cala_ledger.journals.persist", skip(self, journal))]
65 pub async fn persist(&self, journal: &mut Journal) -> Result<(), JournalError> {
66 let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
67 self.persist_in_op(&mut op, journal).await?;
68 op.commit().await?;
69 Ok(())
70 }
71
72 #[instrument(name = "cala_ledger.journals.persist_in_op", skip_all)]
73 pub async fn persist_in_op(
74 &self,
75 db: &mut impl es_entity::AtomicOperation,
76 journal: &mut Journal,
77 ) -> Result<(), JournalError> {
78 self.repo.update_in_op(db, journal).await?;
79 Ok(())
80 }
81
82 #[instrument(name = "cala_ledger.journal.find_by_code", skip(self))]
83 pub async fn find_by_code(&self, code: String) -> Result<Journal, JournalError> {
84 self.repo.find_by_code(Some(code)).await
85 }
86
87 #[cfg(feature = "import")]
88 #[instrument(name = "cala_ledger.journals.sync_journal_creation", skip(self, db))]
89 pub async fn sync_journal_creation(
90 &self,
91 mut db: es_entity::DbOpWithTime<'_>,
92 origin: DataSourceId,
93 values: JournalValues,
94 ) -> Result<(), JournalError> {
95 let mut journal = Journal::import(origin, values);
96 self.repo
97 .import_in_op(&mut db, origin, &mut journal)
98 .await?;
99 db.commit().await?;
100 Ok(())
101 }
102
103 #[cfg(feature = "import")]
104 #[instrument(name = "cala_ledger.journals.sync_journal_update", skip(self, db))]
105 pub async fn sync_journal_update(
106 &self,
107 mut db: es_entity::DbOpWithTime<'_>,
108 values: JournalValues,
109 fields: Vec<String>,
110 ) -> Result<(), JournalError> {
111 let mut journal = self.repo.find_by_id_in_op(&mut db, values.id).await?;
112 let _ = journal.update((values, fields));
113 self.repo.update_in_op(&mut db, &mut journal).await?;
114 db.commit().await?;
115 Ok(())
116 }
117}
118
119impl From<&JournalEvent> for OutboxEventPayload {
120 fn from(event: &JournalEvent) -> Self {
121 let source = es_entity::context::EventContext::current()
122 .data()
123 .lookup("data_source")
124 .ok()
125 .flatten()
126 .unwrap_or(DataSource::Local);
127
128 match event {
129 #[cfg(feature = "import")]
130 JournalEvent::Imported { source, values } => OutboxEventPayload::JournalCreated {
131 source: *source,
132 journal: values.clone(),
133 },
134 JournalEvent::Initialized { values } => OutboxEventPayload::JournalCreated {
135 source,
136 journal: values.clone(),
137 },
138 JournalEvent::Updated { values, fields } => OutboxEventPayload::JournalUpdated {
139 source,
140 journal: values.clone(),
141 fields: fields.clone(),
142 },
143 }
144 }
145}