cala_ledger/account/
mod.rs

1//! [Account] holds a balance in a [Journal](crate::journal::Journal)
2mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
14
15pub use entity::*;
16use error::*;
17pub use repo::account_cursor::*;
18use repo::*;
19
20/// Service for working with `Account` entities.
21#[derive(Clone)]
22pub struct Accounts {
23    repo: AccountRepo,
24    outbox: Outbox,
25    pool: PgPool,
26}
27
28impl Accounts {
29    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
30        Self {
31            repo: AccountRepo::new(pool),
32            outbox,
33            pool: pool.clone(),
34        }
35    }
36
37    pub async fn create(&self, new_account: NewAccount) -> Result<Account, AccountError> {
38        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
39        let account = self.create_in_op(&mut op, new_account).await?;
40        op.commit().await?;
41        Ok(account)
42    }
43
44    #[instrument(name = "cala_ledger.accounts.create", skip(self, db))]
45    pub async fn create_in_op(
46        &self,
47        db: &mut LedgerOperation<'_>,
48        new_account: NewAccount,
49    ) -> Result<Account, AccountError> {
50        let account = self.repo.create_in_op(db.op(), new_account).await?;
51        db.accumulate(account.events.last_persisted(1).map(|p| &p.event));
52        Ok(account)
53    }
54
55    pub async fn create_all(
56        &self,
57        new_accounts: Vec<NewAccount>,
58    ) -> Result<Vec<Account>, AccountError> {
59        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
60        let accounts = self.create_all_in_op(&mut op, new_accounts).await?;
61        op.commit().await?;
62        Ok(accounts)
63    }
64
65    #[instrument(name = "cala_ledger.accounts.create_all", skip(self, db))]
66    pub async fn create_all_in_op(
67        &self,
68        db: &mut LedgerOperation<'_>,
69        new_accounts: Vec<NewAccount>,
70    ) -> Result<Vec<Account>, AccountError> {
71        let accounts = self.repo.create_all_in_op(db.op(), new_accounts).await?;
72        db.accumulate(
73            accounts
74                .iter()
75                .flat_map(|account| account.events.last_persisted(1).map(|p| &p.event)),
76        );
77        Ok(accounts)
78    }
79
80    pub async fn find(&self, account_id: AccountId) -> Result<Account, AccountError> {
81        self.repo.find_by_id(account_id).await
82    }
83
84    #[instrument(name = "cala_ledger.accounts.find_all", skip(self), err)]
85    pub async fn find_all<T: From<Account>>(
86        &self,
87        account_ids: &[AccountId],
88    ) -> Result<HashMap<AccountId, T>, AccountError> {
89        self.repo.find_all(account_ids).await
90    }
91
92    #[instrument(name = "cala_ledger.accounts.find_all", skip(self, db), err)]
93    pub async fn find_all_in_op<T: From<Account>>(
94        &self,
95        db: &mut LedgerOperation<'_>,
96        account_ids: &[AccountId],
97    ) -> Result<HashMap<AccountId, T>, AccountError> {
98        self.repo.find_all_in_tx(db.tx(), account_ids).await
99    }
100
101    #[instrument(name = "cala_ledger.accounts.find_by_external_id", skip(self), err)]
102    pub async fn find_by_external_id(&self, external_id: String) -> Result<Account, AccountError> {
103        self.repo.find_by_external_id(Some(external_id)).await
104    }
105
106    #[instrument(name = "cala_ledger.accounts.find_by_code", skip(self), err)]
107    pub async fn find_by_code(&self, code: String) -> Result<Account, AccountError> {
108        self.repo.find_by_code(code).await
109    }
110
111    #[instrument(name = "cala_ledger.accounts.list", skip(self))]
112    pub async fn list(
113        &self,
114        query: es_entity::PaginatedQueryArgs<AccountsByNameCursor>,
115    ) -> Result<es_entity::PaginatedQueryRet<Account, AccountsByNameCursor>, AccountError> {
116        self.repo.list_by_name(query, Default::default()).await
117    }
118
119    #[instrument(name = "cala_ledger.accounts.persist", skip(self, account))]
120    pub async fn persist(&self, account: &mut Account) -> Result<(), AccountError> {
121        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
122        self.persist_in_op(&mut op, account).await?;
123        op.commit().await?;
124        Ok(())
125    }
126
127    pub async fn persist_in_op(
128        &self,
129        db: &mut LedgerOperation<'_>,
130        account: &mut Account,
131    ) -> Result<(), AccountError> {
132        let n_events = self.repo.update_in_op(db.op(), account).await?;
133        db.accumulate(account.events.last_persisted(n_events).map(|p| &p.event));
134        Ok(())
135    }
136
137    #[cfg(feature = "import")]
138    pub async fn sync_account_creation(
139        &self,
140        mut db: es_entity::DbOp<'_>,
141        origin: DataSourceId,
142        values: AccountValues,
143    ) -> Result<(), AccountError> {
144        let mut account = Account::import(origin, values);
145        self.repo
146            .import_in_op(&mut db, origin, &mut account)
147            .await?;
148        let recorded_at = db.now();
149        let outbox_events: Vec<_> = account
150            .events
151            .last_persisted(1)
152            .map(|p| OutboxEventPayload::from(&p.event))
153            .collect();
154        self.outbox
155            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
156            .await?;
157        Ok(())
158    }
159
160    #[cfg(feature = "import")]
161    pub async fn sync_account_update(
162        &self,
163        mut db: es_entity::DbOp<'_>,
164        values: AccountValues,
165        fields: Vec<String>,
166    ) -> Result<(), AccountError> {
167        let mut account = self.repo.find_by_id(values.id).await?;
168        account.update((values, fields));
169        let n_events = self.repo.update_in_op(&mut db, &mut account).await?;
170        let recorded_at = db.now();
171        let outbox_events: Vec<_> = account
172            .events
173            .last_persisted(n_events)
174            .map(|p| OutboxEventPayload::from(&p.event))
175            .collect();
176        self.outbox
177            .persist_events_at(db.into_tx(), outbox_events, recorded_at)
178            .await?;
179        Ok(())
180    }
181}
182
183impl From<&AccountEvent> for OutboxEventPayload {
184    fn from(event: &AccountEvent) -> Self {
185        match event {
186            #[cfg(feature = "import")]
187            AccountEvent::Imported {
188                source,
189                values: account,
190            } => OutboxEventPayload::AccountCreated {
191                source: *source,
192                account: account.clone(),
193            },
194            AccountEvent::Initialized { values: account } => OutboxEventPayload::AccountCreated {
195                source: DataSource::Local,
196                account: account.clone(),
197            },
198            AccountEvent::Updated {
199                values: account,
200                fields,
201            } => OutboxEventPayload::AccountUpdated {
202                source: DataSource::Local,
203                account: account.clone(),
204                fields: fields.clone(),
205            },
206        }
207    }
208}