use std::collections::HashMap;
use ff_core::contracts::{EdgeDirection, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot};
use ff_core::keys::ExecKeyContext;
use ff_core::partition::{execution_partition, flow_partition};
use ff_core::types::{EdgeId, ExecutionId, FlowId};
use crate::SdkError;
use crate::worker::FlowFabricWorker;
impl FlowFabricWorker {
/// Fetches a snapshot of the execution identified by `id`.
///
/// This is a thin delegate over the backend's `describe_execution`; whatever
/// the backend returns on success (including `None`) is handed back unchanged.
///
/// # Errors
/// A backend failure is converted into [`SdkError`] and returned.
pub async fn describe_execution(
    &self,
    id: &ExecutionId,
) -> Result<Option<ExecutionSnapshot>, SdkError> {
    let snapshot = self.backend_ref().describe_execution(id).await?;
    Ok(snapshot)
}
/// Fetches a snapshot of the flow identified by `id`.
///
/// This is a thin delegate over the backend's `describe_flow`; whatever the
/// backend returns on success (including `None`) is handed back unchanged.
///
/// # Errors
/// A backend failure is converted into [`SdkError`] and returned.
pub async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, SdkError> {
    let snapshot = self.backend_ref().describe_flow(id).await?;
    Ok(snapshot)
}
/// Reads one edge of `flow_id` from its hash and decodes it into a snapshot.
///
/// The edge key is derived from the flow's partition (computed from
/// `flow_id` and this worker's partition config) together with `edge_id`,
/// and the whole hash is fetched with `HGETALL`.
///
/// # Returns
/// `Ok(None)` when the reply contains no fields; otherwise `Ok(Some(_))`
/// holding the snapshot produced by `build_edge_snapshot`.
///
/// # Errors
/// - the `HGETALL` call fails (wrapped via `crate::backend_context`);
/// - `build_edge_snapshot` rejects the fetched fields.
pub async fn describe_edge(
    &self,
    flow_id: &FlowId,
    edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, SdkError> {
    let flow_part = flow_partition(flow_id, self.partition_config());
    let flow_keys = ff_core::keys::FlowKeyContext::new(&flow_part, flow_id);
    let edge_key = flow_keys.edge(edge_id);

    let fields: HashMap<String, String> = self
        .client()
        .cmd("HGETALL")
        .arg(&edge_key)
        .execute()
        .await
        .map_err(|err| crate::backend_context(err, "describe_edge: HGETALL edge_hash"))?;

    if fields.is_empty() {
        // An empty reply means there is nothing to decode for this edge key.
        Ok(None)
    } else {
        match ff_core::contracts::decode::build_edge_snapshot(flow_id, edge_id, &fields) {
            Ok(snapshot) => Ok(Some(snapshot)),
            Err(err) => Err(SdkError::from(err)),
        }
    }
}
/// Lists the edges that leave the execution `upstream_eid`.
///
/// The owning flow is looked up first via `resolve_flow_id`. When no flow is
/// recorded for the execution, an empty vector is returned and the backend
/// is not queried. Otherwise the backend's `list_edges` is called with
/// `EdgeDirection::Outgoing` anchored at `upstream_eid`.
///
/// # Errors
/// Propagates failures from `resolve_flow_id` and from the backend's
/// `list_edges` (converted into [`SdkError`]).
pub async fn list_outgoing_edges(
    &self,
    upstream_eid: &ExecutionId,
) -> Result<Vec<EdgeSnapshot>, SdkError> {
    match self.resolve_flow_id(upstream_eid).await? {
        None => Ok(Vec::new()),
        Some(flow_id) => {
            let backend = self.backend_ref();
            let direction = EdgeDirection::Outgoing {
                from_node: upstream_eid.clone(),
            };
            let edges = backend.list_edges(&flow_id, direction).await?;
            Ok(edges)
        }
    }
}
/// Lists the edges that arrive at the execution `downstream_eid`.
///
/// The owning flow is looked up first via `resolve_flow_id`. When no flow is
/// recorded for the execution, an empty vector is returned and the backend
/// is not queried. Otherwise the backend's `list_edges` is called with
/// `EdgeDirection::Incoming` anchored at `downstream_eid`.
///
/// # Errors
/// Propagates failures from `resolve_flow_id` and from the backend's
/// `list_edges` (converted into [`SdkError`]).
pub async fn list_incoming_edges(
    &self,
    downstream_eid: &ExecutionId,
) -> Result<Vec<EdgeSnapshot>, SdkError> {
    match self.resolve_flow_id(downstream_eid).await? {
        None => Ok(Vec::new()),
        Some(flow_id) => {
            let backend = self.backend_ref();
            let direction = EdgeDirection::Incoming {
                to_node: downstream_eid.clone(),
            };
            let edges = backend.list_edges(&flow_id, direction).await?;
            Ok(edges)
        }
    }
}
/// Resolves the flow that the execution `eid` belongs to.
///
/// Reads the `flow_id` field of the execution's core hash with `HGET`,
/// parses it, and checks that the flow's partition index equals the
/// execution's partition index.
///
/// # Returns
/// `Ok(None)` when the field is missing or empty; `Ok(Some(flow_id))` once
/// the value has parsed and passed the partition check.
///
/// # Errors
/// - the `HGET` call fails (wrapped via `crate::backend_context`);
/// - the stored value does not parse as a `FlowId` (validation error of kind
///   `Corruption`);
/// - the flow and execution partition indices differ (validation error of
///   kind `Corruption`).
async fn resolve_flow_id(&self, eid: &ExecutionId) -> Result<Option<FlowId>, SdkError> {
    // Both failure paths below report the same error kind; only the detail differs.
    let corruption = |detail: String| {
        SdkError::from(ff_core::engine_error::EngineError::Validation {
            kind: ff_core::engine_error::ValidationKind::Corruption,
            detail,
        })
    };

    let exec_part = execution_partition(eid, self.partition_config());
    let exec_keys = ExecKeyContext::new(&exec_part, eid);

    let stored: Option<String> = self
        .client()
        .cmd("HGET")
        .arg(exec_keys.core())
        .arg("flow_id")
        .execute()
        .await
        .map_err(|err| crate::backend_context(err, "list_edges: HGET exec_core.flow_id"))?;

    // A missing field and an empty string both mean "no flow recorded".
    let raw = match stored {
        Some(value) if !value.is_empty() => value,
        _ => return Ok(None),
    };

    let flow_id = match FlowId::parse(&raw) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(corruption(format!(
                "list_edges: exec_core: flow_id: '{raw}' is not a valid UUID \
                (key corruption?): {e}"
            )));
        }
    };

    let flow_partition_index = flow_partition(&flow_id, self.partition_config()).index;
    if exec_part.index == flow_partition_index {
        Ok(Some(flow_id))
    } else {
        Err(corruption(format!(
            "list_edges: exec_core: flow_id: '{flow_id}' partition \
            {flow_partition_index} does not match execution partition {} \
            (RFC-011 co-location violation; key corruption?)",
            exec_part.index
        )))
    }
}
}