Skip to main content

kaizen/
ipc.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Daemon IPC protocol. JSON control frames now; payload marker leaves room for Arrow IPC batches.
3
4use crate::core::event::{Event, SessionRecord};
5use crate::store::{SessionFilter, SessionPage, SpanNode};
6use serde::{Deserialize, Serialize};
7use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
8
/// Version number of the daemon IPC protocol defined in this module.
///
/// NOTE(review): presumably the value carried in the `proto_version` fields of
/// [`ClientHello`] and [`ServerHello`] — confirm against the handshake code.
pub const PROTO_VERSION: u32 = 1;
10
11#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
12#[serde(rename_all = "snake_case")]
13pub enum ClientKind {
14    Tui,
15    Cli,
16    Mcp,
17}
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct ClientHello {
21    pub proto_version: u32,
22    pub client: ClientKind,
23    pub workspace: Option<String>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct ServerHello {
28    pub proto_version: u32,
29    pub daemon_version: String,
30    pub workspaces: Vec<String>,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct DaemonStatus {
35    pub pid: u32,
36    pub uptime_ms: u64,
37    pub queue_depth: usize,
38    pub last_error: Option<String>,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct SessionDetail {
43    pub session: Option<SessionRecord>,
44    pub events: Vec<Event>,
45    pub spans: Vec<SpanNode>,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
49#[serde(tag = "type", rename_all = "snake_case")]
50pub enum DaemonRequest {
51    Hello(ClientHello),
52    Status,
53    Stop,
54    ListSessions {
55        workspace: String,
56        offset: usize,
57        limit: usize,
58        filter: SessionFilter,
59    },
60    GetSessionDetail {
61        id: String,
62        workspace: Option<String>,
63    },
64    IngestHook {
65        source: crate::shell::ingest::IngestSource,
66        payload: String,
67        workspace: Option<String>,
68    },
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
72#[serde(tag = "type", rename_all = "snake_case")]
73pub enum DaemonResponse {
74    Hello(ServerHello),
75    Status(DaemonStatus),
76    Sessions(SessionPage),
77    Detail(Box<SessionDetail>),
78    Ack {
79        message: String,
80    },
81    Error {
82        message: String,
83        supported_min: Option<u32>,
84        supported_max: Option<u32>,
85    },
86}
87
88pub async fn read_frame<T, R>(reader: &mut R) -> anyhow::Result<T>
89where
90    T: for<'de> Deserialize<'de>,
91    R: AsyncRead + Unpin,
92{
93    let len = reader.read_u32().await? as usize;
94    let mut buf = vec![0_u8; len];
95    reader.read_exact(&mut buf).await?;
96    Ok(serde_json::from_slice(&buf)?)
97}
98
99pub async fn write_frame<T, W>(writer: &mut W, value: &T) -> anyhow::Result<()>
100where
101    T: Serialize,
102    W: AsyncWrite + Unpin,
103{
104    let buf = serde_json::to_vec(value)?;
105    writer.write_u32(buf.len() as u32).await?;
106    writer.write_all(&buf).await?;
107    writer.flush().await?;
108    Ok(())
109}