ff_sdk/snapshot.rs
1//! Typed read-models that decouple consumers from FF's storage engine.
2//!
3//! **RFC-012 Stage 1c T3 + issue #160:** this module is a pure thin
4//! forwarder onto the `EngineBackend` trait. The decoders
5//! (`build_execution_snapshot`, `build_flow_snapshot`,
6//! `build_edge_snapshot`) live in `ff_core::contracts::decode` and
7//! the pipeline bodies that invoke them live on
8//! [`ff_backend_valkey::ValkeyBackend`] as `EngineBackend` trait
9//! methods. The five `describe_*` / `list_*_edges` functions here are
10//! 3-line delegations to `self.backend`; `list_incoming_edges` /
11//! `list_outgoing_edges` keep their `resolve_flow_id` step (the
12//! trait's `list_edges` requires a `FlowId` argument) but route it
13//! through [`EngineBackend::resolve_execution_flow_id`] plus a local
14//! RFC-011 co-location cross-check.
15//!
16//! See issue #58 for the strategic context (engine-surface sealing
17//! toward the Postgres backend port); issue #160 closed out the last
18//! raw-client call sites so `FlowFabricWorker::client()` could be
19//! deleted.
20//!
21//! Snapshot types (`ExecutionSnapshot`, `AttemptSummary`, `LeaseSummary`,
22//! `FlowSnapshot`, `EdgeSnapshot`) live in [`ff_core::contracts`] so
23//! non-SDK consumers (tests, REST server, alternate backends) share
24//! them without depending on ff-sdk.
25
26use ff_core::contracts::{
27 EdgeDirection, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot, ListExecutionsPage,
28 ListFlowsPage, ListLanesPage, ListSuspendedPage,
29};
30use ff_core::partition::{execution_partition, flow_partition, PartitionKey};
31use ff_core::types::{EdgeId, ExecutionId, FlowId, LaneId};
32
33use crate::SdkError;
34use crate::worker::FlowFabricWorker;
35
36impl FlowFabricWorker {
37 /// Read a typed snapshot of one execution. See
38 /// [`ExecutionSnapshot`] for field semantics.
39 ///
40 /// Post-T3 this is a thin forwarder onto the bundled
41 /// [`EngineBackend::describe_execution`](ff_core::engine_backend::EngineBackend::describe_execution)
42 /// impl; the HGETALL pipeline body and strict-parse decoder both
43 /// live in `ff-backend-valkey` / `ff-core` so alternate backends
44 /// can serve the same call shape. Parse failures surface as
45 /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
46 pub async fn describe_execution(
47 &self,
48 id: &ExecutionId,
49 ) -> Result<Option<ExecutionSnapshot>, SdkError> {
50 Ok(self.backend_ref().describe_execution(id).await?)
51 }
52
53 /// Read a typed snapshot of one flow. See [`FlowSnapshot`] for
54 /// field semantics.
55 ///
56 /// Post-T3 this is a thin forwarder onto the bundled
57 /// [`EngineBackend::describe_flow`](ff_core::engine_backend::EngineBackend::describe_flow)
58 /// impl.
59 pub async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, SdkError> {
60 Ok(self.backend_ref().describe_flow(id).await?)
61 }
62
63 /// List flows on a partition with cursor-based pagination
64 /// (issue #185).
65 ///
66 /// `partition` is an opaque [`PartitionKey`] — typically obtained
67 /// from a [`crate::ClaimGrant`] or from
68 /// [`ff_core::partition::flow_partition`] when the caller already
69 /// knows a `FlowId` on the partition it wants to enumerate.
70 /// `cursor` is `None` on the first call and the previous page's
71 /// `next_cursor` on subsequent calls; iteration terminates when
72 /// the returned `next_cursor` is `None`.
73 ///
74 /// Thin forwarder onto
75 /// [`EngineBackend::list_flows`](ff_core::engine_backend::EngineBackend::list_flows).
76 pub async fn list_flows(
77 &self,
78 partition: PartitionKey,
79 cursor: Option<FlowId>,
80 limit: usize,
81 ) -> Result<ListFlowsPage, SdkError> {
82 Ok(self
83 .backend_ref()
84 .list_flows(partition, cursor, limit)
85 .await?)
86 }
87
88 /// Read a typed snapshot of one dependency edge.
89 ///
90 /// Takes both `flow_id` and `edge_id`: the edge hash is stored
91 /// under the flow's partition
92 /// (`ff:flow:{fp:N}:<flow_id>:edge:<edge_id>`) and FF does not
93 /// maintain a global `edge_id -> flow_id` index. The caller
94 /// already knows the flow from the staging call result or the
95 /// consumer's own metadata.
96 ///
97 /// Returns `Ok(None)` when the edge hash is absent (never staged,
98 /// or staged under a different flow).
99 ///
100 /// Post-#160 this is a thin forwarder onto the bundled
101 /// [`EngineBackend::describe_edge`](ff_core::engine_backend::EngineBackend::describe_edge)
102 /// impl.
103 pub async fn describe_edge(
104 &self,
105 flow_id: &FlowId,
106 edge_id: &EdgeId,
107 ) -> Result<Option<EdgeSnapshot>, SdkError> {
108 Ok(self.backend_ref().describe_edge(flow_id, edge_id).await?)
109 }
110
111 /// List all outgoing dependency edges originating from an execution.
112 ///
113 /// Returns an empty `Vec` when the execution has no outgoing edges
114 /// — including standalone executions not attached to any flow.
115 ///
116 /// # Reads
117 ///
118 /// 1. `EngineBackend::resolve_execution_flow_id` (single HGET on
119 /// `exec_core.flow_id` for the Valkey backend).
120 /// 2. `EngineBackend::list_edges` on the resolved flow (SMEMBERS
121 /// + pipelined HGETALL on the flow's partition).
122 ///
123 /// Ordering is unspecified — the adjacency set is an unordered
124 /// SET. Callers that need deterministic order should sort by
125 /// [`EdgeSnapshot::edge_id`] or `created_at`.
126 pub async fn list_outgoing_edges(
127 &self,
128 upstream_eid: &ExecutionId,
129 ) -> Result<Vec<EdgeSnapshot>, SdkError> {
130 let Some(flow_id) = self.resolve_flow_id(upstream_eid).await? else {
131 return Ok(Vec::new());
132 };
133 Ok(self
134 .backend_ref()
135 .list_edges(
136 &flow_id,
137 EdgeDirection::Outgoing {
138 from_node: upstream_eid.clone(),
139 },
140 )
141 .await?)
142 }
143
144 /// List all incoming dependency edges landing on an execution.
145 /// See [`list_outgoing_edges`] for the read shape.
146 pub async fn list_incoming_edges(
147 &self,
148 downstream_eid: &ExecutionId,
149 ) -> Result<Vec<EdgeSnapshot>, SdkError> {
150 let Some(flow_id) = self.resolve_flow_id(downstream_eid).await? else {
151 return Ok(Vec::new());
152 };
153 Ok(self
154 .backend_ref()
155 .list_edges(
156 &flow_id,
157 EdgeDirection::Incoming {
158 to_node: downstream_eid.clone(),
159 },
160 )
161 .await?)
162 }
163
164 /// Enumerate registered lanes with cursor-based pagination.
165 ///
166 /// Thin forwarder onto
167 /// [`EngineBackend::list_lanes`](ff_core::engine_backend::EngineBackend::list_lanes).
168 /// Lanes are global (not partition-scoped); the Valkey backend
169 /// serves this from the `ff:idx:lanes` SET, sorts by lane name,
170 /// and returns a `limit`-sized page starting after `cursor`
171 /// (exclusive). Loop until [`ListLanesPage::next_cursor`] is
172 /// `None` to read the full registry.
173 pub async fn list_lanes(
174 &self,
175 cursor: Option<LaneId>,
176 limit: usize,
177 ) -> Result<ListLanesPage, SdkError> {
178 Ok(self.backend_ref().list_lanes(cursor, limit).await?)
179 }
180
181 /// List suspended executions in one partition, cursor-paginated,
182 /// with each entry's suspension `reason_code` populated (issue
183 /// #183).
184 ///
185 /// Thin forwarder onto the bundled
186 /// [`EngineBackend::list_suspended`](ff_core::engine_backend::EngineBackend::list_suspended)
187 /// impl. `cursor = None` starts a fresh scan; feed the returned
188 /// [`ListSuspendedPage::next_cursor`] back in to page forward until
189 /// it returns `None`. See
190 /// [`ff_core::contracts::SuspendedExecutionEntry`] for the per-row
191 /// fields (including the free-form `reason` code).
192 pub async fn list_suspended(
193 &self,
194 partition: PartitionKey,
195 cursor: Option<ExecutionId>,
196 limit: usize,
197 ) -> Result<ListSuspendedPage, SdkError> {
198 Ok(self
199 .backend_ref()
200 .list_suspended(partition, cursor, limit)
201 .await?)
202 }
203
204 /// Forward-only paginated listing of executions in a partition.
205 ///
206 /// Thin forwarder onto
207 /// [`EngineBackend::list_executions`](ff_core::engine_backend::EngineBackend::list_executions).
208 /// Pagination is cursor-based: pass `cursor = None` for the first
209 /// page and feed the returned
210 /// [`ListExecutionsPage::next_cursor`] back as `cursor` until it
211 /// returns `None`. See the trait rustdoc for the full contract.
212 pub async fn list_executions(
213 &self,
214 partition: PartitionKey,
215 cursor: Option<ExecutionId>,
216 limit: usize,
217 ) -> Result<ListExecutionsPage, SdkError> {
218 Ok(self
219 .backend_ref()
220 .list_executions(partition, cursor, limit)
221 .await?)
222 }
223
224 /// Resolve `exec_core.flow_id` via the trait and pin the RFC-011
225 /// co-location invariant
226 /// (`execution_partition(eid) == flow_partition(flow_id)`). A
227 /// parsed-but-wrong flow_id would otherwise silently route the
228 /// follow-up adjacency reads to the wrong partition and return
229 /// bogus empty results. A mismatch surfaces as
230 /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
231 ///
232 /// Returns `None` when the execution has no owning flow
233 /// (standalone) or when exec_core is absent.
234 async fn resolve_flow_id(&self, eid: &ExecutionId) -> Result<Option<FlowId>, SdkError> {
235 let Some(flow_id) = self.backend_ref().resolve_execution_flow_id(eid).await? else {
236 return Ok(None);
237 };
238 let exec_partition_index = execution_partition(eid, self.partition_config()).index;
239 let flow_partition_index = flow_partition(&flow_id, self.partition_config()).index;
240 if exec_partition_index != flow_partition_index {
241 return Err(SdkError::from(
242 ff_core::engine_error::EngineError::Validation {
243 kind: ff_core::engine_error::ValidationKind::Corruption,
244 detail: format!(
245 "list_edges: exec_core: flow_id: '{flow_id}' partition \
246 {flow_partition_index} does not match execution partition \
247 {exec_partition_index} (RFC-011 co-location violation; \
248 key corruption?)"
249 ),
250 },
251 ));
252 }
253 Ok(Some(flow_id))
254 }
255}