Skip to main content

kaizen/
ipc.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
//! Daemon IPC protocol. Length-prefixed JSON keeps local clients simple and portable.
3
4use crate::core::event::{Event, SessionRecord};
5use crate::store::{SessionFilter, SessionPage, SpanNode};
6use serde::{Deserialize, Serialize};
7use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
8
/// Version number of the daemon IPC protocol defined in this module.
pub const PROTO_VERSION: u32 = 1;
/// Maximum accepted daemon IPC payload size. Larger frames fail before allocation.
pub const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024;
12
13#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
14#[serde(rename_all = "snake_case")]
15pub enum ClientKind {
16    Tui,
17    Cli,
18    Mcp,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct ClientHello {
23    pub proto_version: u32,
24    pub client: ClientKind,
25    pub workspace: Option<String>,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct ServerHello {
30    pub proto_version: u32,
31    pub daemon_version: String,
32    pub workspaces: Vec<String>,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct DaemonStatus {
37    pub pid: u32,
38    pub uptime_ms: u64,
39    pub queue_depth: usize,
40    pub last_error: Option<String>,
41    #[serde(default)]
42    pub capture: Vec<CaptureStatus>,
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub web: Option<WebEndpoint>,
45}
46
47#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49pub enum CaptureComponentStatus {
50    Ready,
51    Partial,
52    Unsupported,
53    Error,
54}
55
56#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
57pub struct CaptureComponent {
58    pub name: String,
59    pub status: CaptureComponentStatus,
60    #[serde(skip_serializing_if = "Option::is_none")]
61    pub detail: Option<String>,
62}
63
64#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
65pub struct ProxyEndpoint {
66    pub provider: String,
67    pub listen: String,
68    pub base_url: String,
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub v1_base_url: Option<String>,
71}
72
73#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
74pub struct CaptureStatus {
75    pub workspace: String,
76    pub deep: bool,
77    pub hooks: Vec<CaptureComponent>,
78    pub watchers: Vec<CaptureComponent>,
79    pub proxies: Vec<ProxyEndpoint>,
80    pub errors: Vec<String>,
81}
82
83#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
84pub struct ObservedSession {
85    pub session: String,
86    pub proxies: Vec<ProxyEndpoint>,
87}
88
89#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
90pub struct WebEndpoint {
91    pub listen: String,
92    pub url: String,
93    pub token: String,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct SessionDetail {
98    pub session: Option<SessionRecord>,
99    pub events: Vec<Event>,
100    pub spans: Vec<SpanNode>,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
104#[serde(tag = "type", rename_all = "snake_case")]
105pub enum DaemonRequest {
106    Hello(ClientHello),
107    Status,
108    Stop,
109    ListSessions {
110        workspace: String,
111        offset: usize,
112        limit: usize,
113        filter: SessionFilter,
114    },
115    GetSessionDetail {
116        id: String,
117        workspace: Option<String>,
118    },
119    IngestHook {
120        source: crate::shell::ingest::IngestSource,
121        payload: String,
122        workspace: Option<String>,
123    },
124    EnsureWorkspaceCapture {
125        workspace: String,
126        deep: bool,
127    },
128    EnsureProxy {
129        workspace: String,
130        provider: String,
131    },
132    BeginObservedSession {
133        workspace: String,
134        agent: String,
135    },
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139#[serde(tag = "type", rename_all = "snake_case")]
140pub enum DaemonResponse {
141    Hello(ServerHello),
142    Status(DaemonStatus),
143    Sessions(SessionPage),
144    Detail(Box<SessionDetail>),
145    Ack {
146        message: String,
147    },
148    CaptureStatus(Box<CaptureStatus>),
149    ProxyEndpoint(ProxyEndpoint),
150    ObservedSession(ObservedSession),
151    Error {
152        message: String,
153        supported_min: Option<u32>,
154        supported_max: Option<u32>,
155    },
156}
157
158pub async fn read_frame<T, R>(reader: &mut R) -> anyhow::Result<T>
159where
160    T: for<'de> Deserialize<'de>,
161    R: AsyncRead + Unpin,
162{
163    let len = reader.read_u32().await? as usize;
164    anyhow::ensure!(
165        len <= MAX_FRAME_SIZE,
166        "IPC frame length {len} exceeds maximum {MAX_FRAME_SIZE} bytes"
167    );
168    let mut buf = vec![0_u8; len];
169    reader.read_exact(&mut buf).await?;
170    Ok(serde_json::from_slice(&buf)?)
171}
172
173pub async fn write_frame<T, W>(writer: &mut W, value: &T) -> anyhow::Result<()>
174where
175    T: Serialize,
176    W: AsyncWrite + Unpin,
177{
178    let buf = serde_json::to_vec(value)?;
179    writer.write_u32(buf.len() as u32).await?;
180    writer.write_all(&buf).await?;
181    writer.flush().await?;
182    Ok(())
183}