use ff_core::backend::ScannerFilter;
use ff_core::engine_error::EngineError;
use sqlx::{PgPool, Row};
use uuid::Uuid;
use crate::error::map_sqlx_error;
const BATCH_SIZE: i64 = 50;
#[derive(Debug, Default, Clone)]
pub struct DispatchReport {
pub dispatched: u64,
pub orphaned: u64,
pub skipped_for_reconciler: u64,
pub skipped_locked: u64,
pub errors: u64,
}
pub async fn dispatcher_tick(
pool: &PgPool,
_filter: &ScannerFilter,
) -> Result<DispatchReport, EngineError> {
let pending: Vec<(i16, Uuid, Uuid)> = sqlx::query_as(
r#"
SELECT partition_key, flow_id, downstream_eid
FROM ff_pending_cancel_groups
ORDER BY enqueued_at_ms
LIMIT $1
"#,
)
.bind(BATCH_SIZE)
.fetch_all(pool)
.await
.map_err(map_sqlx_error)?;
let mut report = DispatchReport::default();
for (partition_key, flow_id, downstream_eid) in pending {
match dispatch_one_group(pool, partition_key, flow_id, downstream_eid).await {
Ok(GroupOutcome::Dispatched) => report.dispatched += 1,
Ok(GroupOutcome::Orphaned) => report.orphaned += 1,
Ok(GroupOutcome::SkippedForReconciler) => report.skipped_for_reconciler += 1,
Ok(GroupOutcome::SkippedLocked) => report.skipped_locked += 1,
Err(e) => {
tracing::warn!(
partition_key,
%flow_id,
%downstream_eid,
error = %e,
"pg edge_cancel_dispatcher: group drain failed; retry next tick"
);
report.errors += 1;
}
}
}
Ok(report)
}
enum GroupOutcome {
Dispatched,
Orphaned,
SkippedForReconciler,
SkippedLocked,
}
async fn dispatch_one_group(
pool: &PgPool,
partition_key: i16,
flow_id: Uuid,
downstream_eid: Uuid,
) -> Result<GroupOutcome, EngineError> {
let mut tx = pool.begin().await.map_err(map_sqlx_error)?;
let row = sqlx::query(
r#"
SELECT cancel_siblings_pending_flag,
cancel_siblings_pending_members
FROM ff_edge_group
WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3
FOR UPDATE SKIP LOCKED
"#,
)
.bind(partition_key)
.bind(flow_id)
.bind(downstream_eid)
.fetch_optional(&mut *tx)
.await
.map_err(map_sqlx_error)?;
let Some(row) = row else {
let exists: Option<(bool,)> = sqlx::query_as(
"SELECT true FROM ff_edge_group
WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
)
.bind(partition_key)
.bind(flow_id)
.bind(downstream_eid)
.fetch_optional(&mut *tx)
.await
.map_err(map_sqlx_error)?;
if exists.is_some() {
tx.rollback().await.map_err(map_sqlx_error)?;
return Ok(GroupOutcome::SkippedLocked);
}
sqlx::query(
"DELETE FROM ff_pending_cancel_groups
WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
)
.bind(partition_key)
.bind(flow_id)
.bind(downstream_eid)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
tx.commit().await.map_err(map_sqlx_error)?;
return Ok(GroupOutcome::Orphaned);
};
let flag: bool = row.get("cancel_siblings_pending_flag");
let members_raw: Vec<String> = row.get("cancel_siblings_pending_members");
if !flag {
tx.rollback().await.map_err(map_sqlx_error)?;
return Ok(GroupOutcome::SkippedForReconciler);
}
let reason: &str = "sibling_quorum_satisfied";
let now = now_ms();
let members: Vec<Uuid> = members_raw
.iter()
.filter_map(|s| Uuid::parse_str(s).ok())
.collect();
for sib in &members {
sqlx::query(
r#"
UPDATE ff_exec_core
SET lifecycle_phase = 'terminal',
ownership_state = 'unowned',
eligibility_state = 'not_applicable',
public_state = 'cancelled',
attempt_state = 'cancelled',
terminal_at_ms = COALESCE(terminal_at_ms, $3),
cancellation_reason = COALESCE(cancellation_reason, $4),
cancelled_by = COALESCE(cancelled_by, 'engine'),
raw_fields = jsonb_set(raw_fields, '{last_mutation_at}', to_jsonb($3::text))
WHERE partition_key = $1 AND execution_id = $2
AND lifecycle_phase NOT IN ('terminal','cancelled')
"#,
)
.bind(partition_key)
.bind(sib)
.bind(now)
.bind(reason)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
}
sqlx::query(
r#"
UPDATE ff_edge_group
SET cancel_siblings_pending_flag = false,
cancel_siblings_pending_members = '{}'::text[]
WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3
"#,
)
.bind(partition_key)
.bind(flow_id)
.bind(downstream_eid)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
sqlx::query(
"DELETE FROM ff_pending_cancel_groups
WHERE partition_key = $1 AND flow_id = $2 AND downstream_eid = $3",
)
.bind(partition_key)
.bind(flow_id)
.bind(downstream_eid)
.execute(&mut *tx)
.await
.map_err(map_sqlx_error)?;
tx.commit().await.map_err(map_sqlx_error)?;
Ok(GroupOutcome::Dispatched)
}
fn now_ms() -> i64 {
let d = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default();
(d.as_millis() as i64).max(0)
}