Skip to main content

lilo_rm_core/
proto.rs

1use std::io::{BufRead, Write};
2use std::path::PathBuf;
3
4use chrono::{DateTime, Utc};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt};
8use uuid::Uuid;
9
10use crate::{
11    ErrorCode, KillByPidRequest, KillByPidResponse, KillRequest, LaunchSpec, Lifecycle,
12    McpBridgeRequest, McpBridgeResponse, NudgeRequest, NudgeResponse, ProtocolError, RuntimeEvent,
13    ShimExit, ShimLaunchRequest, ShimReady, SpawnRequest, StatusFilter, ValidateTargetRequest,
14    ValidateTargetResponse, WatcherCounts,
15};
16
/// Wire-level payload of the [`RuntimeRpc::Status`] request.
///
/// Mirrors [`StatusFilter`] field for field; the `From` impls in this file
/// convert in both directions without altering any value.
///
/// NOTE(review): how the individual fields are combined when selecting
/// lifecycles is decided by whatever consumes `StatusFilter`, not by this
/// type — confirm there before relying on any particular semantics.
#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
pub struct StatusRequest {
    /// Optional single session id. Has no serde attributes, so it is always
    /// written on serialization (`null` in JSON when `None`).
    pub session_id: Option<Uuid>,
    /// Optional list of session ids. Defaults to an empty vector when the key
    /// is absent on input and is left out of the output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub session_ids: Vec<Uuid>,
    /// Optional UTC timestamp. Defaults to `None` when absent on input and is
    /// left out of the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub updated_since: Option<DateTime<Utc>>,
    /// Optional runtime name as a free-form string. Defaults to `None` when
    /// absent on input; unlike `updated_since` it is still serialized when `None`.
    #[serde(default)]
    pub runtime: Option<String>,
    /// Optional state name as a free-form string. Defaults to `None` when
    /// absent on input; unlike `updated_since` it is still serialized when `None`.
    #[serde(default)]
    pub state: Option<String>,
}
29
30impl From<StatusRequest> for StatusFilter {
31    fn from(request: StatusRequest) -> Self {
32        Self {
33            session_id: request.session_id,
34            session_ids: request.session_ids,
35            updated_since: request.updated_since,
36            runtime: request.runtime,
37            state: request.state,
38        }
39    }
40}
41
42impl From<StatusFilter> for StatusRequest {
43    fn from(filter: StatusFilter) -> Self {
44        Self {
45            session_id: filter.session_id,
46            session_ids: filter.session_ids,
47            updated_since: filter.updated_since,
48            runtime: filter.runtime,
49            state: filter.state,
50        }
51    }
52}
53
/// Request messages of the runtime protocol.
///
/// Serialized in serde's adjacently tagged form with snake_case variant
/// names: the variant name goes under `"type"` and the variant's fields under
/// `"payload"`, e.g. `{"type":"kill_by_pid","payload":{"request":{...}}}`.
/// Unit variants such as `Version` carry only the `"type"` key.
#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum RuntimeRpc {
    /// Carries a [`SpawnRequest`].
    Spawn {
        request: SpawnRequest,
    },
    /// Carries a [`ValidateTargetRequest`].
    ValidateTarget {
        request: ValidateTargetRequest,
    },
    /// Carries a [`KillRequest`].
    Kill {
        request: KillRequest,
    },
    /// Carries a [`KillByPidRequest`].
    KillByPid {
        request: KillByPidRequest,
    },
    /// Carries a [`NudgeRequest`].
    Nudge {
        request: NudgeRequest,
    },
    /// Carries a [`StatusRequest`].
    Status {
        request: StatusRequest,
    },
    /// Unit request with no payload.
    Version,
    /// Unit request with no payload.
    Watchers,
    /// Unit request with no payload.
    Doctor,
    /// Return the current daemon process event vector.
    ///
    /// v0.2 has no cursor parameter. Clients poll this request, filter by
    /// session, and dedupe by session id plus full event content. Cursor based
    /// `Events { since } -> { cursor, events }` support is deferred to v0.3.
    Events,
    /// Unit request with no payload.
    ///
    /// NOTE(review): presumably answered with `RuntimeResponse::Stopping` —
    /// confirm against the daemon's request handler.
    Stop,
    /// Carries a [`McpBridgeRequest`].
    McpBridge {
        request: McpBridgeRequest,
    },
    /// Carries a [`ShimLaunchRequest`].
    ShimLaunch {
        request: ShimLaunchRequest,
    },
    /// Carries a [`ShimReady`]; note the payload key is `ready`, not `request`.
    ShimReady {
        ready: ShimReady,
    },
    /// Carries a [`ShimExit`]; note the payload key is `exit`, not `request`.
    ShimExit {
        exit: ShimExit,
    },
}
98
/// Response messages of the runtime protocol.
///
/// Serialized in serde's adjacently tagged form with snake_case variant
/// names: the variant name goes under `"type"` and the variant's fields under
/// `"payload"`, e.g. `{"type":"error","payload":{"code":...,"message":"..."}}`.
/// Unit variants (`Ack`, `Stopping`) carry only the `"type"` key.
#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum RuntimeResponse {
    /// Carries a [`Lifecycle`], a [`RuntimeEvent`], and three optional paths.
    ///
    /// The path fields have no serde attributes, so a `None` is written as
    /// `null` rather than omitted.
    Spawned {
        lifecycle: Lifecycle,
        event: RuntimeEvent,
        log_dir: Option<PathBuf>,
        stdout_path: Option<PathBuf>,
        stderr_path: Option<PathBuf>,
    },
    /// Carries a [`ValidateTargetResponse`].
    ValidateTarget {
        response: ValidateTargetResponse,
    },
    /// Carries a list of [`Lifecycle`] values.
    Status {
        lifecycles: Vec<Lifecycle>,
    },
    /// Carries a [`KillByPidResponse`].
    KillByPid {
        response: KillByPidResponse,
    },
    /// Carries a [`NudgeResponse`].
    Nudge {
        response: NudgeResponse,
    },
    /// Carries a `crate::VersionInfo`.
    Version {
        version: crate::VersionInfo,
    },
    /// Carries a [`WatcherCounts`].
    Watchers {
        watchers: WatcherCounts,
    },
    /// Carries a `crate::DoctorResponse`.
    Doctor {
        doctor: crate::DoctorResponse,
    },
    /// Events in daemon append order.
    ///
    /// The vector is retained only in current rtmd process memory. It has no
    /// cursor, retention window, sqlite replay, or limit policy in v0.2.
    Events {
        events: Vec<RuntimeEvent>,
    },
    /// Carries a [`McpBridgeResponse`].
    McpBridge {
        response: McpBridgeResponse,
    },
    /// Carries a [`LaunchSpec`].
    ShimLaunch {
        launch: LaunchSpec,
    },
    /// Unit response with no payload.
    Ack,
    /// Unit response with no payload.
    Stopping,
    /// Failure report: an [`ErrorCode`] plus a free-form message.
    ///
    /// [`RuntimeResponse::error`] is the convenience constructor for this variant.
    Error {
        code: ErrorCode,
        message: String,
    },
}
150
151impl RuntimeResponse {
152    pub fn error(code: ErrorCode, message: impl Into<String>) -> Self {
153        Self::Error {
154            code,
155            message: message.into(),
156        }
157    }
158}
159
160pub async fn read_json_line<R, T>(reader: &mut R) -> Result<T, ProtocolError>
161where
162    R: AsyncBufRead + Unpin,
163    T: DeserializeOwned,
164{
165    let mut line = String::new();
166    let read = reader.read_line(&mut line).await?;
167    if read == 0 {
168        return Err(ProtocolError::Eof);
169    }
170    parse_json_line(&line)
171}
172
173pub async fn write_json_line<W, T>(writer: &mut W, message: &T) -> Result<(), ProtocolError>
174where
175    W: AsyncWrite + Unpin,
176    T: Serialize,
177{
178    let bytes = json_line_bytes(message)?;
179    writer.write_all(&bytes).await?;
180    writer.flush().await?;
181    Ok(())
182}
183
184pub fn read_json_line_blocking<R, T>(reader: &mut R) -> Result<T, ProtocolError>
185where
186    R: BufRead,
187    T: DeserializeOwned,
188{
189    let mut line = String::new();
190    let read = reader.read_line(&mut line)?;
191    if read == 0 {
192        return Err(ProtocolError::Eof);
193    }
194    parse_json_line(&line)
195}
196
197pub fn write_json_line_blocking<W, T>(writer: &mut W, message: &T) -> Result<(), ProtocolError>
198where
199    W: Write,
200    T: Serialize,
201{
202    let bytes = json_line_bytes(message)?;
203    writer.write_all(&bytes)?;
204    writer.flush()?;
205    Ok(())
206}
207
208fn parse_json_line<T>(line: &str) -> Result<T, ProtocolError>
209where
210    T: DeserializeOwned,
211{
212    Ok(serde_json::from_str(line.trim_end())?)
213}
214
215fn json_line_bytes<T>(message: &T) -> Result<Vec<u8>, ProtocolError>
216where
217    T: Serialize,
218{
219    let mut bytes = serde_json::to_vec(message)?;
220    bytes.push(b'\n');
221    Ok(bytes)
222}