1use std::io::{BufRead, Write};
2use std::path::PathBuf;
3
4use chrono::{DateTime, Utc};
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt};
8use uuid::Uuid;
9
10use crate::{
11 ErrorCode, KillByPidRequest, KillByPidResponse, KillRequest, LaunchSpec, Lifecycle,
12 McpBridgeRequest, McpBridgeResponse, NudgeRequest, NudgeResponse, ProtocolError, RuntimeEvent,
13 ShimExit, ShimLaunchRequest, ShimReady, SpawnRequest, StatusFilter, ValidateTargetRequest,
14 ValidateTargetResponse, WatcherCounts,
15};
16
/// Payload of [`RuntimeRpc::Status`].
///
/// Mirrors [`StatusFilter`] field for field; the `From` impls in this file
/// convert between the two in both directions without altering any value.
#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
pub struct StatusRequest {
    /// Optional single session id. Carries no serde attribute, so it is
    /// always written out (as `null` when `None`).
    pub session_id: Option<Uuid>,
    /// Session ids; defaults to empty when the key is absent and is left out
    /// of the serialized form when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub session_ids: Vec<Uuid>,
    /// Optional UTC timestamp; left out of the serialized form when `None`.
    /// NOTE(review): presumably a lower bound on a lifecycle's update time —
    /// confirm against the code that consumes `StatusFilter`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub updated_since: Option<DateTime<Utc>>,
    /// Optional free-form string; `None` when the key is absent.
    /// NOTE(review): presumably a runtime name to filter on — confirm.
    #[serde(default)]
    pub runtime: Option<String>,
    /// Optional free-form string; `None` when the key is absent.
    /// NOTE(review): presumably a lifecycle state to filter on — confirm.
    #[serde(default)]
    pub state: Option<String>,
}
29
30impl From<StatusRequest> for StatusFilter {
31 fn from(request: StatusRequest) -> Self {
32 Self {
33 session_id: request.session_id,
34 session_ids: request.session_ids,
35 updated_since: request.updated_since,
36 runtime: request.runtime,
37 state: request.state,
38 }
39 }
40}
41
42impl From<StatusFilter> for StatusRequest {
43 fn from(filter: StatusFilter) -> Self {
44 Self {
45 session_id: filter.session_id,
46 session_ids: filter.session_ids,
47 updated_since: filter.updated_since,
48 runtime: filter.runtime,
49 state: filter.state,
50 }
51 }
52}
53
/// RPC messages understood by the runtime.
///
/// Serialized with serde's adjacently tagged layout: the snake_case variant
/// name goes under `"type"` and the variant's fields under `"payload"`, e.g.
/// `{"type":"kill_by_pid","payload":{"request":{...}}}`. Unit variants carry
/// no payload, e.g. `{"type":"version"}`.
///
/// NOTE(review): most variants share a name with a [`RuntimeResponse`]
/// variant, which is presumably the matching reply — confirm in the server
/// dispatch code.
#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum RuntimeRpc {
    /// Wraps a [`SpawnRequest`].
    Spawn {
        request: SpawnRequest,
    },
    /// Wraps a [`ValidateTargetRequest`].
    ValidateTarget {
        request: ValidateTargetRequest,
    },
    /// Wraps a [`KillRequest`].
    Kill {
        request: KillRequest,
    },
    /// Wraps a [`KillByPidRequest`].
    KillByPid {
        request: KillByPidRequest,
    },
    /// Wraps a [`NudgeRequest`].
    Nudge {
        request: NudgeRequest,
    },
    /// Wraps a [`StatusRequest`].
    Status {
        request: StatusRequest,
    },
    /// Payload-free message, tag `"version"`.
    Version,
    /// Payload-free message, tag `"watchers"`.
    Watchers,
    /// Payload-free message, tag `"doctor"`.
    Doctor,
    /// Payload-free message, tag `"events"`.
    Events,
    /// Payload-free message, tag `"stop"`.
    Stop,
    /// Wraps a [`McpBridgeRequest`].
    McpBridge {
        request: McpBridgeRequest,
    },
    /// Wraps a [`ShimLaunchRequest`].
    ShimLaunch {
        request: ShimLaunchRequest,
    },
    /// Carries a [`ShimReady`] value under the `ready` key.
    ShimReady {
        ready: ShimReady,
    },
    /// Carries a [`ShimExit`] value under the `exit` key.
    ShimExit {
        exit: ShimExit,
    },
}
98
/// Response messages of the runtime RPC protocol.
///
/// Uses the same adjacently tagged layout as [`RuntimeRpc`]: the snake_case
/// variant name under `"type"`, the variant's fields under `"payload"`, and
/// no payload at all for unit variants (e.g. `{"type":"ack"}`).
#[derive(Clone, Debug, serde::Deserialize, Eq, PartialEq, serde::Serialize)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum RuntimeResponse {
    /// Carries a [`Lifecycle`], a [`RuntimeEvent`] and three optional paths.
    /// NOTE(review): presumably the reply to `RuntimeRpc::Spawn` — confirm.
    Spawned {
        lifecycle: Lifecycle,
        event: RuntimeEvent,
        // The three paths have no serde attributes, so a `None` is written
        // out as `null` rather than omitted.
        log_dir: Option<PathBuf>,
        stdout_path: Option<PathBuf>,
        stderr_path: Option<PathBuf>,
    },
    /// Wraps a [`ValidateTargetResponse`].
    ValidateTarget {
        response: ValidateTargetResponse,
    },
    /// Carries a list of [`Lifecycle`] values.
    Status {
        lifecycles: Vec<Lifecycle>,
    },
    /// Wraps a [`KillByPidResponse`].
    KillByPid {
        response: KillByPidResponse,
    },
    /// Wraps a [`NudgeResponse`].
    Nudge {
        response: NudgeResponse,
    },
    /// Carries a `crate::VersionInfo`.
    Version {
        version: crate::VersionInfo,
    },
    /// Carries [`WatcherCounts`].
    Watchers {
        watchers: WatcherCounts,
    },
    /// Carries a `crate::DoctorResponse`.
    Doctor {
        doctor: crate::DoctorResponse,
    },
    /// Carries a list of [`RuntimeEvent`] values.
    Events {
        events: Vec<RuntimeEvent>,
    },
    /// Wraps a [`McpBridgeResponse`].
    McpBridge {
        response: McpBridgeResponse,
    },
    /// Carries a [`LaunchSpec`] under the `launch` key.
    ShimLaunch {
        launch: LaunchSpec,
    },
    /// Payload-free message, tag `"ack"`.
    Ack,
    /// Payload-free message, tag `"stopping"`.
    Stopping,
    /// Failure report: an [`ErrorCode`] plus a message string. Build one with
    /// [`RuntimeResponse::error`].
    Error {
        code: ErrorCode,
        message: String,
    },
}
150
151impl RuntimeResponse {
152 pub fn error(code: ErrorCode, message: impl Into<String>) -> Self {
153 Self::Error {
154 code,
155 message: message.into(),
156 }
157 }
158}
159
160pub async fn read_json_line<R, T>(reader: &mut R) -> Result<T, ProtocolError>
161where
162 R: AsyncBufRead + Unpin,
163 T: DeserializeOwned,
164{
165 let mut line = String::new();
166 let read = reader.read_line(&mut line).await?;
167 if read == 0 {
168 return Err(ProtocolError::Eof);
169 }
170 parse_json_line(&line)
171}
172
173pub async fn write_json_line<W, T>(writer: &mut W, message: &T) -> Result<(), ProtocolError>
174where
175 W: AsyncWrite + Unpin,
176 T: Serialize,
177{
178 let bytes = json_line_bytes(message)?;
179 writer.write_all(&bytes).await?;
180 writer.flush().await?;
181 Ok(())
182}
183
184pub fn read_json_line_blocking<R, T>(reader: &mut R) -> Result<T, ProtocolError>
185where
186 R: BufRead,
187 T: DeserializeOwned,
188{
189 let mut line = String::new();
190 let read = reader.read_line(&mut line)?;
191 if read == 0 {
192 return Err(ProtocolError::Eof);
193 }
194 parse_json_line(&line)
195}
196
197pub fn write_json_line_blocking<W, T>(writer: &mut W, message: &T) -> Result<(), ProtocolError>
198where
199 W: Write,
200 T: Serialize,
201{
202 let bytes = json_line_bytes(message)?;
203 writer.write_all(&bytes)?;
204 writer.flush()?;
205 Ok(())
206}
207
208fn parse_json_line<T>(line: &str) -> Result<T, ProtocolError>
209where
210 T: DeserializeOwned,
211{
212 Ok(serde_json::from_str(line.trim_end())?)
213}
214
215fn json_line_bytes<T>(message: &T) -> Result<Vec<u8>, ProtocolError>
216where
217 T: Serialize,
218{
219 let mut bytes = serde_json::to_vec(message)?;
220 bytes.push(b'\n');
221 Ok(bytes)
222}