Skip to main content

ff_sdk/
snapshot.rs

1//! Typed read-models that decouple consumers from FF's storage engine.
2//!
3//! **RFC-012 Stage 1c T3 + issue #160:** this module is a pure thin
4//! forwarder onto the `EngineBackend` trait. The decoders
5//! (`build_execution_snapshot`, `build_flow_snapshot`,
6//! `build_edge_snapshot`) live in `ff_core::contracts::decode` and
7//! the pipeline bodies that invoke them live on
8//! [`ff_backend_valkey::ValkeyBackend`] as `EngineBackend` trait
9//! methods. The five `describe_*` / `list_*_edges` functions here are
10//! 3-line delegations to `self.backend`; `list_incoming_edges` /
11//! `list_outgoing_edges` keep their `resolve_flow_id` step (the
12//! trait's `list_edges` requires a `FlowId` argument) but route it
13//! through [`EngineBackend::resolve_execution_flow_id`] plus a local
14//! RFC-011 co-location cross-check.
15//!
16//! See issue #58 for the strategic context (engine-surface sealing
17//! toward the Postgres backend port); issue #160 closed out the last
18//! raw-client call sites so `FlowFabricWorker::client()` could be
19//! deleted.
20//!
21//! Snapshot types (`ExecutionSnapshot`, `AttemptSummary`, `LeaseSummary`,
22//! `FlowSnapshot`, `EdgeSnapshot`) live in [`ff_core::contracts`] so
23//! non-SDK consumers (tests, REST server, alternate backends) share
24//! them without depending on ff-sdk.
25
26use ff_core::contracts::{
27    EdgeDirection, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot, ListExecutionsPage,
28    ListFlowsPage, ListLanesPage, ListSuspendedPage, RotateWaitpointHmacSecretAllArgs,
29    RotateWaitpointHmacSecretAllResult,
30};
31use ff_core::partition::{execution_partition, flow_partition, PartitionKey};
32use ff_core::types::{EdgeId, ExecutionId, FlowId, LaneId};
33
34use crate::SdkError;
35use crate::worker::FlowFabricWorker;
36
37impl FlowFabricWorker {
38    /// Read a typed snapshot of one execution. See
39    /// [`ExecutionSnapshot`] for field semantics.
40    ///
41    /// Post-T3 this is a thin forwarder onto the bundled
42    /// [`EngineBackend::describe_execution`](ff_core::engine_backend::EngineBackend::describe_execution)
43    /// impl; the HGETALL pipeline body and strict-parse decoder both
44    /// live in `ff-backend-valkey` / `ff-core` so alternate backends
45    /// can serve the same call shape. Parse failures surface as
46    /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
47    pub async fn describe_execution(
48        &self,
49        id: &ExecutionId,
50    ) -> Result<Option<ExecutionSnapshot>, SdkError> {
51        Ok(self.backend_ref().describe_execution(id).await?)
52    }
53
54    /// Read a typed snapshot of one flow. See [`FlowSnapshot`] for
55    /// field semantics.
56    ///
57    /// Post-T3 this is a thin forwarder onto the bundled
58    /// [`EngineBackend::describe_flow`](ff_core::engine_backend::EngineBackend::describe_flow)
59    /// impl.
60    pub async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, SdkError> {
61        Ok(self.backend_ref().describe_flow(id).await?)
62    }
63
64    /// RFC-016 Stage A: declare the inbound edge-group policy for a
65    /// downstream execution. Must be called BEFORE the first
66    /// `add_dependency(... -> downstream_execution_id)` on this flow.
67    ///
68    /// Stage A accepts only
69    /// [`EdgeDependencyPolicy::AllOf`](ff_core::contracts::EdgeDependencyPolicy::AllOf);
70    /// `AnyOf` / `Quorum` return a typed
71    /// [`EngineError::Validation`](ff_core::engine_error::EngineError::Validation)
72    /// error until Stage B's resolver lands.
73    pub async fn set_edge_group_policy(
74        &self,
75        flow_id: &FlowId,
76        downstream_execution_id: &ExecutionId,
77        policy: ff_core::contracts::EdgeDependencyPolicy,
78    ) -> Result<ff_core::contracts::SetEdgeGroupPolicyResult, SdkError> {
79        Ok(self
80            .backend_ref()
81            .set_edge_group_policy(flow_id, downstream_execution_id, policy)
82            .await?)
83    }
84
85    /// List flows on a partition with cursor-based pagination
86    /// (issue #185).
87    ///
88    /// `partition` is an opaque [`PartitionKey`] — typically obtained
89    /// from a [`crate::ClaimGrant`] or from
90    /// [`ff_core::partition::flow_partition`] when the caller already
91    /// knows a `FlowId` on the partition it wants to enumerate.
92    /// `cursor` is `None` on the first call and the previous page's
93    /// `next_cursor` on subsequent calls; iteration terminates when
94    /// the returned `next_cursor` is `None`.
95    ///
96    /// Thin forwarder onto
97    /// [`EngineBackend::list_flows`](ff_core::engine_backend::EngineBackend::list_flows).
98    pub async fn list_flows(
99        &self,
100        partition: PartitionKey,
101        cursor: Option<FlowId>,
102        limit: usize,
103    ) -> Result<ListFlowsPage, SdkError> {
104        Ok(self
105            .backend_ref()
106            .list_flows(partition, cursor, limit)
107            .await?)
108    }
109
110    /// Read a typed snapshot of one dependency edge.
111    ///
112    /// Takes both `flow_id` and `edge_id`: the edge hash is stored
113    /// under the flow's partition
114    /// (`ff:flow:{fp:N}:<flow_id>:edge:<edge_id>`) and FF does not
115    /// maintain a global `edge_id -> flow_id` index. The caller
116    /// already knows the flow from the staging call result or the
117    /// consumer's own metadata.
118    ///
119    /// Returns `Ok(None)` when the edge hash is absent (never staged,
120    /// or staged under a different flow).
121    ///
122    /// Post-#160 this is a thin forwarder onto the bundled
123    /// [`EngineBackend::describe_edge`](ff_core::engine_backend::EngineBackend::describe_edge)
124    /// impl.
125    pub async fn describe_edge(
126        &self,
127        flow_id: &FlowId,
128        edge_id: &EdgeId,
129    ) -> Result<Option<EdgeSnapshot>, SdkError> {
130        Ok(self.backend_ref().describe_edge(flow_id, edge_id).await?)
131    }
132
133    /// List all outgoing dependency edges originating from an execution.
134    ///
135    /// Returns an empty `Vec` when the execution has no outgoing edges
136    /// — including standalone executions not attached to any flow.
137    ///
138    /// # Reads
139    ///
140    /// 1. `EngineBackend::resolve_execution_flow_id` (single HGET on
141    ///    `exec_core.flow_id` for the Valkey backend).
142    /// 2. `EngineBackend::list_edges` on the resolved flow (SMEMBERS
143    ///    + pipelined HGETALL on the flow's partition).
144    ///
145    /// Ordering is unspecified — the adjacency set is an unordered
146    /// SET. Callers that need deterministic order should sort by
147    /// [`EdgeSnapshot::edge_id`] or `created_at`.
148    pub async fn list_outgoing_edges(
149        &self,
150        upstream_eid: &ExecutionId,
151    ) -> Result<Vec<EdgeSnapshot>, SdkError> {
152        let Some(flow_id) = self.resolve_flow_id(upstream_eid).await? else {
153            return Ok(Vec::new());
154        };
155        Ok(self
156            .backend_ref()
157            .list_edges(
158                &flow_id,
159                EdgeDirection::Outgoing {
160                    from_node: upstream_eid.clone(),
161                },
162            )
163            .await?)
164    }
165
166    /// List all incoming dependency edges landing on an execution.
167    /// See [`list_outgoing_edges`] for the read shape.
168    pub async fn list_incoming_edges(
169        &self,
170        downstream_eid: &ExecutionId,
171    ) -> Result<Vec<EdgeSnapshot>, SdkError> {
172        let Some(flow_id) = self.resolve_flow_id(downstream_eid).await? else {
173            return Ok(Vec::new());
174        };
175        Ok(self
176            .backend_ref()
177            .list_edges(
178                &flow_id,
179                EdgeDirection::Incoming {
180                    to_node: downstream_eid.clone(),
181                },
182            )
183            .await?)
184    }
185
186    /// Enumerate registered lanes with cursor-based pagination.
187    ///
188    /// Thin forwarder onto
189    /// [`EngineBackend::list_lanes`](ff_core::engine_backend::EngineBackend::list_lanes).
190    /// Lanes are global (not partition-scoped); the Valkey backend
191    /// serves this from the `ff:idx:lanes` SET, sorts by lane name,
192    /// and returns a `limit`-sized page starting after `cursor`
193    /// (exclusive). Loop until [`ListLanesPage::next_cursor`] is
194    /// `None` to read the full registry.
195    pub async fn list_lanes(
196        &self,
197        cursor: Option<LaneId>,
198        limit: usize,
199    ) -> Result<ListLanesPage, SdkError> {
200        Ok(self.backend_ref().list_lanes(cursor, limit).await?)
201    }
202
203    /// List suspended executions in one partition, cursor-paginated,
204    /// with each entry's suspension `reason_code` populated (issue
205    /// #183).
206    ///
207    /// Thin forwarder onto the bundled
208    /// [`EngineBackend::list_suspended`](ff_core::engine_backend::EngineBackend::list_suspended)
209    /// impl. `cursor = None` starts a fresh scan; feed the returned
210    /// [`ListSuspendedPage::next_cursor`] back in to page forward until
211    /// it returns `None`. See
212    /// [`ff_core::contracts::SuspendedExecutionEntry`] for the per-row
213    /// fields (including the free-form `reason` code).
214    pub async fn list_suspended(
215        &self,
216        partition: PartitionKey,
217        cursor: Option<ExecutionId>,
218        limit: usize,
219    ) -> Result<ListSuspendedPage, SdkError> {
220        Ok(self
221            .backend_ref()
222            .list_suspended(partition, cursor, limit)
223            .await?)
224    }
225
226    /// Forward-only paginated listing of executions in a partition.
227    ///
228    /// Thin forwarder onto
229    /// [`EngineBackend::list_executions`](ff_core::engine_backend::EngineBackend::list_executions).
230    /// Pagination is cursor-based: pass `cursor = None` for the first
231    /// page and feed the returned
232    /// [`ListExecutionsPage::next_cursor`] back as `cursor` until it
233    /// returns `None`. See the trait rustdoc for the full contract.
234    pub async fn list_executions(
235        &self,
236        partition: PartitionKey,
237        cursor: Option<ExecutionId>,
238        limit: usize,
239    ) -> Result<ListExecutionsPage, SdkError> {
240        Ok(self
241            .backend_ref()
242            .list_executions(partition, cursor, limit)
243            .await?)
244    }
245
246    /// Cluster-wide rotation of the waitpoint HMAC signing kid
247    /// (v0.7 Q4).
248    ///
249    /// Thin forwarder onto
250    /// [`EngineBackend::rotate_waitpoint_hmac_secret_all`](ff_core::engine_backend::EngineBackend::rotate_waitpoint_hmac_secret_all).
251    /// On the Valkey backend this fans out one FCALL per execution
252    /// partition; on the Postgres backend (Wave 4) this resolves to a
253    /// single global INSERT. Consumers call this for the clean
254    /// "rotate everywhere" semantic — the older per-partition
255    /// [`crate::admin::rotate_waitpoint_hmac_secret_all_partitions`]
256    /// free function stays available for direct-Valkey callers on
257    /// legacy SDKs.
258    pub async fn rotate_waitpoint_hmac_secret_all(
259        &self,
260        args: RotateWaitpointHmacSecretAllArgs,
261    ) -> Result<RotateWaitpointHmacSecretAllResult, SdkError> {
262        Ok(self
263            .backend_ref()
264            .rotate_waitpoint_hmac_secret_all(args)
265            .await?)
266    }
267
268    /// Resolve `exec_core.flow_id` via the trait and pin the RFC-011
269    /// co-location invariant
270    /// (`execution_partition(eid) == flow_partition(flow_id)`). A
271    /// parsed-but-wrong flow_id would otherwise silently route the
272    /// follow-up adjacency reads to the wrong partition and return
273    /// bogus empty results. A mismatch surfaces as
274    /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
275    ///
276    /// Returns `None` when the execution has no owning flow
277    /// (standalone) or when exec_core is absent.
278    async fn resolve_flow_id(&self, eid: &ExecutionId) -> Result<Option<FlowId>, SdkError> {
279        let Some(flow_id) = self.backend_ref().resolve_execution_flow_id(eid).await? else {
280            return Ok(None);
281        };
282        let exec_partition_index = execution_partition(eid, self.partition_config()).index;
283        let flow_partition_index = flow_partition(&flow_id, self.partition_config()).index;
284        if exec_partition_index != flow_partition_index {
285            return Err(SdkError::from(
286                ff_core::engine_error::EngineError::Validation {
287                    kind: ff_core::engine_error::ValidationKind::Corruption,
288                    detail: format!(
289                        "list_edges: exec_core: flow_id: '{flow_id}' partition \
290                         {flow_partition_index} does not match execution partition \
291                         {exec_partition_index} (RFC-011 co-location violation; \
292                         key corruption?)"
293                    ),
294                },
295            ));
296        }
297        Ok(Some(flow_id))
298    }
299}