Skip to main content

running_process/client/
telemetry.rs

//! Client helpers for daemon-owned session tee telemetry.
2
3use std::path::{Path, PathBuf};
4
5#[cfg(unix)]
6use std::os::unix::ffi::OsStrExt;
7#[cfg(windows)]
8use std::os::windows::ffi::OsStrExt;
9
10use crate::client::{ClientError, DaemonClient};
11use crate::proto::daemon::{
12    DaemonRequest, GetSessionTeeStatusRequest, RegisterSessionTeeRequest, RequestType, StatusCode,
13    TeeBackpressure as ProtoTeeBackpressure, TeeFileMode as ProtoTeeFileMode,
14    TeeSessionKind as ProtoTeeSessionKind, TeeSinkKind, TeeStreamKind as ProtoTeeStreamKind,
15    UnregisterSessionTeeRequest,
16};
17
/// Kind of daemon-owned session whose stream is being tee'd.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionTeeKind {
    /// The session is a daemon-owned pseudo-terminal.
    Pty,
    /// The session is daemon-owned and backed by pipes.
    Pipe,
}
26
/// Byte stream of a session that a tee sink can mirror.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionTeeStream {
    /// The combined output bytes of a PTY session.
    PtyOutput,
    /// Standard output bytes of a pipe session.
    Stdout,
    /// Standard error bytes of a pipe session.
    Stderr,
    /// Bytes that were successfully written to a pipe session's standard input.
    Stdin,
}
39
/// How the destination file is opened when a file tee is registered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionTeeFileMode {
    /// If the file already exists, tee output is appended to it.
    Append,
    /// The file is truncated before any tee output is written.
    Truncate,
}
48
/// Policy applied when a bounded tee queue cannot accept more bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionTeeBackpressure {
    /// The session reader stays non-blocking; dropped bytes are accounted for.
    DropOldest,
    /// The session reader blocks until the tee sink accepts more bytes.
    Block,
}
57
58#[derive(Clone, Debug, Eq, PartialEq)]
59/// Request used to register a daemon-managed file tee for a session stream.
60pub struct SessionTeeFileRequest {
61    /// Session identifier returned by the spawn API.
62    pub session_id: String,
63    /// Kind of session that owns `session_id`.
64    pub session_kind: SessionTeeKind,
65    /// Stream to mirror into the file.
66    pub stream: SessionTeeStream,
67    /// Destination file path on the daemon host.
68    pub path: PathBuf,
69    /// File open mode for the destination path.
70    pub mode: SessionTeeFileMode,
71    /// 0 means use the daemon default.
72    pub queue_capacity: u32,
73    /// Whether the daemon writes marker lines when tee bytes are missed.
74    pub write_missed_markers: bool,
75    /// Queue behavior when the file sink cannot keep up.
76    pub backpressure: SessionTeeBackpressure,
77}
78
79impl SessionTeeFileRequest {
80    /// Create a file tee request with append mode and daemon-default queue size.
81    pub fn new<P>(
82        session_id: impl Into<String>,
83        session_kind: SessionTeeKind,
84        stream: SessionTeeStream,
85        path: P,
86    ) -> Self
87    where
88        P: AsRef<Path>,
89    {
90        Self {
91            session_id: session_id.into(),
92            session_kind,
93            stream,
94            path: path.as_ref().to_path_buf(),
95            mode: SessionTeeFileMode::Append,
96            queue_capacity: 0,
97            write_missed_markers: true,
98            backpressure: SessionTeeBackpressure::DropOldest,
99        }
100    }
101
102    /// Open the destination file in truncate mode instead of append mode.
103    pub fn truncate(mut self) -> Self {
104        self.mode = SessionTeeFileMode::Truncate;
105        self
106    }
107
108    /// Set the bounded queue capacity; `0` keeps the daemon default.
109    pub fn queue_capacity(mut self, capacity: u32) -> Self {
110        self.queue_capacity = capacity;
111        self
112    }
113
114    /// Disable marker lines for bytes missed by the file tee.
115    pub fn suppress_missed_markers(mut self) -> Self {
116        self.write_missed_markers = false;
117        self
118    }
119
120    /// Set the queue backpressure policy for this tee.
121    pub fn backpressure(mut self, backpressure: SessionTeeBackpressure) -> Self {
122        self.backpressure = backpressure;
123        self
124    }
125}
126
127#[derive(Clone, Copy, Debug, Eq, PartialEq)]
128/// Current daemon status for a registered session tee.
129pub struct SessionTeeStatus {
130    /// Stream associated with the registered tee handle.
131    pub stream: SessionTeeStream,
132    /// Number of stream bytes missed by this tee.
133    pub missed_bytes: u64,
134    /// Whether the tee sink has disconnected from the session stream.
135    pub disconnected: bool,
136}
137
138impl DaemonClient {
139    /// Register a daemon-owned file tee and return its opaque tee handle.
140    pub fn register_session_file_tee(
141        &mut self,
142        request: &SessionTeeFileRequest,
143    ) -> Result<u64, ClientError> {
144        let daemon_request = DaemonRequest {
145            id: self.next_request_id(),
146            r#type: RequestType::RegisterSessionTee.into(),
147            protocol_version: 1,
148            register_session_tee: Some(RegisterSessionTeeRequest {
149                session_id: request.session_id.clone(),
150                session_kind: proto_session_kind(request.session_kind) as i32,
151                stream: proto_stream_kind(request.stream) as i32,
152                sink_kind: TeeSinkKind::File as i32,
153                file_path: encode_os_path(&request.path),
154                file_mode: proto_file_mode(request.mode) as i32,
155                queue_capacity: request.queue_capacity,
156                suppress_missed_markers: !request.write_missed_markers,
157                backpressure: proto_backpressure(request.backpressure) as i32,
158            }),
159            ..Default::default()
160        };
161        let response = self.send_request(daemon_request)?;
162        ensure_ok(&response)?;
163        let payload = response
164            .register_session_tee
165            .ok_or_else(|| ClientError::Server {
166                code: StatusCode::Internal,
167                message: "register_session_tee response missing payload".into(),
168            })?;
169        Ok(payload.tee_handle)
170    }
171
172    /// Unregister a previously registered session tee handle.
173    pub fn unregister_session_tee(
174        &mut self,
175        session_kind: SessionTeeKind,
176        session_id: &str,
177        tee_handle: u64,
178    ) -> Result<(), ClientError> {
179        let daemon_request = DaemonRequest {
180            id: self.next_request_id(),
181            r#type: RequestType::UnregisterSessionTee.into(),
182            protocol_version: 1,
183            unregister_session_tee: Some(UnregisterSessionTeeRequest {
184                session_id: session_id.to_string(),
185                session_kind: proto_session_kind(session_kind) as i32,
186                tee_handle,
187            }),
188            ..Default::default()
189        };
190        let response = self.send_request(daemon_request)?;
191        ensure_ok(&response)
192    }
193
194    /// Fetch the current status for a registered session tee handle.
195    pub fn get_session_tee_status(
196        &mut self,
197        session_kind: SessionTeeKind,
198        session_id: &str,
199        tee_handle: u64,
200    ) -> Result<SessionTeeStatus, ClientError> {
201        let daemon_request = DaemonRequest {
202            id: self.next_request_id(),
203            r#type: RequestType::GetSessionTeeStatus.into(),
204            protocol_version: 1,
205            get_session_tee_status: Some(GetSessionTeeStatusRequest {
206                session_id: session_id.to_string(),
207                session_kind: proto_session_kind(session_kind) as i32,
208                tee_handle,
209            }),
210            ..Default::default()
211        };
212        let response = self.send_request(daemon_request)?;
213        ensure_ok(&response)?;
214        let payload = response
215            .get_session_tee_status
216            .ok_or_else(|| ClientError::Server {
217                code: StatusCode::Internal,
218                message: "get_session_tee_status response missing payload".into(),
219            })?;
220        let stream =
221            ProtoTeeStreamKind::try_from(payload.stream).map_err(|_| ClientError::Server {
222                code: StatusCode::Internal,
223                message: "get_session_tee_status response has invalid stream".into(),
224            })?;
225        Ok(SessionTeeStatus {
226            stream: client_stream_kind(stream)?,
227            missed_bytes: payload.missed_bytes,
228            disconnected: payload.disconnected,
229        })
230    }
231}
232
233fn ensure_ok(response: &crate::proto::daemon::DaemonResponse) -> Result<(), ClientError> {
234    if response.code == StatusCode::Ok as i32 {
235        return Ok(());
236    }
237    let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
238    Err(ClientError::Server {
239        code,
240        message: response.message.clone(),
241    })
242}
243
244fn proto_session_kind(kind: SessionTeeKind) -> ProtoTeeSessionKind {
245    match kind {
246        SessionTeeKind::Pty => ProtoTeeSessionKind::Pty,
247        SessionTeeKind::Pipe => ProtoTeeSessionKind::Pipe,
248    }
249}
250
251fn proto_stream_kind(stream: SessionTeeStream) -> ProtoTeeStreamKind {
252    match stream {
253        SessionTeeStream::PtyOutput => ProtoTeeStreamKind::PtyOutput,
254        SessionTeeStream::Stdout => ProtoTeeStreamKind::Stdout,
255        SessionTeeStream::Stderr => ProtoTeeStreamKind::Stderr,
256        SessionTeeStream::Stdin => ProtoTeeStreamKind::Stdin,
257    }
258}
259
260fn client_stream_kind(stream: ProtoTeeStreamKind) -> Result<SessionTeeStream, ClientError> {
261    match stream {
262        ProtoTeeStreamKind::PtyOutput => Ok(SessionTeeStream::PtyOutput),
263        ProtoTeeStreamKind::Stdout => Ok(SessionTeeStream::Stdout),
264        ProtoTeeStreamKind::Stderr => Ok(SessionTeeStream::Stderr),
265        ProtoTeeStreamKind::Stdin => Ok(SessionTeeStream::Stdin),
266        ProtoTeeStreamKind::Unspecified => Err(ClientError::Server {
267            code: StatusCode::Internal,
268            message: "get_session_tee_status response has unspecified stream".into(),
269        }),
270    }
271}
272
273fn proto_file_mode(mode: SessionTeeFileMode) -> ProtoTeeFileMode {
274    match mode {
275        SessionTeeFileMode::Append => ProtoTeeFileMode::Append,
276        SessionTeeFileMode::Truncate => ProtoTeeFileMode::Truncate,
277    }
278}
279
280fn proto_backpressure(backpressure: SessionTeeBackpressure) -> ProtoTeeBackpressure {
281    match backpressure {
282        SessionTeeBackpressure::DropOldest => ProtoTeeBackpressure::DropOldest,
283        SessionTeeBackpressure::Block => ProtoTeeBackpressure::Block,
284    }
285}
286
/// Encode `path` as the raw bytes of its OS string (Unix targets).
///
/// The bytes are copied verbatim, so paths that are not valid UTF-8 are
/// preserved unchanged.
#[cfg(unix)]
fn encode_os_path(path: &Path) -> Vec<u8> {
    let raw: &[u8] = path.as_os_str().as_bytes();
    raw.to_owned()
}
291
/// Encode `path` as bytes on Windows targets.
///
/// The OS string is re-encoded as 16-bit wide units and each unit is emitted
/// as two bytes in little-endian order.
#[cfg(windows)]
fn encode_os_path(path: &Path) -> Vec<u8> {
    let mut encoded = Vec::new();
    for unit in path.as_os_str().encode_wide() {
        encoded.extend_from_slice(&unit.to_le_bytes());
    }
    encoded
}