cala_ledger/account/
mod.rs

1//! [Account] holds a balance in a [Journal](crate::journal::Journal)
2mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use tracing::instrument;
9
10use std::collections::HashMap;
11
12#[cfg(feature = "import")]
13use crate::primitives::DataSourceId;
14use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
15
16pub use entity::*;
17use error::*;
18pub use repo::account_cursor::*;
19use repo::*;
20
21/// Service for working with `Account` entities.
22#[derive(Clone)]
23pub struct Accounts {
24    repo: AccountRepo,
25    outbox: Outbox,
26    pool: PgPool,
27}
28
29impl Accounts {
30    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
31        Self {
32            repo: AccountRepo::new(pool),
33            outbox,
34            pool: pool.clone(),
35        }
36    }
37
38    pub async fn create(&self, new_account: NewAccount) -> Result<Account, AccountError> {
39        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
40        let account = self.create_in_op(&mut op, new_account).await?;
41        op.commit().await?;
42        Ok(account)
43    }
44
45    #[instrument(name = "cala_ledger.accounts.create", skip(self, db))]
46    pub async fn create_in_op(
47        &self,
48        db: &mut LedgerOperation<'_>,
49        new_account: NewAccount,
50    ) -> Result<Account, AccountError> {
51        let account = self.repo.create_in_op(db.op(), new_account).await?;
52        db.accumulate(account.last_persisted(1).map(|p| &p.event));
53        Ok(account)
54    }
55
56    pub async fn create_all(
57        &self,
58        new_accounts: Vec<NewAccount>,
59    ) -> Result<Vec<Account>, AccountError> {
60        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
61        let accounts = self.create_all_in_op(&mut op, new_accounts).await?;
62        op.commit().await?;
63        Ok(accounts)
64    }
65
66    #[instrument(name = "cala_ledger.accounts.create_all", skip(self, db))]
67    pub async fn create_all_in_op(
68        &self,
69        db: &mut LedgerOperation<'_>,
70        new_accounts: Vec<NewAccount>,
71    ) -> Result<Vec<Account>, AccountError> {
72        let accounts = self.repo.create_all_in_op(db.op(), new_accounts).await?;
73        db.accumulate(
74            accounts
75                .iter()
76                .flat_map(|account| account.last_persisted(1).map(|p| &p.event)),
77        );
78        Ok(accounts)
79    }
80
81    pub async fn find(&self, account_id: AccountId) -> Result<Account, AccountError> {
82        self.repo.find_by_id(account_id).await
83    }
84
85    #[instrument(name = "cala_ledger.accounts.find_all", skip(self), err)]
86    pub async fn find_all<T: From<Account>>(
87        &self,
88        account_ids: &[AccountId],
89    ) -> Result<HashMap<AccountId, T>, AccountError> {
90        self.repo.find_all(account_ids).await
91    }
92
93    #[instrument(name = "cala_ledger.accounts.find_all", skip(self, db), err)]
94    pub async fn find_all_in_op<T: From<Account>>(
95        &self,
96        db: &mut LedgerOperation<'_>,
97        account_ids: &[AccountId],
98    ) -> Result<HashMap<AccountId, T>, AccountError> {
99        self.repo.find_all_in_tx(db.tx(), account_ids).await
100    }
101
102    #[instrument(name = "cala_ledger.accounts.find_by_external_id", skip(self), err)]
103    pub async fn find_by_external_id(&self, external_id: String) -> Result<Account, AccountError> {
104        self.repo.find_by_external_id(Some(external_id)).await
105    }
106
107    #[instrument(name = "cala_ledger.accounts.find_by_code", skip(self), err)]
108    pub async fn find_by_code(&self, code: String) -> Result<Account, AccountError> {
109        self.repo.find_by_code(code).await
110    }
111
112    #[instrument(name = "cala_ledger.accounts.list", skip(self))]
113    pub async fn list(
114        &self,
115        query: es_entity::PaginatedQueryArgs<AccountsByNameCursor>,
116    ) -> Result<es_entity::PaginatedQueryRet<Account, AccountsByNameCursor>, AccountError> {
117        self.repo.list_by_name(query, Default::default()).await
118    }
119
120    #[instrument(name = "cala_ledger.accounts.persist", skip(self, account))]
121    pub async fn persist(&self, account: &mut Account) -> Result<(), AccountError> {
122        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
123        self.persist_in_op(&mut op, account).await?;
124        op.commit().await?;
125        Ok(())
126    }
127
128    pub async fn persist_in_op(
129        &self,
130        db: &mut LedgerOperation<'_>,
131        account: &mut Account,
132    ) -> Result<(), AccountError> {
133        let n_events = self.repo.update_in_op(db.op(), account).await?;
134        db.accumulate(account.last_persisted(n_events).map(|p| &p.event));
135        Ok(())
136    }
137
138    #[cfg(feature = "import")]
139    pub async fn sync_account_creation(
140        &self,
141        mut db: es_entity::DbOp<'_>,
142        origin: DataSourceId,
143        values: AccountValues,
144    ) -> Result<(), AccountError> {
145        let mut account = Account::import(origin, values);
146        self.repo
147            .import_in_op(&mut db, origin, &mut account)
148            .await?;
149        let recorded_at = db.now();
150        let outbox_events: Vec<_> = account
151            .last_persisted(1)
152            .map(|p| OutboxEventPayload::from(&p.event))
153            .collect();
154        self.outbox
155            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
156            .await?;
157        Ok(())
158    }
159
160    #[cfg(feature = "import")]
161    pub async fn sync_account_update(
162        &self,
163        mut db: es_entity::DbOp<'_>,
164        values: AccountValues,
165        fields: Vec<String>,
166    ) -> Result<(), AccountError> {
167        let mut account = self.repo.find_by_id(values.id).await?;
168        account.update((values, fields));
169        let n_events = self.repo.update_in_op(&mut db, &mut account).await?;
170        let recorded_at = db.now();
171        let outbox_events: Vec<_> = account
172            .last_persisted(n_events)
173            .map(|p| OutboxEventPayload::from(&p.event))
174            .collect();
175        self.outbox
176            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
177            .await?;
178        Ok(())
179    }
180}
181
182impl From<&AccountEvent> for OutboxEventPayload {
183    fn from(event: &AccountEvent) -> Self {
184        match event {
185            #[cfg(feature = "import")]
186            AccountEvent::Imported {
187                source,
188                values: account,
189            } => OutboxEventPayload::AccountCreated {
190                source: *source,
191                account: account.clone(),
192            },
193            AccountEvent::Initialized { values: account } => OutboxEventPayload::AccountCreated {
194                source: DataSource::Local,
195                account: account.clone(),
196            },
197            AccountEvent::Updated {
198                values: account,
199                fields,
200            } => OutboxEventPayload::AccountUpdated {
201                source: DataSource::Local,
202                account: account.clone(),
203                fields: fields.clone(),
204            },
205        }
206    }
207}