cala_ledger/journal/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use sqlx::PgPool;
6use tracing::instrument;
7
8use std::collections::HashMap;
9
10#[cfg(feature = "import")]
11use crate::primitives::DataSourceId;
12use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
13
14pub use entity::*;
15use error::*;
16use repo::*;
17
18/// Service for working with `Journal` entities.
19#[derive(Clone)]
20pub struct Journals {
21    repo: JournalRepo,
22    outbox: Outbox,
23    pool: PgPool,
24}
25
26impl Journals {
27    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
28        Self {
29            repo: JournalRepo::new(pool),
30            outbox,
31            pool: pool.clone(),
32        }
33    }
34
35    #[instrument(name = "cala_ledger.journals.create", skip(self))]
36    pub async fn create(&self, new_journal: NewJournal) -> Result<Journal, JournalError> {
37        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
38        let journal = self.create_in_op(&mut op, new_journal).await?;
39        op.commit().await?;
40        Ok(journal)
41    }
42
43    pub async fn create_in_op(
44        &self,
45        db: &mut LedgerOperation<'_>,
46        new_journal: NewJournal,
47    ) -> Result<Journal, JournalError> {
48        let journal = self.repo.create_in_op(db.op(), new_journal).await?;
49        db.accumulate(journal.events.last_persisted(1).map(|p| &p.event));
50        Ok(journal)
51    }
52
53    #[instrument(name = "cala_ledger.journals.find_all", skip(self), err)]
54    pub async fn find_all<T: From<Journal>>(
55        &self,
56        journal_ids: &[JournalId],
57    ) -> Result<HashMap<JournalId, T>, JournalError> {
58        self.repo.find_all(journal_ids).await
59    }
60
61    #[instrument(name = "cala_ledger.journals.find_by_id", skip(self), err)]
62    pub async fn find(&self, journal_id: JournalId) -> Result<Journal, JournalError> {
63        self.repo.find_by_id(journal_id).await
64    }
65
66    #[instrument(name = "cala_ledger.journals.persist", skip(self, journal))]
67    pub async fn persist(&self, journal: &mut Journal) -> Result<(), JournalError> {
68        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
69        self.persist_in_op(&mut op, journal).await?;
70        op.commit().await?;
71        Ok(())
72    }
73
74    pub async fn persist_in_op(
75        &self,
76        db: &mut LedgerOperation<'_>,
77        journal: &mut Journal,
78    ) -> Result<(), JournalError> {
79        let n_events = self.repo.update_in_op(db.op(), journal).await?;
80        db.accumulate(journal.events.last_persisted(n_events).map(|p| &p.event));
81        Ok(())
82    }
83
84    #[instrument(name = "cala_ledger.journal.find_by_code", skip(self), err)]
85    pub async fn find_by_code(&self, code: String) -> Result<Journal, JournalError> {
86        self.repo.find_by_code(Some(code)).await
87    }
88
89    #[cfg(feature = "import")]
90    pub async fn sync_journal_creation(
91        &self,
92        mut db: es_entity::DbOp<'_>,
93        origin: DataSourceId,
94        values: JournalValues,
95    ) -> Result<(), JournalError> {
96        let mut journal = Journal::import(origin, values);
97        self.repo
98            .import_in_op(&mut db, origin, &mut journal)
99            .await?;
100        let recorded_at = db.now();
101        let outbox_events: Vec<_> = journal
102            .events
103            .last_persisted(1)
104            .map(|p| OutboxEventPayload::from(&p.event))
105            .collect();
106        self.outbox
107            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
108            .await?;
109        Ok(())
110    }
111
112    #[cfg(feature = "import")]
113    pub async fn sync_journal_update(
114        &self,
115        mut db: es_entity::DbOp<'_>,
116        values: JournalValues,
117        fields: Vec<String>,
118    ) -> Result<(), JournalError> {
119        let mut journal = self.repo.find_by_id(values.id).await?;
120        journal.update((values, fields));
121        let n_events = self.repo.update_in_op(&mut db, &mut journal).await?;
122        let recorded_at = db.now();
123        let outbox_events: Vec<_> = journal
124            .events
125            .last_persisted(n_events)
126            .map(|p| OutboxEventPayload::from(&p.event))
127            .collect();
128        self.outbox
129            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
130            .await?;
131        Ok(())
132    }
133}
134
135impl From<&JournalEvent> for OutboxEventPayload {
136    fn from(event: &JournalEvent) -> Self {
137        match event {
138            #[cfg(feature = "import")]
139            JournalEvent::Imported { source, values } => OutboxEventPayload::JournalCreated {
140                source: *source,
141                journal: values.clone(),
142            },
143            JournalEvent::Initialized { values } => OutboxEventPayload::JournalCreated {
144                source: DataSource::Local,
145                journal: values.clone(),
146            },
147            JournalEvent::Updated { values, fields } => OutboxEventPayload::JournalUpdated {
148                source: DataSource::Local,
149                journal: values.clone(),
150                fields: fields.clone(),
151            },
152        }
153    }
154}