1use std::collections::{BTreeMap, HashMap};
16
17use ff_core::contracts::{
18 AttemptSummary, EdgeSnapshot, ExecutionSnapshot, FlowSnapshot, LeaseSummary,
19};
20use ff_core::keys::{ExecKeyContext, FlowKeyContext};
21use ff_core::partition::{execution_partition, flow_partition};
22use ff_core::state::PublicState;
23use ff_core::types::{
24 AttemptId, AttemptIndex, EdgeId, ExecutionId, FlowId, LaneId, LeaseEpoch, Namespace,
25 TimestampMs, WaitpointId, WorkerInstanceId,
26};
27
28use crate::SdkError;
29use crate::worker::FlowFabricWorker;
30
31impl FlowFabricWorker {
32 pub async fn describe_execution(
67 &self,
68 id: &ExecutionId,
69 ) -> Result<Option<ExecutionSnapshot>, SdkError> {
70 let partition = execution_partition(id, self.partition_config());
71 let ctx = ExecKeyContext::new(&partition, id);
72 let core_key = ctx.core();
73 let tags_key = ctx.tags();
74
75 let mut pipe = self.client().pipeline();
78 let core_slot = pipe
79 .cmd::<HashMap<String, String>>("HGETALL")
80 .arg(&core_key)
81 .finish();
82 let tags_slot = pipe
83 .cmd::<HashMap<String, String>>("HGETALL")
84 .arg(&tags_key)
85 .finish();
86 pipe.execute().await.map_err(|e| SdkError::ValkeyContext {
87 source: e,
88 context: "describe_execution: pipeline HGETALL exec_core + tags".into(),
89 })?;
90
91 let core = core_slot.value().map_err(|e| SdkError::ValkeyContext {
92 source: e,
93 context: "describe_execution: decode HGETALL exec_core".into(),
94 })?;
95 if core.is_empty() {
96 return Ok(None);
97 }
98 let tags_raw = tags_slot.value().map_err(|e| SdkError::ValkeyContext {
99 source: e,
100 context: "describe_execution: decode HGETALL tags".into(),
101 })?;
102
103 build_execution_snapshot(id.clone(), &core, tags_raw)
104 }
105}
106
107fn build_execution_snapshot(
112 execution_id: ExecutionId,
113 core: &HashMap<String, String>,
114 tags_raw: HashMap<String, String>,
115) -> Result<Option<ExecutionSnapshot>, SdkError> {
116 let public_state = parse_public_state(opt_str(core, "public_state").unwrap_or(""))?;
117
118 let lane_id = LaneId::try_new(opt_str(core, "lane_id").unwrap_or("")).map_err(|e| {
124 SdkError::Config {
125 context: "describe_execution: exec_core".into(),
126 field: Some("lane_id".into()),
127 message: format!("fails LaneId validation (key corruption?): {e}"),
128 }
129 })?;
130
131 let namespace_str = opt_str(core, "namespace").unwrap_or("").to_owned();
132 let namespace = Namespace::new(namespace_str);
133
134 let flow_id = opt_str(core, "flow_id")
135 .filter(|s| !s.is_empty())
136 .map(|s| {
137 FlowId::parse(s).map_err(|e| SdkError::Config {
138 context: "describe_execution: exec_core".into(),
139 field: Some("flow_id".into()),
140 message: format!("is not a valid UUID (key corruption?): {e}"),
141 })
142 })
143 .transpose()?;
144
145 let blocking_reason = opt_str(core, "blocking_reason")
146 .filter(|s| !s.is_empty())
147 .map(str::to_owned);
148 let blocking_detail = opt_str(core, "blocking_detail")
149 .filter(|s| !s.is_empty())
150 .map(str::to_owned);
151
152 let created_at =
157 parse_ts(core, "describe_execution: exec_core", "created_at")?.ok_or_else(|| {
158 SdkError::Config {
159 context: "describe_execution: exec_core".into(),
160 field: Some("created_at".into()),
161 message: "is missing or empty (key corruption?)".into(),
162 }
163 })?;
164 let last_mutation_at = parse_ts(core, "describe_execution: exec_core", "last_mutation_at")?
165 .ok_or_else(|| SdkError::Config {
166 context: "describe_execution: exec_core".into(),
167 field: Some("last_mutation_at".into()),
168 message: "is missing or empty (key corruption?)".into(),
169 })?;
170
171 let total_attempt_count: u32 =
172 parse_u32_strict(core, "describe_execution: exec_core", "total_attempt_count")?
173 .unwrap_or(0);
174
175 let current_attempt = build_attempt_summary(core)?;
176 let current_lease = build_lease_summary(core)?;
177
178 let current_waitpoint = opt_str(core, "current_waitpoint_id")
179 .filter(|s| !s.is_empty())
180 .map(|s| {
181 WaitpointId::parse(s).map_err(|e| SdkError::Config {
182 context: "describe_execution: exec_core".into(),
183 field: Some("current_waitpoint_id".into()),
184 message: format!("is not a valid UUID (key corruption?): {e}"),
185 })
186 })
187 .transpose()?;
188
189 let tags: BTreeMap<String, String> = tags_raw.into_iter().collect();
190
191 Ok(Some(ExecutionSnapshot::new(
192 execution_id,
193 flow_id,
194 lane_id,
195 namespace,
196 public_state,
197 blocking_reason,
198 blocking_detail,
199 current_attempt,
200 current_lease,
201 current_waitpoint,
202 created_at,
203 last_mutation_at,
204 total_attempt_count,
205 tags,
206 )))
207}
208
/// Looks up `field` in `map` and returns the stored value as a borrowed
/// `&str`, or `None` when the key is absent. Empty values are returned as
/// `Some("")`; callers filter those out themselves when needed.
fn opt_str<'a>(map: &'a HashMap<String, String>, field: &str) -> Option<&'a str> {
    match map.get(field) {
        Some(value) => Some(value.as_str()),
        None => None,
    }
}
212
213fn parse_ts(
218 map: &HashMap<String, String>,
219 context: &str,
220 field: &str,
221) -> Result<Option<TimestampMs>, SdkError> {
222 match opt_str(map, field).filter(|s| !s.is_empty()) {
223 None => Ok(None),
224 Some(raw) => {
225 let ms: i64 = raw.parse().map_err(|e| SdkError::Config {
226 context: context.to_owned(),
227 field: Some(field.to_owned()),
228 message: format!("is not a valid ms timestamp ('{raw}'): {e}"),
229 })?;
230 Ok(Some(TimestampMs::from_millis(ms)))
231 }
232 }
233}
234
235fn parse_u32_strict(
240 map: &HashMap<String, String>,
241 context: &str,
242 field: &str,
243) -> Result<Option<u32>, SdkError> {
244 match opt_str(map, field).filter(|s| !s.is_empty()) {
245 None => Ok(None),
246 Some(raw) => Ok(Some(raw.parse().map_err(|e| SdkError::Config {
247 context: context.to_owned(),
248 field: Some(field.to_owned()),
249 message: format!("is not a valid u32 ('{raw}'): {e}"),
250 })?)),
251 }
252}
253
254fn parse_u64_strict(
256 map: &HashMap<String, String>,
257 context: &str,
258 field: &str,
259) -> Result<Option<u64>, SdkError> {
260 match opt_str(map, field).filter(|s| !s.is_empty()) {
261 None => Ok(None),
262 Some(raw) => Ok(Some(raw.parse().map_err(|e| SdkError::Config {
263 context: context.to_owned(),
264 field: Some(field.to_owned()),
265 message: format!("is not a valid u64 ('{raw}'): {e}"),
266 })?)),
267 }
268}
269
270fn parse_public_state(raw: &str) -> Result<PublicState, SdkError> {
271 let quoted = format!("\"{raw}\"");
274 serde_json::from_str("ed).map_err(|e| SdkError::Config {
275 context: "describe_execution: exec_core".into(),
276 field: Some("public_state".into()),
277 message: format!("'{raw}' is not a known public state: {e}"),
278 })
279}
280
281fn build_attempt_summary(
282 core: &HashMap<String, String>,
283) -> Result<Option<AttemptSummary>, SdkError> {
284 let attempt_id_str = match opt_str(core, "current_attempt_id").filter(|s| !s.is_empty()) {
285 None => return Ok(None),
286 Some(s) => s,
287 };
288 let attempt_id = AttemptId::parse(attempt_id_str).map_err(|e| SdkError::Config {
289 context: "describe_execution: exec_core".into(),
290 field: Some("current_attempt_id".into()),
291 message: format!("is not a valid UUID: {e}"),
292 })?;
293 let attempt_index = parse_u32_strict(
298 core,
299 "describe_execution: exec_core",
300 "current_attempt_index",
301 )?
302 .ok_or_else(|| SdkError::Config {
303 context: "describe_execution: exec_core".into(),
304 field: Some("current_attempt_index".into()),
305 message: "is missing while current_attempt_id is set (key corruption?)".into(),
306 })?;
307 Ok(Some(AttemptSummary::new(
308 attempt_id,
309 AttemptIndex::new(attempt_index),
310 )))
311}
312
313fn build_lease_summary(core: &HashMap<String, String>) -> Result<Option<LeaseSummary>, SdkError> {
314 let wid_str = match opt_str(core, "current_worker_instance_id").filter(|s| !s.is_empty()) {
318 None => return Ok(None),
319 Some(s) => s,
320 };
321 let expires_at = match parse_ts(core, "describe_execution: exec_core", "lease_expires_at")? {
322 None => return Ok(None),
323 Some(ts) => ts,
324 };
325 let epoch = parse_u64_strict(core, "describe_execution: exec_core", "current_lease_epoch")?
329 .ok_or_else(|| SdkError::Config {
330 context: "describe_execution: exec_core".into(),
331 field: Some("current_lease_epoch".into()),
332 message: "is missing while current_worker_instance_id is set (key corruption?)".into(),
333 })?;
334 Ok(Some(LeaseSummary::new(
335 LeaseEpoch::new(epoch),
336 WorkerInstanceId::new(wid_str.to_owned()),
337 expires_at,
338 )))
339}
340
/// Field names of the flow core hash that `build_flow_snapshot` parses into
/// dedicated snapshot fields.
///
/// When `build_flow_snapshot` collects tags it skips every field listed here.
/// Any other field must satisfy `is_namespaced_tag_key`, otherwise the
/// snapshot build fails with an "unexpected field" error — so a field added to
/// the hash needs an entry here (or a namespaced name) to be accepted.
const FLOW_CORE_KNOWN_FIELDS: &[&str] = &[
    "flow_id",
    "flow_kind",
    "namespace",
    "public_flow_state",
    "graph_revision",
    "node_count",
    "edge_count",
    "created_at",
    "last_mutation_at",
    "cancelled_at",
    "cancel_reason",
    "cancellation_policy",
];
367
368impl FlowFabricWorker {
369 pub async fn describe_flow(&self, id: &FlowId) -> Result<Option<FlowSnapshot>, SdkError> {
399 let partition = flow_partition(id, self.partition_config());
400 let ctx = FlowKeyContext::new(&partition, id);
401 let core_key = ctx.core();
402
403 let raw: HashMap<String, String> = self
404 .client()
405 .cmd("HGETALL")
406 .arg(&core_key)
407 .execute()
408 .await
409 .map_err(|e| SdkError::ValkeyContext {
410 source: e,
411 context: "describe_flow: HGETALL flow_core".into(),
412 })?;
413
414 if raw.is_empty() {
415 return Ok(None);
416 }
417
418 build_flow_snapshot(id.clone(), &raw).map(Some)
419 }
420}
421
422fn build_flow_snapshot(
427 flow_id: FlowId,
428 raw: &HashMap<String, String>,
429) -> Result<FlowSnapshot, SdkError> {
430 let stored_flow_id_str = opt_str(raw, "flow_id")
434 .filter(|s| !s.is_empty())
435 .ok_or_else(|| SdkError::Config {
436 context: "describe_flow: flow_core".into(),
437 field: Some("flow_id".into()),
438 message: "is missing or empty (key corruption?)".into(),
439 })?;
440 if stored_flow_id_str != flow_id.to_string() {
441 return Err(SdkError::Config {
442 context: "describe_flow: flow_core".into(),
443 field: Some("flow_id".into()),
444 message: format!(
445 "'{stored_flow_id_str}' does not match requested flow_id \
446 '{flow_id}' (key corruption or wrong-key read?)"
447 ),
448 });
449 }
450
451 let namespace_str = opt_str(raw, "namespace")
454 .filter(|s| !s.is_empty())
455 .ok_or_else(|| SdkError::Config {
456 context: "describe_flow: flow_core".into(),
457 field: Some("namespace".into()),
458 message: "is missing or empty (key corruption?)".into(),
459 })?;
460 let namespace = Namespace::new(namespace_str.to_owned());
461
462 let flow_kind = opt_str(raw, "flow_kind")
463 .filter(|s| !s.is_empty())
464 .ok_or_else(|| SdkError::Config {
465 context: "describe_flow: flow_core".into(),
466 field: Some("flow_kind".into()),
467 message: "is missing or empty (key corruption?)".into(),
468 })?
469 .to_owned();
470
471 let public_flow_state = opt_str(raw, "public_flow_state")
472 .filter(|s| !s.is_empty())
473 .ok_or_else(|| SdkError::Config {
474 context: "describe_flow: flow_core".into(),
475 field: Some("public_flow_state".into()),
476 message: "is missing or empty (key corruption?)".into(),
477 })?
478 .to_owned();
479
480 let graph_revision = parse_u64_strict(raw, "describe_flow: flow_core", "graph_revision")?
484 .ok_or_else(|| SdkError::Config {
485 context: "describe_flow: flow_core".into(),
486 field: Some("graph_revision".into()),
487 message: "is missing (key corruption?)".into(),
488 })?;
489 let node_count =
490 parse_u32_strict(raw, "describe_flow: flow_core", "node_count")?.ok_or_else(|| {
491 SdkError::Config {
492 context: "describe_flow: flow_core".into(),
493 field: Some("node_count".into()),
494 message: "is missing (key corruption?)".into(),
495 }
496 })?;
497 let edge_count =
498 parse_u32_strict(raw, "describe_flow: flow_core", "edge_count")?.ok_or_else(|| {
499 SdkError::Config {
500 context: "describe_flow: flow_core".into(),
501 field: Some("edge_count".into()),
502 message: "is missing (key corruption?)".into(),
503 }
504 })?;
505
506 let created_at = parse_ts(raw, "describe_flow: flow_core", "created_at")?.ok_or_else(|| {
509 SdkError::Config {
510 context: "describe_flow: flow_core".into(),
511 field: Some("created_at".into()),
512 message: "is missing or empty (key corruption?)".into(),
513 }
514 })?;
515 let last_mutation_at = parse_ts(raw, "describe_flow: flow_core", "last_mutation_at")?
516 .ok_or_else(|| SdkError::Config {
517 context: "describe_flow: flow_core".into(),
518 field: Some("last_mutation_at".into()),
519 message: "is missing or empty (key corruption?)".into(),
520 })?;
521
522 let cancelled_at = parse_ts(raw, "describe_flow: flow_core", "cancelled_at")?;
523 let cancel_reason = opt_str(raw, "cancel_reason")
524 .filter(|s| !s.is_empty())
525 .map(str::to_owned);
526 let cancellation_policy = opt_str(raw, "cancellation_policy")
527 .filter(|s| !s.is_empty())
528 .map(str::to_owned);
529
530 let mut tags: BTreeMap<String, String> = BTreeMap::new();
535 for (k, v) in raw {
536 if FLOW_CORE_KNOWN_FIELDS.contains(&k.as_str()) {
537 continue;
538 }
539 if is_namespaced_tag_key(k) {
540 tags.insert(k.clone(), v.clone());
541 } else {
542 return Err(SdkError::Config {
543 context: "describe_flow: flow_core".into(),
544 field: None,
545 message: format!(
546 "has unexpected field '{k}' — not an FF field and not a namespaced \
547 tag (lowercase-alphanumeric-prefix + '.')"
548 ),
549 });
550 }
551 }
552
553 Ok(FlowSnapshot::new(
554 flow_id,
555 flow_kind,
556 namespace,
557 public_flow_state,
558 graph_revision,
559 node_count,
560 edge_count,
561 created_at,
562 last_mutation_at,
563 cancelled_at,
564 cancel_reason,
565 cancellation_policy,
566 tags,
567 ))
568}
569
/// Tells whether `k` has the shape of a namespaced tag key: `<prefix>.<rest>`.
///
/// The prefix is everything before the first `.`. It must start with an ASCII
/// lowercase letter and may continue with ASCII lowercase letters, ASCII
/// digits or `_`. Whatever follows the first `.` is not inspected (it may even
/// be empty). A key without any `.` is never a namespaced tag.
fn is_namespaced_tag_key(k: &str) -> bool {
    let Some((prefix, _rest)) = k.split_once('.') else {
        return false;
    };
    let mut prefix_chars = prefix.chars();
    match prefix_chars.next() {
        Some(first) if first.is_ascii_lowercase() => {}
        _ => return false,
    }
    prefix_chars.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
}
593
/// The complete set of field names `build_edge_snapshot` accepts in an edge
/// hash.
///
/// `build_edge_snapshot` rejects a hash containing any field outside this
/// list with an "unexpected field" error, so a field added to the edge hash
/// needs an entry here before snapshots of it can be built.
const EDGE_KNOWN_FIELDS: &[&str] = &[
    "edge_id",
    "flow_id",
    "upstream_execution_id",
    "downstream_execution_id",
    "dependency_kind",
    "satisfaction_condition",
    "data_passing_ref",
    "edge_state",
    "created_at",
    "created_by",
];
614
615impl FlowFabricWorker {
616 pub async fn describe_edge(
635 &self,
636 flow_id: &FlowId,
637 edge_id: &EdgeId,
638 ) -> Result<Option<EdgeSnapshot>, SdkError> {
639 let partition = flow_partition(flow_id, self.partition_config());
640 let ctx = FlowKeyContext::new(&partition, flow_id);
641 let edge_key = ctx.edge(edge_id);
642
643 let raw: HashMap<String, String> = self
644 .client()
645 .cmd("HGETALL")
646 .arg(&edge_key)
647 .execute()
648 .await
649 .map_err(|e| SdkError::ValkeyContext {
650 source: e,
651 context: "describe_edge: HGETALL edge_hash".into(),
652 })?;
653
654 if raw.is_empty() {
655 return Ok(None);
656 }
657
658 build_edge_snapshot(flow_id, edge_id, &raw).map(Some)
659 }
660
661 pub async fn list_outgoing_edges(
679 &self,
680 upstream_eid: &ExecutionId,
681 ) -> Result<Vec<EdgeSnapshot>, SdkError> {
682 let Some(flow_id) = self.resolve_flow_id(upstream_eid).await? else {
683 return Ok(Vec::new());
684 };
685 let partition = flow_partition(&flow_id, self.partition_config());
686 let ctx = FlowKeyContext::new(&partition, &flow_id);
687 self.list_edges_from_set(
688 &ctx.outgoing(upstream_eid),
689 &flow_id,
690 upstream_eid,
691 AdjacencySide::Outgoing,
692 )
693 .await
694 }
695
696 pub async fn list_incoming_edges(
699 &self,
700 downstream_eid: &ExecutionId,
701 ) -> Result<Vec<EdgeSnapshot>, SdkError> {
702 let Some(flow_id) = self.resolve_flow_id(downstream_eid).await? else {
703 return Ok(Vec::new());
704 };
705 let partition = flow_partition(&flow_id, self.partition_config());
706 let ctx = FlowKeyContext::new(&partition, &flow_id);
707 self.list_edges_from_set(
708 &ctx.incoming(downstream_eid),
709 &flow_id,
710 downstream_eid,
711 AdjacencySide::Incoming,
712 )
713 .await
714 }
715
716 async fn resolve_flow_id(
726 &self,
727 eid: &ExecutionId,
728 ) -> Result<Option<FlowId>, SdkError> {
729 let exec_partition = execution_partition(eid, self.partition_config());
730 let ctx = ExecKeyContext::new(&exec_partition, eid);
731 let raw: Option<String> = self
732 .client()
733 .cmd("HGET")
734 .arg(ctx.core())
735 .arg("flow_id")
736 .execute()
737 .await
738 .map_err(|e| SdkError::ValkeyContext {
739 source: e,
740 context: "list_edges: HGET exec_core.flow_id".into(),
741 })?;
742 let Some(raw) = raw.filter(|s| !s.is_empty()) else {
743 return Ok(None);
744 };
745 let flow_id = FlowId::parse(&raw).map_err(|e| SdkError::Config {
746 context: "list_edges: exec_core".into(),
747 field: Some("flow_id".into()),
748 message: format!("'{raw}' is not a valid UUID (key corruption?): {e}"),
749 })?;
750 let flow_partition_index = flow_partition(&flow_id, self.partition_config()).index;
751 if exec_partition.index != flow_partition_index {
752 return Err(SdkError::Config {
753 context: "list_edges: exec_core".into(),
754 field: Some("flow_id".into()),
755 message: format!(
756 "'{flow_id}' partition {flow_partition_index} does not match \
757 execution partition {} (RFC-011 co-location violation; key corruption?)",
758 exec_partition.index
759 ),
760 });
761 }
762 Ok(Some(flow_id))
763 }
764
765 async fn list_edges_from_set(
775 &self,
776 adj_key: &str,
777 flow_id: &FlowId,
778 subject_eid: &ExecutionId,
779 side: AdjacencySide,
780 ) -> Result<Vec<EdgeSnapshot>, SdkError> {
781 let edge_id_strs: Vec<String> = self
782 .client()
783 .cmd("SMEMBERS")
784 .arg(adj_key)
785 .execute()
786 .await
787 .map_err(|e| SdkError::ValkeyContext {
788 source: e,
789 context: "list_edges: SMEMBERS adj_set".into(),
790 })?;
791 if edge_id_strs.is_empty() {
792 return Ok(Vec::new());
793 }
794
795 let mut edge_ids: Vec<EdgeId> = Vec::with_capacity(edge_id_strs.len());
798 for raw in &edge_id_strs {
799 let parsed = EdgeId::parse(raw).map_err(|e| SdkError::Config {
800 context: "list_edges: adjacency_set".into(),
801 field: Some("edge_id".into()),
802 message: format!("'{raw}' is not a valid EdgeId (key corruption?): {e}"),
803 })?;
804 edge_ids.push(parsed);
805 }
806
807 let partition = flow_partition(flow_id, self.partition_config());
808 let ctx = FlowKeyContext::new(&partition, flow_id);
809
810 let mut pipe = self.client().pipeline();
811 let slots: Vec<_> = edge_ids
812 .iter()
813 .map(|eid| {
814 pipe.cmd::<HashMap<String, String>>("HGETALL")
815 .arg(ctx.edge(eid))
816 .finish()
817 })
818 .collect();
819 pipe.execute().await.map_err(|e| SdkError::ValkeyContext {
820 source: e,
821 context: "list_edges: pipeline HGETALL edges".into(),
822 })?;
823
824 let mut out: Vec<EdgeSnapshot> = Vec::with_capacity(edge_ids.len());
825 for (edge_id, slot) in edge_ids.iter().zip(slots) {
826 let raw = slot.value().map_err(|e| SdkError::ValkeyContext {
827 source: e,
828 context: "list_edges: decode HGETALL edge_hash".into(),
829 })?;
830 if raw.is_empty() {
831 return Err(SdkError::Config {
835 context: "list_edges: adjacency_set".into(),
836 field: None,
837 message: format!(
838 "refers to edge_id '{edge_id}' but its edge_hash is absent \
839 (key corruption?)"
840 ),
841 });
842 }
843 let snap = build_edge_snapshot(flow_id, edge_id, &raw)?;
844 let endpoint = match side {
850 AdjacencySide::Outgoing => &snap.upstream_execution_id,
851 AdjacencySide::Incoming => &snap.downstream_execution_id,
852 };
853 if endpoint != subject_eid {
854 return Err(SdkError::Config {
855 context: "list_edges: adjacency_set".into(),
856 field: None,
857 message: format!(
858 "for execution '{subject_eid}' (side={side:?}) contains edge \
859 '{edge_id}' whose stored endpoint is '{endpoint}' \
860 (adjacency/edge-hash drift?)"
861 ),
862 });
863 }
864 out.push(snap);
865 }
866 Ok(out)
867 }
868}
869
/// Selects which endpoint of an edge an adjacency set is keyed by.
///
/// `list_edges_from_set` uses it to pick the stored endpoint that must equal
/// the subject execution, and prints it (via `Debug`) in the drift error.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AdjacencySide {
    /// The subject execution is compared against the edge's
    /// `upstream_execution_id`.
    Outgoing,
    /// The subject execution is compared against the edge's
    /// `downstream_execution_id`.
    Incoming,
}
880
/// Crate-visible entry point to the private `build_edge_snapshot`.
///
/// Forwards all three arguments unchanged and returns the result as-is, so
/// the validation rules and errors are exactly those of
/// `build_edge_snapshot`.
// NOTE(review): `dead_code` is allowed here, presumably because this wrapper
// has no caller in some build configurations — confirm before removing.
#[allow(dead_code)]
pub(crate) fn build_edge_snapshot_public(
    flow_id: &FlowId,
    edge_id: &EdgeId,
    raw: &HashMap<String, String>,
) -> Result<EdgeSnapshot, SdkError> {
    build_edge_snapshot(flow_id, edge_id, raw)
}
891
892fn build_edge_snapshot(
899 flow_id: &FlowId,
900 edge_id: &EdgeId,
901 raw: &HashMap<String, String>,
902) -> Result<EdgeSnapshot, SdkError> {
903 for k in raw.keys() {
907 if !EDGE_KNOWN_FIELDS.contains(&k.as_str()) {
908 return Err(SdkError::Config {
909 context: "edge_snapshot: edge_hash".into(),
910 field: None,
911 message: format!(
912 "has unexpected field '{k}' (protocol drift or corruption?)"
913 ),
914 });
915 }
916 }
917
918 let stored_edge_id_str = opt_str(raw, "edge_id")
919 .filter(|s| !s.is_empty())
920 .ok_or_else(|| SdkError::Config {
921 context: "edge_snapshot: edge_hash".into(),
922 field: Some("edge_id".into()),
923 message: "is missing or empty (key corruption?)".into(),
924 })?;
925 if stored_edge_id_str != edge_id.to_string() {
926 return Err(SdkError::Config {
927 context: "edge_snapshot: edge_hash".into(),
928 field: Some("edge_id".into()),
929 message: format!(
930 "'{stored_edge_id_str}' does not match requested edge_id \
931 '{edge_id}' (key corruption or wrong-key read?)"
932 ),
933 });
934 }
935
936 let stored_flow_id_str = opt_str(raw, "flow_id")
937 .filter(|s| !s.is_empty())
938 .ok_or_else(|| SdkError::Config {
939 context: "edge_snapshot: edge_hash".into(),
940 field: Some("flow_id".into()),
941 message: "is missing or empty (key corruption?)".into(),
942 })?;
943 if stored_flow_id_str != flow_id.to_string() {
944 return Err(SdkError::Config {
945 context: "edge_snapshot: edge_hash".into(),
946 field: Some("flow_id".into()),
947 message: format!(
948 "'{stored_flow_id_str}' does not match requested flow_id \
949 '{flow_id}' (key corruption or wrong-key read?)"
950 ),
951 });
952 }
953
954 let upstream_execution_id = parse_eid(raw, "upstream_execution_id")?;
955 let downstream_execution_id = parse_eid(raw, "downstream_execution_id")?;
956
957 let dependency_kind = opt_str(raw, "dependency_kind")
958 .filter(|s| !s.is_empty())
959 .ok_or_else(|| SdkError::Config {
960 context: "edge_snapshot: edge_hash".into(),
961 field: Some("dependency_kind".into()),
962 message: "is missing or empty (key corruption?)".into(),
963 })?
964 .to_owned();
965
966 let satisfaction_condition = opt_str(raw, "satisfaction_condition")
967 .filter(|s| !s.is_empty())
968 .ok_or_else(|| SdkError::Config {
969 context: "edge_snapshot: edge_hash".into(),
970 field: Some("satisfaction_condition".into()),
971 message: "is missing or empty (key corruption?)".into(),
972 })?
973 .to_owned();
974
975 let data_passing_ref = opt_str(raw, "data_passing_ref")
978 .filter(|s| !s.is_empty())
979 .map(str::to_owned);
980
981 let edge_state = opt_str(raw, "edge_state")
982 .filter(|s| !s.is_empty())
983 .ok_or_else(|| SdkError::Config {
984 context: "edge_snapshot: edge_hash".into(),
985 field: Some("edge_state".into()),
986 message: "is missing or empty (key corruption?)".into(),
987 })?
988 .to_owned();
989
990 let created_at =
991 parse_ts(raw, "edge_snapshot: edge_hash", "created_at")?.ok_or_else(|| {
992 SdkError::Config {
993 context: "edge_snapshot: edge_hash".into(),
994 field: Some("created_at".into()),
995 message: "is missing or empty (key corruption?)".into(),
996 }
997 })?;
998
999 let created_by = opt_str(raw, "created_by")
1000 .filter(|s| !s.is_empty())
1001 .ok_or_else(|| SdkError::Config {
1002 context: "edge_snapshot: edge_hash".into(),
1003 field: Some("created_by".into()),
1004 message: "is missing or empty (key corruption?)".into(),
1005 })?
1006 .to_owned();
1007
1008 Ok(EdgeSnapshot::new(
1009 edge_id.clone(),
1010 flow_id.clone(),
1011 upstream_execution_id,
1012 downstream_execution_id,
1013 dependency_kind,
1014 satisfaction_condition,
1015 data_passing_ref,
1016 edge_state,
1017 created_at,
1018 created_by,
1019 ))
1020}
1021
1022fn parse_eid(raw: &HashMap<String, String>, field: &str) -> Result<ExecutionId, SdkError> {
1023 let s = opt_str(raw, field)
1024 .filter(|s| !s.is_empty())
1025 .ok_or_else(|| SdkError::Config {
1026 context: "edge_snapshot: edge_hash".into(),
1027 field: Some(field.to_owned()),
1028 message: "is missing or empty (key corruption?)".into(),
1029 })?;
1030 ExecutionId::parse(s).map_err(|e| SdkError::Config {
1031 context: "edge_snapshot: edge_hash".into(),
1032 field: Some(field.to_owned()),
1033 message: format!("'{s}' is not a valid ExecutionId (key corruption?): {e}"),
1034 })
1035}
1036
1037#[cfg(test)]
1038mod tests {
1039 use super::*;
1040 use ff_core::partition::PartitionConfig;
1041 use ff_core::types::FlowId;
1042
1043 fn eid() -> ExecutionId {
1044 let config = PartitionConfig::default();
1045 ExecutionId::for_flow(&FlowId::new(), &config)
1046 }
1047
1048 fn minimal_core(public_state: &str) -> HashMap<String, String> {
1049 let mut m = HashMap::new();
1050 m.insert("public_state".to_owned(), public_state.to_owned());
1051 m.insert("lane_id".to_owned(), "default".to_owned());
1052 m.insert("namespace".to_owned(), "ns".to_owned());
1053 m.insert("created_at".to_owned(), "1000".to_owned());
1054 m.insert("last_mutation_at".to_owned(), "2000".to_owned());
1055 m.insert("total_attempt_count".to_owned(), "0".to_owned());
1056 m
1057 }
1058
1059 #[test]
1060 fn waiting_exec_no_attempt_no_lease_no_tags() {
1061 let snap = build_execution_snapshot(eid(), &minimal_core("waiting"), HashMap::new())
1062 .unwrap()
1063 .expect("should build");
1064 assert_eq!(snap.public_state, PublicState::Waiting);
1065 assert!(snap.current_attempt.is_none());
1066 assert!(snap.current_lease.is_none());
1067 assert!(snap.current_waitpoint.is_none());
1068 assert_eq!(snap.tags.len(), 0);
1069 assert_eq!(snap.created_at.0, 1000);
1070 assert_eq!(snap.last_mutation_at.0, 2000);
1071 assert!(snap.flow_id.is_none());
1072 assert!(snap.blocking_reason.is_none());
1073 }
1074
1075 #[test]
1076 fn tags_flow_through_sorted() {
1077 let mut tags = HashMap::new();
1078 tags.insert("cairn.task_id".to_owned(), "t-1".to_owned());
1079 tags.insert("cairn.project".to_owned(), "proj".to_owned());
1080 let snap = build_execution_snapshot(eid(), &minimal_core("waiting"), tags)
1081 .unwrap()
1082 .unwrap();
1083 let keys: Vec<_> = snap.tags.keys().cloned().collect();
1084 assert_eq!(
1085 keys,
1086 vec!["cairn.project".to_owned(), "cairn.task_id".to_owned()]
1087 );
1088 }
1089
1090 #[test]
1091 fn invalid_public_state_fails_loud() {
1092 let err =
1093 build_execution_snapshot(eid(), &minimal_core("bogus"), HashMap::new()).unwrap_err();
1094 match err {
1095 SdkError::Config { field, message: msg, .. } => {
1096 assert_eq!(field.as_deref(), Some("public_state"), "msg: {msg}");
1097 }
1098 other => panic!("expected Config, got {other:?}"),
1099 }
1100 }
1101
1102 #[test]
1103 fn invalid_lane_id_fails_loud() {
1104 let mut core = minimal_core("waiting");
1107 core.insert("lane_id".to_owned(), "lane\nbroken".to_owned());
1108 let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1109 match err {
1110 SdkError::Config { field, message: msg, .. } => {
1111 assert_eq!(field.as_deref(), Some("lane_id"), "msg: {msg}");
1112 }
1113 other => panic!("expected Config, got {other:?}"),
1114 }
1115 }
1116
1117 #[test]
1118 fn missing_required_timestamps_fail_loud() {
1119 for want in ["created_at", "last_mutation_at"] {
1120 let mut core = minimal_core("waiting");
1121 core.remove(want);
1122 let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1123 match err {
1124 SdkError::Config { field, message: msg, .. } => {
1125 assert_eq!(field.as_deref(), Some(want), "msg for {want}: {msg}");
1126 }
1127 other => panic!("expected Config for {want}, got {other:?}"),
1128 }
1129 }
1130 }
1131
1132 #[test]
1133 fn malformed_total_attempt_count_fails_loud() {
1134 let mut core = minimal_core("waiting");
1135 core.insert("total_attempt_count".to_owned(), "not-a-number".to_owned());
1136 let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1137 match err {
1138 SdkError::Config { field, message: msg, .. } => {
1139 assert_eq!(field.as_deref(), Some("total_attempt_count"), "msg: {msg}");
1140 }
1141 other => panic!("expected Config, got {other:?}"),
1142 }
1143 }
1144
1145 #[test]
1146 fn attempt_id_without_index_fails_loud() {
1147 let mut core = minimal_core("active");
1150 core.insert(
1151 "current_attempt_id".to_owned(),
1152 AttemptId::new().to_string(),
1153 );
1154 let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1155 match err {
1156 SdkError::Config { field, message: msg, .. } => {
1157 assert_eq!(field.as_deref(), Some("current_attempt_index"), "msg: {msg}");
1158 }
1159 other => panic!("expected Config, got {other:?}"),
1160 }
1161 }
1162
1163 #[test]
1164 fn lease_without_epoch_fails_loud() {
1165 let mut core = minimal_core("active");
1167 core.insert(
1168 "current_worker_instance_id".to_owned(),
1169 "w-inst-1".to_owned(),
1170 );
1171 core.insert("lease_expires_at".to_owned(), "9000".to_owned());
1172 let err = build_execution_snapshot(eid(), &core, HashMap::new()).unwrap_err();
1173 match err {
1174 SdkError::Config { field, message: msg, .. } => {
1175 assert_eq!(field.as_deref(), Some("current_lease_epoch"), "msg: {msg}");
1176 }
1177 other => panic!("expected Config, got {other:?}"),
1178 }
1179 }
1180
1181 #[test]
1182 fn lease_summary_requires_both_wid_and_expires_at() {
1183 let mut core = minimal_core("active");
1186 core.insert(
1187 "current_worker_instance_id".to_owned(),
1188 "w-inst-1".to_owned(),
1189 );
1190 let snap = build_execution_snapshot(eid(), &core, HashMap::new())
1191 .unwrap()
1192 .unwrap();
1193 assert!(snap.current_lease.is_none());
1194
1195 core.insert("lease_expires_at".to_owned(), "9000".to_owned());
1196 core.insert("current_lease_epoch".to_owned(), "3".to_owned());
1197 let snap = build_execution_snapshot(eid(), &core, HashMap::new())
1198 .unwrap()
1199 .unwrap();
1200 let lease = snap.current_lease.expect("lease present");
1201 assert_eq!(lease.lease_epoch, LeaseEpoch::new(3));
1202 assert_eq!(lease.expires_at.0, 9000);
1203 assert_eq!(lease.worker_instance_id.as_str(), "w-inst-1");
1204 }
1205
1206 fn fid() -> FlowId {
1209 FlowId::new()
1210 }
1211
1212 fn minimal_flow_core(id: &FlowId, state: &str) -> HashMap<String, String> {
1213 let mut m = HashMap::new();
1214 m.insert("flow_id".to_owned(), id.to_string());
1215 m.insert("flow_kind".to_owned(), "dag".to_owned());
1216 m.insert("namespace".to_owned(), "ns".to_owned());
1217 m.insert("public_flow_state".to_owned(), state.to_owned());
1218 m.insert("graph_revision".to_owned(), "0".to_owned());
1219 m.insert("node_count".to_owned(), "0".to_owned());
1220 m.insert("edge_count".to_owned(), "0".to_owned());
1221 m.insert("created_at".to_owned(), "1000".to_owned());
1222 m.insert("last_mutation_at".to_owned(), "1000".to_owned());
1223 m
1224 }
1225
1226 #[test]
1227 fn open_flow_round_trips() {
1228 let f = fid();
1229 let snap = build_flow_snapshot(f.clone(), &minimal_flow_core(&f, "open")).unwrap();
1230 assert_eq!(snap.flow_id, f);
1231 assert_eq!(snap.flow_kind, "dag");
1232 assert_eq!(snap.namespace.as_str(), "ns");
1233 assert_eq!(snap.public_flow_state, "open");
1234 assert_eq!(snap.graph_revision, 0);
1235 assert_eq!(snap.node_count, 0);
1236 assert_eq!(snap.edge_count, 0);
1237 assert_eq!(snap.created_at.0, 1000);
1238 assert_eq!(snap.last_mutation_at.0, 1000);
1239 assert!(snap.cancelled_at.is_none());
1240 assert!(snap.cancel_reason.is_none());
1241 assert!(snap.cancellation_policy.is_none());
1242 assert!(snap.tags.is_empty());
1243 }
1244
/// When the flow hash carries the three cancel fields, the snapshot must
/// expose each of them alongside the `cancelled` state.
#[test]
fn cancelled_flow_surfaces_cancel_fields() {
    let flow_id = fid();
    let mut core = minimal_flow_core(&flow_id, "cancelled");
    for (key, value) in [
        ("cancelled_at", "2000"),
        ("cancel_reason", "operator"),
        ("cancellation_policy", "cancel_all"),
    ] {
        core.insert(key.to_owned(), value.to_owned());
    }

    let snapshot = build_flow_snapshot(flow_id, &core).unwrap();

    assert_eq!(snapshot.public_flow_state, "cancelled");
    assert_eq!(snapshot.cancelled_at.unwrap().0, 2000);
    assert_eq!(snapshot.cancel_reason.as_deref(), Some("operator"));
    assert_eq!(snapshot.cancellation_policy.as_deref(), Some("cancel_all"));
}
1258
/// Dotted (`prefix.name`) keys in the flow hash are expected to land in the
/// snapshot's `tags` map, and iterating that map yields them in sorted order.
#[test]
fn namespaced_tags_routed_to_tags_map() {
    let flow_id = fid();
    let mut core = minimal_flow_core(&flow_id, "open");
    for (key, value) in [
        ("cairn.task_id", "t-1"),
        ("cairn.project", "proj"),
        ("operator.label", "v"),
    ] {
        core.insert(key.to_owned(), value.to_owned());
    }

    let snapshot = build_flow_snapshot(flow_id, &core).unwrap();
    assert_eq!(snapshot.tags.len(), 3);

    // Inserted out of order above; the assertion pins the sorted key order.
    let expected: Vec<String> = ["cairn.project", "cairn.task_id", "operator.label"]
        .iter()
        .map(|&key| key.to_owned())
        .collect();
    let actual: Vec<_> = snapshot.tags.keys().cloned().collect();
    assert_eq!(actual, expected);
}
1279
/// A non-dotted key that the builder does not know must be rejected with a
/// whole-object `Config` error (no `field`) whose message names the key.
#[test]
fn unknown_flat_field_fails_loud() {
    let flow_id = fid();
    let mut core = minimal_flow_core(&flow_id, "open");
    core.insert("bogus_future_field".to_owned(), "v".to_owned());

    let (field, msg) = match build_flow_snapshot(flow_id, &core).unwrap_err() {
        SdkError::Config { field, message: msg, .. } => (field, msg),
        other => panic!("expected Config, got {other:?}"),
    };

    assert!(field.is_none(), "expected whole-object error, got field={field:?}");
    assert!(msg.contains("bogus_future_field"), "msg: {msg}");
}
1296
/// Dropping any one of the required flow fields must yield a `Config` error
/// whose `field` names exactly the key that was removed.
#[test]
fn missing_required_fields_fail_loud() {
    const REQUIRED: [&str; 9] = [
        "flow_id",
        "namespace",
        "flow_kind",
        "public_flow_state",
        "graph_revision",
        "node_count",
        "edge_count",
        "created_at",
        "last_mutation_at",
    ];

    for want in REQUIRED {
        // Start from a fresh, valid hash each time so only `want` is missing.
        let flow_id = fid();
        let mut core = minimal_flow_core(&flow_id, "open");
        core.remove(want);

        match build_flow_snapshot(flow_id, &core) {
            Ok(_) => panic!("field {want} should fail but build_flow_snapshot returned Ok"),
            Err(SdkError::Config { field, message: msg, .. }) => {
                assert_eq!(field.as_deref(), Some(want), "msg for {want}: {msg}");
            }
            Err(other) => panic!("expected Config for {want}, got {other:?}"),
        }
    }
}
1324
/// A required string field that is present but empty must be rejected with a
/// `Config` error naming that field, just like a missing one.
#[test]
fn empty_required_strings_fail_loud() {
    const REQUIRED_STRINGS: [&str; 4] =
        ["flow_id", "namespace", "flow_kind", "public_flow_state"];

    for want in REQUIRED_STRINGS {
        // Overwrite just this one key with the empty string.
        let flow_id = fid();
        let mut core = minimal_flow_core(&flow_id, "open");
        core.insert(want.to_owned(), String::new());

        match build_flow_snapshot(flow_id, &core) {
            Ok(_) => panic!("empty {want} should fail but build_flow_snapshot returned Ok"),
            Err(SdkError::Config { field, message: msg, .. }) => {
                assert_eq!(field.as_deref(), Some(want), "msg for {want}: {msg}");
            }
            Err(other) => panic!("expected Config for {want}, got {other:?}"),
        }
    }
}
1344
/// If the stored `flow_id` differs from the id the caller asked for, the
/// builder must fail with a `Config` error on `flow_id` saying it
/// "does not match".
#[test]
fn flow_id_mismatch_fails_loud() {
    let requested = fid();
    let stored = fid();
    // The hash is valid on its own, but belongs to a different flow.
    let core = minimal_flow_core(&stored, "open");

    let (field, msg) = match build_flow_snapshot(requested, &core).unwrap_err() {
        SdkError::Config { field, message: msg, .. } => (field, msg),
        other => panic!("expected Config, got {other:?}"),
    };

    assert_eq!(field.as_deref(), Some("flow_id"), "msg: {msg}");
    assert!(msg.contains("does not match"), "msg: {msg}");
}
1361
/// A non-numeric `graph_revision` must surface as a `Config` error attributed
/// to that field.
#[test]
fn malformed_counter_fails_loud() {
    let flow_id = fid();
    let mut core = minimal_flow_core(&flow_id, "open");
    core.insert("graph_revision".to_owned(), "not-a-number".to_owned());

    let (field, msg) = match build_flow_snapshot(flow_id, &core).unwrap_err() {
        SdkError::Config { field, message: msg, .. } => (field, msg),
        other => panic!("expected Config, got {other:?}"),
    };

    assert_eq!(field.as_deref(), Some("graph_revision"), "msg: {msg}");
}
1375
1376 fn eids_for_flow(f: &FlowId) -> (ExecutionId, ExecutionId) {
1379 let cfg = PartitionConfig::default();
1380 (ExecutionId::for_flow(f, &cfg), ExecutionId::for_flow(f, &cfg))
1381 }
1382
1383 fn minimal_edge_hash(
1384 flow: &FlowId,
1385 edge: &EdgeId,
1386 up: &ExecutionId,
1387 down: &ExecutionId,
1388 ) -> HashMap<String, String> {
1389 let mut m = HashMap::new();
1390 m.insert("edge_id".into(), edge.to_string());
1391 m.insert("flow_id".into(), flow.to_string());
1392 m.insert("upstream_execution_id".into(), up.to_string());
1393 m.insert("downstream_execution_id".into(), down.to_string());
1394 m.insert("dependency_kind".into(), "success_only".into());
1395 m.insert("satisfaction_condition".into(), "all_required".into());
1396 m.insert("data_passing_ref".into(), String::new());
1397 m.insert("edge_state".into(), "pending".into());
1398 m.insert("created_at".into(), "1234".into());
1399 m.insert("created_by".into(), "engine".into());
1400 m
1401 }
1402
/// The baseline edge hash should decode into a snapshot that echoes every
/// stored field, with the empty `data_passing_ref` reported as `None`.
#[test]
fn edge_round_trips_all_fields() {
    let flow_id = fid();
    let edge_id = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&flow_id);
    let raw = minimal_edge_hash(&flow_id, &edge_id, &upstream, &downstream);

    let snapshot = build_edge_snapshot(&flow_id, &edge_id, &raw).unwrap();

    // Identifiers.
    assert_eq!(snapshot.edge_id, edge_id);
    assert_eq!(snapshot.flow_id, flow_id);
    assert_eq!(snapshot.upstream_execution_id, upstream);
    assert_eq!(snapshot.downstream_execution_id, downstream);

    // Dependency description and state.
    assert_eq!(snapshot.dependency_kind, "success_only");
    assert_eq!(snapshot.satisfaction_condition, "all_required");
    assert!(snapshot.data_passing_ref.is_none());
    assert_eq!(snapshot.edge_state, "pending");

    // Provenance.
    assert_eq!(snapshot.created_at.0, 1234);
    assert_eq!(snapshot.created_by, "engine");
}
1421
/// A non-empty `data_passing_ref` in the hash must come back verbatim as
/// `Some(..)` on the snapshot.
#[test]
fn edge_data_passing_ref_round_trips_when_set() {
    let flow_id = fid();
    let edge_id = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&flow_id);

    let mut raw = minimal_edge_hash(&flow_id, &edge_id, &upstream, &downstream);
    raw.insert("data_passing_ref".into(), "ref://blob-42".into());

    let snapshot = build_edge_snapshot(&flow_id, &edge_id, &raw).unwrap();
    assert_eq!(snapshot.data_passing_ref.as_deref(), Some("ref://blob-42"));
}
1432
/// An edge hash containing a key the builder does not know must be rejected
/// with a whole-object `Config` error (no `field`) whose message names the key.
#[test]
fn edge_unknown_field_fails_loud() {
    let flow_id = fid();
    let edge_id = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&flow_id);

    let mut raw = minimal_edge_hash(&flow_id, &edge_id, &upstream, &downstream);
    raw.insert("bogus_future_field".into(), "v".into());

    let (field, msg) = match build_edge_snapshot(&flow_id, &edge_id, &raw).unwrap_err() {
        SdkError::Config { field, message: msg, .. } => (field, msg),
        other => panic!("expected Config, got {other:?}"),
    };

    assert!(field.is_none(), "expected whole-object error, got field={field:?}");
    assert!(msg.contains("bogus_future_field"), "msg: {msg}");
}
1449
/// If the edge hash stores a `flow_id` other than the one requested, the
/// builder must fail with a `Config` error on `flow_id` saying it
/// "does not match".
#[test]
fn edge_flow_id_mismatch_fails_loud() {
    let requested_flow = fid();
    let stored_flow = fid();
    let edge_id = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&requested_flow);

    // Only the stored flow id disagrees with the request.
    let raw = minimal_edge_hash(&stored_flow, &edge_id, &upstream, &downstream);

    let (field, msg) = match build_edge_snapshot(&requested_flow, &edge_id, &raw).unwrap_err() {
        SdkError::Config { field, message: msg, .. } => (field, msg),
        other => panic!("expected Config, got {other:?}"),
    };

    assert_eq!(field.as_deref(), Some("flow_id"), "msg: {msg}");
    assert!(msg.contains("does not match"), "msg: {msg}");
}
1466
/// If the edge hash stores an `edge_id` other than the one requested, the
/// builder must fail with a `Config` error on `edge_id` saying it
/// "does not match".
#[test]
fn edge_edge_id_mismatch_fails_loud() {
    let flow_id = fid();
    let requested_edge = EdgeId::new();
    let stored_edge = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&flow_id);

    // Only the stored edge id disagrees with the request.
    let raw = minimal_edge_hash(&flow_id, &stored_edge, &upstream, &downstream);

    let (field, msg) = match build_edge_snapshot(&flow_id, &requested_edge, &raw).unwrap_err() {
        SdkError::Config { field, message: msg, .. } => (field, msg),
        other => panic!("expected Config, got {other:?}"),
    };

    assert_eq!(field.as_deref(), Some("edge_id"), "msg: {msg}");
    assert!(msg.contains("does not match"), "msg: {msg}");
}
1483
/// Dropping any one of the listed edge fields must yield a `Config` error
/// whose `field` names exactly the key that was removed.
#[test]
fn edge_missing_required_fields_fail_loud() {
    const REQUIRED: [&str; 9] = [
        "edge_id",
        "flow_id",
        "upstream_execution_id",
        "downstream_execution_id",
        "dependency_kind",
        "satisfaction_condition",
        "edge_state",
        "created_at",
        "created_by",
    ];

    for want in REQUIRED {
        // Start from a fresh, valid hash each time so only `want` is missing.
        let flow_id = fid();
        let edge_id = EdgeId::new();
        let (upstream, downstream) = eids_for_flow(&flow_id);
        let mut raw = minimal_edge_hash(&flow_id, &edge_id, &upstream, &downstream);
        raw.remove(want);

        match build_edge_snapshot(&flow_id, &edge_id, &raw) {
            Ok(_) => panic!("missing {want} should fail"),
            Err(SdkError::Config { field, message: msg, .. }) => {
                assert_eq!(field.as_deref(), Some(want), "msg for {want}: {msg}");
            }
            Err(other) => panic!("expected Config for {want}, got {other:?}"),
        }
    }
}
1513
/// A non-numeric `created_at` on an edge must surface as a `Config` error
/// attributed to that field.
#[test]
fn edge_malformed_created_at_fails_loud() {
    let flow_id = fid();
    let edge_id = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&flow_id);

    let mut raw = minimal_edge_hash(&flow_id, &edge_id, &upstream, &downstream);
    raw.insert("created_at".into(), "not-a-number".into());

    let (field, msg) = match build_edge_snapshot(&flow_id, &edge_id, &raw).unwrap_err() {
        SdkError::Config { field, message: msg, .. } => (field, msg),
        other => panic!("expected Config, got {other:?}"),
    };

    assert_eq!(field.as_deref(), Some("created_at"), "msg: {msg}");
}
1529
/// An `upstream_execution_id` that does not parse as an execution id must
/// surface as a `Config` error attributed to that field.
#[test]
fn edge_malformed_upstream_eid_fails_loud() {
    let flow_id = fid();
    let edge_id = EdgeId::new();
    let (upstream, downstream) = eids_for_flow(&flow_id);

    let mut raw = minimal_edge_hash(&flow_id, &edge_id, &upstream, &downstream);
    raw.insert("upstream_execution_id".into(), "not-an-execution-id".into());

    let (field, msg) = match build_edge_snapshot(&flow_id, &edge_id, &raw).unwrap_err() {
        SdkError::Config { field, message: msg, .. } => (field, msg),
        other => panic!("expected Config, got {other:?}"),
    };

    assert_eq!(field.as_deref(), Some("upstream_execution_id"), "msg: {msg}");
}
1545
/// Pins which keys `is_namespaced_tag_key` accepts and which it rejects,
/// using a handful of boundary inputs.
#[test]
fn namespaced_tag_matcher_boundaries() {
    // Keys the matcher must accept: a typical tag, a single-letter prefix,
    // and a prefix containing digits and an underscore.
    for key in ["cairn.task_id", "a.b", "ab_12.field"] {
        assert!(is_namespaced_tag_key(key), "expected {key:?} to be accepted");
    }

    // Keys the matcher must reject, in order: no dot at all, leading
    // uppercase letter, leading digit, empty string, empty prefix before the
    // dot, and an uppercase letter inside the prefix.
    for key in [
        "cairn_task_id",
        "Cairn.task",
        "1cairn.task",
        "",
        ".x",
        "caIrn.task",
    ] {
        assert!(!is_namespaced_tag_key(key), "expected {key:?} to be rejected");
    }
}
1565}