timesource 0.1.3

Event sourcing with TimescaleDb
Documentation
use crate::error::{Error, Result};
use sqlx::PgPool;

use super::ConsumerAck;

#[derive(Debug)]
pub struct OffsetCommitter {
    consumer_name: String,
    pool: PgPool,
}

impl OffsetCommitter {
    pub fn new(consumer_name: String, pool: PgPool) -> Self {
        Self {
            consumer_name,
            pool,
        }
    }
}

#[async_trait]
impl ConsumerAck for OffsetCommitter {
    #[tracing::instrument]
    async fn save_offset(&self, offset: u64) -> Result<()> {
        sqlx::query_file!(
            "queries/consumer/checkpoint/checkout_unchecked.sql",
            &self.consumer_name,
            offset as i64
        )
        .execute(&self.pool)
        .await
        .map(|_| {})
        .map_err(Error::from)
    }

    #[tracing::instrument]
    async fn try_save_offset(&self, offset: u64) -> Result<()> {
        sqlx::query_file!(
            "queries/consumer/checkpoint/checkout.sql",
            &self.consumer_name,
            offset as i64
        )
        .execute(&self.pool)
        .await
        .map(|_| {})
        .map_err(Error::from)
    }
}