use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use sqlx::Row;
use sqlx::SqlitePool;
use tokio::sync::broadcast;
use uuid::Uuid;
use ff_core::backend::ScannerFilter;
use ff_core::engine_error::{EngineError, ValidationKind};
use ff_core::stream_events::{LeaseHistoryEvent, LeaseHistorySubscription};
use ff_core::stream_subscribe::{
StreamCursor, decode_postgres_event_cursor, encode_postgres_event_cursor,
};
use ff_core::types::{ExecutionId, LeaseId, TimestampMs};
use crate::completion_subscribe::passes_filter;
use crate::outbox_cursor::{self, OutboxCursorConfig, OutboxCursorStream, RowDecoder};
use crate::pubsub::OutboxEvent;
/// Paged read of `ff_lease_event` rows for one partition.
///
/// Positional parameters:
/// * `?1` — partition key to match.
/// * `?2` — exclusive lower bound on `event_id` (only rows with a larger id
///   are returned).
/// * `?3` — maximum number of rows to return.
///
/// Rows come back in ascending `event_id` order. The selected column names are
/// the ones `decode_row` and `extract_event_id` look up by name.
pub(crate) const SELECT_LEASE_EVENTS_SQL: &str = r#"
SELECT event_id, execution_id, lease_id, event_type, occurred_at_ms,
partition_key, namespace, instance_tag
FROM ff_lease_event
WHERE partition_key = ?1 AND event_id > ?2
ORDER BY event_id ASC
LIMIT ?3
"#;
/// Partition key used for every query issued by this module: bound as `?1`
/// in the tail lookup and handed to the cursor stream as `partition_key`.
// NOTE(review): a single fixed partition (0) is presumably a SQLite-backend
// simplification — confirm against the schema.
const PARTITION_KEY: i64 = 0;
/// Value passed to the cursor stream as `batch_size`; presumably bound to the
/// `LIMIT ?3` parameter of `SELECT_LEASE_EVENTS_SQL` — confirm in
/// `outbox_cursor`.
const REPLAY_BATCH: i64 = 256;
pub(crate) async fn subscribe(
pool: SqlitePool,
wakeup: broadcast::Receiver<OutboxEvent>,
cursor: StreamCursor,
filter: ScannerFilter,
) -> Result<LeaseHistorySubscription, EngineError> {
let start = decode_postgres_event_cursor(&cursor).map_err(|_| EngineError::Validation {
kind: ValidationKind::InvalidInput,
detail: "invalid event_id cursor for sqlite.subscribe_lease_history".into(),
})?;
let last_seen: i64 = match start {
Some(v) => v,
None => sqlx::query_scalar(
"SELECT COALESCE(MAX(event_id), 0) FROM ff_lease_event \
WHERE partition_key = ?1",
)
.bind(PARTITION_KEY)
.fetch_one(&pool)
.await
.map_err(|_| EngineError::Unavailable {
op: "sqlite.subscribe_lease_history",
})?,
};
let stream = outbox_cursor::spawn(OutboxCursorConfig {
pool,
select_sql: SELECT_LEASE_EVENTS_SQL,
partition_key: PARTITION_KEY,
cursor: last_seen,
batch_size: REPLAY_BATCH,
wakeup,
decoder: make_decoder(filter),
row_event_id: extract_event_id,
});
Ok(Box::pin(TypedStream { inner: stream }))
}
/// Wrapper around the outbox cursor stream of [`LeaseHistoryEvent`]s; this is
/// the concrete type `subscribe` boxes and pins as its return value.
struct TypedStream {
    /// The wrapped cursor stream that actually produces items.
    inner: OutboxCursorStream<LeaseHistoryEvent>,
}

impl Stream for TypedStream {
    type Item = Result<LeaseHistoryEvent, EngineError>;

    /// Forwards the poll to the wrapped stream unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `get_mut` and `Pin::new` both rely on the wrapped stream being
        // `Unpin`, so no unsafe pin projection is needed.
        let this = self.get_mut();
        Pin::new(&mut this.inner).poll_next(cx)
    }
}
/// Reads the `event_id` column from a row.
///
/// # Errors
/// Returns `EngineError::Validation` with `ValidationKind::Corruption` when
/// the column cannot be read as an `i64`.
fn extract_event_id(row: &sqlx::sqlite::SqliteRow) -> Result<i64, EngineError> {
    match row.try_get::<i64, _>("event_id") {
        Ok(event_id) => Ok(event_id),
        Err(e) => Err(EngineError::Validation {
            kind: ValidationKind::Corruption,
            detail: format!("ff_lease_event.event_id: {e}"),
        }),
    }
}
fn make_decoder(filter: ScannerFilter) -> RowDecoder<LeaseHistoryEvent> {
Box::new(move |row| decode_row(row, &filter))
}
fn decode_row(
row: &sqlx::sqlite::SqliteRow,
filter: &ScannerFilter,
) -> Result<Option<LeaseHistoryEvent>, EngineError> {
let event_id: i64 = row
.try_get("event_id")
.map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("event_id: {e}"),
})?;
let namespace: Option<String> = row
.try_get::<Option<String>, _>("namespace")
.map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("namespace: {e}"),
})?;
let instance_tag: Option<String> = row
.try_get::<Option<String>, _>("instance_tag")
.map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("instance_tag: {e}"),
})?;
if !passes_filter(filter, namespace.as_deref(), instance_tag.as_deref()) {
return Ok(None);
}
let exec_text: String = row
.try_get("execution_id")
.map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("execution_id: {e}"),
})?;
let partition_key: i64 = row
.try_get("partition_key")
.map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("partition_key: {e}"),
})?;
let exec_uuid = Uuid::parse_str(&exec_text).map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("ff_lease_event.execution_id parse '{exec_text}': {e}"),
})?;
let execution_id = ExecutionId::parse(&format!("{{fp:{partition_key}}}:{exec_uuid}"))
.map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("ff_lease_event.execution_id: {e}"),
})?;
let lease_id_str: Option<String> = row
.try_get::<Option<String>, _>("lease_id")
.map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("lease_id column: {e}"),
})?;
let lease_id = match lease_id_str.as_deref() {
Some(s) => Some(LeaseId::parse(s).map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("lease_id parse '{s}': {e}"),
})?),
None => None,
};
let event_type: String = row
.try_get("event_type")
.map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("event_type: {e}"),
})?;
let occurred_at_ms: i64 =
row.try_get("occurred_at_ms")
.map_err(|e| EngineError::Validation {
kind: ValidationKind::Corruption,
detail: format!("occurred_at_ms: {e}"),
})?;
let cursor = encode_postgres_event_cursor(event_id);
let at = TimestampMs::from_millis(occurred_at_ms);
let event = match event_type.as_str() {
"acquired" => LeaseHistoryEvent::Acquired {
cursor,
execution_id,
lease_id,
worker_instance_id: None,
at,
},
"renewed" => LeaseHistoryEvent::Renewed {
cursor,
execution_id,
lease_id,
worker_instance_id: None,
at,
},
"expired" => LeaseHistoryEvent::Expired {
cursor,
execution_id,
lease_id,
prev_owner: None,
at,
},
"reclaimed" => LeaseHistoryEvent::Reclaimed {
cursor,
execution_id,
new_lease_id: lease_id,
new_owner: None,
at,
},
"revoked" => LeaseHistoryEvent::Revoked {
cursor,
execution_id,
lease_id,
revoked_by: "operator".to_string(),
at,
},
other => {
tracing::warn!(
event_id = event_id,
event_type = %other,
"sqlite.lease_history: unknown event_type, skipping"
);
return Ok(None);
}
};
Ok(Some(event))
}