Skip to main content

ff_core/contracts/
decode.rs

1// `EngineError` is ~200 bytes; the decoder and its helpers return
2// `Result<_, EngineError>` throughout to match the
3// [`crate::engine_backend::EngineBackend::list_edges`] contract. The
4// variant size is a cross-crate design point (see ff-backend-valkey's
5// crate-level allow for the same rationale); a future PR can box the
6// large `Conflict`/`Transport` variants. Module-local allow to
7// contain the exception to this one file.
8#![allow(clippy::result_large_err)]
9
10//! Canonical decoders for engine-owned hash shapes.
11//!
12//! RFC-012 Stage 1c (T2): the edge-hash decoder lives here so every
13//! `EngineBackend` implementation — not just `ff-backend-valkey` —
14//! shares one strict-parse posture and one error surface
15//! ([`EngineError::Validation { kind: Corruption, .. }`]). ff-sdk's
16//! snapshot module historically owned this code and surfaced
17//! `SdkError::Config`; the pre-migration wrapper still maps to that
18//! shape so public ff-sdk callers see no behavior change while the
19//! engine-side decoder moves.
20//!
21//! Stage 1c T3 adds [`build_execution_snapshot`] and
22//! [`build_flow_snapshot`] alongside the edge decoder: every
23//! engine-owned hash shape now parses through one canonical strict-parse
24//! surface, freeing `ff-backend-valkey` to implement
25//! `describe_execution` / `describe_flow` against the trait and letting
26//! ff-sdk collapse its snapshot module into thin trait forwarders.
27//!
28//! [`EngineError::Validation { kind: Corruption, .. }`]: crate::engine_error::EngineError::Validation
29
30use std::collections::{BTreeMap, HashMap};
31
32use crate::contracts::{
33    AttemptSummary, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot, LeaseSummary,
34};
35use crate::engine_error::{EngineError, ValidationKind};
36use crate::state::PublicState;
37use crate::types::{
38    AttemptId, AttemptIndex, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch, LeaseId, Namespace,
39    TimestampMs, WaitpointId, WorkerInstanceId,
40};
41
/// Canonical field set for the flow-scoped `edge:<edge_id>` hash.
///
/// Every key returned by HGETALL on an edge hash must appear here;
/// [`build_edge_snapshot`] rejects anything else as on-disk corruption
/// or protocol drift. Exposed `pub` so test fixtures and diagnostic
/// tooling can reuse this list rather than hard-coding a duplicate.
pub const EDGE_KNOWN_FIELDS: &[&str] = &[
    // identity, cross-checked against the caller's ids
    "edge_id", "flow_id",
    // endpoint executions
    "upstream_execution_id", "downstream_execution_id",
    // dependency metadata
    "dependency_kind", "satisfaction_condition", "data_passing_ref",
    // state and provenance
    "edge_state", "created_at", "created_by",
];
60
61/// Assemble an [`EdgeSnapshot`] from the raw HGETALL field map.
62///
63/// Mirrors the pre-T2 ff-sdk free-fn: every validation gate (unknown
64/// fields, missing required fields, identity cross-check against the
65/// caller-supplied `flow_id`/`edge_id`) returns the same diagnostic
66/// shape, just routed through [`EngineError::Validation`] with
67/// [`ValidationKind::Corruption`] instead of `SdkError::Config`. The
68/// pre-migration ff-sdk wrapper re-maps to `SdkError::Config` for
69/// public-API parity; direct backend callers read the
70/// `EngineError::Validation` payload.
71///
72/// `flow_id` + `edge_id` are the caller's expected identities. The
73/// decoder verifies both are present and match the stored values; a
74/// mismatch or absence surfaces as `Corruption` because it indicates
75/// a wrong-key read or an on-disk drift.
76pub fn build_edge_snapshot(
77    flow_id: &FlowId,
78    edge_id: &EdgeId,
79    raw: &HashMap<String, String>,
80) -> Result<EdgeSnapshot, EngineError> {
81    // Unknown-field sweep — reject eagerly so a future FF rename that
82    // landed a new field surfaces as an explicit parse failure rather
83    // than silently dropping data.
84    for k in raw.keys() {
85        if !EDGE_KNOWN_FIELDS.contains(&k.as_str()) {
86            return Err(corruption(
87                "edge_snapshot: edge_hash",
88                None,
89                &format!("has unexpected field '{k}' (protocol drift or corruption?)"),
90            ));
91        }
92    }
93
94    // edge_id cross-check.
95    let stored_edge_id_str = required(raw, "edge_snapshot: edge_hash", "edge_id")?;
96    if stored_edge_id_str != edge_id.to_string() {
97        return Err(corruption(
98            "edge_snapshot: edge_hash",
99            Some("edge_id"),
100            &format!(
101                "'{stored_edge_id_str}' does not match requested edge_id \
102                 '{edge_id}' (key corruption or wrong-key read?)"
103            ),
104        ));
105    }
106
107    // flow_id cross-check.
108    let stored_flow_id_str = required(raw, "edge_snapshot: edge_hash", "flow_id")?;
109    if stored_flow_id_str != flow_id.to_string() {
110        return Err(corruption(
111            "edge_snapshot: edge_hash",
112            Some("flow_id"),
113            &format!(
114                "'{stored_flow_id_str}' does not match requested flow_id \
115                 '{flow_id}' (key corruption or wrong-key read?)"
116            ),
117        ));
118    }
119
120    let upstream_execution_id = parse_eid(raw, "upstream_execution_id")?;
121    let downstream_execution_id = parse_eid(raw, "downstream_execution_id")?;
122
123    let dependency_kind = required(raw, "edge_snapshot: edge_hash", "dependency_kind")?;
124    let satisfaction_condition =
125        required(raw, "edge_snapshot: edge_hash", "satisfaction_condition")?;
126
127    // data_passing_ref is stored as "" when the stager passed None.
128    // Treat empty as absent rather than surfacing an empty String.
129    let data_passing_ref = raw
130        .get("data_passing_ref")
131        .filter(|s| !s.is_empty())
132        .cloned();
133
134    let edge_state = required(raw, "edge_snapshot: edge_hash", "edge_state")?;
135
136    let created_at = parse_ts_required(raw, "edge_snapshot: edge_hash", "created_at")?;
137    let created_by = required(raw, "edge_snapshot: edge_hash", "created_by")?;
138
139    Ok(EdgeSnapshot::new(
140        edge_id.clone(),
141        flow_id.clone(),
142        upstream_execution_id,
143        downstream_execution_id,
144        dependency_kind,
145        satisfaction_condition,
146        data_passing_ref,
147        edge_state,
148        created_at,
149        created_by,
150    ))
151}
152
153/// Format a `Corruption` detail string in the
154/// `"<context>: <field?>: <message>"` shape documented on
155/// [`ValidationKind::Corruption`].
156fn corruption(context: &str, field: Option<&str>, message: &str) -> EngineError {
157    let detail = match field {
158        Some(f) => format!("{context}: {f}: {message}"),
159        None => format!("{context}: {message}"),
160    };
161    EngineError::Validation {
162        kind: ValidationKind::Corruption,
163        detail,
164    }
165}
166
167/// Fetch a required non-empty string field, emitting a `Corruption`
168/// error when the field is absent or empty.
169fn required(
170    raw: &HashMap<String, String>,
171    context: &str,
172    field: &str,
173) -> Result<String, EngineError> {
174    raw.get(field)
175        .filter(|s| !s.is_empty())
176        .cloned()
177        .ok_or_else(|| {
178            corruption(
179                context,
180                Some(field),
181                "is missing or empty (key corruption?)",
182            )
183        })
184}
185
186/// Parse a ms-timestamp field that must be present.
187fn parse_ts_required(
188    raw: &HashMap<String, String>,
189    context: &str,
190    field: &str,
191) -> Result<TimestampMs, EngineError> {
192    let s = required(raw, context, field)?;
193    let ms: i64 = s.parse().map_err(|e| {
194        corruption(
195            context,
196            Some(field),
197            &format!("is not a valid ms timestamp ('{s}'): {e}"),
198        )
199    })?;
200    Ok(TimestampMs::from_millis(ms))
201}
202
203/// Parse a required ExecutionId field.
204fn parse_eid(raw: &HashMap<String, String>, field: &str) -> Result<ExecutionId, EngineError> {
205    let s = required(raw, "edge_snapshot: edge_hash", field)?;
206    ExecutionId::parse(&s).map_err(|e| {
207        corruption(
208            "edge_snapshot: edge_hash",
209            Some(field),
210            &format!("'{s}' is not a valid ExecutionId (key corruption?): {e}"),
211        )
212    })
213}
214
215// ═══════════════════════════════════════════════════════════════════════
216// execution decoder (describe_execution)
217// ═══════════════════════════════════════════════════════════════════════
218
219/// Assemble an [`ExecutionSnapshot`] from the raw HGETALL field maps.
220///
221/// `core` is the HGETALL of `exec_core`, `tags_raw` the HGETALL of the
222/// sibling tags hash (which may be empty for executions created without
223/// tags). Every parse failure surfaces as
224/// [`EngineError::Validation { kind: Corruption, .. }`] — fields that
225/// FCALLs write atomically are strict-required, while fields that clear
226/// on transition (`blocking_reason`, `current_attempt_id`, etc.) are
227/// treated as absent when empty.
228pub fn build_execution_snapshot(
229    execution_id: ExecutionId,
230    core: &HashMap<String, String>,
231    tags_raw: HashMap<String, String>,
232) -> Result<Option<ExecutionSnapshot>, EngineError> {
233    let ctx = "describe_execution: exec_core";
234
235    let public_state = parse_public_state(opt_str(core, "public_state").unwrap_or(""))?;
236
237    // `LaneId::try_new` validates non-empty + ASCII-printable + <= 64 bytes.
238    // Exec_core writes a LaneId that already passed these invariants at
239    // ingress; a read that fails validation here signals on-disk
240    // corruption — surface it rather than silently constructing an
241    // invalid LaneId that would mis-partition downstream.
242    let lane_id = LaneId::try_new(opt_str(core, "lane_id").unwrap_or("")).map_err(|e| {
243        corruption(
244            ctx,
245            Some("lane_id"),
246            &format!("fails LaneId validation (key corruption?): {e}"),
247        )
248    })?;
249
250    let namespace_str = opt_str(core, "namespace").unwrap_or("").to_owned();
251    let namespace = Namespace::new(namespace_str);
252
253    let flow_id = opt_str(core, "flow_id")
254        .filter(|s| !s.is_empty())
255        .map(|s| {
256            FlowId::parse(s).map_err(|e| {
257                corruption(
258                    ctx,
259                    Some("flow_id"),
260                    &format!("is not a valid UUID (key corruption?): {e}"),
261                )
262            })
263        })
264        .transpose()?;
265
266    let blocking_reason = opt_str(core, "blocking_reason")
267        .filter(|s| !s.is_empty())
268        .map(str::to_owned);
269    let blocking_detail = opt_str(core, "blocking_detail")
270        .filter(|s| !s.is_empty())
271        .map(str::to_owned);
272
273    // created_at + last_mutation_at are engine-maintained invariants
274    // (lua/execution.lua writes both on create; every mutating FCALL
275    // updates last_mutation_at). Missing values indicate on-disk
276    // corruption, not a valid pre-create state — fail loudly.
277    let created_at = parse_ts(core, ctx, "created_at")?.ok_or_else(|| {
278        corruption(
279            ctx,
280            Some("created_at"),
281            "is missing or empty (key corruption?)",
282        )
283    })?;
284    let last_mutation_at = parse_ts(core, ctx, "last_mutation_at")?.ok_or_else(|| {
285        corruption(
286            ctx,
287            Some("last_mutation_at"),
288            "is missing or empty (key corruption?)",
289        )
290    })?;
291
292    let total_attempt_count: u32 =
293        parse_u32_strict(core, ctx, "total_attempt_count")?.unwrap_or(0);
294
295    let current_attempt = build_attempt_summary(core)?;
296    let current_lease = build_lease_summary(core)?;
297
298    let current_waitpoint = opt_str(core, "current_waitpoint_id")
299        .filter(|s| !s.is_empty())
300        .map(|s| {
301            WaitpointId::parse(s).map_err(|e| {
302                corruption(
303                    ctx,
304                    Some("current_waitpoint_id"),
305                    &format!("is not a valid UUID (key corruption?): {e}"),
306                )
307            })
308        })
309        .transpose()?;
310
311    let tags: BTreeMap<String, String> = tags_raw.into_iter().collect();
312
313    Ok(Some(ExecutionSnapshot::new(
314        execution_id,
315        flow_id,
316        lane_id,
317        namespace,
318        public_state,
319        blocking_reason,
320        blocking_detail,
321        current_attempt,
322        current_lease,
323        current_waitpoint,
324        created_at,
325        last_mutation_at,
326        total_attempt_count,
327        tags,
328    )))
329}
330
/// Borrow the value stored under `field`, or `None` when the key is absent.
///
/// An empty string is returned as `Some("")`; callers that treat empty as
/// absent apply their own `.filter(|s| !s.is_empty())`.
fn opt_str<'a>(map: &'a HashMap<String, String>, field: &str) -> Option<&'a str> {
    Some(map.get(field)?.as_str())
}
334
335/// Strictly parse a ms-timestamp field. `Ok(None)` when absent/empty,
336/// `Err` on unparseable content. `context` names both the calling
337/// FCALL and the hash (e.g. `"describe_execution: exec_core"`) so
338/// error messages point to the exact source of corruption.
339fn parse_ts(
340    map: &HashMap<String, String>,
341    context: &str,
342    field: &str,
343) -> Result<Option<TimestampMs>, EngineError> {
344    match opt_str(map, field).filter(|s| !s.is_empty()) {
345        None => Ok(None),
346        Some(raw) => {
347            let ms: i64 = raw.parse().map_err(|e| {
348                corruption(
349                    context,
350                    Some(field),
351                    &format!("is not a valid ms timestamp ('{raw}'): {e}"),
352                )
353            })?;
354            Ok(Some(TimestampMs::from_millis(ms)))
355        }
356    }
357}
358
359/// Strictly parse a `u32` field. Returns `Ok(None)` when the field is
360/// absent or empty (a valid pre-write state), `Err` when the value is
361/// present but unparseable (on-disk corruption).
362fn parse_u32_strict(
363    map: &HashMap<String, String>,
364    context: &str,
365    field: &str,
366) -> Result<Option<u32>, EngineError> {
367    match opt_str(map, field).filter(|s| !s.is_empty()) {
368        None => Ok(None),
369        Some(raw) => Ok(Some(raw.parse().map_err(|e| {
370            corruption(
371                context,
372                Some(field),
373                &format!("is not a valid u32 ('{raw}'): {e}"),
374            )
375        })?)),
376    }
377}
378
379/// Strictly parse a `u64` field. Semantics mirror [`parse_u32_strict`].
380fn parse_u64_strict(
381    map: &HashMap<String, String>,
382    context: &str,
383    field: &str,
384) -> Result<Option<u64>, EngineError> {
385    match opt_str(map, field).filter(|s| !s.is_empty()) {
386        None => Ok(None),
387        Some(raw) => Ok(Some(raw.parse().map_err(|e| {
388            corruption(
389                context,
390                Some(field),
391                &format!("is not a valid u64 ('{raw}'): {e}"),
392            )
393        })?)),
394    }
395}
396
397fn parse_public_state(raw: &str) -> Result<PublicState, EngineError> {
398    // exec_core stores the snake_case literal (e.g. "waiting"). PublicState's
399    // Deserialize accepts the JSON-quoted form, so wrap + delegate.
400    let quoted = format!("\"{raw}\"");
401    serde_json::from_str(&quoted).map_err(|e| {
402        corruption(
403            "describe_execution: exec_core",
404            Some("public_state"),
405            &format!("'{raw}' is not a known public state: {e}"),
406        )
407    })
408}
409
410fn build_attempt_summary(
411    core: &HashMap<String, String>,
412) -> Result<Option<AttemptSummary>, EngineError> {
413    let ctx = "describe_execution: exec_core";
414    let attempt_id_str = match opt_str(core, "current_attempt_id").filter(|s| !s.is_empty()) {
415        None => return Ok(None),
416        Some(s) => s,
417    };
418    let attempt_id = AttemptId::parse(attempt_id_str).map_err(|e| {
419        corruption(
420            ctx,
421            Some("current_attempt_id"),
422            &format!("is not a valid UUID: {e}"),
423        )
424    })?;
425    // When `current_attempt_id` is set, `current_attempt_index` MUST be
426    // set too — lua/execution.lua writes both atomically in
427    // `ff_claim_execution`. A missing index while the id is populated
428    // is corruption, not a valid intermediate state.
429    let attempt_index = parse_u32_strict(core, ctx, "current_attempt_index")?.ok_or_else(|| {
430        corruption(
431            ctx,
432            Some("current_attempt_index"),
433            "is missing while current_attempt_id is set (key corruption?)",
434        )
435    })?;
436    Ok(Some(AttemptSummary::new(
437        attempt_id,
438        AttemptIndex::new(attempt_index),
439    )))
440}
441
442fn build_lease_summary(
443    core: &HashMap<String, String>,
444) -> Result<Option<LeaseSummary>, EngineError> {
445    let ctx = "describe_execution: exec_core";
446    // A lease is "held" when the worker_instance_id field is populated
447    // AND lease_expires_at is set. Both clear together on revoke/expire
448    // (see clear_lease_and_indexes in lua/helpers.lua).
449    let wid_str = match opt_str(core, "current_worker_instance_id").filter(|s| !s.is_empty()) {
450        None => return Ok(None),
451        Some(s) => s,
452    };
453    let expires_at = match parse_ts(core, ctx, "lease_expires_at")? {
454        None => return Ok(None),
455        Some(ts) => ts,
456    };
457    // A lease is only "held" if the epoch is present too — lua/helpers.lua
458    // sets/clears epoch atomically with wid + expires_at. Parse strictly
459    // and require it: a missing epoch alongside a live wid is corruption.
460    let epoch = parse_u64_strict(core, ctx, "current_lease_epoch")?.ok_or_else(|| {
461        corruption(
462            ctx,
463            Some("current_lease_epoch"),
464            "is missing while current_worker_instance_id is set (key corruption?)",
465        )
466    })?;
467    // FF#278: surface lease_id + attempt_index + last_heartbeat_at.
468    //
469    // lease_id + current_attempt_index are set atomically alongside
470    // current_worker_instance_id at claim time (see
471    // ff_claim_execution / ff_claim_resumed_execution in
472    // flowfabric.lua); a populated worker_instance_id with an empty /
473    // missing lease_id or attempt_index is corruption.
474    let lease_id_str = opt_str(core, "current_lease_id")
475        .filter(|s| !s.is_empty())
476        .ok_or_else(|| {
477            corruption(
478                ctx,
479                Some("current_lease_id"),
480                "is missing while current_worker_instance_id is set (key corruption?)",
481            )
482        })?;
483    let lease_id = LeaseId::parse(lease_id_str).map_err(|e| {
484        corruption(
485            ctx,
486            Some("current_lease_id"),
487            &format!("is not a valid UUID: {e}"),
488        )
489    })?;
490    let attempt_index =
491        parse_u32_strict(core, ctx, "current_attempt_index")?.ok_or_else(|| {
492            corruption(
493                ctx,
494                Some("current_attempt_index"),
495                "is missing while current_worker_instance_id is set (key corruption?)",
496            )
497        })?;
498    // lease_last_renewed_at is set on every claim + renew, but legacy
499    // data (pre-0.9) and unrelated backends may leave it empty. Treat
500    // missing/empty as None rather than surfacing a synthetic value.
501    let last_heartbeat_at = parse_ts(core, ctx, "lease_last_renewed_at")?;
502
503    let mut summary = LeaseSummary::new(
504        LeaseEpoch::new(epoch),
505        WorkerInstanceId::new(wid_str.to_owned()),
506        expires_at,
507    )
508    .with_lease_id(lease_id)
509    .with_attempt_index(AttemptIndex::new(attempt_index));
510    if let Some(ts) = last_heartbeat_at {
511        summary = summary.with_last_heartbeat_at(ts);
512    }
513    Ok(Some(summary))
514}
515
516// ═══════════════════════════════════════════════════════════════════════
517// flow decoder (describe_flow)
518// ═══════════════════════════════════════════════════════════════════════
519
/// Snake_case fields owned by FF on the `flow_core` hash.
///
/// When [`build_flow_snapshot`] sweeps the HGETALL result, a field outside
/// this list is routed to [`FlowSnapshot::tags`] if it has the namespaced
/// tag shape `^[a-z][a-z0-9_]*\.`. A field that is neither FF-owned nor
/// namespaced is reported as a `Corruption` error, so on-disk corruption or
/// protocol drift fails loud.
pub const FLOW_CORE_KNOWN_FIELDS: &[&str] = &[
    // identity and classification
    "flow_id", "flow_kind", "namespace", "public_flow_state",
    // graph shape
    "graph_revision", "node_count", "edge_count",
    // timestamps
    "created_at", "last_mutation_at", "cancelled_at",
    // cancellation metadata
    "cancel_reason", "cancellation_policy",
];
539
540/// Assemble a [`FlowSnapshot`] from the raw HGETALL field map.
541///
542/// Cross-checks the stored `flow_id` against the caller's expected id.
543/// Unknown fields that match the `^[a-z][a-z0-9_]*\.` namespaced-tag
544/// shape are routed to `tags`; any other unknown field surfaces as
545/// `Corruption`.
546pub fn build_flow_snapshot(
547    flow_id: FlowId,
548    raw: &HashMap<String, String>,
549    edge_groups: Vec<crate::contracts::EdgeGroupSnapshot>,
550) -> Result<FlowSnapshot, EngineError> {
551    let ctx = "describe_flow: flow_core";
552
553    // flow_id cross-check — corruption or wrong-key read.
554    let stored_flow_id_str = opt_str(raw, "flow_id")
555        .filter(|s| !s.is_empty())
556        .ok_or_else(|| corruption(ctx, Some("flow_id"), "is missing or empty (key corruption?)"))?;
557    if stored_flow_id_str != flow_id.to_string() {
558        return Err(corruption(
559            ctx,
560            Some("flow_id"),
561            &format!(
562                "'{stored_flow_id_str}' does not match requested flow_id \
563                 '{flow_id}' (key corruption or wrong-key read?)"
564            ),
565        ));
566    }
567
568    let namespace_str = opt_str(raw, "namespace")
569        .filter(|s| !s.is_empty())
570        .ok_or_else(|| {
571            corruption(ctx, Some("namespace"), "is missing or empty (key corruption?)")
572        })?;
573    let namespace = Namespace::new(namespace_str.to_owned());
574
575    let flow_kind = opt_str(raw, "flow_kind")
576        .filter(|s| !s.is_empty())
577        .ok_or_else(|| {
578            corruption(ctx, Some("flow_kind"), "is missing or empty (key corruption?)")
579        })?
580        .to_owned();
581
582    let public_flow_state = opt_str(raw, "public_flow_state")
583        .filter(|s| !s.is_empty())
584        .ok_or_else(|| {
585            corruption(
586                ctx,
587                Some("public_flow_state"),
588                "is missing or empty (key corruption?)",
589            )
590        })?
591        .to_owned();
592
593    let graph_revision = parse_u64_strict(raw, ctx, "graph_revision")?
594        .ok_or_else(|| corruption(ctx, Some("graph_revision"), "is missing (key corruption?)"))?;
595    let node_count = parse_u32_strict(raw, ctx, "node_count")?
596        .ok_or_else(|| corruption(ctx, Some("node_count"), "is missing (key corruption?)"))?;
597    let edge_count = parse_u32_strict(raw, ctx, "edge_count")?
598        .ok_or_else(|| corruption(ctx, Some("edge_count"), "is missing (key corruption?)"))?;
599
600    let created_at = parse_ts(raw, ctx, "created_at")?.ok_or_else(|| {
601        corruption(
602            ctx,
603            Some("created_at"),
604            "is missing or empty (key corruption?)",
605        )
606    })?;
607    let last_mutation_at = parse_ts(raw, ctx, "last_mutation_at")?.ok_or_else(|| {
608        corruption(
609            ctx,
610            Some("last_mutation_at"),
611            "is missing or empty (key corruption?)",
612        )
613    })?;
614
615    let cancelled_at = parse_ts(raw, ctx, "cancelled_at")?;
616    let cancel_reason = opt_str(raw, "cancel_reason")
617        .filter(|s| !s.is_empty())
618        .map(str::to_owned);
619    let cancellation_policy = opt_str(raw, "cancellation_policy")
620        .filter(|s| !s.is_empty())
621        .map(str::to_owned);
622
623    // Route unknown fields: namespaced-prefix (e.g. `cairn.task_id`) →
624    // tags; anything else → corruption.
625    let mut tags: BTreeMap<String, String> = BTreeMap::new();
626    for (k, v) in raw {
627        if FLOW_CORE_KNOWN_FIELDS.contains(&k.as_str()) {
628            continue;
629        }
630        if is_namespaced_tag_key(k) {
631            tags.insert(k.clone(), v.clone());
632        } else {
633            return Err(corruption(
634                ctx,
635                None,
636                &format!(
637                    "has unexpected field '{k}' — not an FF field and not a namespaced \
638                     tag (lowercase-alphanumeric-prefix + '.')"
639                ),
640            ));
641        }
642    }
643
644    Ok(FlowSnapshot::new(
645        flow_id,
646        flow_kind,
647        namespace,
648        public_flow_state,
649        graph_revision,
650        node_count,
651        edge_count,
652        created_at,
653        last_mutation_at,
654        cancelled_at,
655        cancel_reason,
656        cancellation_policy,
657        tags,
658        edge_groups,
659    ))
660}
661
/// Report whether `k` has the namespaced-tag shape `^[a-z][a-z0-9_]*\.`
/// documented on [`ExecutionSnapshot::tags`] / [`FlowSnapshot::tags`].
///
/// Only the prefix before the first `.` is inspected. It must be
/// non-empty, start with an ASCII lowercase letter, and contain only
/// ASCII lowercase letters, digits and `_`. Anything after the dot is
/// unconstrained. The check is hand-written because the shape is small
/// enough not to need a regex dependency.
pub(crate) fn is_namespaced_tag_key(k: &str) -> bool {
    let Some((prefix, _)) = k.split_once('.') else {
        return false;
    };
    let mut rest = prefix.chars();
    let leads_with_letter = matches!(rest.next(), Some(c) if c.is_ascii_lowercase());
    leads_with_letter && rest.all(|c| matches!(c, 'a'..='z' | '0'..='9' | '_'))
}
685
686#[cfg(test)]
687mod tests {
688    use super::*;
689    use crate::partition::PartitionConfig;
690
691    fn fid() -> FlowId {
692        FlowId::new()
693    }
694
695    fn eids_for_flow(f: &FlowId) -> (ExecutionId, ExecutionId) {
696        let cfg = PartitionConfig::default();
697        (
698            ExecutionId::for_flow(f, &cfg),
699            ExecutionId::for_flow(f, &cfg),
700        )
701    }
702
703    fn minimal_edge_hash(
704        flow: &FlowId,
705        edge: &EdgeId,
706        up: &ExecutionId,
707        down: &ExecutionId,
708    ) -> HashMap<String, String> {
709        let mut m = HashMap::new();
710        m.insert("edge_id".into(), edge.to_string());
711        m.insert("flow_id".into(), flow.to_string());
712        m.insert("upstream_execution_id".into(), up.to_string());
713        m.insert("downstream_execution_id".into(), down.to_string());
714        m.insert("dependency_kind".into(), "success_only".into());
715        m.insert("satisfaction_condition".into(), "all_required".into());
716        m.insert("data_passing_ref".into(), String::new());
717        m.insert("edge_state".into(), "pending".into());
718        m.insert("created_at".into(), "1234".into());
719        m.insert("created_by".into(), "engine".into());
720        m
721    }
722
723    #[test]
724    fn round_trips_all_fields() {
725        let f = fid();
726        let edge = EdgeId::new();
727        let (up, down) = eids_for_flow(&f);
728        let raw = minimal_edge_hash(&f, &edge, &up, &down);
729        let snap = build_edge_snapshot(&f, &edge, &raw).unwrap();
730        assert_eq!(snap.edge_id, edge);
731        assert_eq!(snap.flow_id, f);
732        assert_eq!(snap.upstream_execution_id, up);
733        assert_eq!(snap.downstream_execution_id, down);
734        assert_eq!(snap.dependency_kind, "success_only");
735        assert_eq!(snap.satisfaction_condition, "all_required");
736        assert!(snap.data_passing_ref.is_none());
737        assert_eq!(snap.edge_state, "pending");
738        assert_eq!(snap.created_at.0, 1234);
739        assert_eq!(snap.created_by, "engine");
740    }
741
742    #[test]
743    fn data_passing_ref_round_trips_when_set() {
744        let f = fid();
745        let edge = EdgeId::new();
746        let (up, down) = eids_for_flow(&f);
747        let mut raw = minimal_edge_hash(&f, &edge, &up, &down);
748        raw.insert("data_passing_ref".into(), "ref://blob-42".into());
749        let snap = build_edge_snapshot(&f, &edge, &raw).unwrap();
750        assert_eq!(snap.data_passing_ref.as_deref(), Some("ref://blob-42"));
751    }
752
753    fn expect_corruption(err: EngineError) -> String {
754        match err {
755            EngineError::Validation {
756                kind: ValidationKind::Corruption,
757                detail,
758            } => detail,
759            other => panic!("expected Validation::Corruption, got {other:?}"),
760        }
761    }
762
763    #[test]
764    fn unknown_field_fails_loud() {
765        let f = fid();
766        let edge = EdgeId::new();
767        let (up, down) = eids_for_flow(&f);
768        let mut raw = minimal_edge_hash(&f, &edge, &up, &down);
769        raw.insert("bogus_future_field".into(), "v".into());
770        let detail = expect_corruption(build_edge_snapshot(&f, &edge, &raw).unwrap_err());
771        assert!(detail.contains("bogus_future_field"), "{detail}");
772    }
773
774    #[test]
775    fn flow_id_mismatch_fails_loud() {
776        let f = fid();
777        let other = fid();
778        let edge = EdgeId::new();
779        let (up, down) = eids_for_flow(&f);
780        let raw = minimal_edge_hash(&other, &edge, &up, &down);
781        let detail = expect_corruption(build_edge_snapshot(&f, &edge, &raw).unwrap_err());
782        assert!(detail.contains("flow_id"), "{detail}");
783        assert!(detail.contains("does not match"), "{detail}");
784    }
785
786    #[test]
787    fn edge_id_mismatch_fails_loud() {
788        let f = fid();
789        let edge = EdgeId::new();
790        let other_edge = EdgeId::new();
791        let (up, down) = eids_for_flow(&f);
792        let raw = minimal_edge_hash(&f, &other_edge, &up, &down);
793        let detail = expect_corruption(build_edge_snapshot(&f, &edge, &raw).unwrap_err());
794        assert!(detail.contains("edge_id"), "{detail}");
795        assert!(detail.contains("does not match"), "{detail}");
796    }
797
798    #[test]
799    fn missing_required_fields_fail_loud() {
800        for want in [
801            "edge_id",
802            "flow_id",
803            "upstream_execution_id",
804            "downstream_execution_id",
805            "dependency_kind",
806            "satisfaction_condition",
807            "edge_state",
808            "created_at",
809            "created_by",
810        ] {
811            let f = fid();
812            let edge = EdgeId::new();
813            let (up, down) = eids_for_flow(&f);
814            let mut raw = minimal_edge_hash(&f, &edge, &up, &down);
815            raw.remove(want);
816            let err = build_edge_snapshot(&f, &edge, &raw)
817                .err()
818                .unwrap_or_else(|| panic!("missing {want} should fail"));
819            let detail = expect_corruption(err);
820            assert!(detail.contains(want), "detail for {want}: {detail}");
821        }
822    }
823
824    #[test]
825    fn malformed_created_at_fails_loud() {
826        let f = fid();
827        let edge = EdgeId::new();
828        let (up, down) = eids_for_flow(&f);
829        let mut raw = minimal_edge_hash(&f, &edge, &up, &down);
830        raw.insert("created_at".into(), "not-a-number".into());
831        let detail = expect_corruption(build_edge_snapshot(&f, &edge, &raw).unwrap_err());
832        assert!(detail.contains("created_at"), "{detail}");
833    }
834
835    #[test]
836    fn malformed_upstream_eid_fails_loud() {
837        let f = fid();
838        let edge = EdgeId::new();
839        let (up, down) = eids_for_flow(&f);
840        let mut raw = minimal_edge_hash(&f, &edge, &up, &down);
841        raw.insert("upstream_execution_id".into(), "not-an-execution-id".into());
842        let detail = expect_corruption(build_edge_snapshot(&f, &edge, &raw).unwrap_err());
843        assert!(detail.contains("upstream_execution_id"), "{detail}");
844    }
845
846    // ─── ExecutionSnapshot (describe_execution) ───────────────────────
847
848    fn eid() -> ExecutionId {
849        let config = PartitionConfig::default();
850        ExecutionId::for_flow(&FlowId::new(), &config)
851    }
852
853    fn minimal_core(public_state: &str) -> HashMap<String, String> {
854        let mut m = HashMap::new();
855        m.insert("public_state".to_owned(), public_state.to_owned());
856        m.insert("lane_id".to_owned(), "default".to_owned());
857        m.insert("namespace".to_owned(), "ns".to_owned());
858        m.insert("created_at".to_owned(), "1000".to_owned());
859        m.insert("last_mutation_at".to_owned(), "2000".to_owned());
860        m.insert("total_attempt_count".to_owned(), "0".to_owned());
861        m
862    }
863
864    fn expect_corruption_field<F>(err: EngineError, pred: F)
865    where
866        F: FnOnce(&str) -> bool,
867    {
868        let detail = expect_corruption(err);
869        assert!(pred(&detail), "detail did not match predicate: {detail}");
870    }
871
872    #[test]
873    fn waiting_exec_no_attempt_no_lease_no_tags() {
874        let snap = build_execution_snapshot(eid(), &minimal_core("waiting"), HashMap::new())
875            .unwrap()
876            .expect("should build");
877        assert_eq!(snap.public_state, PublicState::Waiting);
878        assert!(snap.current_attempt.is_none());
879        assert!(snap.current_lease.is_none());
880        assert!(snap.current_waitpoint.is_none());
881        assert_eq!(snap.tags.len(), 0);
882        assert_eq!(snap.created_at.0, 1000);
883        assert_eq!(snap.last_mutation_at.0, 2000);
884        assert!(snap.flow_id.is_none());
885        assert!(snap.blocking_reason.is_none());
886    }
887
888    #[test]
889    fn tags_flow_through_sorted() {
890        let mut tags = HashMap::new();
891        tags.insert("cairn.task_id".to_owned(), "t-1".to_owned());
892        tags.insert("cairn.project".to_owned(), "proj".to_owned());
893        let snap = build_execution_snapshot(eid(), &minimal_core("waiting"), tags)
894            .unwrap()
895            .unwrap();
896        let keys: Vec<_> = snap.tags.keys().cloned().collect();
897        assert_eq!(
898            keys,
899            vec!["cairn.project".to_owned(), "cairn.task_id".to_owned()]
900        );
901    }
902
903    #[test]
904    fn invalid_public_state_fails_loud() {
905        let err =
906            build_execution_snapshot(eid(), &minimal_core("bogus"), HashMap::new()).unwrap_err();
907        expect_corruption_field(err, |d| d.contains("public_state"));
908    }
909
910    #[test]
911    fn invalid_lane_id_fails_loud() {
912        let mut core = minimal_core("waiting");
913        core.insert("lane_id".to_owned(), "lane\nbroken".to_owned());
914        let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
915        expect_corruption_field(err, |d| d.contains("lane_id"));
916    }
917
918    #[test]
919    fn missing_required_timestamps_fail_loud() {
920        for want in ["created_at", "last_mutation_at"] {
921            let mut core = minimal_core("waiting");
922            core.remove(want);
923            let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
924            expect_corruption_field(err, |d| d.contains(want));
925        }
926    }
927
928    #[test]
929    fn malformed_total_attempt_count_fails_loud() {
930        let mut core = minimal_core("waiting");
931        core.insert("total_attempt_count".to_owned(), "not-a-number".to_owned());
932        let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
933        expect_corruption_field(err, |d| d.contains("total_attempt_count"));
934    }
935
936    #[test]
937    fn attempt_id_without_index_fails_loud() {
938        let mut core = minimal_core("active");
939        core.insert(
940            "current_attempt_id".to_owned(),
941            AttemptId::new().to_string(),
942        );
943        let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
944        expect_corruption_field(err, |d| d.contains("current_attempt_index"));
945    }
946
947    #[test]
948    fn lease_without_epoch_fails_loud() {
949        let mut core = minimal_core("active");
950        core.insert(
951            "current_worker_instance_id".to_owned(),
952            "w-inst-1".to_owned(),
953        );
954        core.insert("lease_expires_at".to_owned(), "9000".to_owned());
955        let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
956        expect_corruption_field(err, |d| d.contains("current_lease_epoch"));
957    }
958
959    #[test]
960    fn lease_summary_requires_both_wid_and_expires_at() {
961        let mut core = minimal_core("active");
962        core.insert(
963            "current_worker_instance_id".to_owned(),
964            "w-inst-1".to_owned(),
965        );
966        let snap = build_execution_snapshot(eid(), &core, HashMap::new())
967            .unwrap()
968            .unwrap();
969        assert!(snap.current_lease.is_none());
970
971        let lid = LeaseId::new();
972        core.insert("lease_expires_at".to_owned(), "9000".to_owned());
973        core.insert("current_lease_epoch".to_owned(), "3".to_owned());
974        core.insert("current_lease_id".to_owned(), lid.to_string());
975        core.insert("current_attempt_index".to_owned(), "2".to_owned());
976        core.insert("lease_last_renewed_at".to_owned(), "8500".to_owned());
977        let snap = build_execution_snapshot(eid(), &core, HashMap::new())
978            .unwrap()
979            .unwrap();
980        let lease = snap.current_lease.expect("lease present");
981        assert_eq!(lease.lease_epoch, LeaseEpoch::new(3));
982        assert_eq!(lease.expires_at.0, 9000);
983        assert_eq!(lease.worker_instance_id.as_str(), "w-inst-1");
984        // FF#278 additions.
985        assert_eq!(lease.lease_id, lid);
986        assert_eq!(lease.attempt_index, AttemptIndex::new(2));
987        assert_eq!(lease.last_heartbeat_at.map(|t| t.0), Some(8500));
988    }
989
990    // FF#278: when lease_last_renewed_at is missing (legacy / backend
991    // that does not surface heartbeat ticks), last_heartbeat_at is None.
992    #[test]
993    fn lease_summary_without_heartbeat_returns_none() {
994        let mut core = minimal_core("active");
995        core.insert(
996            "current_worker_instance_id".to_owned(),
997            "w-inst-1".to_owned(),
998        );
999        core.insert("lease_expires_at".to_owned(), "9000".to_owned());
1000        core.insert("current_lease_epoch".to_owned(), "3".to_owned());
1001        core.insert("current_lease_id".to_owned(), LeaseId::new().to_string());
1002        core.insert("current_attempt_index".to_owned(), "1".to_owned());
1003        let snap = build_execution_snapshot(eid(), &core, HashMap::new())
1004            .unwrap()
1005            .unwrap();
1006        let lease = snap.current_lease.expect("lease present");
1007        assert!(lease.last_heartbeat_at.is_none());
1008    }
1009
1010    // FF#278: populated worker_instance_id with missing lease_id is
1011    // corruption — they are written atomically at claim time.
1012    #[test]
1013    fn lease_without_lease_id_fails_loud() {
1014        let mut core = minimal_core("active");
1015        core.insert(
1016            "current_worker_instance_id".to_owned(),
1017            "w-inst-1".to_owned(),
1018        );
1019        core.insert("lease_expires_at".to_owned(), "9000".to_owned());
1020        core.insert("current_lease_epoch".to_owned(), "3".to_owned());
1021        core.insert("current_attempt_index".to_owned(), "1".to_owned());
1022        let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1023        expect_corruption_field(err, |d| d.contains("current_lease_id"));
1024    }
1025
1026    #[test]
1027    fn lease_with_bad_lease_id_fails_loud() {
1028        let mut core = minimal_core("active");
1029        core.insert(
1030            "current_worker_instance_id".to_owned(),
1031            "w-inst-1".to_owned(),
1032        );
1033        core.insert("lease_expires_at".to_owned(), "9000".to_owned());
1034        core.insert("current_lease_epoch".to_owned(), "3".to_owned());
1035        core.insert("current_lease_id".to_owned(), "not-a-uuid".to_owned());
1036        core.insert("current_attempt_index".to_owned(), "1".to_owned());
1037        let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1038        expect_corruption_field(err, |d| d.contains("current_lease_id"));
1039    }
1040
1041    // FF#278: populated worker_instance_id with missing attempt_index
1042    // is corruption — they are written atomically at claim time.
1043    #[test]
1044    fn lease_without_attempt_index_fails_loud() {
1045        let mut core = minimal_core("active");
1046        core.insert(
1047            "current_worker_instance_id".to_owned(),
1048            "w-inst-1".to_owned(),
1049        );
1050        core.insert("lease_expires_at".to_owned(), "9000".to_owned());
1051        core.insert("current_lease_epoch".to_owned(), "3".to_owned());
1052        core.insert("current_lease_id".to_owned(), LeaseId::new().to_string());
1053        let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1054        expect_corruption_field(err, |d| d.contains("current_attempt_index"));
1055    }
1056
1057    // ─── FlowSnapshot (describe_flow) ─────────────────────────────────
1058
1059    fn minimal_flow_core(id: &FlowId, state: &str) -> HashMap<String, String> {
1060        let mut m = HashMap::new();
1061        m.insert("flow_id".to_owned(), id.to_string());
1062        m.insert("flow_kind".to_owned(), "dag".to_owned());
1063        m.insert("namespace".to_owned(), "ns".to_owned());
1064        m.insert("public_flow_state".to_owned(), state.to_owned());
1065        m.insert("graph_revision".to_owned(), "0".to_owned());
1066        m.insert("node_count".to_owned(), "0".to_owned());
1067        m.insert("edge_count".to_owned(), "0".to_owned());
1068        m.insert("created_at".to_owned(), "1000".to_owned());
1069        m.insert("last_mutation_at".to_owned(), "1000".to_owned());
1070        m
1071    }
1072
1073    #[test]
1074    fn open_flow_round_trips() {
1075        let f = fid();
1076        let snap = build_flow_snapshot(f.clone(), &minimal_flow_core(&f, "open"), Vec::new()).unwrap();
1077        assert_eq!(snap.flow_id, f);
1078        assert_eq!(snap.flow_kind, "dag");
1079        assert_eq!(snap.namespace.as_str(), "ns");
1080        assert_eq!(snap.public_flow_state, "open");
1081        assert_eq!(snap.graph_revision, 0);
1082        assert_eq!(snap.node_count, 0);
1083        assert_eq!(snap.edge_count, 0);
1084        assert_eq!(snap.created_at.0, 1000);
1085        assert_eq!(snap.last_mutation_at.0, 1000);
1086        assert!(snap.cancelled_at.is_none());
1087        assert!(snap.cancel_reason.is_none());
1088        assert!(snap.cancellation_policy.is_none());
1089        assert!(snap.tags.is_empty());
1090    }
1091
1092    #[test]
1093    fn cancelled_flow_surfaces_cancel_fields() {
1094        let f = fid();
1095        let mut core = minimal_flow_core(&f, "cancelled");
1096        core.insert("cancelled_at".to_owned(), "2000".to_owned());
1097        core.insert("cancel_reason".to_owned(), "operator".to_owned());
1098        core.insert("cancellation_policy".to_owned(), "cancel_all".to_owned());
1099        let snap = build_flow_snapshot(f, &core, Vec::new()).unwrap();
1100        assert_eq!(snap.public_flow_state, "cancelled");
1101        assert_eq!(snap.cancelled_at.unwrap().0, 2000);
1102        assert_eq!(snap.cancel_reason.as_deref(), Some("operator"));
1103        assert_eq!(snap.cancellation_policy.as_deref(), Some("cancel_all"));
1104    }
1105
1106    #[test]
1107    fn namespaced_tags_routed_to_tags_map() {
1108        let f = fid();
1109        let mut core = minimal_flow_core(&f, "open");
1110        core.insert("cairn.task_id".to_owned(), "t-1".to_owned());
1111        core.insert("cairn.project".to_owned(), "proj".to_owned());
1112        core.insert("operator.label".to_owned(), "v".to_owned());
1113        let snap = build_flow_snapshot(f, &core, Vec::new()).unwrap();
1114        assert_eq!(snap.tags.len(), 3);
1115        let keys: Vec<_> = snap.tags.keys().cloned().collect();
1116        assert_eq!(
1117            keys,
1118            vec![
1119                "cairn.project".to_owned(),
1120                "cairn.task_id".to_owned(),
1121                "operator.label".to_owned()
1122            ]
1123        );
1124    }
1125
1126    #[test]
1127    fn unknown_flat_field_fails_loud() {
1128        let f = fid();
1129        let mut core = minimal_flow_core(&f, "open");
1130        core.insert("bogus_future_field".to_owned(), "v".to_owned());
1131        let err = build_flow_snapshot(f, &core, Vec::new()).unwrap_err();
1132        expect_corruption_field(err, |d| d.contains("bogus_future_field"));
1133    }
1134
1135    #[test]
1136    fn missing_required_flow_fields_fail_loud() {
1137        for want in [
1138            "flow_id",
1139            "namespace",
1140            "flow_kind",
1141            "public_flow_state",
1142            "graph_revision",
1143            "node_count",
1144            "edge_count",
1145            "created_at",
1146            "last_mutation_at",
1147        ] {
1148            let f = fid();
1149            let mut core = minimal_flow_core(&f, "open");
1150            core.remove(want);
1151            let err = build_flow_snapshot(f, &core, Vec::new()).err().unwrap_or_else(|| {
1152                panic!("field {want} should fail but build_flow_snapshot returned Ok")
1153            });
1154            expect_corruption_field(err, |d| d.contains(want));
1155        }
1156    }
1157
1158    #[test]
1159    fn empty_required_strings_fail_loud() {
1160        for want in ["flow_id", "namespace", "flow_kind", "public_flow_state"] {
1161            let f = fid();
1162            let mut core = minimal_flow_core(&f, "open");
1163            core.insert(want.to_owned(), String::new());
1164            let err = build_flow_snapshot(f, &core, Vec::new()).err().unwrap_or_else(|| {
1165                panic!("empty {want} should fail but build_flow_snapshot returned Ok")
1166            });
1167            expect_corruption_field(err, |d| d.contains(want));
1168        }
1169    }
1170
1171    #[test]
1172    fn flow_snapshot_flow_id_mismatch_fails_loud() {
1173        let requested = fid();
1174        let other = fid();
1175        let core = minimal_flow_core(&other, "open");
1176        let err = build_flow_snapshot(requested, &core, Vec::new()).unwrap_err();
1177        expect_corruption_field(err, |d| d.contains("flow_id") && d.contains("does not match"));
1178    }
1179
1180    #[test]
1181    fn malformed_counter_fails_loud() {
1182        let f = fid();
1183        let mut core = minimal_flow_core(&f, "open");
1184        core.insert("graph_revision".to_owned(), "not-a-number".to_owned());
1185        let err = build_flow_snapshot(f, &core, Vec::new()).unwrap_err();
1186        expect_corruption_field(err, |d| d.contains("graph_revision"));
1187    }
1188
1189    #[test]
1190    fn namespaced_tag_matcher_boundaries() {
1191        assert!(is_namespaced_tag_key("cairn.task_id"));
1192        assert!(is_namespaced_tag_key("a.b"));
1193        assert!(is_namespaced_tag_key("ab_12.field"));
1194        assert!(!is_namespaced_tag_key("cairn_task_id"));
1195        assert!(!is_namespaced_tag_key("Cairn.task"));
1196        assert!(!is_namespaced_tag_key("1cairn.task"));
1197        assert!(!is_namespaced_tag_key(""));
1198        assert!(!is_namespaced_tag_key(".x"));
1199        assert!(!is_namespaced_tag_key("caIrn.task"));
1200    }
1201}