Skip to main content

lilo_rm_core/
proto.rs

1use std::io::{BufRead, Write};
2use std::path::PathBuf;
3
4use chrono::{DateTime, Utc};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt};
8use uuid::Uuid;
9
10use crate::{
11    ErrorCode, KillByPidRequest, KillByPidResponse, KillRequest, LaunchSpec, Lifecycle,
12    McpBridgeRequest, McpBridgeResponse, NudgeRequest, NudgeResponse, ProtocolError, RuntimeEvent,
13    ShimExit, ShimLaunchRequest, ShimReady, SpawnRequest, StatusFilter, ValidateTargetRequest,
14    ValidateTargetResponse, WatcherCounts,
15};
16
/// Cursor value identifying a position in the event stream.
///
/// Carried by [`EventsRequest::since`], [`EventsPayload::cursor`] and
/// [`CursorExpiredPayload::oldest`].
pub type EventCursor = u64;

/// Seven days, expressed in seconds.
///
/// NOTE(review): presumably the minimum age an event keeps before the event log may
/// prune it; the retention logic lives outside this module, so confirm there.
pub const EVENT_LOG_RETENTION_MIN_AGE_SECS: u64 = 7 * 24 * 60 * 60;

/// NOTE(review): presumably the minimum number of events the event log retains
/// regardless of age; the retention logic lives outside this module, so confirm there.
pub const EVENT_LOG_RETENTION_MIN_EVENTS: usize = 10_000;

/// Ceiling, in milliseconds, for a single Events long poll wait window.
///
/// Requests above this ceiling are clamped rather than rejected
/// (see [`clamped_event_wait_ms`]).
pub const EVENT_WAIT_MAX_MS: u32 = 60 * 1_000;
25
26#[derive(Clone, Copy, Debug, Default, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
27pub struct EventsRequest {
28    #[serde(default, skip_serializing_if = "Option::is_none")]
29    pub since: Option<EventCursor>,
30    /// Optional long poll window in milliseconds. `None` and `0` return immediately.
31    #[serde(default, skip_serializing_if = "Option::is_none")]
32    pub wait_ms: Option<u32>,
33}
34
35pub const fn clamped_event_wait_ms(wait_ms: Option<u32>) -> u32 {
36    match wait_ms {
37        Some(value) if value < EVENT_WAIT_MAX_MS => value,
38        Some(_) => EVENT_WAIT_MAX_MS,
39        None => 0,
40    }
41}
42
43#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
44pub struct StatusRequest {
45    pub session_id: Option<Uuid>,
46    #[serde(default, skip_serializing_if = "Vec::is_empty")]
47    pub session_ids: Vec<Uuid>,
48    #[serde(default, skip_serializing_if = "Option::is_none")]
49    pub updated_since: Option<DateTime<Utc>>,
50    #[serde(default)]
51    pub runtime: Option<String>,
52    #[serde(default)]
53    pub state: Option<String>,
54}
55
56impl From<StatusRequest> for StatusFilter {
57    fn from(request: StatusRequest) -> Self {
58        Self {
59            session_id: request.session_id,
60            session_ids: request.session_ids,
61            updated_since: request.updated_since,
62            runtime: request.runtime,
63            state: request.state,
64        }
65    }
66}
67
68impl From<StatusFilter> for StatusRequest {
69    fn from(filter: StatusFilter) -> Self {
70        Self {
71            session_id: filter.session_id,
72            session_ids: filter.session_ids,
73            updated_since: filter.updated_since,
74            runtime: filter.runtime,
75            state: filter.state,
76        }
77    }
78}
79
80#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
81#[non_exhaustive]
82#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
83pub enum RuntimeRpc {
84    Spawn {
85        request: SpawnRequest,
86    },
87    ValidateTarget {
88        request: ValidateTargetRequest,
89    },
90    Kill {
91        request: KillRequest,
92    },
93    KillByPid {
94        request: KillByPidRequest,
95    },
96    Nudge {
97        request: NudgeRequest,
98    },
99    Capture {
100        request: crate::CaptureRequest,
101    },
102    Status {
103        request: StatusRequest,
104    },
105    Version,
106    Watchers,
107    Doctor,
108    Events {
109        #[serde(default, flatten)]
110        request: EventsRequest,
111    },
112    Stop,
113    McpBridge {
114        request: McpBridgeRequest,
115    },
116    ShimLaunch {
117        request: ShimLaunchRequest,
118    },
119    ShimReady {
120        ready: ShimReady,
121    },
122    ShimExit {
123        exit: ShimExit,
124    },
125}
126
127#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
128pub struct SpawnedPayload {
129    pub lifecycle: Lifecycle,
130    pub event: RuntimeEvent,
131    pub log_dir: Option<PathBuf>,
132    pub stdout_path: Option<PathBuf>,
133    pub stderr_path: Option<PathBuf>,
134}
135
136#[derive(Clone, Copy, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
137#[serde(rename_all = "snake_case")]
138pub enum SpawnConflictKind {
139    SessionId,
140    TmuxPaneOccupancy,
141}
142
143#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
144pub struct SpawnConflictPayload {
145    pub kind: SpawnConflictKind,
146    pub lifecycle: Lifecycle,
147}
148
149#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
150pub struct ValidateTargetPayload {
151    pub response: ValidateTargetResponse,
152}
153
154#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
155pub struct StatusPayload {
156    pub lifecycles: Vec<Lifecycle>,
157}
158
159#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
160pub struct KillByPidPayload {
161    pub response: KillByPidResponse,
162}
163
164/// Session kill result.
165///
166/// `Signalled` means rtmd delivered the requested signal to the runtime
167/// process. `AlreadyExited` means the process naturally exited before the
168/// signal landed, so the kill request was a successful no-op.
169#[derive(Clone, Copy, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
170pub struct KilledPayload {
171    pub outcome: crate::KillOutcome,
172}
173
174#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
175pub struct NudgePayload {
176    pub response: NudgeResponse,
177}
178
179#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
180pub struct CapturePayload {
181    pub response: crate::CaptureResponse,
182}
183
184#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
185pub struct VersionPayload {
186    pub version: crate::VersionInfo,
187}
188
189#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
190pub struct WatchersPayload {
191    pub watchers: WatcherCounts,
192}
193
194#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
195pub struct DoctorPayload {
196    pub doctor: crate::DoctorResponse,
197}
198
199#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
200pub struct EventsPayload {
201    pub events: Vec<RuntimeEvent>,
202    pub cursor: EventCursor,
203}
204
205#[derive(Clone, Copy, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
206pub struct CursorExpiredPayload {
207    pub oldest: EventCursor,
208}
209
210#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
211pub struct McpBridgePayload {
212    pub response: McpBridgeResponse,
213}
214
215#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
216pub struct ShimLaunchPayload {
217    pub launch: LaunchSpec,
218}
219
220#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
221pub struct ErrorPayload {
222    pub code: ErrorCode,
223    pub message: String,
224}
225
226#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
227#[non_exhaustive]
228pub enum EventBatch {
229    Events {
230        events: Vec<RuntimeEvent>,
231        cursor: EventCursor,
232    },
233    /// The watcher's cursor has been advanced to `oldest`. Calling `.next()` again will return events from `oldest` onward without an intervening reconcile. The caller is expected to perform a `client.status()` reconcile and optionally `.seek()` to a freshly-discovered cursor before continuing, otherwise events will replay.
234    CursorExpired { oldest: EventCursor },
235}
236
237#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
238#[non_exhaustive]
239#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
240pub enum RuntimeResponse {
241    Spawned(SpawnedPayload),
242    SpawnConflict(SpawnConflictPayload),
243    ValidateTarget(ValidateTargetPayload),
244    Status(StatusPayload),
245    Killed(KilledPayload),
246    KillByPid(KillByPidPayload),
247    Nudge(NudgePayload),
248    Capture(CapturePayload),
249    Version(VersionPayload),
250    Watchers(WatchersPayload),
251    Doctor(DoctorPayload),
252    /// Events in daemon append order.
253    Events(EventsPayload),
254    CursorExpired(CursorExpiredPayload),
255    McpBridge(McpBridgePayload),
256    ShimLaunch(ShimLaunchPayload),
257    Ack,
258    Stopping,
259    Error(ErrorPayload),
260}
261
262impl RuntimeResponse {
263    pub fn error(code: ErrorCode, message: impl Into<String>) -> Self {
264        Self::Error(ErrorPayload {
265            code,
266            message: message.into(),
267        })
268    }
269}
270
271pub async fn read_json_line<R, T>(reader: &mut R) -> Result<T, ProtocolError>
272where
273    R: AsyncBufRead + Unpin,
274    T: DeserializeOwned,
275{
276    let mut line = String::new();
277    let read = reader.read_line(&mut line).await?;
278    if read == 0 {
279        return Err(ProtocolError::Eof);
280    }
281    parse_json_line(&line)
282}
283
284pub async fn write_json_line<W, T>(writer: &mut W, message: &T) -> Result<(), ProtocolError>
285where
286    W: AsyncWrite + Unpin,
287    T: Serialize,
288{
289    let bytes = json_line_bytes(message)?;
290    writer.write_all(&bytes).await?;
291    writer.flush().await?;
292    Ok(())
293}
294
295pub fn read_json_line_blocking<R, T>(reader: &mut R) -> Result<T, ProtocolError>
296where
297    R: BufRead,
298    T: DeserializeOwned,
299{
300    let mut line = String::new();
301    let read = reader.read_line(&mut line)?;
302    if read == 0 {
303        return Err(ProtocolError::Eof);
304    }
305    parse_json_line(&line)
306}
307
308pub fn write_json_line_blocking<W, T>(writer: &mut W, message: &T) -> Result<(), ProtocolError>
309where
310    W: Write,
311    T: Serialize,
312{
313    let bytes = json_line_bytes(message)?;
314    writer.write_all(&bytes)?;
315    writer.flush()?;
316    Ok(())
317}
318
319fn parse_json_line<T>(line: &str) -> Result<T, ProtocolError>
320where
321    T: DeserializeOwned,
322{
323    Ok(serde_json::from_str(line.trim_end())?)
324}
325
326fn json_line_bytes<T>(message: &T) -> Result<Vec<u8>, ProtocolError>
327where
328    T: Serialize,
329{
330    let mut bytes = serde_json::to_vec(message)?;
331    bytes.push(b'\n');
332    Ok(bytes)
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks every branch of `clamped_event_wait_ms`: the `None` default, an explicit
    /// zero (documented on `EventsRequest::wait_ms` as "return immediately"), values
    /// on both sides of the ceiling, and the largest representable request.
    #[test]
    fn clamped_event_wait_ms_applies_ceiling_and_default() {
        let cases: [(Option<u32>, u32); 7] = [
            (None, 0),
            (Some(0), 0),
            (Some(500), 500),
            (Some(EVENT_WAIT_MAX_MS - 1), EVENT_WAIT_MAX_MS - 1),
            (Some(EVENT_WAIT_MAX_MS), EVENT_WAIT_MAX_MS),
            (Some(EVENT_WAIT_MAX_MS + 1), EVENT_WAIT_MAX_MS),
            (Some(u32::MAX), EVENT_WAIT_MAX_MS),
        ];

        for (requested, expected) in cases.iter() {
            assert_eq!(
                clamped_event_wait_ms(*requested),
                *expected,
                "wait_ms = {:?}",
                requested
            );
        }
    }
}