cala-ledger 0.15.8

An embeddable double sided accounting ledger built on PG/SQLx
Documentation
mod account_control;
mod balance;
mod context;
mod control;
pub mod error;
mod limit;

use chrono::{DateTime, Utc};
use es_entity::clock::ClockHandle;
use sqlx::PgPool;
use std::collections::HashMap;
use tracing::instrument;

use cala_types::{entry::EntryValues, transaction::TransactionValues};

pub use crate::param::Params;

use account_control::*;
use balance::*;
pub use control::*;
use error::*;
pub use limit::*;

#[derive(Clone)]
pub struct Velocities {
    limits: VelocityLimitRepo,
    controls: VelocityControlRepo,
    account_controls: AccountControls,
    balances: VelocityBalances,
    clock: ClockHandle,
}

impl Velocities {
    pub(crate) fn new(pool: &PgPool, clock: &ClockHandle) -> Self {
        Self {
            limits: VelocityLimitRepo::new(pool),
            controls: VelocityControlRepo::new(pool),
            account_controls: AccountControls::new(pool, clock),
            balances: VelocityBalances::new(pool, clock),
            clock: clock.clone(),
        }
    }

    #[instrument(name = "velocity.create_limit", skip_all)]
    pub async fn create_limit(
        &self,
        new_limit: NewVelocityLimit,
    ) -> Result<VelocityLimit, VelocityError> {
        let mut db = self.limits.begin_op_with_clock(&self.clock).await?;
        let limit = self.create_limit_in_op(&mut db, new_limit).await?;
        db.commit().await?;
        Ok(limit)
    }

    #[instrument(name = "velocity.create_limit_in_op", skip_all)]
    pub async fn create_limit_in_op(
        &self,
        db: &mut impl es_entity::AtomicOperation,
        new_limit: NewVelocityLimit,
    ) -> Result<VelocityLimit, VelocityError> {
        let res = self.limits.create_in_op(db, new_limit).await?;
        Ok(res)
    }

    #[instrument(name = "velocity.create_control", skip_all)]
    pub async fn create_control(
        &self,
        new_control: NewVelocityControl,
    ) -> Result<VelocityControl, VelocityError> {
        let mut db = self.controls.begin_op_with_clock(&self.clock).await?;
        let control = self.create_control_in_op(&mut db, new_control).await?;
        db.commit().await?;
        Ok(control)
    }

    #[instrument(name = "velocity.create_control_in_op", skip_all)]
    pub async fn create_control_in_op(
        &self,
        db: &mut impl es_entity::AtomicOperation,
        new_control: NewVelocityControl,
    ) -> Result<VelocityControl, VelocityError> {
        let res = self.controls.create_in_op(db, new_control).await?;
        Ok(res)
    }

    #[instrument(name = "velocity.add_limit_to_control", skip(self), fields(control_id = %control, limit_id = %limit))]
    pub async fn add_limit_to_control(
        &self,
        control: VelocityControlId,
        limit: VelocityLimitId,
    ) -> Result<VelocityControl, VelocityError> {
        let mut db = self.controls.begin_op_with_clock(&self.clock).await?;
        let control = self
            .add_limit_to_control_in_op(&mut db, control, limit)
            .await?;
        db.commit().await?;
        Ok(control)
    }

    #[instrument(name = "velocity.add_limit_to_control_in_op", skip(self, db), fields(control_id = %control, limit_id = %limit))]
    pub async fn add_limit_to_control_in_op(
        &self,
        db: &mut impl es_entity::AtomicOperation,
        control: VelocityControlId,
        limit: VelocityLimitId,
    ) -> Result<VelocityControl, VelocityError> {
        self.limits.add_limit_to_control(db, control, limit).await?;
        Ok(self.controls.find_by_id_in_op(db, control).await?)
    }

    #[instrument(name = "velocity.attach_control_to_account", skip(self), fields(control_id = %control, account_id = %account_id))]
    pub async fn attach_control_to_account(
        &self,
        control: VelocityControlId,
        account_id: AccountId,
        params: impl Into<Params> + std::fmt::Debug,
    ) -> Result<VelocityControl, VelocityError> {
        let mut op = self.controls.begin_op_with_clock(&self.clock).await?;
        let control = self
            .attach_control_to_account_or_account_set_in_op(&mut op, control, account_id, params)
            .await?;
        op.commit().await?;
        Ok(control)
    }

    #[instrument(name = "velocity.attach_control_to_account_set", skip(self), fields(control_id = %control, account_set_id = %account_set_id))]
    pub async fn attach_control_to_account_set(
        &self,
        control: VelocityControlId,
        account_set_id: AccountSetId,
        params: impl Into<Params> + std::fmt::Debug,
    ) -> Result<VelocityControl, VelocityError> {
        let mut op = self.controls.begin_op_with_clock(&self.clock).await?;
        let control = self
            .attach_control_to_account_or_account_set_in_op(
                &mut op,
                control,
                account_set_id,
                params,
            )
            .await?;
        op.commit().await?;
        Ok(control)
    }

    #[instrument(name = "velocity.attach_control_to_account_in_op", skip(self, db), fields(control_id = %control_id, account_id = %account_id))]
    pub async fn attach_control_to_account_in_op(
        &self,
        db: &mut impl es_entity::AtomicOperation,
        control_id: VelocityControlId,
        account_id: AccountId,
        params: impl Into<Params> + std::fmt::Debug,
    ) -> Result<VelocityControl, VelocityError> {
        self.attach_control_to_account_or_account_set_in_op(db, control_id, account_id, params)
            .await
    }

    #[instrument(name = "velocity.attach_control_to_account_set_in_op", skip(self, db), fields(control_id = %control_id, account_set_id = %account_set_id))]
    pub async fn attach_control_to_account_set_in_op(
        &self,
        db: &mut impl es_entity::AtomicOperation,
        control_id: VelocityControlId,
        account_set_id: AccountSetId,
        params: impl Into<Params> + std::fmt::Debug,
    ) -> Result<VelocityControl, VelocityError> {
        self.attach_control_to_account_or_account_set_in_op(db, control_id, account_set_id, params)
            .await
    }

    #[instrument(name = "velocity.attach_control_internal", skip(self, db, account_id), fields(control_id = %control_id, account_id = tracing::field::Empty))]
    async fn attach_control_to_account_or_account_set_in_op(
        &self,
        db: &mut impl es_entity::AtomicOperation,
        control_id: VelocityControlId,
        account_id: impl Into<AccountId>,
        params: impl Into<Params> + std::fmt::Debug,
    ) -> Result<VelocityControl, VelocityError> {
        let account_id = account_id.into();
        tracing::Span::current().record("account_id", account_id.to_string());

        let control = self.controls.find_by_id_in_op(&mut *db, control_id).await?;
        let limits = self
            .limits
            .list_for_control(&mut *db, control_id)
            .await?
            .into_iter()
            .map(|l| l.into_values())
            .collect();

        self.account_controls
            .attach_control_in_op(db, control.values(), account_id, limits, params)
            .await?;
        Ok(control)
    }

    #[instrument(name = "velocity.update_balances_with_limit_enforcement_in_op", skip(self, db, transaction, entries, account_set_mappings), fields(account_ids_count = account_ids.len(), entries_count = entries.len()), err(level = tracing::Level::WARN))]
    pub(crate) async fn update_balances_with_limit_enforcement_in_op(
        &self,
        db: &mut impl es_entity::AtomicOperation,
        created_at: DateTime<Utc>,
        transaction: &TransactionValues,
        entries: &[EntryValues],
        account_ids: &[AccountId],
        account_set_mappings: &HashMap<AccountId, Vec<AccountSetId>>,
    ) -> Result<(), VelocityError> {
        let mut all_account_ids = account_ids.to_vec();
        all_account_ids.extend(
            account_ids
                .iter()
                .filter_map(|id| account_set_mappings.get(id))
                .flat_map(|ids| ids.iter().map(AccountId::from)),
        );

        let controls = self
            .account_controls
            .find_for_enforcement(db, &all_account_ids)
            .await?;

        self.balances
            .update_balances_with_limit_enforcement_in_op(
                db,
                created_at,
                transaction,
                entries,
                controls,
                account_set_mappings,
            )
            .await
    }

    #[instrument(name = "velocity.list_limits_for_control", skip(self), fields(control_id = %control_id))]
    pub async fn list_limits_for_control(
        &self,
        control_id: VelocityControlId,
    ) -> Result<Vec<VelocityLimit>, VelocityError> {
        let mut op = self.limits.begin_op_with_clock(&self.clock).await?;
        let limits = self
            .list_limits_for_control_in_op(&mut op, control_id)
            .await?;
        op.commit().await?;
        Ok(limits)
    }

    #[instrument(name = "velocity.list_limits_for_control_in_op", skip(self, op), fields(control_id = %control_id), err(level = tracing::Level::WARN))]
    pub async fn list_limits_for_control_in_op(
        &self,
        op: &mut impl es_entity::AtomicOperation,
        control_id: VelocityControlId,
    ) -> Result<Vec<VelocityLimit>, VelocityError> {
        self.limits.list_for_control(op, control_id).await
    }

    #[instrument(name = "velocity.find_all_limits", skip(self), fields(count = limit_ids.len()), err(level = tracing::Level::WARN))]
    pub async fn find_all_limits<T: From<VelocityLimit>>(
        &self,
        limit_ids: &[VelocityLimitId],
    ) -> Result<HashMap<VelocityLimitId, T>, VelocityError> {
        Ok(self.limits.find_all(limit_ids).await?)
    }

    #[instrument(name = "velocity.find_all_controls", skip(self), fields(count = control_ids.len()))]
    pub async fn find_all_controls<T: From<VelocityControl>>(
        &self,
        control_ids: &[VelocityControlId],
    ) -> Result<HashMap<VelocityControlId, T>, VelocityError> {
        Ok(self.controls.find_all(control_ids).await?)
    }
}