cala_ledger/account/
mod.rs

1//! [Account] holds a balance in a [Journal](crate::journal::Journal)
2mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::clock::ClockHandle;
7use sqlx::PgPool;
8use tracing::instrument;
9
10use std::collections::HashMap;
11
12#[cfg(feature = "import")]
13use crate::primitives::DataSourceId;
14use crate::{
15    outbox::*,
16    primitives::{DataSource, Status},
17};
18
19pub use entity::*;
20use error::*;
21pub use repo::account_cursor::*;
22use repo::*;
23
24/// Service for working with `Account` entities.
25#[derive(Clone)]
26pub struct Accounts {
27    repo: AccountRepo,
28    clock: ClockHandle,
29}
30
31impl Accounts {
32    pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher, clock: &ClockHandle) -> Self {
33        Self {
34            repo: AccountRepo::new(pool, publisher),
35            clock: clock.clone(),
36        }
37    }
38
39    #[instrument(name = "cala_ledger.accounts.create", skip_all)]
40    pub async fn create(&self, new_account: NewAccount) -> Result<Account, AccountError> {
41        let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
42        let account = self.create_in_op(&mut op, new_account).await?;
43        op.commit().await?;
44        Ok(account)
45    }
46
47    #[instrument(name = "cala_ledger.accounts.create_in_op", skip(self, db))]
48    pub async fn create_in_op(
49        &self,
50        db: &mut impl es_entity::AtomicOperation,
51        new_account: NewAccount,
52    ) -> Result<Account, AccountError> {
53        let account = self.repo.create_in_op(db, new_account).await?;
54        Ok(account)
55    }
56
57    #[instrument(name = "cala_ledger.accounts.create_all", skip_all)]
58    pub async fn create_all(
59        &self,
60        new_accounts: Vec<NewAccount>,
61    ) -> Result<Vec<Account>, AccountError> {
62        let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
63        let accounts = self.create_all_in_op(&mut op, new_accounts).await?;
64        op.commit().await?;
65        Ok(accounts)
66    }
67
68    #[instrument(name = "cala_ledger.accounts.create_all_in_op", skip(self, db))]
69    pub async fn create_all_in_op(
70        &self,
71        db: &mut impl es_entity::AtomicOperation,
72        new_accounts: Vec<NewAccount>,
73    ) -> Result<Vec<Account>, AccountError> {
74        let accounts = self.repo.create_all_in_op(db, new_accounts).await?;
75        Ok(accounts)
76    }
77
78    #[instrument(name = "cala_ledger.accounts.find", skip_all)]
79    pub async fn find(&self, account_id: AccountId) -> Result<Account, AccountError> {
80        self.repo.find_by_id(account_id).await
81    }
82
83    #[instrument(name = "cala_ledger.accounts.find_all", skip(self))]
84    pub async fn find_all<T: From<Account>>(
85        &self,
86        account_ids: &[AccountId],
87    ) -> Result<HashMap<AccountId, T>, AccountError> {
88        self.repo.find_all(account_ids).await
89    }
90
91    #[instrument(name = "cala_ledger.accounts.find_all", skip(self, db))]
92    pub async fn find_all_in_op<T: From<Account>>(
93        &self,
94        db: &mut impl es_entity::AtomicOperation,
95        account_ids: &[AccountId],
96    ) -> Result<HashMap<AccountId, T>, AccountError> {
97        self.repo.find_all_in_op(db, account_ids).await
98    }
99
100    #[instrument(name = "cala_ledger.accounts.find_by_external_id", skip(self))]
101    pub async fn find_by_external_id(&self, external_id: String) -> Result<Account, AccountError> {
102        self.repo.find_by_external_id(Some(external_id)).await
103    }
104
105    #[instrument(name = "cala_ledger.accounts.find_by_code", skip(self))]
106    pub async fn find_by_code(&self, code: String) -> Result<Account, AccountError> {
107        self.repo.find_by_code(code).await
108    }
109
110    #[instrument(name = "cala_ledger.accounts.list", skip(self))]
111    pub async fn list(
112        &self,
113        query: es_entity::PaginatedQueryArgs<AccountsByNameCursor>,
114    ) -> Result<es_entity::PaginatedQueryRet<Account, AccountsByNameCursor>, AccountError> {
115        self.repo.list_by_name(query, Default::default()).await
116    }
117
118    #[instrument(name = "cala_ledger.accounts.lock_in_op", skip(self, db))]
119    pub async fn lock_in_op(
120        &self,
121        db: &mut impl es_entity::AtomicOperation,
122        id: AccountId,
123    ) -> Result<(), AccountError> {
124        let mut account = self.repo.find_by_id_in_op(&mut *db, id).await?;
125        if account.update_status(Status::Locked).did_execute() {
126            self.persist_in_op(db, &mut account).await?;
127        }
128        Ok(())
129    }
130
131    #[instrument(name = "cala_ledger.accounts.unlock_in_op", skip(self, db))]
132    pub async fn unlock_in_op(
133        &self,
134        db: &mut impl es_entity::AtomicOperation,
135        id: AccountId,
136    ) -> Result<(), AccountError> {
137        let mut account = self.repo.find_by_id_in_op(&mut *db, id).await?;
138        if account.update_status(Status::Active).did_execute() {
139            self.persist_in_op(db, &mut account).await?;
140        }
141        Ok(())
142    }
143
144    #[instrument(name = "cala_ledger.accounts.persist", skip(self, account))]
145    pub async fn persist(&self, account: &mut Account) -> Result<(), AccountError> {
146        let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
147        self.persist_in_op(&mut op, account).await?;
148        op.commit().await?;
149        Ok(())
150    }
151
152    #[instrument(name = "cala_ledger.accounts.persist_in_op", skip_all)]
153    pub async fn persist_in_op(
154        &self,
155        db: &mut impl es_entity::AtomicOperation,
156        account: &mut Account,
157    ) -> Result<(), AccountError> {
158        if account.is_account_set() {
159            return Err(AccountError::CannotUpdateAccountSetAccounts);
160        }
161        self.repo.update_in_op(db, account).await?;
162        Ok(())
163    }
164
165    #[instrument(
166        name = "cala_ledger.accounts.update_velocity_context_values_in_op",
167        skip_all
168    )]
169    pub(crate) async fn update_velocity_context_values_in_op(
170        &self,
171        db: &mut impl es_entity::AtomicOperation,
172        values: impl Into<VelocityContextAccountValues>,
173    ) -> Result<(), AccountError> {
174        self.repo
175            .update_velocity_context_values_in_op(db, values.into())
176            .await
177    }
178
179    #[cfg(feature = "import")]
180    #[instrument(name = "cala_ledger.accounts.sync_account_creation", skip_all)]
181    pub async fn sync_account_creation(
182        &self,
183        mut db: es_entity::DbOpWithTime<'_>,
184        origin: DataSourceId,
185        values: AccountValues,
186    ) -> Result<(), AccountError> {
187        let mut account = Account::import(origin, values);
188        self.repo
189            .import_in_op(&mut db, origin, &mut account)
190            .await?;
191        db.commit().await?;
192        Ok(())
193    }
194
195    #[cfg(feature = "import")]
196    #[instrument(name = "cala_ledger.accounts.sync_account_update", skip_all)]
197    pub async fn sync_account_update(
198        &self,
199        mut db: es_entity::DbOpWithTime<'_>,
200        values: AccountValues,
201        fields: Vec<String>,
202    ) -> Result<(), AccountError> {
203        let mut account = self.repo.find_by_id_in_op(&mut db, values.id).await?;
204        let _ = account.update((values, fields));
205        self.repo.update_in_op(&mut db, &mut account).await?;
206        db.commit().await?;
207        Ok(())
208    }
209}
210
211impl From<&AccountEvent> for OutboxEventPayload {
212    fn from(event: &AccountEvent) -> Self {
213        let source = es_entity::context::EventContext::current()
214            .data()
215            .lookup("data_source")
216            .ok()
217            .flatten()
218            .unwrap_or(DataSource::Local);
219
220        match event {
221            #[cfg(feature = "import")]
222            AccountEvent::Imported {
223                source,
224                values: account,
225            } => OutboxEventPayload::AccountCreated {
226                source: *source,
227                account: account.clone(),
228            },
229            AccountEvent::Initialized { values: account } => OutboxEventPayload::AccountCreated {
230                source,
231                account: account.clone(),
232            },
233            AccountEvent::Updated {
234                values: account,
235                fields,
236            } => OutboxEventPayload::AccountUpdated {
237                source,
238                account: account.clone(),
239                fields: fields.clone(),
240            },
241        }
242    }
243}