1use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use tracing::debug;
13
14use clawft_core::bus::MessageBus;
15
16use crate::error::KernelError;
17use crate::process::Pid;
18
/// Process-wide sequence backing [`next_ipc_msg_id`].
///
/// Starts at 1 and is never reset, so ids are unique only within the current
/// process run; a restarted process hands out `ipc-1`, `ipc-2`, ... again.
static IPC_MSG_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Allocates the next IPC message id, formatted as `ipc-<n>`.
///
/// `Relaxed` ordering is sufficient: `fetch_add` is an atomic read-modify-write,
/// so concurrent callers still receive distinct values, and no other memory is
/// published through this counter.
#[inline]
fn next_ipc_msg_id() -> String {
    let seq = IPC_MSG_COUNTER.fetch_add(1, Ordering::Relaxed);
    let mut id = String::from("ipc-");
    id.push_str(&seq.to_string());
    id
}
35
/// Destination of a [`KernelMessage`].
///
/// In JSON this uses serde's default externally-tagged form (e.g. `{"Process":2}`).
/// Index-based serde formats encode the variant position, so append new
/// variants instead of reordering existing ones.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MessageTarget {
    /// A single process, addressed by its local [`Pid`].
    Process(Pid),
    /// A named topic (e.g. `"build-status"`).
    Topic(String),
    /// Broadcast; who actually receives it is decided by the bus consumer, not in this file.
    Broadcast,
    /// A named service (e.g. `"health"`).
    Service(String),
    /// A specific method on a named service.
    ServiceMethod {
        /// Service name (e.g. `"auth"`).
        service: String,
        /// Method name (e.g. `"validate_token"`).
        method: String,
    },
    /// The kernel itself.
    Kernel,
    /// A target on another node.
    RemoteNode {
        /// Identifier of the remote node.
        node_id: String,
        /// Target to address on that node (presumably resolved by the remote side — confirm).
        /// Boxed because `MessageTarget` is recursive here.
        target: Box<MessageTarget>,
    },
}
69
/// Body of a [`KernelMessage`]; [`MessagePayload::type_name`] gives a short label per variant.
///
/// In JSON this uses serde's default externally-tagged form (e.g. `{"Text":"hello"}`).
/// Index-based serde formats encode the variant position, so append new
/// variants instead of reordering existing ones.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MessagePayload {
    /// Plain text.
    Text(String),
    /// Arbitrary JSON value.
    Json(serde_json::Value),
    /// Request to invoke a tool by name.
    ToolCall {
        /// Tool name (e.g. `"read_file"`).
        name: String,
        /// Tool arguments as JSON.
        args: serde_json::Value,
    },
    /// Result of a tool invocation.
    ToolResult {
        /// Identifier of the call being answered (e.g. `"call-123"`).
        /// NOTE(review): `ToolCall` carries no id field, so how this is matched to a
        /// call is decided outside this file — confirm.
        call_id: String,
        /// Tool output as JSON.
        result: serde_json::Value,
    },
    /// A kernel control signal.
    Signal(KernelSignal),
    /// A raw RVF segment.
    Rvf {
        /// Segment type tag; its meaning is not defined in this file.
        segment_type: u8,
        /// Segment bytes. With serde's default `Vec<u8>` handling this is encoded as a
        /// sequence of integers (a JSON array of numbers), not a compact byte string.
        data: Vec<u8>,
    },
}
106
107impl MessagePayload {
108 pub fn type_name(&self) -> &'static str {
110 match self {
111 MessagePayload::Text(_) => "text",
112 MessagePayload::Json(_) => "json",
113 MessagePayload::ToolCall { .. } => "tool_call",
114 MessagePayload::ToolResult { .. } => "tool_result",
115 MessagePayload::Signal(_) => "signal",
116 MessagePayload::Rvf { .. } => "rvf",
117 }
118 }
119}
120
/// Why a process terminated; carried by [`KernelSignal::LinkExit`] and [`ProcessDown`].
///
/// Index-based serde formats encode the variant position, so append new
/// variants instead of reordering existing ones.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExitReason {
    /// Normal termination.
    Normal,
    /// Crash, with a description (e.g. `"segfault"`).
    Crash(String),
    /// The process was killed.
    Killed,
    /// The process was terminated due to a timeout.
    Timeout,
}
134
/// Payload of [`KernelSignal::MonitorDown`]: which process went down and why.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessDown {
    /// The process that terminated.
    pub pid: crate::process::Pid,
    /// Why it terminated.
    pub reason: ExitReason,
}
143
/// Control signals carried by [`MessagePayload::Signal`].
///
/// Only the data shape is defined here; how each signal is handled is decided by
/// the message consumer, which is not visible in this file. Index-based serde
/// formats encode the variant position, so append new variants instead of
/// reordering existing ones.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KernelSignal {
    /// Request shutdown.
    Shutdown,
    /// Request suspension.
    Suspend,
    /// Request resumption (presumably after [`KernelSignal::Suspend`] — confirm).
    Resume,
    /// Liveness probe (presumably answered with [`KernelSignal::Pong`] — confirm).
    Ping,
    /// Reply to a [`KernelSignal::Ping`].
    Pong,
    /// Request a configuration reload.
    ReloadConfig,
    /// Request a state dump.
    DumpState,
    /// Application-defined signal number; its meaning is not defined in this file.
    UserDefined(u8),
    /// Request that the target be killed.
    Kill,
    /// Notification that a linked process exited.
    LinkExit {
        /// The process that exited.
        pid: crate::process::Pid,
        /// Why it exited.
        reason: ExitReason,
    },
    /// Notification that a monitored process went down.
    MonitorDown(ProcessDown),
    /// A resource usage report relative to a limit.
    ResourceWarning {
        /// Resource name (e.g. `"memory"`).
        resource: String,
        /// Current usage; units depend on `resource` and are not defined here.
        current: u64,
        /// Limit, presumably in the same units as `current` — confirm.
        limit: u64,
    },
}
185
/// A message exchanged over kernel IPC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KernelMessage {
    /// Message id. The constructors assign `ipc-<n>` from a process-wide counter that
    /// starts at 1, so ids are unique only within one process run.
    pub id: String,
    /// Sending process.
    pub from: Pid,
    /// Destination.
    pub target: MessageTarget,
    /// Message body.
    pub payload: MessagePayload,
    /// Creation time in UTC, as taken by the constructors.
    pub timestamp: DateTime<Utc>,
    /// Optional id linking related messages (e.g. a request and its reply — confirm usage).
    ///
    /// Omitted from serialized output when `None`, and defaults to `None` when absent on
    /// input, so older payloads without the field still deserialize.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    /// Optional tracing id. It has the same serde treatment as `correlation_id`
    /// (skipped when `None`, defaulted when absent).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
}
214
215impl KernelMessage {
216 pub fn new(from: Pid, target: MessageTarget, payload: MessagePayload) -> Self {
221 Self {
222 id: next_ipc_msg_id(),
223 from,
224 target,
225 payload,
226 timestamp: Utc::now(),
227 correlation_id: None,
228 trace_id: None,
229 }
230 }
231
232 pub fn with_correlation(
234 from: Pid,
235 target: MessageTarget,
236 payload: MessagePayload,
237 correlation_id: String,
238 ) -> Self {
239 Self {
240 id: next_ipc_msg_id(),
241 from,
242 target,
243 payload,
244 timestamp: Utc::now(),
245 correlation_id: Some(correlation_id),
246 trace_id: None,
247 }
248 }
249
250 pub fn with_trace(
252 from: Pid,
253 target: MessageTarget,
254 payload: MessagePayload,
255 trace_id: String,
256 ) -> Self {
257 Self {
258 id: next_ipc_msg_id(),
259 from,
260 target,
261 payload,
262 timestamp: Utc::now(),
263 correlation_id: None,
264 trace_id: Some(trace_id),
265 }
266 }
267
268 pub fn set_trace_id(mut self, trace_id: impl Into<String>) -> Self {
270 self.trace_id = Some(trace_id.into());
271 self
272 }
273
274 pub fn ensure_trace_id(mut self) -> Self {
276 if self.trace_id.is_none() {
277 self.trace_id = Some(uuid::Uuid::new_v4().to_string());
278 }
279 self
280 }
281
282 pub fn text(from: Pid, target: MessageTarget, text: impl Into<String>) -> Self {
284 Self::new(from, target, MessagePayload::Text(text.into()))
285 }
286
287 pub fn signal(from: Pid, target: MessageTarget, signal: KernelSignal) -> Self {
289 Self::new(from, target, MessagePayload::Signal(signal))
290 }
291
292 pub fn tool_call(
294 from: Pid,
295 target: MessageTarget,
296 name: impl Into<String>,
297 args: serde_json::Value,
298 ) -> Self {
299 Self::new(
300 from,
301 target,
302 MessagePayload::ToolCall {
303 name: name.into(),
304 args,
305 },
306 )
307 }
308
309 pub fn tool_result(
311 from: Pid,
312 target: MessageTarget,
313 call_id: impl Into<String>,
314 result: serde_json::Value,
315 ) -> Self {
316 Self::new(
317 from,
318 target,
319 MessagePayload::ToolResult {
320 call_id: call_id.into(),
321 result,
322 },
323 )
324 }
325}
326
/// A process id qualified by the node it lives on.
///
/// `Eq`/`Hash` are derived over both fields, so the same [`Pid`] on two
/// different nodes compares unequal. Displays as `<node_id>:<pid>`.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct GlobalPid {
    /// Identifier of the owning node.
    pub node_id: String,
    /// Process id, meaningful relative to `node_id`.
    pub pid: Pid,
}
337
338impl GlobalPid {
339 pub fn local(pid: Pid, node_id: &str) -> Self {
341 Self {
342 node_id: node_id.to_string(),
343 pid,
344 }
345 }
346
347 pub fn is_local(&self, my_node_id: &str) -> bool {
349 self.node_id == my_node_id
350 }
351}
352
353impl std::fmt::Display for GlobalPid {
354 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
355 write!(f, "{}:{}", self.node_id, self.pid)
356 }
357}
358
/// Kernel IPC facade: serializes [`KernelMessage`]s to JSON and publishes them
/// on a shared [`MessageBus`].
pub struct KernelIpc {
    /// Shared bus; messages are published via `publish_inbound` on channel `"kernel-ipc"`.
    bus: Arc<MessageBus>,
}
366
367impl KernelIpc {
368 pub fn new(bus: Arc<MessageBus>) -> Self {
370 Self { bus }
371 }
372
373 pub fn bus(&self) -> &Arc<MessageBus> {
375 &self.bus
376 }
377
378 #[cfg(feature = "exochain")]
385 pub fn send_checked(
386 &self,
387 msg: &KernelMessage,
388 checker: &crate::capability::CapabilityChecker,
389 chain: Option<&crate::chain::ChainManager>,
390 ) -> Result<(), KernelError> {
391 if let MessageTarget::Process(to_pid) = &msg.target {
393 checker.check_ipc_target(msg.from, *to_pid)?;
394 }
395
396 if let Some(cm) = chain {
398 cm.append(
399 "ipc",
400 "ipc.send",
401 Some(serde_json::json!({
402 "from": msg.from,
403 "target": format!("{:?}", msg.target),
404 "payload_type": msg.payload.type_name(),
405 "msg_id": msg.id,
406 })),
407 );
408 }
409
410 self.send(msg)
412 }
413
414 pub fn send(&self, msg: &KernelMessage) -> Result<(), KernelError> {
420 debug!(
421 id = %msg.id,
422 from = msg.from,
423 "sending kernel message"
424 );
425
426 let json = serde_json::to_string(msg)
427 .map_err(|e| KernelError::Ipc(format!("failed to serialize message: {e}")))?;
428
429 let inbound = clawft_types::event::InboundMessage {
432 channel: "kernel-ipc".to_owned(),
433 sender_id: format!("pid-{}", msg.from),
434 chat_id: msg.id.clone(),
435 content: json,
436 timestamp: msg.timestamp,
437 media: vec![],
438 metadata: std::collections::HashMap::new(),
439 };
440
441 self.bus
442 .publish_inbound(inbound)
443 .map_err(|e| KernelError::Ipc(format!("bus publish failed: {e}")))
444 }
445}
446
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn kernel_message_text() {
        let msg = KernelMessage::text(0, MessageTarget::Process(1), "hello");
        assert_eq!(msg.from, 0);
        assert!(matches!(msg.target, MessageTarget::Process(1)));
        assert!(matches!(msg.payload, MessagePayload::Text(ref t) if t == "hello"));
    }

    #[test]
    fn kernel_message_signal() {
        let msg = KernelMessage::signal(0, MessageTarget::Broadcast, KernelSignal::Shutdown);
        assert!(matches!(msg.target, MessageTarget::Broadcast));
        assert!(matches!(
            msg.payload,
            MessagePayload::Signal(KernelSignal::Shutdown)
        ));
    }

    #[test]
    fn kernel_message_json_payload() {
        let payload = MessagePayload::Json(serde_json::json!({"key": "value"}));
        let msg = KernelMessage::new(1, MessageTarget::Kernel, payload);
        assert!(matches!(msg.payload, MessagePayload::Json(_)));
    }

    // Every constructor call draws a fresh id from the process-wide counter.
    #[test]
    fn message_ids_are_unique() {
        let a = KernelMessage::text(1, MessageTarget::Kernel, "a");
        let b = KernelMessage::text(1, MessageTarget::Kernel, "b");
        assert_ne!(a.id, b.id);
        assert!(a.id.starts_with("ipc-"));
        assert!(b.id.starts_with("ipc-"));
    }

    #[test]
    fn message_serde_roundtrip() {
        let msg = KernelMessage::text(5, MessageTarget::Service("health".into()), "check");
        let json = serde_json::to_string(&msg).unwrap();
        let restored: KernelMessage = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.id, msg.id);
        assert_eq!(restored.from, 5);
        assert!(matches!(restored.target, MessageTarget::Service(ref s) if s == "health"));
        assert!(matches!(restored.payload, MessagePayload::Text(ref t) if t == "check"));
    }

    #[tokio::test]
    async fn ipc_send() {
        let bus = Arc::new(MessageBus::new());
        let ipc = KernelIpc::new(bus.clone());

        let msg = KernelMessage::text(0, MessageTarget::Process(1), "test");
        ipc.send(&msg).unwrap();

        let received = bus.consume_inbound().await.unwrap();
        assert_eq!(received.channel, "kernel-ipc");
        assert_eq!(received.sender_id, "pid-0");
        assert_eq!(received.chat_id, msg.id);

        // The published content must decode back into the original message.
        let decoded: KernelMessage = serde_json::from_str(&received.content).unwrap();
        assert_eq!(decoded.id, msg.id);
        assert!(matches!(decoded.payload, MessagePayload::Text(ref t) if t == "test"));
    }

    #[test]
    fn ipc_bus_ref() {
        let bus = Arc::new(MessageBus::new());
        let ipc = KernelIpc::new(bus.clone());
        assert!(Arc::ptr_eq(ipc.bus(), &bus));
    }

    #[test]
    fn message_target_variants() {
        let targets = vec![
            MessageTarget::Process(1),
            MessageTarget::Topic("events".into()),
            MessageTarget::Broadcast,
            MessageTarget::Service("test".into()),
            MessageTarget::ServiceMethod {
                service: "auth".into(),
                method: "validate_token".into(),
            },
            MessageTarget::Kernel,
            MessageTarget::RemoteNode {
                node_id: "node-1".into(),
                target: Box::new(MessageTarget::Kernel),
            },
        ];
        for target in targets {
            let json = serde_json::to_string(&target).unwrap();
            let _: MessageTarget = serde_json::from_str(&json).unwrap();
        }
    }

    #[test]
    fn kernel_signal_variants() {
        let signals = vec![
            KernelSignal::Shutdown,
            KernelSignal::Suspend,
            KernelSignal::Resume,
            KernelSignal::Ping,
            KernelSignal::Pong,
        ];
        for signal in signals {
            let json = serde_json::to_string(&signal).unwrap();
            let _: KernelSignal = serde_json::from_str(&json).unwrap();
        }
    }

    #[test]
    fn expanded_signal_variants_serde() {
        let signals = vec![
            KernelSignal::ReloadConfig,
            KernelSignal::DumpState,
            KernelSignal::UserDefined(42),
            KernelSignal::Kill,
            KernelSignal::LinkExit {
                pid: 7,
                reason: ExitReason::Crash("boom".into()),
            },
            KernelSignal::MonitorDown(ProcessDown {
                pid: 9,
                reason: ExitReason::Normal,
            }),
            KernelSignal::ResourceWarning {
                resource: "memory".into(),
                current: 800,
                limit: 1000,
            },
        ];
        for signal in signals {
            let json = serde_json::to_string(&signal).unwrap();
            let restored: KernelSignal = serde_json::from_str(&json).unwrap();
            let json2 = serde_json::to_string(&restored).unwrap();
            assert_eq!(json, json2);
        }
    }

    #[test]
    fn exit_reason_variants() {
        let reasons = vec![
            ExitReason::Normal,
            ExitReason::Crash("error".into()),
            ExitReason::Killed,
            ExitReason::Timeout,
        ];
        for reason in reasons {
            let json = serde_json::to_string(&reason).unwrap();
            let _: ExitReason = serde_json::from_str(&json).unwrap();
        }
    }

    #[test]
    fn process_down_serde() {
        let pd = ProcessDown {
            pid: 42,
            reason: ExitReason::Crash("segfault".into()),
        };
        let json = serde_json::to_string(&pd).unwrap();
        let restored: ProcessDown = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.pid, 42);
        assert!(matches!(restored.reason, ExitReason::Crash(ref s) if s == "segfault"));
    }

    #[test]
    fn message_with_correlation_id() {
        let msg = KernelMessage::with_correlation(
            1,
            MessageTarget::Process(2),
            MessagePayload::Text("request".into()),
            "req-123".into(),
        );
        assert_eq!(msg.correlation_id, Some("req-123".into()));
        assert_eq!(msg.from, 1);
    }

    #[test]
    fn message_without_correlation_id() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello");
        assert!(msg.correlation_id.is_none());
    }

    #[test]
    fn tool_call_message() {
        let msg = KernelMessage::tool_call(
            1,
            MessageTarget::Process(2),
            "read_file",
            serde_json::json!({"path": "/src/main.rs"}),
        );
        match &msg.payload {
            MessagePayload::ToolCall { name, args } => {
                assert_eq!(name, "read_file");
                assert_eq!(args["path"], "/src/main.rs");
            }
            other => panic!("expected ToolCall, got: {other:?}"),
        }
    }

    #[test]
    fn tool_result_message() {
        let msg = KernelMessage::tool_result(
            2,
            MessageTarget::Process(1),
            "call-123",
            serde_json::json!({"content": "file contents"}),
        );
        match &msg.payload {
            MessagePayload::ToolResult { call_id, result } => {
                assert_eq!(call_id, "call-123");
                assert_eq!(result["content"], "file contents");
            }
            other => panic!("expected ToolResult, got: {other:?}"),
        }
    }

    #[test]
    fn topic_target() {
        let msg = KernelMessage::text(1, MessageTarget::Topic("build-status".into()), "done");
        assert!(matches!(msg.target, MessageTarget::Topic(ref t) if t == "build-status"));
    }

    #[test]
    fn tool_call_serde_roundtrip() {
        let msg = KernelMessage::tool_call(
            1,
            MessageTarget::Process(2),
            "search",
            serde_json::json!({"query": "test"}),
        );
        let json = serde_json::to_string(&msg).unwrap();
        let restored: KernelMessage = serde_json::from_str(&json).unwrap();
        assert!(matches!(
            restored.payload,
            MessagePayload::ToolCall { ref name, .. } if name == "search"
        ));
    }

    #[test]
    fn correlation_id_serde_roundtrip() {
        let msg = KernelMessage::with_correlation(
            1,
            MessageTarget::Process(2),
            MessagePayload::Text("req".into()),
            "corr-456".into(),
        );
        let json = serde_json::to_string(&msg).unwrap();
        let restored: KernelMessage = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.correlation_id, Some("corr-456".into()));
    }

    #[test]
    fn rvf_payload_variant() {
        let payload = MessagePayload::Rvf {
            segment_type: 0x40,
            data: vec![0xCA, 0xFE],
        };
        let msg = KernelMessage::new(1, MessageTarget::Process(2), payload);
        assert_eq!(msg.payload.type_name(), "rvf");

        let json = serde_json::to_string(&msg).unwrap();
        let restored: KernelMessage = serde_json::from_str(&json).unwrap();
        match &restored.payload {
            MessagePayload::Rvf { segment_type, data } => {
                assert_eq!(*segment_type, 0x40);
                assert_eq!(data, &[0xCA, 0xFE]);
            }
            other => panic!("expected Rvf, got: {other:?}"),
        }
    }

    // Covers every variant's label, not just a sample.
    #[test]
    fn payload_type_names() {
        assert_eq!(MessagePayload::Text("hi".into()).type_name(), "text");
        assert_eq!(
            MessagePayload::Json(serde_json::json!(1)).type_name(),
            "json"
        );
        assert_eq!(
            MessagePayload::ToolCall {
                name: "t".into(),
                args: serde_json::json!({}),
            }
            .type_name(),
            "tool_call"
        );
        assert_eq!(
            MessagePayload::ToolResult {
                call_id: "c".into(),
                result: serde_json::json!(null),
            }
            .type_name(),
            "tool_result"
        );
        assert_eq!(
            MessagePayload::Signal(KernelSignal::Ping).type_name(),
            "signal"
        );
        assert_eq!(
            MessagePayload::Rvf {
                segment_type: 0,
                data: Vec::new(),
            }
            .type_name(),
            "rvf"
        );
    }

    #[test]
    fn remote_node_serde_roundtrip() {
        let target = MessageTarget::RemoteNode {
            node_id: "node-42".into(),
            target: Box::new(MessageTarget::Process(7)),
        };
        let json = serde_json::to_string(&target).unwrap();
        let restored: MessageTarget = serde_json::from_str(&json).unwrap();
        match restored {
            MessageTarget::RemoteNode { node_id, target } => {
                assert_eq!(node_id, "node-42");
                assert!(matches!(*target, MessageTarget::Process(7)));
            }
            other => panic!("expected RemoteNode, got: {other:?}"),
        }
    }

    #[test]
    fn global_pid_equality() {
        let a = GlobalPid::local(1, "node-a");
        let b = GlobalPid::local(1, "node-b");
        let c = GlobalPid::local(1, "node-a");
        assert_ne!(a, b, "same pid on different nodes must not be equal");
        assert_eq!(a, c, "same pid on same node must be equal");
    }

    #[test]
    fn global_pid_is_local() {
        let gpid = GlobalPid::local(5, "my-node");
        assert!(gpid.is_local("my-node"));
        assert!(!gpid.is_local("other-node"));
    }

    #[test]
    fn global_pid_display() {
        let gpid = GlobalPid::local(42, "alpha");
        assert_eq!(gpid.to_string(), "alpha:42");
    }

    #[test]
    fn correlation_id_absent_in_json_when_none() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello");
        let json = serde_json::to_string(&msg).unwrap();
        assert!(!json.contains("correlation_id"));
    }

    #[test]
    fn trace_id_absent_by_default() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello");
        assert!(msg.trace_id.is_none());
    }

    #[test]
    fn trace_id_with_trace() {
        let msg = KernelMessage::with_trace(
            1,
            MessageTarget::Process(2),
            MessagePayload::Text("traced".into()),
            "trace-abc-123".into(),
        );
        assert_eq!(msg.trace_id, Some("trace-abc-123".into()));
    }

    #[test]
    fn trace_id_set_builder() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello")
            .set_trace_id("my-trace");
        assert_eq!(msg.trace_id, Some("my-trace".into()));
    }

    #[test]
    fn trace_id_ensure_generates() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello").ensure_trace_id();
        assert!(msg.trace_id.is_some());
        assert!(!msg.trace_id.as_ref().unwrap().is_empty());
    }

    #[test]
    fn trace_id_ensure_preserves_existing() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello")
            .set_trace_id("existing-trace")
            .ensure_trace_id();
        assert_eq!(msg.trace_id, Some("existing-trace".into()));
    }

    #[test]
    fn trace_id_serde_roundtrip() {
        let msg = KernelMessage::with_trace(
            1,
            MessageTarget::Process(2),
            MessagePayload::Text("traced".into()),
            "trace-roundtrip".into(),
        );
        let json = serde_json::to_string(&msg).unwrap();
        let restored: KernelMessage = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.trace_id, Some("trace-roundtrip".into()));
    }

    #[test]
    fn trace_id_absent_in_json_when_none() {
        let msg = KernelMessage::text(1, MessageTarget::Process(2), "hello");
        let json = serde_json::to_string(&msg).unwrap();
        assert!(!json.contains("trace_id"));
    }

    // Payloads produced before `correlation_id`/`trace_id` existed must still parse.
    #[test]
    fn trace_id_backward_compat_deserialization() {
        let json = r#"{"id":"test-id","from":1,"target":{"Process":2},"payload":{"Text":"hello"},"timestamp":"2024-01-01T00:00:00Z"}"#;
        let msg: KernelMessage = serde_json::from_str(json).unwrap();
        assert!(msg.trace_id.is_none());
        assert!(msg.correlation_id.is_none());
    }
}