use std::time::Duration;
use chrono::{DateTime, Utc};
use sqlx::{PgPool, Postgres, Row, postgres::PgConnection};
use crate::EventStoreError;
#[derive(Debug)]
pub enum AcquireOutcome {
Acquired { fence_token: i64 },
Held,
}
#[derive(Debug, Clone)]
pub struct LeaseStatus {
pub worker_id: String,
pub fence_token: i64,
pub leased_until: DateTime<Utc>,
pub heartbeat_at: DateTime<Utc>,
}
pub async fn try_acquire(
pool: &PgPool,
subscription_id: &str,
worker_id: &str,
ttl: Duration,
) -> Result<AcquireOutcome, EventStoreError> {
let ttl_secs = ttl.as_secs_f64();
let mut tx = pool.begin().await?;
sqlx::query(
"INSERT INTO es_subscriptions (subscription_id, last_position)
VALUES ($1, 0)
ON CONFLICT (subscription_id) DO NOTHING",
)
.bind(subscription_id)
.execute(&mut *tx)
.await?;
let row = sqlx::query(
"INSERT INTO es_projection_leases
(subscription_id, worker_id, fence_token, leased_until, heartbeat_at)
VALUES ($1, $2, 1, now() + ($3 || ' seconds')::interval, now())
ON CONFLICT (subscription_id) DO UPDATE
SET worker_id = EXCLUDED.worker_id,
fence_token = CASE
WHEN es_projection_leases.worker_id = EXCLUDED.worker_id
THEN es_projection_leases.fence_token
ELSE es_projection_leases.fence_token + 1
END,
leased_until = EXCLUDED.leased_until,
heartbeat_at = now()
WHERE es_projection_leases.leased_until < now()
OR es_projection_leases.worker_id = EXCLUDED.worker_id
RETURNING fence_token",
)
.bind(subscription_id)
.bind(worker_id)
.bind(ttl_secs.to_string())
.fetch_optional(&mut *tx)
.await?;
let Some(row) = row else {
tx.rollback().await?;
return Ok(AcquireOutcome::Held);
};
let fence_token: i64 = row.get("fence_token");
sqlx::query(
"UPDATE es_subscriptions
SET fence_token = $1
WHERE subscription_id = $2
AND fence_token < $1",
)
.bind(fence_token)
.bind(subscription_id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(AcquireOutcome::Acquired { fence_token })
}
pub async fn heartbeat(
conn: &mut PgConnection,
subscription_id: &str,
worker_id: &str,
fence_token: i64,
ttl: Duration,
) -> Result<bool, EventStoreError> {
let ttl_secs = ttl.as_secs_f64();
let affected = sqlx::query(
"UPDATE es_projection_leases
SET leased_until = now() + ($1 || ' seconds')::interval,
heartbeat_at = now()
WHERE subscription_id = $2
AND worker_id = $3
AND fence_token = $4",
)
.bind(ttl_secs.to_string())
.bind(subscription_id)
.bind(worker_id)
.bind(fence_token)
.execute(conn)
.await?
.rows_affected();
Ok(affected == 1)
}
pub async fn release<'e, E>(
executor: E,
subscription_id: &str,
worker_id: &str,
fence_token: i64,
) -> Result<bool, EventStoreError>
where
E: sqlx::Executor<'e, Database = Postgres>,
{
let affected = sqlx::query(
"UPDATE es_projection_leases
SET leased_until = '-infinity'
WHERE subscription_id = $1
AND worker_id = $2
AND fence_token = $3",
)
.bind(subscription_id)
.bind(worker_id)
.bind(fence_token)
.execute(executor)
.await?
.rows_affected();
Ok(affected == 1)
}
pub async fn checkpoint(
pool: &PgPool,
subscription_id: &str,
fence_token: i64,
last_position: i64,
last_transaction_id: u64,
) -> Result<bool, EventStoreError> {
let affected = sqlx::query(
"UPDATE es_subscriptions
SET last_position = $1,
last_transaction_id = $2::text::xid8,
updated_at = now()
WHERE subscription_id = $3
AND fence_token = $4",
)
.bind(last_position)
.bind(last_transaction_id.to_string())
.bind(subscription_id)
.bind(fence_token)
.execute(pool)
.await?
.rows_affected();
Ok(affected == 1)
}
pub async fn status(
pool: &PgPool,
subscription_id: &str,
) -> Result<Option<LeaseStatus>, EventStoreError> {
let row = sqlx::query(
"SELECT worker_id, fence_token, leased_until, heartbeat_at
FROM es_projection_leases
WHERE subscription_id = $1",
)
.bind(subscription_id)
.fetch_optional(pool)
.await?;
Ok(row.map(|r| LeaseStatus {
worker_id: r.get("worker_id"),
fence_token: r.get("fence_token"),
leased_until: r.get("leased_until"),
heartbeat_at: r.get("heartbeat_at"),
}))
}