1use std::io::{BufRead, Write};
2use std::path::PathBuf;
3
4use chrono::{DateTime, Utc};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt};
8use uuid::Uuid;
9
10use crate::{
11 ErrorCode, KillByPidRequest, KillByPidResponse, KillRequest, LaunchSpec, Lifecycle,
12 McpBridgeRequest, McpBridgeResponse, NudgeRequest, NudgeResponse, ProtocolError, RuntimeEvent,
13 ShimExit, ShimLaunchRequest, ShimReady, SpawnRequest, StatusFilter, ValidateTargetRequest,
14 ValidateTargetResponse, WatcherCounts,
15};
16
17pub type EventCursor = u64;
18
19pub const EVENT_LOG_RETENTION_MIN_AGE_SECS: u64 = 7 * 24 * 60 * 60;
20pub const EVENT_LOG_RETENTION_MIN_EVENTS: usize = 10_000;
21pub const EVENT_WAIT_MAX_MS: u32 = 60_000;
25
26#[derive(Clone, Copy, Debug, Default, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
27pub struct EventsRequest {
28 #[serde(default, skip_serializing_if = "Option::is_none")]
29 pub since: Option<EventCursor>,
30 #[serde(default, skip_serializing_if = "Option::is_none")]
32 pub wait_ms: Option<u32>,
33}
34
35pub const fn clamped_event_wait_ms(wait_ms: Option<u32>) -> u32 {
36 match wait_ms {
37 Some(value) if value < EVENT_WAIT_MAX_MS => value,
38 Some(_) => EVENT_WAIT_MAX_MS,
39 None => 0,
40 }
41}
42
43#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
44pub struct StatusRequest {
45 pub session_id: Option<Uuid>,
46 #[serde(default, skip_serializing_if = "Vec::is_empty")]
47 pub session_ids: Vec<Uuid>,
48 #[serde(default, skip_serializing_if = "Option::is_none")]
49 pub updated_since: Option<DateTime<Utc>>,
50 #[serde(default)]
51 pub runtime: Option<String>,
52 #[serde(default)]
53 pub state: Option<String>,
54}
55
56impl From<StatusRequest> for StatusFilter {
57 fn from(request: StatusRequest) -> Self {
58 Self {
59 session_id: request.session_id,
60 session_ids: request.session_ids,
61 updated_since: request.updated_since,
62 runtime: request.runtime,
63 state: request.state,
64 }
65 }
66}
67
68impl From<StatusFilter> for StatusRequest {
69 fn from(filter: StatusFilter) -> Self {
70 Self {
71 session_id: filter.session_id,
72 session_ids: filter.session_ids,
73 updated_since: filter.updated_since,
74 runtime: filter.runtime,
75 state: filter.state,
76 }
77 }
78}
79
80#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
81#[non_exhaustive]
82#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
83pub enum RuntimeRpc {
84 Spawn {
85 request: SpawnRequest,
86 },
87 ValidateTarget {
88 request: ValidateTargetRequest,
89 },
90 Kill {
91 request: KillRequest,
92 },
93 KillByPid {
94 request: KillByPidRequest,
95 },
96 Nudge {
97 request: NudgeRequest,
98 },
99 Capture {
100 request: crate::CaptureRequest,
101 },
102 Status {
103 request: StatusRequest,
104 },
105 Version,
106 Watchers,
107 Doctor,
108 Events {
109 #[serde(default, flatten)]
110 request: EventsRequest,
111 },
112 Stop,
113 McpBridge {
114 request: McpBridgeRequest,
115 },
116 ShimLaunch {
117 request: ShimLaunchRequest,
118 },
119 ShimReady {
120 ready: ShimReady,
121 },
122 ShimExit {
123 exit: ShimExit,
124 },
125}
126
127#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
128pub struct SpawnedPayload {
129 pub lifecycle: Lifecycle,
130 pub event: RuntimeEvent,
131 pub log_dir: Option<PathBuf>,
132 pub stdout_path: Option<PathBuf>,
133 pub stderr_path: Option<PathBuf>,
134}
135
136#[derive(Clone, Copy, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
137#[serde(rename_all = "snake_case")]
138pub enum SpawnConflictKind {
139 SessionId,
140 TmuxPaneOccupancy,
141}
142
143#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
144pub struct SpawnConflictPayload {
145 pub kind: SpawnConflictKind,
146 pub lifecycle: Lifecycle,
147}
148
149#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
150pub struct ValidateTargetPayload {
151 pub response: ValidateTargetResponse,
152}
153
154#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
155pub struct StatusPayload {
156 pub lifecycles: Vec<Lifecycle>,
157}
158
159#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
160pub struct KillByPidPayload {
161 pub response: KillByPidResponse,
162}
163
164#[derive(Clone, Copy, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
170pub struct KilledPayload {
171 pub outcome: crate::KillOutcome,
172}
173
174#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
175pub struct NudgePayload {
176 pub response: NudgeResponse,
177}
178
179#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
180pub struct CapturePayload {
181 pub response: crate::CaptureResponse,
182}
183
184#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
185pub struct VersionPayload {
186 pub version: crate::VersionInfo,
187}
188
189#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
190pub struct WatchersPayload {
191 pub watchers: WatcherCounts,
192}
193
194#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
195pub struct DoctorPayload {
196 pub doctor: crate::DoctorResponse,
197}
198
199#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
200pub struct EventsPayload {
201 pub events: Vec<RuntimeEvent>,
202 pub cursor: EventCursor,
203}
204
205#[derive(Clone, Copy, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
206pub struct CursorExpiredPayload {
207 pub oldest: EventCursor,
208}
209
210#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
211pub struct McpBridgePayload {
212 pub response: McpBridgeResponse,
213}
214
215#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
216pub struct ShimLaunchPayload {
217 pub launch: LaunchSpec,
218}
219
220#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
221pub struct ErrorPayload {
222 pub code: ErrorCode,
223 pub message: String,
224}
225
226#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
227#[non_exhaustive]
228pub enum EventBatch {
229 Events {
230 events: Vec<RuntimeEvent>,
231 cursor: EventCursor,
232 },
233 CursorExpired { oldest: EventCursor },
235}
236
237#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
238#[non_exhaustive]
239#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
240pub enum RuntimeResponse {
241 Spawned(SpawnedPayload),
242 SpawnConflict(SpawnConflictPayload),
243 ValidateTarget(ValidateTargetPayload),
244 Status(StatusPayload),
245 Killed(KilledPayload),
246 KillByPid(KillByPidPayload),
247 Nudge(NudgePayload),
248 Capture(CapturePayload),
249 Version(VersionPayload),
250 Watchers(WatchersPayload),
251 Doctor(DoctorPayload),
252 Events(EventsPayload),
254 CursorExpired(CursorExpiredPayload),
255 McpBridge(McpBridgePayload),
256 ShimLaunch(ShimLaunchPayload),
257 Ack,
258 Stopping,
259 Error(ErrorPayload),
260}
261
262impl RuntimeResponse {
263 pub fn error(code: ErrorCode, message: impl Into<String>) -> Self {
264 Self::Error(ErrorPayload {
265 code,
266 message: message.into(),
267 })
268 }
269}
270
271pub async fn read_json_line<R, T>(reader: &mut R) -> Result<T, ProtocolError>
272where
273 R: AsyncBufRead + Unpin,
274 T: DeserializeOwned,
275{
276 let mut line = String::new();
277 let read = reader.read_line(&mut line).await?;
278 if read == 0 {
279 return Err(ProtocolError::Eof);
280 }
281 parse_json_line(&line)
282}
283
284pub async fn write_json_line<W, T>(writer: &mut W, message: &T) -> Result<(), ProtocolError>
285where
286 W: AsyncWrite + Unpin,
287 T: Serialize,
288{
289 let bytes = json_line_bytes(message)?;
290 writer.write_all(&bytes).await?;
291 writer.flush().await?;
292 Ok(())
293}
294
295pub fn read_json_line_blocking<R, T>(reader: &mut R) -> Result<T, ProtocolError>
296where
297 R: BufRead,
298 T: DeserializeOwned,
299{
300 let mut line = String::new();
301 let read = reader.read_line(&mut line)?;
302 if read == 0 {
303 return Err(ProtocolError::Eof);
304 }
305 parse_json_line(&line)
306}
307
308pub fn write_json_line_blocking<W, T>(writer: &mut W, message: &T) -> Result<(), ProtocolError>
309where
310 W: Write,
311 T: Serialize,
312{
313 let bytes = json_line_bytes(message)?;
314 writer.write_all(&bytes)?;
315 writer.flush()?;
316 Ok(())
317}
318
319fn parse_json_line<T>(line: &str) -> Result<T, ProtocolError>
320where
321 T: DeserializeOwned,
322{
323 Ok(serde_json::from_str(line.trim_end())?)
324}
325
326fn json_line_bytes<T>(message: &T) -> Result<Vec<u8>, ProtocolError>
327where
328 T: Serialize,
329{
330 let mut bytes = serde_json::to_vec(message)?;
331 bytes.push(b'\n');
332 Ok(bytes)
333}
334
335#[cfg(test)]
336mod tests {
337 use super::*;
338
339 #[test]
340 fn clamped_event_wait_ms_applies_ceiling_and_default() {
341 assert_eq!(clamped_event_wait_ms(None), 0);
342 assert_eq!(clamped_event_wait_ms(Some(500)), 500);
343 assert_eq!(
344 clamped_event_wait_ms(Some(EVENT_WAIT_MAX_MS)),
345 EVENT_WAIT_MAX_MS
346 );
347 assert_eq!(
348 clamped_event_wait_ms(Some(EVENT_WAIT_MAX_MS + 1)),
349 EVENT_WAIT_MAX_MS
350 );
351 }
352}