1mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use tracing::instrument;
9
10use std::collections::HashMap;
11
12#[cfg(feature = "import")]
13use crate::primitives::DataSourceId;
14use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
15
16pub use entity::*;
17use error::*;
18pub use repo::account_cursor::*;
19use repo::*;
20
/// Service handle for working with ledger accounts.
///
/// Bundles the account repository with the outbox and connection pool that
/// the methods on this type use to open `LedgerOperation`s and persist
/// outbox events. The type derives `Clone`, so a handle can be duplicated
/// and passed around by value.
#[derive(Clone)]
pub struct Accounts {
    // Repository that every read and write in this service is delegated to.
    repo: AccountRepo,
    // Outbox handed to `LedgerOperation::init`; also used directly by the
    // `import`-feature sync methods to persist events.
    outbox: Outbox,
    // Connection pool used to start a fresh `LedgerOperation` in the
    // non-`_in_op` convenience methods.
    pool: PgPool,
}
28
29impl Accounts {
30 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
31 Self {
32 repo: AccountRepo::new(pool),
33 outbox,
34 pool: pool.clone(),
35 }
36 }
37
38 pub async fn create(&self, new_account: NewAccount) -> Result<Account, AccountError> {
39 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
40 let account = self.create_in_op(&mut op, new_account).await?;
41 op.commit().await?;
42 Ok(account)
43 }
44
45 #[instrument(name = "cala_ledger.accounts.create", skip(self, db))]
46 pub async fn create_in_op(
47 &self,
48 db: &mut LedgerOperation<'_>,
49 new_account: NewAccount,
50 ) -> Result<Account, AccountError> {
51 let account = self.repo.create_in_op(db, new_account).await?;
52 db.accumulate(account.last_persisted(1).map(|p| &p.event));
53 Ok(account)
54 }
55
56 pub async fn create_all(
57 &self,
58 new_accounts: Vec<NewAccount>,
59 ) -> Result<Vec<Account>, AccountError> {
60 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
61 let accounts = self.create_all_in_op(&mut op, new_accounts).await?;
62 op.commit().await?;
63 Ok(accounts)
64 }
65
66 #[instrument(name = "cala_ledger.accounts.create_all", skip(self, db))]
67 pub async fn create_all_in_op(
68 &self,
69 db: &mut LedgerOperation<'_>,
70 new_accounts: Vec<NewAccount>,
71 ) -> Result<Vec<Account>, AccountError> {
72 let accounts = self.repo.create_all_in_op(db, new_accounts).await?;
73 db.accumulate(
74 accounts
75 .iter()
76 .flat_map(|account| account.last_persisted(1).map(|p| &p.event)),
77 );
78 Ok(accounts)
79 }
80
81 pub async fn find(&self, account_id: AccountId) -> Result<Account, AccountError> {
82 self.repo.find_by_id(account_id).await
83 }
84
85 #[instrument(name = "cala_ledger.accounts.find_all", skip(self), err)]
86 pub async fn find_all<T: From<Account>>(
87 &self,
88 account_ids: &[AccountId],
89 ) -> Result<HashMap<AccountId, T>, AccountError> {
90 self.repo.find_all(account_ids).await
91 }
92
93 #[instrument(name = "cala_ledger.accounts.find_all", skip(self, db), err)]
94 pub async fn find_all_in_op<T: From<Account>>(
95 &self,
96 db: &mut LedgerOperation<'_>,
97 account_ids: &[AccountId],
98 ) -> Result<HashMap<AccountId, T>, AccountError> {
99 self.repo.find_all_in_op(db, account_ids).await
100 }
101
102 #[instrument(name = "cala_ledger.accounts.find_by_external_id", skip(self), err)]
103 pub async fn find_by_external_id(&self, external_id: String) -> Result<Account, AccountError> {
104 self.repo.find_by_external_id(Some(external_id)).await
105 }
106
107 #[instrument(name = "cala_ledger.accounts.find_by_code", skip(self), err)]
108 pub async fn find_by_code(&self, code: String) -> Result<Account, AccountError> {
109 self.repo.find_by_code(code).await
110 }
111
112 #[instrument(name = "cala_ledger.accounts.list", skip(self))]
113 pub async fn list(
114 &self,
115 query: es_entity::PaginatedQueryArgs<AccountsByNameCursor>,
116 ) -> Result<es_entity::PaginatedQueryRet<Account, AccountsByNameCursor>, AccountError> {
117 self.repo.list_by_name(query, Default::default()).await
118 }
119
120 #[instrument(name = "cala_ledger.accounts.persist", skip(self, account))]
121 pub async fn persist(&self, account: &mut Account) -> Result<(), AccountError> {
122 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
123 self.persist_in_op(&mut op, account).await?;
124 op.commit().await?;
125 Ok(())
126 }
127
128 pub async fn persist_in_op(
129 &self,
130 db: &mut LedgerOperation<'_>,
131 account: &mut Account,
132 ) -> Result<(), AccountError> {
133 if account.is_account_set() {
134 return Err(AccountError::CannotUpdateAccountSetAccounts);
135 }
136
137 let n_events = self.repo.update_in_op(db, account).await?;
138 db.accumulate(account.last_persisted(n_events).map(|p| &p.event));
139 Ok(())
140 }
141
142 pub(crate) async fn update_velocity_context_values_in_op(
143 &self,
144 db: &mut LedgerOperation<'_>,
145 values: impl Into<VelocityContextAccountValues>,
146 ) -> Result<(), AccountError> {
147 self.repo
148 .update_velocity_context_values_in_op(db, values.into())
149 .await
150 }
151
152 #[cfg(feature = "import")]
153 pub async fn sync_account_creation(
154 &self,
155 mut db: es_entity::DbOpWithTime<'_>,
156 origin: DataSourceId,
157 values: AccountValues,
158 ) -> Result<(), AccountError> {
159 let mut account = Account::import(origin, values);
160 self.repo
161 .import_in_op(&mut db, origin, &mut account)
162 .await?;
163 let outbox_events: Vec<_> = account
164 .last_persisted(1)
165 .map(|p| OutboxEventPayload::from(&p.event))
166 .collect();
167 let now = db.now();
168 self.outbox
169 .persist_events_at(db, outbox_events, now)
170 .await?;
171 Ok(())
172 }
173
174 #[cfg(feature = "import")]
175 pub async fn sync_account_update(
176 &self,
177 mut db: es_entity::DbOpWithTime<'_>,
178 values: AccountValues,
179 fields: Vec<String>,
180 ) -> Result<(), AccountError> {
181 let mut account = self.repo.find_by_id(values.id).await?;
182 account.update((values, fields));
183 let n_events = self.repo.update_in_op(&mut db, &mut account).await?;
184 let outbox_events: Vec<_> = account
185 .last_persisted(n_events)
186 .map(|p| OutboxEventPayload::from(&p.event))
187 .collect();
188 let time = db.now();
189 self.outbox
190 .persist_events_at(db, outbox_events, time)
191 .await?;
192 Ok(())
193 }
194}
195
196impl From<&AccountEvent> for OutboxEventPayload {
197 fn from(event: &AccountEvent) -> Self {
198 match event {
199 #[cfg(feature = "import")]
200 AccountEvent::Imported {
201 source,
202 values: account,
203 } => OutboxEventPayload::AccountCreated {
204 source: *source,
205 account: account.clone(),
206 },
207 AccountEvent::Initialized { values: account } => OutboxEventPayload::AccountCreated {
208 source: DataSource::Local,
209 account: account.clone(),
210 },
211 AccountEvent::Updated {
212 values: account,
213 fields,
214 } => OutboxEventPayload::AccountUpdated {
215 source: DataSource::Local,
216 account: account.clone(),
217 fields: fields.clone(),
218 },
219 }
220 }
221}