Skip to main content

lilo_rm_core/
proto.rs

1use std::io::{BufRead, Write};
2use std::path::PathBuf;
3
4use chrono::{DateTime, Utc};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt};
8use uuid::Uuid;
9
10use crate::{
11    ErrorCode, KillByPidRequest, KillByPidResponse, KillRequest, LaunchSpec, Lifecycle,
12    McpBridgeRequest, McpBridgeResponse, NudgeRequest, NudgeResponse, ProtocolError, RuntimeEvent,
13    ShimExit, ShimLaunchRequest, ShimReady, SpawnRequest, StatusFilter, ValidateTargetRequest,
14    ValidateTargetResponse, WatcherCounts,
15};
16
17pub type EventCursor = u64;
18
19pub const EVENT_LOG_RETENTION_MIN_AGE_SECS: u64 = 7 * 24 * 60 * 60;
20pub const EVENT_LOG_RETENTION_MIN_EVENTS: usize = 10_000;
21/// Maximum single Events long poll wait window.
22///
23/// Requests above this ceiling are clamped rather than rejected.
24pub const EVENT_WAIT_MAX_MS: u32 = 60_000;
25
26#[derive(Clone, Copy, Debug, Default, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
27pub struct EventsRequest {
28    #[serde(default, skip_serializing_if = "Option::is_none")]
29    pub since: Option<EventCursor>,
30    /// Optional long poll window in milliseconds. `None` and `0` return immediately.
31    #[serde(default, skip_serializing_if = "Option::is_none")]
32    pub wait_ms: Option<u32>,
33}
34
35pub const fn clamped_event_wait_ms(wait_ms: Option<u32>) -> u32 {
36    match wait_ms {
37        Some(value) if value < EVENT_WAIT_MAX_MS => value,
38        Some(_) => EVENT_WAIT_MAX_MS,
39        None => 0,
40    }
41}
42
43#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
44pub struct StatusRequest {
45    pub session_id: Option<Uuid>,
46    #[serde(default, skip_serializing_if = "Vec::is_empty")]
47    pub session_ids: Vec<Uuid>,
48    #[serde(default, skip_serializing_if = "Option::is_none")]
49    pub updated_since: Option<DateTime<Utc>>,
50    #[serde(default)]
51    pub runtime: Option<String>,
52    #[serde(default)]
53    pub state: Option<String>,
54}
55
56impl From<StatusRequest> for StatusFilter {
57    fn from(request: StatusRequest) -> Self {
58        Self {
59            session_id: request.session_id,
60            session_ids: request.session_ids,
61            updated_since: request.updated_since,
62            runtime: request.runtime,
63            state: request.state,
64        }
65    }
66}
67
68impl From<StatusFilter> for StatusRequest {
69    fn from(filter: StatusFilter) -> Self {
70        Self {
71            session_id: filter.session_id,
72            session_ids: filter.session_ids,
73            updated_since: filter.updated_since,
74            runtime: filter.runtime,
75            state: filter.state,
76        }
77    }
78}
79
80#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
81#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
82pub enum RuntimeRpc {
83    Spawn {
84        request: SpawnRequest,
85    },
86    ValidateTarget {
87        request: ValidateTargetRequest,
88    },
89    Kill {
90        request: KillRequest,
91    },
92    KillByPid {
93        request: KillByPidRequest,
94    },
95    Nudge {
96        request: NudgeRequest,
97    },
98    Capture {
99        request: crate::CaptureRequest,
100    },
101    Status {
102        request: StatusRequest,
103    },
104    Version,
105    Watchers,
106    Doctor,
107    Events {
108        #[serde(default, flatten)]
109        request: EventsRequest,
110    },
111    Stop,
112    McpBridge {
113        request: McpBridgeRequest,
114    },
115    ShimLaunch {
116        request: ShimLaunchRequest,
117    },
118    ShimReady {
119        ready: ShimReady,
120    },
121    ShimExit {
122        exit: ShimExit,
123    },
124}
125
126#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
127#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
128pub enum RuntimeResponse {
129    Spawned {
130        lifecycle: Lifecycle,
131        event: RuntimeEvent,
132        log_dir: Option<PathBuf>,
133        stdout_path: Option<PathBuf>,
134        stderr_path: Option<PathBuf>,
135    },
136    ValidateTarget {
137        response: ValidateTargetResponse,
138    },
139    Status {
140        lifecycles: Vec<Lifecycle>,
141    },
142    KillByPid {
143        response: KillByPidResponse,
144    },
145    Nudge {
146        response: NudgeResponse,
147    },
148    Capture {
149        response: crate::CaptureResponse,
150    },
151    Version {
152        version: crate::VersionInfo,
153    },
154    Watchers {
155        watchers: WatcherCounts,
156    },
157    Doctor {
158        doctor: crate::DoctorResponse,
159    },
160    /// Events in daemon append order.
161    Events {
162        events: Vec<RuntimeEvent>,
163        cursor: EventCursor,
164    },
165    CursorExpired {
166        oldest: EventCursor,
167    },
168    McpBridge {
169        response: McpBridgeResponse,
170    },
171    ShimLaunch {
172        launch: LaunchSpec,
173    },
174    Ack,
175    Stopping,
176    Error {
177        code: ErrorCode,
178        message: String,
179    },
180}
181
182impl RuntimeResponse {
183    pub fn error(code: ErrorCode, message: impl Into<String>) -> Self {
184        Self::Error {
185            code,
186            message: message.into(),
187        }
188    }
189}
190
191pub async fn read_json_line<R, T>(reader: &mut R) -> Result<T, ProtocolError>
192where
193    R: AsyncBufRead + Unpin,
194    T: DeserializeOwned,
195{
196    let mut line = String::new();
197    let read = reader.read_line(&mut line).await?;
198    if read == 0 {
199        return Err(ProtocolError::Eof);
200    }
201    parse_json_line(&line)
202}
203
204pub async fn write_json_line<W, T>(writer: &mut W, message: &T) -> Result<(), ProtocolError>
205where
206    W: AsyncWrite + Unpin,
207    T: Serialize,
208{
209    let bytes = json_line_bytes(message)?;
210    writer.write_all(&bytes).await?;
211    writer.flush().await?;
212    Ok(())
213}
214
215pub fn read_json_line_blocking<R, T>(reader: &mut R) -> Result<T, ProtocolError>
216where
217    R: BufRead,
218    T: DeserializeOwned,
219{
220    let mut line = String::new();
221    let read = reader.read_line(&mut line)?;
222    if read == 0 {
223        return Err(ProtocolError::Eof);
224    }
225    parse_json_line(&line)
226}
227
228pub fn write_json_line_blocking<W, T>(writer: &mut W, message: &T) -> Result<(), ProtocolError>
229where
230    W: Write,
231    T: Serialize,
232{
233    let bytes = json_line_bytes(message)?;
234    writer.write_all(&bytes)?;
235    writer.flush()?;
236    Ok(())
237}
238
239fn parse_json_line<T>(line: &str) -> Result<T, ProtocolError>
240where
241    T: DeserializeOwned,
242{
243    Ok(serde_json::from_str(line.trim_end())?)
244}
245
246fn json_line_bytes<T>(message: &T) -> Result<Vec<u8>, ProtocolError>
247where
248    T: Serialize,
249{
250    let mut bytes = serde_json::to_vec(message)?;
251    bytes.push(b'\n');
252    Ok(bytes)
253}
254
255#[cfg(test)]
256mod tests {
257    use super::*;
258
259    #[test]
260    fn clamped_event_wait_ms_applies_ceiling_and_default() {
261        assert_eq!(clamped_event_wait_ms(None), 0);
262        assert_eq!(clamped_event_wait_ms(Some(500)), 500);
263        assert_eq!(
264            clamped_event_wait_ms(Some(EVENT_WAIT_MAX_MS)),
265            EVENT_WAIT_MAX_MS
266        );
267        assert_eq!(
268            clamped_event_wait_ms(Some(EVENT_WAIT_MAX_MS + 1)),
269            EVENT_WAIT_MAX_MS
270        );
271    }
272}