use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::{
Partition, PartitionConfig, PartitionFamily, execution_partition,
};
use ff_core::types::ExecutionId;
/// Maps executions and flow-index lookups onto their storage partitions.
///
/// Thin wrapper around a [`PartitionConfig`]; all placement decisions are
/// derived from that config via `ff_core::partition` helpers.
pub struct PartitionRouter {
    // Partition layout every routing decision is derived from.
    config: PartitionConfig,
}
impl PartitionRouter {
pub fn new(config: PartitionConfig) -> Self {
Self { config }
}
pub fn partition_for(&self, eid: &ExecutionId) -> Partition {
execution_partition(eid, &self.config)
}
pub fn exec_keys(&self, eid: &ExecutionId) -> ExecKeyContext {
let partition = self.partition_for(eid);
ExecKeyContext::new(&partition, eid)
}
pub fn index_keys(&self, partition_index: u16) -> IndexKeys {
let partition = Partition {
family: PartitionFamily::Execution,
index: partition_index,
};
IndexKeys::new(&partition)
}
pub fn config(&self) -> &PartitionConfig {
&self.config
}
pub fn num_flow_partitions(&self) -> u16 {
self.config.num_flow_partitions
}
}
/// Public entry point for dependency resolution: resolves every outgoing
/// edge of `eid` in `flow_id` against the upstream's terminal outcome.
///
/// Delegates to the depth-tracked inner worker, starting the skip-cascade
/// at depth zero.
pub async fn dispatch_dependency_resolution(
    client: &ferriskey::Client,
    router: &PartitionRouter,
    eid: &ExecutionId,
    flow_id: Option<&str>,
) {
    // The cascade always begins at the root of the recursion.
    const INITIAL_DEPTH: u32 = 0;
    dispatch_dependency_resolution_inner(client, router, eid, flow_id, INITIAL_DEPTH).await
}
// Safety valve on recursive skip-cascades: beyond this depth the inner
// worker stops and lets the reconciler finish remaining resolutions
// (see the warn log in dispatch_dependency_resolution_inner).
const MAX_CASCADE_DEPTH: u32 = 50;
/// Depth-tracked worker behind [`dispatch_dependency_resolution`].
///
/// Reads the upstream execution's `terminal_outcome`, walks its outgoing
/// edges in the flow graph, and invokes the server-side
/// `ff_resolve_dependency` function once per downstream edge. Children
/// whose reply marks them "child_skipped" are recursed into with
/// `cascade_depth + 1` (bounded by `MAX_CASCADE_DEPTH`).
///
/// Best-effort semantics: every storage failure is logged and the edge
/// skipped (or the whole pass aborted); the log messages note that a
/// reconciler is expected to retry anything missed.
async fn dispatch_dependency_resolution_inner(
    client: &ferriskey::Client,
    router: &PartitionRouter,
    eid: &ExecutionId,
    flow_id: Option<&str>,
    cascade_depth: u32,
) {
    // Guard against runaway skip-cascades; past this depth we rely on
    // the reconciler to finish resolution.
    if cascade_depth > MAX_CASCADE_DEPTH {
        tracing::warn!(
            execution_id = %eid,
            cascade_depth,
            "dispatch_dep: cascade depth limit reached, reconciler will catch remaining"
        );
        return;
    }
    // Only executions that belong to a flow have edges to resolve;
    // a missing/empty flow id means there is nothing to do.
    let flow_id_str = match flow_id {
        Some(fid) if !fid.is_empty() => fid,
        _ => return,
    };
    let exec_ctx = router.exec_keys(eid);
    let core_key = exec_ctx.core();
    // Upstream outcome is forwarded to the script (ARGV[2] below) and
    // drives how each downstream dependency resolves.
    let outcome: Option<String> = match client
        .cmd("HGET")
        .arg(&core_key)
        .arg("terminal_outcome")
        .execute()
        .await
    {
        Ok(v) => v,
        Err(e) => {
            tracing::warn!(
                execution_id = %eid,
                error = %e,
                "dispatch_dep: failed to read terminal_outcome"
            );
            return;
        }
    };
    // A missing field becomes "" — presumably the script treats an
    // empty outcome as "not terminal"; TODO confirm against the script.
    let outcome_str = outcome.unwrap_or_default();
    let upstream_outcome = outcome_str.as_str();
    let fid = match ff_core::types::FlowId::parse(flow_id_str) {
        Ok(id) => id,
        Err(e) => {
            tracing::warn!(
                flow_id = flow_id_str,
                error = %e,
                "dispatch_dep: invalid flow_id"
            );
            return;
        }
    };
    // Flow-scoped keys (adjacency sets, edge hashes) live in the flow's
    // partition, which is distinct from the execution's partition.
    let flow_partition = ff_core::partition::flow_partition(&fid, router.config());
    let flow_ctx = ff_core::keys::FlowKeyContext::new(&flow_partition, &fid);
    let out_key = flow_ctx.outgoing(eid);
    // All edges leaving this execution in the flow graph.
    let edge_ids: Vec<String> = match client
        .cmd("SMEMBERS")
        .arg(&out_key)
        .execute()
        .await
    {
        Ok(ids) => ids,
        Err(e) => {
            tracing::warn!(
                execution_id = %eid,
                flow_id = flow_id_str,
                error = %e,
                "dispatch_dep: SMEMBERS outgoing failed"
            );
            return;
        }
    };
    if edge_ids.is_empty() {
        return;
    }
    // Single timestamp for the whole batch so every resolution in this
    // pass carries the same time.
    let now_ms = ff_core::types::TimestampMs::now().0.to_string();
    let mut resolved: u32 = 0;
    // Children the script reported as skipped; they get a recursive
    // cascade pass after the main loop.
    let mut skipped_children: Vec<(ExecutionId, String)> = Vec::new();
    for edge_id in &edge_ids {
        let parsed_edge_id = match ff_core::types::EdgeId::parse(edge_id) {
            Ok(id) => id,
            Err(e) => {
                tracing::warn!(
                    edge_id = edge_id.as_str(),
                    flow_id = flow_id_str,
                    error = %e,
                    "dispatch_dep: invalid edge_id in outgoing adjacency set, skipping"
                );
                continue;
            }
        };
        let edge_key = flow_ctx.edge(&parsed_edge_id);
        // Resolve the edge to its downstream execution id; transport
        // errors here are silently skipped (reconciler will retry).
        let downstream_eid_str: Option<String> = match client
            .cmd("HGET")
            .arg(&edge_key)
            .arg("downstream_execution_id")
            .execute()
            .await
        {
            Ok(v) => v,
            Err(_) => continue,
        };
        let downstream_eid_str = match downstream_eid_str {
            Some(s) if !s.is_empty() => s,
            _ => continue,
        };
        let downstream_eid = match ExecutionId::parse(&downstream_eid_str) {
            Ok(id) => id,
            Err(_) => continue,
        };
        // Downstream (child) keys are built in the child's own partition.
        let child_partition = router.partition_for(&downstream_eid);
        let child_ctx = ExecKeyContext::new(&child_partition, &downstream_eid);
        let child_idx = IndexKeys::new(&child_partition);
        let child_core_key = child_ctx.core();
        // Lane the child is queued on, needed for the lane-scoped index
        // keys handed to the script.
        let lane_str: Option<String> = match client
            .cmd("HGET")
            .arg(&child_core_key)
            .arg("lane_id")
            .execute()
            .await
        {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!(
                    edge_id = edge_id.as_str(),
                    downstream = downstream_eid_str.as_str(),
                    error = %e,
                    "dispatch_dep: HGET lane_id failed, skipping (reconciler will retry)"
                );
                continue;
            }
        };
        // Missing lane falls back to "default".
        let lane_id = ff_core::types::LaneId::new(
            lane_str.as_deref().unwrap_or("default"),
        );
        let att_idx_str: Option<String> = match client
            .cmd("HGET")
            .arg(&child_core_key)
            .arg("current_attempt_index")
            .execute()
            .await
        {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!(
                    edge_id = edge_id.as_str(),
                    downstream = downstream_eid_str.as_str(),
                    error = %e,
                    "dispatch_dep: HGET current_attempt_index failed, \
                     skipping (reconciler will retry)"
                );
                continue;
            }
        };
        // Absent or unparseable attempt index defaults to attempt 0.
        let att_idx = ff_core::types::AttemptIndex::new(
            att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
        );
        // Assemble every key the ff_resolve_dependency script touches;
        // the order must match the script's KEYS[] expectations.
        let dep_hash = child_ctx.dep_edge(&parsed_edge_id);
        let deps_meta = child_ctx.deps_meta();
        let unresolved = child_ctx.deps_unresolved();
        let eligible = child_idx.lane_eligible(&lane_id);
        let terminal = child_idx.lane_terminal(&lane_id);
        let blocked_deps = child_idx.lane_blocked_dependencies(&lane_id);
        let attempt_hash = child_ctx.attempt_hash(att_idx);
        let stream_meta = child_ctx.stream_meta(att_idx);
        let downstream_payload = child_ctx.payload();
        // NOTE(review): the upstream's result key is constructed in the
        // *child's* partition here — confirm this cross-partition read
        // is intentional and not a copy-paste of child_partition.
        let upstream_ctx = ExecKeyContext::new(&child_partition, eid);
        let upstream_result = upstream_ctx.result();
        let edgegroup = flow_ctx.edgegroup(&downstream_eid);
        let incoming_set = flow_ctx.incoming(&downstream_eid);
        let pending_cancel_groups = ff_core::keys::FlowIndexKeys::new(&flow_partition)
            .pending_cancel_groups();
        let downstream_eid_full = downstream_eid.to_string();
        let keys: [&str; 14] = [
            &child_core_key,
            &deps_meta,
            &unresolved,
            &dep_hash,
            &eligible,
            &terminal,
            &blocked_deps,
            &attempt_hash,
            &stream_meta,
            &downstream_payload,
            &upstream_result,
            &edgegroup,
            &incoming_set,
            &pending_cancel_groups,
        ];
        let argv: [&str; 5] = [
            edge_id,
            upstream_outcome,
            &now_ms,
            flow_id_str,
            &downstream_eid_full,
        ];
        // Atomic resolution via the server-side function; the reply
        // tells us whether the child itself ended up skipped.
        match client
            .fcall::<ferriskey::Value>("ff_resolve_dependency", &keys, &argv)
            .await
        {
            Ok(val) => {
                resolved += 1;
                tracing::debug!(
                    edge_id = edge_id.as_str(),
                    downstream = downstream_eid_str.as_str(),
                    outcome = upstream_outcome,
                    "dispatch_dep: resolved dependency"
                );
                if is_child_skipped_result(&val) {
                    skipped_children.push((
                        downstream_eid.clone(),
                        flow_id_str.to_string(),
                    ));
                }
            }
            Err(e) => {
                tracing::warn!(
                    edge_id = edge_id.as_str(),
                    downstream = downstream_eid_str.as_str(),
                    error = %e,
                    "dispatch_dep: ff_resolve_dependency failed"
                );
            }
        }
    }
    if resolved > 0 {
        tracing::info!(
            execution_id = %eid,
            flow_id = flow_id_str,
            resolved,
            total_edges = edge_ids.len(),
            skipped_cascade = skipped_children.len(),
            "dispatch_dep: dependency resolution complete"
        );
    }
    // Cascade into skipped children. Box::pin is required because a
    // directly recursive async fn would have an infinitely sized future.
    for (child_eid, child_flow_id) in &skipped_children {
        Box::pin(dispatch_dependency_resolution_inner(
            client, router, child_eid, Some(child_flow_id.as_str()),
            cascade_depth + 1,
        )).await;
    }
}
/// Postgres-backed dispatch path (compiled only with the `postgres`
/// feature): delegates completion dispatch for `event_id` to the
/// postgres backend.
///
/// # Errors
/// Propagates whatever `dispatch_completion` returns; no handling here.
#[cfg(feature = "postgres")]
pub async fn dispatch_via_postgres(
    pool: &ff_backend_postgres::PgPool,
    event_id: i64,
) -> Result<ff_backend_postgres::dispatch::DispatchOutcome, ff_core::engine_error::EngineError> {
    ff_backend_postgres::dispatch::dispatch_completion(pool, event_id).await
}
/// Returns true when a `ff_resolve_dependency` reply marks the
/// downstream child as skipped: the reply must be an array of at least
/// 4 elements whose element at index 3 is the string "child_skipped".
fn is_child_skipped_result(value: &ferriskey::Value) -> bool {
    match value {
        ferriskey::Value::Array(arr) => {
            // Shorter replies carry no skip marker — treat as not skipped.
            if arr.len() < 4 {
                return false;
            }
            // NOTE(review): these arms match Ok-wrapped elements, which
            // implies Value::Array holds Result-like items — confirm
            // against ferriskey's Value definition; if elements are bare
            // Values this would not compile as written.
            arr.get(3)
                .and_then(|v| match v {
                    Ok(ferriskey::Value::BulkString(b)) => {
                        // Byte-wise compare avoids a UTF-8 conversion.
                        Some(&b[..] == b"child_skipped")
                    }
                    Ok(ferriskey::Value::SimpleString(s)) => {
                        Some(s == "child_skipped")
                    }
                    // Any other shape (or an Err element) is "unknown";
                    // unwrap_or below resolves it to false.
                    _ => None,
                })
                .unwrap_or(false)
        }
        _ => {
            // Unexpected reply shape — log once and report not-skipped
            // rather than cascading on bad data.
            tracing::warn!(
                "is_child_skipped_result: expected Array, got non-array value"
            );
            false
        }
    }
}