use ff_core::contracts::{
EdgeDirection, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot, ListExecutionsPage,
ListFlowsPage, ListLanesPage, ListSuspendedPage, RotateWaitpointHmacSecretAllArgs,
RotateWaitpointHmacSecretAllResult,
};
use ff_core::partition::{execution_partition, flow_partition, PartitionKey};
use ff_core::types::{EdgeId, ExecutionId, FlowId, LaneId};
use crate::SdkError;
use crate::worker::FlowFabricWorker;
impl FlowFabricWorker {
pub async fn describe_execution(
&self,
id: &ExecutionId,
) -> Result<Option<ExecutionSnapshot>, SdkError> {
Ok(self.backend_ref().describe_execution(id).await?)
}
pub async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, SdkError> {
Ok(self.backend_ref().describe_flow(id).await?)
}
pub async fn set_edge_group_policy(
&self,
flow_id: &FlowId,
downstream_execution_id: &ExecutionId,
policy: ff_core::contracts::EdgeDependencyPolicy,
) -> Result<ff_core::contracts::SetEdgeGroupPolicyResult, SdkError> {
Ok(self
.backend_ref()
.set_edge_group_policy(flow_id, downstream_execution_id, policy)
.await?)
}
pub async fn list_flows(
&self,
partition: PartitionKey,
cursor: Option<FlowId>,
limit: usize,
) -> Result<ListFlowsPage, SdkError> {
Ok(self
.backend_ref()
.list_flows(partition, cursor, limit)
.await?)
}
pub async fn describe_edge(
&self,
flow_id: &FlowId,
edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, SdkError> {
Ok(self.backend_ref().describe_edge(flow_id, edge_id).await?)
}
pub async fn list_outgoing_edges(
&self,
upstream_eid: &ExecutionId,
) -> Result<Vec<EdgeSnapshot>, SdkError> {
let Some(flow_id) = self.resolve_flow_id(upstream_eid).await? else {
return Ok(Vec::new());
};
Ok(self
.backend_ref()
.list_edges(
&flow_id,
EdgeDirection::Outgoing {
from_node: upstream_eid.clone(),
},
)
.await?)
}
pub async fn list_incoming_edges(
&self,
downstream_eid: &ExecutionId,
) -> Result<Vec<EdgeSnapshot>, SdkError> {
let Some(flow_id) = self.resolve_flow_id(downstream_eid).await? else {
return Ok(Vec::new());
};
Ok(self
.backend_ref()
.list_edges(
&flow_id,
EdgeDirection::Incoming {
to_node: downstream_eid.clone(),
},
)
.await?)
}
pub async fn list_lanes(
&self,
cursor: Option<LaneId>,
limit: usize,
) -> Result<ListLanesPage, SdkError> {
Ok(self.backend_ref().list_lanes(cursor, limit).await?)
}
pub async fn list_suspended(
&self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Result<ListSuspendedPage, SdkError> {
Ok(self
.backend_ref()
.list_suspended(partition, cursor, limit)
.await?)
}
pub async fn list_executions(
&self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Result<ListExecutionsPage, SdkError> {
Ok(self
.backend_ref()
.list_executions(partition, cursor, limit)
.await?)
}
pub async fn rotate_waitpoint_hmac_secret_all(
&self,
args: RotateWaitpointHmacSecretAllArgs,
) -> Result<RotateWaitpointHmacSecretAllResult, SdkError> {
Ok(self
.backend_ref()
.rotate_waitpoint_hmac_secret_all(args)
.await?)
}
async fn resolve_flow_id(&self, eid: &ExecutionId) -> Result<Option<FlowId>, SdkError> {
let Some(flow_id) = self.backend_ref().resolve_execution_flow_id(eid).await? else {
return Ok(None);
};
let exec_partition_index = execution_partition(eid, self.partition_config()).index;
let flow_partition_index = flow_partition(&flow_id, self.partition_config()).index;
if exec_partition_index != flow_partition_index {
return Err(SdkError::from(
ff_core::engine_error::EngineError::Validation {
kind: ff_core::engine_error::ValidationKind::Corruption,
detail: format!(
"list_edges: exec_core: flow_id: '{flow_id}' partition \
{flow_partition_index} does not match execution partition \
{exec_partition_index} (RFC-011 co-location violation; \
key corruption?)"
),
},
));
}
Ok(Some(flow_id))
}
}