ff_sdk/snapshot.rs
1//! Typed read-models that decouple consumers from FF's storage engine.
2//!
3//! **RFC-012 Stage 1c T3 + issue #160:** this module is a pure thin
4//! forwarder onto the `EngineBackend` trait. The decoders
5//! (`build_execution_snapshot`, `build_flow_snapshot`,
6//! `build_edge_snapshot`) live in `ff_core::contracts::decode` and
7//! the pipeline bodies that invoke them live on
8//! [`ff_backend_valkey::ValkeyBackend`] as `EngineBackend` trait
9//! methods. The five `describe_*` / `list_*_edges` functions here are
10//! 3-line delegations to `self.backend`; `list_incoming_edges` /
11//! `list_outgoing_edges` keep their `resolve_flow_id` step (the
12//! trait's `list_edges` requires a `FlowId` argument) but route it
13//! through [`EngineBackend::resolve_execution_flow_id`] plus a local
14//! RFC-011 co-location cross-check.
15//!
16//! See issue #58 for the strategic context (engine-surface sealing
17//! toward the Postgres backend port); issue #160 closed out the last
18//! raw-client call sites so `FlowFabricWorker::client()` could be
19//! deleted.
20//!
21//! Snapshot types (`ExecutionSnapshot`, `AttemptSummary`, `LeaseSummary`,
22//! `FlowSnapshot`, `EdgeSnapshot`) live in [`ff_core::contracts`] so
23//! non-SDK consumers (tests, REST server, alternate backends) share
24//! them without depending on ff-sdk.
25
26use ff_core::contracts::{
27 EdgeDirection, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot, ListExecutionsPage,
28 ListFlowsPage, ListLanesPage, ListSuspendedPage, RotateWaitpointHmacSecretAllArgs,
29 RotateWaitpointHmacSecretAllResult,
30};
31use ff_core::partition::{execution_partition, flow_partition, PartitionKey};
32use ff_core::types::{EdgeId, ExecutionId, FlowId, LaneId};
33
34use crate::SdkError;
35use crate::worker::FlowFabricWorker;
36
37impl FlowFabricWorker {
38 /// Read a typed snapshot of one execution. See
39 /// [`ExecutionSnapshot`] for field semantics.
40 ///
41 /// Post-T3 this is a thin forwarder onto the bundled
42 /// [`EngineBackend::describe_execution`](ff_core::engine_backend::EngineBackend::describe_execution)
43 /// impl; the HGETALL pipeline body and strict-parse decoder both
44 /// live in `ff-backend-valkey` / `ff-core` so alternate backends
45 /// can serve the same call shape. Parse failures surface as
46 /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
47 pub async fn describe_execution(
48 &self,
49 id: &ExecutionId,
50 ) -> Result<Option<ExecutionSnapshot>, SdkError> {
51 Ok(self.backend_ref().describe_execution(id).await?)
52 }
53
54 /// Read a typed snapshot of one flow. See [`FlowSnapshot`] for
55 /// field semantics.
56 ///
57 /// Post-T3 this is a thin forwarder onto the bundled
58 /// [`EngineBackend::describe_flow`](ff_core::engine_backend::EngineBackend::describe_flow)
59 /// impl.
60 pub async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, SdkError> {
61 Ok(self.backend_ref().describe_flow(id).await?)
62 }
63
64 /// RFC-016 Stage A: declare the inbound edge-group policy for a
65 /// downstream execution. Must be called BEFORE the first
66 /// `add_dependency(... -> downstream_execution_id)` on this flow.
67 ///
68 /// Stage A accepts only
69 /// [`EdgeDependencyPolicy::AllOf`](ff_core::contracts::EdgeDependencyPolicy::AllOf);
70 /// `AnyOf` / `Quorum` return a typed
71 /// [`EngineError::Validation`](ff_core::engine_error::EngineError::Validation)
72 /// error until Stage B's resolver lands.
73 pub async fn set_edge_group_policy(
74 &self,
75 flow_id: &FlowId,
76 downstream_execution_id: &ExecutionId,
77 policy: ff_core::contracts::EdgeDependencyPolicy,
78 ) -> Result<ff_core::contracts::SetEdgeGroupPolicyResult, SdkError> {
79 Ok(self
80 .backend_ref()
81 .set_edge_group_policy(flow_id, downstream_execution_id, policy)
82 .await?)
83 }
84
85 /// List flows on a partition with cursor-based pagination
86 /// (issue #185).
87 ///
88 /// `partition` is an opaque [`PartitionKey`] — typically obtained
89 /// from a [`crate::ClaimGrant`] or from
90 /// [`ff_core::partition::flow_partition`] when the caller already
91 /// knows a `FlowId` on the partition it wants to enumerate.
92 /// `cursor` is `None` on the first call and the previous page's
93 /// `next_cursor` on subsequent calls; iteration terminates when
94 /// the returned `next_cursor` is `None`.
95 ///
96 /// Thin forwarder onto
97 /// [`EngineBackend::list_flows`](ff_core::engine_backend::EngineBackend::list_flows).
98 pub async fn list_flows(
99 &self,
100 partition: PartitionKey,
101 cursor: Option<FlowId>,
102 limit: usize,
103 ) -> Result<ListFlowsPage, SdkError> {
104 Ok(self
105 .backend_ref()
106 .list_flows(partition, cursor, limit)
107 .await?)
108 }
109
110 /// Read a typed snapshot of one dependency edge.
111 ///
112 /// Takes both `flow_id` and `edge_id`: the edge hash is stored
113 /// under the flow's partition
114 /// (`ff:flow:{fp:N}:<flow_id>:edge:<edge_id>`) and FF does not
115 /// maintain a global `edge_id -> flow_id` index. The caller
116 /// already knows the flow from the staging call result or the
117 /// consumer's own metadata.
118 ///
119 /// Returns `Ok(None)` when the edge hash is absent (never staged,
120 /// or staged under a different flow).
121 ///
122 /// Post-#160 this is a thin forwarder onto the bundled
123 /// [`EngineBackend::describe_edge`](ff_core::engine_backend::EngineBackend::describe_edge)
124 /// impl.
125 pub async fn describe_edge(
126 &self,
127 flow_id: &FlowId,
128 edge_id: &EdgeId,
129 ) -> Result<Option<EdgeSnapshot>, SdkError> {
130 Ok(self.backend_ref().describe_edge(flow_id, edge_id).await?)
131 }
132
133 /// List all outgoing dependency edges originating from an execution.
134 ///
135 /// Returns an empty `Vec` when the execution has no outgoing edges
136 /// — including standalone executions not attached to any flow.
137 ///
138 /// # Reads
139 ///
140 /// 1. `EngineBackend::resolve_execution_flow_id` (single HGET on
141 /// `exec_core.flow_id` for the Valkey backend).
142 /// 2. `EngineBackend::list_edges` on the resolved flow (SMEMBERS
143 /// + pipelined HGETALL on the flow's partition).
144 ///
145 /// Ordering is unspecified — the adjacency set is an unordered
146 /// SET. Callers that need deterministic order should sort by
147 /// [`EdgeSnapshot::edge_id`] or `created_at`.
148 pub async fn list_outgoing_edges(
149 &self,
150 upstream_eid: &ExecutionId,
151 ) -> Result<Vec<EdgeSnapshot>, SdkError> {
152 let Some(flow_id) = self.resolve_flow_id(upstream_eid).await? else {
153 return Ok(Vec::new());
154 };
155 Ok(self
156 .backend_ref()
157 .list_edges(
158 &flow_id,
159 EdgeDirection::Outgoing {
160 from_node: upstream_eid.clone(),
161 },
162 )
163 .await?)
164 }
165
166 /// List all incoming dependency edges landing on an execution.
167 /// See [`list_outgoing_edges`] for the read shape.
168 pub async fn list_incoming_edges(
169 &self,
170 downstream_eid: &ExecutionId,
171 ) -> Result<Vec<EdgeSnapshot>, SdkError> {
172 let Some(flow_id) = self.resolve_flow_id(downstream_eid).await? else {
173 return Ok(Vec::new());
174 };
175 Ok(self
176 .backend_ref()
177 .list_edges(
178 &flow_id,
179 EdgeDirection::Incoming {
180 to_node: downstream_eid.clone(),
181 },
182 )
183 .await?)
184 }
185
186 /// Enumerate registered lanes with cursor-based pagination.
187 ///
188 /// Thin forwarder onto
189 /// [`EngineBackend::list_lanes`](ff_core::engine_backend::EngineBackend::list_lanes).
190 /// Lanes are global (not partition-scoped); the Valkey backend
191 /// serves this from the `ff:idx:lanes` SET, sorts by lane name,
192 /// and returns a `limit`-sized page starting after `cursor`
193 /// (exclusive). Loop until [`ListLanesPage::next_cursor`] is
194 /// `None` to read the full registry.
195 pub async fn list_lanes(
196 &self,
197 cursor: Option<LaneId>,
198 limit: usize,
199 ) -> Result<ListLanesPage, SdkError> {
200 Ok(self.backend_ref().list_lanes(cursor, limit).await?)
201 }
202
203 /// List suspended executions in one partition, cursor-paginated,
204 /// with each entry's suspension `reason_code` populated (issue
205 /// #183).
206 ///
207 /// Thin forwarder onto the bundled
208 /// [`EngineBackend::list_suspended`](ff_core::engine_backend::EngineBackend::list_suspended)
209 /// impl. `cursor = None` starts a fresh scan; feed the returned
210 /// [`ListSuspendedPage::next_cursor`] back in to page forward until
211 /// it returns `None`. See
212 /// [`ff_core::contracts::SuspendedExecutionEntry`] for the per-row
213 /// fields (including the free-form `reason` code).
214 pub async fn list_suspended(
215 &self,
216 partition: PartitionKey,
217 cursor: Option<ExecutionId>,
218 limit: usize,
219 ) -> Result<ListSuspendedPage, SdkError> {
220 Ok(self
221 .backend_ref()
222 .list_suspended(partition, cursor, limit)
223 .await?)
224 }
225
226 /// Forward-only paginated listing of executions in a partition.
227 ///
228 /// Thin forwarder onto
229 /// [`EngineBackend::list_executions`](ff_core::engine_backend::EngineBackend::list_executions).
230 /// Pagination is cursor-based: pass `cursor = None` for the first
231 /// page and feed the returned
232 /// [`ListExecutionsPage::next_cursor`] back as `cursor` until it
233 /// returns `None`. See the trait rustdoc for the full contract.
234 pub async fn list_executions(
235 &self,
236 partition: PartitionKey,
237 cursor: Option<ExecutionId>,
238 limit: usize,
239 ) -> Result<ListExecutionsPage, SdkError> {
240 Ok(self
241 .backend_ref()
242 .list_executions(partition, cursor, limit)
243 .await?)
244 }
245
246 /// Cluster-wide rotation of the waitpoint HMAC signing kid
247 /// (v0.7 Q4).
248 ///
249 /// Thin forwarder onto
250 /// [`EngineBackend::rotate_waitpoint_hmac_secret_all`](ff_core::engine_backend::EngineBackend::rotate_waitpoint_hmac_secret_all).
251 /// On the Valkey backend this fans out one FCALL per execution
252 /// partition; on the Postgres backend (Wave 4) this resolves to a
253 /// single global INSERT. Consumers call this for the clean
254 /// "rotate everywhere" semantic — the older per-partition
255 /// [`crate::admin::rotate_waitpoint_hmac_secret_all_partitions`]
256 /// free function stays available for direct-Valkey callers on
257 /// legacy SDKs.
258 pub async fn rotate_waitpoint_hmac_secret_all(
259 &self,
260 args: RotateWaitpointHmacSecretAllArgs,
261 ) -> Result<RotateWaitpointHmacSecretAllResult, SdkError> {
262 Ok(self
263 .backend_ref()
264 .rotate_waitpoint_hmac_secret_all(args)
265 .await?)
266 }
267
268 /// Resolve `exec_core.flow_id` via the trait and pin the RFC-011
269 /// co-location invariant
270 /// (`execution_partition(eid) == flow_partition(flow_id)`). A
271 /// parsed-but-wrong flow_id would otherwise silently route the
272 /// follow-up adjacency reads to the wrong partition and return
273 /// bogus empty results. A mismatch surfaces as
274 /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
275 ///
276 /// Returns `None` when the execution has no owning flow
277 /// (standalone) or when exec_core is absent.
278 async fn resolve_flow_id(&self, eid: &ExecutionId) -> Result<Option<FlowId>, SdkError> {
279 let Some(flow_id) = self.backend_ref().resolve_execution_flow_id(eid).await? else {
280 return Ok(None);
281 };
282 let exec_partition_index = execution_partition(eid, self.partition_config()).index;
283 let flow_partition_index = flow_partition(&flow_id, self.partition_config()).index;
284 if exec_partition_index != flow_partition_index {
285 return Err(SdkError::from(
286 ff_core::engine_error::EngineError::Validation {
287 kind: ff_core::engine_error::ValidationKind::Corruption,
288 detail: format!(
289 "list_edges: exec_core: flow_id: '{flow_id}' partition \
290 {flow_partition_index} does not match execution partition \
291 {exec_partition_index} (RFC-011 co-location violation; \
292 key corruption?)"
293 ),
294 },
295 ));
296 }
297 Ok(Some(flow_id))
298 }
299}