Skip to main content

kaizen/
ipc.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Daemon IPC protocol. JSON control frames now; payload marker leaves room for Arrow IPC batches.
3
4use crate::core::event::{Event, SessionRecord};
5use crate::store::{SessionFilter, SessionPage, SpanNode};
6use serde::{Deserialize, Serialize};
7use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
8
/// Version number of the daemon IPC protocol defined in this module.
///
/// NOTE(review): presumably the value carried in the `proto_version` fields of
/// [`ClientHello`] and [`ServerHello`] — confirm against the handshake code.
pub const PROTO_VERSION: u32 = 1;
10
11#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
12#[serde(rename_all = "snake_case")]
13pub enum ClientKind {
14    Tui,
15    Cli,
16    Mcp,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct ClientHello {
21    pub proto_version: u32,
22    pub client: ClientKind,
23    pub workspace: Option<String>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct ServerHello {
28    pub proto_version: u32,
29    pub daemon_version: String,
30    pub workspaces: Vec<String>,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct DaemonStatus {
35    pub pid: u32,
36    pub uptime_ms: u64,
37    pub queue_depth: usize,
38    pub last_error: Option<String>,
39    #[serde(default)]
40    pub capture: Vec<CaptureStatus>,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub web: Option<WebEndpoint>,
43}
44
45#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
46#[serde(rename_all = "snake_case")]
47pub enum CaptureComponentStatus {
48    Ready,
49    Partial,
50    Unsupported,
51    Error,
52}
53
54#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
55pub struct CaptureComponent {
56    pub name: String,
57    pub status: CaptureComponentStatus,
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub detail: Option<String>,
60}
61
62#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
63pub struct ProxyEndpoint {
64    pub provider: String,
65    pub listen: String,
66    pub base_url: String,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub v1_base_url: Option<String>,
69}
70
71#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
72pub struct CaptureStatus {
73    pub workspace: String,
74    pub deep: bool,
75    pub hooks: Vec<CaptureComponent>,
76    pub watchers: Vec<CaptureComponent>,
77    pub proxies: Vec<ProxyEndpoint>,
78    pub errors: Vec<String>,
79}
80
81#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
82pub struct ObservedSession {
83    pub session: String,
84    pub proxies: Vec<ProxyEndpoint>,
85}
86
87#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
88pub struct WebEndpoint {
89    pub listen: String,
90    pub url: String,
91    pub token: String,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct SessionDetail {
96    pub session: Option<SessionRecord>,
97    pub events: Vec<Event>,
98    pub spans: Vec<SpanNode>,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
102#[serde(tag = "type", rename_all = "snake_case")]
103pub enum DaemonRequest {
104    Hello(ClientHello),
105    Status,
106    Stop,
107    ListSessions {
108        workspace: String,
109        offset: usize,
110        limit: usize,
111        filter: SessionFilter,
112    },
113    GetSessionDetail {
114        id: String,
115        workspace: Option<String>,
116    },
117    IngestHook {
118        source: crate::shell::ingest::IngestSource,
119        payload: String,
120        workspace: Option<String>,
121    },
122    EnsureWorkspaceCapture {
123        workspace: String,
124        deep: bool,
125    },
126    EnsureProxy {
127        workspace: String,
128        provider: String,
129    },
130    BeginObservedSession {
131        workspace: String,
132        agent: String,
133    },
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
137#[serde(tag = "type", rename_all = "snake_case")]
138pub enum DaemonResponse {
139    Hello(ServerHello),
140    Status(DaemonStatus),
141    Sessions(SessionPage),
142    Detail(Box<SessionDetail>),
143    Ack {
144        message: String,
145    },
146    CaptureStatus(Box<CaptureStatus>),
147    ProxyEndpoint(ProxyEndpoint),
148    ObservedSession(ObservedSession),
149    Error {
150        message: String,
151        supported_min: Option<u32>,
152        supported_max: Option<u32>,
153    },
154}
155
156pub async fn read_frame<T, R>(reader: &mut R) -> anyhow::Result<T>
157where
158    T: for<'de> Deserialize<'de>,
159    R: AsyncRead + Unpin,
160{
161    let len = reader.read_u32().await? as usize;
162    let mut buf = vec![0_u8; len];
163    reader.read_exact(&mut buf).await?;
164    Ok(serde_json::from_slice(&buf)?)
165}
166
167pub async fn write_frame<T, W>(writer: &mut W, value: &T) -> anyhow::Result<()>
168where
169    T: Serialize,
170    W: AsyncWrite + Unpin,
171{
172    let buf = serde_json::to_vec(value)?;
173    writer.write_u32(buf.len() as u32).await?;
174    writer.write_all(&buf).await?;
175    writer.flush().await?;
176    Ok(())
177}