use std::collections::{BTreeMap, HashMap};
use ff_core::contracts::{
AttemptSummary, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot, LeaseSummary,
};
use ff_core::keys::{ExecKeyContext, FlowKeyContext};
use ff_core::partition::{execution_partition, flow_partition};
use ff_core::state::PublicState;
use ff_core::types::{
AttemptId, AttemptIndex, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch, Namespace,
TimestampMs, WaitpointId, WorkerInstanceId,
};
use crate::SdkError;
use crate::worker::FlowFabricWorker;
impl FlowFabricWorker {
    /// Fetches a point-in-time view of a single execution.
    ///
    /// The `exec_core` hash and the tags hash for `id` are both read with
    /// `HGETALL`, queued on one pipeline. An empty `exec_core` reply yields
    /// `Ok(None)`; otherwise both maps are handed to
    /// `build_execution_snapshot`.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError::ValkeyContext`] when the pipeline cannot be executed
    /// or a reply cannot be decoded, and forwards any error raised while the
    /// snapshot is assembled from the stored fields.
    pub async fn describe_execution(
        &self,
        id: &ExecutionId,
    ) -> Result<Option<ExecutionSnapshot>, SdkError> {
        let exec_partition = execution_partition(id, self.partition_config());
        let keys = ExecKeyContext::new(&exec_partition, id);
        let exec_core_key = keys.core();
        let exec_tags_key = keys.tags();

        // Queue both reads on the same pipeline before executing it.
        let mut pipeline = self.client().pipeline();
        let core_reply = pipeline
            .cmd::<HashMap<String, String>>("HGETALL")
            .arg(&exec_core_key)
            .finish();
        let tags_reply = pipeline
            .cmd::<HashMap<String, String>>("HGETALL")
            .arg(&exec_tags_key)
            .finish();

        if let Err(source) = pipeline.execute().await {
            return Err(SdkError::ValkeyContext {
                source,
                context: "describe_execution: pipeline HGETALL exec_core + tags".into(),
            });
        }

        let core = match core_reply.value() {
            Ok(fields) => fields,
            Err(source) => {
                return Err(SdkError::ValkeyContext {
                    source,
                    context: "describe_execution: decode HGETALL exec_core".into(),
                });
            }
        };
        // An empty core hash is reported as "no such execution"; the tags
        // reply is not even decoded in that case.
        if core.is_empty() {
            return Ok(None);
        }

        let tags_raw = match tags_reply.value() {
            Ok(fields) => fields,
            Err(source) => {
                return Err(SdkError::ValkeyContext {
                    source,
                    context: "describe_execution: decode HGETALL tags".into(),
                });
            }
        };

        build_execution_snapshot(id.clone(), &core, tags_raw)
    }
}
/// Assembles an [`ExecutionSnapshot`] from the raw `exec_core` fields and the
/// raw tags map.
///
/// Optional string fields (`flow_id`, `blocking_reason`, `blocking_detail`,
/// `current_waitpoint_id`) are treated as absent when missing or empty.
/// `created_at` and `last_mutation_at` are required; `total_attempt_count`
/// falls back to `0` when missing or empty. Tags are copied into a
/// `BTreeMap`, so they come out ordered by key.
///
/// On success the result is always `Ok(Some(_))`.
///
/// # Errors
///
/// Returns [`SdkError::Config`] naming the offending field when a stored value
/// fails parsing/validation or a required timestamp is missing.
fn build_execution_snapshot(
    execution_id: ExecutionId,
    core: &HashMap<String, String>,
    tags_raw: HashMap<String, String>,
) -> Result<Option<ExecutionSnapshot>, SdkError> {
    const CTX: &str = "describe_execution: exec_core";

    /// Looks up `field`, mapping an empty value to `None`.
    fn present<'m>(map: &'m HashMap<String, String>, field: &str) -> Option<&'m str> {
        opt_str(map, field).filter(|value| !value.is_empty())
    }

    /// Builds the `Config` error used for every bad `exec_core` field.
    fn corrupt(field: &str, message: String) -> SdkError {
        SdkError::Config {
            context: CTX.to_owned(),
            field: Some(field.to_owned()),
            message,
        }
    }

    let public_state = parse_public_state(opt_str(core, "public_state").unwrap_or(""))?;

    let lane_id = match LaneId::try_new(opt_str(core, "lane_id").unwrap_or("")) {
        Ok(lane) => lane,
        Err(e) => {
            return Err(corrupt(
                "lane_id",
                format!("fails LaneId validation (key corruption?): {e}"),
            ));
        }
    };

    let namespace = Namespace::new(opt_str(core, "namespace").unwrap_or("").to_owned());

    let flow_id = match present(core, "flow_id") {
        None => None,
        Some(s) => Some(FlowId::parse(s).map_err(|e| {
            corrupt(
                "flow_id",
                format!("is not a valid UUID (key corruption?): {e}"),
            )
        })?),
    };

    let blocking_reason = present(core, "blocking_reason").map(str::to_owned);
    let blocking_detail = present(core, "blocking_detail").map(str::to_owned);

    let Some(created_at) = parse_ts(core, CTX, "created_at")? else {
        return Err(corrupt(
            "created_at",
            "is missing or empty (key corruption?)".to_owned(),
        ));
    };
    let Some(last_mutation_at) = parse_ts(core, CTX, "last_mutation_at")? else {
        return Err(corrupt(
            "last_mutation_at",
            "is missing or empty (key corruption?)".to_owned(),
        ));
    };

    let total_attempt_count: u32 =
        parse_u32_strict(core, CTX, "total_attempt_count")?.unwrap_or(0);

    let current_attempt = build_attempt_summary(core)?;
    let current_lease = build_lease_summary(core)?;

    let current_waitpoint = match present(core, "current_waitpoint_id") {
        None => None,
        Some(s) => Some(WaitpointId::parse(s).map_err(|e| {
            corrupt(
                "current_waitpoint_id",
                format!("is not a valid UUID (key corruption?): {e}"),
            )
        })?),
    };

    let mut tags: BTreeMap<String, String> = BTreeMap::new();
    tags.extend(tags_raw);

    let snapshot = ExecutionSnapshot::new(
        execution_id,
        flow_id,
        lane_id,
        namespace,
        public_state,
        blocking_reason,
        blocking_detail,
        current_attempt,
        current_lease,
        current_waitpoint,
        created_at,
        last_mutation_at,
        total_attempt_count,
        tags,
    );
    Ok(Some(snapshot))
}
/// Borrows the value stored under `field` as a `&str`, or `None` when the
/// map has no such key. Empty values are returned as `Some("")`.
fn opt_str<'a>(map: &'a HashMap<String, String>, field: &str) -> Option<&'a str> {
    let stored = map.get(field)?;
    Some(stored.as_str())
}
/// Reads `field` from `map` as a millisecond timestamp.
///
/// A missing or empty value yields `Ok(None)`. Otherwise the text must parse
/// as an `i64`, which is wrapped with `TimestampMs::from_millis`.
///
/// # Errors
///
/// Returns [`SdkError::Config`] tagged with `context` and `field` when the
/// stored text is not a valid `i64`.
fn parse_ts(
    map: &HashMap<String, String>,
    context: &str,
    field: &str,
) -> Result<Option<TimestampMs>, SdkError> {
    let Some(raw) = opt_str(map, field).filter(|s| !s.is_empty()) else {
        return Ok(None);
    };
    match raw.parse::<i64>() {
        Ok(millis) => Ok(Some(TimestampMs::from_millis(millis))),
        Err(e) => Err(SdkError::Config {
            context: context.to_owned(),
            field: Some(field.to_owned()),
            message: format!("is not a valid ms timestamp ('{raw}'): {e}"),
        }),
    }
}
/// Reads `field` from `map` as a `u32`.
///
/// A missing or empty value yields `Ok(None)`; any other text must parse
/// as a `u32`.
///
/// # Errors
///
/// Returns [`SdkError::Config`] tagged with `context` and `field` when the
/// stored text is not a valid `u32`.
fn parse_u32_strict(
    map: &HashMap<String, String>,
    context: &str,
    field: &str,
) -> Result<Option<u32>, SdkError> {
    opt_str(map, field)
        .filter(|s| !s.is_empty())
        .map(|raw| {
            raw.parse::<u32>().map_err(|e| SdkError::Config {
                context: context.to_owned(),
                field: Some(field.to_owned()),
                message: format!("is not a valid u32 ('{raw}'): {e}"),
            })
        })
        .transpose()
}
/// Reads `field` from `map` as a `u64`.
///
/// A missing or empty value yields `Ok(None)`; any other text must parse
/// as a `u64`.
///
/// # Errors
///
/// Returns [`SdkError::Config`] tagged with `context` and `field` when the
/// stored text is not a valid `u64`.
fn parse_u64_strict(
    map: &HashMap<String, String>,
    context: &str,
    field: &str,
) -> Result<Option<u64>, SdkError> {
    opt_str(map, field)
        .filter(|s| !s.is_empty())
        .map(|raw| {
            raw.parse::<u64>().map_err(|e| SdkError::Config {
                context: context.to_owned(),
                field: Some(field.to_owned()),
                message: format!("is not a valid u64 ('{raw}'): {e}"),
            })
        })
        .transpose()
}
fn parse_public_state(raw: &str) -> Result<PublicState, SdkError> {
let quoted = format!("\"{raw}\"");
serde_json::from_str("ed).map_err(|e| SdkError::Config {
context: "describe_execution: exec_core".into(),
field: Some("public_state".into()),
message: format!("'{raw}' is not a known public state: {e}"),
})
}
/// Builds the summary of the current attempt from `exec_core` fields.
///
/// Returns `Ok(None)` when `current_attempt_id` is missing or empty. When it
/// is set, `current_attempt_index` must be present as well.
///
/// # Errors
///
/// Returns [`SdkError::Config`] when `current_attempt_id` fails
/// `AttemptId::parse`, or when `current_attempt_index` is malformed or
/// missing while the attempt id is set.
fn build_attempt_summary(
    core: &HashMap<String, String>,
) -> Result<Option<AttemptSummary>, SdkError> {
    const CTX: &str = "describe_execution: exec_core";

    let Some(attempt_id_str) = opt_str(core, "current_attempt_id").filter(|s| !s.is_empty())
    else {
        return Ok(None);
    };

    let attempt_id = match AttemptId::parse(attempt_id_str) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(SdkError::Config {
                context: CTX.into(),
                field: Some("current_attempt_id".into()),
                message: format!("is not a valid UUID: {e}"),
            });
        }
    };

    let Some(attempt_index) = parse_u32_strict(core, CTX, "current_attempt_index")? else {
        return Err(SdkError::Config {
            context: CTX.into(),
            field: Some("current_attempt_index".into()),
            message: "is missing while current_attempt_id is set (key corruption?)".into(),
        });
    };

    let summary = AttemptSummary::new(attempt_id, AttemptIndex::new(attempt_index));
    Ok(Some(summary))
}
/// Builds the summary of the current lease from `exec_core` fields.
///
/// Returns `Ok(None)` when `current_worker_instance_id` is missing or empty,
/// and also when `lease_expires_at` is missing or empty. When both are set,
/// `current_lease_epoch` must be present too.
///
/// # Errors
///
/// Returns [`SdkError::Config`] when `lease_expires_at` or
/// `current_lease_epoch` is malformed, or when `current_lease_epoch` is
/// missing while the other two fields are set.
fn build_lease_summary(core: &HashMap<String, String>) -> Result<Option<LeaseSummary>, SdkError> {
    const CTX: &str = "describe_execution: exec_core";

    let Some(wid_str) = opt_str(core, "current_worker_instance_id").filter(|s| !s.is_empty())
    else {
        return Ok(None);
    };
    let Some(expires_at) = parse_ts(core, CTX, "lease_expires_at")? else {
        return Ok(None);
    };

    let epoch = match parse_u64_strict(core, CTX, "current_lease_epoch")? {
        Some(value) => value,
        None => {
            return Err(SdkError::Config {
                context: CTX.into(),
                field: Some("current_lease_epoch".into()),
                message: "is missing while current_worker_instance_id is set (key corruption?)"
                    .into(),
            });
        }
    };

    let summary = LeaseSummary::new(
        LeaseEpoch::new(epoch),
        WorkerInstanceId::new(wid_str.to_owned()),
        expires_at,
    );
    Ok(Some(summary))
}
/// Field names of the `flow_core` hash that `build_flow_snapshot` skips when
/// it collects tags. Any field not listed here must pass
/// `is_namespaced_tag_key` to be surfaced as a tag; otherwise the snapshot
/// build fails with an "unexpected field" error.
const FLOW_CORE_KNOWN_FIELDS: &[&str] = &[
"flow_id",
"flow_kind",
"namespace",
"public_flow_state",
"graph_revision",
"node_count",
"edge_count",
"created_at",
"last_mutation_at",
"cancelled_at",
"cancel_reason",
"cancellation_policy",
];
impl FlowFabricWorker {
    /// Fetches a point-in-time view of a single flow.
    ///
    /// Reads the `flow_core` hash for `id` with `HGETALL`. An empty reply
    /// yields `Ok(None)`; otherwise the fields are passed to
    /// `build_flow_snapshot`.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError::ValkeyContext`] when the command fails, and
    /// forwards any error raised while the snapshot is built.
    pub async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, SdkError> {
        let flow_part = flow_partition(id, self.partition_config());
        let keys = FlowKeyContext::new(&flow_part, id);
        let flow_core_key = keys.core();

        let fields: HashMap<String, String> = self
            .client()
            .cmd("HGETALL")
            .arg(&flow_core_key)
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext {
                source: e,
                context: "describe_flow: HGETALL flow_core".into(),
            })?;

        match fields.is_empty() {
            true => Ok(None),
            false => {
                let snapshot = build_flow_snapshot(id.clone(), &fields)?;
                Ok(Some(snapshot))
            }
        }
    }
}
/// Assembles a [`FlowSnapshot`] from the raw `flow_core` fields.
///
/// Required fields: `flow_id` (must equal the requested `flow_id`),
/// `namespace`, `flow_kind`, `public_flow_state`, `graph_revision`,
/// `node_count`, `edge_count`, `created_at`, `last_mutation_at`.
/// `cancelled_at`, `cancel_reason` and `cancellation_policy` are optional and
/// treated as absent when missing or empty.
///
/// Every field not listed in `FLOW_CORE_KNOWN_FIELDS` is either copied into
/// the snapshot's tag map (when `is_namespaced_tag_key` accepts its name) or
/// rejected.
///
/// # Errors
///
/// Returns [`SdkError::Config`] when a required field is missing, empty or
/// malformed, when the stored `flow_id` differs from the requested one, or
/// when an unexpected non-tag field is present.
fn build_flow_snapshot(
    flow_id: FlowId,
    raw: &HashMap<String, String>,
) -> Result<FlowSnapshot, SdkError> {
    const CTX: &str = "describe_flow: flow_core";
    const MISSING_OR_EMPTY: &str = "is missing or empty (key corruption?)";
    const MISSING: &str = "is missing (key corruption?)";

    /// Builds the `Config` error for a named `flow_core` field.
    fn corrupt(field: &str, message: &str) -> SdkError {
        SdkError::Config {
            context: CTX.to_owned(),
            field: Some(field.to_owned()),
            message: message.to_owned(),
        }
    }

    /// Looks up `field`, mapping an empty value to `None`.
    fn present<'m>(map: &'m HashMap<String, String>, field: &str) -> Option<&'m str> {
        opt_str(map, field).filter(|value| !value.is_empty())
    }

    /// Looks up a string field that must be present and non-empty.
    fn required<'m>(map: &'m HashMap<String, String>, field: &str) -> Result<&'m str, SdkError> {
        present(map, field).ok_or_else(|| corrupt(field, MISSING_OR_EMPTY))
    }

    // Identity check: the hash must describe the flow that was asked for.
    let stored_flow_id_str = required(raw, "flow_id")?;
    if stored_flow_id_str != flow_id.to_string() {
        return Err(SdkError::Config {
            context: CTX.into(),
            field: Some("flow_id".into()),
            message: format!(
                "'{stored_flow_id_str}' does not match requested flow_id \
                 '{flow_id}' (key corruption or wrong-key read?)"
            ),
        });
    }

    let namespace = Namespace::new(required(raw, "namespace")?.to_owned());
    let flow_kind = required(raw, "flow_kind")?.to_owned();
    let public_flow_state = required(raw, "public_flow_state")?.to_owned();

    let Some(graph_revision) = parse_u64_strict(raw, CTX, "graph_revision")? else {
        return Err(corrupt("graph_revision", MISSING));
    };
    let Some(node_count) = parse_u32_strict(raw, CTX, "node_count")? else {
        return Err(corrupt("node_count", MISSING));
    };
    let Some(edge_count) = parse_u32_strict(raw, CTX, "edge_count")? else {
        return Err(corrupt("edge_count", MISSING));
    };

    let Some(created_at) = parse_ts(raw, CTX, "created_at")? else {
        return Err(corrupt("created_at", MISSING_OR_EMPTY));
    };
    let Some(last_mutation_at) = parse_ts(raw, CTX, "last_mutation_at")? else {
        return Err(corrupt("last_mutation_at", MISSING_OR_EMPTY));
    };

    let cancelled_at = parse_ts(raw, CTX, "cancelled_at")?;
    let cancel_reason = present(raw, "cancel_reason").map(str::to_owned);
    let cancellation_policy = present(raw, "cancellation_policy").map(str::to_owned);

    // Everything that is not a known core field must look like a namespaced
    // tag; anything else fails the whole snapshot.
    let mut tags: BTreeMap<String, String> = BTreeMap::new();
    for (k, v) in raw {
        let is_core_field = FLOW_CORE_KNOWN_FIELDS.contains(&k.as_str());
        if is_core_field {
            continue;
        }
        if !is_namespaced_tag_key(k) {
            return Err(SdkError::Config {
                context: CTX.into(),
                field: None,
                message: format!(
                    "has unexpected field '{k}' — not an FF field and not a namespaced \
                     tag (lowercase-alphanumeric-prefix + '.')"
                ),
            });
        }
        tags.insert(k.clone(), v.clone());
    }

    let snapshot = FlowSnapshot::new(
        flow_id,
        flow_kind,
        namespace,
        public_flow_state,
        graph_revision,
        node_count,
        edge_count,
        created_at,
        last_mutation_at,
        cancelled_at,
        cancel_reason,
        cancellation_policy,
        tags,
    );
    Ok(snapshot)
}
/// Reports whether `k` has the shape of a namespaced tag key.
///
/// The key must contain a `.`; the text before the first `.` must start with
/// an ASCII lowercase letter and otherwise consist only of ASCII lowercase
/// letters, ASCII digits and `_`. Nothing after the first `.` is inspected.
fn is_namespaced_tag_key(k: &str) -> bool {
    // No dot at all means there is no namespace prefix.
    let Some((prefix, _suffix)) = k.split_once('.') else {
        return false;
    };
    let mut symbols = prefix.chars();
    match symbols.next() {
        Some(lead) if lead.is_ascii_lowercase() => {
            symbols.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
        }
        // Empty prefix, or a prefix that does not start with a lowercase letter.
        _ => false,
    }
}
/// The complete set of field names `build_edge_snapshot` accepts in an edge
/// hash. Any other field name makes the snapshot build fail with an
/// "unexpected field" error.
const EDGE_KNOWN_FIELDS: &[&str] = &[
"edge_id",
"flow_id",
"upstream_execution_id",
"downstream_execution_id",
"dependency_kind",
"satisfaction_condition",
"data_passing_ref",
"edge_state",
"created_at",
"created_by",
];
impl FlowFabricWorker {
    /// Fetches a point-in-time view of a single edge of `flow_id`.
    ///
    /// Reads the edge hash with `HGETALL`. An empty reply yields `Ok(None)`;
    /// otherwise the fields are passed to `build_edge_snapshot`.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError::ValkeyContext`] when the command fails, and
    /// forwards any error raised while the snapshot is built.
    pub async fn describe_edge(
        &self,
        flow_id: &FlowId,
        edge_id: &EdgeId,
    ) -> Result<Option<EdgeSnapshot>, SdkError> {
        let flow_part = flow_partition(flow_id, self.partition_config());
        let keys = FlowKeyContext::new(&flow_part, flow_id);
        let edge_hash_key = keys.edge(edge_id);

        let fields: HashMap<String, String> = self
            .client()
            .cmd("HGETALL")
            .arg(&edge_hash_key)
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext {
                source: e,
                context: "describe_edge: HGETALL edge_hash".into(),
            })?;

        match fields.is_empty() {
            true => Ok(None),
            false => {
                let snapshot = build_edge_snapshot(flow_id, edge_id, &fields)?;
                Ok(Some(snapshot))
            }
        }
    }

    /// Lists the edges recorded in the outgoing adjacency set of
    /// `upstream_eid`.
    ///
    /// Returns an empty list when the execution has no (non-empty) `flow_id`
    /// stored in its `exec_core` hash.
    ///
    /// # Errors
    ///
    /// Forwards errors from `resolve_flow_id` and `list_edges_from_set`.
    pub async fn list_outgoing_edges(
        &self,
        upstream_eid: &ExecutionId,
    ) -> Result<Vec<EdgeSnapshot>, SdkError> {
        let flow_id = match self.resolve_flow_id(upstream_eid).await? {
            Some(found) => found,
            None => return Ok(Vec::new()),
        };
        let flow_part = flow_partition(&flow_id, self.partition_config());
        let keys = FlowKeyContext::new(&flow_part, &flow_id);
        let adjacency_key = keys.outgoing(upstream_eid);
        self.list_edges_from_set(
            &adjacency_key,
            &flow_id,
            upstream_eid,
            AdjacencySide::Outgoing,
        )
        .await
    }

    /// Lists the edges recorded in the incoming adjacency set of
    /// `downstream_eid`.
    ///
    /// Returns an empty list when the execution has no (non-empty) `flow_id`
    /// stored in its `exec_core` hash.
    ///
    /// # Errors
    ///
    /// Forwards errors from `resolve_flow_id` and `list_edges_from_set`.
    pub async fn list_incoming_edges(
        &self,
        downstream_eid: &ExecutionId,
    ) -> Result<Vec<EdgeSnapshot>, SdkError> {
        let flow_id = match self.resolve_flow_id(downstream_eid).await? {
            Some(found) => found,
            None => return Ok(Vec::new()),
        };
        let flow_part = flow_partition(&flow_id, self.partition_config());
        let keys = FlowKeyContext::new(&flow_part, &flow_id);
        let adjacency_key = keys.incoming(downstream_eid);
        self.list_edges_from_set(
            &adjacency_key,
            &flow_id,
            downstream_eid,
            AdjacencySide::Incoming,
        )
        .await
    }

    /// Looks up the `flow_id` stored on the execution's `exec_core` hash.
    ///
    /// Returns `Ok(None)` when the field is absent or empty. A parsed flow id
    /// is only returned when its partition index equals the execution's
    /// partition index.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError::ValkeyContext`] when the `HGET` fails, and
    /// [`SdkError::Config`] when the stored value does not parse as a
    /// `FlowId` or the two partition indexes differ.
    async fn resolve_flow_id(
        &self,
        eid: &ExecutionId,
    ) -> Result<Option<FlowId>, SdkError> {
        let exec_partition = execution_partition(eid, self.partition_config());
        let exec_keys = ExecKeyContext::new(&exec_partition, eid);

        let stored: Option<String> = self
            .client()
            .cmd("HGET")
            .arg(exec_keys.core())
            .arg("flow_id")
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext {
                source: e,
                context: "list_edges: HGET exec_core.flow_id".into(),
            })?;

        let raw = match stored {
            Some(value) if !value.is_empty() => value,
            _ => return Ok(None),
        };

        let flow_id = match FlowId::parse(&raw) {
            Ok(parsed) => parsed,
            Err(e) => {
                return Err(SdkError::Config {
                    context: "list_edges: exec_core".into(),
                    field: Some("flow_id".into()),
                    message: format!("'{raw}' is not a valid UUID (key corruption?): {e}"),
                });
            }
        };

        // The flow and the execution are expected to land on the same
        // partition index; a mismatch is reported instead of being followed.
        let flow_partition_index = flow_partition(&flow_id, self.partition_config()).index;
        if exec_partition.index == flow_partition_index {
            return Ok(Some(flow_id));
        }
        Err(SdkError::Config {
            context: "list_edges: exec_core".into(),
            field: Some("flow_id".into()),
            message: format!(
                "'{flow_id}' partition {flow_partition_index} does not match \
                 execution partition {} (RFC-011 co-location violation; key corruption?)",
                exec_partition.index
            ),
        })
    }

    /// Reads every edge referenced by the adjacency set at `adj_key`.
    ///
    /// The set members are parsed as `EdgeId`s, their edge hashes are read in
    /// one pipeline, and each hash is turned into an [`EdgeSnapshot`]. For
    /// every edge, the endpoint selected by `side` must equal `subject_eid`.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError::ValkeyContext`] when a command fails or a reply
    /// cannot be decoded, and [`SdkError::Config`] when a set member is not a
    /// valid `EdgeId`, a referenced edge hash is empty, an edge hash fails
    /// `build_edge_snapshot`, or the stored endpoint differs from
    /// `subject_eid`.
    async fn list_edges_from_set(
        &self,
        adj_key: &str,
        flow_id: &FlowId,
        subject_eid: &ExecutionId,
        side: AdjacencySide,
    ) -> Result<Vec<EdgeSnapshot>, SdkError> {
        let members: Vec<String> = self
            .client()
            .cmd("SMEMBERS")
            .arg(adj_key)
            .execute()
            .await
            .map_err(|e| SdkError::ValkeyContext {
                source: e,
                context: "list_edges: SMEMBERS adj_set".into(),
            })?;
        if members.is_empty() {
            return Ok(Vec::new());
        }

        // Parse every member up front; the first invalid one aborts the call.
        let edge_ids = members
            .iter()
            .map(|raw| {
                EdgeId::parse(raw).map_err(|e| SdkError::Config {
                    context: "list_edges: adjacency_set".into(),
                    field: Some("edge_id".into()),
                    message: format!("'{raw}' is not a valid EdgeId (key corruption?): {e}"),
                })
            })
            .collect::<Result<Vec<EdgeId>, SdkError>>()?;

        let flow_part = flow_partition(flow_id, self.partition_config());
        let keys = FlowKeyContext::new(&flow_part, flow_id);

        // One HGETALL per edge, all queued on a single pipeline.
        let mut pipeline = self.client().pipeline();
        let mut slots = Vec::with_capacity(edge_ids.len());
        for eid in &edge_ids {
            let slot = pipeline
                .cmd::<HashMap<String, String>>("HGETALL")
                .arg(keys.edge(eid))
                .finish();
            slots.push(slot);
        }
        if let Err(source) = pipeline.execute().await {
            return Err(SdkError::ValkeyContext {
                source,
                context: "list_edges: pipeline HGETALL edges".into(),
            });
        }

        let mut snapshots: Vec<EdgeSnapshot> = Vec::with_capacity(edge_ids.len());
        for (edge_id, slot) in edge_ids.iter().zip(slots) {
            let fields = match slot.value() {
                Ok(decoded) => decoded,
                Err(source) => {
                    return Err(SdkError::ValkeyContext {
                        source,
                        context: "list_edges: decode HGETALL edge_hash".into(),
                    });
                }
            };
            // The set named this edge, so an empty hash is treated as an error.
            if fields.is_empty() {
                return Err(SdkError::Config {
                    context: "list_edges: adjacency_set".into(),
                    field: None,
                    message: format!(
                        "refers to edge_id '{edge_id}' but its edge_hash is absent \
                         (key corruption?)"
                    ),
                });
            }

            let snap = build_edge_snapshot(flow_id, edge_id, &fields)?;
            let endpoint = match side {
                AdjacencySide::Outgoing => &snap.upstream_execution_id,
                AdjacencySide::Incoming => &snap.downstream_execution_id,
            };
            if endpoint != subject_eid {
                return Err(SdkError::Config {
                    context: "list_edges: adjacency_set".into(),
                    field: None,
                    message: format!(
                        "for execution '{subject_eid}' (side={side:?}) contains edge \
                         '{edge_id}' whose stored endpoint is '{endpoint}' \
                         (adjacency/edge-hash drift?)"
                    ),
                });
            }
            snapshots.push(snap);
        }
        Ok(snapshots)
    }
}
/// Selects which endpoint of an edge `list_edges_from_set` compares against
/// the subject execution.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AdjacencySide {
/// The edge's `upstream_execution_id` must equal the subject execution.
Outgoing,
/// The edge's `downstream_execution_id` must equal the subject execution.
Incoming,
}
/// Crate-visible entry point that forwards its arguments unchanged to
/// `build_edge_snapshot` and returns its result.
// `dead_code` is allowed here; no caller is visible in this file.
#[allow(dead_code)]
pub(crate) fn build_edge_snapshot_public(
flow_id: &FlowId,
edge_id: &EdgeId,
raw: &HashMap<String, String>,
) -> Result<EdgeSnapshot, SdkError> {
build_edge_snapshot(flow_id, edge_id, raw)
}
/// Assembles an [`EdgeSnapshot`] from the raw fields of an edge hash.
///
/// Only the names in `EDGE_KNOWN_FIELDS` may appear in `raw`. The stored
/// `edge_id` and `flow_id` must equal the requested ones. All other fields
/// are required and must be non-empty, except `data_passing_ref`, which is
/// treated as absent when missing or empty.
///
/// # Errors
///
/// Returns [`SdkError::Config`] when an unexpected field is present, when a
/// required field is missing, empty or malformed, or when a stored id differs
/// from the requested one.
fn build_edge_snapshot(
    flow_id: &FlowId,
    edge_id: &EdgeId,
    raw: &HashMap<String, String>,
) -> Result<EdgeSnapshot, SdkError> {
    const CTX: &str = "edge_snapshot: edge_hash";

    /// Builds the `Config` error used for every edge-hash problem.
    fn corrupt(field: Option<&str>, message: String) -> SdkError {
        SdkError::Config {
            context: CTX.to_owned(),
            field: field.map(str::to_owned),
            message,
        }
    }

    /// Looks up a string field that must be present and non-empty.
    fn required<'m>(map: &'m HashMap<String, String>, field: &str) -> Result<&'m str, SdkError> {
        match opt_str(map, field) {
            Some(value) if !value.is_empty() => Ok(value),
            _ => Err(corrupt(
                Some(field),
                "is missing or empty (key corruption?)".to_owned(),
            )),
        }
    }

    // Reject the whole hash as soon as one unknown field name shows up.
    if let Some(k) = raw
        .keys()
        .find(|name| !EDGE_KNOWN_FIELDS.contains(&name.as_str()))
    {
        return Err(corrupt(
            None,
            format!("has unexpected field '{k}' (protocol drift or corruption?)"),
        ));
    }

    let stored_edge_id_str = required(raw, "edge_id")?;
    if stored_edge_id_str != edge_id.to_string() {
        return Err(corrupt(
            Some("edge_id"),
            format!(
                "'{stored_edge_id_str}' does not match requested edge_id \
                 '{edge_id}' (key corruption or wrong-key read?)"
            ),
        ));
    }

    let stored_flow_id_str = required(raw, "flow_id")?;
    if stored_flow_id_str != flow_id.to_string() {
        return Err(corrupt(
            Some("flow_id"),
            format!(
                "'{stored_flow_id_str}' does not match requested flow_id \
                 '{flow_id}' (key corruption or wrong-key read?)"
            ),
        ));
    }

    let upstream_execution_id = parse_eid(raw, "upstream_execution_id")?;
    let downstream_execution_id = parse_eid(raw, "downstream_execution_id")?;

    let dependency_kind = required(raw, "dependency_kind")?.to_owned();
    let satisfaction_condition = required(raw, "satisfaction_condition")?.to_owned();
    let data_passing_ref = match opt_str(raw, "data_passing_ref") {
        Some(value) if !value.is_empty() => Some(value.to_owned()),
        _ => None,
    };
    let edge_state = required(raw, "edge_state")?.to_owned();

    let Some(created_at) = parse_ts(raw, CTX, "created_at")? else {
        return Err(corrupt(
            Some("created_at"),
            "is missing or empty (key corruption?)".to_owned(),
        ));
    };
    let created_by = required(raw, "created_by")?.to_owned();

    let snapshot = EdgeSnapshot::new(
        edge_id.clone(),
        flow_id.clone(),
        upstream_execution_id,
        downstream_execution_id,
        dependency_kind,
        satisfaction_condition,
        data_passing_ref,
        edge_state,
        created_at,
        created_by,
    );
    Ok(snapshot)
}
/// Reads `field` from an edge hash as an [`ExecutionId`].
///
/// # Errors
///
/// Returns [`SdkError::Config`] naming `field` when the value is missing or
/// empty, or when it fails `ExecutionId::parse`.
fn parse_eid(raw: &HashMap<String, String>, field: &str) -> Result<ExecutionId, SdkError> {
    const CTX: &str = "edge_snapshot: edge_hash";

    let s = match opt_str(raw, field) {
        Some(value) if !value.is_empty() => value,
        _ => {
            return Err(SdkError::Config {
                context: CTX.into(),
                field: Some(field.to_owned()),
                message: "is missing or empty (key corruption?)".into(),
            });
        }
    };

    match ExecutionId::parse(s) {
        Ok(parsed) => Ok(parsed),
        Err(e) => Err(SdkError::Config {
            context: CTX.into(),
            field: Some(field.to_owned()),
            message: format!("'{s}' is not a valid ExecutionId (key corruption?): {e}"),
        }),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use ff_core::partition::PartitionConfig;
use ff_core::types::FlowId;
fn eid() -> ExecutionId {
let config = PartitionConfig::default();
ExecutionId::for_flow(&FlowId::new(), &config)
}
fn minimal_core(public_state: &str) -> HashMap<String, String> {
let mut m = HashMap::new();
m.insert("public_state".to_owned(), public_state.to_owned());
m.insert("lane_id".to_owned(), "default".to_owned());
m.insert("namespace".to_owned(), "ns".to_owned());
m.insert("created_at".to_owned(), "1000".to_owned());
m.insert("last_mutation_at".to_owned(), "2000".to_owned());
m.insert("total_attempt_count".to_owned(), "0".to_owned());
m
}
#[test]
fn waiting_exec_no_attempt_no_lease_no_tags() {
let snap = build_execution_snapshot(eid(), &minimal_core("waiting"), HashMap::new())
.unwrap()
.expect("should build");
assert_eq!(snap.public_state, PublicState::Waiting);
assert!(snap.current_attempt.is_none());
assert!(snap.current_lease.is_none());
assert!(snap.current_waitpoint.is_none());
assert_eq!(snap.tags.len(), 0);
assert_eq!(snap.created_at.0, 1000);
assert_eq!(snap.last_mutation_at.0, 2000);
assert!(snap.flow_id.is_none());
assert!(snap.blocking_reason.is_none());
}
#[test]
fn tags_flow_through_sorted() {
let mut tags = HashMap::new();
tags.insert("cairn.task_id".to_owned(), "t-1".to_owned());
tags.insert("cairn.project".to_owned(), "proj".to_owned());
let snap = build_execution_snapshot(eid(), &minimal_core("waiting"), tags)
.unwrap()
.unwrap();
let keys: Vec<_> = snap.tags.keys().cloned().collect();
assert_eq!(
keys,
vec!["cairn.project".to_owned(), "cairn.task_id".to_owned()]
);
}
#[test]
fn invalid_public_state_fails_loud() {
let err =
build_execution_snapshot(eid(), &minimal_core("bogus"), HashMap::new()).unwrap_err();
match err {
SdkError::Config { field, message: msg, .. } => {
assert_eq!(field.as_deref(), Some("public_state"), "msg: {msg}");
}
other => panic!("expected Config, got {other:?}"),
}
}
#[test]
fn invalid_lane_id_fails_loud() {
let mut core = minimal_core("waiting");
core.insert("lane_id".to_owned(), "lane\nbroken".to_owned());
let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
match err {
SdkError::Config { field, message: msg, .. } => {
assert_eq!(field.as_deref(), Some("lane_id"), "msg: {msg}");
}
other => panic!("expected Config, got {other:?}"),
}
}
#[test]
fn missing_required_timestamps_fail_loud() {
for want in ["created_at", "last_mutation_at"] {
let mut core = minimal_core("waiting");
core.remove(want);
let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
match err {
SdkError::Config { field, message: msg, .. } => {
assert_eq!(field.as_deref(), Some(want), "msg for {want}: {msg}");
}
other => panic!("expected Config for {want}, got {other:?}"),
}
}
}
#[test]
fn malformed_total_attempt_count_fails_loud() {
let mut core = minimal_core("waiting");
core.insert("total_attempt_count".to_owned(), "not-a-number".to_owned());
let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
match err {
SdkError::Config { field, message: msg, .. } => {
assert_eq!(field.as_deref(), Some("total_attempt_count"), "msg: {msg}");
}
other => panic!("expected Config, got {other:?}"),
}
}
#[test]
fn attempt_id_without_index_fails_loud() {
let mut core = minimal_core("active");
core.insert(
"current_attempt_id".to_owned(),
AttemptId::new().to_string(),
);
let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
match err {
SdkError::Config { field, message: msg, .. } => {
assert_eq!(field.as_deref(), Some("current_attempt_index"), "msg: {msg}");
}
other => panic!("expected Config, got {other:?}"),
}
}
#[test]
fn lease_without_epoch_fails_loud() {
let mut core = minimal_core("active");
core.insert(
"current_worker_instance_id".to_owned(),
"w-inst-1".to_owned(),
);
core.insert("lease_expires_at".to_owned(), "9000".to_owned());
let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
match err {
SdkError::Config { field, message: msg, .. } => {
assert_eq!(field.as_deref(), Some("current_lease_epoch"), "msg: {msg}");
}
other => panic!("expected Config, got {other:?}"),
}
}
#[test]
fn lease_summary_requires_both_wid_and_expires_at() {
let mut core = minimal_core("active");
core.insert(
"current_worker_instance_id".to_owned(),
"w-inst-1".to_owned(),
);
let snap = build_execution_snapshot(eid(), &core, HashMap::new())
.unwrap()
.unwrap();
assert!(snap.current_lease.is_none());
core.insert("lease_expires_at".to_owned(), "9000".to_owned());
core.insert("current_lease_epoch".to_owned(), "3".to_owned());
let snap = build_execution_snapshot(eid(), &core, HashMap::new())
.unwrap()
.unwrap();
let lease = snap.current_lease.expect("lease present");
assert_eq!(lease.lease_epoch, LeaseEpoch::new(3));
assert_eq!(lease.expires_at.0, 9000);
assert_eq!(lease.worker_instance_id.as_str(), "w-inst-1");
}
fn fid() -> FlowId {
FlowId::new()
}
fn minimal_flow_core(id: &FlowId, state: &str) -> HashMap<String, String> {
let mut m = HashMap::new();
m.insert("flow_id".to_owned(), id.to_string());
m.insert("flow_kind".to_owned(), "dag".to_owned());
m.insert("namespace".to_owned(), "ns".to_owned());
m.insert("public_flow_state".to_owned(), state.to_owned());
m.insert("graph_revision".to_owned(), "0".to_owned());
m.insert("node_count".to_owned(), "0".to_owned());
m.insert("edge_count".to_owned(), "0".to_owned());
m.insert("created_at".to_owned(), "1000".to_owned());
m.insert("last_mutation_at".to_owned(), "1000".to_owned());
m
}
#[test]
fn open_flow_round_trips() {
let f = fid();
let snap = build_flow_snapshot(f.clone(), &minimal_flow_core(&f, "open")).unwrap();
assert_eq!(snap.flow_id, f);
assert_eq!(snap.flow_kind, "dag");
assert_eq!(snap.namespace.as_str(), "ns");
assert_eq!(snap.public_flow_state, "open");
assert_eq!(snap.graph_revision, 0);
assert_eq!(snap.node_count, 0);
assert_eq!(snap.edge_count, 0);
assert_eq!(snap.created_at.0, 1000);
assert_eq!(snap.last_mutation_at.0, 1000);
assert!(snap.cancelled_at.is_none());
assert!(snap.cancel_reason.is_none());
assert!(snap.cancellation_policy.is_none());
assert!(snap.tags.is_empty());
}
#[test]
fn cancelled_flow_surfaces_cancel_fields() {
let f = fid();
let mut core = minimal_flow_core(&f, "cancelled");
core.insert("cancelled_at".to_owned(), "2000".to_owned());
core.insert("cancel_reason".to_owned(), "operator".to_owned());
core.insert("cancellation_policy".to_owned(), "cancel_all".to_owned());
let snap = build_flow_snapshot(f, &core).unwrap();
assert_eq!(snap.public_flow_state, "cancelled");
assert_eq!(snap.cancelled_at.unwrap().0, 2000);
assert_eq!(snap.cancel_reason.as_deref(), Some("operator"));
assert_eq!(snap.cancellation_policy.as_deref(), Some("cancel_all"));
}
#[test]
fn namespaced_tags_routed_to_tags_map() {
let f = fid();
let mut core = minimal_flow_core(&f, "open");
core.insert("cairn.task_id".to_owned(), "t-1".to_owned());
core.insert("cairn.project".to_owned(), "proj".to_owned());
core.insert("operator.label".to_owned(), "v".to_owned());
let snap = build_flow_snapshot(f, &core).unwrap();
assert_eq!(snap.tags.len(), 3);
let keys: Vec<_> = snap.tags.keys().cloned().collect();
assert_eq!(
keys,
vec![
"cairn.project".to_owned(),
"cairn.task_id".to_owned(),
"operator.label".to_owned()
]
);
}
#[test]
fn unknown_flat_field_fails_loud() {
let f = fid();
let mut core = minimal_flow_core(&f, "open");
core.insert("bogus_future_field".to_owned(), "v".to_owned());
let err = build_flow_snapshot(f, &core).unwrap_err();
match err {
SdkError::Config { field, message: msg, .. } => {
assert!(field.is_none(), "expected whole-object error, got field={field:?}");
assert!(msg.contains("bogus_future_field"), "msg: {msg}");
}
other => panic!("expected Config, got {other:?}"),
}
}
#[test]
fn missing_required_fields_fail_loud() {
for want in [
"flow_id",
"namespace",
"flow_kind",
"public_flow_state",
"graph_revision",
"node_count",
"edge_count",
"created_at",
"last_mutation_at",
] {
let f = fid();
let mut core = minimal_flow_core(&f, "open");
core.remove(want);
let err = build_flow_snapshot(f, &core).err().unwrap_or_else(|| {
panic!("field {want} should fail but build_flow_snapshot returned Ok")
});
match err {
SdkError::Config { field, message: msg, .. } => {
assert_eq!(field.as_deref(), Some(want), "msg for {want}: {msg}");
}
other => panic!("expected Config for {want}, got {other:?}"),
}
}
}
#[test]
fn empty_required_strings_fail_loud() {
for want in ["flow_id", "namespace", "flow_kind", "public_flow_state"] {
let f = fid();
let mut core = minimal_flow_core(&f, "open");
core.insert(want.to_owned(), String::new());
let err = build_flow_snapshot(f, &core).err().unwrap_or_else(|| {
panic!("empty {want} should fail but build_flow_snapshot returned Ok")
});
match err {
SdkError::Config { field, message: msg, .. } => {
assert_eq!(field.as_deref(), Some(want), "msg for {want}: {msg}");
}
other => panic!("expected Config for {want}, got {other:?}"),
}
}
}
#[test]
fn flow_id_mismatch_fails_loud() {
let requested = fid();
let other = fid();
let core = minimal_flow_core(&other, "open");
let err = build_flow_snapshot(requested, &core).unwrap_err();
match err {
SdkError::Config { field, message: msg, .. } => {
assert_eq!(field.as_deref(), Some("flow_id"), "msg: {msg}");
assert!(msg.contains("does not match"), "msg: {msg}");
}
other => panic!("expected Config, got {other:?}"),
}
}
#[test]
fn malformed_counter_fails_loud() {
let f = fid();
let mut core = minimal_flow_core(&f, "open");
core.insert("graph_revision".to_owned(), "not-a-number".to_owned());
let err = build_flow_snapshot(f, &core).unwrap_err();
match err {
SdkError::Config { field, message: msg, .. } => {
assert_eq!(field.as_deref(), Some("graph_revision"), "msg: {msg}");
}
other => panic!("expected Config, got {other:?}"),
}
}
fn eids_for_flow(f: &FlowId) -> (ExecutionId, ExecutionId) {
let cfg = PartitionConfig::default();
(ExecutionId::for_flow(f, &cfg), ExecutionId::for_flow(f, &cfg))
}
fn minimal_edge_hash(
flow: &FlowId,
edge: &EdgeId,
up: &ExecutionId,
down: &ExecutionId,
) -> HashMap<String, String> {
let mut m = HashMap::new();
m.insert("edge_id".into(), edge.to_string());
m.insert("flow_id".into(), flow.to_string());
m.insert("upstream_execution_id".into(), up.to_string());
m.insert("downstream_execution_id".into(), down.to_string());
m.insert("dependency_kind".into(), "success_only".into());
m.insert("satisfaction_condition".into(), "all_required".into());
m.insert("data_passing_ref".into(), String::new());
m.insert("edge_state".into(), "pending".into());
m.insert("created_at".into(), "1234".into());
m.insert("created_by".into(), "engine".into());
m
}
/// Every field of the minimal edge hash must come back unchanged on the
/// decoded snapshot; the empty `data_passing_ref` decodes to `None`.
#[test]
fn edge_round_trips_all_fields() {
    let flow = fid();
    let edge_id = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&flow);
    let hash = minimal_edge_hash(&flow, &edge_id, &upstream, &downstream);

    let snapshot = build_edge_snapshot(&flow, &edge_id, &hash).unwrap();

    // Identity fields.
    assert_eq!(snapshot.edge_id, edge_id);
    assert_eq!(snapshot.flow_id, flow);
    assert_eq!(snapshot.upstream_execution_id, upstream);
    assert_eq!(snapshot.downstream_execution_id, downstream);
    // Fixed fixture values written by `minimal_edge_hash`.
    assert_eq!(snapshot.dependency_kind, "success_only");
    assert_eq!(snapshot.satisfaction_condition, "all_required");
    assert!(snapshot.data_passing_ref.is_none());
    assert_eq!(snapshot.edge_state, "pending");
    assert_eq!(snapshot.created_at.0, 1234);
    assert_eq!(snapshot.created_by, "engine");
}
/// A non-empty `data_passing_ref` in the hash must be exposed verbatim on the
/// snapshot.
#[test]
fn edge_data_passing_ref_round_trips_when_set() {
    let flow = fid();
    let edge_id = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&flow);

    let mut hash = minimal_edge_hash(&flow, &edge_id, &upstream, &downstream);
    // Replace the fixture's empty reference with a concrete one.
    hash.insert(String::from("data_passing_ref"), String::from("ref://blob-42"));

    let snapshot = build_edge_snapshot(&flow, &edge_id, &hash).unwrap();
    assert_eq!(snapshot.data_passing_ref.as_deref(), Some("ref://blob-42"));
}
/// An unrecognised key in the edge hash must produce a `Config` error with no
/// specific `field`, and a message that mentions the offending key.
#[test]
fn edge_unknown_field_fails_loud() {
    let flow = fid();
    let edge_id = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&flow);

    let mut hash = minimal_edge_hash(&flow, &edge_id, &upstream, &downstream);
    hash.insert(String::from("bogus_future_field"), String::from("v"));

    let (field, msg) = match build_edge_snapshot(&flow, &edge_id, &hash).unwrap_err() {
        SdkError::Config { field, message, .. } => (field, message),
        other => panic!("expected Config, got {other:?}"),
    };
    assert!(field.is_none(), "expected whole-object error, got field={field:?}");
    assert!(msg.contains("bogus_future_field"), "msg: {msg}");
}
/// An edge hash whose `flow_id` differs from the flow being queried must fail
/// with a `Config` error that names `flow_id`.
#[test]
fn edge_flow_id_mismatch_fails_loud() {
    // Two separate `fid()` calls; the test relies on them being distinct ids.
    let requested_flow = fid();
    let stored_flow = fid();
    let edge_id = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&requested_flow);

    // The hash records `stored_flow`, while the lookup uses `requested_flow`.
    let hash = minimal_edge_hash(&stored_flow, &edge_id, &upstream, &downstream);

    let (field, msg) = match build_edge_snapshot(&requested_flow, &edge_id, &hash).unwrap_err() {
        SdkError::Config { field, message, .. } => (field, message),
        other => panic!("expected Config, got {other:?}"),
    };
    assert_eq!(field.as_deref(), Some("flow_id"), "msg: {msg}");
    assert!(msg.contains("does not match"), "msg: {msg}");
}
/// An edge hash whose `edge_id` differs from the edge being queried must fail
/// with a `Config` error that names `edge_id`.
#[test]
fn edge_edge_id_mismatch_fails_loud() {
    let flow = fid();
    // Two separate `EdgeId::new()` calls; the test relies on them being distinct.
    let requested_edge = EdgeId::new();
    let stored_edge = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&flow);

    // The hash records `stored_edge`, while the lookup uses `requested_edge`.
    let hash = minimal_edge_hash(&flow, &stored_edge, &upstream, &downstream);

    let (field, msg) = match build_edge_snapshot(&flow, &requested_edge, &hash).unwrap_err() {
        SdkError::Config { field, message, .. } => (field, message),
        other => panic!("expected Config, got {other:?}"),
    };
    assert_eq!(field.as_deref(), Some("edge_id"), "msg: {msg}");
    assert!(msg.contains("does not match"), "msg: {msg}");
}
/// Removing any one of the listed keys from an otherwise valid edge hash must
/// fail with a `Config` error whose `field` is the removed key.
#[test]
fn edge_missing_required_fields_fail_loud() {
    // Keys exercised by this test. `data_passing_ref` is not in the list.
    const REQUIRED: [&str; 9] = [
        "edge_id",
        "flow_id",
        "upstream_execution_id",
        "downstream_execution_id",
        "dependency_kind",
        "satisfaction_condition",
        "edge_state",
        "created_at",
        "created_by",
    ];

    for want in REQUIRED {
        // Fresh fixture per key so each removal is tested in isolation.
        let flow = fid();
        let edge_id = EdgeId::new();
        let (upstream, downstream) = eids_for_flow(&flow);
        let mut hash = minimal_edge_hash(&flow, &edge_id, &upstream, &downstream);
        hash.remove(want);

        let err = match build_edge_snapshot(&flow, &edge_id, &hash) {
            Err(e) => e,
            Ok(_) => panic!("missing {want} should fail"),
        };
        let (field, msg) = match err {
            SdkError::Config { field, message, .. } => (field, message),
            other => panic!("expected Config for {want}, got {other:?}"),
        };
        assert_eq!(field.as_deref(), Some(want), "msg for {want}: {msg}");
    }
}
/// A non-numeric `created_at` in the edge hash must be reported as a `Config`
/// error that names `created_at`.
#[test]
fn edge_malformed_created_at_fails_loud() {
    let flow = fid();
    let edge_id = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&flow);

    let mut hash = minimal_edge_hash(&flow, &edge_id, &upstream, &downstream);
    hash.insert(String::from("created_at"), String::from("not-a-number"));

    let (field, msg) = match build_edge_snapshot(&flow, &edge_id, &hash).unwrap_err() {
        SdkError::Config { field, message, .. } => (field, message),
        other => panic!("expected Config, got {other:?}"),
    };
    assert_eq!(field.as_deref(), Some("created_at"), "msg: {msg}");
}
/// An `upstream_execution_id` that is not a valid execution id must be
/// reported as a `Config` error that names `upstream_execution_id`.
#[test]
fn edge_malformed_upstream_eid_fails_loud() {
    let flow = fid();
    let edge_id = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&flow);

    let mut hash = minimal_edge_hash(&flow, &edge_id, &upstream, &downstream);
    hash.insert(
        String::from("upstream_execution_id"),
        String::from("not-an-execution-id"),
    );

    let (field, msg) = match build_edge_snapshot(&flow, &edge_id, &hash).unwrap_err() {
        SdkError::Config { field, message, .. } => (field, message),
        other => panic!("expected Config, got {other:?}"),
    };
    assert_eq!(field.as_deref(), Some("upstream_execution_id"), "msg: {msg}");
}
/// Table of keys that `is_namespaced_tag_key` must accept or reject.
///
/// Each row pairs an input key with the expected verdict; rows are checked in
/// the order listed.
#[test]
fn namespaced_tag_matcher_boundaries() {
    let cases = [
        // Accepted keys.
        ("cairn.task_id", true),
        ("a.b", true),
        ("ab_12.field", true),
        // Rejected keys.
        ("cairn_task_id", false),
        ("Cairn.task", false),
        ("1cairn.task", false),
        ("", false),
        (".x", false),
        ("caIrn.task", false),
    ];

    for (key, expected) in cases {
        assert_eq!(is_namespaced_tag_key(key), expected, "key: {key:?}");
    }
}
}