1use std::io::{BufRead, Write};
2use std::path::PathBuf;
3
4use chrono::{DateTime, Utc};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt};
8use uuid::Uuid;
9
10use crate::{
11 ErrorCode, KillByPidRequest, KillByPidResponse, KillRequest, LaunchSpec, Lifecycle,
12 McpBridgeRequest, McpBridgeResponse, NudgeRequest, NudgeResponse, ProtocolError, RuntimeEvent,
13 ShimExit, ShimLaunchRequest, ShimReady, SpawnRequest, StatusFilter, ValidateTargetRequest,
14 ValidateTargetResponse, WatcherCounts,
15};
16
17pub type EventCursor = u64;
18
19pub const EVENT_LOG_RETENTION_MIN_AGE_SECS: u64 = 7 * 24 * 60 * 60;
20pub const EVENT_LOG_RETENTION_MIN_EVENTS: usize = 10_000;
21pub const EVENT_WAIT_MAX_MS: u32 = 60_000;
25
26#[derive(Clone, Copy, Debug, Default, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
27pub struct EventsRequest {
28 #[serde(default, skip_serializing_if = "Option::is_none")]
29 pub since: Option<EventCursor>,
30 #[serde(default, skip_serializing_if = "Option::is_none")]
32 pub wait_ms: Option<u32>,
33}
34
35pub const fn clamped_event_wait_ms(wait_ms: Option<u32>) -> u32 {
36 match wait_ms {
37 Some(value) if value < EVENT_WAIT_MAX_MS => value,
38 Some(_) => EVENT_WAIT_MAX_MS,
39 None => 0,
40 }
41}
42
43#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
44pub struct StatusRequest {
45 pub session_id: Option<Uuid>,
46 #[serde(default, skip_serializing_if = "Vec::is_empty")]
47 pub session_ids: Vec<Uuid>,
48 #[serde(default, skip_serializing_if = "Option::is_none")]
49 pub updated_since: Option<DateTime<Utc>>,
50 #[serde(default)]
51 pub runtime: Option<String>,
52 #[serde(default)]
53 pub state: Option<String>,
54}
55
56impl From<StatusRequest> for StatusFilter {
57 fn from(request: StatusRequest) -> Self {
58 Self {
59 session_id: request.session_id,
60 session_ids: request.session_ids,
61 updated_since: request.updated_since,
62 runtime: request.runtime,
63 state: request.state,
64 }
65 }
66}
67
68impl From<StatusFilter> for StatusRequest {
69 fn from(filter: StatusFilter) -> Self {
70 Self {
71 session_id: filter.session_id,
72 session_ids: filter.session_ids,
73 updated_since: filter.updated_since,
74 runtime: filter.runtime,
75 state: filter.state,
76 }
77 }
78}
79
80#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
81#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
82pub enum RuntimeRpc {
83 Spawn {
84 request: SpawnRequest,
85 },
86 ValidateTarget {
87 request: ValidateTargetRequest,
88 },
89 Kill {
90 request: KillRequest,
91 },
92 KillByPid {
93 request: KillByPidRequest,
94 },
95 Nudge {
96 request: NudgeRequest,
97 },
98 Capture {
99 request: crate::CaptureRequest,
100 },
101 Status {
102 request: StatusRequest,
103 },
104 Version,
105 Watchers,
106 Doctor,
107 Events {
108 #[serde(default, flatten)]
109 request: EventsRequest,
110 },
111 Stop,
112 McpBridge {
113 request: McpBridgeRequest,
114 },
115 ShimLaunch {
116 request: ShimLaunchRequest,
117 },
118 ShimReady {
119 ready: ShimReady,
120 },
121 ShimExit {
122 exit: ShimExit,
123 },
124}
125
126#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
127#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
128pub enum RuntimeResponse {
129 Spawned {
130 lifecycle: Lifecycle,
131 event: RuntimeEvent,
132 log_dir: Option<PathBuf>,
133 stdout_path: Option<PathBuf>,
134 stderr_path: Option<PathBuf>,
135 },
136 ValidateTarget {
137 response: ValidateTargetResponse,
138 },
139 Status {
140 lifecycles: Vec<Lifecycle>,
141 },
142 KillByPid {
143 response: KillByPidResponse,
144 },
145 Nudge {
146 response: NudgeResponse,
147 },
148 Capture {
149 response: crate::CaptureResponse,
150 },
151 Version {
152 version: crate::VersionInfo,
153 },
154 Watchers {
155 watchers: WatcherCounts,
156 },
157 Doctor {
158 doctor: crate::DoctorResponse,
159 },
160 Events {
162 events: Vec<RuntimeEvent>,
163 cursor: EventCursor,
164 },
165 CursorExpired {
166 oldest: EventCursor,
167 },
168 McpBridge {
169 response: McpBridgeResponse,
170 },
171 ShimLaunch {
172 launch: LaunchSpec,
173 },
174 Ack,
175 Stopping,
176 Error {
177 code: ErrorCode,
178 message: String,
179 },
180}
181
182impl RuntimeResponse {
183 pub fn error(code: ErrorCode, message: impl Into<String>) -> Self {
184 Self::Error {
185 code,
186 message: message.into(),
187 }
188 }
189}
190
191pub async fn read_json_line<R, T>(reader: &mut R) -> Result<T, ProtocolError>
192where
193 R: AsyncBufRead + Unpin,
194 T: DeserializeOwned,
195{
196 let mut line = String::new();
197 let read = reader.read_line(&mut line).await?;
198 if read == 0 {
199 return Err(ProtocolError::Eof);
200 }
201 parse_json_line(&line)
202}
203
204pub async fn write_json_line<W, T>(writer: &mut W, message: &T) -> Result<(), ProtocolError>
205where
206 W: AsyncWrite + Unpin,
207 T: Serialize,
208{
209 let bytes = json_line_bytes(message)?;
210 writer.write_all(&bytes).await?;
211 writer.flush().await?;
212 Ok(())
213}
214
215pub fn read_json_line_blocking<R, T>(reader: &mut R) -> Result<T, ProtocolError>
216where
217 R: BufRead,
218 T: DeserializeOwned,
219{
220 let mut line = String::new();
221 let read = reader.read_line(&mut line)?;
222 if read == 0 {
223 return Err(ProtocolError::Eof);
224 }
225 parse_json_line(&line)
226}
227
228pub fn write_json_line_blocking<W, T>(writer: &mut W, message: &T) -> Result<(), ProtocolError>
229where
230 W: Write,
231 T: Serialize,
232{
233 let bytes = json_line_bytes(message)?;
234 writer.write_all(&bytes)?;
235 writer.flush()?;
236 Ok(())
237}
238
239fn parse_json_line<T>(line: &str) -> Result<T, ProtocolError>
240where
241 T: DeserializeOwned,
242{
243 Ok(serde_json::from_str(line.trim_end())?)
244}
245
246fn json_line_bytes<T>(message: &T) -> Result<Vec<u8>, ProtocolError>
247where
248 T: Serialize,
249{
250 let mut bytes = serde_json::to_vec(message)?;
251 bytes.push(b'\n');
252 Ok(bytes)
253}
254
255#[cfg(test)]
256mod tests {
257 use super::*;
258
259 #[test]
260 fn clamped_event_wait_ms_applies_ceiling_and_default() {
261 assert_eq!(clamped_event_wait_ms(None), 0);
262 assert_eq!(clamped_event_wait_ms(Some(500)), 500);
263 assert_eq!(
264 clamped_event_wait_ms(Some(EVENT_WAIT_MAX_MS)),
265 EVENT_WAIT_MAX_MS
266 );
267 assert_eq!(
268 clamped_event_wait_ms(Some(EVENT_WAIT_MAX_MS + 1)),
269 EVENT_WAIT_MAX_MS
270 );
271 }
272}