cala_ledger/journal/
mod.rs1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use tracing::instrument;
7
8use std::collections::HashMap;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{outbox::*, primitives::DataSource};
13
14pub use entity::*;
15use error::*;
16use repo::*;
17
18#[derive(Clone)]
20pub struct Journals {
21 repo: JournalRepo,
22}
23
24impl Journals {
25 pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
26 Self {
27 repo: JournalRepo::new(pool, publisher),
28 }
29 }
30
31 #[instrument(name = "cala_ledger.journals.create", skip(self))]
32 pub async fn create(&self, new_journal: NewJournal) -> Result<Journal, JournalError> {
33 let mut op = self.repo.begin_op().await?;
34 let journal = self.create_in_op(&mut op, new_journal).await?;
35 op.commit().await?;
36 Ok(journal)
37 }
38
39 pub async fn create_in_op(
40 &self,
41 db: &mut impl es_entity::AtomicOperation,
42 new_journal: NewJournal,
43 ) -> Result<Journal, JournalError> {
44 let journal = self.repo.create_in_op(db, new_journal).await?;
45 Ok(journal)
46 }
47
48 #[instrument(name = "cala_ledger.journals.find_all", skip(self))]
49 pub async fn find_all<T: From<Journal>>(
50 &self,
51 journal_ids: &[JournalId],
52 ) -> Result<HashMap<JournalId, T>, JournalError> {
53 self.repo.find_all(journal_ids).await
54 }
55
56 #[instrument(name = "cala_ledger.journals.find_by_id", skip(self))]
57 pub async fn find(&self, journal_id: JournalId) -> Result<Journal, JournalError> {
58 self.repo.find_by_id(journal_id).await
59 }
60
61 #[instrument(name = "cala_ledger.journals.persist", skip(self, journal))]
62 pub async fn persist(&self, journal: &mut Journal) -> Result<(), JournalError> {
63 let mut op = self.repo.begin_op().await?;
64 self.persist_in_op(&mut op, journal).await?;
65 op.commit().await?;
66 Ok(())
67 }
68
69 #[instrument(name = "cala_ledger.journals.persist_in_op", skip_all)]
70 pub async fn persist_in_op(
71 &self,
72 db: &mut impl es_entity::AtomicOperation,
73 journal: &mut Journal,
74 ) -> Result<(), JournalError> {
75 self.repo.update_in_op(db, journal).await?;
76 Ok(())
77 }
78
79 #[instrument(name = "cala_ledger.journal.find_by_code", skip(self))]
80 pub async fn find_by_code(&self, code: String) -> Result<Journal, JournalError> {
81 self.repo.find_by_code(Some(code)).await
82 }
83
84 #[cfg(feature = "import")]
85 #[instrument(name = "cala_ledger.journals.sync_journal_creation", skip(self, db))]
86 pub async fn sync_journal_creation(
87 &self,
88 mut db: es_entity::DbOpWithTime<'_>,
89 origin: DataSourceId,
90 values: JournalValues,
91 ) -> Result<(), JournalError> {
92 let mut journal = Journal::import(origin, values);
93 self.repo
94 .import_in_op(&mut db, origin, &mut journal)
95 .await?;
96 db.commit().await?;
97 Ok(())
98 }
99
100 #[cfg(feature = "import")]
101 #[instrument(name = "cala_ledger.journals.sync_journal_update", skip(self, db))]
102 pub async fn sync_journal_update(
103 &self,
104 mut db: es_entity::DbOpWithTime<'_>,
105 values: JournalValues,
106 fields: Vec<String>,
107 ) -> Result<(), JournalError> {
108 let mut journal = self.repo.find_by_id_in_op(&mut db, values.id).await?;
109 let _ = journal.update((values, fields));
110 self.repo.update_in_op(&mut db, &mut journal).await?;
111 db.commit().await?;
112 Ok(())
113 }
114}
115
116impl From<&JournalEvent> for OutboxEventPayload {
117 fn from(event: &JournalEvent) -> Self {
118 let source = es_entity::context::EventContext::current()
119 .data()
120 .lookup("data_source")
121 .ok()
122 .flatten()
123 .unwrap_or(DataSource::Local);
124
125 match event {
126 #[cfg(feature = "import")]
127 JournalEvent::Imported { source, values } => OutboxEventPayload::JournalCreated {
128 source: *source,
129 journal: values.clone(),
130 },
131 JournalEvent::Initialized { values } => OutboxEventPayload::JournalCreated {
132 source,
133 journal: values.clone(),
134 },
135 JournalEvent::Updated { values, fields } => OutboxEventPayload::JournalUpdated {
136 source,
137 journal: values.clone(),
138 fields: fields.clone(),
139 },
140 }
141 }
142}