use sqlx::{PgPool, Row};
use crate::{EventStore, EventStoreError, RecordedEvent};
pub struct Subscription {
id: String,
store: EventStore,
db: PgPool,
last_transaction_id: u64,
last_position: i64,
batch_size: i64,
}
impl Subscription {
pub async fn create(
store: EventStore,
db: PgPool,
id: impl Into<String>,
batch_size: i64,
) -> Result<Self, EventStoreError> {
let id = id.into();
sqlx::query(
"INSERT INTO es_subscriptions (subscription_id, last_position)
VALUES ($1, 0)
ON CONFLICT (subscription_id) DO NOTHING",
)
.bind(&id)
.execute(&db)
.await?;
let row = sqlx::query(
"SELECT last_position, last_transaction_id::text AS last_transaction_id
FROM es_subscriptions WHERE subscription_id = $1",
)
.bind(&id)
.fetch_one(&db)
.await?;
let last_position: i64 = row.get("last_position");
let last_transaction_id: String = row.get("last_transaction_id");
Ok(Self {
id,
store,
db,
last_transaction_id: last_transaction_id.parse().unwrap_or(0),
last_position,
batch_size,
})
}
pub async fn poll(&mut self) -> Result<Vec<RecordedEvent>, EventStoreError> {
let events = self
.store
.read_all_after(
self.last_transaction_id,
self.last_position,
self.batch_size,
)
.await?;
self.advance(&events);
Ok(events)
}
pub async fn poll_category(
&mut self,
category: &str,
) -> Result<Vec<RecordedEvent>, EventStoreError> {
let events = self
.store
.read_category_after(
category,
self.last_transaction_id,
self.last_position,
self.batch_size,
)
.await?;
self.advance(&events);
Ok(events)
}
fn advance(&mut self, events: &[RecordedEvent]) {
if let Some(last) = events.last() {
self.last_transaction_id = last.transaction_id;
self.last_position = last.global_position;
}
}
pub async fn checkpoint(&self) -> Result<(), EventStoreError> {
sqlx::query(
"UPDATE es_subscriptions
SET last_position = $1, last_transaction_id = $2::text::xid8, updated_at = now()
WHERE subscription_id = $3",
)
.bind(self.last_position)
.bind(self.last_transaction_id.to_string())
.bind(&self.id)
.execute(&self.db)
.await?;
Ok(())
}
pub async fn reset(&mut self) -> Result<(), EventStoreError> {
self.last_position = 0;
self.last_transaction_id = 0;
sqlx::query(
"UPDATE es_subscriptions
SET last_position = 0, last_transaction_id = '0'::xid8, updated_at = now()
WHERE subscription_id = $1",
)
.bind(&self.id)
.execute(&self.db)
.await?;
Ok(())
}
pub fn position(&self) -> i64 {
self.last_position
}
}