Skip to main content

pureflow_core/
metadata.rs

1//! Runtime metadata collection boundary.
2//!
3//! ## Fragment: metadata-collection-boundary
4//!
5//! Metadata remains split by source type: context metadata identifies an
6//! execution attempt, message metadata travels with payloads, and lifecycle
7//! metadata records runtime transitions. Message boundary records describe
8//! send, receive, and drop observations at the port seam. The sink below is
9//! only the collection seam. It lets runtime code report those existing facts
10//! without collapsing them into a premature storage, tracing, or graph model.
11
12use std::{
13    fs::File,
14    io::{BufWriter, Write},
15    num::NonZeroUsize,
16    path::Path,
17    sync::{Mutex, MutexGuard, PoisonError},
18};
19
20use serde_json::{Value, json};
21
22use pureflow_types::{NodeId, PortId, WorkflowId};
23
24use crate::{
25    PureflowError, Result,
26    capability::EffectCapability,
27    context::{CancellationState, ExecutionMetadata, NodeContext},
28    lifecycle::{LifecycleEvent, LifecycleEventKind},
29    message::{MessageEndpoint, MessageMetadata, MessageRoute},
30};
31
/// Port-boundary event at which a message was observed.
///
/// Two variants describe output-side observations (`Enqueued`, `Dropped`) and
/// one describes the input side (`Dequeued`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MessageBoundaryKind {
    /// A message entered an output boundary.
    Enqueued,
    /// A message left an input boundary.
    Dequeued,
    /// A message was dropped at an output boundary.
    Dropped,
}
42
43/// One message observation at a runtime boundary.
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub struct MessageBoundaryRecord {
46    kind: MessageBoundaryKind,
47    metadata: MessageMetadata,
48}
49
50impl MessageBoundaryRecord {
51    /// Create a message boundary observation.
52    #[must_use]
53    pub const fn new(kind: MessageBoundaryKind, metadata: MessageMetadata) -> Self {
54        Self { kind, metadata }
55    }
56
57    /// Kind of port-boundary observation.
58    #[must_use]
59    pub const fn kind(&self) -> MessageBoundaryKind {
60        self.kind
61    }
62
63    /// Message metadata observed at the boundary.
64    #[must_use]
65    pub const fn metadata(&self) -> &MessageMetadata {
66        &self.metadata
67    }
68}
69
/// Side of the current node on which a queue observation was made.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueuePortDirection {
    /// The observation concerns an input-side queue.
    Input,
    /// The observation concerns an output-side queue.
    Output,
}
78
/// Port-boundary step at which queue pressure was observed.
///
/// The `Receive*` variants describe input-side receive operations, the
/// `Reserve*` variants describe output-side capacity reservation, and the
/// `Send*` variants describe the outcome of a reserved output packet.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueuePressureBoundaryKind {
    /// A receive is about to inspect or wait on an input queue.
    ReceiveAttempted,
    /// A receive dequeued a packet.
    ReceiveReady,
    /// A receive found no packet currently available.
    ReceiveEmpty,
    /// A receive observed that the upstream side closed.
    ReceiveClosed,
    /// An output reserve is about to inspect or wait on capacity.
    ReserveAttempted,
    /// An output reserve acquired capacity.
    ReserveReady,
    /// An output reserve found every connected capacity full.
    ReserveFull,
    /// A reserved output packet was committed to the output boundary.
    SendCommitted,
    /// A committed output packet had no downstream edge and was dropped.
    SendDropped,
}
101
102/// One queue pressure or capacity observation at a port boundary.
103#[derive(Debug, Clone, PartialEq, Eq)]
104pub struct QueuePressureRecord {
105    context: Option<NodeContext>,
106    direction: QueuePortDirection,
107    port_id: PortId,
108    kind: QueuePressureBoundaryKind,
109    connected_edge_count: usize,
110    capacity: Option<usize>,
111    queued_count: Option<usize>,
112}
113
114impl QueuePressureRecord {
115    /// Create a queue pressure observation.
116    #[must_use]
117    pub const fn new(
118        context: Option<NodeContext>,
119        direction: QueuePortDirection,
120        port_id: PortId,
121        kind: QueuePressureBoundaryKind,
122        connected_edge_count: usize,
123        capacity: Option<usize>,
124        queued_count: Option<usize>,
125    ) -> Self {
126        Self {
127            context,
128            direction,
129            port_id,
130            kind,
131            connected_edge_count,
132            capacity,
133            queued_count,
134        }
135    }
136
137    /// Runtime context for the node that observed queue pressure, when known.
138    #[must_use]
139    pub const fn context(&self) -> Option<&NodeContext> {
140        self.context.as_ref()
141    }
142
143    /// Direction of the observed port.
144    #[must_use]
145    pub const fn direction(&self) -> QueuePortDirection {
146        self.direction
147    }
148
149    /// Declared port identifier observed.
150    #[must_use]
151    pub const fn port_id(&self) -> &PortId {
152        &self.port_id
153    }
154
155    /// Boundary kind observed.
156    #[must_use]
157    pub const fn kind(&self) -> QueuePressureBoundaryKind {
158        self.kind
159    }
160
161    /// Number of connected graph edges behind this declared port.
162    #[must_use]
163    pub const fn connected_edge_count(&self) -> usize {
164        self.connected_edge_count
165    }
166
167    /// Total known bounded capacity across connected edges, when connected.
168    #[must_use]
169    pub const fn capacity(&self) -> Option<usize> {
170        self.capacity
171    }
172
173    /// Total currently queued packets across connected input edges, when observable.
174    #[must_use]
175    pub const fn queued_count(&self) -> Option<usize> {
176        self.queued_count
177    }
178}
179
/// Runtime scope at which an error was observed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ErrorMetadataKind {
    /// An error was returned from a node execution boundary.
    NodeFailed,
    /// A terminal error was observed for a workflow run.
    WorkflowFailed,
}
188
189/// One structured error observation at a runtime boundary.
190#[derive(Debug, Clone, PartialEq, Eq)]
191pub struct ErrorMetadataRecord {
192    kind: ErrorMetadataKind,
193    workflow_id: WorkflowId,
194    node_id: Option<NodeId>,
195    execution: ExecutionMetadata,
196    error: PureflowError,
197    diagnostic: Option<ErrorDiagnosticMetadata>,
198}
199
200impl ErrorMetadataRecord {
201    /// Create a node failure observation.
202    #[must_use]
203    pub fn node_failed(context: &NodeContext, error: PureflowError) -> Self {
204        Self {
205            kind: ErrorMetadataKind::NodeFailed,
206            workflow_id: context.workflow_id().clone(),
207            node_id: Some(context.node_id().clone()),
208            execution: context.execution().clone(),
209            error,
210            diagnostic: None,
211        }
212    }
213
214    /// Create a workflow failure observation.
215    #[must_use]
216    pub const fn workflow_failed(
217        workflow_id: WorkflowId,
218        execution: ExecutionMetadata,
219        error: PureflowError,
220    ) -> Self {
221        Self {
222            kind: ErrorMetadataKind::WorkflowFailed,
223            workflow_id,
224            node_id: None,
225            execution,
226            error,
227            diagnostic: None,
228        }
229    }
230
231    /// Create a workflow failure observation with structured diagnostic data.
232    #[must_use]
233    pub const fn workflow_failed_with_diagnostic(
234        workflow_id: WorkflowId,
235        execution: ExecutionMetadata,
236        error: PureflowError,
237        diagnostic: ErrorDiagnosticMetadata,
238    ) -> Self {
239        Self {
240            kind: ErrorMetadataKind::WorkflowFailed,
241            workflow_id,
242            node_id: None,
243            execution,
244            error,
245            diagnostic: Some(diagnostic),
246        }
247    }
248
249    /// Error observation scope.
250    #[must_use]
251    pub const fn kind(&self) -> ErrorMetadataKind {
252        self.kind
253    }
254
255    /// Workflow associated with this error.
256    #[must_use]
257    pub const fn workflow_id(&self) -> &WorkflowId {
258        &self.workflow_id
259    }
260
261    /// Node associated with this error, when the error came from a node.
262    #[must_use]
263    pub const fn node_id(&self) -> Option<&NodeId> {
264        self.node_id.as_ref()
265    }
266
267    /// Execution attempt associated with this error.
268    #[must_use]
269    pub const fn execution(&self) -> &ExecutionMetadata {
270        &self.execution
271    }
272
273    /// Structured Pureflow error observed.
274    #[must_use]
275    pub const fn error(&self) -> &PureflowError {
276        &self.error
277    }
278
279    /// Structured diagnostic payload associated with this error, when present.
280    #[must_use]
281    pub const fn diagnostic(&self) -> Option<&ErrorDiagnosticMetadata> {
282        self.diagnostic.as_ref()
283    }
284}
285
286/// Structured diagnostic payload attached to an error metadata record.
287#[derive(Debug, Clone, PartialEq, Eq)]
288pub enum ErrorDiagnosticMetadata {
289    /// Workflow execution stopped making progress before the watchdog deadline.
290    WorkflowDeadlock(DeadlockDiagnosticMetadata),
291}
292
293impl ErrorDiagnosticMetadata {
294    /// Create workflow deadlock diagnostic metadata.
295    #[must_use]
296    pub const fn workflow_deadlock(diagnostic: DeadlockDiagnosticMetadata) -> Self {
297        Self::WorkflowDeadlock(diagnostic)
298    }
299}
300
/// Phase of an external effect as observed at a node boundary.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExternalEffectMetadataKind {
    /// The node asked for an external effect.
    Requested,
    /// The node finished an external effect.
    Completed,
    /// The node saw an external effect fail.
    Failed,
}
311
312/// One structured external effect observation at a node boundary.
313#[derive(Debug, Clone, PartialEq, Eq)]
314pub struct ExternalEffectMetadataRecord {
315    kind: ExternalEffectMetadataKind,
316    context: NodeContext,
317    effect: EffectCapability,
318    operation: String,
319    target: String,
320    response_status: Option<String>,
321}
322
323impl ExternalEffectMetadataRecord {
324    /// Create an external effect observation.
325    #[must_use]
326    pub fn new(
327        kind: ExternalEffectMetadataKind,
328        context: NodeContext,
329        effect: EffectCapability,
330        operation: impl Into<String>,
331        target: impl Into<String>,
332        response_status: Option<String>,
333    ) -> Self {
334        Self {
335            kind,
336            context,
337            effect,
338            operation: operation.into(),
339            target: target.into(),
340            response_status,
341        }
342    }
343
344    /// Create an external effect request observation.
345    #[must_use]
346    pub fn requested(
347        context: NodeContext,
348        effect: EffectCapability,
349        operation: impl Into<String>,
350        target: impl Into<String>,
351    ) -> Self {
352        Self::new(
353            ExternalEffectMetadataKind::Requested,
354            context,
355            effect,
356            operation,
357            target,
358            None,
359        )
360    }
361
362    /// Create a completed external effect observation.
363    #[must_use]
364    pub fn completed(
365        context: NodeContext,
366        effect: EffectCapability,
367        operation: impl Into<String>,
368        target: impl Into<String>,
369        response_status: impl Into<String>,
370    ) -> Self {
371        Self::new(
372            ExternalEffectMetadataKind::Completed,
373            context,
374            effect,
375            operation,
376            target,
377            Some(response_status.into()),
378        )
379    }
380
381    /// Create a failed external effect observation.
382    #[must_use]
383    pub fn failed(
384        context: NodeContext,
385        effect: EffectCapability,
386        operation: impl Into<String>,
387        target: impl Into<String>,
388        response_status: Option<String>,
389    ) -> Self {
390        Self::new(
391            ExternalEffectMetadataKind::Failed,
392            context,
393            effect,
394            operation,
395            target,
396            response_status,
397        )
398    }
399
400    /// External effect observation kind.
401    #[must_use]
402    pub const fn kind(&self) -> ExternalEffectMetadataKind {
403        self.kind
404    }
405
406    /// Runtime context for the node that observed the external effect.
407    #[must_use]
408    pub const fn context(&self) -> &NodeContext {
409        &self.context
410    }
411
412    /// Declared effect capability associated with this observation.
413    #[must_use]
414    pub const fn effect(&self) -> EffectCapability {
415        self.effect
416    }
417
418    /// Operation or tool name associated with this observation.
419    #[must_use]
420    pub fn operation(&self) -> &str {
421        &self.operation
422    }
423
424    /// External target associated with this observation.
425    #[must_use]
426    pub fn target(&self) -> &str {
427        &self.target
428    }
429
430    /// Stable response status associated with this observation, when known.
431    #[must_use]
432    pub fn response_status(&self) -> Option<&str> {
433        self.response_status.as_deref()
434    }
435}
436
/// Machine-readable diagnostic snapshot for a workflow deadlock.
///
/// Carries node and edge counts, the no-progress timeout, and the cycle and
/// feedback-loop policy labels supplied by the caller.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DeadlockDiagnosticMetadata {
    scheduled_node_count: usize,
    pending_node_count: usize,
    completed_node_count: usize,
    failed_node_count: usize,
    cancelled_node_count: usize,
    bounded_edge_count: usize,
    no_progress_timeout_ms: u64,
    cycle_policy: &'static str,
    feedback_loop_startup: Option<&'static str>,
    feedback_loop_termination: Option<&'static str>,
}

impl DeadlockDiagnosticMetadata {
    /// Start a deadlock diagnostic from the always-required facts.
    ///
    /// Terminal counts (completed/failed/cancelled) begin at zero and the
    /// feedback-loop labels begin unset; use the `with_*` methods to fill
    /// them in.
    #[must_use]
    pub const fn new(
        scheduled_node_count: usize,
        pending_node_count: usize,
        bounded_edge_count: usize,
        no_progress_timeout_ms: u64,
        cycle_policy: &'static str,
    ) -> Self {
        Self {
            scheduled_node_count,
            pending_node_count,
            completed_node_count: 0,
            failed_node_count: 0,
            cancelled_node_count: 0,
            bounded_edge_count,
            no_progress_timeout_ms,
            cycle_policy,
            feedback_loop_startup: None,
            feedback_loop_termination: None,
        }
    }

    /// Return a copy of `self` with the terminal node counts replaced.
    #[must_use]
    pub const fn with_terminal_counts(
        self,
        completed_node_count: usize,
        failed_node_count: usize,
        cancelled_node_count: usize,
    ) -> Self {
        Self {
            completed_node_count,
            failed_node_count,
            cancelled_node_count,
            ..self
        }
    }

    /// Return a copy of `self` with both feedback-loop policy labels set.
    #[must_use]
    pub const fn with_feedback_loop(
        self,
        startup: &'static str,
        termination: &'static str,
    ) -> Self {
        Self {
            feedback_loop_startup: Some(startup),
            feedback_loop_termination: Some(termination),
            ..self
        }
    }

    /// Number of nodes scheduled for the run.
    #[must_use]
    pub const fn scheduled_node_count(&self) -> usize {
        self.scheduled_node_count
    }

    /// Number of nodes still pending when the watchdog fired.
    #[must_use]
    pub const fn pending_node_count(&self) -> usize {
        self.pending_node_count
    }

    /// Number of nodes completed before the watchdog fired.
    #[must_use]
    pub const fn completed_node_count(&self) -> usize {
        self.completed_node_count
    }

    /// Number of nodes failed before the watchdog fired.
    #[must_use]
    pub const fn failed_node_count(&self) -> usize {
        self.failed_node_count
    }

    /// Number of nodes cancelled before the watchdog fired.
    #[must_use]
    pub const fn cancelled_node_count(&self) -> usize {
        self.cancelled_node_count
    }

    /// Number of bounded graph edges in the workflow.
    #[must_use]
    pub const fn bounded_edge_count(&self) -> usize {
        self.bounded_edge_count
    }

    /// No-progress interval, in milliseconds.
    #[must_use]
    pub const fn no_progress_timeout_ms(&self) -> u64 {
        self.no_progress_timeout_ms
    }

    /// Cycle policy label that was active when the watchdog fired.
    #[must_use]
    pub const fn cycle_policy(&self) -> &'static str {
        self.cycle_policy
    }

    /// Feedback-loop startup policy label, when cycles were explicitly allowed.
    #[must_use]
    pub const fn feedback_loop_startup(&self) -> Option<&'static str> {
        self.feedback_loop_startup
    }

    /// Feedback-loop termination policy label, when cycles were explicitly
    /// allowed.
    #[must_use]
    pub const fn feedback_loop_termination(&self) -> Option<&'static str> {
        self.feedback_loop_termination
    }
}
562
563/// One metadata fact observed at a runtime boundary.
564#[derive(Debug, Clone, PartialEq, Eq)]
565pub enum MetadataRecord {
566    /// Execution context metadata for one node boundary.
567    ExecutionContext(NodeContext),
568    /// Lifecycle transition emitted by the runtime.
569    Lifecycle(LifecycleEvent),
570    /// Message metadata observed at a port boundary.
571    Message(MessageBoundaryRecord),
572    /// Queue pressure or capacity observed at a port boundary.
573    QueuePressure(QueuePressureRecord),
574    /// Error observed at a node or workflow boundary.
575    Error(ErrorMetadataRecord),
576    /// External tool, service, database, or API effect observed by a node.
577    ExternalEffect(ExternalEffectMetadataRecord),
578}
579
/// Cost classification assigned to a metadata record.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MetadataTier {
    /// Cheap control-plane facts that are useful for diagnostics.
    Control,
    /// Data-tier facts, which may be sampled to bound metadata volume.
    Data,
    /// Expensive data-tier facts such as payload bytes or Arrow buffer detail.
    HighCostData,
}
590
/// Filtering policy applied by a tiered metadata sink.
///
/// `data_sample_rate` of `None` disables data-tier recording entirely;
/// `Some(n)` records observations whose zero-based ordinal is a multiple of
/// `n`. High-cost data is gated by a separate flag.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TieredMetadataPolicy {
    data_sample_rate: Option<NonZeroUsize>,
    record_high_cost_data: bool,
}

impl TieredMetadataPolicy {
    /// Policy that records control facts only.
    #[must_use]
    pub const fn control_only() -> Self {
        Self {
            data_sample_rate: None,
            record_high_cost_data: false,
        }
    }

    /// Policy that records every data-tier fact but still drops high-cost
    /// data facts.
    #[must_use]
    pub const fn record_data() -> Self {
        Self::sample_data_every(NonZeroUsize::MIN)
    }

    /// Policy that records one data-tier fact every `sample_rate`
    /// observations.
    #[must_use]
    pub const fn sample_data_every(sample_rate: NonZeroUsize) -> Self {
        Self {
            data_sample_rate: Some(sample_rate),
            record_high_cost_data: false,
        }
    }

    /// Return this policy with high-cost data-tier facts allowed through.
    #[must_use]
    pub const fn with_high_cost_data(self) -> Self {
        Self {
            record_high_cost_data: true,
            ..self
        }
    }

    /// Decide whether the data-tier observation with the given zero-based
    /// `ordinal` should be recorded. Always `false` when sampling is disabled.
    fn should_record_data(self, ordinal: usize) -> bool {
        self.data_sample_rate
            .map(NonZeroUsize::get)
            .is_some_and(|rate: usize| ordinal.is_multiple_of(rate))
    }

    /// Whether high-cost data-tier facts are allowed through.
    const fn should_record_high_cost_data(self) -> bool {
        self.record_high_cost_data
    }
}

impl Default for TieredMetadataPolicy {
    /// The default policy is [`TieredMetadataPolicy::control_only`].
    fn default() -> Self {
        Self::control_only()
    }
}
648
649/// Convert one metadata record into the stable JSON shape used by JSONL sinks.
650///
651/// The projection intentionally omits wall-clock timestamps and process-local
652/// facts so repeated runs with the same runtime facts can produce identical
653/// log lines.
654#[must_use]
655pub fn metadata_record_to_json_value(record: &MetadataRecord) -> Value {
656    match record {
657        MetadataRecord::ExecutionContext(context) => json!({
658            "record_type": "execution_context",
659            "kind": "execution_context",
660            "context": node_context_to_json_value(context),
661        }),
662        MetadataRecord::Lifecycle(event) => json!({
663            "record_type": "lifecycle",
664            "kind": lifecycle_event_kind_label(event.kind()),
665            "context": node_context_to_json_value(event.context()),
666        }),
667        MetadataRecord::Message(message) => json!({
668            "record_type": "message",
669            "kind": message_boundary_kind_label(message.kind()),
670            "message": message_metadata_to_json_value(message.metadata()),
671        }),
672        MetadataRecord::QueuePressure(queue) => json!({
673            "record_type": "queue_pressure",
674            "kind": queue_pressure_boundary_kind_label(queue.kind()),
675            "direction": queue_port_direction_label(queue.direction()),
676            "port_id": queue.port_id().as_str(),
677            "context": queue
678                .context()
679                .map_or(Value::Null, node_context_to_json_value),
680            "connected_edge_count": queue.connected_edge_count(),
681            "capacity": queue.capacity(),
682            "queued_count": queue.queued_count(),
683        }),
684        MetadataRecord::Error(error) => json!({
685            "record_type": "error",
686            "kind": error_metadata_kind_label(error.kind()),
687            "workflow_id": error.workflow_id().as_str(),
688            "node_id": error
689                .node_id()
690                .map_or(Value::Null, |node_id: &NodeId| json!(node_id.as_str())),
691            "execution": execution_metadata_to_json_value(error.execution()),
692            "error": pureflow_error_to_json_value(error.error()),
693            "diagnostic": error
694                .diagnostic()
695                .map_or(Value::Null, error_diagnostic_metadata_to_json_value),
696        }),
697        MetadataRecord::ExternalEffect(effect) => json!({
698            "record_type": "external_effect",
699            "kind": external_effect_metadata_kind_label(effect.kind()),
700            "context": node_context_to_json_value(effect.context()),
701            "effect": effect.effect().as_str(),
702            "operation": effect.operation(),
703            "target": effect.target(),
704            "response_status": effect.response_status(),
705        }),
706    }
707}
708
709/// Collection sink for runtime metadata records.
710pub trait MetadataSink: Send + Sync {
711    /// Record one metadata fact.
712    ///
713    /// # Errors
714    ///
715    /// Returns an error when the sink cannot preserve or forward the record.
716    fn record(&self, record: &MetadataRecord) -> Result<()>;
717}
718
/// Metadata sink that deliberately discards every record it is given.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopMetadataSink;
722
723impl MetadataSink for NoopMetadataSink {
724    fn record(&self, _record: &MetadataRecord) -> Result<()> {
725        Ok(())
726    }
727}
728
729/// Metadata sink adapter that applies a cost-tier policy before forwarding.
730#[derive(Debug)]
731pub struct TieredMetadataSink<S> {
732    inner: S,
733    policy: TieredMetadataPolicy,
734    counters: Mutex<TieredMetadataCounters>,
735}
736
/// Mutable bookkeeping kept by a tiered metadata sink.
#[derive(Default, Debug)]
struct TieredMetadataCounters {
    /// Number of data-tier observations seen so far.
    data_seen: usize,
}
741
742impl<S> TieredMetadataSink<S> {
743    /// Wrap a sink with the default control-only metadata policy.
744    #[must_use]
745    pub const fn new(inner: S) -> Self {
746        Self::with_policy(inner, TieredMetadataPolicy::control_only())
747    }
748
749    /// Wrap a sink with an explicit tiered metadata policy.
750    #[must_use]
751    pub const fn with_policy(inner: S, policy: TieredMetadataPolicy) -> Self {
752        Self {
753            inner,
754            policy,
755            counters: Mutex::new(TieredMetadataCounters { data_seen: 0 }),
756        }
757    }
758
759    /// Return the configured tiered metadata policy.
760    #[must_use]
761    pub const fn policy(&self) -> TieredMetadataPolicy {
762        self.policy
763    }
764
765    /// Return the wrapped sink.
766    #[must_use]
767    pub fn into_inner(self) -> S {
768        self.inner
769    }
770
771    fn should_record(&self, tier: MetadataTier) -> Result<bool> {
772        match tier {
773            MetadataTier::Control => Ok(true),
774            MetadataTier::Data => {
775                let ordinal: usize = {
776                    let mut counters: MutexGuard<'_, TieredMetadataCounters> =
777                        self.counters.lock().map_err(
778                            |_err: PoisonError<MutexGuard<'_, TieredMetadataCounters>>| {
779                                tiered_lock_error()
780                            },
781                        )?;
782                    let ordinal: usize = counters.data_seen;
783                    counters.data_seen = counters.data_seen.saturating_add(1);
784                    ordinal
785                };
786                Ok(self.policy.should_record_data(ordinal))
787            }
788            MetadataTier::HighCostData => Ok(self.policy.should_record_high_cost_data()),
789        }
790    }
791}
792
793impl<S> TieredMetadataSink<S>
794where
795    S: MetadataSink,
796{
797    /// Record one metadata fact with an explicit cost tier.
798    ///
799    /// # Errors
800    ///
801    /// Returns an error if the tier policy state cannot be read or if the
802    /// wrapped sink rejects a record selected by the policy.
803    pub fn record_with_tier(&self, tier: MetadataTier, record: &MetadataRecord) -> Result<()> {
804        if self.should_record(tier)? {
805            self.inner.record(record)
806        } else {
807            Ok(())
808        }
809    }
810}
811
812impl<S> MetadataSink for TieredMetadataSink<S>
813where
814    S: MetadataSink,
815{
816    fn record(&self, record: &MetadataRecord) -> Result<()> {
817        self.record_with_tier(MetadataTier::Control, record)
818    }
819}
820
/// Metadata sink that emits one stable JSON object per line.
#[derive(Debug)]
pub struct JsonlMetadataSink<W> {
    /// Destination writer, held in a mutex so `record` can take `&self`.
    writer: Mutex<W>,
}
826
827impl<W> JsonlMetadataSink<W> {
828    /// Create a JSONL metadata sink around an existing writer.
829    #[must_use]
830    pub const fn new(writer: W) -> Self {
831        Self {
832            writer: Mutex::new(writer),
833        }
834    }
835
836    /// Return the wrapped writer.
837    ///
838    /// # Errors
839    ///
840    /// Returns an error if a prior panic poisoned the writer lock.
841    pub fn into_inner(self) -> Result<W> {
842        self.writer
843            .into_inner()
844            .map_err(|_err: PoisonError<W>| jsonl_lock_error())
845    }
846
847    fn lock_writer(&self) -> Result<MutexGuard<'_, W>> {
848        self.writer
849            .lock()
850            .map_err(|_err: PoisonError<MutexGuard<'_, W>>| jsonl_lock_error())
851    }
852}
853
854impl<W> JsonlMetadataSink<W>
855where
856    W: Write,
857{
858    /// Flush the wrapped writer.
859    ///
860    /// # Errors
861    ///
862    /// Returns an error if the writer cannot flush buffered metadata.
863    pub fn flush(&self) -> Result<()> {
864        let mut writer: MutexGuard<'_, W> = self.lock_writer()?;
865        writer.flush().map_err(|source: std::io::Error| {
866            crate::PureflowError::metadata(format!("failed to flush metadata JSONL: {source}"))
867        })
868    }
869}
870
871impl JsonlMetadataSink<BufWriter<File>> {
872    /// Create a file-backed JSONL metadata sink, truncating any existing file.
873    ///
874    /// # Errors
875    ///
876    /// Returns an error if the file cannot be created.
877    pub fn create(path: impl AsRef<Path>) -> Result<Self> {
878        let file: File = File::create(path).map_err(|source: std::io::Error| {
879            crate::PureflowError::metadata(format!("failed to create metadata JSONL file: {source}"))
880        })?;
881
882        Ok(Self::new(BufWriter::new(file)))
883    }
884}
885
886impl<W> MetadataSink for JsonlMetadataSink<W>
887where
888    W: Write + Send,
889{
890    fn record(&self, record: &MetadataRecord) -> Result<()> {
891        let value: Value = metadata_record_to_json_value(record);
892        let mut writer: MutexGuard<'_, W> = self.lock_writer()?;
893
894        serde_json::to_writer(&mut *writer, &value).map_err(|source: serde_json::Error| {
895            crate::PureflowError::metadata(format!(
896                "failed to encode metadata JSONL record: {source}"
897            ))
898        })?;
899        writer.write_all(b"\n").map_err(|source: std::io::Error| {
900            crate::PureflowError::metadata(format!(
901                "failed to write metadata JSONL newline: {source}"
902            ))
903        })
904    }
905}
906
907fn node_context_to_json_value(context: &NodeContext) -> Value {
908    json!({
909        "workflow_id": context.workflow_id().as_str(),
910        "node_id": context.node_id().as_str(),
911        "execution": execution_metadata_to_json_value(context.execution()),
912        "cancellation": cancellation_state_to_json_value(context.cancellation()),
913    })
914}
915
916fn execution_metadata_to_json_value(execution: &ExecutionMetadata) -> Value {
917    json!({
918        "execution_id": execution.execution_id().as_str(),
919        "attempt": execution.attempt().get(),
920    })
921}
922
923fn cancellation_state_to_json_value(cancellation: CancellationState) -> Value {
924    match cancellation {
925        CancellationState::Active => json!({
926            "state": "active",
927        }),
928        CancellationState::Requested(request) => json!({
929            "state": "requested",
930            "reason": request.reason(),
931        }),
932    }
933}
934
935const fn lifecycle_event_kind_label(kind: LifecycleEventKind) -> &'static str {
936    match kind {
937        LifecycleEventKind::NodeScheduled => "node_scheduled",
938        LifecycleEventKind::NodeStarted => "node_started",
939        LifecycleEventKind::NodeCompleted => "node_completed",
940        LifecycleEventKind::NodeFailed => "node_failed",
941        LifecycleEventKind::NodeCancelled => "node_cancelled",
942    }
943}
944
945const fn message_boundary_kind_label(kind: MessageBoundaryKind) -> &'static str {
946    match kind {
947        MessageBoundaryKind::Enqueued => "enqueued",
948        MessageBoundaryKind::Dequeued => "dequeued",
949        MessageBoundaryKind::Dropped => "dropped",
950    }
951}
952
953const fn queue_port_direction_label(direction: QueuePortDirection) -> &'static str {
954    match direction {
955        QueuePortDirection::Input => "input",
956        QueuePortDirection::Output => "output",
957    }
958}
959
960const fn queue_pressure_boundary_kind_label(kind: QueuePressureBoundaryKind) -> &'static str {
961    match kind {
962        QueuePressureBoundaryKind::ReceiveAttempted => "receive_attempted",
963        QueuePressureBoundaryKind::ReceiveReady => "receive_ready",
964        QueuePressureBoundaryKind::ReceiveEmpty => "receive_empty",
965        QueuePressureBoundaryKind::ReceiveClosed => "receive_closed",
966        QueuePressureBoundaryKind::ReserveAttempted => "reserve_attempted",
967        QueuePressureBoundaryKind::ReserveReady => "reserve_ready",
968        QueuePressureBoundaryKind::ReserveFull => "reserve_full",
969        QueuePressureBoundaryKind::SendCommitted => "send_committed",
970        QueuePressureBoundaryKind::SendDropped => "send_dropped",
971    }
972}
973
974const fn error_metadata_kind_label(kind: ErrorMetadataKind) -> &'static str {
975    match kind {
976        ErrorMetadataKind::NodeFailed => "node_failed",
977        ErrorMetadataKind::WorkflowFailed => "workflow_failed",
978    }
979}
980
981const fn external_effect_metadata_kind_label(kind: ExternalEffectMetadataKind) -> &'static str {
982    match kind {
983        ExternalEffectMetadataKind::Requested => "external_effect_requested",
984        ExternalEffectMetadataKind::Completed => "external_effect_completed",
985        ExternalEffectMetadataKind::Failed => "external_effect_failed",
986    }
987}
988
989fn pureflow_error_to_json_value(error: &PureflowError) -> Value {
990    json!({
991        "code": error.code().as_str(),
992        "message": error.to_string(),
993        "visibility": error_visibility_label(error.visibility()),
994        "retry_disposition": retry_disposition_label(error.retry_disposition()),
995    })
996}
997
998fn error_diagnostic_metadata_to_json_value(diagnostic: &ErrorDiagnosticMetadata) -> Value {
999    match diagnostic {
1000        ErrorDiagnosticMetadata::WorkflowDeadlock(deadlock) => json!({
1001            "type": "workflow_deadlock",
1002            "scheduled_node_count": deadlock.scheduled_node_count(),
1003            "pending_node_count": deadlock.pending_node_count(),
1004            "completed_node_count": deadlock.completed_node_count(),
1005            "failed_node_count": deadlock.failed_node_count(),
1006            "cancelled_node_count": deadlock.cancelled_node_count(),
1007            "bounded_edge_count": deadlock.bounded_edge_count(),
1008            "no_progress_timeout_ms": deadlock.no_progress_timeout_ms(),
1009            "cycle_policy": deadlock.cycle_policy(),
1010            "feedback_loop_startup": deadlock.feedback_loop_startup(),
1011            "feedback_loop_termination": deadlock.feedback_loop_termination(),
1012        }),
1013    }
1014}
1015
1016const fn error_visibility_label(visibility: crate::ErrorVisibility) -> &'static str {
1017    match visibility {
1018        crate::ErrorVisibility::User => "user",
1019        crate::ErrorVisibility::Internal => "internal",
1020    }
1021}
1022
1023const fn retry_disposition_label(disposition: crate::RetryDisposition) -> &'static str {
1024    match disposition {
1025        crate::RetryDisposition::Never => "never",
1026        crate::RetryDisposition::Safe => "safe",
1027        crate::RetryDisposition::Unknown => "unknown",
1028    }
1029}
1030
1031fn message_metadata_to_json_value(metadata: &MessageMetadata) -> Value {
1032    json!({
1033        "message_id": metadata.message_id().as_str(),
1034        "workflow_id": metadata.workflow_id().as_str(),
1035        "execution": execution_metadata_to_json_value(metadata.execution()),
1036        "route": message_route_to_json_value(metadata.route()),
1037    })
1038}
1039
1040fn message_route_to_json_value(route: &MessageRoute) -> Value {
1041    json!({
1042        "source": route
1043            .source()
1044            .map_or(Value::Null, message_endpoint_to_json_value),
1045        "target": message_endpoint_to_json_value(route.target()),
1046    })
1047}
1048
1049fn message_endpoint_to_json_value(endpoint: &MessageEndpoint) -> Value {
1050    json!({
1051        "node_id": endpoint.node_id().as_str(),
1052        "port_id": endpoint.port_id().as_str(),
1053    })
1054}
1055
1056fn jsonl_lock_error() -> crate::PureflowError {
1057    crate::PureflowError::metadata("metadata JSONL writer lock poisoned")
1058}
1059
1060fn tiered_lock_error() -> crate::PureflowError {
1061    crate::PureflowError::metadata("tiered metadata policy lock poisoned")
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066    use super::*;
1067    use crate::context::{CancellationRequest, ExecutionAttempt, ExecutionMetadata};
1068    use pureflow_types::{ExecutionId, MessageId, NodeId, PortId, WorkflowId};
1069    use std::io;
1070    use std::num::NonZeroU32;
1071    use std::sync::Arc;
1072
1073    use quickcheck::{Arbitrary, Gen, QuickCheck};
1074
1075    fn execution_id(value: &str) -> ExecutionId {
1076        ExecutionId::new(value).expect("valid execution id")
1077    }
1078
1079    fn node_id(value: &str) -> NodeId {
1080        NodeId::new(value).expect("valid node id")
1081    }
1082
1083    fn workflow_id(value: &str) -> WorkflowId {
1084        WorkflowId::new(value).expect("valid workflow id")
1085    }
1086
1087    fn port_id(value: &str) -> PortId {
1088        PortId::new(value).expect("valid port id")
1089    }
1090
1091    fn context() -> NodeContext {
1092        NodeContext::new(
1093            workflow_id("flow"),
1094            node_id("node"),
1095            ExecutionMetadata::first_attempt(execution_id("run-1")),
1096        )
1097    }
1098
1099    fn message_metadata() -> MessageMetadata {
1100        let source: crate::message::MessageEndpoint =
1101            crate::message::MessageEndpoint::new(node_id("source"), port_id("out"));
1102        let target: crate::message::MessageEndpoint =
1103            crate::message::MessageEndpoint::new(node_id("sink"), port_id("in"));
1104        let route: crate::message::MessageRoute =
1105            crate::message::MessageRoute::new(Some(source), target);
1106
1107        MessageMetadata::new(
1108            MessageId::new("msg-1").expect("valid message id"),
1109            workflow_id("flow"),
1110            ExecutionMetadata::first_attempt(execution_id("run-1")),
1111            route,
1112        )
1113    }
1114
1115    #[derive(Debug, Clone)]
1116    struct ArbitraryMetadataRecord(MetadataRecord);
1117
1118    impl Arbitrary for ArbitraryMetadataRecord {
1119        fn arbitrary(g: &mut Gen) -> Self {
1120            let record = match u8::arbitrary(g) % 6 {
1121                0 => MetadataRecord::ExecutionContext(generated_context(g)),
1122                1 => MetadataRecord::Lifecycle(LifecycleEvent::new(
1123                    generated_lifecycle_kind(g),
1124                    generated_context(g),
1125                )),
1126                2 => MetadataRecord::Message(MessageBoundaryRecord::new(
1127                    generated_message_boundary_kind(g),
1128                    generated_message_metadata(g),
1129                )),
1130                3 => MetadataRecord::QueuePressure(QueuePressureRecord::new(
1131                    bool::arbitrary(g).then(|| generated_context(g)),
1132                    generated_queue_port_direction(g),
1133                    generated_port_id(g, "port"),
1134                    generated_queue_pressure_kind(g),
1135                    bounded_usize(g, 8),
1136                    bool::arbitrary(g).then(|| bounded_usize(g, 256)),
1137                    bool::arbitrary(g).then(|| bounded_usize(g, 256)),
1138                )),
1139                4 => MetadataRecord::Error(generated_error_metadata(g)),
1140                _ => MetadataRecord::ExternalEffect(generated_external_effect_metadata(g)),
1141            };
1142
1143            Self(record)
1144        }
1145    }
1146
1147    fn bounded_usize(g: &mut Gen, upper_exclusive: usize) -> usize {
1148        usize::arbitrary(g) % upper_exclusive
1149    }
1150
1151    fn generated_execution(g: &mut Gen) -> ExecutionMetadata {
1152        let attempt = NonZeroU32::new(u32::arbitrary(g) % 32 + 1)
1153            .expect("generated execution attempt is nonzero");
1154
1155        ExecutionMetadata::new(
1156            generated_execution_id(g, "run"),
1157            ExecutionAttempt::new(attempt),
1158        )
1159    }
1160
1161    fn generated_context(g: &mut Gen) -> NodeContext {
1162        let context = NodeContext::new(
1163            generated_workflow_id(g, "flow"),
1164            generated_node_id(g, "node"),
1165            generated_execution(g),
1166        );
1167
1168        if bool::arbitrary(g) {
1169            context.with_cancellation(CancellationRequest::new(format!(
1170                "cancelled_{}",
1171                bounded_usize(g, 16)
1172            )))
1173        } else {
1174            context
1175        }
1176    }
1177
1178    fn generated_message_metadata(g: &mut Gen) -> MessageMetadata {
1179        let source = bool::arbitrary(g).then(|| {
1180            MessageEndpoint::new(generated_node_id(g, "source"), generated_port_id(g, "out"))
1181        });
1182        let target = MessageEndpoint::new(generated_node_id(g, "sink"), generated_port_id(g, "in"));
1183
1184        MessageMetadata::new(
1185            generated_message_id(g, "msg"),
1186            generated_workflow_id(g, "flow"),
1187            generated_execution(g),
1188            MessageRoute::new(source, target),
1189        )
1190    }
1191
1192    fn generated_error_metadata(g: &mut Gen) -> ErrorMetadataRecord {
1193        match u8::arbitrary(g) % 3 {
1194            0 => ErrorMetadataRecord::node_failed(&generated_context(g), generated_error(g)),
1195            1 => ErrorMetadataRecord::workflow_failed(
1196                generated_workflow_id(g, "flow"),
1197                generated_execution(g),
1198                generated_error(g),
1199            ),
1200            _ => ErrorMetadataRecord::workflow_failed_with_diagnostic(
1201                generated_workflow_id(g, "flow"),
1202                generated_execution(g),
1203                generated_error(g),
1204                ErrorDiagnosticMetadata::workflow_deadlock(
1205                    DeadlockDiagnosticMetadata::new(
1206                        bounded_usize(g, 16),
1207                        bounded_usize(g, 16),
1208                        bounded_usize(g, 16),
1209                        u64::arbitrary(g) % 60_000,
1210                        generated_cycle_policy(g),
1211                    )
1212                    .with_terminal_counts(
1213                        bounded_usize(g, 16),
1214                        bounded_usize(g, 16),
1215                        bounded_usize(g, 16),
1216                    )
1217                    .with_feedback_loop("start_all_nodes", "all_nodes_complete"),
1218                ),
1219            ),
1220        }
1221    }
1222
1223    fn generated_external_effect_metadata(g: &mut Gen) -> ExternalEffectMetadataRecord {
1224        let context = generated_context(g);
1225        let effect = generated_effect(g);
1226        let operation = format!("operation_{}", bounded_usize(g, 32));
1227        let target = format!("target_{}", bounded_usize(g, 32));
1228        match u8::arbitrary(g) % 3 {
1229            0 => ExternalEffectMetadataRecord::requested(context, effect, operation, target),
1230            1 => ExternalEffectMetadataRecord::completed(
1231                context,
1232                effect,
1233                operation,
1234                target,
1235                format!("status_{}", bounded_usize(g, 16)),
1236            ),
1237            _ => ExternalEffectMetadataRecord::failed(
1238                context,
1239                effect,
1240                operation,
1241                target,
1242                bool::arbitrary(g).then(|| format!("status_{}", bounded_usize(g, 16))),
1243            ),
1244        }
1245    }
1246
1247    fn generated_effect(g: &mut Gen) -> EffectCapability {
1248        match u8::arbitrary(g) % 8 {
1249            0 => EffectCapability::FileSystemRead,
1250            1 => EffectCapability::FileSystemWrite,
1251            2 => EffectCapability::NetworkOutbound,
1252            3 => EffectCapability::ExternalEffect,
1253            4 => EffectCapability::ProcessSpawn,
1254            5 => EffectCapability::EnvironmentRead,
1255            6 => EffectCapability::EnvironmentWrite,
1256            _ => EffectCapability::Clock,
1257        }
1258    }
1259
1260    fn generated_error(g: &mut Gen) -> crate::PureflowError {
1261        let message = format!("generated_failure_{}", bounded_usize(g, 32));
1262        match u8::arbitrary(g) % 4 {
1263            0 => crate::PureflowError::execution(message),
1264            1 => crate::PureflowError::cancelled(message),
1265            2 => crate::PureflowError::lifecycle(message),
1266            _ => crate::PureflowError::metadata(message),
1267        }
1268    }
1269
1270    fn generated_lifecycle_kind(g: &mut Gen) -> LifecycleEventKind {
1271        match u8::arbitrary(g) % 5 {
1272            0 => LifecycleEventKind::NodeScheduled,
1273            1 => LifecycleEventKind::NodeStarted,
1274            2 => LifecycleEventKind::NodeCompleted,
1275            3 => LifecycleEventKind::NodeFailed,
1276            _ => LifecycleEventKind::NodeCancelled,
1277        }
1278    }
1279
1280    fn generated_message_boundary_kind(g: &mut Gen) -> MessageBoundaryKind {
1281        match u8::arbitrary(g) % 3 {
1282            0 => MessageBoundaryKind::Enqueued,
1283            1 => MessageBoundaryKind::Dequeued,
1284            _ => MessageBoundaryKind::Dropped,
1285        }
1286    }
1287
1288    fn generated_queue_port_direction(g: &mut Gen) -> QueuePortDirection {
1289        if bool::arbitrary(g) {
1290            QueuePortDirection::Input
1291        } else {
1292            QueuePortDirection::Output
1293        }
1294    }
1295
1296    fn generated_queue_pressure_kind(g: &mut Gen) -> QueuePressureBoundaryKind {
1297        match u8::arbitrary(g) % 9 {
1298            0 => QueuePressureBoundaryKind::ReceiveAttempted,
1299            1 => QueuePressureBoundaryKind::ReceiveReady,
1300            2 => QueuePressureBoundaryKind::ReceiveEmpty,
1301            3 => QueuePressureBoundaryKind::ReceiveClosed,
1302            4 => QueuePressureBoundaryKind::ReserveAttempted,
1303            5 => QueuePressureBoundaryKind::ReserveReady,
1304            6 => QueuePressureBoundaryKind::ReserveFull,
1305            7 => QueuePressureBoundaryKind::SendCommitted,
1306            _ => QueuePressureBoundaryKind::SendDropped,
1307        }
1308    }
1309
1310    fn generated_cycle_policy(g: &mut Gen) -> &'static str {
1311        if bool::arbitrary(g) {
1312            "reject_cycles"
1313        } else {
1314            "allow_feedback_loops"
1315        }
1316    }
1317
1318    fn generated_workflow_id(g: &mut Gen, prefix: &str) -> WorkflowId {
1319        workflow_id(&format!("{prefix}_{}", bounded_usize(g, 64)))
1320    }
1321
1322    fn generated_execution_id(g: &mut Gen, prefix: &str) -> ExecutionId {
1323        execution_id(&format!("{prefix}_{}", bounded_usize(g, 64)))
1324    }
1325
1326    fn generated_message_id(g: &mut Gen, prefix: &str) -> MessageId {
1327        MessageId::new(format!("{prefix}_{}", bounded_usize(g, 64))).expect("valid message id")
1328    }
1329
1330    fn generated_node_id(g: &mut Gen, prefix: &str) -> NodeId {
1331        node_id(&format!("{prefix}_{}", bounded_usize(g, 64)))
1332    }
1333
1334    fn generated_port_id(g: &mut Gen, prefix: &str) -> PortId {
1335        port_id(&format!("{prefix}_{}", bounded_usize(g, 64)))
1336    }
1337
1338    fn top_level_kind(value: &Value) -> Option<&str> {
1339        value.as_object()?.get("kind")?.as_str()
1340    }
1341
1342    fn is_known_kind(kind: &str) -> bool {
1343        matches!(
1344            kind,
1345            "execution_context"
1346                | "node_scheduled"
1347                | "node_started"
1348                | "node_completed"
1349                | "node_failed"
1350                | "node_cancelled"
1351                | "enqueued"
1352                | "dequeued"
1353                | "dropped"
1354                | "receive_attempted"
1355                | "receive_ready"
1356                | "receive_empty"
1357                | "receive_closed"
1358                | "reserve_attempted"
1359                | "reserve_ready"
1360                | "reserve_full"
1361                | "send_committed"
1362                | "send_dropped"
1363                | "workflow_failed"
1364                | "external_effect_requested"
1365                | "external_effect_completed"
1366                | "external_effect_failed"
1367        )
1368    }
1369
1370    fn contains_reproducibility_sensitive_key(value: &Value) -> bool {
1371        match value {
1372            Value::Object(map) => map.iter().any(|(key, value)| {
1373                is_reproducibility_sensitive_key(key)
1374                    || contains_reproducibility_sensitive_key(value)
1375            }),
1376            Value::Array(values) => values.iter().any(contains_reproducibility_sensitive_key),
1377            Value::Null | Value::Bool(_) | Value::Number(_) | Value::String(_) => false,
1378        }
1379    }
1380
1381    fn is_reproducibility_sensitive_key(key: &str) -> bool {
1382        matches!(
1383            key,
1384            "timestamp"
1385                | "wall_clock_timestamp"
1386                | "thread_id"
1387                | "process_id"
1388                | "pid"
1389                | "payload"
1390                | "payload_bytes"
1391                | "raw_payload_bytes"
1392        )
1393    }
1394
1395    #[derive(Debug, Default)]
1396    struct RecordingMetadataSink {
1397        records: Mutex<Vec<MetadataRecord>>,
1398    }
1399
1400    impl RecordingMetadataSink {
1401        fn len(&self) -> usize {
1402            self.records
1403                .lock()
1404                .expect("recording metadata sink lock should not be poisoned")
1405                .len()
1406        }
1407
1408        fn records(&self) -> Vec<MetadataRecord> {
1409            self.records
1410                .lock()
1411                .expect("recording metadata sink lock should not be poisoned")
1412                .clone()
1413        }
1414    }
1415
1416    impl MetadataSink for RecordingMetadataSink {
1417        fn record(&self, record: &MetadataRecord) -> Result<()> {
1418            self.records
1419                .lock()
1420                .expect("recording metadata sink lock should not be poisoned")
1421                .push(record.clone());
1422            Ok(())
1423        }
1424    }
1425
1426    impl MetadataSink for Arc<RecordingMetadataSink> {
1427        fn record(&self, record: &MetadataRecord) -> Result<()> {
1428            self.as_ref().record(record)
1429        }
1430    }
1431
1432    #[test]
1433    fn generated_metadata_json_preserves_stable_machine_contract() {
1434        #[allow(clippy::needless_pass_by_value)]
1435        fn property(record: ArbitraryMetadataRecord) -> bool {
1436            let value = metadata_record_to_json_value(&record.0);
1437
1438            top_level_kind(&value).is_some_and(is_known_kind)
1439                && !contains_reproducibility_sensitive_key(&value)
1440        }
1441
1442        QuickCheck::new()
1443            .tests(128)
1444            .quickcheck(property as fn(ArbitraryMetadataRecord) -> bool);
1445    }
1446
1447    #[test]
1448    fn metadata_record_keeps_context_shape_intact() {
1449        let record: MetadataRecord = MetadataRecord::ExecutionContext(context());
1450
1451        assert!(matches!(
1452            record,
1453            MetadataRecord::ExecutionContext(ctx) if ctx.node_id().as_str() == "node"
1454        ));
1455    }
1456
1457    #[test]
1458    fn noop_metadata_sink_accepts_records() {
1459        let record: MetadataRecord = MetadataRecord::ExecutionContext(context());
1460
1461        NoopMetadataSink
1462            .record(&record)
1463            .expect("noop metadata sink should accept records");
1464    }
1465
1466    #[test]
1467    fn tiered_metadata_sink_records_control_records_by_default() {
1468        let inner: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
1469        let sink: TieredMetadataSink<Arc<RecordingMetadataSink>> =
1470            TieredMetadataSink::new(inner.clone());
1471        let record: MetadataRecord = MetadataRecord::Lifecycle(LifecycleEvent::new(
1472            LifecycleEventKind::NodeStarted,
1473            context(),
1474        ));
1475
1476        sink.record(&record)
1477            .expect("control metadata should pass through");
1478
1479        assert_eq!(inner.records(), vec![record]);
1480    }
1481
1482    #[test]
1483    fn tiered_metadata_sink_drops_data_and_high_cost_records_by_default() {
1484        let inner: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
1485        let sink: TieredMetadataSink<Arc<RecordingMetadataSink>> =
1486            TieredMetadataSink::new(inner.clone());
1487        let record: MetadataRecord = MetadataRecord::Message(MessageBoundaryRecord::new(
1488            MessageBoundaryKind::Enqueued,
1489            message_metadata(),
1490        ));
1491
1492        sink.record_with_tier(MetadataTier::Data, &record)
1493            .expect("dropped data metadata should be accepted");
1494        sink.record_with_tier(MetadataTier::HighCostData, &record)
1495            .expect("dropped high-cost metadata should be accepted");
1496
1497        assert_eq!(inner.len(), 0);
1498    }
1499
1500    #[test]
1501    fn tiered_metadata_sink_samples_data_records() {
1502        let inner: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
1503        let policy: TieredMetadataPolicy =
1504            TieredMetadataPolicy::sample_data_every(NonZeroUsize::new(2).expect("nonzero"));
1505        let sink: TieredMetadataSink<Arc<RecordingMetadataSink>> =
1506            TieredMetadataSink::with_policy(inner.clone(), policy);
1507        let record: MetadataRecord = MetadataRecord::Message(MessageBoundaryRecord::new(
1508            MessageBoundaryKind::Dequeued,
1509            message_metadata(),
1510        ));
1511
1512        sink.record_with_tier(MetadataTier::Data, &record)
1513            .expect("first sampled data metadata should pass through");
1514        sink.record_with_tier(MetadataTier::Data, &record)
1515            .expect("second sampled data metadata should be dropped");
1516        sink.record_with_tier(MetadataTier::Data, &record)
1517            .expect("third sampled data metadata should pass through");
1518
1519        assert_eq!(inner.records(), vec![record.clone(), record]);
1520    }
1521
1522    #[test]
1523    fn tiered_metadata_policy_can_enable_high_cost_records() {
1524        let inner: Arc<RecordingMetadataSink> = Arc::new(RecordingMetadataSink::default());
1525        let sink: TieredMetadataSink<Arc<RecordingMetadataSink>> = TieredMetadataSink::with_policy(
1526            inner.clone(),
1527            TieredMetadataPolicy::control_only().with_high_cost_data(),
1528        );
1529        let record: MetadataRecord = MetadataRecord::Message(MessageBoundaryRecord::new(
1530            MessageBoundaryKind::Dropped,
1531            message_metadata(),
1532        ));
1533
1534        sink.record_with_tier(MetadataTier::HighCostData, &record)
1535            .expect("enabled high-cost metadata should pass through");
1536
1537        assert_eq!(inner.records(), vec![record]);
1538    }
1539
1540    #[test]
1541    fn message_boundary_record_keeps_shape_intact() {
1542        let target: crate::message::MessageEndpoint =
1543            crate::message::MessageEndpoint::new(node_id("sink"), port_id("in"));
1544        let route: crate::message::MessageRoute = crate::message::MessageRoute::new(None, target);
1545        let metadata: MessageMetadata = MessageMetadata::new(
1546            MessageId::new("msg-1").expect("valid message id"),
1547            workflow_id("flow"),
1548            ExecutionMetadata::first_attempt(execution_id("run-1")),
1549            route,
1550        );
1551        let record: MessageBoundaryRecord =
1552            MessageBoundaryRecord::new(MessageBoundaryKind::Enqueued, metadata);
1553
1554        assert!(matches!(
1555            record,
1556            MessageBoundaryRecord {
1557                kind: MessageBoundaryKind::Enqueued,
1558                ..
1559            }
1560        ));
1561    }
1562
1563    #[test]
1564    fn metadata_record_json_uses_stable_queue_pressure_shape() {
1565        let record: MetadataRecord = MetadataRecord::QueuePressure(QueuePressureRecord::new(
1566            Some(context()),
1567            QueuePortDirection::Input,
1568            port_id("in"),
1569            QueuePressureBoundaryKind::ReceiveReady,
1570            2,
1571            Some(8),
1572            Some(3),
1573        ));
1574
1575        assert_eq!(
1576            metadata_record_to_json_value(&record),
1577            json!({
1578                "record_type": "queue_pressure",
1579                "kind": "receive_ready",
1580                "direction": "input",
1581                "port_id": "in",
1582                "context": {
1583                    "workflow_id": "flow",
1584                    "node_id": "node",
1585                    "execution": {
1586                        "execution_id": "run-1",
1587                        "attempt": 1,
1588                    },
1589                    "cancellation": {
1590                        "state": "active",
1591                    },
1592                },
1593                "connected_edge_count": 2,
1594                "capacity": 8,
1595                "queued_count": 3,
1596            })
1597        );
1598    }
1599
1600    #[test]
1601    fn metadata_record_json_uses_stable_message_shape() {
1602        let record: MetadataRecord = MetadataRecord::Message(MessageBoundaryRecord::new(
1603            MessageBoundaryKind::Dequeued,
1604            message_metadata(),
1605        ));
1606
1607        assert_eq!(
1608            metadata_record_to_json_value(&record),
1609            json!({
1610                "record_type": "message",
1611                "kind": "dequeued",
1612                "message": {
1613                    "message_id": "msg-1",
1614                    "workflow_id": "flow",
1615                    "execution": {
1616                        "execution_id": "run-1",
1617                        "attempt": 1,
1618                    },
1619                    "route": {
1620                        "source": {
1621                            "node_id": "source",
1622                            "port_id": "out",
1623                        },
1624                        "target": {
1625                            "node_id": "sink",
1626                            "port_id": "in",
1627                        },
1628                    },
1629                },
1630            })
1631        );
1632    }
1633
1634    #[test]
1635    fn metadata_record_json_uses_stable_external_effect_shape() {
1636        let record: MetadataRecord =
1637            MetadataRecord::ExternalEffect(ExternalEffectMetadataRecord::completed(
1638                context(),
1639                EffectCapability::ExternalEffect,
1640                "tool_call",
1641                "get_weather",
1642                "ok",
1643            ));
1644
1645        assert_eq!(
1646            metadata_record_to_json_value(&record),
1647            json!({
1648                "record_type": "external_effect",
1649                "kind": "external_effect_completed",
1650                "context": {
1651                    "workflow_id": "flow",
1652                    "node_id": "node",
1653                    "execution": {
1654                        "execution_id": "run-1",
1655                        "attempt": 1,
1656                    },
1657                    "cancellation": {
1658                        "state": "active",
1659                    },
1660                },
1661                "effect": "external_effect",
1662                "operation": "tool_call",
1663                "target": "get_weather",
1664                "response_status": "ok",
1665            })
1666        );
1667    }
1668
1669    #[test]
1670    fn metadata_record_json_uses_stable_error_shape() {
1671        let record: MetadataRecord = MetadataRecord::Error(ErrorMetadataRecord::node_failed(
1672            &context(),
1673            crate::PureflowError::execution("executor failed"),
1674        ));
1675
1676        assert_eq!(
1677            metadata_record_to_json_value(&record),
1678            json!({
1679                "record_type": "error",
1680                "kind": "node_failed",
1681                "workflow_id": "flow",
1682                "node_id": "node",
1683                "execution": {
1684                    "execution_id": "run-1",
1685                    "attempt": 1,
1686                },
1687                "error": {
1688                    "code": "CDT-EXEC-001",
1689                    "message": "CDT-EXEC-001: node execution failed: executor failed",
1690                    "visibility": "internal",
1691                    "retry_disposition": "unknown",
1692                },
1693                "diagnostic": null,
1694            })
1695        );
1696    }
1697
1698    #[test]
1699    fn metadata_record_json_uses_stable_deadlock_diagnostic_shape() {
1700        let diagnostic: ErrorDiagnosticMetadata = ErrorDiagnosticMetadata::workflow_deadlock(
1701            DeadlockDiagnosticMetadata::new(2, 2, 2, 1, "allow_feedback_loops")
1702                .with_feedback_loop("start_all_nodes", "all_nodes_complete"),
1703        );
1704        let record: MetadataRecord =
1705            MetadataRecord::Error(ErrorMetadataRecord::workflow_failed_with_diagnostic(
1706                workflow_id("flow"),
1707                ExecutionMetadata::first_attempt(execution_id("run-1")),
1708                crate::PureflowError::execution("watchdog detected no workflow progress"),
1709                diagnostic,
1710            ));
1711
1712        assert_eq!(
1713            metadata_record_to_json_value(&record),
1714            json!({
1715                "record_type": "error",
1716                "kind": "workflow_failed",
1717                "workflow_id": "flow",
1718                "node_id": null,
1719                "execution": {
1720                    "execution_id": "run-1",
1721                    "attempt": 1,
1722                },
1723                "error": {
1724                    "code": "CDT-EXEC-001",
1725                    "message": "CDT-EXEC-001: node execution failed: watchdog detected no workflow progress",
1726                    "visibility": "internal",
1727                    "retry_disposition": "unknown",
1728                },
1729                "diagnostic": {
1730                    "type": "workflow_deadlock",
1731                    "scheduled_node_count": 2,
1732                    "pending_node_count": 2,
1733                    "completed_node_count": 0,
1734                    "failed_node_count": 0,
1735                    "cancelled_node_count": 0,
1736                    "bounded_edge_count": 2,
1737                    "no_progress_timeout_ms": 1,
1738                    "cycle_policy": "allow_feedback_loops",
1739                    "feedback_loop_startup": "start_all_nodes",
1740                    "feedback_loop_termination": "all_nodes_complete",
1741                },
1742            })
1743        );
1744    }
1745
1746    #[test]
1747    fn workflow_error_metadata_has_no_node_id() {
1748        let record: ErrorMetadataRecord = ErrorMetadataRecord::workflow_failed(
1749            workflow_id("flow"),
1750            ExecutionMetadata::first_attempt(execution_id("run-1")),
1751            crate::PureflowError::cancelled("shutdown"),
1752        );
1753
1754        assert_eq!(record.kind(), ErrorMetadataKind::WorkflowFailed);
1755        assert!(record.node_id().is_none());
1756        assert!(record.diagnostic().is_none());
1757    }
1758
1759    #[test]
1760    fn jsonl_metadata_sink_writes_reproducible_lines() {
1761        let sink: JsonlMetadataSink<Vec<u8>> = JsonlMetadataSink::new(Vec::new());
1762        let record: MetadataRecord = MetadataRecord::Lifecycle(LifecycleEvent::new(
1763            LifecycleEventKind::NodeStarted,
1764            context(),
1765        ));
1766
1767        sink.record(&record)
1768            .expect("first metadata record should write");
1769        sink.record(&record)
1770            .expect("second metadata record should write");
1771        let output: String = String::from_utf8(sink.into_inner().expect("writer should return"))
1772            .expect("JSONL should be UTF-8");
1773        let mut lines = output.lines();
1774        let first = lines.next().expect("first JSONL line should exist");
1775        let second = lines.next().expect("second JSONL line should exist");
1776
1777        assert_eq!(first, second);
1778        assert!(lines.next().is_none());
1779    }
1780
1781    struct FailingWriter;
1782
1783    impl Write for FailingWriter {
1784        fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
1785            Err(io::Error::other("write failed"))
1786        }
1787
1788        fn flush(&mut self) -> io::Result<()> {
1789            Ok(())
1790        }
1791    }
1792
1793    #[test]
1794    fn jsonl_metadata_sink_maps_writer_failures() {
1795        let sink: JsonlMetadataSink<FailingWriter> = JsonlMetadataSink::new(FailingWriter);
1796        let record: MetadataRecord = MetadataRecord::ExecutionContext(context());
1797        let err: crate::PureflowError = sink
1798            .record(&record)
1799            .expect_err("writer failure should surface");
1800
1801        assert_eq!(err.code(), crate::ErrorCode::MetadataCollectionFailed);
1802        assert!(err.to_string().contains("failed to encode"));
1803    }
1804}