Skip to main content

lilo_rm_client/
lib.rs

1//! Async Unix socket client for the public rtmd JSON line contract.
2//!
3//! `lilo-rm-client` owns connection setup, newline delimited JSON framing, and
4//! typed client side error normalization. Protocol request and response shapes
5//! remain in `lilo-rm-core`.
6
7use std::path::{Path, PathBuf};
8
9use lilo_rm_core::{
10    CaptureRequest, CaptureResponse, DoctorPayload, ErrorCode, EventBatch, EventsRequest,
11    KillByPidRequest, KillOutcome, KillRequest, ProtocolError, RUNTIME_PROTOCOL_VERSION,
12    RuntimeResponse, RuntimeRpc, SpawnRequest, SpawnedPayload, StatusFilter, StatusPayload,
13    ValidateTargetRequest, ValidateTargetResponse, VersionPayload, read_json_line, write_json_line,
14};
15use thiserror::Error;
16use tokio::io::BufReader;
17use tokio::net::UnixStream;
18
19mod event_watcher;
20
21pub use event_watcher::{EventWatcher, EventWatcherBuilder};
22
23/// Async client for the rtmd Unix socket JSON line protocol.
24#[derive(Clone, Debug)]
25pub struct RuntimeClient {
26    socket_path: PathBuf,
27}
28
29impl RuntimeClient {
30    /// Create a client connected to `socket_path`.
31    pub fn new(socket_path: impl Into<PathBuf>) -> Self {
32        Self {
33            socket_path: socket_path.into(),
34        }
35    }
36
37    /// Return the Unix socket path this client connects to.
38    pub fn socket_path(&self) -> &Path {
39        &self.socket_path
40    }
41
42    /// Send a raw protocol request and return the raw protocol response.
43    pub async fn request(&self, rpc: RuntimeRpc) -> Result<RuntimeResponse, ClientError> {
44        request(self.socket_path(), rpc).await
45    }
46
47    /// Spawn a runtime session and return the created lifecycle payload.
48    pub async fn spawn(&self, request: SpawnRequest) -> Result<SpawnedPayload, ClientError> {
49        match self.request(RuntimeRpc::Spawn { request }).await? {
50            RuntimeResponse::Spawned(payload) => Ok(payload),
51            RuntimeResponse::SpawnConflict(payload) => {
52                Err(ClientError::SpawnConflict(Box::new(payload)))
53            }
54            response => unexpected_response("Spawned", &response),
55        }
56    }
57
58    /// Kill a runtime session by session id.
59    pub async fn kill(&self, request: KillRequest) -> Result<KillOutcome, ClientError> {
60        match self.request(RuntimeRpc::Kill { request }).await? {
61            RuntimeResponse::Killed(payload) => Ok(payload.outcome),
62            response => unexpected_response("Killed", &response),
63        }
64    }
65
66    /// Kill an arbitrary process id through the daemon admin path.
67    pub async fn kill_by_pid(&self, request: KillByPidRequest) -> Result<KillOutcome, ClientError> {
68        match self.request(RuntimeRpc::KillByPid { request }).await? {
69            RuntimeResponse::KillByPid(payload) => Ok(payload.response.outcome),
70            response => unexpected_response("KillByPid", &response),
71        }
72    }
73
74    /// Query runtime lifecycle status.
75    pub async fn status(&self, filter: StatusFilter) -> Result<StatusPayload, ClientError> {
76        match self
77            .request(RuntimeRpc::Status {
78                request: filter.into(),
79            })
80            .await?
81        {
82            RuntimeResponse::Status(payload) => Ok(payload),
83            response => unexpected_response("Status", &response),
84        }
85    }
86
87    /// Send a text nudge to a runtime session and return the delivery outcome.
88    pub async fn nudge(
89        &self,
90        request: lilo_rm_core::NudgeRequest,
91    ) -> Result<lilo_rm_core::NudgeResponse, ClientError> {
92        match self.request(RuntimeRpc::Nudge { request }).await? {
93            RuntimeResponse::Nudge(payload) => Ok(payload.response),
94            response => unexpected_response("Nudge", &response),
95        }
96    }
97
98    /// Capture scrollback for a runtime session.
99    pub async fn capture(&self, request: CaptureRequest) -> Result<CaptureResponse, ClientError> {
100        match self.request(RuntimeRpc::Capture { request }).await? {
101            RuntimeResponse::Capture(payload) => Ok(payload.response),
102            response => unexpected_response("Capture", &response),
103        }
104    }
105
106    /// Validate a user supplied spawn target string.
107    pub async fn validate_target(
108        &self,
109        target: &str,
110    ) -> Result<ValidateTargetResponse, ClientError> {
111        match self
112            .request(RuntimeRpc::ValidateTarget {
113                request: ValidateTargetRequest {
114                    target: target.to_owned(),
115                },
116            })
117            .await?
118        {
119            RuntimeResponse::ValidateTarget(payload) => Ok(payload.response),
120            response => unexpected_response("ValidateTarget", &response),
121        }
122    }
123
124    /// Query daemon diagnostics.
125    pub async fn doctor(&self) -> Result<DoctorPayload, ClientError> {
126        match self.request(RuntimeRpc::Doctor).await? {
127            RuntimeResponse::Doctor(payload) => Ok(payload),
128            response => unexpected_response("Doctor", &response),
129        }
130    }
131
132    /// Query the daemon version and protocol capability payload.
133    pub async fn version(&self) -> Result<VersionPayload, ClientError> {
134        match self.request(RuntimeRpc::Version).await? {
135            RuntimeResponse::Version(payload) => Ok(payload),
136            response => unexpected_response("Version", &response),
137        }
138    }
139
140    /// Query one batch of lifecycle events.
141    pub async fn events(&self, request: EventsRequest) -> Result<EventBatch, ClientError> {
142        match self.request(RuntimeRpc::Events { request }).await? {
143            RuntimeResponse::Events(payload) => Ok(EventBatch::Events {
144                events: payload.events,
145                cursor: payload.cursor,
146            }),
147            RuntimeResponse::CursorExpired(payload) => Ok(EventBatch::CursorExpired {
148                oldest: payload.oldest,
149            }),
150            response => unexpected_response("Events or CursorExpired", &response),
151        }
152    }
153
154    async fn check_protocol_version(&self) -> Result<(), ClientError> {
155        let payload = self.version().await?;
156        let got = payload.version.protocol_version;
157        if got == RUNTIME_PROTOCOL_VERSION {
158            Ok(())
159        } else {
160            Err(ClientError::Protocol {
161                source: ProtocolError::UnsupportedVersion {
162                    expected: RUNTIME_PROTOCOL_VERSION,
163                    got,
164                },
165            })
166        }
167    }
168}
169
170/// Error returned by the rtmd client.
171#[derive(Debug, Error)]
172#[non_exhaustive]
173pub enum ClientError {
174    /// The configured daemon socket could not be reached.
175    #[error("rtmd unavailable at {socket_path}: {source}")]
176    DaemonUnavailable {
177        /// Socket path the client tried to connect to.
178        socket_path: PathBuf,
179        #[source]
180        /// Underlying I/O error.
181        source: std::io::Error,
182    },
183    /// The daemon or transport returned malformed protocol data.
184    #[error("rtmd protocol error: {source}")]
185    Protocol {
186        #[from]
187        /// Underlying protocol error.
188        source: ProtocolError,
189    },
190    /// The daemon returned an explicit error response.
191    #[error("rtmd returned {code}: {message}")]
192    ErrorResponse { code: ErrorCode, message: String },
193    /// The daemon refused a spawn because the requested identity or pane is already occupied.
194    #[error("rtmd spawn conflict: {0:?}")]
195    SpawnConflict(Box<lilo_rm_core::SpawnConflictPayload>),
196    /// A typed helper received a different response variant than expected.
197    #[error("expected {expected} response, got {got}")]
198    UnexpectedResponse {
199        /// Response variant the helper expected.
200        expected: &'static str,
201        /// Response variant the daemon returned.
202        got: &'static str,
203    },
204}
205
206impl ClientError {
207    /// Return the stable runtime error code represented by this client error.
208    pub const fn code(&self) -> ErrorCode {
209        match self {
210            Self::DaemonUnavailable { .. } => ErrorCode::RuntimeUnavailable,
211            Self::Protocol { .. } => ErrorCode::ProtocolMismatch,
212            Self::ErrorResponse { code, .. } => *code,
213            Self::SpawnConflict(_) => ErrorCode::SpawnConflict,
214            Self::UnexpectedResponse { .. } => ErrorCode::ProtocolMismatch,
215        }
216    }
217}
218
219/// Send a raw protocol request to `socket_path`.
220pub async fn request(
221    socket_path: impl AsRef<Path>,
222    rpc: RuntimeRpc,
223) -> Result<RuntimeResponse, ClientError> {
224    let socket_path = socket_path.as_ref();
225    let stream = UnixStream::connect(socket_path).await.map_err(|source| {
226        ClientError::DaemonUnavailable {
227            socket_path: socket_path.to_path_buf(),
228            source,
229        }
230    })?;
231    request_on_stream(stream, rpc).await
232}
233
234fn unexpected_response<T>(
235    expected: &'static str,
236    response: &RuntimeResponse,
237) -> Result<T, ClientError> {
238    Err(ClientError::UnexpectedResponse {
239        expected,
240        got: response_name(response),
241    })
242}
243
244fn response_name(response: &RuntimeResponse) -> &'static str {
245    match response {
246        RuntimeResponse::Spawned(_) => "Spawned",
247        RuntimeResponse::SpawnConflict(_) => "SpawnConflict",
248        RuntimeResponse::ValidateTarget(_) => "ValidateTarget",
249        RuntimeResponse::Status(_) => "Status",
250        RuntimeResponse::Killed(_) => "Killed",
251        RuntimeResponse::KillByPid(_) => "KillByPid",
252        RuntimeResponse::Nudge(_) => "Nudge",
253        RuntimeResponse::Capture(_) => "Capture",
254        RuntimeResponse::Version(_) => "Version",
255        RuntimeResponse::Watchers(_) => "Watchers",
256        RuntimeResponse::Doctor(_) => "Doctor",
257        RuntimeResponse::Events(_) => "Events",
258        RuntimeResponse::CursorExpired(_) => "CursorExpired",
259        RuntimeResponse::McpBridge(_) => "McpBridge",
260        RuntimeResponse::ShimLaunch(_) => "ShimLaunch",
261        RuntimeResponse::Ack => "Ack",
262        RuntimeResponse::Stopping => "Stopping",
263        RuntimeResponse::Error(_) => "Error",
264        _ => "Unknown",
265    }
266}
267
268async fn request_on_stream(
269    stream: UnixStream,
270    rpc: RuntimeRpc,
271) -> Result<RuntimeResponse, ClientError> {
272    let (read_half, mut write_half) = stream.into_split();
273    write_json_line(&mut write_half, &rpc).await?;
274
275    let mut reader = BufReader::new(read_half);
276    match read_json_line(&mut reader).await? {
277        RuntimeResponse::Error(payload) => {
278            let code = payload.code;
279            let message = payload.message;
280            Err(ClientError::ErrorResponse { code, message })
281        }
282        response => Ok(response),
283    }
284}