Skip to main content

ff_sdk/
snapshot.rs

1//! Typed read-models that decouple consumers from FF's storage engine.
2//!
//! The calls here — [`FlowFabricWorker::describe_execution`],
//! [`FlowFabricWorker::describe_flow`], [`FlowFabricWorker::describe_edge`]
//! and the `list_outgoing_edges` / `list_incoming_edges` pair — replace
//! the raw-HGETALL pattern downstream consumers rely on with typed
//! snapshots. The engine remains free to rename hash fields or
//! restructure keys under this surface — see issue #58 for the
//! strategic context (decoupling ahead of a Postgres backend port).
9//!
//! Snapshot types (`ExecutionSnapshot`, `AttemptSummary`, `LeaseSummary`,
//! `FlowSnapshot`, `EdgeSnapshot`) live in [`ff_core::contracts`] so
//! non-SDK consumers (tests, REST server, future alternate backends)
//! can share them without depending on ff-sdk.
14
15use std::collections::{BTreeMap, HashMap};
16
17use ff_core::contracts::{
18    AttemptSummary, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot, LeaseSummary,
19};
20use ff_core::keys::{ExecKeyContext, FlowKeyContext};
21use ff_core::partition::{execution_partition, flow_partition};
22use ff_core::state::PublicState;
23use ff_core::types::{
24    AttemptId, AttemptIndex, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch, Namespace,
25    TimestampMs, WaitpointId, WorkerInstanceId,
26};
27
28use crate::SdkError;
29use crate::worker::FlowFabricWorker;
30
31impl FlowFabricWorker {
32    /// Read a typed snapshot of one execution.
33    ///
34    /// Returns `Ok(None)` when no execution exists at `id` (exec_core
35    /// hash absent). Returns `Ok(Some(snapshot))` on success. Errors
36    /// propagate Valkey transport faults and decode failures.
37    ///
38    /// # Consistency
39    ///
40    /// The snapshot is assembled from two pipelined `HGETALL`s — one
41    /// for `exec_core`, one for the sibling tags hash — issued in a
42    /// single round trip against the partition holding the execution.
43    /// The two reads share a hash-tag so they always land on the same
44    /// Valkey slot in cluster mode. They are NOT MULTI/EXEC-atomic:
45    /// a concurrent FCALL that HSETs both keys can interleave, and a
46    /// caller may observe exec_core fields from epoch `N+1` alongside
47    /// tags from epoch `N` (or vice versa). This matches the
48    /// last-write-wins-per-field semantics every existing HGETALL
49    /// consumer already assumes.
50    ///
51    /// # Field semantics
52    ///
53    /// * `public_state` is the engine-maintained derived label written
54    ///   atomically by every state-mutating FCALL. Parsed from the
55    ///   snake_case string stored on exec_core.
56    /// * `blocking_reason` / `blocking_detail` — `None` when the
57    ///   exec_core field is the empty string (cleared on transition).
58    /// * `current_attempt` — `None` before the first claim (exec_core
59    ///   `current_attempt_id` empty).
60    /// * `current_lease` — `None` when no lease is held (typical for
61    ///   terminal, suspended, or pre-claim executions).
62    /// * `current_waitpoint` — `None` unless an active suspension has
63    ///   pinned a waitpoint id.
64    /// * `tags` — empty map if the tags hash is absent (common for
65    ///   executions created without `tags_json`).
66    pub async fn describe_execution(
67        &self,
68        id: &ExecutionId,
69    ) -> Result<Option<ExecutionSnapshot>, SdkError> {
70        let partition = execution_partition(id, self.partition_config());
71        let ctx = ExecKeyContext::new(&partition, id);
72        let core_key = ctx.core();
73        let tags_key = ctx.tags();
74
75        // Pipeline the two HGETALLs in one round trip. The two keys
76        // share `{fp:N}` so cluster mode routes them to the same slot.
77        let mut pipe = self.client().pipeline();
78        let core_slot = pipe
79            .cmd::<HashMap<String, String>>("HGETALL")
80            .arg(&core_key)
81            .finish();
82        let tags_slot = pipe
83            .cmd::<HashMap<String, String>>("HGETALL")
84            .arg(&tags_key)
85            .finish();
86        pipe.execute().await.map_err(|e| SdkError::ValkeyContext {
87            source: e,
88            context: "describe_execution: pipeline HGETALL exec_core + tags".into(),
89        })?;
90
91        let core = core_slot.value().map_err(|e| SdkError::ValkeyContext {
92            source: e,
93            context: "describe_execution: decode HGETALL exec_core".into(),
94        })?;
95        if core.is_empty() {
96            return Ok(None);
97        }
98        let tags_raw = tags_slot.value().map_err(|e| SdkError::ValkeyContext {
99            source: e,
100            context: "describe_execution: decode HGETALL tags".into(),
101        })?;
102
103        build_execution_snapshot(id.clone(), &core, tags_raw)
104    }
105}
106
107/// Assemble an [`ExecutionSnapshot`] from the raw HGETALL field maps.
108///
109/// Kept as a free function so a future unit test can feed synthetic
110/// maps without a live Valkey.
111fn build_execution_snapshot(
112    execution_id: ExecutionId,
113    core: &HashMap<String, String>,
114    tags_raw: HashMap<String, String>,
115) -> Result<Option<ExecutionSnapshot>, SdkError> {
116    let public_state = parse_public_state(opt_str(core, "public_state").unwrap_or(""))?;
117
118    // `LaneId::try_new` validates non-empty + ASCII-printable + <= 64 bytes.
119    // Exec_core writes a LaneId that already passed these invariants at
120    // ingress; a read that fails validation here signals on-disk
121    // corruption — surface it rather than silently constructing an
122    // invalid LaneId that would mis-partition downstream.
123    let lane_id = LaneId::try_new(opt_str(core, "lane_id").unwrap_or("")).map_err(|e| {
124        SdkError::Config {
125            context: "describe_execution: exec_core".into(),
126            field: Some("lane_id".into()),
127            message: format!("fails LaneId validation (key corruption?): {e}"),
128        }
129    })?;
130
131    let namespace_str = opt_str(core, "namespace").unwrap_or("").to_owned();
132    let namespace = Namespace::new(namespace_str);
133
134    let flow_id = opt_str(core, "flow_id")
135        .filter(|s| !s.is_empty())
136        .map(|s| {
137            FlowId::parse(s).map_err(|e| SdkError::Config {
138                context: "describe_execution: exec_core".into(),
139                field: Some("flow_id".into()),
140                message: format!("is not a valid UUID (key corruption?): {e}"),
141            })
142        })
143        .transpose()?;
144
145    let blocking_reason = opt_str(core, "blocking_reason")
146        .filter(|s| !s.is_empty())
147        .map(str::to_owned);
148    let blocking_detail = opt_str(core, "blocking_detail")
149        .filter(|s| !s.is_empty())
150        .map(str::to_owned);
151
152    // created_at + last_mutation_at are engine-maintained invariants
153    // (lua/execution.lua writes both on create; every mutating FCALL
154    // updates last_mutation_at). Missing values indicate on-disk
155    // corruption, not a valid pre-create state — fail loudly.
156    let created_at =
157        parse_ts(core, "describe_execution: exec_core", "created_at")?.ok_or_else(|| {
158            SdkError::Config {
159                context: "describe_execution: exec_core".into(),
160                field: Some("created_at".into()),
161                message: "is missing or empty (key corruption?)".into(),
162            }
163        })?;
164    let last_mutation_at = parse_ts(core, "describe_execution: exec_core", "last_mutation_at")?
165        .ok_or_else(|| SdkError::Config {
166            context: "describe_execution: exec_core".into(),
167            field: Some("last_mutation_at".into()),
168            message: "is missing or empty (key corruption?)".into(),
169        })?;
170
171    let total_attempt_count: u32 =
172        parse_u32_strict(core, "describe_execution: exec_core", "total_attempt_count")?
173            .unwrap_or(0);
174
175    let current_attempt = build_attempt_summary(core)?;
176    let current_lease = build_lease_summary(core)?;
177
178    let current_waitpoint = opt_str(core, "current_waitpoint_id")
179        .filter(|s| !s.is_empty())
180        .map(|s| {
181            WaitpointId::parse(s).map_err(|e| SdkError::Config {
182                context: "describe_execution: exec_core".into(),
183                field: Some("current_waitpoint_id".into()),
184                message: format!("is not a valid UUID (key corruption?): {e}"),
185            })
186        })
187        .transpose()?;
188
189    let tags: BTreeMap<String, String> = tags_raw.into_iter().collect();
190
191    Ok(Some(ExecutionSnapshot::new(
192        execution_id,
193        flow_id,
194        lane_id,
195        namespace,
196        public_state,
197        blocking_reason,
198        blocking_detail,
199        current_attempt,
200        current_lease,
201        current_waitpoint,
202        created_at,
203        last_mutation_at,
204        total_attempt_count,
205        tags,
206    )))
207}
208
/// Borrow the value stored under `field` as a `&str`, or `None` when the
/// map has no such key. An empty string is returned as `Some("")`.
fn opt_str<'a>(map: &'a HashMap<String, String>, field: &str) -> Option<&'a str> {
    let value = map.get(field)?;
    Some(value.as_str())
}
212
213/// Strictly parse a ms-timestamp field. `Ok(None)` when absent/empty,
214/// `Err` on unparseable content. `context` names both the calling
215/// FCALL and the hash (e.g. `"describe_execution: exec_core"`) so
216/// error messages point to the exact source of corruption.
217fn parse_ts(
218    map: &HashMap<String, String>,
219    context: &str,
220    field: &str,
221) -> Result<Option<TimestampMs>, SdkError> {
222    match opt_str(map, field).filter(|s| !s.is_empty()) {
223        None => Ok(None),
224        Some(raw) => {
225            let ms: i64 = raw.parse().map_err(|e| SdkError::Config {
226                context: context.to_owned(),
227                field: Some(field.to_owned()),
228                message: format!("is not a valid ms timestamp ('{raw}'): {e}"),
229            })?;
230            Ok(Some(TimestampMs::from_millis(ms)))
231        }
232    }
233}
234
235/// Strictly parse a `u32` field. Returns `Ok(None)` when the field is
236/// absent or empty (a valid pre-write state), `Err` when the value is
237/// present but unparseable (on-disk corruption), `Ok(Some(v))` otherwise.
238/// `context` is the caller/hash prefix used in error messages.
239fn parse_u32_strict(
240    map: &HashMap<String, String>,
241    context: &str,
242    field: &str,
243) -> Result<Option<u32>, SdkError> {
244    match opt_str(map, field).filter(|s| !s.is_empty()) {
245        None => Ok(None),
246        Some(raw) => Ok(Some(raw.parse().map_err(|e| SdkError::Config {
247            context: context.to_owned(),
248            field: Some(field.to_owned()),
249            message: format!("is not a valid u32 ('{raw}'): {e}"),
250        })?)),
251    }
252}
253
254/// Strictly parse a `u64` field. Semantics mirror [`parse_u32_strict`].
255fn parse_u64_strict(
256    map: &HashMap<String, String>,
257    context: &str,
258    field: &str,
259) -> Result<Option<u64>, SdkError> {
260    match opt_str(map, field).filter(|s| !s.is_empty()) {
261        None => Ok(None),
262        Some(raw) => Ok(Some(raw.parse().map_err(|e| SdkError::Config {
263            context: context.to_owned(),
264            field: Some(field.to_owned()),
265            message: format!("is not a valid u64 ('{raw}'): {e}"),
266        })?)),
267    }
268}
269
270fn parse_public_state(raw: &str) -> Result<PublicState, SdkError> {
271    // exec_core stores the snake_case literal (e.g. "waiting"). PublicState's
272    // Deserialize accepts the JSON-quoted form, so wrap + delegate.
273    let quoted = format!("\"{raw}\"");
274    serde_json::from_str(&quoted).map_err(|e| SdkError::Config {
275        context: "describe_execution: exec_core".into(),
276        field: Some("public_state".into()),
277        message: format!("'{raw}' is not a known public state: {e}"),
278    })
279}
280
281fn build_attempt_summary(
282    core: &HashMap<String, String>,
283) -> Result<Option<AttemptSummary>, SdkError> {
284    let attempt_id_str = match opt_str(core, "current_attempt_id").filter(|s| !s.is_empty()) {
285        None => return Ok(None),
286        Some(s) => s,
287    };
288    let attempt_id = AttemptId::parse(attempt_id_str).map_err(|e| SdkError::Config {
289        context: "describe_execution: exec_core".into(),
290        field: Some("current_attempt_id".into()),
291        message: format!("is not a valid UUID: {e}"),
292    })?;
293    // When `current_attempt_id` is set, `current_attempt_index` MUST be
294    // set too — lua/execution.lua writes both atomically in
295    // `ff_claim_execution`. A missing index while the id is populated
296    // is corruption, not a valid intermediate state.
297    let attempt_index = parse_u32_strict(
298        core,
299        "describe_execution: exec_core",
300        "current_attempt_index",
301    )?
302    .ok_or_else(|| SdkError::Config {
303        context: "describe_execution: exec_core".into(),
304        field: Some("current_attempt_index".into()),
305        message: "is missing while current_attempt_id is set (key corruption?)".into(),
306    })?;
307    Ok(Some(AttemptSummary::new(
308        attempt_id,
309        AttemptIndex::new(attempt_index),
310    )))
311}
312
313fn build_lease_summary(core: &HashMap<String, String>) -> Result<Option<LeaseSummary>, SdkError> {
314    // A lease is "held" when the worker_instance_id field is populated
315    // AND lease_expires_at is set. Both clear together on revoke/expire
316    // (see clear_lease_and_indexes in lua/helpers.lua).
317    let wid_str = match opt_str(core, "current_worker_instance_id").filter(|s| !s.is_empty()) {
318        None => return Ok(None),
319        Some(s) => s,
320    };
321    let expires_at = match parse_ts(core, "describe_execution: exec_core", "lease_expires_at")? {
322        None => return Ok(None),
323        Some(ts) => ts,
324    };
325    // A lease is only "held" if the epoch is present too — lua/helpers.lua
326    // sets/clears epoch atomically with wid + expires_at. Parse strictly
327    // and require it: a missing epoch alongside a live wid is corruption.
328    let epoch = parse_u64_strict(core, "describe_execution: exec_core", "current_lease_epoch")?
329        .ok_or_else(|| SdkError::Config {
330            context: "describe_execution: exec_core".into(),
331            field: Some("current_lease_epoch".into()),
332            message: "is missing while current_worker_instance_id is set (key corruption?)".into(),
333        })?;
334    Ok(Some(LeaseSummary::new(
335        LeaseEpoch::new(epoch),
336        WorkerInstanceId::new(wid_str.to_owned()),
337        expires_at,
338    )))
339}
340
341// ═══════════════════════════════════════════════════════════════════════
342// describe_flow (issue #58.2)
343// ═══════════════════════════════════════════════════════════════════════
344
/// The snake_case fields FF itself owns on `flow_core`.
///
/// When decoding an HGETALL of `flow_core`, a field that is NOT listed
/// here is routed by shape: if it matches the namespaced-tag pattern
/// `^[a-z][a-z0-9_]*\.` it is surfaced on [`FlowSnapshot::tags`];
/// otherwise (neither FF-owned nor namespaced) it is reported as a
/// `Config` error so on-disk corruption or protocol drift fails loud.
const FLOW_CORE_KNOWN_FIELDS: &[&str] = &[
    // Identity. `flow_id` is consumed and validated up-front by
    // build_flow_snapshot, but has to stay listed so the unknown-field
    // sweep does not flag it as corruption.
    "flow_id",
    "flow_kind",
    "namespace",
    // State and graph counters.
    "public_flow_state",
    "graph_revision",
    "node_count",
    "edge_count",
    // Timestamps.
    "created_at",
    "last_mutation_at",
    // Cancellation metadata.
    "cancelled_at",
    "cancel_reason",
    "cancellation_policy",
];
367
368impl FlowFabricWorker {
369    /// Read a typed snapshot of one flow.
370    ///
371    /// Returns `Ok(None)` when no flow exists at `id` (flow_core hash
372    /// absent). Returns `Ok(Some(snapshot))` on success. Errors
373    /// propagate Valkey transport faults and decode failures.
374    ///
375    /// # Consistency
376    ///
377    /// The snapshot is assembled from a single `HGETALL flow_core`. No
378    /// second key is pipelined: unlike `describe_execution`, flow tags
379    /// live inline on `flow_core` under the namespaced-prefix
380    /// convention (see [`FlowSnapshot::tags`]). A single round trip is
381    /// sufficient and reflects last-write-wins-per-field semantics
382    /// under concurrent FCALLs — identical to every existing HGETALL
383    /// consumer.
384    ///
385    /// # Field semantics
386    ///
387    /// * `public_flow_state` — engine-written literal (`open`,
388    ///   `running`, `blocked`, `cancelled`, `completed`, `failed`).
389    ///   Surfaced as `String` because FF has no typed enum yet.
390    /// * `cancelled_at` / `cancel_reason` / `cancellation_policy` —
391    ///   populated only after `cancel_flow`. `None` for live flows
392    ///   and for pre-cancel_flow-persistence-era flows.
393    /// * `tags` — any flow_core field matching `^[a-z][a-z0-9_]*\.`
394    ///   (the namespaced-tag convention). FF's own fields stay in
395    ///   snake_case without dots, so there's no collision. Fields
396    ///   that match neither shape are treated as corruption and fail
397    ///   loud.
398    pub async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, SdkError> {
399        let partition = flow_partition(id, self.partition_config());
400        let ctx = FlowKeyContext::new(&partition, id);
401        let core_key = ctx.core();
402
403        let raw: HashMap<String, String> = self
404            .client()
405            .cmd("HGETALL")
406            .arg(&core_key)
407            .execute()
408            .await
409            .map_err(|e| SdkError::ValkeyContext {
410                source: e,
411                context: "describe_flow: HGETALL flow_core".into(),
412            })?;
413
414        if raw.is_empty() {
415            return Ok(None);
416        }
417
418        build_flow_snapshot(id.clone(), &raw).map(Some)
419    }
420}
421
422/// Assemble a [`FlowSnapshot`] from the raw HGETALL field map.
423///
424/// Kept as a free function so a future unit test can feed synthetic
425/// maps without a live Valkey.
426fn build_flow_snapshot(
427    flow_id: FlowId,
428    raw: &HashMap<String, String>,
429) -> Result<FlowSnapshot, SdkError> {
430    // flow_id is engine-written at create time (lua/flow.lua). Validate
431    // it matches the requested FlowId — a disagreement means either
432    // on-disk corruption or a caller accidentally reading the wrong key.
433    let stored_flow_id_str = opt_str(raw, "flow_id")
434        .filter(|s| !s.is_empty())
435        .ok_or_else(|| SdkError::Config {
436            context: "describe_flow: flow_core".into(),
437            field: Some("flow_id".into()),
438            message: "is missing or empty (key corruption?)".into(),
439        })?;
440    if stored_flow_id_str != flow_id.to_string() {
441        return Err(SdkError::Config {
442            context: "describe_flow: flow_core".into(),
443            field: Some("flow_id".into()),
444            message: format!(
445                "'{stored_flow_id_str}' does not match requested flow_id \
446                 '{flow_id}' (key corruption or wrong-key read?)"
447            ),
448        });
449    }
450
451    // namespace and flow_kind are engine-written at create time; absent
452    // or empty values indicate on-disk corruption.
453    let namespace_str = opt_str(raw, "namespace")
454        .filter(|s| !s.is_empty())
455        .ok_or_else(|| SdkError::Config {
456            context: "describe_flow: flow_core".into(),
457            field: Some("namespace".into()),
458            message: "is missing or empty (key corruption?)".into(),
459        })?;
460    let namespace = Namespace::new(namespace_str.to_owned());
461
462    let flow_kind = opt_str(raw, "flow_kind")
463        .filter(|s| !s.is_empty())
464        .ok_or_else(|| SdkError::Config {
465            context: "describe_flow: flow_core".into(),
466            field: Some("flow_kind".into()),
467            message: "is missing or empty (key corruption?)".into(),
468        })?
469        .to_owned();
470
471    let public_flow_state = opt_str(raw, "public_flow_state")
472        .filter(|s| !s.is_empty())
473        .ok_or_else(|| SdkError::Config {
474            context: "describe_flow: flow_core".into(),
475            field: Some("public_flow_state".into()),
476            message: "is missing or empty (key corruption?)".into(),
477        })?
478        .to_owned();
479
480    // graph_revision / node_count / edge_count are engine-maintained
481    // counters; missing values indicate corruption (ff_create_flow
482    // writes "0" to all three). Parse strictly.
483    let graph_revision = parse_u64_strict(raw, "describe_flow: flow_core", "graph_revision")?
484        .ok_or_else(|| SdkError::Config {
485            context: "describe_flow: flow_core".into(),
486            field: Some("graph_revision".into()),
487            message: "is missing (key corruption?)".into(),
488        })?;
489    let node_count =
490        parse_u32_strict(raw, "describe_flow: flow_core", "node_count")?.ok_or_else(|| {
491            SdkError::Config {
492                context: "describe_flow: flow_core".into(),
493                field: Some("node_count".into()),
494                message: "is missing (key corruption?)".into(),
495            }
496        })?;
497    let edge_count =
498        parse_u32_strict(raw, "describe_flow: flow_core", "edge_count")?.ok_or_else(|| {
499            SdkError::Config {
500                context: "describe_flow: flow_core".into(),
501                field: Some("edge_count".into()),
502                message: "is missing (key corruption?)".into(),
503            }
504        })?;
505
506    // created_at + last_mutation_at are engine-maintained; absent values
507    // indicate corruption (ff_create_flow writes both).
508    let created_at = parse_ts(raw, "describe_flow: flow_core", "created_at")?.ok_or_else(|| {
509        SdkError::Config {
510            context: "describe_flow: flow_core".into(),
511            field: Some("created_at".into()),
512            message: "is missing or empty (key corruption?)".into(),
513        }
514    })?;
515    let last_mutation_at = parse_ts(raw, "describe_flow: flow_core", "last_mutation_at")?
516        .ok_or_else(|| SdkError::Config {
517            context: "describe_flow: flow_core".into(),
518            field: Some("last_mutation_at".into()),
519            message: "is missing or empty (key corruption?)".into(),
520        })?;
521
522    let cancelled_at = parse_ts(raw, "describe_flow: flow_core", "cancelled_at")?;
523    let cancel_reason = opt_str(raw, "cancel_reason")
524        .filter(|s| !s.is_empty())
525        .map(str::to_owned);
526    let cancellation_policy = opt_str(raw, "cancellation_policy")
527        .filter(|s| !s.is_empty())
528        .map(str::to_owned);
529
530    // Route unknown fields: namespaced-prefix (e.g. `cairn.task_id`) →
531    // tags; anything else → corruption. This keeps FF's snake_case
532    // field additions distinct from consumer metadata without a second
533    // HGETALL on a non-existent tags hash.
534    let mut tags: BTreeMap<String, String> = BTreeMap::new();
535    for (k, v) in raw {
536        if FLOW_CORE_KNOWN_FIELDS.contains(&k.as_str()) {
537            continue;
538        }
539        if is_namespaced_tag_key(k) {
540            tags.insert(k.clone(), v.clone());
541        } else {
542            return Err(SdkError::Config {
543                context: "describe_flow: flow_core".into(),
544                field: None,
545                message: format!(
546                    "has unexpected field '{k}' — not an FF field and not a namespaced \
547                     tag (lowercase-alphanumeric-prefix + '.')"
548                ),
549            });
550        }
551    }
552
553    Ok(FlowSnapshot::new(
554        flow_id,
555        flow_kind,
556        namespace,
557        public_flow_state,
558        graph_revision,
559        node_count,
560        edge_count,
561        created_at,
562        last_mutation_at,
563        cancelled_at,
564        cancel_reason,
565        cancellation_policy,
566        tags,
567    ))
568}
569
/// Report whether `k` starts with the namespaced-tag shape
/// `^[a-z][a-z0-9_]*\.` documented on [`ExecutionSnapshot::tags`] /
/// [`FlowSnapshot::tags`].
///
/// Only the segment before the first `.` is inspected; whatever follows
/// the dot is unconstrained. Hand-rolled to avoid a regex dependency.
fn is_namespaced_tag_key(k: &str) -> bool {
    // No dot at all means there is no namespace prefix to validate.
    let Some((prefix, _rest)) = k.split_once('.') else {
        return false;
    };
    let mut bytes = prefix.bytes();
    match bytes.next() {
        // Leading byte must be a lowercase letter; the remainder may
        // also contain digits and underscores. Any non-ASCII character
        // contributes bytes outside these classes and is rejected.
        Some(first) if first.is_ascii_lowercase() => {
            bytes.all(|b| b.is_ascii_lowercase() || b.is_ascii_digit() || b == b'_')
        }
        _ => false,
    }
}
593
594// ═══════════════════════════════════════════════════════════════════════
595// describe_edge / list_*_edges (issue #58.3)
596// ═══════════════════════════════════════════════════════════════════════
597
/// The fields FF owns on the flow-scoped `edge:<edge_id>` hash.
///
/// A field outside this set in an HGETALL reply signals on-disk
/// corruption or protocol drift and is reported as `SdkError::Config`,
/// in line with the strict-parse posture of `describe_flow`.
const EDGE_KNOWN_FIELDS: &[&str] = &[
    // Identity of the edge and its owning flow.
    "edge_id",
    "flow_id",
    // Endpoints.
    "upstream_execution_id",
    "downstream_execution_id",
    // Dependency semantics.
    "dependency_kind",
    "satisfaction_condition",
    "data_passing_ref",
    "edge_state",
    // Provenance.
    "created_at",
    "created_by",
];
614
615impl FlowFabricWorker {
616    /// Read a typed snapshot of one dependency edge.
617    ///
618    /// Takes both `flow_id` and `edge_id`: the edge hash is stored under
619    /// the flow's partition (`ff:flow:{fp:N}:<flow_id>:edge:<edge_id>`)
620    /// and FF does not maintain a global `edge_id -> flow_id` index.
621    /// The caller already knows the flow from the staging call result
622    /// or the consumer's own metadata.
623    ///
624    /// Returns `Ok(None)` when the edge hash is absent (never staged,
625    /// or staged under a different flow). Returns `Ok(Some(snapshot))`
626    /// on success.
627    ///
628    /// # Consistency
629    ///
630    /// Single `HGETALL` — the flow-scoped edge hash is written once at
631    /// staging time and never mutated (per-execution resolution state
632    /// lives on a separate `dep:<edge_id>` hash), so a single round
633    /// trip is authoritative.
634    pub async fn describe_edge(
635        &self,
636        flow_id: &FlowId,
637        edge_id: &EdgeId,
638    ) -> Result<Option<EdgeSnapshot>, SdkError> {
639        let partition = flow_partition(flow_id, self.partition_config());
640        let ctx = FlowKeyContext::new(&partition, flow_id);
641        let edge_key = ctx.edge(edge_id);
642
643        let raw: HashMap<String, String> = self
644            .client()
645            .cmd("HGETALL")
646            .arg(&edge_key)
647            .execute()
648            .await
649            .map_err(|e| SdkError::ValkeyContext {
650                source: e,
651                context: "describe_edge: HGETALL edge_hash".into(),
652            })?;
653
654        if raw.is_empty() {
655            return Ok(None);
656        }
657
658        build_edge_snapshot(flow_id, edge_id, &raw).map(Some)
659    }
660
661    /// List all outgoing dependency edges originating from an execution.
662    ///
663    /// Returns an empty `Vec` when the execution has no outgoing edges
664    /// (including the case where the execution is standalone, i.e. not
665    /// attached to any flow — such executions cannot participate in
666    /// dependency edges).
667    ///
668    /// # Reads
669    ///
670    /// 1. `HGET exec_core flow_id` — resolve the flow owning the
671    ///    adjacency set. Missing or empty flow_id returns an empty Vec.
672    /// 2. `SMEMBERS` on the flow-scoped `out:<upstream_eid>` set.
673    /// 3. One pipelined round trip issuing one `HGETALL` per edge_id.
674    ///
675    /// Ordering is unspecified — the adjacency set is an unordered SET.
676    /// Callers that need deterministic order should sort by
677    /// [`EdgeSnapshot::edge_id`] (or `created_at`) themselves.
678    pub async fn list_outgoing_edges(
679        &self,
680        upstream_eid: &ExecutionId,
681    ) -> Result<Vec<EdgeSnapshot>, SdkError> {
682        let Some(flow_id) = self.resolve_flow_id(upstream_eid).await? else {
683            return Ok(Vec::new());
684        };
685        let partition = flow_partition(&flow_id, self.partition_config());
686        let ctx = FlowKeyContext::new(&partition, &flow_id);
687        self.list_edges_from_set(
688            &ctx.outgoing(upstream_eid),
689            &flow_id,
690            upstream_eid,
691            AdjacencySide::Outgoing,
692        )
693        .await
694    }
695
696    /// List all incoming dependency edges landing on an execution.
697    /// See [`list_outgoing_edges`] for the read shape.
698    pub async fn list_incoming_edges(
699        &self,
700        downstream_eid: &ExecutionId,
701    ) -> Result<Vec<EdgeSnapshot>, SdkError> {
702        let Some(flow_id) = self.resolve_flow_id(downstream_eid).await? else {
703            return Ok(Vec::new());
704        };
705        let partition = flow_partition(&flow_id, self.partition_config());
706        let ctx = FlowKeyContext::new(&partition, &flow_id);
707        self.list_edges_from_set(
708            &ctx.incoming(downstream_eid),
709            &flow_id,
710            downstream_eid,
711            AdjacencySide::Incoming,
712        )
713        .await
714    }
715
716    /// `HGET exec_core.flow_id` and parse to a [`FlowId`]. `None` when
717    /// the exec_core hash is absent OR the flow_id field is empty
718    /// (standalone execution).
719    ///
720    /// Also pins the RFC-011 co-location invariant
721    /// (`execution_partition(eid) == flow_partition(flow_id)`) — a
722    /// parsed-but-wrong flow_id would otherwise silently route the
723    /// follow-up adjacency reads to the wrong partition and return
724    /// bogus empty results. A mismatch surfaces as `SdkError::Config`.
725    async fn resolve_flow_id(
726        &self,
727        eid: &ExecutionId,
728    ) -> Result<Option<FlowId>, SdkError> {
729        let exec_partition = execution_partition(eid, self.partition_config());
730        let ctx = ExecKeyContext::new(&exec_partition, eid);
731        let raw: Option<String> = self
732            .client()
733            .cmd("HGET")
734            .arg(ctx.core())
735            .arg("flow_id")
736            .execute()
737            .await
738            .map_err(|e| SdkError::ValkeyContext {
739                source: e,
740                context: "list_edges: HGET exec_core.flow_id".into(),
741            })?;
742        let Some(raw) = raw.filter(|s| !s.is_empty()) else {
743            return Ok(None);
744        };
745        let flow_id = FlowId::parse(&raw).map_err(|e| SdkError::Config {
746            context: "list_edges: exec_core".into(),
747            field: Some("flow_id".into()),
748            message: format!("'{raw}' is not a valid UUID (key corruption?): {e}"),
749        })?;
750        let flow_partition_index = flow_partition(&flow_id, self.partition_config()).index;
751        if exec_partition.index != flow_partition_index {
752            return Err(SdkError::Config {
753                context: "list_edges: exec_core".into(),
754                field: Some("flow_id".into()),
755                message: format!(
756                    "'{flow_id}' partition {flow_partition_index} does not match \
757                     execution partition {} (RFC-011 co-location violation; key corruption?)",
758                    exec_partition.index
759                ),
760            });
761        }
762        Ok(Some(flow_id))
763    }
764
765    /// Shared body for `list_incoming_edges` / `list_outgoing_edges`:
766    /// SMEMBERS + pipelined HGETALL.
767    ///
768    /// `subject_eid` + `side` pin the expected endpoint on each
769    /// returned `EdgeSnapshot`: an adjacency SET entry whose edge
770    /// hash points at a different upstream (for `Outgoing` listings)
771    /// or downstream (for `Incoming`) is treated as corruption and
772    /// surfaced as `SdkError::Config`, matching the strict-parse
773    /// posture elsewhere in this module.
774    async fn list_edges_from_set(
775        &self,
776        adj_key: &str,
777        flow_id: &FlowId,
778        subject_eid: &ExecutionId,
779        side: AdjacencySide,
780    ) -> Result<Vec<EdgeSnapshot>, SdkError> {
781        let edge_id_strs: Vec<String> = self
782            .client()
783            .cmd("SMEMBERS")
784            .arg(adj_key)
785            .execute()
786            .await
787            .map_err(|e| SdkError::ValkeyContext {
788                source: e,
789                context: "list_edges: SMEMBERS adj_set".into(),
790            })?;
791        if edge_id_strs.is_empty() {
792            return Ok(Vec::new());
793        }
794
795        // Parse edge ids first so a corrupt adjacency entry fails loud
796        // before we spend a round trip on it.
797        let mut edge_ids: Vec<EdgeId> = Vec::with_capacity(edge_id_strs.len());
798        for raw in &edge_id_strs {
799            let parsed = EdgeId::parse(raw).map_err(|e| SdkError::Config {
800                context: "list_edges: adjacency_set".into(),
801                field: Some("edge_id".into()),
802                message: format!("'{raw}' is not a valid EdgeId (key corruption?): {e}"),
803            })?;
804            edge_ids.push(parsed);
805        }
806
807        let partition = flow_partition(flow_id, self.partition_config());
808        let ctx = FlowKeyContext::new(&partition, flow_id);
809
810        let mut pipe = self.client().pipeline();
811        let slots: Vec<_> = edge_ids
812            .iter()
813            .map(|eid| {
814                pipe.cmd::<HashMap<String, String>>("HGETALL")
815                    .arg(ctx.edge(eid))
816                    .finish()
817            })
818            .collect();
819        pipe.execute().await.map_err(|e| SdkError::ValkeyContext {
820            source: e,
821            context: "list_edges: pipeline HGETALL edges".into(),
822        })?;
823
824        let mut out: Vec<EdgeSnapshot> = Vec::with_capacity(edge_ids.len());
825        for (edge_id, slot) in edge_ids.iter().zip(slots) {
826            let raw = slot.value().map_err(|e| SdkError::ValkeyContext {
827                source: e,
828                context: "list_edges: decode HGETALL edge_hash".into(),
829            })?;
830            if raw.is_empty() {
831                // Adjacency SET referenced an edge_hash that no longer
832                // exists. FF does not delete edge hashes today (staging
833                // is write-once), so this is corruption — fail loud.
834                return Err(SdkError::Config {
835                    context: "list_edges: adjacency_set".into(),
836                    field: None,
837                    message: format!(
838                        "refers to edge_id '{edge_id}' but its edge_hash is absent \
839                         (key corruption?)"
840                    ),
841                });
842            }
843            let snap = build_edge_snapshot(flow_id, edge_id, &raw)?;
844            // Cross-check: the edge hash's endpoint on the listed side
845            // must match the execution we're listing for. A mismatch
846            // means the adjacency SET and edge hash disagree (e.g. a
847            // stale or corrupted SET entry) — refuse to silently
848            // return an edge the caller did not ask about.
849            let endpoint = match side {
850                AdjacencySide::Outgoing => &snap.upstream_execution_id,
851                AdjacencySide::Incoming => &snap.downstream_execution_id,
852            };
853            if endpoint != subject_eid {
854                return Err(SdkError::Config {
855                    context: "list_edges: adjacency_set".into(),
856                    field: None,
857                    message: format!(
858                        "for execution '{subject_eid}' (side={side:?}) contains edge \
859                         '{edge_id}' whose stored endpoint is '{endpoint}' \
860                         (adjacency/edge-hash drift?)"
861                    ),
862                });
863            }
864            out.push(snap);
865        }
866        Ok(out)
867    }
868}
869
/// Which side of an adjacency SET the subject execution lives on.
/// Used by `list_edges_from_set` to cross-check the returned edge's
/// stored endpoint against the listing's subject.
///
/// The value is passed by value into `list_edges_from_set`, matched
/// on to pick the endpoint field to compare, and `{:?}`-formatted
/// into the mismatch error message.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AdjacencySide {
    /// Outgoing: subject is the `upstream_execution_id` on each edge.
    Outgoing,
    /// Incoming: subject is the `downstream_execution_id` on each edge.
    Incoming,
}
880
/// Crate-visible re-export of [`build_edge_snapshot`] for
/// [`crate::engine_error::EngineError::enrich_dependency_conflict`].
///
/// Pure delegation: arguments are forwarded unchanged and the result
/// (including every `SdkError::Config` produced by the strict parse)
/// is returned as-is.
// NOTE(review): `dead_code` is allowed without a stated reason —
// presumably the only caller is compiled conditionally; confirm and
// record the reason here.
#[allow(dead_code)]
pub(crate) fn build_edge_snapshot_public(
    flow_id: &FlowId,
    edge_id: &EdgeId,
    raw: &HashMap<String, String>,
) -> Result<EdgeSnapshot, SdkError> {
    build_edge_snapshot(flow_id, edge_id, raw)
}
891
892/// Assemble an [`EdgeSnapshot`] from the raw HGETALL field map. Kept
893/// as a free function so unit tests can feed synthetic maps.
894///
895/// `flow_id` / `edge_id` are the caller's expected identities — both
896/// are cross-checked against the stored values to catch wrong-key
897/// reads and on-disk corruption.
898fn build_edge_snapshot(
899    flow_id: &FlowId,
900    edge_id: &EdgeId,
901    raw: &HashMap<String, String>,
902) -> Result<EdgeSnapshot, SdkError> {
903    // Sweep for unknown fields before parsing — a future FF rename
904    // that lands an unrecognised field must fail loud rather than
905    // silently drop data.
906    for k in raw.keys() {
907        if !EDGE_KNOWN_FIELDS.contains(&k.as_str()) {
908            return Err(SdkError::Config {
909                context: "edge_snapshot: edge_hash".into(),
910                field: None,
911                message: format!(
912                    "has unexpected field '{k}' (protocol drift or corruption?)"
913                ),
914            });
915        }
916    }
917
918    let stored_edge_id_str = opt_str(raw, "edge_id")
919        .filter(|s| !s.is_empty())
920        .ok_or_else(|| SdkError::Config {
921            context: "edge_snapshot: edge_hash".into(),
922            field: Some("edge_id".into()),
923            message: "is missing or empty (key corruption?)".into(),
924        })?;
925    if stored_edge_id_str != edge_id.to_string() {
926        return Err(SdkError::Config {
927            context: "edge_snapshot: edge_hash".into(),
928            field: Some("edge_id".into()),
929            message: format!(
930                "'{stored_edge_id_str}' does not match requested edge_id \
931                 '{edge_id}' (key corruption or wrong-key read?)"
932            ),
933        });
934    }
935
936    let stored_flow_id_str = opt_str(raw, "flow_id")
937        .filter(|s| !s.is_empty())
938        .ok_or_else(|| SdkError::Config {
939            context: "edge_snapshot: edge_hash".into(),
940            field: Some("flow_id".into()),
941            message: "is missing or empty (key corruption?)".into(),
942        })?;
943    if stored_flow_id_str != flow_id.to_string() {
944        return Err(SdkError::Config {
945            context: "edge_snapshot: edge_hash".into(),
946            field: Some("flow_id".into()),
947            message: format!(
948                "'{stored_flow_id_str}' does not match requested flow_id \
949                 '{flow_id}' (key corruption or wrong-key read?)"
950            ),
951        });
952    }
953
954    let upstream_execution_id = parse_eid(raw, "upstream_execution_id")?;
955    let downstream_execution_id = parse_eid(raw, "downstream_execution_id")?;
956
957    let dependency_kind = opt_str(raw, "dependency_kind")
958        .filter(|s| !s.is_empty())
959        .ok_or_else(|| SdkError::Config {
960            context: "edge_snapshot: edge_hash".into(),
961            field: Some("dependency_kind".into()),
962            message: "is missing or empty (key corruption?)".into(),
963        })?
964        .to_owned();
965
966    let satisfaction_condition = opt_str(raw, "satisfaction_condition")
967        .filter(|s| !s.is_empty())
968        .ok_or_else(|| SdkError::Config {
969            context: "edge_snapshot: edge_hash".into(),
970            field: Some("satisfaction_condition".into()),
971            message: "is missing or empty (key corruption?)".into(),
972        })?
973        .to_owned();
974
975    // data_passing_ref is stored as "" when the stager passed None.
976    // Treat empty as absent rather than surfacing an empty String.
977    let data_passing_ref = opt_str(raw, "data_passing_ref")
978        .filter(|s| !s.is_empty())
979        .map(str::to_owned);
980
981    let edge_state = opt_str(raw, "edge_state")
982        .filter(|s| !s.is_empty())
983        .ok_or_else(|| SdkError::Config {
984            context: "edge_snapshot: edge_hash".into(),
985            field: Some("edge_state".into()),
986            message: "is missing or empty (key corruption?)".into(),
987        })?
988        .to_owned();
989
990    let created_at =
991        parse_ts(raw, "edge_snapshot: edge_hash", "created_at")?.ok_or_else(|| {
992            SdkError::Config {
993                context: "edge_snapshot: edge_hash".into(),
994                field: Some("created_at".into()),
995                message: "is missing or empty (key corruption?)".into(),
996            }
997        })?;
998
999    let created_by = opt_str(raw, "created_by")
1000        .filter(|s| !s.is_empty())
1001        .ok_or_else(|| SdkError::Config {
1002            context: "edge_snapshot: edge_hash".into(),
1003            field: Some("created_by".into()),
1004            message: "is missing or empty (key corruption?)".into(),
1005        })?
1006        .to_owned();
1007
1008    Ok(EdgeSnapshot::new(
1009        edge_id.clone(),
1010        flow_id.clone(),
1011        upstream_execution_id,
1012        downstream_execution_id,
1013        dependency_kind,
1014        satisfaction_condition,
1015        data_passing_ref,
1016        edge_state,
1017        created_at,
1018        created_by,
1019    ))
1020}
1021
1022fn parse_eid(raw: &HashMap<String, String>, field: &str) -> Result<ExecutionId, SdkError> {
1023    let s = opt_str(raw, field)
1024        .filter(|s| !s.is_empty())
1025        .ok_or_else(|| SdkError::Config {
1026            context: "edge_snapshot: edge_hash".into(),
1027            field: Some(field.to_owned()),
1028            message: "is missing or empty (key corruption?)".into(),
1029        })?;
1030    ExecutionId::parse(s).map_err(|e| SdkError::Config {
1031        context: "edge_snapshot: edge_hash".into(),
1032        field: Some(field.to_owned()),
1033        message: format!("'{s}' is not a valid ExecutionId (key corruption?): {e}"),
1034    })
1035}
1036
1037#[cfg(test)]
1038mod tests {
1039    use super::*;
1040    use ff_core::partition::PartitionConfig;
1041    use ff_core::types::FlowId;
1042
1043    fn eid() -> ExecutionId {
1044        let config = PartitionConfig::default();
1045        ExecutionId::for_flow(&FlowId::new(), &config)
1046    }
1047
1048    fn minimal_core(public_state: &str) -> HashMap<String, String> {
1049        let mut m = HashMap::new();
1050        m.insert("public_state".to_owned(), public_state.to_owned());
1051        m.insert("lane_id".to_owned(), "default".to_owned());
1052        m.insert("namespace".to_owned(), "ns".to_owned());
1053        m.insert("created_at".to_owned(), "1000".to_owned());
1054        m.insert("last_mutation_at".to_owned(), "2000".to_owned());
1055        m.insert("total_attempt_count".to_owned(), "0".to_owned());
1056        m
1057    }
1058
1059    #[test]
1060    fn waiting_exec_no_attempt_no_lease_no_tags() {
1061        let snap = build_execution_snapshot(eid(), &minimal_core("waiting"), HashMap::new())
1062            .unwrap()
1063            .expect("should build");
1064        assert_eq!(snap.public_state, PublicState::Waiting);
1065        assert!(snap.current_attempt.is_none());
1066        assert!(snap.current_lease.is_none());
1067        assert!(snap.current_waitpoint.is_none());
1068        assert_eq!(snap.tags.len(), 0);
1069        assert_eq!(snap.created_at.0, 1000);
1070        assert_eq!(snap.last_mutation_at.0, 2000);
1071        assert!(snap.flow_id.is_none());
1072        assert!(snap.blocking_reason.is_none());
1073    }
1074
1075    #[test]
1076    fn tags_flow_through_sorted() {
1077        let mut tags = HashMap::new();
1078        tags.insert("cairn.task_id".to_owned(), "t-1".to_owned());
1079        tags.insert("cairn.project".to_owned(), "proj".to_owned());
1080        let snap = build_execution_snapshot(eid(), &minimal_core("waiting"), tags)
1081            .unwrap()
1082            .unwrap();
1083        let keys: Vec<_> = snap.tags.keys().cloned().collect();
1084        assert_eq!(
1085            keys,
1086            vec!["cairn.project".to_owned(), "cairn.task_id".to_owned()]
1087        );
1088    }
1089
1090    #[test]
1091    fn invalid_public_state_fails_loud() {
1092        let err =
1093            build_execution_snapshot(eid(), &minimal_core("bogus"), HashMap::new()).unwrap_err();
1094        match err {
1095            SdkError::Config { field, message: msg, .. } => {
1096                assert_eq!(field.as_deref(), Some("public_state"), "msg: {msg}");
1097            }
1098            other => panic!("expected Config, got {other:?}"),
1099        }
1100    }
1101
1102    #[test]
1103    fn invalid_lane_id_fails_loud() {
1104        // LaneId::try_new rejects non-printable bytes. Simulate on-disk
1105        // corruption by stamping a lane_id with an embedded \n.
1106        let mut core = minimal_core("waiting");
1107        core.insert("lane_id".to_owned(), "lane\nbroken".to_owned());
1108        let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1109        match err {
1110            SdkError::Config { field, message: msg, .. } => {
1111                assert_eq!(field.as_deref(), Some("lane_id"), "msg: {msg}");
1112            }
1113            other => panic!("expected Config, got {other:?}"),
1114        }
1115    }
1116
1117    #[test]
1118    fn missing_required_timestamps_fail_loud() {
1119        for want in ["created_at", "last_mutation_at"] {
1120            let mut core = minimal_core("waiting");
1121            core.remove(want);
1122            let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1123            match err {
1124                SdkError::Config { field, message: msg, .. } => {
1125                    assert_eq!(field.as_deref(), Some(want), "msg for {want}: {msg}");
1126                }
1127                other => panic!("expected Config for {want}, got {other:?}"),
1128            }
1129        }
1130    }
1131
1132    #[test]
1133    fn malformed_total_attempt_count_fails_loud() {
1134        let mut core = minimal_core("waiting");
1135        core.insert("total_attempt_count".to_owned(), "not-a-number".to_owned());
1136        let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1137        match err {
1138            SdkError::Config { field, message: msg, .. } => {
1139                assert_eq!(field.as_deref(), Some("total_attempt_count"), "msg: {msg}");
1140            }
1141            other => panic!("expected Config, got {other:?}"),
1142        }
1143    }
1144
1145    #[test]
1146    fn attempt_id_without_index_fails_loud() {
1147        // current_attempt_id set but current_attempt_index absent =>
1148        // corruption, since lua/execution.lua writes both atomically.
1149        let mut core = minimal_core("active");
1150        core.insert(
1151            "current_attempt_id".to_owned(),
1152            AttemptId::new().to_string(),
1153        );
1154        let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1155        match err {
1156            SdkError::Config { field, message: msg, .. } => {
1157                assert_eq!(field.as_deref(), Some("current_attempt_index"), "msg: {msg}");
1158            }
1159            other => panic!("expected Config, got {other:?}"),
1160        }
1161    }
1162
1163    #[test]
1164    fn lease_without_epoch_fails_loud() {
1165        // wid + expires_at present but epoch missing => corruption.
1166        let mut core = minimal_core("active");
1167        core.insert(
1168            "current_worker_instance_id".to_owned(),
1169            "w-inst-1".to_owned(),
1170        );
1171        core.insert("lease_expires_at".to_owned(), "9000".to_owned());
1172        let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1173        match err {
1174            SdkError::Config { field, message: msg, .. } => {
1175                assert_eq!(field.as_deref(), Some("current_lease_epoch"), "msg: {msg}");
1176            }
1177            other => panic!("expected Config, got {other:?}"),
1178        }
1179    }
1180
1181    #[test]
1182    fn lease_summary_requires_both_wid_and_expires_at() {
1183        // Only wid → no lease (lease expired + cleared lease_expires_at
1184        // but wid not yet rewritten is not a real state; defensive).
1185        let mut core = minimal_core("active");
1186        core.insert(
1187            "current_worker_instance_id".to_owned(),
1188            "w-inst-1".to_owned(),
1189        );
1190        let snap = build_execution_snapshot(eid(), &core, HashMap::new())
1191            .unwrap()
1192            .unwrap();
1193        assert!(snap.current_lease.is_none());
1194
1195        core.insert("lease_expires_at".to_owned(), "9000".to_owned());
1196        core.insert("current_lease_epoch".to_owned(), "3".to_owned());
1197        let snap = build_execution_snapshot(eid(), &core, HashMap::new())
1198            .unwrap()
1199            .unwrap();
1200        let lease = snap.current_lease.expect("lease present");
1201        assert_eq!(lease.lease_epoch, LeaseEpoch::new(3));
1202        assert_eq!(lease.expires_at.0, 9000);
1203        assert_eq!(lease.worker_instance_id.as_str(), "w-inst-1");
1204    }
1205
1206    // ─── FlowSnapshot (describe_flow) ───
1207
    /// Shorthand for `FlowId::new()`, used by the flow- and
    /// edge-snapshot tests below. The mismatch tests call it twice and
    /// rely on the two results being different ids.
    fn fid() -> FlowId {
        FlowId::new()
    }
1211
1212    fn minimal_flow_core(id: &FlowId, state: &str) -> HashMap<String, String> {
1213        let mut m = HashMap::new();
1214        m.insert("flow_id".to_owned(), id.to_string());
1215        m.insert("flow_kind".to_owned(), "dag".to_owned());
1216        m.insert("namespace".to_owned(), "ns".to_owned());
1217        m.insert("public_flow_state".to_owned(), state.to_owned());
1218        m.insert("graph_revision".to_owned(), "0".to_owned());
1219        m.insert("node_count".to_owned(), "0".to_owned());
1220        m.insert("edge_count".to_owned(), "0".to_owned());
1221        m.insert("created_at".to_owned(), "1000".to_owned());
1222        m.insert("last_mutation_at".to_owned(), "1000".to_owned());
1223        m
1224    }
1225
1226    #[test]
1227    fn open_flow_round_trips() {
1228        let f = fid();
1229        let snap = build_flow_snapshot(f.clone(), &minimal_flow_core(&f, "open")).unwrap();
1230        assert_eq!(snap.flow_id, f);
1231        assert_eq!(snap.flow_kind, "dag");
1232        assert_eq!(snap.namespace.as_str(), "ns");
1233        assert_eq!(snap.public_flow_state, "open");
1234        assert_eq!(snap.graph_revision, 0);
1235        assert_eq!(snap.node_count, 0);
1236        assert_eq!(snap.edge_count, 0);
1237        assert_eq!(snap.created_at.0, 1000);
1238        assert_eq!(snap.last_mutation_at.0, 1000);
1239        assert!(snap.cancelled_at.is_none());
1240        assert!(snap.cancel_reason.is_none());
1241        assert!(snap.cancellation_policy.is_none());
1242        assert!(snap.tags.is_empty());
1243    }
1244
1245    #[test]
1246    fn cancelled_flow_surfaces_cancel_fields() {
1247        let f = fid();
1248        let mut core = minimal_flow_core(&f, "cancelled");
1249        core.insert("cancelled_at".to_owned(), "2000".to_owned());
1250        core.insert("cancel_reason".to_owned(), "operator".to_owned());
1251        core.insert("cancellation_policy".to_owned(), "cancel_all".to_owned());
1252        let snap = build_flow_snapshot(f, &core).unwrap();
1253        assert_eq!(snap.public_flow_state, "cancelled");
1254        assert_eq!(snap.cancelled_at.unwrap().0, 2000);
1255        assert_eq!(snap.cancel_reason.as_deref(), Some("operator"));
1256        assert_eq!(snap.cancellation_policy.as_deref(), Some("cancel_all"));
1257    }
1258
1259    #[test]
1260    fn namespaced_tags_routed_to_tags_map() {
1261        let f = fid();
1262        let mut core = minimal_flow_core(&f, "open");
1263        core.insert("cairn.task_id".to_owned(), "t-1".to_owned());
1264        core.insert("cairn.project".to_owned(), "proj".to_owned());
1265        core.insert("operator.label".to_owned(), "v".to_owned());
1266        let snap = build_flow_snapshot(f, &core).unwrap();
1267        assert_eq!(snap.tags.len(), 3);
1268        let keys: Vec<_> = snap.tags.keys().cloned().collect();
1269        // BTreeMap keeps them sorted.
1270        assert_eq!(
1271            keys,
1272            vec![
1273                "cairn.project".to_owned(),
1274                "cairn.task_id".to_owned(),
1275                "operator.label".to_owned()
1276            ]
1277        );
1278    }
1279
1280    #[test]
1281    fn unknown_flat_field_fails_loud() {
1282        // A future FF field rename or on-disk drift lands a non-
1283        // namespaced unknown key. Don't silently bucket it.
1284        let f = fid();
1285        let mut core = minimal_flow_core(&f, "open");
1286        core.insert("bogus_future_field".to_owned(), "v".to_owned());
1287        let err = build_flow_snapshot(f, &core).unwrap_err();
1288        match err {
1289            SdkError::Config { field, message: msg, .. } => {
1290                assert!(field.is_none(), "expected whole-object error, got field={field:?}");
1291                assert!(msg.contains("bogus_future_field"), "msg: {msg}");
1292            }
1293            other => panic!("expected Config, got {other:?}"),
1294        }
1295    }
1296
1297    #[test]
1298    fn missing_required_fields_fail_loud() {
1299        for want in [
1300            "flow_id",
1301            "namespace",
1302            "flow_kind",
1303            "public_flow_state",
1304            "graph_revision",
1305            "node_count",
1306            "edge_count",
1307            "created_at",
1308            "last_mutation_at",
1309        ] {
1310            let f = fid();
1311            let mut core = minimal_flow_core(&f, "open");
1312            core.remove(want);
1313            let err = build_flow_snapshot(f, &core).err().unwrap_or_else(|| {
1314                panic!("field {want} should fail but build_flow_snapshot returned Ok")
1315            });
1316            match err {
1317                SdkError::Config { field, message: msg, .. } => {
1318                    assert_eq!(field.as_deref(), Some(want), "msg for {want}: {msg}");
1319                }
1320                other => panic!("expected Config for {want}, got {other:?}"),
1321            }
1322        }
1323    }
1324
1325    #[test]
1326    fn empty_required_strings_fail_loud() {
1327        // opt_str + .filter(|s| !s.is_empty()) must treat an empty
1328        // value the same as a missing one for strict-parsed fields.
1329        for want in ["flow_id", "namespace", "flow_kind", "public_flow_state"] {
1330            let f = fid();
1331            let mut core = minimal_flow_core(&f, "open");
1332            core.insert(want.to_owned(), String::new());
1333            let err = build_flow_snapshot(f, &core).err().unwrap_or_else(|| {
1334                panic!("empty {want} should fail but build_flow_snapshot returned Ok")
1335            });
1336            match err {
1337                SdkError::Config { field, message: msg, .. } => {
1338                    assert_eq!(field.as_deref(), Some(want), "msg for {want}: {msg}");
1339                }
1340                other => panic!("expected Config for {want}, got {other:?}"),
1341            }
1342        }
1343    }
1344
1345    #[test]
1346    fn flow_id_mismatch_fails_loud() {
1347        // flow_core.flow_id disagreeing with the requested FlowId is
1348        // corruption or a wrong-key read — must surface as Config.
1349        let requested = fid();
1350        let other = fid();
1351        let core = minimal_flow_core(&other, "open");
1352        let err = build_flow_snapshot(requested, &core).unwrap_err();
1353        match err {
1354            SdkError::Config { field, message: msg, .. } => {
1355                assert_eq!(field.as_deref(), Some("flow_id"), "msg: {msg}");
1356                assert!(msg.contains("does not match"), "msg: {msg}");
1357            }
1358            other => panic!("expected Config, got {other:?}"),
1359        }
1360    }
1361
1362    #[test]
1363    fn malformed_counter_fails_loud() {
1364        let f = fid();
1365        let mut core = minimal_flow_core(&f, "open");
1366        core.insert("graph_revision".to_owned(), "not-a-number".to_owned());
1367        let err = build_flow_snapshot(f, &core).unwrap_err();
1368        match err {
1369            SdkError::Config { field, message: msg, .. } => {
1370                assert_eq!(field.as_deref(), Some("graph_revision"), "msg: {msg}");
1371            }
1372            other => panic!("expected Config, got {other:?}"),
1373        }
1374    }
1375
1376    // ─── EdgeSnapshot (describe_edge) ───
1377
1378    fn eids_for_flow(f: &FlowId) -> (ExecutionId, ExecutionId) {
1379        let cfg = PartitionConfig::default();
1380        (ExecutionId::for_flow(f, &cfg), ExecutionId::for_flow(f, &cfg))
1381    }
1382
1383    fn minimal_edge_hash(
1384        flow: &FlowId,
1385        edge: &EdgeId,
1386        up: &ExecutionId,
1387        down: &ExecutionId,
1388    ) -> HashMap<String, String> {
1389        let mut m = HashMap::new();
1390        m.insert("edge_id".into(), edge.to_string());
1391        m.insert("flow_id".into(), flow.to_string());
1392        m.insert("upstream_execution_id".into(), up.to_string());
1393        m.insert("downstream_execution_id".into(), down.to_string());
1394        m.insert("dependency_kind".into(), "success_only".into());
1395        m.insert("satisfaction_condition".into(), "all_required".into());
1396        m.insert("data_passing_ref".into(), String::new());
1397        m.insert("edge_state".into(), "pending".into());
1398        m.insert("created_at".into(), "1234".into());
1399        m.insert("created_by".into(), "engine".into());
1400        m
1401    }
1402
1403    #[test]
1404    fn edge_round_trips_all_fields() {
1405        let f = fid();
1406        let edge = EdgeId::new();
1407        let (up, down) = eids_for_flow(&f);
1408        let raw = minimal_edge_hash(&f, &edge, &up, &down);
1409        let snap = build_edge_snapshot(&f, &edge, &raw).unwrap();
1410        assert_eq!(snap.edge_id, edge);
1411        assert_eq!(snap.flow_id, f);
1412        assert_eq!(snap.upstream_execution_id, up);
1413        assert_eq!(snap.downstream_execution_id, down);
1414        assert_eq!(snap.dependency_kind, "success_only");
1415        assert_eq!(snap.satisfaction_condition, "all_required");
1416        assert!(snap.data_passing_ref.is_none());
1417        assert_eq!(snap.edge_state, "pending");
1418        assert_eq!(snap.created_at.0, 1234);
1419        assert_eq!(snap.created_by, "engine");
1420    }
1421
1422    #[test]
1423    fn edge_data_passing_ref_round_trips_when_set() {
1424        let f = fid();
1425        let edge = EdgeId::new();
1426        let (up, down) = eids_for_flow(&f);
1427        let mut raw = minimal_edge_hash(&f, &edge, &up, &down);
1428        raw.insert("data_passing_ref".into(), "ref://blob-42".into());
1429        let snap = build_edge_snapshot(&f, &edge, &raw).unwrap();
1430        assert_eq!(snap.data_passing_ref.as_deref(), Some("ref://blob-42"));
1431    }
1432
1433    #[test]
1434    fn edge_unknown_field_fails_loud() {
1435        let f = fid();
1436        let edge = EdgeId::new();
1437        let (up, down) = eids_for_flow(&f);
1438        let mut raw = minimal_edge_hash(&f, &edge, &up, &down);
1439        raw.insert("bogus_future_field".into(), "v".into());
1440        let err = build_edge_snapshot(&f, &edge, &raw).unwrap_err();
1441        match err {
1442            SdkError::Config { field, message: msg, .. } => {
1443                assert!(field.is_none(), "expected whole-object error, got field={field:?}");
1444                assert!(msg.contains("bogus_future_field"), "msg: {msg}");
1445            }
1446            other => panic!("expected Config, got {other:?}"),
1447        }
1448    }
1449
1450    #[test]
1451    fn edge_flow_id_mismatch_fails_loud() {
1452        let f = fid();
1453        let other = fid();
1454        let edge = EdgeId::new();
1455        let (up, down) = eids_for_flow(&f);
1456        let raw = minimal_edge_hash(&other, &edge, &up, &down);
1457        let err = build_edge_snapshot(&f, &edge, &raw).unwrap_err();
1458        match err {
1459            SdkError::Config { field, message: msg, .. } => {
1460                assert_eq!(field.as_deref(), Some("flow_id"), "msg: {msg}");
1461                assert!(msg.contains("does not match"), "msg: {msg}");
1462            }
1463            other => panic!("expected Config, got {other:?}"),
1464        }
1465    }
1466
1467    #[test]
1468    fn edge_edge_id_mismatch_fails_loud() {
1469        let f = fid();
1470        let edge = EdgeId::new();
1471        let other_edge = EdgeId::new();
1472        let (up, down) = eids_for_flow(&f);
1473        let raw = minimal_edge_hash(&f, &other_edge, &up, &down);
1474        let err = build_edge_snapshot(&f, &edge, &raw).unwrap_err();
1475        match err {
1476            SdkError::Config { field, message: msg, .. } => {
1477                assert_eq!(field.as_deref(), Some("edge_id"), "msg: {msg}");
1478                assert!(msg.contains("does not match"), "msg: {msg}");
1479            }
1480            other => panic!("expected Config, got {other:?}"),
1481        }
1482    }
1483
1484    #[test]
1485    fn edge_missing_required_fields_fail_loud() {
1486        for want in [
1487            "edge_id",
1488            "flow_id",
1489            "upstream_execution_id",
1490            "downstream_execution_id",
1491            "dependency_kind",
1492            "satisfaction_condition",
1493            "edge_state",
1494            "created_at",
1495            "created_by",
1496        ] {
1497            let f = fid();
1498            let edge = EdgeId::new();
1499            let (up, down) = eids_for_flow(&f);
1500            let mut raw = minimal_edge_hash(&f, &edge, &up, &down);
1501            raw.remove(want);
1502            let err = build_edge_snapshot(&f, &edge, &raw)
1503                .err()
1504                .unwrap_or_else(|| panic!("missing {want} should fail"));
1505            match err {
1506                SdkError::Config { field, message: msg, .. } => {
1507                    assert_eq!(field.as_deref(), Some(want), "msg for {want}: {msg}");
1508                }
1509                other => panic!("expected Config for {want}, got {other:?}"),
1510            }
1511        }
1512    }
1513
1514    #[test]
1515    fn edge_malformed_created_at_fails_loud() {
1516        let f = fid();
1517        let edge = EdgeId::new();
1518        let (up, down) = eids_for_flow(&f);
1519        let mut raw = minimal_edge_hash(&f, &edge, &up, &down);
1520        raw.insert("created_at".into(), "not-a-number".into());
1521        let err = build_edge_snapshot(&f, &edge, &raw).unwrap_err();
1522        match err {
1523            SdkError::Config { field, message: msg, .. } => {
1524                assert_eq!(field.as_deref(), Some("created_at"), "msg: {msg}");
1525            }
1526            other => panic!("expected Config, got {other:?}"),
1527        }
1528    }
1529
1530    #[test]
1531    fn edge_malformed_upstream_eid_fails_loud() {
1532        let f = fid();
1533        let edge = EdgeId::new();
1534        let (up, down) = eids_for_flow(&f);
1535        let mut raw = minimal_edge_hash(&f, &edge, &up, &down);
1536        raw.insert("upstream_execution_id".into(), "not-an-execution-id".into());
1537        let err = build_edge_snapshot(&f, &edge, &raw).unwrap_err();
1538        match err {
1539            SdkError::Config { field, message: msg, .. } => {
1540                assert_eq!(field.as_deref(), Some("upstream_execution_id"), "msg: {msg}");
1541            }
1542            other => panic!("expected Config, got {other:?}"),
1543        }
1544    }
1545
1546    #[test]
1547    fn namespaced_tag_matcher_boundaries() {
1548        // Positive
1549        assert!(is_namespaced_tag_key("cairn.task_id"));
1550        assert!(is_namespaced_tag_key("a.b"));
1551        assert!(is_namespaced_tag_key("ab_12.field"));
1552        // Negative: no dot
1553        assert!(!is_namespaced_tag_key("cairn_task_id"));
1554        // Negative: uppercase prefix
1555        assert!(!is_namespaced_tag_key("Cairn.task"));
1556        // Negative: leading digit
1557        assert!(!is_namespaced_tag_key("1cairn.task"));
1558        // Negative: empty
1559        assert!(!is_namespaced_tag_key(""));
1560        // Negative: dot at position 0
1561        assert!(!is_namespaced_tag_key(".x"));
1562        // Negative: uppercase in prefix
1563        assert!(!is_namespaced_tag_key("caIrn.task"));
1564    }
1565}