1mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{
14 outbox::*,
15 primitives::{DataSource, Status},
16};
17
18pub use entity::*;
19use error::*;
20pub use repo::account_cursor::*;
21use repo::*;
22
23#[derive(Clone)]
25pub struct Accounts {
26 repo: AccountRepo,
27}
28
29impl Accounts {
30 pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
31 Self {
32 repo: AccountRepo::new(pool, publisher),
33 }
34 }
35
36 #[instrument(name = "cala_ledger.accounts.create", skip_all)]
37 pub async fn create(&self, new_account: NewAccount) -> Result<Account, AccountError> {
38 let mut op = self.repo.begin_op().await?;
39 let account = self.create_in_op(&mut op, new_account).await?;
40 op.commit().await?;
41 Ok(account)
42 }
43
44 #[instrument(name = "cala_ledger.accounts.create_in_op", skip(self, db))]
45 pub async fn create_in_op(
46 &self,
47 db: &mut impl es_entity::AtomicOperation,
48 new_account: NewAccount,
49 ) -> Result<Account, AccountError> {
50 let account = self.repo.create_in_op(db, new_account).await?;
51 Ok(account)
52 }
53
54 #[instrument(name = "cala_ledger.accounts.create_all", skip_all)]
55 pub async fn create_all(
56 &self,
57 new_accounts: Vec<NewAccount>,
58 ) -> Result<Vec<Account>, AccountError> {
59 let mut op = self.repo.begin_op().await?;
60 let accounts = self.create_all_in_op(&mut op, new_accounts).await?;
61 op.commit().await?;
62 Ok(accounts)
63 }
64
65 #[instrument(name = "cala_ledger.accounts.create_all_in_op", skip(self, db))]
66 pub async fn create_all_in_op(
67 &self,
68 db: &mut impl es_entity::AtomicOperation,
69 new_accounts: Vec<NewAccount>,
70 ) -> Result<Vec<Account>, AccountError> {
71 let accounts = self.repo.create_all_in_op(db, new_accounts).await?;
72 Ok(accounts)
73 }
74
75 #[instrument(name = "cala_ledger.accounts.find", skip_all)]
76 pub async fn find(&self, account_id: AccountId) -> Result<Account, AccountError> {
77 self.repo.find_by_id(account_id).await
78 }
79
80 #[instrument(name = "cala_ledger.accounts.find_all", skip(self))]
81 pub async fn find_all<T: From<Account>>(
82 &self,
83 account_ids: &[AccountId],
84 ) -> Result<HashMap<AccountId, T>, AccountError> {
85 self.repo.find_all(account_ids).await
86 }
87
88 #[instrument(name = "cala_ledger.accounts.find_all", skip(self, db))]
89 pub async fn find_all_in_op<T: From<Account>>(
90 &self,
91 db: &mut impl es_entity::AtomicOperation,
92 account_ids: &[AccountId],
93 ) -> Result<HashMap<AccountId, T>, AccountError> {
94 self.repo.find_all_in_op(db, account_ids).await
95 }
96
97 #[instrument(name = "cala_ledger.accounts.find_by_external_id", skip(self))]
98 pub async fn find_by_external_id(&self, external_id: String) -> Result<Account, AccountError> {
99 self.repo.find_by_external_id(Some(external_id)).await
100 }
101
102 #[instrument(name = "cala_ledger.accounts.find_by_code", skip(self))]
103 pub async fn find_by_code(&self, code: String) -> Result<Account, AccountError> {
104 self.repo.find_by_code(code).await
105 }
106
107 #[instrument(name = "cala_ledger.accounts.list", skip(self))]
108 pub async fn list(
109 &self,
110 query: es_entity::PaginatedQueryArgs<AccountsByNameCursor>,
111 ) -> Result<es_entity::PaginatedQueryRet<Account, AccountsByNameCursor>, AccountError> {
112 self.repo.list_by_name(query, Default::default()).await
113 }
114
115 #[instrument(name = "cala_ledger.accounts.lock_in_op", skip(self, db))]
116 pub async fn lock_in_op(
117 &self,
118 db: &mut impl es_entity::AtomicOperation,
119 id: AccountId,
120 ) -> Result<(), AccountError> {
121 let mut account = self.repo.find_by_id_in_op(&mut *db, id).await?;
122 if account.update_status(Status::Locked).did_execute() {
123 self.persist_in_op(db, &mut account).await?;
124 }
125 Ok(())
126 }
127
128 #[instrument(name = "cala_ledger.accounts.unlock_in_op", skip(self, db))]
129 pub async fn unlock_in_op(
130 &self,
131 db: &mut impl es_entity::AtomicOperation,
132 id: AccountId,
133 ) -> Result<(), AccountError> {
134 let mut account = self.repo.find_by_id_in_op(&mut *db, id).await?;
135 if account.update_status(Status::Active).did_execute() {
136 self.persist_in_op(db, &mut account).await?;
137 }
138 Ok(())
139 }
140
141 #[instrument(name = "cala_ledger.accounts.persist", skip(self, account))]
142 pub async fn persist(&self, account: &mut Account) -> Result<(), AccountError> {
143 let mut op = self.repo.begin_op().await?;
144 self.persist_in_op(&mut op, account).await?;
145 op.commit().await?;
146 Ok(())
147 }
148
149 #[instrument(name = "cala_ledger.accounts.persist_in_op", skip_all)]
150 pub async fn persist_in_op(
151 &self,
152 db: &mut impl es_entity::AtomicOperation,
153 account: &mut Account,
154 ) -> Result<(), AccountError> {
155 if account.is_account_set() {
156 return Err(AccountError::CannotUpdateAccountSetAccounts);
157 }
158 self.repo.update_in_op(db, account).await?;
159 Ok(())
160 }
161
162 #[instrument(
163 name = "cala_ledger.accounts.update_velocity_context_values_in_op",
164 skip_all
165 )]
166 pub(crate) async fn update_velocity_context_values_in_op(
167 &self,
168 db: &mut impl es_entity::AtomicOperation,
169 values: impl Into<VelocityContextAccountValues>,
170 ) -> Result<(), AccountError> {
171 self.repo
172 .update_velocity_context_values_in_op(db, values.into())
173 .await
174 }
175
176 #[cfg(feature = "import")]
177 #[instrument(name = "cala_ledger.accounts.sync_account_creation", skip_all)]
178 pub async fn sync_account_creation(
179 &self,
180 mut db: es_entity::DbOpWithTime<'_>,
181 origin: DataSourceId,
182 values: AccountValues,
183 ) -> Result<(), AccountError> {
184 let mut account = Account::import(origin, values);
185 self.repo
186 .import_in_op(&mut db, origin, &mut account)
187 .await?;
188 db.commit().await?;
189 Ok(())
190 }
191
192 #[cfg(feature = "import")]
193 #[instrument(name = "cala_ledger.accounts.sync_account_update", skip_all)]
194 pub async fn sync_account_update(
195 &self,
196 mut db: es_entity::DbOpWithTime<'_>,
197 values: AccountValues,
198 fields: Vec<String>,
199 ) -> Result<(), AccountError> {
200 let mut account = self.repo.find_by_id_in_op(&mut db, values.id).await?;
201 let _ = account.update((values, fields));
202 self.repo.update_in_op(&mut db, &mut account).await?;
203 db.commit().await?;
204 Ok(())
205 }
206}
207
208impl From<&AccountEvent> for OutboxEventPayload {
209 fn from(event: &AccountEvent) -> Self {
210 let source = es_entity::context::EventContext::current()
211 .data()
212 .lookup("data_source")
213 .ok()
214 .flatten()
215 .unwrap_or(DataSource::Local);
216
217 match event {
218 #[cfg(feature = "import")]
219 AccountEvent::Imported {
220 source,
221 values: account,
222 } => OutboxEventPayload::AccountCreated {
223 source: *source,
224 account: account.clone(),
225 },
226 AccountEvent::Initialized { values: account } => OutboxEventPayload::AccountCreated {
227 source,
228 account: account.clone(),
229 },
230 AccountEvent::Updated {
231 values: account,
232 fields,
233 } => OutboxEventPayload::AccountUpdated {
234 source,
235 account: account.clone(),
236 fields: fields.clone(),
237 },
238 }
239 }
240}