1mod entity;
3pub mod error;
4mod repo;
5
6use sqlx::PgPool;
7use tracing::instrument;
8
9use std::collections::HashMap;
10
11#[cfg(feature = "import")]
12use crate::primitives::DataSourceId;
13use crate::{ledger_operation::*, outbox::*, primitives::DataSource};
14
15pub use entity::*;
16use error::*;
17pub use repo::account_cursor::*;
18use repo::*;
19
20#[derive(Clone)]
22pub struct Accounts {
23 repo: AccountRepo,
24 outbox: Outbox,
25 pool: PgPool,
26}
27
28impl Accounts {
29 pub(crate) fn new(pool: &PgPool, outbox: Outbox) -> Self {
30 Self {
31 repo: AccountRepo::new(pool),
32 outbox,
33 pool: pool.clone(),
34 }
35 }
36
37 pub async fn create(&self, new_account: NewAccount) -> Result<Account, AccountError> {
38 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
39 let account = self.create_in_op(&mut op, new_account).await?;
40 op.commit().await?;
41 Ok(account)
42 }
43
44 #[instrument(name = "cala_ledger.accounts.create", skip(self, db))]
45 pub async fn create_in_op(
46 &self,
47 db: &mut LedgerOperation<'_>,
48 new_account: NewAccount,
49 ) -> Result<Account, AccountError> {
50 let account = self.repo.create_in_op(db.op(), new_account).await?;
51 db.accumulate(account.events.last_persisted(1).map(|p| &p.event));
52 Ok(account)
53 }
54
55 pub async fn create_all(
56 &self,
57 new_accounts: Vec<NewAccount>,
58 ) -> Result<Vec<Account>, AccountError> {
59 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
60 let accounts = self.create_all_in_op(&mut op, new_accounts).await?;
61 op.commit().await?;
62 Ok(accounts)
63 }
64
65 #[instrument(name = "cala_ledger.accounts.create_all", skip(self, db))]
66 pub async fn create_all_in_op(
67 &self,
68 db: &mut LedgerOperation<'_>,
69 new_accounts: Vec<NewAccount>,
70 ) -> Result<Vec<Account>, AccountError> {
71 let accounts = self.repo.create_all_in_op(db.op(), new_accounts).await?;
72 db.accumulate(
73 accounts
74 .iter()
75 .flat_map(|account| account.events.last_persisted(1).map(|p| &p.event)),
76 );
77 Ok(accounts)
78 }
79
80 pub async fn find(&self, account_id: AccountId) -> Result<Account, AccountError> {
81 self.repo.find_by_id(account_id).await
82 }
83
84 #[instrument(name = "cala_ledger.accounts.find_all", skip(self), err)]
85 pub async fn find_all<T: From<Account>>(
86 &self,
87 account_ids: &[AccountId],
88 ) -> Result<HashMap<AccountId, T>, AccountError> {
89 self.repo.find_all(account_ids).await
90 }
91
92 #[instrument(name = "cala_ledger.accounts.find_all", skip(self, db), err)]
93 pub async fn find_all_in_op<T: From<Account>>(
94 &self,
95 db: &mut LedgerOperation<'_>,
96 account_ids: &[AccountId],
97 ) -> Result<HashMap<AccountId, T>, AccountError> {
98 self.repo.find_all_in_tx(db.tx(), account_ids).await
99 }
100
101 #[instrument(name = "cala_ledger.accounts.find_by_external_id", skip(self), err)]
102 pub async fn find_by_external_id(&self, external_id: String) -> Result<Account, AccountError> {
103 self.repo.find_by_external_id(Some(external_id)).await
104 }
105
106 #[instrument(name = "cala_ledger.accounts.find_by_code", skip(self), err)]
107 pub async fn find_by_code(&self, code: String) -> Result<Account, AccountError> {
108 self.repo.find_by_code(code).await
109 }
110
111 #[instrument(name = "cala_ledger.accounts.list", skip(self))]
112 pub async fn list(
113 &self,
114 query: es_entity::PaginatedQueryArgs<AccountsByNameCursor>,
115 ) -> Result<es_entity::PaginatedQueryRet<Account, AccountsByNameCursor>, AccountError> {
116 self.repo.list_by_name(query, Default::default()).await
117 }
118
119 #[instrument(name = "cala_ledger.accounts.persist", skip(self, account))]
120 pub async fn persist(&self, account: &mut Account) -> Result<(), AccountError> {
121 let mut op = LedgerOperation::init(&self.pool, &self.outbox).await?;
122 self.persist_in_op(&mut op, account).await?;
123 op.commit().await?;
124 Ok(())
125 }
126
127 pub async fn persist_in_op(
128 &self,
129 db: &mut LedgerOperation<'_>,
130 account: &mut Account,
131 ) -> Result<(), AccountError> {
132 let n_events = self.repo.update_in_op(db.op(), account).await?;
133 db.accumulate(account.events.last_persisted(n_events).map(|p| &p.event));
134 Ok(())
135 }
136
137 #[cfg(feature = "import")]
138 pub async fn sync_account_creation(
139 &self,
140 mut db: es_entity::DbOp<'_>,
141 origin: DataSourceId,
142 values: AccountValues,
143 ) -> Result<(), AccountError> {
144 let mut account = Account::import(origin, values);
145 self.repo
146 .import_in_op(&mut db, origin, &mut account)
147 .await?;
148 let recorded_at = db.now();
149 let outbox_events: Vec<_> = account
150 .events
151 .last_persisted(1)
152 .map(|p| OutboxEventPayload::from(&p.event))
153 .collect();
154 self.outbox
155 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
156 .await?;
157 Ok(())
158 }
159
160 #[cfg(feature = "import")]
161 pub async fn sync_account_update(
162 &self,
163 mut db: es_entity::DbOp<'_>,
164 values: AccountValues,
165 fields: Vec<String>,
166 ) -> Result<(), AccountError> {
167 let mut account = self.repo.find_by_id(values.id).await?;
168 account.update((values, fields));
169 let n_events = self.repo.update_in_op(&mut db, &mut account).await?;
170 let recorded_at = db.now();
171 let outbox_events: Vec<_> = account
172 .events
173 .last_persisted(n_events)
174 .map(|p| OutboxEventPayload::from(&p.event))
175 .collect();
176 self.outbox
177 .persist_events_at(db.into_tx(), outbox_events, recorded_at)
178 .await?;
179 Ok(())
180 }
181}
182
183impl From<&AccountEvent> for OutboxEventPayload {
184 fn from(event: &AccountEvent) -> Self {
185 match event {
186 #[cfg(feature = "import")]
187 AccountEvent::Imported {
188 source,
189 values: account,
190 } => OutboxEventPayload::AccountCreated {
191 source: *source,
192 account: account.clone(),
193 },
194 AccountEvent::Initialized { values: account } => OutboxEventPayload::AccountCreated {
195 source: DataSource::Local,
196 account: account.clone(),
197 },
198 AccountEvent::Updated {
199 values: account,
200 fields,
201 } => OutboxEventPayload::AccountUpdated {
202 source: DataSource::Local,
203 account: account.clone(),
204 fields: fields.clone(),
205 },
206 }
207 }
208}