cala_ledger/journal/
mod.rs

1mod entity;
2pub mod error;
3mod repo;
4
5use es_entity::EsEntity;
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
14
15pub use entity::*;
16use error::*;
17use repo::*;
18
19/// Service for working with `Journal` entities.
20#[derive(Clone)]
21pub struct Journals {
22    repo: JournalRepo,
23    outbox: Outbox,
24    pool: PgPool,
25}
26
27impl Journals {
28    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
29        Self {
30            repo: JournalRepo::new(pool),
31            outbox,
32            pool: pool.clone(),
33        }
34    }
35
36    #[instrument(name = "cala_ledger.journals.create", skip(self))]
37    pub async fn create(&self, new_journal: NewJournal) -> Result<Journal, JournalError> {
38        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
39        let journal = self.create_in_op(&mut op, new_journal).await?;
40        op.commit().await?;
41        Ok(journal)
42    }
43
44    pub async fn create_in_op(
45        &self,
46        db: &mut LedgerOperation<'_>,
47        new_journal: NewJournal,
48    ) -> Result<Journal, JournalError> {
49        let journal = self.repo.create_in_op(db.op(), new_journal).await?;
50        db.accumulate(journal.last_persisted(1).map(|p| &p.event));
51        Ok(journal)
52    }
53
54    #[instrument(name = "cala_ledger.journals.find_all", skip(self), err)]
55    pub async fn find_all<T: From<Journal>>(
56        &self,
57        journal_ids: &[JournalId],
58    ) -> Result<HashMap<JournalId, T>, JournalError> {
59        self.repo.find_all(journal_ids).await
60    }
61
62    #[instrument(name = "cala_ledger.journals.find_by_id", skip(self), err)]
63    pub async fn find(&self, journal_id: JournalId) -> Result<Journal, JournalError> {
64        self.repo.find_by_id(journal_id).await
65    }
66
67    #[instrument(name = "cala_ledger.journals.persist", skip(self, journal))]
68    pub async fn persist(&self, journal: &mut Journal) -> Result<(), JournalError> {
69        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
70        self.persist_in_op(&mut op, journal).await?;
71        op.commit().await?;
72        Ok(())
73    }
74
75    pub async fn persist_in_op(
76        &self,
77        db: &mut LedgerOperation<'_>,
78        journal: &mut Journal,
79    ) -> Result<(), JournalError> {
80        let n_events = self.repo.update_in_op(db.op(), journal).await?;
81        db.accumulate(journal.last_persisted(n_events).map(|p| &p.event));
82        Ok(())
83    }
84
85    #[instrument(name = "cala_ledger.journal.find_by_code", skip(self), err)]
86    pub async fn find_by_code(&self, code: String) -> Result<Journal, JournalError> {
87        self.repo.find_by_code(Some(code)).await
88    }
89
90    #[cfg(feature = "import")]
91    pub async fn sync_journal_creation(
92        &self,
93        mut db: es_entity::DbOp<'_>,
94        origin: DataSourceId,
95        values: JournalValues,
96    ) -> Result<(), JournalError> {
97        let mut journal = Journal::import(origin, values);
98        self.repo
99            .import_in_op(&mut db, origin, &mut journal)
100            .await?;
101        let recorded_at = db.now();
102        let outbox_events: Vec<_> = journal
103            .last_persisted(1)
104            .map(|p| OutboxEventPayload::from(&p.event))
105            .collect();
106        self.outbox
107            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
108            .await?;
109        Ok(())
110    }
111
112    #[cfg(feature = "import")]
113    pub async fn sync_journal_update(
114        &self,
115        mut db: es_entity::DbOp<'_>,
116        values: JournalValues,
117        fields: Vec<String>,
118    ) -> Result<(), JournalError> {
119        let mut journal = self.repo.find_by_id(values.id).await?;
120        journal.update((values, fields));
121        let n_events = self.repo.update_in_op(&mut db, &mut journal).await?;
122        let recorded_at = db.now();
123        let outbox_events: Vec<_> = journal
124            .last_persisted(n_events)
125            .map(|p| OutboxEventPayload::from(&p.event))
126            .collect();
127        self.outbox
128            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
129            .await?;
130        Ok(())
131    }
132}
133
134impl From<&JournalEvent> for OutboxEventPayload {
135    fn from(event: &JournalEvent) -> Self {
136        match event {
137            #[cfg(feature = "import")]
138            JournalEvent::Imported { source, values } => OutboxEventPayload::JournalCreated {
139                source: *source,
140                journal: values.clone(),
141            },
142            JournalEvent::Initialized { values } => OutboxEventPayload::JournalCreated {
143                source: DataSource::Local,
144                journal: values.clone(),
145            },
146            JournalEvent::Updated { values, fields } => OutboxEventPayload::JournalUpdated {
147                source: DataSource::Local,
148                journal: values.clone(),
149                fields: fields.clone(),
150            },
151        }
152    }
153}