cala_ledger/account/
mod.rs

1//! [Account] holds a balance in a [Journal](crate::journal::Journal)
2mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{
14    outbox::*,
15    primitives::{DataSource, Status},
16};
17
18pub use entity::*;
19use error::*;
20pub use repo::account_cursor::*;
21use repo::*;
22
23/// Service for working with `Account` entities.
24#[derive(Clone)]
25pub struct Accounts {
26    repo: AccountRepo,
27}
28
29impl Accounts {
30    pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
31        Self {
32            repo: AccountRepo::new(pool, publisher),
33        }
34    }
35
36    #[instrument(name = "cala_ledger.accounts.create", skip_all)]
37    pub async fn create(&self, new_account: NewAccount) -> Result<Account, AccountError> {
38        let mut op = self.repo.begin_op().await?;
39        let account = self.create_in_op(&mut op, new_account).await?;
40        op.commit().await?;
41        Ok(account)
42    }
43
44    #[instrument(name = "cala_ledger.accounts.create_in_op", skip(self, db))]
45    pub async fn create_in_op(
46        &self,
47        db: &mut impl es_entity::AtomicOperation,
48        new_account: NewAccount,
49    ) -> Result<Account, AccountError> {
50        let account = self.repo.create_in_op(db, new_account).await?;
51        Ok(account)
52    }
53
54    #[instrument(name = "cala_ledger.accounts.create_all", skip_all)]
55    pub async fn create_all(
56        &self,
57        new_accounts: Vec<NewAccount>,
58    ) -> Result<Vec<Account>, AccountError> {
59        let mut op = self.repo.begin_op().await?;
60        let accounts = self.create_all_in_op(&mut op, new_accounts).await?;
61        op.commit().await?;
62        Ok(accounts)
63    }
64
65    #[instrument(name = "cala_ledger.accounts.create_all_in_op", skip(self, db))]
66    pub async fn create_all_in_op(
67        &self,
68        db: &mut impl es_entity::AtomicOperation,
69        new_accounts: Vec<NewAccount>,
70    ) -> Result<Vec<Account>, AccountError> {
71        let accounts = self.repo.create_all_in_op(db, new_accounts).await?;
72        Ok(accounts)
73    }
74
75    #[instrument(name = "cala_ledger.accounts.find", skip_all)]
76    pub async fn find(&self, account_id: AccountId) -> Result<Account, AccountError> {
77        self.repo.find_by_id(account_id).await
78    }
79
80    #[instrument(name = "cala_ledger.accounts.find_all", skip(self))]
81    pub async fn find_all<T: From<Account>>(
82        &self,
83        account_ids: &[AccountId],
84    ) -> Result<HashMap<AccountId, T>, AccountError> {
85        self.repo.find_all(account_ids).await
86    }
87
88    #[instrument(name = "cala_ledger.accounts.find_all", skip(self, db))]
89    pub async fn find_all_in_op<T: From<Account>>(
90        &self,
91        db: &mut impl es_entity::AtomicOperation,
92        account_ids: &[AccountId],
93    ) -> Result<HashMap<AccountId, T>, AccountError> {
94        self.repo.find_all_in_op(db, account_ids).await
95    }
96
97    #[instrument(name = "cala_ledger.accounts.find_by_external_id", skip(self))]
98    pub async fn find_by_external_id(&self, external_id: String) -> Result<Account, AccountError> {
99        self.repo.find_by_external_id(Some(external_id)).await
100    }
101
102    #[instrument(name = "cala_ledger.accounts.find_by_code", skip(self))]
103    pub async fn find_by_code(&self, code: String) -> Result<Account, AccountError> {
104        self.repo.find_by_code(code).await
105    }
106
107    #[instrument(name = "cala_ledger.accounts.list", skip(self))]
108    pub async fn list(
109        &self,
110        query: es_entity::PaginatedQueryArgs<AccountsByNameCursor>,
111    ) -> Result<es_entity::PaginatedQueryRet<Account, AccountsByNameCursor>, AccountError> {
112        self.repo.list_by_name(query, Default::default()).await
113    }
114
115    #[instrument(name = "cala_ledger.accounts.lock_in_op", skip(self, db))]
116    pub async fn lock_in_op(
117        &self,
118        db: &mut impl es_entity::AtomicOperation,
119        id: AccountId,
120    ) -> Result<(), AccountError> {
121        let mut account = self.repo.find_by_id_in_op(&mut *db, id).await?;
122        if account.update_status(Status::Locked).did_execute() {
123            self.persist_in_op(db, &mut account).await?;
124        }
125        Ok(())
126    }
127
128    #[instrument(name = "cala_ledger.accounts.unlock_in_op", skip(self, db))]
129    pub async fn unlock_in_op(
130        &self,
131        db: &mut impl es_entity::AtomicOperation,
132        id: AccountId,
133    ) -> Result<(), AccountError> {
134        let mut account = self.repo.find_by_id_in_op(&mut *db, id).await?;
135        if account.update_status(Status::Active).did_execute() {
136            self.persist_in_op(db, &mut account).await?;
137        }
138        Ok(())
139    }
140
141    #[instrument(name = "cala_ledger.accounts.persist", skip(self, account))]
142    pub async fn persist(&self, account: &mut Account) -> Result<(), AccountError> {
143        let mut op = self.repo.begin_op().await?;
144        self.persist_in_op(&mut op, account).await?;
145        op.commit().await?;
146        Ok(())
147    }
148
149    #[instrument(name = "cala_ledger.accounts.persist_in_op", skip_all)]
150    pub async fn persist_in_op(
151        &self,
152        db: &mut impl es_entity::AtomicOperation,
153        account: &mut Account,
154    ) -> Result<(), AccountError> {
155        if account.is_account_set() {
156            return Err(AccountError::CannotUpdateAccountSetAccounts);
157        }
158        self.repo.update_in_op(db, account).await?;
159        Ok(())
160    }
161
162    #[instrument(
163        name = "cala_ledger.accounts.update_velocity_context_values_in_op",
164        skip_all
165    )]
166    pub(crate) async fn update_velocity_context_values_in_op(
167        &self,
168        db: &mut impl es_entity::AtomicOperation,
169        values: impl Into<VelocityContextAccountValues>,
170    ) -> Result<(), AccountError> {
171        self.repo
172            .update_velocity_context_values_in_op(db, values.into())
173            .await
174    }
175
176    #[cfg(feature = "import")]
177    #[instrument(name = "cala_ledger.accounts.sync_account_creation", skip_all)]
178    pub async fn sync_account_creation(
179        &self,
180        mut db: es_entity::DbOpWithTime<'_>,
181        origin: DataSourceId,
182        values: AccountValues,
183    ) -> Result<(), AccountError> {
184        let mut account = Account::import(origin, values);
185        self.repo
186            .import_in_op(&mut db, origin, &mut account)
187            .await?;
188        db.commit().await?;
189        Ok(())
190    }
191
192    #[cfg(feature = "import")]
193    #[instrument(name = "cala_ledger.accounts.sync_account_update", skip_all)]
194    pub async fn sync_account_update(
195        &self,
196        mut db: es_entity::DbOpWithTime<'_>,
197        values: AccountValues,
198        fields: Vec<String>,
199    ) -> Result<(), AccountError> {
200        let mut account = self.repo.find_by_id_in_op(&mut db, values.id).await?;
201        let _ = account.update((values, fields));
202        self.repo.update_in_op(&mut db, &mut account).await?;
203        db.commit().await?;
204        Ok(())
205    }
206}
207
208impl From<&AccountEvent> for OutboxEventPayload {
209    fn from(event: &AccountEvent) -> Self {
210        let source = es_entity::context::EventContext::current()
211            .data()
212            .lookup("data_source")
213            .ok()
214            .flatten()
215            .unwrap_or(DataSource::Local);
216
217        match event {
218            #[cfg(feature = "import")]
219            AccountEvent::Imported {
220                source,
221                values: account,
222            } => OutboxEventPayload::AccountCreated {
223                source: *source,
224                account: account.clone(),
225            },
226            AccountEvent::Initialized { values: account } => OutboxEventPayload::AccountCreated {
227                source,
228                account: account.clone(),
229            },
230            AccountEvent::Updated {
231                values: account,
232                fields,
233            } => OutboxEventPayload::AccountUpdated {
234                source,
235                account: account.clone(),
236                fields: fields.clone(),
237            },
238        }
239    }
240}