cala_ledger/account/
repo.rs1use es_entity::*;
2use sqlx::PgPool;
3use tracing::instrument;
4
5use crate::{
6 outbox::OutboxPublisher,
7 primitives::{AccountId, DebitOrCredit, Status},
8};
9
10use super::{entity::*, error::AccountError};
11
/// Repository for `Account` entities; the repository implementation is
/// derived by `EsRepo` from the `#[es_repo(...)]` configuration below.
///
/// Holds a Postgres connection pool and an `OutboxPublisher`. The `publish`
/// method on this type is named as the `post_persist_hook`.
#[derive(EsRepo, Debug, Clone)]
#[es_repo(
    entity = "Account",
    err = "AccountError",
    // NOTE(review): column semantics (`accessor`, `list_by`, `create`/`update`)
    // are defined by the es_entity derive macro — confirm against its docs.
    columns(
        name(ty = "String", update(accessor = "values().name"), list_by),
        code(ty = "String", update(accessor = "values().code"), list_by),
        external_id(
            ty = "Option<String>",
            update(accessor = "values().external_id"),
            list_by
        ),
        normal_balance_type(
            ty = "DebitOrCredit",
            update(accessor = "values().normal_balance_type")
        ),
        status(ty = "Status", update(accessor = "values().status")),
        // Presumably excluded from the generated UPDATE (`persist = false`) — TODO confirm.
        eventually_consistent(ty = "bool", update(persist = false)),
        // Read through `context_values()` on both create and update. The same
        // column is also written directly by
        // `AccountRepo::update_velocity_context_values_in_op`.
        velocity_context_values(
            ty = "VelocityContextAccountValues",
            create(accessor = "context_values()"),
            update(accessor = "context_values()")
        ),
    ),
    // Presumably yields the `cala_accounts` table name used by the hand-written
    // query in the impl block — verify against es_entity naming rules.
    tbl_prefix = "cala",
    post_persist_hook = "publish",
    persist_event_context = false
)]
pub(super) struct AccountRepo {
    // Connection pool; cloned from the caller's handle in `new`.
    pool: PgPool,
    // Outbox handle used by the `publish` hook to forward new entity events.
    publisher: OutboxPublisher,
}
44
45impl AccountRepo {
46 pub fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
47 Self {
48 pool: pool.clone(),
49 publisher: publisher.clone(),
50 }
51 }
52
53 #[instrument(
54 name = "account.update_velocity_context_values_in_op",
55 skip_all,
56 err(level = "warn")
57 )]
58 pub async fn update_velocity_context_values_in_op(
59 &self,
60 op: &mut impl es_entity::AtomicOperation,
61 latest_values: VelocityContextAccountValues,
62 ) -> Result<(), AccountError> {
63 let account_id = latest_values.id;
64
65 sqlx::query!(
66 r#"UPDATE cala_accounts
67 SET velocity_context_values = $2
68 WHERE id = $1"#,
69 account_id as AccountId,
70 latest_values as VelocityContextAccountValues
71 )
72 .execute(op.as_executor())
73 .await?;
74 Ok(())
75 }
76
77 async fn publish(
78 &self,
79 op: &mut impl es_entity::AtomicOperation,
80 entity: &Account,
81 new_events: es_entity::LastPersisted<'_, AccountEvent>,
82 ) -> Result<(), AccountError> {
83 self.publisher
84 .publish_entity_events(op, entity, new_events)
85 .await?;
86 Ok(())
87 }
88}