1mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::clock::ClockHandle;
7use sqlx::PgPool;
8use tracing::instrument;
9
10use std::collections::HashMap;
11
12#[cfg(feature = "import")]
13use crate::primitives::DataSourceId;
14use crate::{
15 outbox::*,
16 primitives::{DataSource, Status},
17};
18
19pub use entity::*;
20use error::*;
21pub use repo::account_cursor::*;
22use repo::*;
23
24#[derive(Clone)]
26pub struct Accounts {
27 repo: AccountRepo,
28 clock: ClockHandle,
29}
30
31impl Accounts {
32 pub(crate) fn new(pool: &PgPool, publisher: &OutboxPublisher, clock: &ClockHandle) -> Self {
33 Self {
34 repo: AccountRepo::new(pool, publisher),
35 clock: clock.clone(),
36 }
37 }
38
39 #[instrument(name = "cala_ledger.accounts.create", skip_all)]
40 pub async fn create(&self, new_account: NewAccount) -> Result<Account, AccountError> {
41 let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
42 let account = self.create_in_op(&mut op, new_account).await?;
43 op.commit().await?;
44 Ok(account)
45 }
46
47 #[instrument(name = "cala_ledger.accounts.create_in_op", skip(self, db))]
48 pub async fn create_in_op(
49 &self,
50 db: &mut impl es_entity::AtomicOperation,
51 new_account: NewAccount,
52 ) -> Result<Account, AccountError> {
53 let account = self.repo.create_in_op(db, new_account).await?;
54 Ok(account)
55 }
56
57 #[instrument(name = "cala_ledger.accounts.create_all", skip_all)]
58 pub async fn create_all(
59 &self,
60 new_accounts: Vec<NewAccount>,
61 ) -> Result<Vec<Account>, AccountError> {
62 let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
63 let accounts = self.create_all_in_op(&mut op, new_accounts).await?;
64 op.commit().await?;
65 Ok(accounts)
66 }
67
68 #[instrument(name = "cala_ledger.accounts.create_all_in_op", skip(self, db))]
69 pub async fn create_all_in_op(
70 &self,
71 db: &mut impl es_entity::AtomicOperation,
72 new_accounts: Vec<NewAccount>,
73 ) -> Result<Vec<Account>, AccountError> {
74 let accounts = self.repo.create_all_in_op(db, new_accounts).await?;
75 Ok(accounts)
76 }
77
78 #[instrument(name = "cala_ledger.accounts.find", skip_all)]
79 pub async fn find(&self, account_id: AccountId) -> Result<Account, AccountError> {
80 self.repo.find_by_id(account_id).await
81 }
82
83 #[instrument(name = "cala_ledger.accounts.find_all", skip(self))]
84 pub async fn find_all<T: From<Account>>(
85 &self,
86 account_ids: &[AccountId],
87 ) -> Result<HashMap<AccountId, T>, AccountError> {
88 self.repo.find_all(account_ids).await
89 }
90
91 #[instrument(name = "cala_ledger.accounts.find_all", skip(self, db))]
92 pub async fn find_all_in_op<T: From<Account>>(
93 &self,
94 db: &mut impl es_entity::AtomicOperation,
95 account_ids: &[AccountId],
96 ) -> Result<HashMap<AccountId, T>, AccountError> {
97 self.repo.find_all_in_op(db, account_ids).await
98 }
99
100 #[instrument(name = "cala_ledger.accounts.find_by_external_id", skip(self))]
101 pub async fn find_by_external_id(&self, external_id: String) -> Result<Account, AccountError> {
102 self.repo.find_by_external_id(Some(external_id)).await
103 }
104
105 #[instrument(name = "cala_ledger.accounts.find_by_code", skip(self))]
106 pub async fn find_by_code(&self, code: String) -> Result<Account, AccountError> {
107 self.repo.find_by_code(code).await
108 }
109
110 #[instrument(name = "cala_ledger.accounts.list", skip(self))]
111 pub async fn list(
112 &self,
113 query: es_entity::PaginatedQueryArgs<AccountsByNameCursor>,
114 ) -> Result<es_entity::PaginatedQueryRet<Account, AccountsByNameCursor>, AccountError> {
115 self.repo.list_by_name(query, Default::default()).await
116 }
117
118 #[instrument(name = "cala_ledger.accounts.lock_in_op", skip(self, db))]
119 pub async fn lock_in_op(
120 &self,
121 db: &mut impl es_entity::AtomicOperation,
122 id: AccountId,
123 ) -> Result<(), AccountError> {
124 let mut account = self.repo.find_by_id_in_op(&mut *db, id).await?;
125 if account.update_status(Status::Locked).did_execute() {
126 self.persist_in_op(db, &mut account).await?;
127 }
128 Ok(())
129 }
130
131 #[instrument(name = "cala_ledger.accounts.unlock_in_op", skip(self, db))]
132 pub async fn unlock_in_op(
133 &self,
134 db: &mut impl es_entity::AtomicOperation,
135 id: AccountId,
136 ) -> Result<(), AccountError> {
137 let mut account = self.repo.find_by_id_in_op(&mut *db, id).await?;
138 if account.update_status(Status::Active).did_execute() {
139 self.persist_in_op(db, &mut account).await?;
140 }
141 Ok(())
142 }
143
144 #[instrument(name = "cala_ledger.accounts.persist", skip(self, account))]
145 pub async fn persist(&self, account: &mut Account) -> Result<(), AccountError> {
146 let mut op = self.repo.begin_op_with_clock(&self.clock).await?;
147 self.persist_in_op(&mut op, account).await?;
148 op.commit().await?;
149 Ok(())
150 }
151
152 #[instrument(name = "cala_ledger.accounts.persist_in_op", skip_all)]
153 pub async fn persist_in_op(
154 &self,
155 db: &mut impl es_entity::AtomicOperation,
156 account: &mut Account,
157 ) -> Result<(), AccountError> {
158 if account.is_account_set() {
159 return Err(AccountError::CannotUpdateAccountSetAccounts);
160 }
161 self.repo.update_in_op(db, account).await?;
162 Ok(())
163 }
164
165 #[instrument(
166 name = "cala_ledger.accounts.update_velocity_context_values_in_op",
167 skip_all
168 )]
169 pub(crate) async fn update_velocity_context_values_in_op(
170 &self,
171 db: &mut impl es_entity::AtomicOperation,
172 values: impl Into<VelocityContextAccountValues>,
173 ) -> Result<(), AccountError> {
174 self.repo
175 .update_velocity_context_values_in_op(db, values.into())
176 .await
177 }
178
179 #[cfg(feature = "import")]
180 #[instrument(name = "cala_ledger.accounts.sync_account_creation", skip_all)]
181 pub async fn sync_account_creation(
182 &self,
183 mut db: es_entity::DbOpWithTime<'_>,
184 origin: DataSourceId,
185 values: AccountValues,
186 ) -> Result<(), AccountError> {
187 let mut account = Account::import(origin, values);
188 self.repo
189 .import_in_op(&mut db, origin, &mut account)
190 .await?;
191 db.commit().await?;
192 Ok(())
193 }
194
195 #[cfg(feature = "import")]
196 #[instrument(name = "cala_ledger.accounts.sync_account_update", skip_all)]
197 pub async fn sync_account_update(
198 &self,
199 mut db: es_entity::DbOpWithTime<'_>,
200 values: AccountValues,
201 fields: Vec<String>,
202 ) -> Result<(), AccountError> {
203 let mut account = self.repo.find_by_id_in_op(&mut db, values.id).await?;
204 let _ = account.update((values, fields));
205 self.repo.update_in_op(&mut db, &mut account).await?;
206 db.commit().await?;
207 Ok(())
208 }
209}
210
211impl From<&AccountEvent> for OutboxEventPayload {
212 fn from(event: &AccountEvent) -> Self {
213 let source = es_entity::context::EventContext::current()
214 .data()
215 .lookup("data_source")
216 .ok()
217 .flatten()
218 .unwrap_or(DataSource::Local);
219
220 match event {
221 #[cfg(feature = "import")]
222 AccountEvent::Imported {
223 source,
224 values: account,
225 } => OutboxEventPayload::AccountCreated {
226 source: *source,
227 account: account.clone(),
228 },
229 AccountEvent::Initialized { values: account } => OutboxEventPayload::AccountCreated {
230 source,
231 account: account.clone(),
232 },
233 AccountEvent::Updated {
234 values: account,
235 fields,
236 } => OutboxEventPayload::AccountUpdated {
237 source,
238 account: account.clone(),
239 fields: fields.clone(),
240 },
241 }
242 }
243}