ff_sdk/snapshot.rs
1//! Typed read-models that decouple consumers from FF's storage engine.
2//!
3//! **RFC-012 Stage 1c T3 + issue #160:** this module is a pure thin
4//! forwarder onto the `EngineBackend` trait. The decoders
5//! (`build_execution_snapshot`, `build_flow_snapshot`,
6//! `build_edge_snapshot`) live in `ff_core::contracts::decode` and
7//! the pipeline bodies that invoke them live on
8//! [`ff_backend_valkey::ValkeyBackend`] as `EngineBackend` trait
9//! methods. The five `describe_*` / `list_*_edges` functions here are
10//! 3-line delegations to `self.backend`; `list_incoming_edges` /
11//! `list_outgoing_edges` keep their `resolve_flow_id` step (the
12//! trait's `list_edges` requires a `FlowId` argument) but route it
13//! through [`EngineBackend::resolve_execution_flow_id`] plus a local
14//! RFC-011 co-location cross-check.
15//!
16//! See issue #58 for the strategic context (engine-surface sealing
17//! toward the Postgres backend port); issue #160 closed out the last
18//! raw-client call sites so `FlowFabricWorker::client()` could be
19//! deleted.
20//!
21//! Snapshot types (`ExecutionSnapshot`, `AttemptSummary`, `LeaseSummary`,
22//! `FlowSnapshot`, `EdgeSnapshot`) live in [`ff_core::contracts`] so
23//! non-SDK consumers (tests, REST server, alternate backends) share
24//! them without depending on ff-sdk.
25
26use ff_core::contracts::{
27 EdgeDirection, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot, ListExecutionsPage,
28 ListFlowsPage, ListLanesPage, ListSuspendedPage,
29};
30use ff_core::partition::{execution_partition, flow_partition, PartitionKey};
31use ff_core::types::{EdgeId, ExecutionId, FlowId, LaneId};
32
33use crate::SdkError;
34use crate::worker::FlowFabricWorker;
35
36impl FlowFabricWorker {
37 /// Read a typed snapshot of one execution. See
38 /// [`ExecutionSnapshot`] for field semantics.
39 ///
40 /// Post-T3 this is a thin forwarder onto the bundled
41 /// [`EngineBackend::describe_execution`](ff_core::engine_backend::EngineBackend::describe_execution)
42 /// impl; the HGETALL pipeline body and strict-parse decoder both
43 /// live in `ff-backend-valkey` / `ff-core` so alternate backends
44 /// can serve the same call shape. Parse failures surface as
45 /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
46 pub async fn describe_execution(
47 &self,
48 id: &ExecutionId,
49 ) -> Result<Option<ExecutionSnapshot>, SdkError> {
50 Ok(self.backend_ref().describe_execution(id).await?)
51 }
52
53 /// Read a typed snapshot of one flow. See [`FlowSnapshot`] for
54 /// field semantics.
55 ///
56 /// Post-T3 this is a thin forwarder onto the bundled
57 /// [`EngineBackend::describe_flow`](ff_core::engine_backend::EngineBackend::describe_flow)
58 /// impl.
59 pub async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, SdkError> {
60 Ok(self.backend_ref().describe_flow(id).await?)
61 }
62
63 /// RFC-016 Stage A: declare the inbound edge-group policy for a
64 /// downstream execution. Must be called BEFORE the first
65 /// `add_dependency(... -> downstream_execution_id)` on this flow.
66 ///
67 /// Stage A accepts only
68 /// [`EdgeDependencyPolicy::AllOf`](ff_core::contracts::EdgeDependencyPolicy::AllOf);
69 /// `AnyOf` / `Quorum` return a typed
70 /// [`EngineError::Validation`](ff_core::engine_error::EngineError::Validation)
71 /// error until Stage B's resolver lands.
72 pub async fn set_edge_group_policy(
73 &self,
74 flow_id: &FlowId,
75 downstream_execution_id: &ExecutionId,
76 policy: ff_core::contracts::EdgeDependencyPolicy,
77 ) -> Result<ff_core::contracts::SetEdgeGroupPolicyResult, SdkError> {
78 Ok(self
79 .backend_ref()
80 .set_edge_group_policy(flow_id, downstream_execution_id, policy)
81 .await?)
82 }
83
84 /// List flows on a partition with cursor-based pagination
85 /// (issue #185).
86 ///
87 /// `partition` is an opaque [`PartitionKey`] — typically obtained
88 /// from a [`crate::ClaimGrant`] or from
89 /// [`ff_core::partition::flow_partition`] when the caller already
90 /// knows a `FlowId` on the partition it wants to enumerate.
91 /// `cursor` is `None` on the first call and the previous page's
92 /// `next_cursor` on subsequent calls; iteration terminates when
93 /// the returned `next_cursor` is `None`.
94 ///
95 /// Thin forwarder onto
96 /// [`EngineBackend::list_flows`](ff_core::engine_backend::EngineBackend::list_flows).
97 pub async fn list_flows(
98 &self,
99 partition: PartitionKey,
100 cursor: Option<FlowId>,
101 limit: usize,
102 ) -> Result<ListFlowsPage, SdkError> {
103 Ok(self
104 .backend_ref()
105 .list_flows(partition, cursor, limit)
106 .await?)
107 }
108
109 /// Read a typed snapshot of one dependency edge.
110 ///
111 /// Takes both `flow_id` and `edge_id`: the edge hash is stored
112 /// under the flow's partition
113 /// (`ff:flow:{fp:N}:<flow_id>:edge:<edge_id>`) and FF does not
114 /// maintain a global `edge_id -> flow_id` index. The caller
115 /// already knows the flow from the staging call result or the
116 /// consumer's own metadata.
117 ///
118 /// Returns `Ok(None)` when the edge hash is absent (never staged,
119 /// or staged under a different flow).
120 ///
121 /// Post-#160 this is a thin forwarder onto the bundled
122 /// [`EngineBackend::describe_edge`](ff_core::engine_backend::EngineBackend::describe_edge)
123 /// impl.
124 pub async fn describe_edge(
125 &self,
126 flow_id: &FlowId,
127 edge_id: &EdgeId,
128 ) -> Result<Option<EdgeSnapshot>, SdkError> {
129 Ok(self.backend_ref().describe_edge(flow_id, edge_id).await?)
130 }
131
132 /// List all outgoing dependency edges originating from an execution.
133 ///
134 /// Returns an empty `Vec` when the execution has no outgoing edges
135 /// — including standalone executions not attached to any flow.
136 ///
137 /// # Reads
138 ///
139 /// 1. `EngineBackend::resolve_execution_flow_id` (single HGET on
140 /// `exec_core.flow_id` for the Valkey backend).
141 /// 2. `EngineBackend::list_edges` on the resolved flow (SMEMBERS
142 /// + pipelined HGETALL on the flow's partition).
143 ///
144 /// Ordering is unspecified — the adjacency set is an unordered
145 /// SET. Callers that need deterministic order should sort by
146 /// [`EdgeSnapshot::edge_id`] or `created_at`.
147 pub async fn list_outgoing_edges(
148 &self,
149 upstream_eid: &ExecutionId,
150 ) -> Result<Vec<EdgeSnapshot>, SdkError> {
151 let Some(flow_id) = self.resolve_flow_id(upstream_eid).await? else {
152 return Ok(Vec::new());
153 };
154 Ok(self
155 .backend_ref()
156 .list_edges(
157 &flow_id,
158 EdgeDirection::Outgoing {
159 from_node: upstream_eid.clone(),
160 },
161 )
162 .await?)
163 }
164
165 /// List all incoming dependency edges landing on an execution.
166 /// See [`list_outgoing_edges`] for the read shape.
167 pub async fn list_incoming_edges(
168 &self,
169 downstream_eid: &ExecutionId,
170 ) -> Result<Vec<EdgeSnapshot>, SdkError> {
171 let Some(flow_id) = self.resolve_flow_id(downstream_eid).await? else {
172 return Ok(Vec::new());
173 };
174 Ok(self
175 .backend_ref()
176 .list_edges(
177 &flow_id,
178 EdgeDirection::Incoming {
179 to_node: downstream_eid.clone(),
180 },
181 )
182 .await?)
183 }
184
185 /// Enumerate registered lanes with cursor-based pagination.
186 ///
187 /// Thin forwarder onto
188 /// [`EngineBackend::list_lanes`](ff_core::engine_backend::EngineBackend::list_lanes).
189 /// Lanes are global (not partition-scoped); the Valkey backend
190 /// serves this from the `ff:idx:lanes` SET, sorts by lane name,
191 /// and returns a `limit`-sized page starting after `cursor`
192 /// (exclusive). Loop until [`ListLanesPage::next_cursor`] is
193 /// `None` to read the full registry.
194 pub async fn list_lanes(
195 &self,
196 cursor: Option<LaneId>,
197 limit: usize,
198 ) -> Result<ListLanesPage, SdkError> {
199 Ok(self.backend_ref().list_lanes(cursor, limit).await?)
200 }
201
202 /// List suspended executions in one partition, cursor-paginated,
203 /// with each entry's suspension `reason_code` populated (issue
204 /// #183).
205 ///
206 /// Thin forwarder onto the bundled
207 /// [`EngineBackend::list_suspended`](ff_core::engine_backend::EngineBackend::list_suspended)
208 /// impl. `cursor = None` starts a fresh scan; feed the returned
209 /// [`ListSuspendedPage::next_cursor`] back in to page forward until
210 /// it returns `None`. See
211 /// [`ff_core::contracts::SuspendedExecutionEntry`] for the per-row
212 /// fields (including the free-form `reason` code).
213 pub async fn list_suspended(
214 &self,
215 partition: PartitionKey,
216 cursor: Option<ExecutionId>,
217 limit: usize,
218 ) -> Result<ListSuspendedPage, SdkError> {
219 Ok(self
220 .backend_ref()
221 .list_suspended(partition, cursor, limit)
222 .await?)
223 }
224
225 /// Forward-only paginated listing of executions in a partition.
226 ///
227 /// Thin forwarder onto
228 /// [`EngineBackend::list_executions`](ff_core::engine_backend::EngineBackend::list_executions).
229 /// Pagination is cursor-based: pass `cursor = None` for the first
230 /// page and feed the returned
231 /// [`ListExecutionsPage::next_cursor`] back as `cursor` until it
232 /// returns `None`. See the trait rustdoc for the full contract.
233 pub async fn list_executions(
234 &self,
235 partition: PartitionKey,
236 cursor: Option<ExecutionId>,
237 limit: usize,
238 ) -> Result<ListExecutionsPage, SdkError> {
239 Ok(self
240 .backend_ref()
241 .list_executions(partition, cursor, limit)
242 .await?)
243 }
244
245 /// Resolve `exec_core.flow_id` via the trait and pin the RFC-011
246 /// co-location invariant
247 /// (`execution_partition(eid) == flow_partition(flow_id)`). A
248 /// parsed-but-wrong flow_id would otherwise silently route the
249 /// follow-up adjacency reads to the wrong partition and return
250 /// bogus empty results. A mismatch surfaces as
251 /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
252 ///
253 /// Returns `None` when the execution has no owning flow
254 /// (standalone) or when exec_core is absent.
255 async fn resolve_flow_id(&self, eid: &ExecutionId) -> Result<Option<FlowId>, SdkError> {
256 let Some(flow_id) = self.backend_ref().resolve_execution_flow_id(eid).await? else {
257 return Ok(None);
258 };
259 let exec_partition_index = execution_partition(eid, self.partition_config()).index;
260 let flow_partition_index = flow_partition(&flow_id, self.partition_config()).index;
261 if exec_partition_index != flow_partition_index {
262 return Err(SdkError::from(
263 ff_core::engine_error::EngineError::Validation {
264 kind: ff_core::engine_error::ValidationKind::Corruption,
265 detail: format!(
266 "list_edges: exec_core: flow_id: '{flow_id}' partition \
267 {flow_partition_index} does not match execution partition \
268 {exec_partition_index} (RFC-011 co-location violation; \
269 key corruption?)"
270 ),
271 },
272 ));
273 }
274 Ok(Some(flow_id))
275 }
276}