use crate::error::{Error, Result};
use sqlx::PgPool;
use super::ConsumerAck;
#[derive(Debug)]
/// Persists consumer offsets through a Postgres connection pool.
///
/// `Debug` is derived so that `#[tracing::instrument]` on the `ConsumerAck`
/// methods can record `self` in the generated span.
pub struct OffsetCommitter {
/// Consumer identifier; passed as the first bind parameter of the
/// checkpoint queries.
consumer_name: String,
/// Pool on which the checkpoint queries are executed.
pool: PgPool,
}
impl OffsetCommitter {
pub fn new(consumer_name: String, pool: PgPool) -> Self {
Self {
consumer_name,
pool,
}
}
}
#[async_trait]
impl ConsumerAck for OffsetCommitter {
#[tracing::instrument]
async fn save_offset(&self, offset: u64) -> Result<()> {
sqlx::query_file!(
"queries/consumer/checkpoint/checkout_unchecked.sql",
&self.consumer_name,
offset as i64
)
.execute(&self.pool)
.await
.map(|_| {})
.map_err(Error::from)
}
#[tracing::instrument]
async fn try_save_offset(&self, offset: u64) -> Result<()> {
sqlx::query_file!(
"queries/consumer/checkpoint/checkout.sql",
&self.consumer_name,
offset as i64
)
.execute(&self.pool)
.await
.map(|_| {})
.map_err(Error::from)
}
}