Skip to main content

kaizen/
ipc.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Daemon IPC protocol. JSON control frames now; payload marker leaves room for Arrow IPC batches.
3
4use crate::core::event::{Event, SessionRecord};
5use crate::store::{SessionFilter, SessionPage, SpanNode};
6use serde::{Deserialize, Serialize};
7use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
8
9pub const PROTO_VERSION: u32 = 1;
10
11#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
12#[serde(rename_all = "snake_case")]
13pub enum ClientKind {
14    Tui,
15    Cli,
16    Mcp,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct ClientHello {
21    pub proto_version: u32,
22    pub client: ClientKind,
23    pub workspace: Option<String>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct ServerHello {
28    pub proto_version: u32,
29    pub daemon_version: String,
30    pub workspaces: Vec<String>,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct DaemonStatus {
35    pub pid: u32,
36    pub uptime_ms: u64,
37    pub queue_depth: usize,
38    pub last_error: Option<String>,
39    #[serde(default)]
40    pub capture: Vec<CaptureStatus>,
41}
42
43#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45pub enum CaptureComponentStatus {
46    Ready,
47    Partial,
48    Unsupported,
49    Error,
50}
51
52#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
53pub struct CaptureComponent {
54    pub name: String,
55    pub status: CaptureComponentStatus,
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub detail: Option<String>,
58}
59
60#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
61pub struct ProxyEndpoint {
62    pub provider: String,
63    pub listen: String,
64    pub base_url: String,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub v1_base_url: Option<String>,
67}
68
69#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
70pub struct CaptureStatus {
71    pub workspace: String,
72    pub deep: bool,
73    pub hooks: Vec<CaptureComponent>,
74    pub watchers: Vec<CaptureComponent>,
75    pub proxies: Vec<ProxyEndpoint>,
76    pub errors: Vec<String>,
77}
78
79#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
80pub struct ObservedSession {
81    pub session: String,
82    pub proxies: Vec<ProxyEndpoint>,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct SessionDetail {
87    pub session: Option<SessionRecord>,
88    pub events: Vec<Event>,
89    pub spans: Vec<SpanNode>,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
93#[serde(tag = "type", rename_all = "snake_case")]
94pub enum DaemonRequest {
95    Hello(ClientHello),
96    Status,
97    Stop,
98    ListSessions {
99        workspace: String,
100        offset: usize,
101        limit: usize,
102        filter: SessionFilter,
103    },
104    GetSessionDetail {
105        id: String,
106        workspace: Option<String>,
107    },
108    IngestHook {
109        source: crate::shell::ingest::IngestSource,
110        payload: String,
111        workspace: Option<String>,
112    },
113    EnsureWorkspaceCapture {
114        workspace: String,
115        deep: bool,
116    },
117    EnsureProxy {
118        workspace: String,
119        provider: String,
120    },
121    BeginObservedSession {
122        workspace: String,
123        agent: String,
124    },
125}
126
127#[derive(Debug, Clone, Serialize, Deserialize)]
128#[serde(tag = "type", rename_all = "snake_case")]
129pub enum DaemonResponse {
130    Hello(ServerHello),
131    Status(DaemonStatus),
132    Sessions(SessionPage),
133    Detail(Box<SessionDetail>),
134    Ack {
135        message: String,
136    },
137    CaptureStatus(Box<CaptureStatus>),
138    ProxyEndpoint(ProxyEndpoint),
139    ObservedSession(ObservedSession),
140    Error {
141        message: String,
142        supported_min: Option<u32>,
143        supported_max: Option<u32>,
144    },
145}
146
147pub async fn read_frame<T, R>(reader: &mut R) -> anyhow::Result<T>
148where
149    T: for<'de> Deserialize<'de>,
150    R: AsyncRead + Unpin,
151{
152    let len = reader.read_u32().await? as usize;
153    let mut buf = vec![0_u8; len];
154    reader.read_exact(&mut buf).await?;
155    Ok(serde_json::from_slice(&buf)?)
156}
157
158pub async fn write_frame<T, W>(writer: &mut W, value: &T) -> anyhow::Result<()>
159where
160    T: Serialize,
161    W: AsyncWrite + Unpin,
162{
163    let buf = serde_json::to_vec(value)?;
164    writer.write_u32(buf.len() as u32).await?;
165    writer.write_all(&buf).await?;
166    writer.flush().await?;
167    Ok(())
168}