Skip to main content

ff_sdk/
snapshot.rs

1//! Typed read-models that decouple consumers from FF's storage engine.
2//!
3//! **RFC-012 Stage 1c T3 + issue #160:** this module is a pure thin
4//! forwarder onto the `EngineBackend` trait. The decoders
5//! (`build_execution_snapshot`, `build_flow_snapshot`,
6//! `build_edge_snapshot`) live in `ff_core::contracts::decode` and
7//! the pipeline bodies that invoke them live on
8//! [`ff_backend_valkey::ValkeyBackend`] as `EngineBackend` trait
9//! methods. The five `describe_*` / `list_*_edges` functions here are
10//! 3-line delegations to `self.backend`; `list_incoming_edges` /
11//! `list_outgoing_edges` keep their `resolve_flow_id` step (the
12//! trait's `list_edges` requires a `FlowId` argument) but route it
//! through [`EngineBackend::resolve_execution_flow_id`](ff_core::engine_backend::EngineBackend::resolve_execution_flow_id) plus a local
14//! RFC-011 co-location cross-check.
15//!
16//! See issue #58 for the strategic context (engine-surface sealing
17//! toward the Postgres backend port); issue #160 closed out the last
18//! raw-client call sites so `FlowFabricWorker::client()` could be
19//! deleted.
20//!
21//! Snapshot types (`ExecutionSnapshot`, `AttemptSummary`, `LeaseSummary`,
22//! `FlowSnapshot`, `EdgeSnapshot`) live in [`ff_core::contracts`] so
23//! non-SDK consumers (tests, REST server, alternate backends) share
24//! them without depending on ff-sdk.
25
26use ff_core::contracts::{
27    EdgeDirection, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot, ListExecutionsPage,
28    ListFlowsPage, ListLanesPage, ListSuspendedPage,
29};
30use ff_core::partition::{execution_partition, flow_partition, PartitionKey};
31use ff_core::types::{EdgeId, ExecutionId, FlowId, LaneId};
32
33use crate::SdkError;
34use crate::worker::FlowFabricWorker;
35
36impl FlowFabricWorker {
37    /// Read a typed snapshot of one execution. See
38    /// [`ExecutionSnapshot`] for field semantics.
39    ///
40    /// Post-T3 this is a thin forwarder onto the bundled
41    /// [`EngineBackend::describe_execution`](ff_core::engine_backend::EngineBackend::describe_execution)
42    /// impl; the HGETALL pipeline body and strict-parse decoder both
43    /// live in `ff-backend-valkey` / `ff-core` so alternate backends
44    /// can serve the same call shape. Parse failures surface as
45    /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
46    pub async fn describe_execution(
47        &self,
48        id: &ExecutionId,
49    ) -> Result<Option<ExecutionSnapshot>, SdkError> {
50        Ok(self.backend_ref().describe_execution(id).await?)
51    }
52
53    /// Read a typed snapshot of one flow. See [`FlowSnapshot`] for
54    /// field semantics.
55    ///
56    /// Post-T3 this is a thin forwarder onto the bundled
57    /// [`EngineBackend::describe_flow`](ff_core::engine_backend::EngineBackend::describe_flow)
58    /// impl.
59    pub async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, SdkError> {
60        Ok(self.backend_ref().describe_flow(id).await?)
61    }
62
63    /// RFC-016 Stage A: declare the inbound edge-group policy for a
64    /// downstream execution. Must be called BEFORE the first
65    /// `add_dependency(... -> downstream_execution_id)` on this flow.
66    ///
67    /// Stage A accepts only
68    /// [`EdgeDependencyPolicy::AllOf`](ff_core::contracts::EdgeDependencyPolicy::AllOf);
69    /// `AnyOf` / `Quorum` return a typed
70    /// [`EngineError::Validation`](ff_core::engine_error::EngineError::Validation)
71    /// error until Stage B's resolver lands.
72    pub async fn set_edge_group_policy(
73        &self,
74        flow_id: &FlowId,
75        downstream_execution_id: &ExecutionId,
76        policy: ff_core::contracts::EdgeDependencyPolicy,
77    ) -> Result<ff_core::contracts::SetEdgeGroupPolicyResult, SdkError> {
78        Ok(self
79            .backend_ref()
80            .set_edge_group_policy(flow_id, downstream_execution_id, policy)
81            .await?)
82    }
83
84    /// List flows on a partition with cursor-based pagination
85    /// (issue #185).
86    ///
87    /// `partition` is an opaque [`PartitionKey`] — typically obtained
88    /// from a [`crate::ClaimGrant`] or from
89    /// [`ff_core::partition::flow_partition`] when the caller already
90    /// knows a `FlowId` on the partition it wants to enumerate.
91    /// `cursor` is `None` on the first call and the previous page's
92    /// `next_cursor` on subsequent calls; iteration terminates when
93    /// the returned `next_cursor` is `None`.
94    ///
95    /// Thin forwarder onto
96    /// [`EngineBackend::list_flows`](ff_core::engine_backend::EngineBackend::list_flows).
97    pub async fn list_flows(
98        &self,
99        partition: PartitionKey,
100        cursor: Option<FlowId>,
101        limit: usize,
102    ) -> Result<ListFlowsPage, SdkError> {
103        Ok(self
104            .backend_ref()
105            .list_flows(partition, cursor, limit)
106            .await?)
107    }
108
109    /// Read a typed snapshot of one dependency edge.
110    ///
111    /// Takes both `flow_id` and `edge_id`: the edge hash is stored
112    /// under the flow's partition
113    /// (`ff:flow:{fp:N}:<flow_id>:edge:<edge_id>`) and FF does not
114    /// maintain a global `edge_id -> flow_id` index. The caller
115    /// already knows the flow from the staging call result or the
116    /// consumer's own metadata.
117    ///
118    /// Returns `Ok(None)` when the edge hash is absent (never staged,
119    /// or staged under a different flow).
120    ///
121    /// Post-#160 this is a thin forwarder onto the bundled
122    /// [`EngineBackend::describe_edge`](ff_core::engine_backend::EngineBackend::describe_edge)
123    /// impl.
124    pub async fn describe_edge(
125        &self,
126        flow_id: &FlowId,
127        edge_id: &EdgeId,
128    ) -> Result<Option<EdgeSnapshot>, SdkError> {
129        Ok(self.backend_ref().describe_edge(flow_id, edge_id).await?)
130    }
131
132    /// List all outgoing dependency edges originating from an execution.
133    ///
134    /// Returns an empty `Vec` when the execution has no outgoing edges
135    /// — including standalone executions not attached to any flow.
136    ///
137    /// # Reads
138    ///
139    /// 1. `EngineBackend::resolve_execution_flow_id` (single HGET on
140    ///    `exec_core.flow_id` for the Valkey backend).
141    /// 2. `EngineBackend::list_edges` on the resolved flow (SMEMBERS
142    ///    + pipelined HGETALL on the flow's partition).
143    ///
144    /// Ordering is unspecified — the adjacency set is an unordered
145    /// SET. Callers that need deterministic order should sort by
146    /// [`EdgeSnapshot::edge_id`] or `created_at`.
147    pub async fn list_outgoing_edges(
148        &self,
149        upstream_eid: &ExecutionId,
150    ) -> Result<Vec<EdgeSnapshot>, SdkError> {
151        let Some(flow_id) = self.resolve_flow_id(upstream_eid).await? else {
152            return Ok(Vec::new());
153        };
154        Ok(self
155            .backend_ref()
156            .list_edges(
157                &flow_id,
158                EdgeDirection::Outgoing {
159                    from_node: upstream_eid.clone(),
160                },
161            )
162            .await?)
163    }
164
165    /// List all incoming dependency edges landing on an execution.
166    /// See [`list_outgoing_edges`] for the read shape.
167    pub async fn list_incoming_edges(
168        &self,
169        downstream_eid: &ExecutionId,
170    ) -> Result<Vec<EdgeSnapshot>, SdkError> {
171        let Some(flow_id) = self.resolve_flow_id(downstream_eid).await? else {
172            return Ok(Vec::new());
173        };
174        Ok(self
175            .backend_ref()
176            .list_edges(
177                &flow_id,
178                EdgeDirection::Incoming {
179                    to_node: downstream_eid.clone(),
180                },
181            )
182            .await?)
183    }
184
185    /// Enumerate registered lanes with cursor-based pagination.
186    ///
187    /// Thin forwarder onto
188    /// [`EngineBackend::list_lanes`](ff_core::engine_backend::EngineBackend::list_lanes).
189    /// Lanes are global (not partition-scoped); the Valkey backend
190    /// serves this from the `ff:idx:lanes` SET, sorts by lane name,
191    /// and returns a `limit`-sized page starting after `cursor`
192    /// (exclusive). Loop until [`ListLanesPage::next_cursor`] is
193    /// `None` to read the full registry.
194    pub async fn list_lanes(
195        &self,
196        cursor: Option<LaneId>,
197        limit: usize,
198    ) -> Result<ListLanesPage, SdkError> {
199        Ok(self.backend_ref().list_lanes(cursor, limit).await?)
200    }
201
202    /// List suspended executions in one partition, cursor-paginated,
203    /// with each entry's suspension `reason_code` populated (issue
204    /// #183).
205    ///
206    /// Thin forwarder onto the bundled
207    /// [`EngineBackend::list_suspended`](ff_core::engine_backend::EngineBackend::list_suspended)
208    /// impl. `cursor = None` starts a fresh scan; feed the returned
209    /// [`ListSuspendedPage::next_cursor`] back in to page forward until
210    /// it returns `None`. See
211    /// [`ff_core::contracts::SuspendedExecutionEntry`] for the per-row
212    /// fields (including the free-form `reason` code).
213    pub async fn list_suspended(
214        &self,
215        partition: PartitionKey,
216        cursor: Option<ExecutionId>,
217        limit: usize,
218    ) -> Result<ListSuspendedPage, SdkError> {
219        Ok(self
220            .backend_ref()
221            .list_suspended(partition, cursor, limit)
222            .await?)
223    }
224
225    /// Forward-only paginated listing of executions in a partition.
226    ///
227    /// Thin forwarder onto
228    /// [`EngineBackend::list_executions`](ff_core::engine_backend::EngineBackend::list_executions).
229    /// Pagination is cursor-based: pass `cursor = None` for the first
230    /// page and feed the returned
231    /// [`ListExecutionsPage::next_cursor`] back as `cursor` until it
232    /// returns `None`. See the trait rustdoc for the full contract.
233    pub async fn list_executions(
234        &self,
235        partition: PartitionKey,
236        cursor: Option<ExecutionId>,
237        limit: usize,
238    ) -> Result<ListExecutionsPage, SdkError> {
239        Ok(self
240            .backend_ref()
241            .list_executions(partition, cursor, limit)
242            .await?)
243    }
244
245    /// Resolve `exec_core.flow_id` via the trait and pin the RFC-011
246    /// co-location invariant
247    /// (`execution_partition(eid) == flow_partition(flow_id)`). A
248    /// parsed-but-wrong flow_id would otherwise silently route the
249    /// follow-up adjacency reads to the wrong partition and return
250    /// bogus empty results. A mismatch surfaces as
251    /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
252    ///
253    /// Returns `None` when the execution has no owning flow
254    /// (standalone) or when exec_core is absent.
255    async fn resolve_flow_id(&self, eid: &ExecutionId) -> Result<Option<FlowId>, SdkError> {
256        let Some(flow_id) = self.backend_ref().resolve_execution_flow_id(eid).await? else {
257            return Ok(None);
258        };
259        let exec_partition_index = execution_partition(eid, self.partition_config()).index;
260        let flow_partition_index = flow_partition(&flow_id, self.partition_config()).index;
261        if exec_partition_index != flow_partition_index {
262            return Err(SdkError::from(
263                ff_core::engine_error::EngineError::Validation {
264                    kind: ff_core::engine_error::ValidationKind::Corruption,
265                    detail: format!(
266                        "list_edges: exec_core: flow_id: '{flow_id}' partition \
267                         {flow_partition_index} does not match execution partition \
268                         {exec_partition_index} (RFC-011 co-location violation; \
269                         key corruption?)"
270                    ),
271                },
272            ));
273        }
274        Ok(Some(flow_id))
275    }
276}