ff_sdk/snapshot.rs
1//! Typed read-models that decouple consumers from FF's storage engine.
2//!
//! **RFC-012 Stage 1c T3:** as of T3 this module consists of thin
//! forwarders. The decoders (`build_execution_snapshot`,
//! `build_flow_snapshot`, `build_edge_snapshot`) live in
//! `ff_core::contracts::decode`, and the pipeline bodies that invoke
//! them live on [`ff_backend_valkey::ValkeyBackend`] as `EngineBackend`
//! trait methods. The three `describe_*` functions here are now short
//! delegations to the backend; `list_incoming_edges` /
//! `list_outgoing_edges` keep their `resolve_flow_id` step (the
//! trait's `list_edges` requires a `FlowId` argument) and then call
//! the trait method.
13//!
14//! See issue #58 for the strategic context (engine-surface sealing
15//! toward the Postgres backend port).
16//!
17//! Snapshot types (`ExecutionSnapshot`, `AttemptSummary`, `LeaseSummary`,
18//! `FlowSnapshot`, `EdgeSnapshot`) live in [`ff_core::contracts`] so
19//! non-SDK consumers (tests, REST server, alternate backends) share
20//! them without depending on ff-sdk.
21
22use std::collections::HashMap;
23
24use ff_core::contracts::{EdgeDirection, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot};
25use ff_core::keys::ExecKeyContext;
26use ff_core::partition::{execution_partition, flow_partition};
27use ff_core::types::{EdgeId, ExecutionId, FlowId};
28
29use crate::SdkError;
30use crate::worker::FlowFabricWorker;
31
32impl FlowFabricWorker {
33 /// Read a typed snapshot of one execution. See
34 /// [`ExecutionSnapshot`] for field semantics.
35 ///
36 /// Post-T3 this is a thin forwarder onto the bundled
37 /// [`EngineBackend::describe_execution`](ff_core::engine_backend::EngineBackend::describe_execution)
38 /// impl; the HGETALL pipeline body and strict-parse decoder both
39 /// live in `ff-backend-valkey` / `ff-core` so alternate backends
40 /// can serve the same call shape. Parse failures surface as
41 /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
42 pub async fn describe_execution(
43 &self,
44 id: &ExecutionId,
45 ) -> Result<Option<ExecutionSnapshot>, SdkError> {
46 Ok(self.backend_ref().describe_execution(id).await?)
47 }
48
49 /// Read a typed snapshot of one flow. See [`FlowSnapshot`] for
50 /// field semantics.
51 ///
52 /// Post-T3 this is a thin forwarder onto the bundled
53 /// [`EngineBackend::describe_flow`](ff_core::engine_backend::EngineBackend::describe_flow)
54 /// impl.
55 pub async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, SdkError> {
56 Ok(self.backend_ref().describe_flow(id).await?)
57 }
58
59 /// Read a typed snapshot of one dependency edge.
60 ///
61 /// Takes both `flow_id` and `edge_id`: the edge hash is stored
62 /// under the flow's partition
63 /// (`ff:flow:{fp:N}:<flow_id>:edge:<edge_id>`) and FF does not
64 /// maintain a global `edge_id -> flow_id` index. The caller
65 /// already knows the flow from the staging call result or the
66 /// consumer's own metadata.
67 ///
68 /// Returns `Ok(None)` when the edge hash is absent (never staged,
69 /// or staged under a different flow).
70 pub async fn describe_edge(
71 &self,
72 flow_id: &FlowId,
73 edge_id: &EdgeId,
74 ) -> Result<Option<EdgeSnapshot>, SdkError> {
75 let partition = flow_partition(flow_id, self.partition_config());
76 let ctx = ff_core::keys::FlowKeyContext::new(&partition, flow_id);
77 let edge_key = ctx.edge(edge_id);
78
79 let raw: HashMap<String, String> = self
80 .client()
81 .cmd("HGETALL")
82 .arg(&edge_key)
83 .execute()
84 .await
85 .map_err(|e| crate::backend_context(e, "describe_edge: HGETALL edge_hash"))?;
86
87 if raw.is_empty() {
88 return Ok(None);
89 }
90
91 ff_core::contracts::decode::build_edge_snapshot(flow_id, edge_id, &raw)
92 .map(Some)
93 .map_err(SdkError::from)
94 }
95
96 /// List all outgoing dependency edges originating from an execution.
97 ///
98 /// Returns an empty `Vec` when the execution has no outgoing edges
99 /// — including standalone executions not attached to any flow.
100 ///
101 /// # Reads
102 ///
103 /// 1. `HGET exec_core flow_id` (via [`Self::resolve_flow_id`]).
104 /// 2. `EngineBackend::list_edges` on the resolved flow (SMEMBERS
105 /// + pipelined HGETALL on the flow's partition).
106 ///
107 /// Ordering is unspecified — the adjacency set is an unordered
108 /// SET. Callers that need deterministic order should sort by
109 /// [`EdgeSnapshot::edge_id`] or `created_at`.
110 pub async fn list_outgoing_edges(
111 &self,
112 upstream_eid: &ExecutionId,
113 ) -> Result<Vec<EdgeSnapshot>, SdkError> {
114 let Some(flow_id) = self.resolve_flow_id(upstream_eid).await? else {
115 return Ok(Vec::new());
116 };
117 Ok(self
118 .backend_ref()
119 .list_edges(
120 &flow_id,
121 EdgeDirection::Outgoing {
122 from_node: upstream_eid.clone(),
123 },
124 )
125 .await?)
126 }
127
128 /// List all incoming dependency edges landing on an execution.
129 /// See [`list_outgoing_edges`] for the read shape.
130 pub async fn list_incoming_edges(
131 &self,
132 downstream_eid: &ExecutionId,
133 ) -> Result<Vec<EdgeSnapshot>, SdkError> {
134 let Some(flow_id) = self.resolve_flow_id(downstream_eid).await? else {
135 return Ok(Vec::new());
136 };
137 Ok(self
138 .backend_ref()
139 .list_edges(
140 &flow_id,
141 EdgeDirection::Incoming {
142 to_node: downstream_eid.clone(),
143 },
144 )
145 .await?)
146 }
147
148 /// `HGET exec_core.flow_id` and parse to a [`FlowId`]. `None` when
149 /// the exec_core hash is absent OR the flow_id field is empty
150 /// (standalone execution).
151 ///
152 /// Also pins the RFC-011 co-location invariant
153 /// (`execution_partition(eid) == flow_partition(flow_id)`) — a
154 /// parsed-but-wrong flow_id would otherwise silently route the
155 /// follow-up adjacency reads to the wrong partition and return
156 /// bogus empty results. A mismatch surfaces as
157 /// `SdkError::Engine(EngineError::Validation { kind: Corruption, .. })`.
158 async fn resolve_flow_id(&self, eid: &ExecutionId) -> Result<Option<FlowId>, SdkError> {
159 let exec_partition = execution_partition(eid, self.partition_config());
160 let ctx = ExecKeyContext::new(&exec_partition, eid);
161 let raw: Option<String> = self
162 .client()
163 .cmd("HGET")
164 .arg(ctx.core())
165 .arg("flow_id")
166 .execute()
167 .await
168 .map_err(|e| crate::backend_context(e, "list_edges: HGET exec_core.flow_id"))?;
169 let Some(raw) = raw.filter(|s| !s.is_empty()) else {
170 return Ok(None);
171 };
172 let flow_id = FlowId::parse(&raw).map_err(|e| {
173 SdkError::from(ff_core::engine_error::EngineError::Validation {
174 kind: ff_core::engine_error::ValidationKind::Corruption,
175 detail: format!(
176 "list_edges: exec_core: flow_id: '{raw}' is not a valid UUID \
177 (key corruption?): {e}"
178 ),
179 })
180 })?;
181 let flow_partition_index = flow_partition(&flow_id, self.partition_config()).index;
182 if exec_partition.index != flow_partition_index {
183 return Err(SdkError::from(
184 ff_core::engine_error::EngineError::Validation {
185 kind: ff_core::engine_error::ValidationKind::Corruption,
186 detail: format!(
187 "list_edges: exec_core: flow_id: '{flow_id}' partition \
188 {flow_partition_index} does not match execution partition {} \
189 (RFC-011 co-location violation; key corruption?)",
190 exec_partition.index
191 ),
192 },
193 ));
194 }
195 Ok(Some(flow_id))
196 }
197}