Skip to main content

cala_ledger/account/
repo.rs

1use es_entity::*;
2use sqlx::PgPool;
3use tracing::instrument;
4
5use crate::{
6    outbox::OutboxPublisher,
7    primitives::{AccountId, DebitOrCredit, Status},
8};
9
10use super::{entity::*, error::AccountError};
11
12#[derive(EsRepo, Debug, Clone)]
13#[es_repo(
14    entity = "Account",
15    err = "AccountError",
16    columns(
17        name(ty = "String", update(accessor = "values().name"), list_by),
18        code(ty = "String", update(accessor = "values().code"), list_by),
19        external_id(
20            ty = "Option<String>",
21            update(accessor = "values().external_id"),
22            list_by
23        ),
24        normal_balance_type(
25            ty = "DebitOrCredit",
26            update(accessor = "values().normal_balance_type")
27        ),
28        status(ty = "Status", update(accessor = "values().status")),
29        eventually_consistent(ty = "bool", update(persist = false)),
30        velocity_context_values(
31            ty = "VelocityContextAccountValues",
32            create(accessor = "context_values()"),
33            update(accessor = "context_values()")
34        ),
35    ),
36    tbl_prefix = "cala",
37    post_persist_hook = "publish",
38    persist_event_context = false
39)]
40pub(super) struct AccountRepo {
41    pool: PgPool,
42    publisher: OutboxPublisher,
43}
44
45impl AccountRepo {
46    pub fn new(pool: &PgPool, publisher: &OutboxPublisher) -> Self {
47        Self {
48            pool: pool.clone(),
49            publisher: publisher.clone(),
50        }
51    }
52
53    #[instrument(
54        name = "account.update_velocity_context_values_in_op",
55        skip_all,
56        err(level = "warn")
57    )]
58    pub async fn update_velocity_context_values_in_op(
59        &self,
60        op: &mut impl es_entity::AtomicOperation,
61        latest_values: VelocityContextAccountValues,
62    ) -> Result<(), AccountError> {
63        let account_id = latest_values.id;
64
65        sqlx::query!(
66            r#"UPDATE cala_accounts
67            SET velocity_context_values = $2
68            WHERE id = $1"#,
69            account_id as AccountId,
70            latest_values as VelocityContextAccountValues
71        )
72        .execute(op.as_executor())
73        .await?;
74        Ok(())
75    }
76
77    async fn publish(
78        &self,
79        op: &mut impl es_entity::AtomicOperation,
80        entity: &Account,
81        new_events: es_entity::LastPersisted<'_, AccountEvent>,
82    ) -> Result<(), AccountError> {
83        self.publisher
84            .publish_entity_events(op, entity, new_events)
85            .await?;
86        Ok(())
87    }
88}