1use std::{
13 fs::File,
14 io::{BufWriter, Write},
15 num::NonZeroUsize,
16 path::Path,
17 sync::{Mutex, MutexGuard, PoisonError},
18};
19
20use serde_json::{Value, json};
21
22use pureflow_types::{NodeId, PortId, WorkflowId};
23
24use crate::{
25 PureflowError, Result,
26 capability::EffectCapability,
27 context::{CancellationState, ExecutionMetadata, NodeContext},
28 lifecycle::{LifecycleEvent, LifecycleEventKind},
29 message::{MessageEndpoint, MessageMetadata, MessageRoute},
30};
31
/// Kind of boundary event attached to a [`MessageBoundaryRecord`].
///
/// The JSON encoding emits these as the `kind` field using the labels
/// `enqueued`, `dequeued` and `dropped`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageBoundaryKind {
    /// Serialized with the label `enqueued`.
    Enqueued,
    /// Serialized with the label `dequeued`.
    Dequeued,
    /// Serialized with the label `dropped`.
    Dropped,
}
42
/// Metadata record pairing a boundary event kind with the metadata of the
/// message it was observed for.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MessageBoundaryRecord {
    // Which boundary event this record describes.
    kind: MessageBoundaryKind,
    // Metadata of the message the event applies to.
    metadata: MessageMetadata,
}

impl MessageBoundaryRecord {
    /// Creates a record from a boundary kind and the message metadata it applies to.
    #[must_use]
    pub const fn new(kind: MessageBoundaryKind, metadata: MessageMetadata) -> Self {
        Self { kind, metadata }
    }

    /// Returns the boundary event kind (copied out of the record).
    #[must_use]
    pub const fn kind(&self) -> MessageBoundaryKind {
        self.kind
    }

    /// Borrows the message metadata stored in this record.
    #[must_use]
    pub const fn metadata(&self) -> &MessageMetadata {
        &self.metadata
    }
}
69
/// Direction of the port a [`QueuePressureRecord`] refers to.
///
/// Serialized in JSON as `input` or `output`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueuePortDirection {
    /// Serialized with the label `input`.
    Input,
    /// Serialized with the label `output`.
    Output,
}
78
/// Kind of queue-pressure observation carried by a [`QueuePressureRecord`].
///
/// Each variant is serialized in JSON as the snake_case form of its name
/// (for example `ReceiveAttempted` becomes `receive_attempted`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueuePressureBoundaryKind {
    /// Serialized with the label `receive_attempted`.
    ReceiveAttempted,
    /// Serialized with the label `receive_ready`.
    ReceiveReady,
    /// Serialized with the label `receive_empty`.
    ReceiveEmpty,
    /// Serialized with the label `receive_closed`.
    ReceiveClosed,
    /// Serialized with the label `reserve_attempted`.
    ReserveAttempted,
    /// Serialized with the label `reserve_ready`.
    ReserveReady,
    /// Serialized with the label `reserve_full`.
    ReserveFull,
    /// Serialized with the label `send_committed`.
    SendCommitted,
    /// Serialized with the label `send_dropped`.
    SendDropped,
}
101
/// Metadata record describing a queue-pressure observation on one port.
///
/// The node context, capacity and queued count are all optional; absent
/// values are serialized as JSON `null`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueuePressureRecord {
    // Node context the observation was made in, when one is available.
    context: Option<NodeContext>,
    // Whether the port is an input or an output.
    direction: QueuePortDirection,
    // Identifier of the observed port.
    port_id: PortId,
    // Which pressure boundary was observed.
    kind: QueuePressureBoundaryKind,
    // Number of edges connected to the port, as supplied by the caller.
    connected_edge_count: usize,
    // Queue capacity, when known to the caller.
    capacity: Option<usize>,
    // Number of queued items, when known to the caller.
    queued_count: Option<usize>,
}

impl QueuePressureRecord {
    /// Creates a record by storing every argument in the field of the same name.
    #[must_use]
    pub const fn new(
        context: Option<NodeContext>,
        direction: QueuePortDirection,
        port_id: PortId,
        kind: QueuePressureBoundaryKind,
        connected_edge_count: usize,
        capacity: Option<usize>,
        queued_count: Option<usize>,
    ) -> Self {
        Self {
            context,
            direction,
            port_id,
            kind,
            connected_edge_count,
            capacity,
            queued_count,
        }
    }

    /// Borrows the node context, or returns `None` when the record has none.
    #[must_use]
    pub const fn context(&self) -> Option<&NodeContext> {
        self.context.as_ref()
    }

    /// Returns the port direction.
    #[must_use]
    pub const fn direction(&self) -> QueuePortDirection {
        self.direction
    }

    /// Borrows the identifier of the observed port.
    #[must_use]
    pub const fn port_id(&self) -> &PortId {
        &self.port_id
    }

    /// Returns the kind of pressure boundary that was observed.
    #[must_use]
    pub const fn kind(&self) -> QueuePressureBoundaryKind {
        self.kind
    }

    /// Returns the stored connected-edge count.
    #[must_use]
    pub const fn connected_edge_count(&self) -> usize {
        self.connected_edge_count
    }

    /// Returns the stored capacity, if any.
    #[must_use]
    pub const fn capacity(&self) -> Option<usize> {
        self.capacity
    }

    /// Returns the stored queued-item count, if any.
    #[must_use]
    pub const fn queued_count(&self) -> Option<usize> {
        self.queued_count
    }
}
179
/// Scope of the failure described by an [`ErrorMetadataRecord`].
///
/// Serialized in JSON as `node_failed` or `workflow_failed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorMetadataKind {
    /// Serialized with the label `node_failed`; records of this kind carry a node id.
    NodeFailed,
    /// Serialized with the label `workflow_failed`; records of this kind carry no node id.
    WorkflowFailed,
}
188
189#[derive(Debug, Clone, PartialEq, Eq)]
191pub struct ErrorMetadataRecord {
192 kind: ErrorMetadataKind,
193 workflow_id: WorkflowId,
194 node_id: Option<NodeId>,
195 execution: ExecutionMetadata,
196 error: PureflowError,
197 diagnostic: Option<ErrorDiagnosticMetadata>,
198}
199
200impl ErrorMetadataRecord {
201 #[must_use]
203 pub fn node_failed(context: &NodeContext, error: PureflowError) -> Self {
204 Self {
205 kind: ErrorMetadataKind::NodeFailed,
206 workflow_id: context.workflow_id().clone(),
207 node_id: Some(context.node_id().clone()),
208 execution: context.execution().clone(),
209 error,
210 diagnostic: None,
211 }
212 }
213
214 #[must_use]
216 pub const fn workflow_failed(
217 workflow_id: WorkflowId,
218 execution: ExecutionMetadata,
219 error: PureflowError,
220 ) -> Self {
221 Self {
222 kind: ErrorMetadataKind::WorkflowFailed,
223 workflow_id,
224 node_id: None,
225 execution,
226 error,
227 diagnostic: None,
228 }
229 }
230
231 #[must_use]
233 pub const fn workflow_failed_with_diagnostic(
234 workflow_id: WorkflowId,
235 execution: ExecutionMetadata,
236 error: PureflowError,
237 diagnostic: ErrorDiagnosticMetadata,
238 ) -> Self {
239 Self {
240 kind: ErrorMetadataKind::WorkflowFailed,
241 workflow_id,
242 node_id: None,
243 execution,
244 error,
245 diagnostic: Some(diagnostic),
246 }
247 }
248
249 #[must_use]
251 pub const fn kind(&self) -> ErrorMetadataKind {
252 self.kind
253 }
254
255 #[must_use]
257 pub const fn workflow_id(&self) -> &WorkflowId {
258 &self.workflow_id
259 }
260
261 #[must_use]
263 pub const fn node_id(&self) -> Option<&NodeId> {
264 self.node_id.as_ref()
265 }
266
267 #[must_use]
269 pub const fn execution(&self) -> &ExecutionMetadata {
270 &self.execution
271 }
272
273 #[must_use]
275 pub const fn error(&self) -> &PureflowError {
276 &self.error
277 }
278
279 #[must_use]
281 pub const fn diagnostic(&self) -> Option<&ErrorDiagnosticMetadata> {
282 self.diagnostic.as_ref()
283 }
284}
285
/// Structured diagnostic that can accompany an [`ErrorMetadataRecord`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ErrorDiagnosticMetadata {
    /// Deadlock diagnostic; serialized in JSON with `"type": "workflow_deadlock"`.
    WorkflowDeadlock(DeadlockDiagnosticMetadata),
}

impl ErrorDiagnosticMetadata {
    /// Wraps `diagnostic` in the [`ErrorDiagnosticMetadata::WorkflowDeadlock`] variant.
    #[must_use]
    pub const fn workflow_deadlock(diagnostic: DeadlockDiagnosticMetadata) -> Self {
        Self::WorkflowDeadlock(diagnostic)
    }
}
300
/// Phase of an external effect described by an [`ExternalEffectMetadataRecord`].
///
/// Serialized in JSON as `external_effect_requested`,
/// `external_effect_completed` or `external_effect_failed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExternalEffectMetadataKind {
    /// Serialized with the label `external_effect_requested`.
    Requested,
    /// Serialized with the label `external_effect_completed`.
    Completed,
    /// Serialized with the label `external_effect_failed`.
    Failed,
}
311
312#[derive(Debug, Clone, PartialEq, Eq)]
314pub struct ExternalEffectMetadataRecord {
315 kind: ExternalEffectMetadataKind,
316 context: NodeContext,
317 effect: EffectCapability,
318 operation: String,
319 target: String,
320 response_status: Option<String>,
321}
322
323impl ExternalEffectMetadataRecord {
324 #[must_use]
326 pub fn new(
327 kind: ExternalEffectMetadataKind,
328 context: NodeContext,
329 effect: EffectCapability,
330 operation: impl Into<String>,
331 target: impl Into<String>,
332 response_status: Option<String>,
333 ) -> Self {
334 Self {
335 kind,
336 context,
337 effect,
338 operation: operation.into(),
339 target: target.into(),
340 response_status,
341 }
342 }
343
344 #[must_use]
346 pub fn requested(
347 context: NodeContext,
348 effect: EffectCapability,
349 operation: impl Into<String>,
350 target: impl Into<String>,
351 ) -> Self {
352 Self::new(
353 ExternalEffectMetadataKind::Requested,
354 context,
355 effect,
356 operation,
357 target,
358 None,
359 )
360 }
361
362 #[must_use]
364 pub fn completed(
365 context: NodeContext,
366 effect: EffectCapability,
367 operation: impl Into<String>,
368 target: impl Into<String>,
369 response_status: impl Into<String>,
370 ) -> Self {
371 Self::new(
372 ExternalEffectMetadataKind::Completed,
373 context,
374 effect,
375 operation,
376 target,
377 Some(response_status.into()),
378 )
379 }
380
381 #[must_use]
383 pub fn failed(
384 context: NodeContext,
385 effect: EffectCapability,
386 operation: impl Into<String>,
387 target: impl Into<String>,
388 response_status: Option<String>,
389 ) -> Self {
390 Self::new(
391 ExternalEffectMetadataKind::Failed,
392 context,
393 effect,
394 operation,
395 target,
396 response_status,
397 )
398 }
399
400 #[must_use]
402 pub const fn kind(&self) -> ExternalEffectMetadataKind {
403 self.kind
404 }
405
406 #[must_use]
408 pub const fn context(&self) -> &NodeContext {
409 &self.context
410 }
411
412 #[must_use]
414 pub const fn effect(&self) -> EffectCapability {
415 self.effect
416 }
417
418 #[must_use]
420 pub fn operation(&self) -> &str {
421 &self.operation
422 }
423
424 #[must_use]
426 pub fn target(&self) -> &str {
427 &self.target
428 }
429
430 #[must_use]
432 pub fn response_status(&self) -> Option<&str> {
433 self.response_status.as_deref()
434 }
435}
436
/// Counters and policy labels captured for a workflow deadlock diagnostic.
///
/// Built with [`DeadlockDiagnosticMetadata::new`] and optionally refined with
/// the `with_*` builder methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeadlockDiagnosticMetadata {
    scheduled_node_count: usize,
    pending_node_count: usize,
    completed_node_count: usize,
    failed_node_count: usize,
    cancelled_node_count: usize,
    bounded_edge_count: usize,
    no_progress_timeout_ms: u64,
    cycle_policy: &'static str,
    feedback_loop_startup: Option<&'static str>,
    feedback_loop_termination: Option<&'static str>,
}

impl DeadlockDiagnosticMetadata {
    /// Creates a diagnostic with the given counts, timeout and cycle policy.
    ///
    /// The completed/failed/cancelled counts start at zero and the feedback
    /// loop labels start as `None`.
    #[must_use]
    pub const fn new(
        scheduled_node_count: usize,
        pending_node_count: usize,
        bounded_edge_count: usize,
        no_progress_timeout_ms: u64,
        cycle_policy: &'static str,
    ) -> Self {
        Self {
            scheduled_node_count,
            pending_node_count,
            bounded_edge_count,
            no_progress_timeout_ms,
            cycle_policy,
            // Terminal counts and feedback-loop labels are filled in by the builders.
            completed_node_count: 0,
            failed_node_count: 0,
            cancelled_node_count: 0,
            feedback_loop_startup: None,
            feedback_loop_termination: None,
        }
    }

    /// Returns the diagnostic with its completed, failed and cancelled counts replaced.
    #[must_use]
    pub const fn with_terminal_counts(
        self,
        completed_node_count: usize,
        failed_node_count: usize,
        cancelled_node_count: usize,
    ) -> Self {
        Self {
            completed_node_count,
            failed_node_count,
            cancelled_node_count,
            ..self
        }
    }

    /// Returns the diagnostic with both feedback-loop labels set to `Some`.
    #[must_use]
    pub const fn with_feedback_loop(
        self,
        startup: &'static str,
        termination: &'static str,
    ) -> Self {
        Self {
            feedback_loop_startup: Some(startup),
            feedback_loop_termination: Some(termination),
            ..self
        }
    }

    /// Returns the scheduled node count.
    #[must_use]
    pub const fn scheduled_node_count(&self) -> usize {
        self.scheduled_node_count
    }

    /// Returns the pending node count.
    #[must_use]
    pub const fn pending_node_count(&self) -> usize {
        self.pending_node_count
    }

    /// Returns the completed node count (zero unless set by the builder).
    #[must_use]
    pub const fn completed_node_count(&self) -> usize {
        self.completed_node_count
    }

    /// Returns the failed node count (zero unless set by the builder).
    #[must_use]
    pub const fn failed_node_count(&self) -> usize {
        self.failed_node_count
    }

    /// Returns the cancelled node count (zero unless set by the builder).
    #[must_use]
    pub const fn cancelled_node_count(&self) -> usize {
        self.cancelled_node_count
    }

    /// Returns the bounded edge count.
    #[must_use]
    pub const fn bounded_edge_count(&self) -> usize {
        self.bounded_edge_count
    }

    /// Returns the no-progress timeout in milliseconds.
    #[must_use]
    pub const fn no_progress_timeout_ms(&self) -> u64 {
        self.no_progress_timeout_ms
    }

    /// Returns the cycle policy label supplied at construction.
    #[must_use]
    pub const fn cycle_policy(&self) -> &'static str {
        self.cycle_policy
    }

    /// Returns the feedback-loop startup label, if set.
    #[must_use]
    pub const fn feedback_loop_startup(&self) -> Option<&'static str> {
        self.feedback_loop_startup
    }

    /// Returns the feedback-loop termination label, if set.
    #[must_use]
    pub const fn feedback_loop_termination(&self) -> Option<&'static str> {
        self.feedback_loop_termination
    }
}
562
/// Any metadata record that can be handed to a [`MetadataSink`].
///
/// The JSON encoding tags each variant with a `record_type` field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MetadataRecord {
    /// A node execution context; `record_type` is `execution_context`.
    ExecutionContext(NodeContext),
    /// A lifecycle event; `record_type` is `lifecycle`.
    Lifecycle(LifecycleEvent),
    /// A message boundary event; `record_type` is `message`.
    Message(MessageBoundaryRecord),
    /// A queue-pressure observation; `record_type` is `queue_pressure`.
    QueuePressure(QueuePressureRecord),
    /// A node or workflow failure; `record_type` is `error`.
    Error(ErrorMetadataRecord),
    /// An external effect phase; `record_type` is `external_effect`.
    ExternalEffect(ExternalEffectMetadataRecord),
}
579
/// Tier used by [`TieredMetadataSink`] to decide whether a record is forwarded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetadataTier {
    /// Always forwarded to the inner sink.
    Control,
    /// Forwarded according to the policy's data sample rate.
    Data,
    /// Forwarded only when the policy enables high-cost data.
    HighCostData,
}
590
/// Policy deciding which data-tier and high-cost records are kept.
///
/// `data_sample_rate` of `None` disables data-tier records entirely;
/// `Some(n)` keeps every record whose zero-based ordinal is a multiple of `n`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TieredMetadataPolicy {
    data_sample_rate: Option<NonZeroUsize>,
    record_high_cost_data: bool,
}

impl TieredMetadataPolicy {
    /// Policy that keeps neither data-tier nor high-cost records.
    #[must_use]
    pub const fn control_only() -> Self {
        Self {
            data_sample_rate: None,
            record_high_cost_data: false,
        }
    }

    /// Policy that keeps every data-tier record (sample rate of one).
    #[must_use]
    pub const fn record_data() -> Self {
        Self::sample_data_every(NonZeroUsize::MIN)
    }

    /// Policy that keeps one data-tier record out of every `sample_rate`.
    ///
    /// High-cost data stays disabled until [`Self::with_high_cost_data`] is applied.
    #[must_use]
    pub const fn sample_data_every(sample_rate: NonZeroUsize) -> Self {
        Self {
            data_sample_rate: Some(sample_rate),
            record_high_cost_data: false,
        }
    }

    /// Returns a copy of the policy with high-cost data enabled.
    #[must_use]
    pub const fn with_high_cost_data(self) -> Self {
        Self {
            record_high_cost_data: true,
            ..self
        }
    }

    /// Whether the data-tier record with zero-based `ordinal` should be kept.
    fn should_record_data(self, ordinal: usize) -> bool {
        match self.data_sample_rate {
            Some(sample_rate) => ordinal.is_multiple_of(sample_rate.get()),
            None => false,
        }
    }

    /// Whether high-cost records should be kept.
    const fn should_record_high_cost_data(self) -> bool {
        self.record_high_cost_data
    }
}

impl Default for TieredMetadataPolicy {
    /// The default policy is [`TieredMetadataPolicy::control_only`].
    fn default() -> Self {
        let policy: Self = Self::control_only();
        policy
    }
}
648
649#[must_use]
655pub fn metadata_record_to_json_value(record: &MetadataRecord) -> Value {
656 match record {
657 MetadataRecord::ExecutionContext(context) => json!({
658 "record_type": "execution_context",
659 "kind": "execution_context",
660 "context": node_context_to_json_value(context),
661 }),
662 MetadataRecord::Lifecycle(event) => json!({
663 "record_type": "lifecycle",
664 "kind": lifecycle_event_kind_label(event.kind()),
665 "context": node_context_to_json_value(event.context()),
666 }),
667 MetadataRecord::Message(message) => json!({
668 "record_type": "message",
669 "kind": message_boundary_kind_label(message.kind()),
670 "message": message_metadata_to_json_value(message.metadata()),
671 }),
672 MetadataRecord::QueuePressure(queue) => json!({
673 "record_type": "queue_pressure",
674 "kind": queue_pressure_boundary_kind_label(queue.kind()),
675 "direction": queue_port_direction_label(queue.direction()),
676 "port_id": queue.port_id().as_str(),
677 "context": queue
678 .context()
679 .map_or(Value::Null, node_context_to_json_value),
680 "connected_edge_count": queue.connected_edge_count(),
681 "capacity": queue.capacity(),
682 "queued_count": queue.queued_count(),
683 }),
684 MetadataRecord::Error(error) => json!({
685 "record_type": "error",
686 "kind": error_metadata_kind_label(error.kind()),
687 "workflow_id": error.workflow_id().as_str(),
688 "node_id": error
689 .node_id()
690 .map_or(Value::Null, |node_id: &NodeId| json!(node_id.as_str())),
691 "execution": execution_metadata_to_json_value(error.execution()),
692 "error": pureflow_error_to_json_value(error.error()),
693 "diagnostic": error
694 .diagnostic()
695 .map_or(Value::Null, error_diagnostic_metadata_to_json_value),
696 }),
697 MetadataRecord::ExternalEffect(effect) => json!({
698 "record_type": "external_effect",
699 "kind": external_effect_metadata_kind_label(effect.kind()),
700 "context": node_context_to_json_value(effect.context()),
701 "effect": effect.effect().as_str(),
702 "operation": effect.operation(),
703 "target": effect.target(),
704 "response_status": effect.response_status(),
705 }),
706 }
707}
708
/// Destination for metadata records.
///
/// Implementations must be `Send + Sync`; `record` takes `&self`, so any
/// mutable state has to be managed by the implementation itself.
pub trait MetadataSink: Send + Sync {
    /// Records a single metadata record.
    ///
    /// # Errors
    ///
    /// Returns an error when the implementation fails to record the value.
    fn record(&self, record: &MetadataRecord) -> Result<()>;
}
718
/// Sink that discards every record.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopMetadataSink;

impl MetadataSink for NoopMetadataSink {
    /// Ignores the record and always returns `Ok(())`.
    fn record(&self, _record: &MetadataRecord) -> Result<()> {
        Ok(())
    }
}
728
/// Sink wrapper that filters records by [`MetadataTier`] before forwarding
/// them to an inner sink.
#[derive(Debug)]
pub struct TieredMetadataSink<S> {
    // Sink that receives the records which pass the policy.
    inner: S,
    // Policy consulted for data-tier and high-cost records.
    policy: TieredMetadataPolicy,
    // Running count of data-tier records seen, guarded for shared access.
    counters: Mutex<TieredMetadataCounters>,
}
736
/// Mutable bookkeeping shared by all callers of a [`TieredMetadataSink`].
#[derive(Debug, Default)]
struct TieredMetadataCounters {
    // Number of data-tier records observed so far (saturates at `usize::MAX`).
    data_seen: usize,
}
741
742impl<S> TieredMetadataSink<S> {
743 #[must_use]
745 pub const fn new(inner: S) -> Self {
746 Self::with_policy(inner, TieredMetadataPolicy::control_only())
747 }
748
749 #[must_use]
751 pub const fn with_policy(inner: S, policy: TieredMetadataPolicy) -> Self {
752 Self {
753 inner,
754 policy,
755 counters: Mutex::new(TieredMetadataCounters { data_seen: 0 }),
756 }
757 }
758
759 #[must_use]
761 pub const fn policy(&self) -> TieredMetadataPolicy {
762 self.policy
763 }
764
765 #[must_use]
767 pub fn into_inner(self) -> S {
768 self.inner
769 }
770
771 fn should_record(&self, tier: MetadataTier) -> Result<bool> {
772 match tier {
773 MetadataTier::Control => Ok(true),
774 MetadataTier::Data => {
775 let ordinal: usize = {
776 let mut counters: MutexGuard<'_, TieredMetadataCounters> =
777 self.counters.lock().map_err(
778 |_err: PoisonError<MutexGuard<'_, TieredMetadataCounters>>| {
779 tiered_lock_error()
780 },
781 )?;
782 let ordinal: usize = counters.data_seen;
783 counters.data_seen = counters.data_seen.saturating_add(1);
784 ordinal
785 };
786 Ok(self.policy.should_record_data(ordinal))
787 }
788 MetadataTier::HighCostData => Ok(self.policy.should_record_high_cost_data()),
789 }
790 }
791}
792
793impl<S> TieredMetadataSink<S>
794where
795 S: MetadataSink,
796{
797 pub fn record_with_tier(&self, tier: MetadataTier, record: &MetadataRecord) -> Result<()> {
804 if self.should_record(tier)? {
805 self.inner.record(record)
806 } else {
807 Ok(())
808 }
809 }
810}
811
impl<S> MetadataSink for TieredMetadataSink<S>
where
    S: MetadataSink,
{
    /// Records through the plain sink interface, treating the record as
    /// [`MetadataTier::Control`], so it is always forwarded to the inner sink.
    fn record(&self, record: &MetadataRecord) -> Result<()> {
        self.record_with_tier(MetadataTier::Control, record)
    }
}
820
/// Sink that writes each record as one JSON document per line to a writer.
#[derive(Debug)]
pub struct JsonlMetadataSink<W> {
    // Destination writer; the mutex lets `record` work through `&self`.
    writer: Mutex<W>,
}
826
827impl<W> JsonlMetadataSink<W> {
828 #[must_use]
830 pub const fn new(writer: W) -> Self {
831 Self {
832 writer: Mutex::new(writer),
833 }
834 }
835
836 pub fn into_inner(self) -> Result<W> {
842 self.writer
843 .into_inner()
844 .map_err(|_err: PoisonError<W>| jsonl_lock_error())
845 }
846
847 fn lock_writer(&self) -> Result<MutexGuard<'_, W>> {
848 self.writer
849 .lock()
850 .map_err(|_err: PoisonError<MutexGuard<'_, W>>| jsonl_lock_error())
851 }
852}
853
854impl<W> JsonlMetadataSink<W>
855where
856 W: Write,
857{
858 pub fn flush(&self) -> Result<()> {
864 let mut writer: MutexGuard<'_, W> = self.lock_writer()?;
865 writer.flush().map_err(|source: std::io::Error| {
866 crate::PureflowError::metadata(format!("failed to flush metadata JSONL: {source}"))
867 })
868 }
869}
870
871impl JsonlMetadataSink<BufWriter<File>> {
872 pub fn create(path: impl AsRef<Path>) -> Result<Self> {
878 let file: File = File::create(path).map_err(|source: std::io::Error| {
879 crate::PureflowError::metadata(format!("failed to create metadata JSONL file: {source}"))
880 })?;
881
882 Ok(Self::new(BufWriter::new(file)))
883 }
884}
885
886impl<W> MetadataSink for JsonlMetadataSink<W>
887where
888 W: Write + Send,
889{
890 fn record(&self, record: &MetadataRecord) -> Result<()> {
891 let value: Value = metadata_record_to_json_value(record);
892 let mut writer: MutexGuard<'_, W> = self.lock_writer()?;
893
894 serde_json::to_writer(&mut *writer, &value).map_err(|source: serde_json::Error| {
895 crate::PureflowError::metadata(format!(
896 "failed to encode metadata JSONL record: {source}"
897 ))
898 })?;
899 writer.write_all(b"\n").map_err(|source: std::io::Error| {
900 crate::PureflowError::metadata(format!(
901 "failed to write metadata JSONL newline: {source}"
902 ))
903 })
904 }
905}
906
907fn node_context_to_json_value(context: &NodeContext) -> Value {
908 json!({
909 "workflow_id": context.workflow_id().as_str(),
910 "node_id": context.node_id().as_str(),
911 "execution": execution_metadata_to_json_value(context.execution()),
912 "cancellation": cancellation_state_to_json_value(context.cancellation()),
913 })
914}
915
916fn execution_metadata_to_json_value(execution: &ExecutionMetadata) -> Value {
917 json!({
918 "execution_id": execution.execution_id().as_str(),
919 "attempt": execution.attempt().get(),
920 })
921}
922
923fn cancellation_state_to_json_value(cancellation: CancellationState) -> Value {
924 match cancellation {
925 CancellationState::Active => json!({
926 "state": "active",
927 }),
928 CancellationState::Requested(request) => json!({
929 "state": "requested",
930 "reason": request.reason(),
931 }),
932 }
933}
934
935const fn lifecycle_event_kind_label(kind: LifecycleEventKind) -> &'static str {
936 match kind {
937 LifecycleEventKind::NodeScheduled => "node_scheduled",
938 LifecycleEventKind::NodeStarted => "node_started",
939 LifecycleEventKind::NodeCompleted => "node_completed",
940 LifecycleEventKind::NodeFailed => "node_failed",
941 LifecycleEventKind::NodeCancelled => "node_cancelled",
942 }
943}
944
945const fn message_boundary_kind_label(kind: MessageBoundaryKind) -> &'static str {
946 match kind {
947 MessageBoundaryKind::Enqueued => "enqueued",
948 MessageBoundaryKind::Dequeued => "dequeued",
949 MessageBoundaryKind::Dropped => "dropped",
950 }
951}
952
953const fn queue_port_direction_label(direction: QueuePortDirection) -> &'static str {
954 match direction {
955 QueuePortDirection::Input => "input",
956 QueuePortDirection::Output => "output",
957 }
958}
959
960const fn queue_pressure_boundary_kind_label(kind: QueuePressureBoundaryKind) -> &'static str {
961 match kind {
962 QueuePressureBoundaryKind::ReceiveAttempted => "receive_attempted",
963 QueuePressureBoundaryKind::ReceiveReady => "receive_ready",
964 QueuePressureBoundaryKind::ReceiveEmpty => "receive_empty",
965 QueuePressureBoundaryKind::ReceiveClosed => "receive_closed",
966 QueuePressureBoundaryKind::ReserveAttempted => "reserve_attempted",
967 QueuePressureBoundaryKind::ReserveReady => "reserve_ready",
968 QueuePressureBoundaryKind::ReserveFull => "reserve_full",
969 QueuePressureBoundaryKind::SendCommitted => "send_committed",
970 QueuePressureBoundaryKind::SendDropped => "send_dropped",
971 }
972}
973
974const fn error_metadata_kind_label(kind: ErrorMetadataKind) -> &'static str {
975 match kind {
976 ErrorMetadataKind::NodeFailed => "node_failed",
977 ErrorMetadataKind::WorkflowFailed => "workflow_failed",
978 }
979}
980
981const fn external_effect_metadata_kind_label(kind: ExternalEffectMetadataKind) -> &'static str {
982 match kind {
983 ExternalEffectMetadataKind::Requested => "external_effect_requested",
984 ExternalEffectMetadataKind::Completed => "external_effect_completed",
985 ExternalEffectMetadataKind::Failed => "external_effect_failed",
986 }
987}
988
989fn pureflow_error_to_json_value(error: &PureflowError) -> Value {
990 json!({
991 "code": error.code().as_str(),
992 "message": error.to_string(),
993 "visibility": error_visibility_label(error.visibility()),
994 "retry_disposition": retry_disposition_label(error.retry_disposition()),
995 })
996}
997
998fn error_diagnostic_metadata_to_json_value(diagnostic: &ErrorDiagnosticMetadata) -> Value {
999 match diagnostic {
1000 ErrorDiagnosticMetadata::WorkflowDeadlock(deadlock) => json!({
1001 "type": "workflow_deadlock",
1002 "scheduled_node_count": deadlock.scheduled_node_count(),
1003 "pending_node_count": deadlock.pending_node_count(),
1004 "completed_node_count": deadlock.completed_node_count(),
1005 "failed_node_count": deadlock.failed_node_count(),
1006 "cancelled_node_count": deadlock.cancelled_node_count(),
1007 "bounded_edge_count": deadlock.bounded_edge_count(),
1008 "no_progress_timeout_ms": deadlock.no_progress_timeout_ms(),
1009 "cycle_policy": deadlock.cycle_policy(),
1010 "feedback_loop_startup": deadlock.feedback_loop_startup(),
1011 "feedback_loop_termination": deadlock.feedback_loop_termination(),
1012 }),
1013 }
1014}
1015
1016const fn error_visibility_label(visibility: crate::ErrorVisibility) -> &'static str {
1017 match visibility {
1018 crate::ErrorVisibility::User => "user",
1019 crate::ErrorVisibility::Internal => "internal",
1020 }
1021}
1022
1023const fn retry_disposition_label(disposition: crate::RetryDisposition) -> &'static str {
1024 match disposition {
1025 crate::RetryDisposition::Never => "never",
1026 crate::RetryDisposition::Safe => "safe",
1027 crate::RetryDisposition::Unknown => "unknown",
1028 }
1029}
1030
1031fn message_metadata_to_json_value(metadata: &MessageMetadata) -> Value {
1032 json!({
1033 "message_id": metadata.message_id().as_str(),
1034 "workflow_id": metadata.workflow_id().as_str(),
1035 "execution": execution_metadata_to_json_value(metadata.execution()),
1036 "route": message_route_to_json_value(metadata.route()),
1037 })
1038}
1039
1040fn message_route_to_json_value(route: &MessageRoute) -> Value {
1041 json!({
1042 "source": route
1043 .source()
1044 .map_or(Value::Null, message_endpoint_to_json_value),
1045 "target": message_endpoint_to_json_value(route.target()),
1046 })
1047}
1048
1049fn message_endpoint_to_json_value(endpoint: &MessageEndpoint) -> Value {
1050 json!({
1051 "node_id": endpoint.node_id().as_str(),
1052 "port_id": endpoint.port_id().as_str(),
1053 })
1054}
1055
1056fn jsonl_lock_error() -> crate::PureflowError {
1057 crate::PureflowError::metadata("metadata JSONL writer lock poisoned")
1058}
1059
1060fn tiered_lock_error() -> crate::PureflowError {
1061 crate::PureflowError::metadata("tiered metadata policy lock poisoned")
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066 use super::*;
1067 use crate::context::{CancellationRequest, ExecutionAttempt, ExecutionMetadata};
1068 use pureflow_types::{ExecutionId, MessageId, NodeId, PortId, WorkflowId};
1069 use std::io;
1070 use std::num::NonZeroU32;
1071 use std::sync::Arc;
1072
1073 use quickcheck::{Arbitrary, Gen, QuickCheck};
1074
1075 fn execution_id(value: &str) -> ExecutionId {
1076 ExecutionId::new(value).expect("valid execution id")
1077 }
1078
1079 fn node_id(value: &str) -> NodeId {
1080 NodeId::new(value).expect("valid node id")
1081 }
1082
1083 fn workflow_id(value: &str) -> WorkflowId {
1084 WorkflowId::new(value).expect("valid workflow id")
1085 }
1086
1087 fn port_id(value: &str) -> PortId {
1088 PortId::new(value).expect("valid port id")
1089 }
1090
1091 fn context() -> NodeContext {
1092 NodeContext::new(
1093 workflow_id("flow"),
1094 node_id("node"),
1095 ExecutionMetadata::first_attempt(execution_id("run-1")),
1096 )
1097 }
1098
1099 fn message_metadata() -> MessageMetadata {
1100 let source: crate::message::MessageEndpoint =
1101 crate::message::MessageEndpoint::new(node_id("source"), port_id("out"));
1102 let target: crate::message::MessageEndpoint =
1103 crate::message::MessageEndpoint::new(node_id("sink"), port_id("in"));
1104 let route: crate::message::MessageRoute =
1105 crate::message::MessageRoute::new(Some(source), target);
1106
1107 MessageMetadata::new(
1108 MessageId::new("msg-1").expect("valid message id"),
1109 workflow_id("flow"),
1110 ExecutionMetadata::first_attempt(execution_id("run-1")),
1111 route,
1112 )
1113 }
1114
1115 #[derive(Debug, Clone)]
1116 struct ArbitraryMetadataRecord(MetadataRecord);
1117
1118 impl Arbitrary for ArbitraryMetadataRecord {
1119 fn arbitrary(g: &mut Gen) -> Self {
1120 let record = match u8::arbitrary(g) % 6 {
1121 0 => MetadataRecord::ExecutionContext(generated_context(g)),
1122 1 => MetadataRecord::Lifecycle(LifecycleEvent::new(
1123 generated_lifecycle_kind(g),
1124 generated_context(g),
1125 )),
1126 2 => MetadataRecord::Message(MessageBoundaryRecord::new(
1127 generated_message_boundary_kind(g),
1128 generated_message_metadata(g),
1129 )),
1130 3 => MetadataRecord::QueuePressure(QueuePressureRecord::new(
1131 bool::arbitrary(g).then(|| generated_context(g)),
1132 generated_queue_port_direction(g),
1133 generated_port_id(g, "port"),
1134 generated_queue_pressure_kind(g),
1135 bounded_usize(g, 8),
1136 bool::arbitrary(g).then(|| bounded_usize(g, 256)),
1137 bool::arbitrary(g).then(|| bounded_usize(g, 256)),
1138 )),
1139 4 => MetadataRecord::Error(generated_error_metadata(g)),
1140 _ => MetadataRecord::ExternalEffect(generated_external_effect_metadata(g)),
1141 };
1142
1143 Self(record)
1144 }
1145 }
1146
1147 fn bounded_usize(g: &mut Gen, upper_exclusive: usize) -> usize {
1148 usize::arbitrary(g) % upper_exclusive
1149 }
1150
1151 fn generated_execution(g: &mut Gen) -> ExecutionMetadata {
1152 let attempt = NonZeroU32::new(u32::arbitrary(g) % 32 + 1)
1153 .expect("generated execution attempt is nonzero");
1154
1155 ExecutionMetadata::new(
1156 generated_execution_id(g, "run"),
1157 ExecutionAttempt::new(attempt),
1158 )
1159 }
1160
1161 fn generated_context(g: &mut Gen) -> NodeContext {
1162 let context = NodeContext::new(
1163 generated_workflow_id(g, "flow"),
1164 generated_node_id(g, "node"),
1165 generated_execution(g),
1166 );
1167
1168 if bool::arbitrary(g) {
1169 context.with_cancellation(CancellationRequest::new(format!(
1170 "cancelled_{}",
1171 bounded_usize(g, 16)
1172 )))
1173 } else {
1174 context
1175 }
1176 }
1177
1178 fn generated_message_metadata(g: &mut Gen) -> MessageMetadata {
1179 let source = bool::arbitrary(g).then(|| {
1180 MessageEndpoint::new(generated_node_id(g, "source"), generated_port_id(g, "out"))
1181 });
1182 let target = MessageEndpoint::new(generated_node_id(g, "sink"), generated_port_id(g, "in"));
1183
1184 MessageMetadata::new(
1185 generated_message_id(g, "msg"),
1186 generated_workflow_id(g, "flow"),
1187 generated_execution(g),
1188 MessageRoute::new(source, target),
1189 )
1190 }
1191
1192 fn generated_error_metadata(g: &mut Gen) -> ErrorMetadataRecord {
1193 match u8::arbitrary(g) % 3 {
1194 0 => ErrorMetadataRecord::node_failed(&generated_context(g), generated_error(g)),
1195 1 => ErrorMetadataRecord::workflow_failed(
1196 generated_workflow_id(g, "flow"),
1197 generated_execution(g),
1198 generated_error(g),
1199 ),
1200 _ => ErrorMetadataRecord::workflow_failed_with_diagnostic(
1201 generated_workflow_id(g, "flow"),
1202 generated_execution(g),
1203 generated_error(g),
1204 ErrorDiagnosticMetadata::workflow_deadlock(
1205 DeadlockDiagnosticMetadata::new(
1206 bounded_usize(g, 16),
1207 bounded_usize(g, 16),
1208 bounded_usize(g, 16),
1209 u64::arbitrary(g) % 60_000,
1210 generated_cycle_policy(g),
1211 )
1212 .with_terminal_counts(
1213 bounded_usize(g, 16),
1214 bounded_usize(g, 16),
1215 bounded_usize(g, 16),
1216 )
1217 .with_feedback_loop("start_all_nodes", "all_nodes_complete"),
1218 ),
1219 ),
1220 }
1221 }
1222
1223 fn generated_external_effect_metadata(g: &mut Gen) -> ExternalEffectMetadataRecord {
1224 let context = generated_context(g);
1225 let effect = generated_effect(g);
1226 let operation = format!("operation_{}", bounded_usize(g, 32));
1227 let target = format!("target_{}", bounded_usize(g, 32));
1228 match u8::arbitrary(g) % 3 {
1229 0 => ExternalEffectMetadataRecord::requested(context, effect, operation, target),
1230 1 => ExternalEffectMetadataRecord::completed(
1231 context,
1232 effect,
1233 operation,
1234 target,
1235 format!("status_{}", bounded_usize(g, 16)),
1236 ),
1237 _ => ExternalEffectMetadataRecord::failed(
1238 context,
1239 effect,
1240 operation,
1241 target,
1242 bool::arbitrary(g).then(|| format!("status_{}", bounded_usize(g, 16))),
1243 ),
1244 }
1245 }
1246
1247 fn generated_effect(g: &mut Gen) -> EffectCapability {
1248 match u8::arbitrary(g) % 8 {
1249 0 => EffectCapability::FileSystemRead,
1250 1 => EffectCapability::FileSystemWrite,
1251 2 => EffectCapability::NetworkOutbound,
1252 3 => EffectCapability::ExternalEffect,
1253 4 => EffectCapability::ProcessSpawn,
1254 5 => EffectCapability::EnvironmentRead,
1255 6 => EffectCapability::EnvironmentWrite,
1256 _ => EffectCapability::Clock,
1257 }
1258 }
1259
1260 fn generated_error(g: &mut Gen) -> crate::PureflowError {
1261 let message = format!("generated_failure_{}", bounded_usize(g, 32));
1262 match u8::arbitrary(g) % 4 {
1263 0 => crate::PureflowError::execution(message),
1264 1 => crate::PureflowError::cancelled(message),
1265 2 => crate::PureflowError::lifecycle(message),
1266 _ => crate::PureflowError::metadata(message),
1267 }
1268 }
1269
1270 fn generated_lifecycle_kind(g: &mut Gen) -> LifecycleEventKind {
1271 match u8::arbitrary(g) % 5 {
1272 0 => LifecycleEventKind::NodeScheduled,
1273 1 => LifecycleEventKind::NodeStarted,
1274 2 => LifecycleEventKind::NodeCompleted,
1275 3 => LifecycleEventKind::NodeFailed,
1276 _ => LifecycleEventKind::NodeCancelled,
1277 }
1278 }
1279
1280 fn generated_message_boundary_kind(g: &mut Gen) -> MessageBoundaryKind {
1281 match u8::arbitrary(g) % 3 {
1282 0 => MessageBoundaryKind::Enqueued,
1283 1 => MessageBoundaryKind::Dequeued,
1284 _ => MessageBoundaryKind::Dropped,
1285 }
1286 }
1287
1288 fn generated_queue_port_direction(g: &mut Gen) -> QueuePortDirection {
1289 if bool::arbitrary(g) {
1290 QueuePortDirection::Input
1291 } else {
1292 QueuePortDirection::Output
1293 }
1294 }
1295
1296 fn generated_queue_pressure_kind(g: &mut Gen) -> QueuePressureBoundaryKind {
1297 match u8::arbitrary(g) % 9 {
1298 0 => QueuePressureBoundaryKind::ReceiveAttempted,
1299 1 => QueuePressureBoundaryKind::ReceiveReady,
1300 2 => QueuePressureBoundaryKind::ReceiveEmpty,
1301 3 => QueuePressureBoundaryKind::ReceiveClosed,
1302 4 => QueuePressureBoundaryKind::ReserveAttempted,
1303 5 => QueuePressureBoundaryKind::ReserveReady,
1304 6 => QueuePressureBoundaryKind::ReserveFull,
1305 7 => QueuePressureBoundaryKind::SendCommitted,
1306 _ => QueuePressureBoundaryKind::SendDropped,
1307 }
1308 }
1309
1310 fn generated_cycle_policy(g: &mut Gen) -> &'static str {
1311 if bool::arbitrary(g) {
1312 "reject_cycles"
1313 } else {
1314 "allow_feedback_loops"
1315 }
1316 }
1317
1318 fn generated_workflow_id(g: &mut Gen, prefix: &str) -> WorkflowId {
1319 workflow_id(&format!("{prefix}_{}", bounded_usize(g, 64)))
1320 }
1321
1322 fn generated_execution_id(g: &mut Gen, prefix: &str) -> ExecutionId {
1323 execution_id(&format!("{prefix}_{}", bounded_usize(g, 64)))
1324 }
1325
1326 fn generated_message_id(g: &mut Gen, prefix: &str) -> MessageId {
1327 MessageId::new(format!("{prefix}_{}", bounded_usize(g, 64))).expect("valid message id")
1328 }
1329
1330 fn generated_node_id(g: &mut Gen, prefix: &str) -> NodeId {
1331 node_id(&format!("{prefix}_{}", bounded_usize(g, 64)))
1332 }
1333
1334 fn generated_port_id(g: &mut Gen, prefix: &str) -> PortId {
1335 port_id(&format!("{prefix}_{}", bounded_usize(g, 64)))
1336 }
1337
1338 fn top_level_kind(value: &Value) -> Option<&str> {
1339 value.as_object()?.get("kind")?.as_str()
1340 }
1341
/// Reports whether `kind` exactly matches one of the record kind strings
/// accepted by the generated-metadata property test.
fn is_known_kind(kind: &str) -> bool {
    const KNOWN_KINDS: &[&str] = &[
        "execution_context",
        "node_scheduled",
        "node_started",
        "node_completed",
        "node_failed",
        "node_cancelled",
        "enqueued",
        "dequeued",
        "dropped",
        "receive_attempted",
        "receive_ready",
        "receive_empty",
        "receive_closed",
        "reserve_attempted",
        "reserve_ready",
        "reserve_full",
        "send_committed",
        "send_dropped",
        "workflow_failed",
        "external_effect_requested",
        "external_effect_completed",
        "external_effect_failed",
    ];

    KNOWN_KINDS.iter().any(|known| *known == kind)
}
1369
1370 fn contains_reproducibility_sensitive_key(value: &Value) -> bool {
1371 match value {
1372 Value::Object(map) => map.iter().any(|(key, value)| {
1373 is_reproducibility_sensitive_key(key)
1374 || contains_reproducibility_sensitive_key(value)
1375 }),
1376 Value::Array(values) => values.iter().any(contains_reproducibility_sensitive_key),
1377 Value::Null | Value::Bool(_) | Value::Number(_) | Value::String(_) => false,
1378 }
1379 }
1380
/// Reports whether `key` exactly matches one of the JSON key names that the
/// generated-metadata property test forbids in encoded records.
fn is_reproducibility_sensitive_key(key: &str) -> bool {
    const SENSITIVE_KEYS: &[&str] = &[
        "timestamp",
        "wall_clock_timestamp",
        "thread_id",
        "process_id",
        "pid",
        "payload",
        "payload_bytes",
        "raw_payload_bytes",
    ];

    SENSITIVE_KEYS.iter().any(|sensitive| *sensitive == key)
}
1394
1395 #[derive(Debug, Default)]
1396 struct RecordingMetadataSink {
1397 records: Mutex<Vec<MetadataRecord>>,
1398 }
1399
1400 impl RecordingMetadataSink {
1401 fn len(&self) -> usize {
1402 self.records
1403 .lock()
1404 .expect("recording metadata sink lock should not be poisoned")
1405 .len()
1406 }
1407
1408 fn records(&self) -> Vec<MetadataRecord> {
1409 self.records
1410 .lock()
1411 .expect("recording metadata sink lock should not be poisoned")
1412 .clone()
1413 }
1414 }
1415
1416 impl MetadataSink for RecordingMetadataSink {
1417 fn record(&self, record: &MetadataRecord) -> Result<()> {
1418 self.records
1419 .lock()
1420 .expect("recording metadata sink lock should not be poisoned")
1421 .push(record.clone());
1422 Ok(())
1423 }
1424 }
1425
1426 impl MetadataSink for Arc<RecordingMetadataSink> {
1427 fn record(&self, record: &MetadataRecord) -> Result<()> {
1428 self.as_ref().record(record)
1429 }
1430 }
1431
/// Property: every generated record encodes to JSON whose top-level `kind`
/// is a known string and which contains no reproducibility-sensitive key.
#[test]
fn generated_metadata_json_preserves_stable_machine_contract() {
    // QuickCheck hands the argument over by value, so the lint does not apply.
    #[allow(clippy::needless_pass_by_value)]
    fn holds_for(generated: ArbitraryMetadataRecord) -> bool {
        let encoded = metadata_record_to_json_value(&generated.0);
        let kind_is_known = top_level_kind(&encoded).is_some_and(is_known_kind);

        kind_is_known && !contains_reproducibility_sensitive_key(&encoded)
    }

    let mut runner = QuickCheck::new().tests(128);
    runner.quickcheck(holds_for as fn(ArbitraryMetadataRecord) -> bool);
}
1446
/// Wrapping a context in `MetadataRecord::ExecutionContext` keeps its node id readable.
#[test]
fn metadata_record_keeps_context_shape_intact() {
    let wrapped = MetadataRecord::ExecutionContext(context());

    assert!(matches!(
        wrapped,
        MetadataRecord::ExecutionContext(kept) if kept.node_id().as_str() == "node"
    ));
}
1456
/// `NoopMetadataSink::record` returns `Ok` for an execution-context record.
#[test]
fn noop_metadata_sink_accepts_records() {
    let record = MetadataRecord::ExecutionContext(context());

    let outcome = NoopMetadataSink.record(&record);

    outcome.expect("noop metadata sink should accept records");
}
1465
/// A default `TieredMetadataSink` forwards a lifecycle record sent through
/// `record` to the wrapped sink.
#[test]
fn tiered_metadata_sink_records_control_records_by_default() {
    let recorder = Arc::new(RecordingMetadataSink::default());
    let tiered = TieredMetadataSink::<Arc<RecordingMetadataSink>>::new(Arc::clone(&recorder));
    let started = LifecycleEvent::new(LifecycleEventKind::NodeStarted, context());
    let record = MetadataRecord::Lifecycle(started);

    tiered
        .record(&record)
        .expect("control metadata should pass through");

    assert_eq!(recorder.records(), vec![record]);
}
1481
/// A default `TieredMetadataSink` accepts `Data` and `HighCostData` records
/// without error but forwards none of them to the wrapped sink.
#[test]
fn tiered_metadata_sink_drops_data_and_high_cost_records_by_default() {
    let recorder = Arc::new(RecordingMetadataSink::default());
    let tiered = TieredMetadataSink::<Arc<RecordingMetadataSink>>::new(Arc::clone(&recorder));
    let enqueued = MessageBoundaryRecord::new(MessageBoundaryKind::Enqueued, message_metadata());
    let record = MetadataRecord::Message(enqueued);

    let attempts = [
        (
            MetadataTier::Data,
            "dropped data metadata should be accepted",
        ),
        (
            MetadataTier::HighCostData,
            "dropped high-cost metadata should be accepted",
        ),
    ];
    for (tier, expectation) in attempts {
        tiered.record_with_tier(tier, &record).expect(expectation);
    }

    assert_eq!(recorder.len(), 0);
}
1499
/// With `sample_data_every(2)`, three `Data` records are all accepted and
/// exactly two of them reach the wrapped sink.
#[test]
fn tiered_metadata_sink_samples_data_records() {
    let recorder = Arc::new(RecordingMetadataSink::default());
    let every_second = NonZeroUsize::new(2).expect("nonzero");
    let policy = TieredMetadataPolicy::sample_data_every(every_second);
    let tiered = TieredMetadataSink::<Arc<RecordingMetadataSink>>::with_policy(
        Arc::clone(&recorder),
        policy,
    );
    let dequeued = MessageBoundaryRecord::new(MessageBoundaryKind::Dequeued, message_metadata());
    let record = MetadataRecord::Message(dequeued);

    // Each entry is the expectation message for one consecutive submission.
    let expectations = [
        "first sampled data metadata should pass through",
        "second sampled data metadata should be dropped",
        "third sampled data metadata should pass through",
    ];
    for expectation in expectations {
        tiered
            .record_with_tier(MetadataTier::Data, &record)
            .expect(expectation);
    }

    assert_eq!(recorder.records(), vec![record.clone(), record]);
}
1521
/// A `control_only` policy extended with `with_high_cost_data` forwards a
/// `HighCostData` record to the wrapped sink.
#[test]
fn tiered_metadata_policy_can_enable_high_cost_records() {
    let recorder = Arc::new(RecordingMetadataSink::default());
    let policy = TieredMetadataPolicy::control_only().with_high_cost_data();
    let tiered = TieredMetadataSink::<Arc<RecordingMetadataSink>>::with_policy(
        Arc::clone(&recorder),
        policy,
    );
    let dropped = MessageBoundaryRecord::new(MessageBoundaryKind::Dropped, message_metadata());
    let record = MetadataRecord::Message(dropped);

    tiered
        .record_with_tier(MetadataTier::HighCostData, &record)
        .expect("enabled high-cost metadata should pass through");

    assert_eq!(recorder.records(), vec![record]);
}
1539
/// A `MessageBoundaryRecord` built with `Enqueued` still reports that kind.
#[test]
fn message_boundary_record_keeps_shape_intact() {
    let sink_endpoint = crate::message::MessageEndpoint::new(node_id("sink"), port_id("in"));
    let route = crate::message::MessageRoute::new(None, sink_endpoint);
    let message_id = MessageId::new("msg-1").expect("valid message id");
    let execution = ExecutionMetadata::first_attempt(execution_id("run-1"));
    let metadata = MessageMetadata::new(message_id, workflow_id("flow"), execution, route);

    let record = MessageBoundaryRecord::new(MessageBoundaryKind::Enqueued, metadata);

    assert_eq!(record.kind(), MessageBoundaryKind::Enqueued);
}
1562
/// Pins the exact JSON produced for a queue-pressure record with a context.
#[test]
fn metadata_record_json_uses_stable_queue_pressure_shape() {
    let pressure = QueuePressureRecord::new(
        Some(context()),
        QueuePortDirection::Input,
        port_id("in"),
        QueuePressureBoundaryKind::ReceiveReady,
        2,
        Some(8),
        Some(3),
    );
    let record = MetadataRecord::QueuePressure(pressure);

    let actual = metadata_record_to_json_value(&record);
    let expected = json!({
        "record_type": "queue_pressure",
        "kind": "receive_ready",
        "direction": "input",
        "port_id": "in",
        "context": {
            "workflow_id": "flow",
            "node_id": "node",
            "execution": {
                "execution_id": "run-1",
                "attempt": 1,
            },
            "cancellation": {
                "state": "active",
            },
        },
        "connected_edge_count": 2,
        "capacity": 8,
        "queued_count": 3,
    });

    assert_eq!(actual, expected);
}
1599
/// Pins the exact JSON produced for a dequeued message boundary record.
#[test]
fn metadata_record_json_uses_stable_message_shape() {
    let dequeued = MessageBoundaryRecord::new(MessageBoundaryKind::Dequeued, message_metadata());
    let record = MetadataRecord::Message(dequeued);

    let actual = metadata_record_to_json_value(&record);
    let expected = json!({
        "record_type": "message",
        "kind": "dequeued",
        "message": {
            "message_id": "msg-1",
            "workflow_id": "flow",
            "execution": {
                "execution_id": "run-1",
                "attempt": 1,
            },
            "route": {
                "source": {
                    "node_id": "source",
                    "port_id": "out",
                },
                "target": {
                    "node_id": "sink",
                    "port_id": "in",
                },
            },
        },
    });

    assert_eq!(actual, expected);
}
1633
/// Pins the exact JSON produced for a completed external-effect record.
#[test]
fn metadata_record_json_uses_stable_external_effect_shape() {
    let completed = ExternalEffectMetadataRecord::completed(
        context(),
        EffectCapability::ExternalEffect,
        "tool_call",
        "get_weather",
        "ok",
    );
    let record = MetadataRecord::ExternalEffect(completed);

    let actual = metadata_record_to_json_value(&record);
    let expected = json!({
        "record_type": "external_effect",
        "kind": "external_effect_completed",
        "context": {
            "workflow_id": "flow",
            "node_id": "node",
            "execution": {
                "execution_id": "run-1",
                "attempt": 1,
            },
            "cancellation": {
                "state": "active",
            },
        },
        "effect": "external_effect",
        "operation": "tool_call",
        "target": "get_weather",
        "response_status": "ok",
    });

    assert_eq!(actual, expected);
}
1668
/// Pins the exact JSON produced for a node-failure error record without a diagnostic.
#[test]
fn metadata_record_json_uses_stable_error_shape() {
    let failing_context = context();
    let failure = ErrorMetadataRecord::node_failed(
        &failing_context,
        crate::PureflowError::execution("executor failed"),
    );
    let record = MetadataRecord::Error(failure);

    let actual = metadata_record_to_json_value(&record);
    let expected = json!({
        "record_type": "error",
        "kind": "node_failed",
        "workflow_id": "flow",
        "node_id": "node",
        "execution": {
            "execution_id": "run-1",
            "attempt": 1,
        },
        "error": {
            "code": "CDT-EXEC-001",
            "message": "CDT-EXEC-001: node execution failed: executor failed",
            "visibility": "internal",
            "retry_disposition": "unknown",
        },
        "diagnostic": null,
    });

    assert_eq!(actual, expected);
}
1697
/// Pins the exact JSON produced for a workflow failure that carries a
/// deadlock diagnostic with feedback-loop details.
#[test]
fn metadata_record_json_uses_stable_deadlock_diagnostic_shape() {
    let deadlock = DeadlockDiagnosticMetadata::new(2, 2, 2, 1, "allow_feedback_loops")
        .with_feedback_loop("start_all_nodes", "all_nodes_complete");
    let diagnostic = ErrorDiagnosticMetadata::workflow_deadlock(deadlock);
    let failure = ErrorMetadataRecord::workflow_failed_with_diagnostic(
        workflow_id("flow"),
        ExecutionMetadata::first_attempt(execution_id("run-1")),
        crate::PureflowError::execution("watchdog detected no workflow progress"),
        diagnostic,
    );
    let record = MetadataRecord::Error(failure);

    let actual = metadata_record_to_json_value(&record);
    let expected = json!({
        "record_type": "error",
        "kind": "workflow_failed",
        "workflow_id": "flow",
        "node_id": null,
        "execution": {
            "execution_id": "run-1",
            "attempt": 1,
        },
        "error": {
            "code": "CDT-EXEC-001",
            "message": "CDT-EXEC-001: node execution failed: watchdog detected no workflow progress",
            "visibility": "internal",
            "retry_disposition": "unknown",
        },
        "diagnostic": {
            "type": "workflow_deadlock",
            "scheduled_node_count": 2,
            "pending_node_count": 2,
            "completed_node_count": 0,
            "failed_node_count": 0,
            "cancelled_node_count": 0,
            "bounded_edge_count": 2,
            "no_progress_timeout_ms": 1,
            "cycle_policy": "allow_feedback_loops",
            "feedback_loop_startup": "start_all_nodes",
            "feedback_loop_termination": "all_nodes_complete",
        },
    });

    assert_eq!(actual, expected);
}
1745
/// A record built by `workflow_failed` reports the `WorkflowFailed` kind and
/// carries neither a node id nor a diagnostic.
#[test]
fn workflow_error_metadata_has_no_node_id() {
    let execution = ExecutionMetadata::first_attempt(execution_id("run-1"));
    let cause = crate::PureflowError::cancelled("shutdown");

    let failure = ErrorMetadataRecord::workflow_failed(workflow_id("flow"), execution, cause);

    assert_eq!(failure.kind(), ErrorMetadataKind::WorkflowFailed);
    assert!(failure.node_id().is_none());
    assert!(failure.diagnostic().is_none());
}
1758
/// Recording the same record twice through `JsonlMetadataSink` yields exactly
/// two output lines with identical text.
#[test]
fn jsonl_metadata_sink_writes_reproducible_lines() {
    let sink = JsonlMetadataSink::<Vec<u8>>::new(Vec::new());
    let started = LifecycleEvent::new(LifecycleEventKind::NodeStarted, context());
    let record = MetadataRecord::Lifecycle(started);

    sink.record(&record)
        .expect("first metadata record should write");
    sink.record(&record)
        .expect("second metadata record should write");

    let written = sink.into_inner().expect("writer should return");
    let output = String::from_utf8(written).expect("JSONL should be UTF-8");
    let mut remaining = output.lines();
    let first = remaining.next().expect("first JSONL line should exist");
    let second = remaining.next().expect("second JSONL line should exist");

    assert_eq!(first, second);
    assert!(remaining.next().is_none());
}
1780
/// Writer test double whose writes always fail while flushes succeed.
struct FailingWriter;

impl Write for FailingWriter {
    /// Rejects every write, whatever the buffer, with the message "write failed".
    fn write(&mut self, _bytes: &[u8]) -> io::Result<usize> {
        let failure = io::Error::other("write failed");
        Err(failure)
    }

    /// Always reports a successful flush.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
1792
/// A failing writer makes `JsonlMetadataSink::record` return an error with
/// code `MetadataCollectionFailed` whose text contains "failed to encode".
#[test]
fn jsonl_metadata_sink_maps_writer_failures() {
    let sink = JsonlMetadataSink::<FailingWriter>::new(FailingWriter);
    let record = MetadataRecord::ExecutionContext(context());

    let failure = sink
        .record(&record)
        .expect_err("writer failure should surface");

    assert_eq!(failure.code(), crate::ErrorCode::MetadataCollectionFailed);
    let rendered = failure.to_string();
    assert!(rendered.contains("failed to encode"));
}
1804}