// ff_sdk/snapshot.rs

//! Typed read-models that decouple consumers from FF's storage engine.
//!
//! **RFC-012 Stage 1c T3 + issue #160:** this module is a pure thin
//! forwarder onto the `EngineBackend` trait. The decoders
//! (`build_execution_snapshot`, `build_flow_snapshot`,
//! `build_edge_snapshot`) live in `ff_core::contracts::decode` and
//! the pipeline bodies that invoke them live on
//! [`ff_backend_valkey::ValkeyBackend`] as `EngineBackend` trait
//! methods. The five `describe_*` / `list_*_edges` functions here are
//! 3-line delegations to `self.backend`; `list_incoming_edges` /
//! `list_outgoing_edges` keep their `resolve_flow_id` step (the
//! trait's `list_edges` requires a `FlowId` argument) but route it
//! through
//! [`EngineBackend::resolve_execution_flow_id`](ff_core::engine_backend::EngineBackend::resolve_execution_flow_id)
//! plus a local RFC-011 co-location cross-check.
//!
//! See issue #58 for the strategic context (engine-surface sealing
//! toward the Postgres backend port); issue #160 closed out the last
//! raw-client call sites so `FlowFabricWorker::client()` could be
//! deleted.
//!
//! Snapshot types (`ExecutionSnapshot`, `AttemptSummary`, `LeaseSummary`,
//! `FlowSnapshot`, `EdgeSnapshot`) live in [`ff_core::contracts`] so
//! non-SDK consumers (tests, REST server, alternate backends) share
//! them without depending on ff-sdk.

use ff_core::contracts::{
    EdgeDirection, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot, ListExecutionsPage,
    ListFlowsPage, ListLanesPage, ListSuspendedPage,
};
use ff_core::engine_error::{EngineError, ValidationKind};
use ff_core::partition::{execution_partition, flow_partition, PartitionKey};
use ff_core::types::{EdgeId, ExecutionId, FlowId, LaneId};

use crate::SdkError;
use crate::worker::FlowFabricWorker;

36impl FlowFabricWorker {
37    /// Read a typed snapshot of one execution. See
38    /// [`ExecutionSnapshot`] for field semantics.
39    ///
40    /// Post-T3 this is a thin forwarder onto the bundled
41    /// [`EngineBackend::describe_execution`](ff_core::engine_backend::EngineBackend::describe_execution)
42    /// impl; the HGETALL pipeline body and strict-parse decoder both
43    /// live in `ff-backend-valkey` / `ff-core` so alternate backends
44    /// can serve the same call shape. Parse failures surface as
45    /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
46    pub async fn describe_execution(
47        &self,
48        id: &ExecutionId,
49    ) -> Result<Option<ExecutionSnapshot>, SdkError> {
50        Ok(self.backend_ref().describe_execution(id).await?)
51    }
52
53    /// Read a typed snapshot of one flow. See [`FlowSnapshot`] for
54    /// field semantics.
55    ///
56    /// Post-T3 this is a thin forwarder onto the bundled
57    /// [`EngineBackend::describe_flow`](ff_core::engine_backend::EngineBackend::describe_flow)
58    /// impl.
59    pub async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, SdkError> {
60        Ok(self.backend_ref().describe_flow(id).await?)
61    }
62
63    /// List flows on a partition with cursor-based pagination
64    /// (issue #185).
65    ///
66    /// `partition` is an opaque [`PartitionKey`] — typically obtained
67    /// from a [`crate::ClaimGrant`] or from
68    /// [`ff_core::partition::flow_partition`] when the caller already
69    /// knows a `FlowId` on the partition it wants to enumerate.
70    /// `cursor` is `None` on the first call and the previous page's
71    /// `next_cursor` on subsequent calls; iteration terminates when
72    /// the returned `next_cursor` is `None`.
73    ///
74    /// Thin forwarder onto
75    /// [`EngineBackend::list_flows`](ff_core::engine_backend::EngineBackend::list_flows).
76    pub async fn list_flows(
77        &self,
78        partition: PartitionKey,
79        cursor: Option<FlowId>,
80        limit: usize,
81    ) -> Result<ListFlowsPage, SdkError> {
82        Ok(self
83            .backend_ref()
84            .list_flows(partition, cursor, limit)
85            .await?)
86    }
87
88    /// Read a typed snapshot of one dependency edge.
89    ///
90    /// Takes both `flow_id` and `edge_id`: the edge hash is stored
91    /// under the flow's partition
92    /// (`ff:flow:{fp:N}:<flow_id>:edge:<edge_id>`) and FF does not
93    /// maintain a global `edge_id -> flow_id` index. The caller
94    /// already knows the flow from the staging call result or the
95    /// consumer's own metadata.
96    ///
97    /// Returns `Ok(None)` when the edge hash is absent (never staged,
98    /// or staged under a different flow).
99    ///
100    /// Post-#160 this is a thin forwarder onto the bundled
101    /// [`EngineBackend::describe_edge`](ff_core::engine_backend::EngineBackend::describe_edge)
102    /// impl.
103    pub async fn describe_edge(
104        &self,
105        flow_id: &FlowId,
106        edge_id: &EdgeId,
107    ) -> Result<Option<EdgeSnapshot>, SdkError> {
108        Ok(self.backend_ref().describe_edge(flow_id, edge_id).await?)
109    }
110
111    /// List all outgoing dependency edges originating from an execution.
112    ///
113    /// Returns an empty `Vec` when the execution has no outgoing edges
114    /// — including standalone executions not attached to any flow.
115    ///
116    /// # Reads
117    ///
118    /// 1. `EngineBackend::resolve_execution_flow_id` (single HGET on
119    ///    `exec_core.flow_id` for the Valkey backend).
120    /// 2. `EngineBackend::list_edges` on the resolved flow (SMEMBERS
121    ///    + pipelined HGETALL on the flow's partition).
122    ///
123    /// Ordering is unspecified — the adjacency set is an unordered
124    /// SET. Callers that need deterministic order should sort by
125    /// [`EdgeSnapshot::edge_id`] or `created_at`.
126    pub async fn list_outgoing_edges(
127        &self,
128        upstream_eid: &ExecutionId,
129    ) -> Result<Vec<EdgeSnapshot>, SdkError> {
130        let Some(flow_id) = self.resolve_flow_id(upstream_eid).await? else {
131            return Ok(Vec::new());
132        };
133        Ok(self
134            .backend_ref()
135            .list_edges(
136                &flow_id,
137                EdgeDirection::Outgoing {
138                    from_node: upstream_eid.clone(),
139                },
140            )
141            .await?)
142    }
143
144    /// List all incoming dependency edges landing on an execution.
145    /// See [`list_outgoing_edges`] for the read shape.
146    pub async fn list_incoming_edges(
147        &self,
148        downstream_eid: &ExecutionId,
149    ) -> Result<Vec<EdgeSnapshot>, SdkError> {
150        let Some(flow_id) = self.resolve_flow_id(downstream_eid).await? else {
151            return Ok(Vec::new());
152        };
153        Ok(self
154            .backend_ref()
155            .list_edges(
156                &flow_id,
157                EdgeDirection::Incoming {
158                    to_node: downstream_eid.clone(),
159                },
160            )
161            .await?)
162    }
163
164    /// Enumerate registered lanes with cursor-based pagination.
165    ///
166    /// Thin forwarder onto
167    /// [`EngineBackend::list_lanes`](ff_core::engine_backend::EngineBackend::list_lanes).
168    /// Lanes are global (not partition-scoped); the Valkey backend
169    /// serves this from the `ff:idx:lanes` SET, sorts by lane name,
170    /// and returns a `limit`-sized page starting after `cursor`
171    /// (exclusive). Loop until [`ListLanesPage::next_cursor`] is
172    /// `None` to read the full registry.
173    pub async fn list_lanes(
174        &self,
175        cursor: Option<LaneId>,
176        limit: usize,
177    ) -> Result<ListLanesPage, SdkError> {
178        Ok(self.backend_ref().list_lanes(cursor, limit).await?)
179    }
180
181    /// List suspended executions in one partition, cursor-paginated,
182    /// with each entry's suspension `reason_code` populated (issue
183    /// #183).
184    ///
185    /// Thin forwarder onto the bundled
186    /// [`EngineBackend::list_suspended`](ff_core::engine_backend::EngineBackend::list_suspended)
187    /// impl. `cursor = None` starts a fresh scan; feed the returned
188    /// [`ListSuspendedPage::next_cursor`] back in to page forward until
189    /// it returns `None`. See
190    /// [`ff_core::contracts::SuspendedExecutionEntry`] for the per-row
191    /// fields (including the free-form `reason` code).
192    pub async fn list_suspended(
193        &self,
194        partition: PartitionKey,
195        cursor: Option<ExecutionId>,
196        limit: usize,
197    ) -> Result<ListSuspendedPage, SdkError> {
198        Ok(self
199            .backend_ref()
200            .list_suspended(partition, cursor, limit)
201            .await?)
202    }
203
204    /// Forward-only paginated listing of executions in a partition.
205    ///
206    /// Thin forwarder onto
207    /// [`EngineBackend::list_executions`](ff_core::engine_backend::EngineBackend::list_executions).
208    /// Pagination is cursor-based: pass `cursor = None` for the first
209    /// page and feed the returned
210    /// [`ListExecutionsPage::next_cursor`] back as `cursor` until it
211    /// returns `None`. See the trait rustdoc for the full contract.
212    pub async fn list_executions(
213        &self,
214        partition: PartitionKey,
215        cursor: Option<ExecutionId>,
216        limit: usize,
217    ) -> Result<ListExecutionsPage, SdkError> {
218        Ok(self
219            .backend_ref()
220            .list_executions(partition, cursor, limit)
221            .await?)
222    }
223
224    /// Resolve `exec_core.flow_id` via the trait and pin the RFC-011
225    /// co-location invariant
226    /// (`execution_partition(eid) == flow_partition(flow_id)`). A
227    /// parsed-but-wrong flow_id would otherwise silently route the
228    /// follow-up adjacency reads to the wrong partition and return
229    /// bogus empty results. A mismatch surfaces as
230    /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
231    ///
232    /// Returns `None` when the execution has no owning flow
233    /// (standalone) or when exec_core is absent.
234    async fn resolve_flow_id(&self, eid: &ExecutionId) -> Result<Option<FlowId>, SdkError> {
235        let Some(flow_id) = self.backend_ref().resolve_execution_flow_id(eid).await? else {
236            return Ok(None);
237        };
238        let exec_partition_index = execution_partition(eid, self.partition_config()).index;
239        let flow_partition_index = flow_partition(&flow_id, self.partition_config()).index;
240        if exec_partition_index != flow_partition_index {
241            return Err(SdkError::from(
242                ff_core::engine_error::EngineError::Validation {
243                    kind: ff_core::engine_error::ValidationKind::Corruption,
244                    detail: format!(
245                        "list_edges: exec_core: flow_id: '{flow_id}' partition \
246                         {flow_partition_index} does not match execution partition \
247                         {exec_partition_index} (RFC-011 co-location violation; \
248                         key corruption?)"
249                    ),
250                },
251            ));
252        }
253        Ok(Some(flow_id))
254    }
255}