cala_ledger/journal/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use es_entity::clock::ClockHandle;
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{outbox::*, primitives::DataSource};
14
15pub use entity::*;
16use error::*;
17use repo::*;
18
19/// Service for working with `Journal` entities.
20#[derive(Clone)]
21pub struct Journals {
22    repo: JournalRepo,
23    clock: ClockHandle,
24}
25
26impl Journals {
27    pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher, clock: &ClockHandle) -> Self {
28        Self {
29            repo: JournalRepo::new(pool, publisher),
30            clock: clock.clone(),
31        }
32    }
33
34    #[instrument(name = "cala_ledger.journals.create", skip(self))]
35    pub async fn create(&self, new_journal: NewJournal) -> Result<Journal, JournalError> {
36        let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
37        let journal = self.create_in_op(&mut op, new_journal).await?;
38        op.commit().await?;
39        Ok(journal)
40    }
41
42    pub async fn create_in_op(
43        &self,
44        db: &mut impl es_entity::AtomicOperation,
45        new_journal: NewJournal,
46    ) -> Result<Journal, JournalError> {
47        let journal = self.repo.create_in_op(db, new_journal).await?;
48        Ok(journal)
49    }
50
51    #[instrument(name = "cala_ledger.journals.find_all", skip(self))]
52    pub async fn find_all<T: From<Journal>>(
53        &self,
54        journal_ids: &[JournalId],
55    ) -> Result<HashMap<JournalId, T>, JournalError> {
56        self.repo.find_all(journal_ids).await
57    }
58
59    #[instrument(name = "cala_ledger.journals.find_by_id", skip(self))]
60    pub async fn find(&self, journal_id: JournalId) -> Result<Journal, JournalError> {
61        self.repo.find_by_id(journal_id).await
62    }
63
64    #[instrument(name = "cala_ledger.journals.persist", skip(self, journal))]
65    pub async fn persist(&self, journal: &mut Journal) -> Result<(), JournalError> {
66        let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
67        self.persist_in_op(&mut op, journal).await?;
68        op.commit().await?;
69        Ok(())
70    }
71
72    #[instrument(name = "cala_ledger.journals.persist_in_op", skip_all)]
73    pub async fn persist_in_op(
74        &self,
75        db: &mut impl es_entity::AtomicOperation,
76        journal: &mut Journal,
77    ) -> Result<(), JournalError> {
78        self.repo.update_in_op(db, journal).await?;
79        Ok(())
80    }
81
82    #[instrument(name = "cala_ledger.journal.find_by_code", skip(self))]
83    pub async fn find_by_code(&self, code: String) -> Result<Journal, JournalError> {
84        self.repo.find_by_code(Some(code)).await
85    }
86
87    #[cfg(feature = "import")]
88    #[instrument(name = "cala_ledger.journals.sync_journal_creation", skip(self, db))]
89    pub async fn sync_journal_creation(
90        &self,
91        mut db: es_entity::DbOpWithTime<'_>,
92        origin: DataSourceId,
93        values: JournalValues,
94    ) -> Result<(), JournalError> {
95        let mut journal = Journal::import(origin, values);
96        self.repo
97            .import_in_op(&mut db, origin, &mut journal)
98            .await?;
99        db.commit().await?;
100        Ok(())
101    }
102
103    #[cfg(feature = "import")]
104    #[instrument(name = "cala_ledger.journals.sync_journal_update", skip(self, db))]
105    pub async fn sync_journal_update(
106        &self,
107        mut db: es_entity::DbOpWithTime<'_>,
108        values: JournalValues,
109        fields: Vec<String>,
110    ) -> Result<(), JournalError> {
111        let mut journal = self.repo.find_by_id_in_op(&mut db, values.id).await?;
112        let _ = journal.update((values, fields));
113        self.repo.update_in_op(&mut db, &mut journal).await?;
114        db.commit().await?;
115        Ok(())
116    }
117}
118
119impl From<&JournalEvent> for OutboxEventPayload {
120    fn from(event: &JournalEvent) -> Self {
121        let source = es_entity::context::EventContext::current()
122            .data()
123            .lookup("data_source")
124            .ok()
125            .flatten()
126            .unwrap_or(DataSource::Local);
127
128        match event {
129            #[cfg(feature = "import")]
130            JournalEvent::Imported { source, values } => OutboxEventPayload::JournalCreated {
131                source: *source,
132                journal: values.clone(),
133            },
134            JournalEvent::Initialized { values } => OutboxEventPayload::JournalCreated {
135                source,
136                journal: values.clone(),
137            },
138            JournalEvent::Updated { values, fields } => OutboxEventPayload::JournalUpdated {
139                source,
140                journal: values.clone(),
141                fields: fields.clone(),
142            },
143        }
144    }
145}