cala-ledger 0.15.1

An embeddable double-sided accounting ledger built on PG/SQLx
Documentation
pub mod config;
pub mod error;

use es_entity::clock::ClockHandle;
use sqlx::PgPool;
pub use tracing::instrument;
use tracing::Instrument;

pub use config::*;
use error::*;

use crate::{
    account::Accounts,
    account_set::AccountSets,
    balance::Balances,
    entry::Entries,
    journal::Journals,
    outbox::OutboxPublisher,
    primitives::TransactionId,
    transaction::{Transaction, Transactions},
    tx_template::{Params, TxTemplates},
    velocity::Velocities,
};

/// Top-level handle of the ledger.
///
/// Bundles the Postgres connection pool, the clock handle, the per-entity
/// services (accounts, account sets, journals, transactions, transaction
/// templates, entries, velocities, balances) and the outbox publisher that
/// `CalaLedger::init` wires together. The type derives `Clone`, so every
/// field is cloned along with it.
#[derive(Clone)]
pub struct CalaLedger {
    // Connection pool: either supplied by the caller or created in `init`.
    pool: PgPool,
    // Clock taken from the configuration; handed to services and DB operations.
    clock: ClockHandle,
    accounts: Accounts,
    account_sets: AccountSets,
    journals: Journals,
    transactions: Transactions,
    tx_templates: TxTemplates,
    entries: Entries,
    velocities: Velocities,
    balances: Balances,
    // Outbox publisher; its inner outbox is exposed through `outbox()`.
    publisher: OutboxPublisher,
}

impl CalaLedger {
    /// Builds a ledger from `config`.
    ///
    /// The connection pool is taken from `config.pool` when only that is set.
    /// When only `config.pg_con` is set, a new pool is connected to it, and
    /// `config.max_connections` is applied if present (it is not consulted
    /// when a ready-made pool is supplied). If `config.exec_migrations` is
    /// true, the embedded sqlx migrations are run against the pool inside a
    /// `cala_ledger.migrations` span. Afterwards the outbox publisher and all
    /// services are constructed on top of the pool and `config.clock`.
    ///
    /// # Errors
    ///
    /// Returns `LedgerError::ConfigError` when neither or both of `pool` and
    /// `pg_con` are set. Failures from connecting the pool, running the
    /// migrations or initialising the outbox publisher are propagated as
    /// `LedgerError` via `?`.
    #[instrument(name = "cala_ledger.init", skip_all)]
    pub async fn init(config: CalaLedgerConfig) -> Result<Self, LedgerError> {
        let pool = match (config.pool, config.pg_con) {
            (Some(pool), None) => pool,
            (None, Some(pg_con)) => {
                let mut pool_opts = sqlx::postgres::PgPoolOptions::new();
                if let Some(max_connections) = config.max_connections {
                    pool_opts = pool_opts.max_connections(max_connections);
                }
                pool_opts.connect(&pg_con).await?
            }
            // Reached both when nothing is configured and when both are.
            _ => {
                return Err(LedgerError::ConfigError(
                    "One of pg_con or pool must be set".to_string(),
                ))
            }
        };
        if config.exec_migrations {
            sqlx::migrate!()
                .run(&pool)
                .instrument(tracing::info_span!("cala_ledger.migrations"))
                .await?;
        }

        let clock = config.clock;
        let publisher = OutboxPublisher::init(&pool, &clock).await?;
        let accounts = Accounts::new(&pool, &publisher, &clock);
        let journals = Journals::new(&pool, &publisher, &clock);
        let tx_templates = TxTemplates::new(&pool, &publisher, &clock);
        let transactions = Transactions::new(&pool, &publisher);
        let entries = Entries::new(&pool, &publisher);
        // `Balances` is built from `journals`, and `AccountSets` from both
        // `accounts` and `balances`, so those must already exist here.
        let balances = Balances::new(&pool, &publisher, &journals);
        let velocities = Velocities::new(&pool, &clock);
        let account_sets = AccountSets::new(&pool, &publisher, &accounts, &balances, &clock);
        Ok(Self {
            accounts,
            account_sets,
            journals,
            tx_templates,
            publisher,
            transactions,
            entries,
            balances,
            velocities,
            pool,
            clock,
        })
    }

    /// Returns the connection pool the ledger operates on.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }

    /// Returns the clock handle the ledger was configured with.
    pub fn clock(&self) -> &ClockHandle {
        &self.clock
    }

    /// Starts a new database operation on the ledger's pool and clock.
    ///
    /// The operation is created with `DbOp::init_with_clock` and then
    /// converted with `with_clock_time()` into a `DbOpWithTime`.
    /// NOTE(review): presumably this pins the operation's timestamp to the
    /// clock's current time - confirm against es_entity.
    ///
    /// # Errors
    ///
    /// Propagates any failure to initialise the operation.
    pub async fn begin_operation(&self) -> Result<es_entity::DbOpWithTime<'static>, LedgerError> {
        let db_op = es_entity::DbOp::init_with_clock(&self.pool, &self.clock)
            .await?
            .with_clock_time();
        Ok(db_op)
    }

    /// Returns the accounts service.
    pub fn accounts(&self) -> &Accounts {
        &self.accounts
    }

    /// Returns the velocities service.
    pub fn velocities(&self) -> &Velocities {
        &self.velocities
    }

    /// Returns the account sets service.
    pub fn account_sets(&self) -> &AccountSets {
        &self.account_sets
    }

    /// Returns the journals service.
    pub fn journals(&self) -> &Journals {
        &self.journals
    }

    /// Returns the transaction templates service.
    pub fn tx_templates(&self) -> &TxTemplates {
        &self.tx_templates
    }

    /// Returns the balances service.
    pub fn balances(&self) -> &Balances {
        &self.balances
    }

    /// Returns the entries service.
    pub fn entries(&self) -> &Entries {
        &self.entries
    }

    /// Returns the transactions service.
    pub fn transactions(&self) -> &Transactions {
        &self.transactions
    }

    /// Posts a transaction from the template `tx_template_code` in its own
    /// database operation.
    ///
    /// Opens a new operation on the ledger's pool and clock, delegates to
    /// [`Self::post_transaction_in_op`] and commits on success. On error the
    /// function returns before `commit()` is reached, so the operation is
    /// dropped uncommitted.
    ///
    /// # Errors
    ///
    /// Propagates failures from opening the operation, from
    /// `post_transaction_in_op` and from the commit.
    // `tx_template_code` is intentionally not listed under `fields(...)`: a
    // bare name there declares an empty field that takes the place of the
    // automatically captured argument, so the value would never show up on
    // the span.
    #[instrument(name = "cala_ledger.post_transaction", skip(self, params))]
    pub async fn post_transaction(
        &self,
        tx_id: TransactionId,
        tx_template_code: &str,
        params: impl Into<Params> + std::fmt::Debug,
    ) -> Result<Transaction, LedgerError> {
        let mut db = es_entity::DbOp::init_with_clock(&self.pool, &self.clock).await?;
        let transaction = self
            .post_transaction_in_op(&mut db, tx_id, tx_template_code, params)
            .await?;
        db.commit().await?;
        Ok(transaction)
    }

    /// Posts a transaction from the template `tx_template_code` inside the
    /// caller-supplied atomic operation `db`; the caller is responsible for
    /// committing it.
    ///
    /// Steps, in order:
    /// 1. wrap `db` with `OpWithTime::cached_or_db_time` and read its time;
    /// 2. prepare the transaction from the template and `params`;
    /// 3. create the transaction, then record `transaction_id` and
    ///    `external_id` on the current span;
    /// 4. create all prepared entries;
    /// 5. fetch the account set mappings for the entries' accounts in the
    ///    transaction's journal;
    /// 6. update velocity balances with limit enforcement;
    /// 7. update balances.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any of the steps above.
    #[instrument(
        name = "cala_ledger.post_transaction_in_op",
        skip(self, db),
        // Declared empty here; filled in once the transaction exists.
        fields(transaction_id, external_id)
    )]
    pub async fn post_transaction_in_op(
        &self,
        db: &mut impl es_entity::AtomicOperation,
        tx_id: TransactionId,
        tx_template_code: &str,
        params: impl Into<Params> + std::fmt::Debug,
    ) -> Result<Transaction, LedgerError> {
        let mut db = es_entity::OpWithTime::cached_or_db_time(db).await?;
        let time = db.now();
        let prepared_tx = self
            .tx_templates
            .prepare_transaction_in_op(&mut db, time, tx_id, tx_template_code, params.into())
            .await?;

        let transaction = self
            .transactions
            .create_in_op(&mut db, prepared_tx.transaction)
            .await?;

        let span = tracing::Span::current();
        span.record("transaction_id", transaction.id().to_string());
        span.record("external_id", &transaction.values().external_id);

        let entries = self
            .entries
            .create_all_in_op(&mut db, prepared_tx.entries)
            .await?;

        let account_ids = entries
            .iter()
            .map(|entry| entry.account_id)
            .collect::<Vec<_>>();
        let mappings = self
            .account_sets
            .fetch_mappings_in_op(&mut db, transaction.values().journal_id, &account_ids)
            .await?;

        // Velocity limits are enforced before the regular balances are
        // updated; this call only borrows `entries` and `mappings`, which are
        // moved into the balance update below.
        self.velocities
            .update_balances_with_limit_enforcement_in_op(
                &mut db,
                transaction.created_at(),
                transaction.values(),
                &entries,
                &account_ids,
                &mappings,
            )
            .await?;

        self.balances
            .update_balances_in_op(
                &mut db,
                transaction.journal_id(),
                entries,
                transaction.effective(),
                transaction.created_at(),
                mappings,
            )
            .await?;
        Ok(transaction)
    }

    /// Voids the transaction `existing_tx_id` by posting a new transaction
    /// `voiding_tx_id`, in its own database operation.
    ///
    /// Opens an operation via [`Self::begin_operation`], delegates to
    /// [`Self::void_transaction_in_op`] and commits on success. On error the
    /// function returns before `commit()` is reached, so the operation is
    /// dropped uncommitted.
    ///
    /// # Errors
    ///
    /// Propagates failures from opening the operation, from
    /// `void_transaction_in_op` and from the commit.
    #[instrument(name = "cala_ledger.void_transaction", skip(self))]
    pub async fn void_transaction(
        &self,
        voiding_tx_id: TransactionId,
        existing_tx_id: TransactionId,
    ) -> Result<Transaction, LedgerError> {
        let mut db = self.begin_operation().await?;
        let transaction = self
            .void_transaction_in_op(&mut db, voiding_tx_id, existing_tx_id)
            .await?;
        db.commit().await?;
        Ok(transaction)
    }

    /// Voids the transaction `existing_tx_id` inside the caller-supplied
    /// operation `db`; the caller is responsible for committing it.
    ///
    /// Steps, in order:
    /// 1. build the new entries for the voiding transaction from the existing
    ///    one;
    /// 2. create the voiding transaction, passing the ids of those entries,
    ///    then record `transaction_id` and `external_id` on the current span;
    /// 3. create the new entries;
    /// 4. fetch the account set mappings for the entries' accounts in the
    ///    transaction's journal;
    /// 5. update velocity balances with limit enforcement;
    /// 6. update balances.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any of the steps above.
    // NOTE(review): the span name does not follow the `cala_ledger.<method>`
    // pattern used by the other spans in this impl; it is kept as-is because
    // renaming it would change what trace consumers observe.
    #[instrument(
        name = "cala_ledger.transaction_void",
        skip(self, db),
        // Declared empty here; filled in once the transaction exists.
        fields(transaction_id, external_id)
    )]
    pub async fn void_transaction_in_op(
        &self,
        db: &mut impl es_entity::AtomicOperationWithTime,
        voiding_tx_id: TransactionId,
        existing_tx_id: TransactionId,
    ) -> Result<Transaction, LedgerError> {
        // NOTE(review): this call is not given `db`; whether it reads outside
        // the atomic operation cannot be determined from this file - confirm.
        let new_entries = self
            .entries
            .new_entries_for_voided_tx(voiding_tx_id, existing_tx_id)
            .await?;

        let transaction = self
            .transactions()
            .create_voided_tx_in_op(
                db,
                voiding_tx_id,
                existing_tx_id,
                new_entries.iter().map(|entry| entry.id),
            )
            .await?;

        let span = tracing::Span::current();
        span.record("transaction_id", transaction.id().to_string());
        span.record("external_id", &transaction.values().external_id);

        let entries = self.entries.create_all_in_op(db, new_entries).await?;

        let account_ids = entries
            .iter()
            .map(|entry| entry.account_id)
            .collect::<Vec<_>>();
        let mappings = self
            .account_sets
            .fetch_mappings_in_op(db, transaction.values().journal_id, &account_ids)
            .await?;

        // Velocity limits are enforced before the regular balances are
        // updated; this call only borrows `entries` and `mappings`, which are
        // moved into the balance update below.
        self.velocities
            .update_balances_with_limit_enforcement_in_op(
                db,
                transaction.created_at(),
                transaction.values(),
                &entries,
                &account_ids,
                &mappings,
            )
            .await?;

        self.balances
            .update_balances_in_op(
                db,
                transaction.journal_id(),
                entries,
                transaction.effective(),
                transaction.created_at(),
                mappings,
            )
            .await?;
        Ok(transaction)
    }

    /// Returns the outbox held by the ledger's publisher.
    pub fn outbox(&self) -> &crate::outbox::ObixOutbox {
        self.publisher.inner()
    }

    /// Registers a listener for persisted outbox events.
    ///
    /// `start_after` is forwarded unchanged to `listen_persisted` on the
    /// publisher's outbox.
    /// NOTE(review): presumably `None` starts from the beginning and
    /// `Some(seq)` resumes after that sequence - confirm against obix.
    pub fn register_outbox_listener(
        &self,
        start_after: Option<obix::EventSequence>,
    ) -> obix::out::PersistentOutboxListener<crate::outbox::OutboxEventPayload> {
        self.publisher.inner().listen_persisted(start_after)
    }
}