cala_ledger/journal/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use tracing::instrument;
7
8use std::collections::HashMap;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{outbox::*, primitives::DataSource};
13
14pub use entity::*;
15use error::*;
16use repo::*;
17
18/// Service for working with `Journal` entities.
19#[derive(Clone)]
20pub struct Journals {
21    repo: JournalRepo,
22}
23
24impl Journals {
25    pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
26        Self {
27            repo: JournalRepo::new(pool, publisher),
28        }
29    }
30
31    #[instrument(name = "cala_ledger.journals.create", skip(self))]
32    pub async fn create(&self, new_journal: NewJournal) -> Result<Journal, JournalError> {
33        let mut op = self.repo.begin_op().await?;
34        let journal = self.create_in_op(&mut op, new_journal).await?;
35        op.commit().await?;
36        Ok(journal)
37    }
38
39    pub async fn create_in_op(
40        &self,
41        db: &mut impl es_entity::AtomicOperation,
42        new_journal: NewJournal,
43    ) -> Result<Journal, JournalError> {
44        let journal = self.repo.create_in_op(db, new_journal).await?;
45        Ok(journal)
46    }
47
48    #[instrument(name = "cala_ledger.journals.find_all", skip(self))]
49    pub async fn find_all<T: From<Journal>>(
50        &self,
51        journal_ids: &[JournalId],
52    ) -> Result<HashMap<JournalId, T>, JournalError> {
53        self.repo.find_all(journal_ids).await
54    }
55
56    #[instrument(name = "cala_ledger.journals.find_by_id", skip(self))]
57    pub async fn find(&self, journal_id: JournalId) -> Result<Journal, JournalError> {
58        self.repo.find_by_id(journal_id).await
59    }
60
61    #[instrument(name = "cala_ledger.journals.persist", skip(self, journal))]
62    pub async fn persist(&self, journal: &mut Journal) -> Result<(), JournalError> {
63        let mut op = self.repo.begin_op().await?;
64        self.persist_in_op(&mut op, journal).await?;
65        op.commit().await?;
66        Ok(())
67    }
68
69    #[instrument(name = "cala_ledger.journals.persist_in_op", skip_all)]
70    pub async fn persist_in_op(
71        &self,
72        db: &mut impl es_entity::AtomicOperation,
73        journal: &mut Journal,
74    ) -> Result<(), JournalError> {
75        self.repo.update_in_op(db, journal).await?;
76        Ok(())
77    }
78
79    #[instrument(name = "cala_ledger.journal.find_by_code", skip(self))]
80    pub async fn find_by_code(&self, code: String) -> Result<Journal, JournalError> {
81        self.repo.find_by_code(Some(code)).await
82    }
83
84    #[cfg(feature = "import")]
85    #[instrument(name = "cala_ledger.journals.sync_journal_creation", skip(self, db))]
86    pub async fn sync_journal_creation(
87        &self,
88        mut db: es_entity::DbOpWithTime<'_>,
89        origin: DataSourceId,
90        values: JournalValues,
91    ) -> Result<(), JournalError> {
92        let mut journal = Journal::import(origin, values);
93        self.repo
94            .import_in_op(&mut db, origin, &mut journal)
95            .await?;
96        db.commit().await?;
97        Ok(())
98    }
99
100    #[cfg(feature = "import")]
101    #[instrument(name = "cala_ledger.journals.sync_journal_update", skip(self, db))]
102    pub async fn sync_journal_update(
103        &self,
104        mut db: es_entity::DbOpWithTime<'_>,
105        values: JournalValues,
106        fields: Vec<String>,
107    ) -> Result<(), JournalError> {
108        let mut journal = self.repo.find_by_id_in_op(&mut db, values.id).await?;
109        let _ = journal.update((values, fields));
110        self.repo.update_in_op(&mut db, &mut journal).await?;
111        db.commit().await?;
112        Ok(())
113    }
114}
115
116impl From<&JournalEvent> for OutboxEventPayload {
117    fn from(event: &JournalEvent) -> Self {
118        let source = es_entity::context::EventContext::current()
119            .data()
120            .lookup("data_source")
121            .ok()
122            .flatten()
123            .unwrap_or(DataSource::Local);
124
125        match event {
126            #[cfg(feature = "import")]
127            JournalEvent::Imported { source, values } => OutboxEventPayload::JournalCreated {
128                source: *source,
129                journal: values.clone(),
130            },
131            JournalEvent::Initialized { values } => OutboxEventPayload::JournalCreated {
132                source,
133                journal: values.clone(),
134            },
135            JournalEvent::Updated { values, fields } => OutboxEventPayload::JournalUpdated {
136                source,
137                journal: values.clone(),
138                fields: fields.clone(),
139            },
140        }
141    }
142}