Skip to main content

clawft_kernel/
ipc.rs

//! Kernel IPC subsystem.
//!
//! [`KernelIpc`] wraps the existing [`MessageBus`] from `clawft-core`,
//! adding typed [`KernelMessage`] envelopes addressed by PID, topic,
//! service, or node. No new channels are created: at present every
//! message is serialized to JSON and published on the bus's inbound channel.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use tracing::debug;
13
14use clawft_core::bus::MessageBus;
15
16use crate::error::KernelError;
17use crate::process::Pid;
18
/// Process-wide sequence number backing internal IPC message IDs.
///
/// Starts at 1 and is incremented once per generated ID. A plain atomic
/// counter is used instead of `uuid::Uuid::new_v4()` so that hot IPC paths
/// do not pay for random ID generation. The resulting IDs are only unique
/// within a single process lifetime, which is enough for internal
/// correlation.
static IPC_MSG_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Produce the next internal message ID, formatted as `"ipc-{counter}"`.
///
/// `Ordering::Relaxed` is sufficient: each `fetch_add` hands out a distinct
/// previous value, and no other memory is synchronized through the counter.
#[inline]
fn next_ipc_msg_id() -> String {
    format!(
        "ipc-{}",
        IPC_MSG_COUNTER.fetch_add(1, Ordering::Relaxed)
    )
}
35
/// Target for a kernel message.
///
/// This type only names the destination; resolving it to an actual
/// recipient is done by routing code outside this file. The enum is
/// `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
/// With the derived serde impls each variant is encoded under its own
/// name, e.g. `{"Process":2}` in JSON.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MessageTarget {
    /// Send to a specific process by PID.
    Process(Pid),
    /// Publish to a named topic (all subscribers receive).
    Topic(String),
    /// Broadcast to all processes.
    Broadcast,
    /// Send to a named service (routed via ServiceRegistry).
    Service(String),
    /// Send to a specific method on a named service (D19, K2.1).
    ///
    /// The router resolves the service via ServiceRegistry and wraps
    /// the payload with method metadata for the receiving agent.
    ServiceMethod {
        /// Service name to resolve.
        service: String,
        /// Method to invoke on the service.
        method: String,
    },
    /// Send to the kernel itself.
    Kernel,
    /// Route to a specific process on a remote node (K6).
    /// The inner target is resolved on the destination node.
    RemoteNode {
        /// Remote node identifier.
        node_id: String,
        /// Target to resolve on the remote node.
        ///
        /// Boxed because the enum refers to itself here.
        target: Box<MessageTarget>,
    },
}
69
/// Payload types for kernel messages.
///
/// Each variant has a short stable label returned by
/// [`MessagePayload::type_name`], used for logging and chain events.
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MessagePayload {
    /// Plain text message.
    Text(String),
    /// Structured JSON data.
    Json(serde_json::Value),
    /// Tool call delegation from one agent to another.
    ToolCall {
        /// Name of the tool to call.
        name: String,
        /// Tool arguments.
        args: serde_json::Value,
    },
    /// Result of a delegated tool call.
    ToolResult {
        /// Correlation ID linking to the original request.
        call_id: String,
        /// Tool execution result.
        result: serde_json::Value,
    },
    /// System control signal.
    Signal(KernelSignal),
    /// RVF-typed payload with segment type hint.
    ///
    /// Agents can exchange RVF-typed messages. The segment type tells
    /// the receiver what format the data is in (using rvf-types
    /// discriminants, e.g. 0x40 = ExochainEvent).
    Rvf {
        /// RVF segment type discriminant.
        segment_type: u8,
        /// Payload data (CBOR, JSON, or raw bytes).
        ///
        /// Carried as opaque bytes; nothing in this file interprets them.
        data: Vec<u8>,
    },
}
106
107impl MessagePayload {
108    /// Return the payload type name (for logging/chain events).
109    pub fn type_name(&self) -> &'static str {
110        match self {
111            MessagePayload::Text(_) => "text",
112            MessagePayload::Json(_) => "json",
113            MessagePayload::ToolCall { .. } => "tool_call",
114            MessagePayload::ToolResult { .. } => "tool_result",
115            MessagePayload::Signal(_) => "signal",
116            MessagePayload::Rvf { .. } => "rvf",
117        }
118    }
119}
120
/// Reason a process exited (used in link/monitor notifications).
///
/// Carried by [`KernelSignal::LinkExit`] and by [`ProcessDown`]. The enum
/// is `#[non_exhaustive]`, so matches outside this crate need a wildcard
/// arm.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExitReason {
    /// Normal exit (process completed successfully).
    Normal,
    /// Process crashed with an error; the string holds the error text.
    Crash(String),
    /// Process was killed.
    Killed,
    /// Process timed out.
    Timeout,
}
134
135/// Notification that a monitored process went down.
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct ProcessDown {
138    /// PID of the process that went down.
139    pub pid: crate::process::Pid,
140    /// Why the process exited.
141    pub reason: ExitReason,
142}
143
144/// Kernel control signals.
145#[non_exhaustive]
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub enum KernelSignal {
148    /// Request a process to shut down gracefully.
149    Shutdown,
150    /// Request a process to suspend.
151    Suspend,
152    /// Request a process to resume from suspension.
153    Resume,
154    /// Heartbeat / keep-alive ping.
155    Ping,
156    /// Response to a heartbeat ping.
157    Pong,
158    /// Reload configuration (K2-G5).
159    ReloadConfig,
160    /// Dump internal state for debugging (K2-G5).
161    DumpState,
162    /// User-defined signal with a discriminant (K2-G5).
163    UserDefined(u8),
164    /// Immediate kill -- no cleanup, no graceful shutdown (K2-G5).
165    Kill,
166    /// Crash notification from a linked process (K1-G2).
167    LinkExit {
168        /// PID of the linked process that exited.
169        pid: crate::process::Pid,
170        /// Reason the process exited.
171        reason: ExitReason,
172    },
173    /// Monitor DOWN notification (K1-G2).
174    MonitorDown(ProcessDown),
175    /// Resource usage warning at 80% of limit (K1-G3).
176    ResourceWarning {
177        /// Name of the resource (e.g. "memory", "cpu_time").
178        resource: String,
179        /// Current usage value.
180        current: u64,
181        /// Configured limit.
182        limit: u64,
183    },
184}
185
/// A typed message envelope for kernel IPC.
///
/// Build one with [`KernelMessage::new`] or one of the convenience
/// constructors; they fill in `id` and `timestamp`. The two optional ID
/// fields are left out of the serialized form when `None` and default to
/// `None` when missing on input, so older payloads without them still
/// deserialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KernelMessage {
    /// Unique message identifier.
    ///
    /// The constructors in this file generate it as `"ipc-{counter}"`.
    pub id: String,
    /// Sender PID (0 = kernel).
    pub from: Pid,
    /// Target for delivery.
    pub target: MessageTarget,
    /// Message payload.
    pub payload: MessagePayload,
    /// Creation timestamp (UTC), taken when the message is constructed.
    pub timestamp: DateTime<Utc>,
    /// Optional correlation ID for request-response patterns.
    ///
    /// When set, this links a response message back to the original
    /// request that triggered it. Used by the A2A protocol's
    /// request-response tracking.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    /// Distributed trace ID for end-to-end request tracing (K2-G4).
    ///
    /// External messages entering the kernel receive a new UUID v4
    /// trace_id. Internal messages inherit the parent's trace_id
    /// via correlation linkage.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
}
214
215impl KernelMessage {
216    /// Create a new kernel message.
217    ///
218    /// Uses an atomic counter for the message ID instead of UUID v4,
219    /// eliminating crypto-random generation overhead in hot IPC paths.
220    pub fn new(from: Pid, target: MessageTarget, payload: MessagePayload) -> Self {
221        Self {
222            id: next_ipc_msg_id(),
223            from,
224            target,
225            payload,
226            timestamp: Utc::now(),
227            correlation_id: None,
228            trace_id: None,
229        }
230    }
231
232    /// Create a new kernel message with a correlation ID.
233    pub fn with_correlation(
234        from: Pid,
235        target: MessageTarget,
236        payload: MessagePayload,
237        correlation_id: String,
238    ) -> Self {
239        Self {
240            id: next_ipc_msg_id(),
241            from,
242            target,
243            payload,
244            timestamp: Utc::now(),
245            correlation_id: Some(correlation_id),
246            trace_id: None,
247        }
248    }
249
250    /// Create a new kernel message with a trace ID (for external entry points).
251    pub fn with_trace(
252        from: Pid,
253        target: MessageTarget,
254        payload: MessagePayload,
255        trace_id: String,
256    ) -> Self {
257        Self {
258            id: next_ipc_msg_id(),
259            from,
260            target,
261            payload,
262            timestamp: Utc::now(),
263            correlation_id: None,
264            trace_id: Some(trace_id),
265        }
266    }
267
268    /// Set the trace ID on this message (builder pattern).
269    pub fn set_trace_id(mut self, trace_id: impl Into<String>) -> Self {
270        self.trace_id = Some(trace_id.into());
271        self
272    }
273
274    /// Ensure this message has a trace ID, generating one if missing.
275    pub fn ensure_trace_id(mut self) -> Self {
276        if self.trace_id.is_none() {
277            self.trace_id = Some(uuid::Uuid::new_v4().to_string());
278        }
279        self
280    }
281
282    /// Create a text message.
283    pub fn text(from: Pid, target: MessageTarget, text: impl Into<String>) -> Self {
284        Self::new(from, target, MessagePayload::Text(text.into()))
285    }
286
287    /// Create a signal message.
288    pub fn signal(from: Pid, target: MessageTarget, signal: KernelSignal) -> Self {
289        Self::new(from, target, MessagePayload::Signal(signal))
290    }
291
292    /// Create a tool call message.
293    pub fn tool_call(
294        from: Pid,
295        target: MessageTarget,
296        name: impl Into<String>,
297        args: serde_json::Value,
298    ) -> Self {
299        Self::new(
300            from,
301            target,
302            MessagePayload::ToolCall {
303                name: name.into(),
304                args,
305            },
306        )
307    }
308
309    /// Create a tool result message (response to a tool call).
310    pub fn tool_result(
311        from: Pid,
312        target: MessageTarget,
313        call_id: impl Into<String>,
314        result: serde_json::Value,
315    ) -> Self {
316        Self::new(
317            from,
318            target,
319            MessagePayload::ToolResult {
320                call_id: call_id.into(),
321                result,
322            },
323        )
324    }
325}
326
/// Globally unique process identifier: (node_id, local_pid).
///
/// Used for cross-node process addressing in K6 mesh networking.
/// Two values are equal only when both the node ID and the PID match;
/// the derived `Hash`/`Eq` impls also make it usable as a map key.
/// Its `Display` form is `node_id:pid`.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct GlobalPid {
    /// Node that owns this process.
    pub node_id: String,
    /// Local PID on that node.
    pub pid: Pid,
}
337
338impl GlobalPid {
339    /// Create a GlobalPid for a local process.
340    pub fn local(pid: Pid, node_id: &str) -> Self {
341        Self {
342            node_id: node_id.to_string(),
343            pid,
344        }
345    }
346
347    /// Check if this PID belongs to the given node.
348    pub fn is_local(&self, my_node_id: &str) -> bool {
349        self.node_id == my_node_id
350    }
351}
352
353impl std::fmt::Display for GlobalPid {
354    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
355        write!(f, "{}:{}", self.node_id, self.pid)
356    }
357}
358
/// Kernel IPC subsystem wrapping the core MessageBus.
///
/// Adds kernel-level message envelope (type, routing, timestamps)
/// on top of the existing broadcast channel infrastructure.
/// The bus is held behind an `Arc`, so it can be shared with other
/// owners; see [`KernelIpc::bus`].
pub struct KernelIpc {
    /// Shared handle to the bus that messages are published on.
    bus: Arc<MessageBus>,
}
366
367impl KernelIpc {
368    /// Create a new KernelIpc wrapping the given MessageBus.
369    pub fn new(bus: Arc<MessageBus>) -> Self {
370        Self { bus }
371    }
372
373    /// Get a reference to the underlying MessageBus.
374    pub fn bus(&self) -> &Arc<MessageBus> {
375        &self.bus
376    }
377
378    /// Send a kernel message with RBAC enforcement and chain logging.
379    ///
380    /// 1. If the target is `Process(to_pid)`, checks IPC capability
381    ///    via the `CapabilityChecker`.
382    /// 2. Logs the send event to the chain (if provided).
383    /// 3. Publishes via the bus.
384    #[cfg(feature = "exochain")]
385    pub fn send_checked(
386        &self,
387        msg: &KernelMessage,
388        checker: &crate::capability::CapabilityChecker,
389        chain: Option<&crate::chain::ChainManager>,
390    ) -> Result<(), KernelError> {
391        // 1. Check IPC capability
392        if let MessageTarget::Process(to_pid) = &msg.target {
393            checker.check_ipc_target(msg.from, *to_pid)?;
394        }
395
396        // 2. Log to chain
397        if let Some(cm) = chain {
398            cm.append(
399                "ipc",
400                "ipc.send",
401                Some(serde_json::json!({
402                    "from": msg.from,
403                    "target": format!("{:?}", msg.target),
404                    "payload_type": msg.payload.type_name(),
405                    "msg_id": msg.id,
406                })),
407            );
408        }
409
410        // 3. Send via bus
411        self.send(msg)
412    }
413
414    /// Send a kernel message.
415    ///
416    /// Currently serializes the message to JSON and publishes it
417    /// as an inbound message on the bus. Future versions (K2) will
418    /// implement PID-based routing and topic subscriptions.
419    pub fn send(&self, msg: &KernelMessage) -> Result<(), KernelError> {
420        debug!(
421            id = %msg.id,
422            from = msg.from,
423            "sending kernel message"
424        );
425
426        let json = serde_json::to_string(msg)
427            .map_err(|e| KernelError::Ipc(format!("failed to serialize message: {e}")))?;
428
429        // For now, publish as an inbound message. The A2A routing (K2)
430        // will replace this with proper PID-based delivery.
431        let inbound = clawft_types::event::InboundMessage {
432            channel: "kernel-ipc".to_owned(),
433            sender_id: format!("pid-{}", msg.from),
434            chat_id: msg.id.clone(),
435            content: json,
436            timestamp: msg.timestamp,
437            media: vec![],
438            metadata: std::collections::HashMap::new(),
439        };
440
441        self.bus
442            .publish_inbound(inbound)
443            .map_err(|e| KernelError::Ipc(format!("bus publish failed: {e}")))
444    }
445}
446
/// Unit tests for the IPC message types and the `KernelIpc` bus wrapper.
#[cfg(test)]
mod tests {
    use super::*;

    // ── Message construction ───────────────────────────────────

    #[test]
    fn kernel_message_text() {
        let msg = KernelMessage::text(0, MessageTarget::Process(1), "hello");
        assert_eq!(msg.from, 0);
        assert!(matches!(msg.target, MessageTarget::Process(1)));
        assert!(matches!(msg.payload, MessagePayload::Text(ref t) if t == "hello"));
    }

    #[test]
    fn kernel_message_signal() {
        let msg = KernelMessage::signal(0, MessageTarget::Broadcast, KernelSignal::Shutdown);
        assert!(matches!(msg.target, MessageTarget::Broadcast));
        assert!(matches!(
            msg.payload,
            MessagePayload::Signal(KernelSignal::Shutdown)
        ));
    }

    #[test]
    fn kernel_message_json_payload() {
        let payload = MessagePayload::Json(serde_json::json!({"key": "value"}));
        let msg = KernelMessage::new(1, MessageTarget::Kernel, payload);
        assert!(matches!(msg.payload, MessagePayload::Json(_)));
    }

    /// IDs come from the shared atomic counter: always `ipc-` prefixed and
    /// never repeated. Other tests may bump the counter concurrently, so
    /// only distinctness is asserted, not specific values.
    #[test]
    fn message_ids_are_unique() {
        let a = KernelMessage::text(1, MessageTarget::Kernel, "a");
        let b = KernelMessage::text(1, MessageTarget::Kernel, "b");
        assert!(a.id.starts_with("ipc-"));
        assert!(b.id.starts_with("ipc-"));
        assert_ne!(a.id, b.id);
    }

    #[test]
    fn message_serde_roundtrip() {
        let msg = KernelMessage::text(5, MessageTarget::Service("health".into()), "check");
        let json = serde_json::to_string(&msg).unwrap();
        let restored: KernelMessage = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.id, msg.id);
        assert_eq!(restored.from, 5);
    }

    // ── KernelIpc ──────────────────────────────────────────────

    #[tokio::test]
    async fn ipc_send() {
        let bus = Arc::new(MessageBus::new());
        let ipc = KernelIpc::new(bus.clone());

        let msg = KernelMessage::text(0, MessageTarget::Process(1), "test");
        ipc.send(&msg).unwrap();

        // Should be consumable from the bus
        let received = bus.consume_inbound().await.unwrap();
        assert_eq!(received.channel, "kernel-ipc");
        assert_eq!(received.sender_id, "pid-0");
    }

    #[test]
    fn ipc_bus_ref() {
        let bus = Arc::new(MessageBus::new());
        let ipc = KernelIpc::new(bus.clone());
        assert!(Arc::ptr_eq(ipc.bus(), &bus));
    }

    // ── Enum serde coverage ────────────────────────────────────

    /// Every `MessageTarget` variant must survive a JSON roundtrip.
    #[test]
    fn message_target_variants() {
        let targets = vec![
            MessageTarget::Process(1),
            MessageTarget::Topic("events".into()),
            MessageTarget::Broadcast,
            MessageTarget::Service("test".into()),
            MessageTarget::ServiceMethod {
                service: "auth".into(),
                method: "validate_token".into(),
            },
            MessageTarget::Kernel,
            MessageTarget::RemoteNode {
                node_id: "node-1".into(),
                target: Box::new(MessageTarget::Kernel),
            },
        ];
        for target in targets {
            let json = serde_json::to_string(&target).unwrap();
            let _: MessageTarget = serde_json::from_str(&json).unwrap();
        }
    }

    #[test]
    fn kernel_signal_variants() {
        let signals = vec![
            KernelSignal::Shutdown,
            KernelSignal::Suspend,
            KernelSignal::Resume,
            KernelSignal::Ping,
            KernelSignal::Pong,
        ];
        for signal in signals {
            let json = serde_json::to_string(&signal).unwrap();
            let _: KernelSignal = serde_json::from_str(&json).unwrap();
        }
    }

    #[test]
    fn expanded_signal_variants_serde() {
        let signals = vec![
            KernelSignal::ReloadConfig,
            KernelSignal::DumpState,
            KernelSignal::UserDefined(42),
            KernelSignal::Kill,
            KernelSignal::LinkExit {
                pid: 7,
                reason: ExitReason::Crash("boom".into()),
            },
            KernelSignal::MonitorDown(ProcessDown {
                pid: 9,
                reason: ExitReason::Normal,
            }),
            KernelSignal::ResourceWarning {
                resource: "memory".into(),
                current: 800,
                limit: 1000,
            },
        ];
        for signal in signals {
            let json = serde_json::to_string(&signal).unwrap();
            let restored: KernelSignal = serde_json::from_str(&json).unwrap();
            // Verify roundtrip by re-serializing
            let json2 = serde_json::to_string(&restored).unwrap();
            assert_eq!(json, json2);
        }
    }

    #[test]
    fn exit_reason_variants() {
        let reasons = vec![
            ExitReason::Normal,
            ExitReason::Crash("error".into()),
            ExitReason::Killed,
            ExitReason::Timeout,
        ];
        for reason in reasons {
            let json = serde_json::to_string(&reason).unwrap();
            let _: ExitReason = serde_json::from_str(&json).unwrap();
        }
    }

    #[test]
    fn process_down_serde() {
        let pd = ProcessDown {
            pid: 42,
            reason: ExitReason::Crash("segfault".into()),
        };
        let json = serde_json::to_string(&pd).unwrap();
        let restored: ProcessDown = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.pid, 42);
        assert!(matches!(restored.reason, ExitReason::Crash(ref s) if s == "segfault"));
    }

    // ── Correlation IDs and payload helpers ────────────────────

    #[test]
    fn message_with_correlation_id() {
        let msg = KernelMessage::with_correlation(
            1,
            MessageTarget::Process(2),
            MessagePayload::Text("request".into()),
            "req-123".into(),
        );
        assert_eq!(msg.correlation_id, Some("req-123".into()));
        assert_eq!(msg.from, 1);
    }

    #[test]
    fn message_without_correlation_id() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello");
        assert!(msg.correlation_id.is_none());
    }

    #[test]
    fn tool_call_message() {
        let msg = KernelMessage::tool_call(
            1,
            MessageTarget::Process(2),
            "read_file",
            serde_json::json!({"path": "/src/main.rs"}),
        );
        match &msg.payload {
            MessagePayload::ToolCall { name, args } => {
                assert_eq!(name, "read_file");
                assert_eq!(args["path"], "/src/main.rs");
            }
            other => panic!("expected ToolCall, got: {other:?}"),
        }
    }

    #[test]
    fn tool_result_message() {
        let msg = KernelMessage::tool_result(
            2,
            MessageTarget::Process(1),
            "call-123",
            serde_json::json!({"content": "file contents"}),
        );
        match &msg.payload {
            MessagePayload::ToolResult { call_id, result } => {
                assert_eq!(call_id, "call-123");
                assert_eq!(result["content"], "file contents");
            }
            other => panic!("expected ToolResult, got: {other:?}"),
        }
    }

    #[test]
    fn topic_target() {
        let msg = KernelMessage::text(1, MessageTarget::Topic("build-status".into()), "done");
        assert!(matches!(msg.target, MessageTarget::Topic(ref t) if t == "build-status"));
    }

    #[test]
    fn tool_call_serde_roundtrip() {
        let msg = KernelMessage::tool_call(
            1,
            MessageTarget::Process(2),
            "search",
            serde_json::json!({"query": "test"}),
        );
        let json = serde_json::to_string(&msg).unwrap();
        let restored: KernelMessage = serde_json::from_str(&json).unwrap();
        assert!(matches!(
            restored.payload,
            MessagePayload::ToolCall { ref name, .. } if name == "search"
        ));
    }

    #[test]
    fn correlation_id_serde_roundtrip() {
        let msg = KernelMessage::with_correlation(
            1,
            MessageTarget::Process(2),
            MessagePayload::Text("req".into()),
            "corr-456".into(),
        );
        let json = serde_json::to_string(&msg).unwrap();
        let restored: KernelMessage = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.correlation_id, Some("corr-456".into()));
    }

    #[test]
    fn rvf_payload_variant() {
        let payload = MessagePayload::Rvf {
            segment_type: 0x40,
            data: vec![0xCA, 0xFE],
        };
        let msg = KernelMessage::new(1, MessageTarget::Process(2), payload);
        assert_eq!(msg.payload.type_name(), "rvf");

        // serde roundtrip
        let json = serde_json::to_string(&msg).unwrap();
        let restored: KernelMessage = serde_json::from_str(&json).unwrap();
        match &restored.payload {
            MessagePayload::Rvf { segment_type, data } => {
                assert_eq!(*segment_type, 0x40);
                assert_eq!(data, &[0xCA, 0xFE]);
            }
            other => panic!("expected Rvf, got: {other:?}"),
        }
    }

    /// Pins the label of every payload variant, since these strings end up
    /// in logs and chain events.
    #[test]
    fn payload_type_names() {
        assert_eq!(MessagePayload::Text("hi".into()).type_name(), "text");
        assert_eq!(
            MessagePayload::Json(serde_json::json!(1)).type_name(),
            "json"
        );
        assert_eq!(
            MessagePayload::ToolCall {
                name: "t".into(),
                args: serde_json::Value::Null,
            }
            .type_name(),
            "tool_call"
        );
        assert_eq!(
            MessagePayload::ToolResult {
                call_id: "c".into(),
                result: serde_json::Value::Null,
            }
            .type_name(),
            "tool_result"
        );
        assert_eq!(
            MessagePayload::Signal(KernelSignal::Ping).type_name(),
            "signal"
        );
        assert_eq!(
            MessagePayload::Rvf {
                segment_type: 0x40,
                data: vec![],
            }
            .type_name(),
            "rvf"
        );
    }

    #[test]
    fn remote_node_serde_roundtrip() {
        let target = MessageTarget::RemoteNode {
            node_id: "node-42".into(),
            target: Box::new(MessageTarget::Process(7)),
        };
        let json = serde_json::to_string(&target).unwrap();
        let restored: MessageTarget = serde_json::from_str(&json).unwrap();
        match restored {
            MessageTarget::RemoteNode { node_id, target } => {
                assert_eq!(node_id, "node-42");
                assert!(matches!(*target, MessageTarget::Process(7)));
            }
            other => panic!("expected RemoteNode, got: {other:?}"),
        }
    }

    // ── GlobalPid ──────────────────────────────────────────────

    #[test]
    fn global_pid_equality() {
        let a = GlobalPid::local(1, "node-a");
        let b = GlobalPid::local(1, "node-b");
        let c = GlobalPid::local(1, "node-a");
        assert_ne!(a, b, "same pid on different nodes must not be equal");
        assert_eq!(a, c, "same pid on same node must be equal");
    }

    #[test]
    fn global_pid_is_local() {
        let gpid = GlobalPid::local(5, "my-node");
        assert!(gpid.is_local("my-node"));
        assert!(!gpid.is_local("other-node"));
    }

    #[test]
    fn global_pid_display() {
        let gpid = GlobalPid::local(42, "alpha");
        assert_eq!(gpid.to_string(), "alpha:42");
    }

    #[test]
    fn correlation_id_absent_in_json_when_none() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello");
        let json = serde_json::to_string(&msg).unwrap();
        assert!(!json.contains("correlation_id"));
    }

    // ── K2-G4: Trace ID tests ──────────────────────────────────

    #[test]
    fn trace_id_absent_by_default() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello");
        assert!(msg.trace_id.is_none());
    }

    #[test]
    fn trace_id_with_trace() {
        let msg = KernelMessage::with_trace(
            1,
            MessageTarget::Process(2),
            MessagePayload::Text("traced".into()),
            "trace-abc-123".into(),
        );
        assert_eq!(msg.trace_id, Some("trace-abc-123".into()));
    }

    #[test]
    fn trace_id_set_builder() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello")
            .set_trace_id("my-trace");
        assert_eq!(msg.trace_id, Some("my-trace".into()));
    }

    #[test]
    fn trace_id_ensure_generates() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello").ensure_trace_id();
        assert!(msg.trace_id.is_some());
        assert!(!msg.trace_id.as_ref().unwrap().is_empty());
    }

    #[test]
    fn trace_id_ensure_preserves_existing() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello")
            .set_trace_id("existing-trace")
            .ensure_trace_id();
        assert_eq!(msg.trace_id, Some("existing-trace".into()));
    }

    #[test]
    fn trace_id_serde_roundtrip() {
        let msg = KernelMessage::with_trace(
            1,
            MessageTarget::Process(2),
            MessagePayload::Text("traced".into()),
            "trace-roundtrip".into(),
        );
        let json = serde_json::to_string(&msg).unwrap();
        let restored: KernelMessage = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.trace_id, Some("trace-roundtrip".into()));
    }

    #[test]
    fn trace_id_absent_in_json_when_none() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello");
        let json = serde_json::to_string(&msg).unwrap();
        assert!(!json.contains("trace_id"));
    }

    #[test]
    fn trace_id_backward_compat_deserialization() {
        // Simulate a message serialized without trace_id field
        let json = r#"{"id":"test-id","from":1,"target":{"Process":2},"payload":{"Text":"hello"},"timestamp":"2024-01-01T00:00:00Z"}"#;
        let msg: KernelMessage = serde_json::from_str(json).unwrap();
        assert!(msg.trace_id.is_none());
        assert!(msg.correlation_id.is_none());
    }
}