cala_ledger/account/
mod.rs

1//! [Account] holds a balance in a [Journal](crate::journal::Journal)
2mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use tracing::instrument;
9
10use std::collections::HashMap;
11
12#[cfg(feature = "import")]
13use crate::primitives::DataSourceId;
14use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
15
16pub use entity::*;
17use error::*;
18pub use repo::account_cursor::*;
19use repo::*;
20
21/// Service for working with `Account` entities.
22#[derive(Clone)]
23pub struct Accounts {
24    repo: AccountRepo,
25    outbox: Outbox,
26    pool: PgPool,
27}
28
29impl Accounts {
30    pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
31        Self {
32            repo: AccountRepo::new(pool),
33            outbox,
34            pool: pool.clone(),
35        }
36    }
37
38    pub async fn create(&self, new_account: NewAccount) -> Result<Account, AccountError> {
39        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
40        let account = self.create_in_op(&mut op, new_account).await?;
41        op.commit().await?;
42        Ok(account)
43    }
44
45    #[instrument(name = "cala_ledger.accounts.create", skip(self, db))]
46    pub async fn create_in_op(
47        &self,
48        db: &mut LedgerOperation<'_>,
49        new_account: NewAccount,
50    ) -> Result<Account, AccountError> {
51        let account = self.repo.create_in_op(db, new_account).await?;
52        db.accumulate(account.last_persisted(1).map(|p| &p.event));
53        Ok(account)
54    }
55
56    pub async fn create_all(
57        &self,
58        new_accounts: Vec<NewAccount>,
59    ) -> Result<Vec<Account>, AccountError> {
60        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
61        let accounts = self.create_all_in_op(&mut op, new_accounts).await?;
62        op.commit().await?;
63        Ok(accounts)
64    }
65
66    #[instrument(name = "cala_ledger.accounts.create_all", skip(self, db))]
67    pub async fn create_all_in_op(
68        &self,
69        db: &mut LedgerOperation<'_>,
70        new_accounts: Vec<NewAccount>,
71    ) -> Result<Vec<Account>, AccountError> {
72        let accounts = self.repo.create_all_in_op(db, new_accounts).await?;
73        db.accumulate(
74            accounts
75                .iter()
76                .flat_map(|account| account.last_persisted(1).map(|p| &p.event)),
77        );
78        Ok(accounts)
79    }
80
81    pub async fn find(&self, account_id: AccountId) -> Result<Account, AccountError> {
82        self.repo.find_by_id(account_id).await
83    }
84
85    #[instrument(name = "cala_ledger.accounts.find_all", skip(self), err)]
86    pub async fn find_all<T: From<Account>>(
87        &self,
88        account_ids: &[AccountId],
89    ) -> Result<HashMap<AccountId, T>, AccountError> {
90        self.repo.find_all(account_ids).await
91    }
92
93    #[instrument(name = "cala_ledger.accounts.find_all", skip(self, db), err)]
94    pub async fn find_all_in_op<T: From<Account>>(
95        &self,
96        db: &mut LedgerOperation<'_>,
97        account_ids: &[AccountId],
98    ) -> Result<HashMap<AccountId, T>, AccountError> {
99        self.repo.find_all_in_op(db, account_ids).await
100    }
101
102    #[instrument(name = "cala_ledger.accounts.find_by_external_id", skip(self), err)]
103    pub async fn find_by_external_id(&self, external_id: String) -> Result<Account, AccountError> {
104        self.repo.find_by_external_id(Some(external_id)).await
105    }
106
107    #[instrument(name = "cala_ledger.accounts.find_by_code", skip(self), err)]
108    pub async fn find_by_code(&self, code: String) -> Result<Account, AccountError> {
109        self.repo.find_by_code(code).await
110    }
111
112    #[instrument(name = "cala_ledger.accounts.list", skip(self))]
113    pub async fn list(
114        &self,
115        query: es_entity::PaginatedQueryArgs<AccountsByNameCursor>,
116    ) -> Result<es_entity::PaginatedQueryRet<Account, AccountsByNameCursor>, AccountError> {
117        self.repo.list_by_name(query, Default::default()).await
118    }
119
120    #[instrument(name = "cala_ledger.accounts.persist", skip(self, account))]
121    pub async fn persist(&self, account: &mut Account) -> Result<(), AccountError> {
122        let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
123        self.persist_in_op(&mut op, account).await?;
124        op.commit().await?;
125        Ok(())
126    }
127
128    pub async fn persist_in_op(
129        &self,
130        db: &mut LedgerOperation<'_>,
131        account: &mut Account,
132    ) -> Result<(), AccountError> {
133        if account.is_account_set() {
134            return Err(AccountError::CannotUpdateAccountSetAccounts);
135        }
136
137        let n_events = self.repo.update_in_op(db, account).await?;
138        db.accumulate(account.last_persisted(n_events).map(|p| &p.event));
139        Ok(())
140    }
141
142    pub(crate) async fn update_velocity_context_values_in_op(
143        &self,
144        db: &mut LedgerOperation<'_>,
145        values: impl Into<VelocityContextAccountValues>,
146    ) -> Result<(), AccountError> {
147        self.repo
148            .update_velocity_context_values_in_op(db, values.into())
149            .await
150    }
151
152    #[cfg(feature = "import")]
153    pub async fn sync_account_creation(
154        &self,
155        mut db: es_entity::DbOpWithTime<'_>,
156        origin: DataSourceId,
157        values: AccountValues,
158    ) -> Result<(), AccountError> {
159        let mut account = Account::import(origin, values);
160        self.repo
161            .import_in_op(&mut db, origin, &mut account)
162            .await?;
163        let outbox_events: Vec<_> = account
164            .last_persisted(1)
165            .map(|p| OutboxEventPayload::from(&p.event))
166            .collect();
167        let now = db.now();
168        self.outbox
169            .persist_events_at(db, outbox_events, now)
170            .await?;
171        Ok(())
172    }
173
174    #[cfg(feature = "import")]
175    pub async fn sync_account_update(
176        &self,
177        mut db: es_entity::DbOpWithTime<'_>,
178        values: AccountValues,
179        fields: Vec<String>,
180    ) -> Result<(), AccountError> {
181        let mut account = self.repo.find_by_id(values.id).await?;
182        account.update((values, fields));
183        let n_events = self.repo.update_in_op(&mut db, &mut account).await?;
184        let outbox_events: Vec<_> = account
185            .last_persisted(n_events)
186            .map(|p| OutboxEventPayload::from(&p.event))
187            .collect();
188        let time = db.now();
189        self.outbox
190            .persist_events_at(db, outbox_events, time)
191            .await?;
192        Ok(())
193    }
194}
195
196impl From<&AccountEvent> for OutboxEventPayload {
197    fn from(event: &AccountEvent) -> Self {
198        match event {
199            #[cfg(feature = "import")]
200            AccountEvent::Imported {
201                source,
202                values: account,
203            } => OutboxEventPayload::AccountCreated {
204                source: *source,
205                account: account.clone(),
206            },
207            AccountEvent::Initialized { values: account } => OutboxEventPayload::AccountCreated {
208                source: DataSource::Local,
209                account: account.clone(),
210            },
211            AccountEvent::Updated {
212                values: account,
213                fields,
214            } => OutboxEventPayload::AccountUpdated {
215                source: DataSource::Local,
216                account: account.clone(),
217                fields: fields.clone(),
218            },
219        }
220    }
221}