1#![allow(clippy::result_large_err)]
9
10use std::collections::{BTreeMap, HashMap};
31
32use crate::contracts::{
33 AttemptSummary, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot, LeaseSummary,
34};
35use crate::engine_error::{EngineError, ValidationKind};
36use crate::state::PublicState;
37use crate::types::{
38 AttemptId, AttemptIndex, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch, LeaseId, Namespace,
39 TimestampMs, WaitpointId, WorkerInstanceId,
40};
41
/// Field names that [`build_edge_snapshot`] accepts in a raw edge hash.
///
/// Any key outside this list makes `build_edge_snapshot` fail with a
/// corruption error instead of being silently ignored.
pub const EDGE_KNOWN_FIELDS: &[&str] = &[
    // Identity of the edge and the flow it belongs to.
    "edge_id",
    "flow_id",
    // Endpoints of the edge.
    "upstream_execution_id",
    "downstream_execution_id",
    // Dependency semantics (kept as opaque strings by the builder).
    "dependency_kind",
    "satisfaction_condition",
    // Optional: an empty value is surfaced as `None`.
    "data_passing_ref",
    "edge_state",
    // Provenance.
    "created_at",
    "created_by",
];
60
61pub fn build_edge_snapshot(
77 flow_id: &FlowId,
78 edge_id: &EdgeId,
79 raw: &HashMap<String, String>,
80) -> Result<EdgeSnapshot, EngineError> {
81 for k in raw.keys() {
85 if !EDGE_KNOWN_FIELDS.contains(&k.as_str()) {
86 return Err(corruption(
87 "edge_snapshot: edge_hash",
88 None,
89 &format!("has unexpected field '{k}' (protocol drift or corruption?)"),
90 ));
91 }
92 }
93
94 let stored_edge_id_str = required(raw, "edge_snapshot: edge_hash", "edge_id")?;
96 if stored_edge_id_str != edge_id.to_string() {
97 return Err(corruption(
98 "edge_snapshot: edge_hash",
99 Some("edge_id"),
100 &format!(
101 "'{stored_edge_id_str}' does not match requested edge_id \
102 '{edge_id}' (key corruption or wrong-key read?)"
103 ),
104 ));
105 }
106
107 let stored_flow_id_str = required(raw, "edge_snapshot: edge_hash", "flow_id")?;
109 if stored_flow_id_str != flow_id.to_string() {
110 return Err(corruption(
111 "edge_snapshot: edge_hash",
112 Some("flow_id"),
113 &format!(
114 "'{stored_flow_id_str}' does not match requested flow_id \
115 '{flow_id}' (key corruption or wrong-key read?)"
116 ),
117 ));
118 }
119
120 let upstream_execution_id = parse_eid(raw, "upstream_execution_id")?;
121 let downstream_execution_id = parse_eid(raw, "downstream_execution_id")?;
122
123 let dependency_kind = required(raw, "edge_snapshot: edge_hash", "dependency_kind")?;
124 let satisfaction_condition =
125 required(raw, "edge_snapshot: edge_hash", "satisfaction_condition")?;
126
127 let data_passing_ref = raw
130 .get("data_passing_ref")
131 .filter(|s| !s.is_empty())
132 .cloned();
133
134 let edge_state = required(raw, "edge_snapshot: edge_hash", "edge_state")?;
135
136 let created_at = parse_ts_required(raw, "edge_snapshot: edge_hash", "created_at")?;
137 let created_by = required(raw, "edge_snapshot: edge_hash", "created_by")?;
138
139 Ok(EdgeSnapshot::new(
140 edge_id.clone(),
141 flow_id.clone(),
142 upstream_execution_id,
143 downstream_execution_id,
144 dependency_kind,
145 satisfaction_condition,
146 data_passing_ref,
147 edge_state,
148 created_at,
149 created_by,
150 ))
151}
152
153fn corruption(context: &str, field: Option<&str>, message: &str) -> EngineError {
157 let detail = match field {
158 Some(f) => format!("{context}: {f}: {message}"),
159 None => format!("{context}: {message}"),
160 };
161 EngineError::Validation {
162 kind: ValidationKind::Corruption,
163 detail,
164 }
165}
166
167fn required(
170 raw: &HashMap<String, String>,
171 context: &str,
172 field: &str,
173) -> Result<String, EngineError> {
174 raw.get(field)
175 .filter(|s| !s.is_empty())
176 .cloned()
177 .ok_or_else(|| {
178 corruption(
179 context,
180 Some(field),
181 "is missing or empty (key corruption?)",
182 )
183 })
184}
185
186fn parse_ts_required(
188 raw: &HashMap<String, String>,
189 context: &str,
190 field: &str,
191) -> Result<TimestampMs, EngineError> {
192 let s = required(raw, context, field)?;
193 let ms: i64 = s.parse().map_err(|e| {
194 corruption(
195 context,
196 Some(field),
197 &format!("is not a valid ms timestamp ('{s}'): {e}"),
198 )
199 })?;
200 Ok(TimestampMs::from_millis(ms))
201}
202
203fn parse_eid(raw: &HashMap<String, String>, field: &str) -> Result<ExecutionId, EngineError> {
205 let s = required(raw, "edge_snapshot: edge_hash", field)?;
206 ExecutionId::parse(&s).map_err(|e| {
207 corruption(
208 "edge_snapshot: edge_hash",
209 Some(field),
210 &format!("'{s}' is not a valid ExecutionId (key corruption?): {e}"),
211 )
212 })
213}
214
215pub fn build_execution_snapshot(
229 execution_id: ExecutionId,
230 core: &HashMap<String, String>,
231 tags_raw: HashMap<String, String>,
232) -> Result<Option<ExecutionSnapshot>, EngineError> {
233 let ctx = "describe_execution: exec_core";
234
235 let public_state = parse_public_state(opt_str(core, "public_state").unwrap_or(""))?;
236
237 let lane_id = LaneId::try_new(opt_str(core, "lane_id").unwrap_or("")).map_err(|e| {
243 corruption(
244 ctx,
245 Some("lane_id"),
246 &format!("fails LaneId validation (key corruption?): {e}"),
247 )
248 })?;
249
250 let namespace_str = opt_str(core, "namespace").unwrap_or("").to_owned();
251 let namespace = Namespace::new(namespace_str);
252
253 let flow_id = opt_str(core, "flow_id")
254 .filter(|s| !s.is_empty())
255 .map(|s| {
256 FlowId::parse(s).map_err(|e| {
257 corruption(
258 ctx,
259 Some("flow_id"),
260 &format!("is not a valid UUID (key corruption?): {e}"),
261 )
262 })
263 })
264 .transpose()?;
265
266 let blocking_reason = opt_str(core, "blocking_reason")
267 .filter(|s| !s.is_empty())
268 .map(str::to_owned);
269 let blocking_detail = opt_str(core, "blocking_detail")
270 .filter(|s| !s.is_empty())
271 .map(str::to_owned);
272
273 let created_at = parse_ts(core, ctx, "created_at")?.ok_or_else(|| {
278 corruption(
279 ctx,
280 Some("created_at"),
281 "is missing or empty (key corruption?)",
282 )
283 })?;
284 let last_mutation_at = parse_ts(core, ctx, "last_mutation_at")?.ok_or_else(|| {
285 corruption(
286 ctx,
287 Some("last_mutation_at"),
288 "is missing or empty (key corruption?)",
289 )
290 })?;
291
292 let total_attempt_count: u32 =
293 parse_u32_strict(core, ctx, "total_attempt_count")?.unwrap_or(0);
294
295 let current_attempt = build_attempt_summary(core)?;
296 let current_lease = build_lease_summary(core)?;
297
298 let current_waitpoint = opt_str(core, "current_waitpoint_id")
299 .filter(|s| !s.is_empty())
300 .map(|s| {
301 WaitpointId::parse(s).map_err(|e| {
302 corruption(
303 ctx,
304 Some("current_waitpoint_id"),
305 &format!("is not a valid UUID (key corruption?): {e}"),
306 )
307 })
308 })
309 .transpose()?;
310
311 let tags: BTreeMap<String, String> = tags_raw.into_iter().collect();
312
313 Ok(Some(ExecutionSnapshot::new(
314 execution_id,
315 flow_id,
316 lane_id,
317 namespace,
318 public_state,
319 blocking_reason,
320 blocking_detail,
321 current_attempt,
322 current_lease,
323 current_waitpoint,
324 created_at,
325 last_mutation_at,
326 total_attempt_count,
327 tags,
328 )))
329}
330
/// Looks up `field` in `map` and returns its value as a borrowed `&str`.
///
/// Returns `None` only when the key is absent; an empty value comes back as
/// `Some("")`, so callers that treat empty as absent must filter it
/// themselves.
fn opt_str<'a>(map: &'a HashMap<String, String>, field: &str) -> Option<&'a str> {
    let value = map.get(field)?;
    Some(value.as_str())
}
334
335fn parse_ts(
340 map: &HashMap<String, String>,
341 context: &str,
342 field: &str,
343) -> Result<Option<TimestampMs>, EngineError> {
344 match opt_str(map, field).filter(|s| !s.is_empty()) {
345 None => Ok(None),
346 Some(raw) => {
347 let ms: i64 = raw.parse().map_err(|e| {
348 corruption(
349 context,
350 Some(field),
351 &format!("is not a valid ms timestamp ('{raw}'): {e}"),
352 )
353 })?;
354 Ok(Some(TimestampMs::from_millis(ms)))
355 }
356 }
357}
358
359fn parse_u32_strict(
363 map: &HashMap<String, String>,
364 context: &str,
365 field: &str,
366) -> Result<Option<u32>, EngineError> {
367 match opt_str(map, field).filter(|s| !s.is_empty()) {
368 None => Ok(None),
369 Some(raw) => Ok(Some(raw.parse().map_err(|e| {
370 corruption(
371 context,
372 Some(field),
373 &format!("is not a valid u32 ('{raw}'): {e}"),
374 )
375 })?)),
376 }
377}
378
379fn parse_u64_strict(
381 map: &HashMap<String, String>,
382 context: &str,
383 field: &str,
384) -> Result<Option<u64>, EngineError> {
385 match opt_str(map, field).filter(|s| !s.is_empty()) {
386 None => Ok(None),
387 Some(raw) => Ok(Some(raw.parse().map_err(|e| {
388 corruption(
389 context,
390 Some(field),
391 &format!("is not a valid u64 ('{raw}'): {e}"),
392 )
393 })?)),
394 }
395}
396
397fn parse_public_state(raw: &str) -> Result<PublicState, EngineError> {
398 let quoted = format!("\"{raw}\"");
401 serde_json::from_str("ed).map_err(|e| {
402 corruption(
403 "describe_execution: exec_core",
404 Some("public_state"),
405 &format!("'{raw}' is not a known public state: {e}"),
406 )
407 })
408}
409
410fn build_attempt_summary(
411 core: &HashMap<String, String>,
412) -> Result<Option<AttemptSummary>, EngineError> {
413 let ctx = "describe_execution: exec_core";
414 let attempt_id_str = match opt_str(core, "current_attempt_id").filter(|s| !s.is_empty()) {
415 None => return Ok(None),
416 Some(s) => s,
417 };
418 let attempt_id = AttemptId::parse(attempt_id_str).map_err(|e| {
419 corruption(
420 ctx,
421 Some("current_attempt_id"),
422 &format!("is not a valid UUID: {e}"),
423 )
424 })?;
425 let attempt_index = parse_u32_strict(core, ctx, "current_attempt_index")?.ok_or_else(|| {
430 corruption(
431 ctx,
432 Some("current_attempt_index"),
433 "is missing while current_attempt_id is set (key corruption?)",
434 )
435 })?;
436 Ok(Some(AttemptSummary::new(
437 attempt_id,
438 AttemptIndex::new(attempt_index),
439 )))
440}
441
442fn build_lease_summary(
443 core: &HashMap<String, String>,
444) -> Result<Option<LeaseSummary>, EngineError> {
445 let ctx = "describe_execution: exec_core";
446 let wid_str = match opt_str(core, "current_worker_instance_id").filter(|s| !s.is_empty()) {
450 None => return Ok(None),
451 Some(s) => s,
452 };
453 let expires_at = match parse_ts(core, ctx, "lease_expires_at")? {
454 None => return Ok(None),
455 Some(ts) => ts,
456 };
457 let epoch = parse_u64_strict(core, ctx, "current_lease_epoch")?.ok_or_else(|| {
461 corruption(
462 ctx,
463 Some("current_lease_epoch"),
464 "is missing while current_worker_instance_id is set (key corruption?)",
465 )
466 })?;
467 let lease_id_str = opt_str(core, "current_lease_id")
475 .filter(|s| !s.is_empty())
476 .ok_or_else(|| {
477 corruption(
478 ctx,
479 Some("current_lease_id"),
480 "is missing while current_worker_instance_id is set (key corruption?)",
481 )
482 })?;
483 let lease_id = LeaseId::parse(lease_id_str).map_err(|e| {
484 corruption(
485 ctx,
486 Some("current_lease_id"),
487 &format!("is not a valid UUID: {e}"),
488 )
489 })?;
490 let attempt_index =
491 parse_u32_strict(core, ctx, "current_attempt_index")?.ok_or_else(|| {
492 corruption(
493 ctx,
494 Some("current_attempt_index"),
495 "is missing while current_worker_instance_id is set (key corruption?)",
496 )
497 })?;
498 let last_heartbeat_at = parse_ts(core, ctx, "lease_last_renewed_at")?;
502
503 let mut summary = LeaseSummary::new(
504 LeaseEpoch::new(epoch),
505 WorkerInstanceId::new(wid_str.to_owned()),
506 expires_at,
507 )
508 .with_lease_id(lease_id)
509 .with_attempt_index(AttemptIndex::new(attempt_index));
510 if let Some(ts) = last_heartbeat_at {
511 summary = summary.with_last_heartbeat_at(ts);
512 }
513 Ok(Some(summary))
514}
515
/// Field names that [`build_flow_snapshot`] recognises in a raw flow core hash.
///
/// `build_flow_snapshot` skips these keys when it collects tags. Any other
/// key must pass [`is_namespaced_tag_key`] or the snapshot is rejected.
pub const FLOW_CORE_KNOWN_FIELDS: &[&str] = &[
    // Identity.
    "flow_id",
    "flow_kind",
    "namespace",
    // State and graph counters.
    "public_flow_state",
    "graph_revision",
    "node_count",
    "edge_count",
    // Lifecycle timestamps.
    "created_at",
    "last_mutation_at",
    // Cancellation details (optional in the builder).
    "cancelled_at",
    "cancel_reason",
    "cancellation_policy",
];
539
540pub fn build_flow_snapshot(
547 flow_id: FlowId,
548 raw: &HashMap<String, String>,
549 edge_groups: Vec<crate::contracts::EdgeGroupSnapshot>,
550) -> Result<FlowSnapshot, EngineError> {
551 let ctx = "describe_flow: flow_core";
552
553 let stored_flow_id_str = opt_str(raw, "flow_id")
555 .filter(|s| !s.is_empty())
556 .ok_or_else(|| corruption(ctx, Some("flow_id"), "is missing or empty (key corruption?)"))?;
557 if stored_flow_id_str != flow_id.to_string() {
558 return Err(corruption(
559 ctx,
560 Some("flow_id"),
561 &format!(
562 "'{stored_flow_id_str}' does not match requested flow_id \
563 '{flow_id}' (key corruption or wrong-key read?)"
564 ),
565 ));
566 }
567
568 let namespace_str = opt_str(raw, "namespace")
569 .filter(|s| !s.is_empty())
570 .ok_or_else(|| {
571 corruption(ctx, Some("namespace"), "is missing or empty (key corruption?)")
572 })?;
573 let namespace = Namespace::new(namespace_str.to_owned());
574
575 let flow_kind = opt_str(raw, "flow_kind")
576 .filter(|s| !s.is_empty())
577 .ok_or_else(|| {
578 corruption(ctx, Some("flow_kind"), "is missing or empty (key corruption?)")
579 })?
580 .to_owned();
581
582 let public_flow_state = opt_str(raw, "public_flow_state")
583 .filter(|s| !s.is_empty())
584 .ok_or_else(|| {
585 corruption(
586 ctx,
587 Some("public_flow_state"),
588 "is missing or empty (key corruption?)",
589 )
590 })?
591 .to_owned();
592
593 let graph_revision = parse_u64_strict(raw, ctx, "graph_revision")?
594 .ok_or_else(|| corruption(ctx, Some("graph_revision"), "is missing (key corruption?)"))?;
595 let node_count = parse_u32_strict(raw, ctx, "node_count")?
596 .ok_or_else(|| corruption(ctx, Some("node_count"), "is missing (key corruption?)"))?;
597 let edge_count = parse_u32_strict(raw, ctx, "edge_count")?
598 .ok_or_else(|| corruption(ctx, Some("edge_count"), "is missing (key corruption?)"))?;
599
600 let created_at = parse_ts(raw, ctx, "created_at")?.ok_or_else(|| {
601 corruption(
602 ctx,
603 Some("created_at"),
604 "is missing or empty (key corruption?)",
605 )
606 })?;
607 let last_mutation_at = parse_ts(raw, ctx, "last_mutation_at")?.ok_or_else(|| {
608 corruption(
609 ctx,
610 Some("last_mutation_at"),
611 "is missing or empty (key corruption?)",
612 )
613 })?;
614
615 let cancelled_at = parse_ts(raw, ctx, "cancelled_at")?;
616 let cancel_reason = opt_str(raw, "cancel_reason")
617 .filter(|s| !s.is_empty())
618 .map(str::to_owned);
619 let cancellation_policy = opt_str(raw, "cancellation_policy")
620 .filter(|s| !s.is_empty())
621 .map(str::to_owned);
622
623 let mut tags: BTreeMap<String, String> = BTreeMap::new();
626 for (k, v) in raw {
627 if FLOW_CORE_KNOWN_FIELDS.contains(&k.as_str()) {
628 continue;
629 }
630 if is_namespaced_tag_key(k) {
631 tags.insert(k.clone(), v.clone());
632 } else {
633 return Err(corruption(
634 ctx,
635 None,
636 &format!(
637 "has unexpected field '{k}' — not an FF field and not a namespaced \
638 tag (lowercase-alphanumeric-prefix + '.')"
639 ),
640 ));
641 }
642 }
643
644 Ok(FlowSnapshot::new(
645 flow_id,
646 flow_kind,
647 namespace,
648 public_flow_state,
649 graph_revision,
650 node_count,
651 edge_count,
652 created_at,
653 last_mutation_at,
654 cancelled_at,
655 cancel_reason,
656 cancellation_policy,
657 tags,
658 edge_groups,
659 ))
660}
661
/// Reports whether `k` has the shape of a namespaced tag key.
///
/// The key must contain a `.`, and the part before the first `.` must start
/// with an ASCII lowercase letter and continue with only ASCII lowercase
/// letters, ASCII digits or `_`. Nothing after the first `.` is inspected,
/// so a key that ends at the dot (such as `"a."`) is accepted.
pub(crate) fn is_namespaced_tag_key(k: &str) -> bool {
    // No dot at all means there is no namespace prefix.
    let Some((prefix, _rest)) = k.split_once('.') else {
        return false;
    };

    let mut prefix_chars = prefix.chars();
    match prefix_chars.next() {
        Some(first) if first.is_ascii_lowercase() => {
            prefix_chars.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
        }
        // Empty prefix, or a prefix that starts with something else.
        _ => false,
    }
}
685
#[cfg(test)]
mod tests {
    //! Unit tests for the edge, execution and flow snapshot builders.

    use super::*;
    use crate::partition::PartitionConfig;

    // ----- shared helpers -------------------------------------------------

    /// Builds an owned `String -> String` map from borrowed pairs.
    fn string_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    }

    /// Returns the detail text of a `Validation`/`Corruption` error and
    /// panics on any other error.
    fn expect_corruption(err: EngineError) -> String {
        match err {
            EngineError::Validation {
                kind: ValidationKind::Corruption,
                detail,
            } => detail,
            unexpected => panic!("expected Validation::Corruption, got {unexpected:?}"),
        }
    }

    /// Asserts that `err` is a corruption error whose detail satisfies `pred`.
    fn expect_corruption_field(err: EngineError, pred: impl FnOnce(&str) -> bool) {
        let detail = expect_corruption(err);
        let matched = pred(detail.as_str());
        assert!(matched, "detail did not match predicate: {detail}");
    }

    /// Returns a new `FlowId` from `FlowId::new()`.
    fn fid() -> FlowId {
        FlowId::new()
    }

    /// Returns two execution ids derived from `f` with the default partition config.
    fn eids_for_flow(f: &FlowId) -> (ExecutionId, ExecutionId) {
        let config = PartitionConfig::default();
        let upstream = ExecutionId::for_flow(f, &config);
        let downstream = ExecutionId::for_flow(f, &config);
        (upstream, downstream)
    }

    /// Returns an execution id for a new flow with the default partition config.
    fn eid() -> ExecutionId {
        let flow = FlowId::new();
        ExecutionId::for_flow(&flow, &PartitionConfig::default())
    }

    // ----- edge snapshot --------------------------------------------------

    /// A complete, valid edge hash with an empty `data_passing_ref`.
    fn minimal_edge_hash(
        flow: &FlowId,
        edge: &EdgeId,
        up: &ExecutionId,
        down: &ExecutionId,
    ) -> HashMap<String, String> {
        let edge_text = edge.to_string();
        let flow_text = flow.to_string();
        let up_text = up.to_string();
        let down_text = down.to_string();
        string_map(&[
            ("edge_id", edge_text.as_str()),
            ("flow_id", flow_text.as_str()),
            ("upstream_execution_id", up_text.as_str()),
            ("downstream_execution_id", down_text.as_str()),
            ("dependency_kind", "success_only"),
            ("satisfaction_condition", "all_required"),
            ("data_passing_ref", ""),
            ("edge_state", "pending"),
            ("created_at", "1234"),
            ("created_by", "engine"),
        ])
    }

    #[test]
    fn round_trips_all_fields() {
        let flow = fid();
        let edge_id = EdgeId::new();
        let (upstream, downstream) = eids_for_flow(&flow);
        let hash = minimal_edge_hash(&flow, &edge_id, &upstream, &downstream);

        let snapshot = build_edge_snapshot(&flow, &edge_id, &hash).unwrap();

        assert_eq!(snapshot.edge_id, edge_id);
        assert_eq!(snapshot.flow_id, flow);
        assert_eq!(snapshot.upstream_execution_id, upstream);
        assert_eq!(snapshot.downstream_execution_id, downstream);
        assert_eq!(snapshot.dependency_kind, "success_only");
        assert_eq!(snapshot.satisfaction_condition, "all_required");
        assert!(snapshot.data_passing_ref.is_none());
        assert_eq!(snapshot.edge_state, "pending");
        assert_eq!(snapshot.created_at.0, 1234);
        assert_eq!(snapshot.created_by, "engine");
    }

    #[test]
    fn data_passing_ref_round_trips_when_set() {
        let flow = fid();
        let edge_id = EdgeId::new();
        let (upstream, downstream) = eids_for_flow(&flow);
        let mut hash = minimal_edge_hash(&flow, &edge_id, &upstream, &downstream);
        hash.insert("data_passing_ref".to_owned(), "ref://blob-42".to_owned());

        let snapshot = build_edge_snapshot(&flow, &edge_id, &hash).unwrap();

        assert_eq!(snapshot.data_passing_ref.as_deref(), Some("ref://blob-42"));
    }

    #[test]
    fn unknown_field_fails_loud() {
        let flow = fid();
        let edge_id = EdgeId::new();
        let (upstream, downstream) = eids_for_flow(&flow);
        let mut hash = minimal_edge_hash(&flow, &edge_id, &upstream, &downstream);
        hash.insert("bogus_future_field".to_owned(), "v".to_owned());

        let err = build_edge_snapshot(&flow, &edge_id, &hash).unwrap_err();
        let detail = expect_corruption(err);

        assert!(detail.contains("bogus_future_field"), "{detail}");
    }

    #[test]
    fn flow_id_mismatch_fails_loud() {
        let requested_flow = fid();
        let stored_flow = fid();
        let edge_id = EdgeId::new();
        let (upstream, downstream) = eids_for_flow(&requested_flow);
        // The hash claims a different flow than the one requested.
        let hash = minimal_edge_hash(&stored_flow, &edge_id, &upstream, &downstream);

        let err = build_edge_snapshot(&requested_flow, &edge_id, &hash).unwrap_err();
        let detail = expect_corruption(err);

        assert!(detail.contains("flow_id"), "{detail}");
        assert!(detail.contains("does not match"), "{detail}");
    }

    #[test]
    fn edge_id_mismatch_fails_loud() {
        let flow = fid();
        let requested_edge = EdgeId::new();
        let stored_edge = EdgeId::new();
        let (upstream, downstream) = eids_for_flow(&flow);
        // The hash claims a different edge than the one requested.
        let hash = minimal_edge_hash(&flow, &stored_edge, &upstream, &downstream);

        let err = build_edge_snapshot(&flow, &requested_edge, &hash).unwrap_err();
        let detail = expect_corruption(err);

        assert!(detail.contains("edge_id"), "{detail}");
        assert!(detail.contains("does not match"), "{detail}");
    }

    #[test]
    fn missing_required_fields_fail_loud() {
        let required_fields = [
            "edge_id",
            "flow_id",
            "upstream_execution_id",
            "downstream_execution_id",
            "dependency_kind",
            "satisfaction_condition",
            "edge_state",
            "created_at",
            "created_by",
        ];
        for want in required_fields {
            let flow = fid();
            let edge_id = EdgeId::new();
            let (upstream, downstream) = eids_for_flow(&flow);
            let mut hash = minimal_edge_hash(&flow, &edge_id, &upstream, &downstream);
            hash.remove(want);

            let Err(err) = build_edge_snapshot(&flow, &edge_id, &hash) else {
                panic!("missing {want} should fail");
            };
            let detail = expect_corruption(err);
            assert!(detail.contains(want), "detail for {want}: {detail}");
        }
    }

    #[test]
    fn malformed_created_at_fails_loud() {
        let flow = fid();
        let edge_id = EdgeId::new();
        let (upstream, downstream) = eids_for_flow(&flow);
        let mut hash = minimal_edge_hash(&flow, &edge_id, &upstream, &downstream);
        hash.insert("created_at".to_owned(), "not-a-number".to_owned());

        let err = build_edge_snapshot(&flow, &edge_id, &hash).unwrap_err();
        let detail = expect_corruption(err);

        assert!(detail.contains("created_at"), "{detail}");
    }

    #[test]
    fn malformed_upstream_eid_fails_loud() {
        let flow = fid();
        let edge_id = EdgeId::new();
        let (upstream, downstream) = eids_for_flow(&flow);
        let mut hash = minimal_edge_hash(&flow, &edge_id, &upstream, &downstream);
        hash.insert(
            "upstream_execution_id".to_owned(),
            "not-an-execution-id".to_owned(),
        );

        let err = build_edge_snapshot(&flow, &edge_id, &hash).unwrap_err();
        let detail = expect_corruption(err);

        assert!(detail.contains("upstream_execution_id"), "{detail}");
    }

    // ----- execution snapshot ---------------------------------------------

    /// The smallest core hash that `build_execution_snapshot` accepts.
    fn minimal_core(public_state: &str) -> HashMap<String, String> {
        string_map(&[
            ("public_state", public_state),
            ("lane_id", "default"),
            ("namespace", "ns"),
            ("created_at", "1000"),
            ("last_mutation_at", "2000"),
            ("total_attempt_count", "0"),
        ])
    }

    /// `minimal_core("active")` plus a worker instance id and `extra` fields.
    fn leased_core(extra: &[(&str, &str)]) -> HashMap<String, String> {
        let mut core = minimal_core("active");
        core.insert(
            "current_worker_instance_id".to_owned(),
            "w-inst-1".to_owned(),
        );
        core.extend(string_map(extra));
        core
    }

    /// Builds with a fresh execution id and no tags, expecting an error.
    #[track_caller]
    fn exec_error(core: &HashMap<String, String>) -> EngineError {
        build_execution_snapshot(eid(), core, HashMap::new()).unwrap_err()
    }

    #[test]
    fn waiting_exec_no_attempt_no_lease_no_tags() {
        let core = minimal_core("waiting");

        let snapshot = build_execution_snapshot(eid(), &core, HashMap::new())
            .unwrap()
            .expect("should build");

        assert_eq!(snapshot.public_state, PublicState::Waiting);
        assert!(snapshot.current_attempt.is_none());
        assert!(snapshot.current_lease.is_none());
        assert!(snapshot.current_waitpoint.is_none());
        assert_eq!(snapshot.tags.len(), 0);
        assert_eq!(snapshot.created_at.0, 1000);
        assert_eq!(snapshot.last_mutation_at.0, 2000);
        assert!(snapshot.flow_id.is_none());
        assert!(snapshot.blocking_reason.is_none());
    }

    #[test]
    fn tags_flow_through_sorted() {
        // Inserted out of order; the snapshot must list them sorted by key.
        let tags = string_map(&[("cairn.task_id", "t-1"), ("cairn.project", "proj")]);

        let snapshot = build_execution_snapshot(eid(), &minimal_core("waiting"), tags)
            .unwrap()
            .unwrap();

        let keys: Vec<String> = snapshot.tags.keys().cloned().collect();
        assert_eq!(
            keys,
            vec!["cairn.project".to_owned(), "cairn.task_id".to_owned()]
        );
    }

    #[test]
    fn invalid_public_state_fails_loud() {
        let err = exec_error(&minimal_core("bogus"));
        expect_corruption_field(err, |detail| detail.contains("public_state"));
    }

    #[test]
    fn invalid_lane_id_fails_loud() {
        let mut core = minimal_core("waiting");
        core.insert("lane_id".to_owned(), "lane\nbroken".to_owned());

        let err = exec_error(&core);

        expect_corruption_field(err, |detail| detail.contains("lane_id"));
    }

    #[test]
    fn missing_required_timestamps_fail_loud() {
        for want in ["created_at", "last_mutation_at"] {
            let mut core = minimal_core("waiting");
            core.remove(want);

            let err = exec_error(&core);

            expect_corruption_field(err, |detail| detail.contains(want));
        }
    }

    #[test]
    fn malformed_total_attempt_count_fails_loud() {
        let mut core = minimal_core("waiting");
        core.insert("total_attempt_count".to_owned(), "not-a-number".to_owned());

        let err = exec_error(&core);

        expect_corruption_field(err, |detail| detail.contains("total_attempt_count"));
    }

    #[test]
    fn attempt_id_without_index_fails_loud() {
        let mut core = minimal_core("active");
        let attempt_id = AttemptId::new().to_string();
        core.insert("current_attempt_id".to_owned(), attempt_id);

        let err = exec_error(&core);

        expect_corruption_field(err, |detail| detail.contains("current_attempt_index"));
    }

    #[test]
    fn lease_without_epoch_fails_loud() {
        let core = leased_core(&[("lease_expires_at", "9000")]);

        let err = exec_error(&core);

        expect_corruption_field(err, |detail| detail.contains("current_lease_epoch"));
    }

    #[test]
    fn lease_summary_requires_both_wid_and_expires_at() {
        // Worker id alone: no lease is reported and nothing fails.
        let mut core = leased_core(&[]);
        let without_expiry = build_execution_snapshot(eid(), &core, HashMap::new())
            .unwrap()
            .unwrap();
        assert!(without_expiry.current_lease.is_none());

        // With the expiry and the remaining lease fields, the lease appears.
        let lease_id = LeaseId::new();
        let lease_id_text = lease_id.to_string();
        core.extend(string_map(&[
            ("lease_expires_at", "9000"),
            ("current_lease_epoch", "3"),
            ("current_lease_id", lease_id_text.as_str()),
            ("current_attempt_index", "2"),
            ("lease_last_renewed_at", "8500"),
        ]));
        let with_lease = build_execution_snapshot(eid(), &core, HashMap::new())
            .unwrap()
            .unwrap();

        let lease = with_lease.current_lease.expect("lease present");
        assert_eq!(lease.lease_epoch, LeaseEpoch::new(3));
        assert_eq!(lease.expires_at.0, 9000);
        assert_eq!(lease.worker_instance_id.as_str(), "w-inst-1");
        assert_eq!(lease.lease_id, lease_id);
        assert_eq!(lease.attempt_index, AttemptIndex::new(2));
        assert_eq!(lease.last_heartbeat_at.map(|t| t.0), Some(8500));
    }

    #[test]
    fn lease_summary_without_heartbeat_returns_none() {
        let lease_id_text = LeaseId::new().to_string();
        let core = leased_core(&[
            ("lease_expires_at", "9000"),
            ("current_lease_epoch", "3"),
            ("current_lease_id", lease_id_text.as_str()),
            ("current_attempt_index", "1"),
        ]);

        let snapshot = build_execution_snapshot(eid(), &core, HashMap::new())
            .unwrap()
            .unwrap();

        let lease = snapshot.current_lease.expect("lease present");
        assert!(lease.last_heartbeat_at.is_none());
    }

    #[test]
    fn lease_without_lease_id_fails_loud() {
        let core = leased_core(&[
            ("lease_expires_at", "9000"),
            ("current_lease_epoch", "3"),
            ("current_attempt_index", "1"),
        ]);

        let err = exec_error(&core);

        expect_corruption_field(err, |detail| detail.contains("current_lease_id"));
    }

    #[test]
    fn lease_with_bad_lease_id_fails_loud() {
        let core = leased_core(&[
            ("lease_expires_at", "9000"),
            ("current_lease_epoch", "3"),
            ("current_lease_id", "not-a-uuid"),
            ("current_attempt_index", "1"),
        ]);

        let err = exec_error(&core);

        expect_corruption_field(err, |detail| detail.contains("current_lease_id"));
    }

    #[test]
    fn lease_without_attempt_index_fails_loud() {
        let lease_id_text = LeaseId::new().to_string();
        let core = leased_core(&[
            ("lease_expires_at", "9000"),
            ("current_lease_epoch", "3"),
            ("current_lease_id", lease_id_text.as_str()),
        ]);

        let err = exec_error(&core);

        expect_corruption_field(err, |detail| detail.contains("current_attempt_index"));
    }

    // ----- flow snapshot --------------------------------------------------

    /// The smallest flow core hash that `build_flow_snapshot` accepts.
    fn minimal_flow_core(id: &FlowId, state: &str) -> HashMap<String, String> {
        let id_text = id.to_string();
        string_map(&[
            ("flow_id", id_text.as_str()),
            ("flow_kind", "dag"),
            ("namespace", "ns"),
            ("public_flow_state", state),
            ("graph_revision", "0"),
            ("node_count", "0"),
            ("edge_count", "0"),
            ("created_at", "1000"),
            ("last_mutation_at", "1000"),
        ])
    }

    /// Builds a flow snapshot with no edge groups, expecting an error.
    #[track_caller]
    fn flow_error(requested: FlowId, core: &HashMap<String, String>) -> EngineError {
        build_flow_snapshot(requested, core, Vec::new()).unwrap_err()
    }

    #[test]
    fn open_flow_round_trips() {
        let flow = fid();
        let core = minimal_flow_core(&flow, "open");

        let snapshot = build_flow_snapshot(flow.clone(), &core, Vec::new()).unwrap();

        assert_eq!(snapshot.flow_id, flow);
        assert_eq!(snapshot.flow_kind, "dag");
        assert_eq!(snapshot.namespace.as_str(), "ns");
        assert_eq!(snapshot.public_flow_state, "open");
        assert_eq!(snapshot.graph_revision, 0);
        assert_eq!(snapshot.node_count, 0);
        assert_eq!(snapshot.edge_count, 0);
        assert_eq!(snapshot.created_at.0, 1000);
        assert_eq!(snapshot.last_mutation_at.0, 1000);
        assert!(snapshot.cancelled_at.is_none());
        assert!(snapshot.cancel_reason.is_none());
        assert!(snapshot.cancellation_policy.is_none());
        assert!(snapshot.tags.is_empty());
    }

    #[test]
    fn cancelled_flow_surfaces_cancel_fields() {
        let flow = fid();
        let mut core = minimal_flow_core(&flow, "cancelled");
        core.extend(string_map(&[
            ("cancelled_at", "2000"),
            ("cancel_reason", "operator"),
            ("cancellation_policy", "cancel_all"),
        ]));

        let snapshot = build_flow_snapshot(flow, &core, Vec::new()).unwrap();

        assert_eq!(snapshot.public_flow_state, "cancelled");
        assert_eq!(snapshot.cancelled_at.unwrap().0, 2000);
        assert_eq!(snapshot.cancel_reason.as_deref(), Some("operator"));
        assert_eq!(snapshot.cancellation_policy.as_deref(), Some("cancel_all"));
    }

    #[test]
    fn namespaced_tags_routed_to_tags_map() {
        let flow = fid();
        let mut core = minimal_flow_core(&flow, "open");
        core.extend(string_map(&[
            ("cairn.task_id", "t-1"),
            ("cairn.project", "proj"),
            ("operator.label", "v"),
        ]));

        let snapshot = build_flow_snapshot(flow, &core, Vec::new()).unwrap();

        assert_eq!(snapshot.tags.len(), 3);
        let keys: Vec<String> = snapshot.tags.keys().cloned().collect();
        assert_eq!(
            keys,
            vec![
                "cairn.project".to_owned(),
                "cairn.task_id".to_owned(),
                "operator.label".to_owned()
            ]
        );
    }

    #[test]
    fn unknown_flat_field_fails_loud() {
        let flow = fid();
        let mut core = minimal_flow_core(&flow, "open");
        core.insert("bogus_future_field".to_owned(), "v".to_owned());

        let err = flow_error(flow, &core);

        expect_corruption_field(err, |detail| detail.contains("bogus_future_field"));
    }

    #[test]
    fn missing_required_flow_fields_fail_loud() {
        let required_fields = [
            "flow_id",
            "namespace",
            "flow_kind",
            "public_flow_state",
            "graph_revision",
            "node_count",
            "edge_count",
            "created_at",
            "last_mutation_at",
        ];
        for want in required_fields {
            let flow = fid();
            let mut core = minimal_flow_core(&flow, "open");
            core.remove(want);

            let Err(err) = build_flow_snapshot(flow, &core, Vec::new()) else {
                panic!("field {want} should fail but build_flow_snapshot returned Ok");
            };
            expect_corruption_field(err, |detail| detail.contains(want));
        }
    }

    #[test]
    fn empty_required_strings_fail_loud() {
        for want in ["flow_id", "namespace", "flow_kind", "public_flow_state"] {
            let flow = fid();
            let mut core = minimal_flow_core(&flow, "open");
            core.insert(want.to_owned(), String::new());

            let Err(err) = build_flow_snapshot(flow, &core, Vec::new()) else {
                panic!("empty {want} should fail but build_flow_snapshot returned Ok");
            };
            expect_corruption_field(err, |detail| detail.contains(want));
        }
    }

    #[test]
    fn flow_snapshot_flow_id_mismatch_fails_loud() {
        let requested = fid();
        let stored = fid();
        let core = minimal_flow_core(&stored, "open");

        let err = flow_error(requested, &core);

        expect_corruption_field(err, |detail| {
            detail.contains("flow_id") && detail.contains("does not match")
        });
    }

    #[test]
    fn malformed_counter_fails_loud() {
        let flow = fid();
        let mut core = minimal_flow_core(&flow, "open");
        core.insert("graph_revision".to_owned(), "not-a-number".to_owned());

        let err = flow_error(flow, &core);

        expect_corruption_field(err, |detail| detail.contains("graph_revision"));
    }

    // ----- tag key matcher ------------------------------------------------

    #[test]
    fn namespaced_tag_matcher_boundaries() {
        for accepted in ["cairn.task_id", "a.b", "ab_12.field"] {
            assert!(
                is_namespaced_tag_key(accepted),
                "{accepted:?} should be accepted"
            );
        }
        for rejected in [
            "cairn_task_id",
            "Cairn.task",
            "1cairn.task",
            "",
            ".x",
            "caIrn.task",
        ] {
            assert!(
                !is_namespaced_tag_key(rejected),
                "{rejected:?} should be rejected"
            );
        }
    }
}