use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use async_trait::async_trait;
use ff_core::backend::{CompletionPayload, ScannerFilter};
use ff_core::completion_backend::{CompletionBackend, CompletionStream};
use ff_core::engine_error::EngineError;
use ff_core::types::{ExecutionId, FlowId, Namespace, TimestampMs};
use futures_core::Stream;
use sqlx::postgres::{PgListener, PgPool};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use uuid::Uuid;
use crate::PostgresBackend;
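// Postgres LISTEN/NOTIFY channel used for completion events, plus tuning
// knobs for the subscriber: outbound stream buffer size, replay page size,
// and the pause before reconnecting after a listener failure.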
pub const COMPLETION_CHANNEL: &str = "ff_completion";
const STREAM_CAPACITY: usize = 1024;
const REPLAY_BATCH: i64 = 256;
const RECONNECT_BACKOFF: Duration = Duration::from_millis(200);
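/// A stream of completion payloads fed by a background task that listens on
/// the `ff_completion` NOTIFY channel and replays rows from
/// `ff_completion_event`. Dropping the stream aborts that task.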
pub struct PostgresCompletionStream {
    rx: mpsc::Receiver<CompletionPayload>,
    handle: JoinHandle<()>,
}
impl Drop for PostgresCompletionStream {
    fn drop(&mut self) {
        // Stop the background subscriber task as soon as the stream is dropped.
        self.handle.abort();
    }
}
impl Stream for PostgresCompletionStream {
    type Item = CompletionPayload;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.rx.poll_recv(cx)
    }
}
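/// One row of `ff_completion_event`, decoded from the replay query below.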
struct CompletionEventRow {
    event_id: i64,
    execution_id: Uuid,
    flow_id: Option<Uuid>,
    outcome: String,
    namespace: Option<String>,
    instance_tag: Option<String>,
    occurred_at_ms: i64,
    partition_key: i16,
}
impl CompletionEventRow {
    /// Convert the row into the `CompletionPayload` delivered to subscribers.
    fn into_payload(self) -> CompletionPayload {
        // Reassemble the partition-prefixed textual form, e.g. `{fp:3}:<uuid>`,
        // that `ExecutionId::parse` expects.
        let eid_string = format!("{{fp:{}}}:{}", self.partition_key, self.execution_id);
        let execution_id = ExecutionId::parse(&eid_string)
            .expect("ff_completion_event row produced malformed ExecutionId");
        let mut payload = CompletionPayload::new(
            execution_id,
            self.outcome,
            None,
            TimestampMs(self.occurred_at_ms),
        );
        if let Some(fid) = self.flow_id {
            payload = payload.with_flow_id(FlowId::from_uuid(fid));
        }
        payload
    }

    /// Check the row against an optional namespace / instance-tag filter.
    fn passes(&self, filter: &ScannerFilter) -> bool {
        if let Some(ref want_ns) = filter.namespace {
            let have: Option<Namespace> = self.namespace.as_deref().map(Namespace::from);
            if have.as_ref() != Some(want_ns) {
                return false;
            }
        }
        if let Some((ref k, ref v)) = filter.instance_tag {
            // The column stores the tag as a single `key=value` string.
            let want = format!("{k}={v}");
            match self.instance_tag {
                Some(ref have) if have == &want => {}
                _ => return false,
            }
        }
        true
    }
}
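/// Open a completion stream: record the current high-water mark of
/// `ff_completion_event`, then spawn a background task that LISTENs on the
/// completion channel and replays any newer rows into the returned stream.
///
/// A minimal usage sketch (ignored by doctests), assuming `CompletionStream`
/// is a pinned, boxed `Stream` and that `futures::StreamExt` is in scope:
///
/// ```ignore
/// let mut stream = subscribe(&pool, None).await?;
/// while let Some(payload) = stream.next().await {
///     handle_completion(payload); // hypothetical handler
/// }
/// ```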
pub(crate) async fn subscribe(
    pool: &PgPool,
    filter: Option<ScannerFilter>,
) -> Result<CompletionStream, EngineError> {
    let filter = filter.unwrap_or(ScannerFilter::NOOP);
    // Snapshot the newest event id before LISTENing so the replay loop only
    // delivers events that arrive after this subscription was opened.
    let start: i64 =
        sqlx::query_scalar("SELECT COALESCE(MAX(event_id), 0) FROM ff_completion_event")
            .fetch_one(pool)
            .await
            .map_err(|e| EngineError::Unavailable {
                // The underlying error is only used to pick a more specific op label.
                op: match &e {
                    sqlx::Error::Database(_) => "pg.subscribe_completions (max(event_id))",
                    _ => "pg.subscribe_completions (connect)",
                },
            })?;
    let (tx, rx) = mpsc::channel::<CompletionPayload>(STREAM_CAPACITY);
    let handle = tokio::spawn(subscriber_loop(pool.clone(), tx, filter, start));
    Ok(Box::pin(PostgresCompletionStream { rx, handle }))
}
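/// Background task: (re)connect a `PgListener`, LISTEN on the completion
/// channel, replay any rows newer than `last_seen`, and replay again on each
/// notification. On listener errors it backs off and reconnects; it exits
/// once the receiving side of the stream has been dropped.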
async fn subscriber_loop(
    pool: PgPool,
    tx: mpsc::Sender<CompletionPayload>,
    filter: ScannerFilter,
    mut last_seen: i64,
) {
    loop {
        // (Re)establish the LISTEN connection.
        let mut listener = match PgListener::connect_with(&pool).await {
            Ok(l) => l,
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "pg.completion.subscribe: PgListener::connect_with failed; retrying"
                );
                if wait_or_exit(&tx, RECONNECT_BACKOFF).await {
                    return;
                }
                continue;
            }
        };
        if let Err(e) = listener.listen(COMPLETION_CHANNEL).await {
            tracing::warn!(
                error = %e,
                "pg.completion.subscribe: LISTEN ff_completion failed; retrying"
            );
            if wait_or_exit(&tx, RECONNECT_BACKOFF).await {
                return;
            }
            continue;
        }
        // Catch up on anything inserted before (or while) the LISTEN was set up.
        if !replay(&pool, &tx, &filter, &mut last_seen).await {
            return;
        }
        loop {
            tokio::select! {
                // The stream was dropped; stop the task.
                _ = tx.closed() => return,
                res = listener.recv() => {
                    match res {
                        // The notification payload is not used; any wakeup
                        // triggers a replay from the last seen event id.
                        Ok(_notification) => {
                            if !replay(&pool, &tx, &filter, &mut last_seen).await {
                                return;
                            }
                        }
                        Err(e) => {
                            tracing::warn!(
                                error = %e,
                                "pg.completion.subscribe: listener.recv() error; reconnecting"
                            );
                            break;
                        }
                    }
                }
            }
        }
        if wait_or_exit(&tx, RECONNECT_BACKOFF).await {
            return;
        }
    }
}
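/// Sleep for `d`, returning early with `true` if the stream's receiver has
/// been dropped (i.e. the caller should exit instead of retrying).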
async fn wait_or_exit(tx: &mpsc::Sender<CompletionPayload>, d: Duration) -> bool {
    tokio::select! {
        _ = tx.closed() => true,
        _ = tokio::time::sleep(d) => false,
    }
}
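/// Page through `ff_completion_event` rows newer than `last_seen`, forwarding
/// the ones that pass `filter`. Returns `false` once the receiver is gone and
/// the subscriber loop should stop, `true` otherwise (including on transient
/// query errors, which are retried on the next notification).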
async fn replay(
    pool: &PgPool,
    tx: &mpsc::Sender<CompletionPayload>,
    filter: &ScannerFilter,
    last_seen: &mut i64,
) -> bool {
    loop {
        // Fetch the next page of events strictly after `last_seen`.
        let rows: Vec<CompletionEventRow> = match sqlx::query_as::<
            _,
            (i64, Uuid, Option<Uuid>, String, Option<String>, Option<String>, i64, i16),
        >(
            "SELECT event_id, execution_id, flow_id, outcome, namespace, instance_tag, occurred_at_ms, partition_key \
             FROM ff_completion_event \
             WHERE event_id > $1 \
             ORDER BY event_id ASC \
             LIMIT $2",
        )
        .bind(*last_seen)
        .bind(REPLAY_BATCH)
        .fetch_all(pool)
        .await
        {
            Ok(rows) => rows
                .into_iter()
                .map(
                    |(event_id, execution_id, flow_id, outcome, namespace, instance_tag, occurred_at_ms, partition_key)| {
                        CompletionEventRow {
                            event_id,
                            execution_id,
                            flow_id,
                            outcome,
                            namespace,
                            instance_tag,
                            occurred_at_ms,
                            partition_key,
                        }
                    },
                )
                .collect(),
            Err(e) => {
                tracing::warn!(error = %e, "pg.completion.replay: query failed");
                return !tx.is_closed();
            }
        };
        if rows.is_empty() {
            // Caught up; keep running as long as someone is still listening.
            return !tx.is_closed();
        }
        for row in rows {
            // Advance the cursor even for filtered-out rows so they are not re-read.
            *last_seen = row.event_id;
            if !row.passes(filter) {
                continue;
            }
            if tx.send(row.into_payload()).await.is_err() {
                // Receiver dropped; tell the subscriber loop to shut down.
                return false;
            }
        }
    }
}
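// Compile-time assertion that `PostgresBackend` coerces to `dyn CompletionBackend`.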
#[allow(dead_code)]
fn _assert_pg_dyn_completion(
    b: std::sync::Arc<PostgresBackend>,
) -> std::sync::Arc<dyn CompletionBackend> {
    b
}
#[async_trait]
impl CompletionBackend for PostgresBackend {
    async fn subscribe_completions(&self) -> Result<CompletionStream, EngineError> {
        subscribe(&self.pool, None).await
    }

    async fn subscribe_completions_filtered(
        &self,
        filter: &ScannerFilter,
    ) -> Result<CompletionStream, EngineError> {
        // A no-op filter is equivalent to an unfiltered subscription.
        if filter.is_noop() {
            return self.subscribe_completions().await;
        }
        subscribe(&self.pool, Some(filter.clone())).await
    }
}