1mod entity;
3pub mod error;
4mod repo;
5
6use es_entity::EsEntity;
7use sqlx::PgPool;
8use tracing::instrument;
9
10use std::collections::HashMap;
11
12#[cfg(feature = "import")]
13use crate::primitives::DataSourceId;
14use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
15
16pub use entity::*;
17use error::*;
18pub use repo::account_cursor::*;
19use repo::*;
20
21#[derive(Clone)]
23pub struct Accounts {
24 repo: AccountRepo,
25 outbox: Outbox,
26 pool: PgPool,
27}
28
29impl Accounts {
30 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
31 Self {
32 repo: AccountRepo::new(pool),
33 outbox,
34 pool: pool.clone(),
35 }
36 }
37
38 pub async fn create(&self, new_account: NewAccount) -> Result<Account, AccountError> {
39 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
40 let account = self.create_in_op(&mut op, new_account).await?;
41 op.commit().await?;
42 Ok(account)
43 }
44
45 #[instrument(name = "cala_ledger.accounts.create", skip(self, db))]
46 pub async fn create_in_op(
47 &self,
48 db: &mut LedgerOperation<'_>,
49 new_account: NewAccount,
50 ) -> Result<Account, AccountError> {
51 let account = self.repo.create_in_op(db.op(), new_account).await?;
52 db.accumulate(account.last_persisted(1).map(|p| &p.event));
53 Ok(account)
54 }
55
56 pub async fn create_all(
57 &self,
58 new_accounts: Vec<NewAccount>,
59 ) -> Result<Vec<Account>, AccountError> {
60 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
61 let accounts = self.create_all_in_op(&mut op, new_accounts).await?;
62 op.commit().await?;
63 Ok(accounts)
64 }
65
66 #[instrument(name = "cala_ledger.accounts.create_all", skip(self, db))]
67 pub async fn create_all_in_op(
68 &self,
69 db: &mut LedgerOperation<'_>,
70 new_accounts: Vec<NewAccount>,
71 ) -> Result<Vec<Account>, AccountError> {
72 let accounts = self.repo.create_all_in_op(db.op(), new_accounts).await?;
73 db.accumulate(
74 accounts
75 .iter()
76 .flat_map(|account| account.last_persisted(1).map(|p| &p.event)),
77 );
78 Ok(accounts)
79 }
80
81 pub async fn find(&self, account_id: AccountId) -> Result<Account, AccountError> {
82 self.repo.find_by_id(account_id).await
83 }
84
85 #[instrument(name = "cala_ledger.accounts.find_all", skip(self), err)]
86 pub async fn find_all<T: From<Account>>(
87 &self,
88 account_ids: &[AccountId],
89 ) -> Result<HashMap<AccountId, T>, AccountError> {
90 self.repo.find_all(account_ids).await
91 }
92
93 #[instrument(name = "cala_ledger.accounts.find_all", skip(self, db), err)]
94 pub async fn find_all_in_op<T: From<Account>>(
95 &self,
96 db: &mut LedgerOperation<'_>,
97 account_ids: &[AccountId],
98 ) -> Result<HashMap<AccountId, T>, AccountError> {
99 self.repo.find_all_in_tx(db.tx(), account_ids).await
100 }
101
102 #[instrument(name = "cala_ledger.accounts.find_by_external_id", skip(self), err)]
103 pub async fn find_by_external_id(&self, external_id: String) -> Result<Account, AccountError> {
104 self.repo.find_by_external_id(Some(external_id)).await
105 }
106
107 #[instrument(name = "cala_ledger.accounts.find_by_code", skip(self), err)]
108 pub async fn find_by_code(&self, code: String) -> Result<Account, AccountError> {
109 self.repo.find_by_code(code).await
110 }
111
112 #[instrument(name = "cala_ledger.accounts.list", skip(self))]
113 pub async fn list(
114 &self,
115 query: es_entity::PaginatedQueryArgs<AccountsByNameCursor>,
116 ) -> Result<es_entity::PaginatedQueryRet<Account, AccountsByNameCursor>, AccountError> {
117 self.repo.list_by_name(query, Default::default()).await
118 }
119
120 #[instrument(name = "cala_ledger.accounts.persist", skip(self, account))]
121 pub async fn persist(&self, account: &mut Account) -> Result<(), AccountError> {
122 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
123 self.persist_in_op(&mut op, account).await?;
124 op.commit().await?;
125 Ok(())
126 }
127
128 pub async fn persist_in_op(
129 &self,
130 db: &mut LedgerOperation<'_>,
131 account: &mut Account,
132 ) -> Result<(), AccountError> {
133 let n_events = self.repo.update_in_op(db.op(), account).await?;
134 db.accumulate(account.last_persisted(n_events).map(|p| &p.event));
135 Ok(())
136 }
137
138 #[cfg(feature = "import")]
139 pub async fn sync_account_creation(
140 &self,
141 mut db: es_entity::DbOp<'_>,
142 origin: DataSourceId,
143 values: AccountValues,
144 ) -> Result<(), AccountError> {
145 let mut account = Account::import(origin, values);
146 self.repo
147 .import_in_op(&mut db, origin, &mut account)
148 .await?;
149 let recorded_at = db.now();
150 let outbox_events: Vec<_> = account
151 .last_persisted(1)
152 .map(|p| OutboxEventPayload::from(&p.event))
153 .collect();
154 self.outbox
155 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
156 .await?;
157 Ok(())
158 }
159
160 #[cfg(feature = "import")]
161 pub async fn sync_account_update(
162 &self,
163 mut db: es_entity::DbOp<'_>,
164 values: AccountValues,
165 fields: Vec<String>,
166 ) -> Result<(), AccountError> {
167 let mut account = self.repo.find_by_id(values.id).await?;
168 account.update((values, fields));
169 let n_events = self.repo.update_in_op(&mut db, &mut account).await?;
170 let recorded_at = db.now();
171 let outbox_events: Vec<_> = account
172 .last_persisted(n_events)
173 .map(|p| OutboxEventPayload::from(&p.event))
174 .collect();
175 self.outbox
176 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
177 .await?;
178 Ok(())
179 }
180}
181
182impl From<&AccountEvent> for OutboxEventPayload {
183 fn from(event: &AccountEvent) -> Self {
184 match event {
185 #[cfg(feature = "import")]
186 AccountEvent::Imported {
187 source,
188 values: account,
189 } => OutboxEventPayload::AccountCreated {
190 source: *source,
191 account: account.clone(),
192 },
193 AccountEvent::Initialized { values: account } => OutboxEventPayload::AccountCreated {
194 source: DataSource::Local,
195 account: account.clone(),
196 },
197 AccountEvent::Updated {
198 values: account,
199 fields,
200 } => OutboxEventPayload::AccountUpdated {
201 source: DataSource::Local,
202 account: account.clone(),
203 fields: fields.clone(),
204 },
205 }
206 }
207}