1use std::path::{Path, PathBuf};
8
9use lilo_rm_core::{
10 CaptureRequest, CaptureResponse, DoctorPayload, ErrorCode, EventBatch, EventsRequest,
11 KillByPidRequest, KillOutcome, KillRequest, ProtocolError, RUNTIME_PROTOCOL_VERSION,
12 RuntimeResponse, RuntimeRpc, SpawnRequest, SpawnedPayload, StatusFilter, StatusPayload,
13 ValidateTargetRequest, ValidateTargetResponse, VersionPayload, read_json_line, write_json_line,
14};
15use thiserror::Error;
16use tokio::io::BufReader;
17use tokio::net::UnixStream;
18
19mod event_watcher;
20
21pub use event_watcher::{EventWatcher, EventWatcherBuilder};
22
/// Handle that stores the filesystem path of the socket requests are sent to.
///
/// The handle holds nothing but the path, so cloning it only copies that path.
#[derive(Debug, Clone)]
pub struct RuntimeClient {
    /// Location of the socket used for every request made through this handle.
    socket_path: PathBuf,
}
28
29impl RuntimeClient {
30 pub fn new(socket_path: impl Into<PathBuf>) -> Self {
32 Self {
33 socket_path: socket_path.into(),
34 }
35 }
36
37 pub fn socket_path(&self) -> &Path {
39 &self.socket_path
40 }
41
42 pub async fn request(&self, rpc: RuntimeRpc) -> Result<RuntimeResponse, ClientError> {
44 request(self.socket_path(), rpc).await
45 }
46
47 pub async fn spawn(&self, request: SpawnRequest) -> Result<SpawnedPayload, ClientError> {
49 match self.request(RuntimeRpc::Spawn { request }).await? {
50 RuntimeResponse::Spawned(payload) => Ok(payload),
51 RuntimeResponse::SpawnConflict(payload) => {
52 Err(ClientError::SpawnConflict(Box::new(payload)))
53 }
54 response => unexpected_response("Spawned", &response),
55 }
56 }
57
58 pub async fn kill(&self, request: KillRequest) -> Result<KillOutcome, ClientError> {
60 match self.request(RuntimeRpc::Kill { request }).await? {
61 RuntimeResponse::Killed(payload) => Ok(payload.outcome),
62 response => unexpected_response("Killed", &response),
63 }
64 }
65
66 pub async fn kill_by_pid(&self, request: KillByPidRequest) -> Result<KillOutcome, ClientError> {
68 match self.request(RuntimeRpc::KillByPid { request }).await? {
69 RuntimeResponse::KillByPid(payload) => Ok(payload.response.outcome),
70 response => unexpected_response("KillByPid", &response),
71 }
72 }
73
74 pub async fn status(&self, filter: StatusFilter) -> Result<StatusPayload, ClientError> {
76 match self
77 .request(RuntimeRpc::Status {
78 request: filter.into(),
79 })
80 .await?
81 {
82 RuntimeResponse::Status(payload) => Ok(payload),
83 response => unexpected_response("Status", &response),
84 }
85 }
86
87 pub async fn nudge(
89 &self,
90 request: lilo_rm_core::NudgeRequest,
91 ) -> Result<lilo_rm_core::NudgeResponse, ClientError> {
92 match self.request(RuntimeRpc::Nudge { request }).await? {
93 RuntimeResponse::Nudge(payload) => Ok(payload.response),
94 response => unexpected_response("Nudge", &response),
95 }
96 }
97
98 pub async fn capture(&self, request: CaptureRequest) -> Result<CaptureResponse, ClientError> {
100 match self.request(RuntimeRpc::Capture { request }).await? {
101 RuntimeResponse::Capture(payload) => Ok(payload.response),
102 response => unexpected_response("Capture", &response),
103 }
104 }
105
106 pub async fn validate_target(
108 &self,
109 target: &str,
110 ) -> Result<ValidateTargetResponse, ClientError> {
111 match self
112 .request(RuntimeRpc::ValidateTarget {
113 request: ValidateTargetRequest {
114 target: target.to_owned(),
115 },
116 })
117 .await?
118 {
119 RuntimeResponse::ValidateTarget(payload) => Ok(payload.response),
120 response => unexpected_response("ValidateTarget", &response),
121 }
122 }
123
124 pub async fn doctor(&self) -> Result<DoctorPayload, ClientError> {
126 match self.request(RuntimeRpc::Doctor).await? {
127 RuntimeResponse::Doctor(payload) => Ok(payload),
128 response => unexpected_response("Doctor", &response),
129 }
130 }
131
132 pub async fn version(&self) -> Result<VersionPayload, ClientError> {
134 match self.request(RuntimeRpc::Version).await? {
135 RuntimeResponse::Version(payload) => Ok(payload),
136 response => unexpected_response("Version", &response),
137 }
138 }
139
140 pub async fn events(&self, request: EventsRequest) -> Result<EventBatch, ClientError> {
142 match self.request(RuntimeRpc::Events { request }).await? {
143 RuntimeResponse::Events(payload) => Ok(EventBatch::Events {
144 events: payload.events,
145 cursor: payload.cursor,
146 }),
147 RuntimeResponse::CursorExpired(payload) => Ok(EventBatch::CursorExpired {
148 oldest: payload.oldest,
149 }),
150 response => unexpected_response("Events or CursorExpired", &response),
151 }
152 }
153
154 async fn check_protocol_version(&self) -> Result<(), ClientError> {
155 let payload = self.version().await?;
156 let got = payload.version.protocol_version;
157 if got == RUNTIME_PROTOCOL_VERSION {
158 Ok(())
159 } else {
160 Err(ClientError::Protocol {
161 source: ProtocolError::UnsupportedVersion {
162 expected: RUNTIME_PROTOCOL_VERSION,
163 got,
164 },
165 })
166 }
167 }
168}
169
170#[derive(Debug, Error)]
172#[non_exhaustive]
173pub enum ClientError {
174 #[error("rtmd unavailable at {socket_path}: {source}")]
176 DaemonUnavailable {
177 socket_path: PathBuf,
179 #[source]
180 source: std::io::Error,
182 },
183 #[error("rtmd protocol error: {source}")]
185 Protocol {
186 #[from]
187 source: ProtocolError,
189 },
190 #[error("rtmd returned {code}: {message}")]
192 ErrorResponse { code: ErrorCode, message: String },
193 #[error("rtmd spawn conflict: {0:?}")]
195 SpawnConflict(Box<lilo_rm_core::SpawnConflictPayload>),
196 #[error("expected {expected} response, got {got}")]
198 UnexpectedResponse {
199 expected: &'static str,
201 got: &'static str,
203 },
204}
205
206impl ClientError {
207 pub const fn code(&self) -> ErrorCode {
209 match self {
210 Self::DaemonUnavailable { .. } => ErrorCode::RuntimeUnavailable,
211 Self::Protocol { .. } => ErrorCode::ProtocolMismatch,
212 Self::ErrorResponse { code, .. } => *code,
213 Self::SpawnConflict(_) => ErrorCode::SpawnConflict,
214 Self::UnexpectedResponse { .. } => ErrorCode::ProtocolMismatch,
215 }
216 }
217}
218
219pub async fn request(
221 socket_path: impl AsRef<Path>,
222 rpc: RuntimeRpc,
223) -> Result<RuntimeResponse, ClientError> {
224 let socket_path = socket_path.as_ref();
225 let stream = UnixStream::connect(socket_path).await.map_err(|source| {
226 ClientError::DaemonUnavailable {
227 socket_path: socket_path.to_path_buf(),
228 source,
229 }
230 })?;
231 request_on_stream(stream, rpc).await
232}
233
234fn unexpected_response<T>(
235 expected: &'static str,
236 response: &RuntimeResponse,
237) -> Result<T, ClientError> {
238 Err(ClientError::UnexpectedResponse {
239 expected,
240 got: response_name(response),
241 })
242}
243
244fn response_name(response: &RuntimeResponse) -> &'static str {
245 match response {
246 RuntimeResponse::Spawned(_) => "Spawned",
247 RuntimeResponse::SpawnConflict(_) => "SpawnConflict",
248 RuntimeResponse::ValidateTarget(_) => "ValidateTarget",
249 RuntimeResponse::Status(_) => "Status",
250 RuntimeResponse::Killed(_) => "Killed",
251 RuntimeResponse::KillByPid(_) => "KillByPid",
252 RuntimeResponse::Nudge(_) => "Nudge",
253 RuntimeResponse::Capture(_) => "Capture",
254 RuntimeResponse::Version(_) => "Version",
255 RuntimeResponse::Watchers(_) => "Watchers",
256 RuntimeResponse::Doctor(_) => "Doctor",
257 RuntimeResponse::Events(_) => "Events",
258 RuntimeResponse::CursorExpired(_) => "CursorExpired",
259 RuntimeResponse::McpBridge(_) => "McpBridge",
260 RuntimeResponse::ShimLaunch(_) => "ShimLaunch",
261 RuntimeResponse::Ack => "Ack",
262 RuntimeResponse::Stopping => "Stopping",
263 RuntimeResponse::Error(_) => "Error",
264 _ => "Unknown",
265 }
266}
267
268async fn request_on_stream(
269 stream: UnixStream,
270 rpc: RuntimeRpc,
271) -> Result<RuntimeResponse, ClientError> {
272 let (read_half, mut write_half) = stream.into_split();
273 write_json_line(&mut write_half, &rpc).await?;
274
275 let mut reader = BufReader::new(read_half);
276 match read_json_line(&mut reader).await? {
277 RuntimeResponse::Error(payload) => {
278 let code = payload.code;
279 let message = payload.message;
280 Err(ClientError::ErrorResponse { code, message })
281 }
282 response => Ok(response),
283 }
284}