Skip to main content

running_process/client/
telemetry.rs

1//! Client helpers for daemon-owned session tee telemetry.
2
3use std::path::{Path, PathBuf};
4
5#[cfg(unix)]
6use std::os::unix::ffi::OsStrExt;
7#[cfg(windows)]
8use std::os::windows::ffi::OsStrExt;
9
10use crate::client::{ClientError, DaemonClient};
11use crate::proto::daemon::{
12    DaemonRequest, GetSessionTeeStatusRequest, RegisterSessionTeeRequest, RequestType, StatusCode,
13    TeeBackpressure as ProtoTeeBackpressure, TeeFileMode as ProtoTeeFileMode,
14    TeeSessionKind as ProtoTeeSessionKind, TeeSinkKind, TeeStreamKind as ProtoTeeStreamKind,
15    UnregisterSessionTeeRequest,
16};
17
18#[derive(Clone, Copy, Debug, Eq, PartialEq)]
19pub enum SessionTeeKind {
20    Pty,
21    Pipe,
22}
23
24#[derive(Clone, Copy, Debug, Eq, PartialEq)]
25pub enum SessionTeeStream {
26    PtyOutput,
27    Stdout,
28    Stderr,
29    Stdin,
30}
31
32#[derive(Clone, Copy, Debug, Eq, PartialEq)]
33pub enum SessionTeeFileMode {
34    Append,
35    Truncate,
36}
37
38#[derive(Clone, Copy, Debug, Eq, PartialEq)]
39pub enum SessionTeeBackpressure {
40    DropOldest,
41    Block,
42}
43
44#[derive(Clone, Debug, Eq, PartialEq)]
45pub struct SessionTeeFileRequest {
46    pub session_id: String,
47    pub session_kind: SessionTeeKind,
48    pub stream: SessionTeeStream,
49    pub path: PathBuf,
50    pub mode: SessionTeeFileMode,
51    /// 0 means use the daemon default.
52    pub queue_capacity: u32,
53    pub write_missed_markers: bool,
54    pub backpressure: SessionTeeBackpressure,
55}
56
57impl SessionTeeFileRequest {
58    pub fn new<P>(
59        session_id: impl Into<String>,
60        session_kind: SessionTeeKind,
61        stream: SessionTeeStream,
62        path: P,
63    ) -> Self
64    where
65        P: AsRef<Path>,
66    {
67        Self {
68            session_id: session_id.into(),
69            session_kind,
70            stream,
71            path: path.as_ref().to_path_buf(),
72            mode: SessionTeeFileMode::Append,
73            queue_capacity: 0,
74            write_missed_markers: true,
75            backpressure: SessionTeeBackpressure::DropOldest,
76        }
77    }
78
79    pub fn truncate(mut self) -> Self {
80        self.mode = SessionTeeFileMode::Truncate;
81        self
82    }
83
84    pub fn queue_capacity(mut self, capacity: u32) -> Self {
85        self.queue_capacity = capacity;
86        self
87    }
88
89    pub fn suppress_missed_markers(mut self) -> Self {
90        self.write_missed_markers = false;
91        self
92    }
93
94    pub fn backpressure(mut self, backpressure: SessionTeeBackpressure) -> Self {
95        self.backpressure = backpressure;
96        self
97    }
98}
99
100#[derive(Clone, Copy, Debug, Eq, PartialEq)]
101pub struct SessionTeeStatus {
102    pub stream: SessionTeeStream,
103    pub missed_bytes: u64,
104    pub disconnected: bool,
105}
106
107impl DaemonClient {
108    pub fn register_session_file_tee(
109        &mut self,
110        request: &SessionTeeFileRequest,
111    ) -> Result<u64, ClientError> {
112        let daemon_request = DaemonRequest {
113            id: self.next_request_id(),
114            r#type: RequestType::RegisterSessionTee.into(),
115            protocol_version: 1,
116            register_session_tee: Some(RegisterSessionTeeRequest {
117                session_id: request.session_id.clone(),
118                session_kind: proto_session_kind(request.session_kind) as i32,
119                stream: proto_stream_kind(request.stream) as i32,
120                sink_kind: TeeSinkKind::File as i32,
121                file_path: encode_os_path(&request.path),
122                file_mode: proto_file_mode(request.mode) as i32,
123                queue_capacity: request.queue_capacity,
124                suppress_missed_markers: !request.write_missed_markers,
125                backpressure: proto_backpressure(request.backpressure) as i32,
126            }),
127            ..Default::default()
128        };
129        let response = self.send_request(daemon_request)?;
130        ensure_ok(&response)?;
131        let payload = response
132            .register_session_tee
133            .ok_or_else(|| ClientError::Server {
134                code: StatusCode::Internal,
135                message: "register_session_tee response missing payload".into(),
136            })?;
137        Ok(payload.tee_handle)
138    }
139
140    pub fn unregister_session_tee(
141        &mut self,
142        session_kind: SessionTeeKind,
143        session_id: &str,
144        tee_handle: u64,
145    ) -> Result<(), ClientError> {
146        let daemon_request = DaemonRequest {
147            id: self.next_request_id(),
148            r#type: RequestType::UnregisterSessionTee.into(),
149            protocol_version: 1,
150            unregister_session_tee: Some(UnregisterSessionTeeRequest {
151                session_id: session_id.to_string(),
152                session_kind: proto_session_kind(session_kind) as i32,
153                tee_handle,
154            }),
155            ..Default::default()
156        };
157        let response = self.send_request(daemon_request)?;
158        ensure_ok(&response)
159    }
160
161    pub fn get_session_tee_status(
162        &mut self,
163        session_kind: SessionTeeKind,
164        session_id: &str,
165        tee_handle: u64,
166    ) -> Result<SessionTeeStatus, ClientError> {
167        let daemon_request = DaemonRequest {
168            id: self.next_request_id(),
169            r#type: RequestType::GetSessionTeeStatus.into(),
170            protocol_version: 1,
171            get_session_tee_status: Some(GetSessionTeeStatusRequest {
172                session_id: session_id.to_string(),
173                session_kind: proto_session_kind(session_kind) as i32,
174                tee_handle,
175            }),
176            ..Default::default()
177        };
178        let response = self.send_request(daemon_request)?;
179        ensure_ok(&response)?;
180        let payload = response
181            .get_session_tee_status
182            .ok_or_else(|| ClientError::Server {
183                code: StatusCode::Internal,
184                message: "get_session_tee_status response missing payload".into(),
185            })?;
186        let stream =
187            ProtoTeeStreamKind::try_from(payload.stream).map_err(|_| ClientError::Server {
188                code: StatusCode::Internal,
189                message: "get_session_tee_status response has invalid stream".into(),
190            })?;
191        Ok(SessionTeeStatus {
192            stream: client_stream_kind(stream)?,
193            missed_bytes: payload.missed_bytes,
194            disconnected: payload.disconnected,
195        })
196    }
197}
198
199fn ensure_ok(response: &crate::proto::daemon::DaemonResponse) -> Result<(), ClientError> {
200    if response.code == StatusCode::Ok as i32 {
201        return Ok(());
202    }
203    let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
204    Err(ClientError::Server {
205        code,
206        message: response.message.clone(),
207    })
208}
209
210fn proto_session_kind(kind: SessionTeeKind) -> ProtoTeeSessionKind {
211    match kind {
212        SessionTeeKind::Pty => ProtoTeeSessionKind::Pty,
213        SessionTeeKind::Pipe => ProtoTeeSessionKind::Pipe,
214    }
215}
216
217fn proto_stream_kind(stream: SessionTeeStream) -> ProtoTeeStreamKind {
218    match stream {
219        SessionTeeStream::PtyOutput => ProtoTeeStreamKind::PtyOutput,
220        SessionTeeStream::Stdout => ProtoTeeStreamKind::Stdout,
221        SessionTeeStream::Stderr => ProtoTeeStreamKind::Stderr,
222        SessionTeeStream::Stdin => ProtoTeeStreamKind::Stdin,
223    }
224}
225
226fn client_stream_kind(stream: ProtoTeeStreamKind) -> Result<SessionTeeStream, ClientError> {
227    match stream {
228        ProtoTeeStreamKind::PtyOutput => Ok(SessionTeeStream::PtyOutput),
229        ProtoTeeStreamKind::Stdout => Ok(SessionTeeStream::Stdout),
230        ProtoTeeStreamKind::Stderr => Ok(SessionTeeStream::Stderr),
231        ProtoTeeStreamKind::Stdin => Ok(SessionTeeStream::Stdin),
232        ProtoTeeStreamKind::Unspecified => Err(ClientError::Server {
233            code: StatusCode::Internal,
234            message: "get_session_tee_status response has unspecified stream".into(),
235        }),
236    }
237}
238
239fn proto_file_mode(mode: SessionTeeFileMode) -> ProtoTeeFileMode {
240    match mode {
241        SessionTeeFileMode::Append => ProtoTeeFileMode::Append,
242        SessionTeeFileMode::Truncate => ProtoTeeFileMode::Truncate,
243    }
244}
245
246fn proto_backpressure(backpressure: SessionTeeBackpressure) -> ProtoTeeBackpressure {
247    match backpressure {
248        SessionTeeBackpressure::DropOldest => ProtoTeeBackpressure::DropOldest,
249        SessionTeeBackpressure::Block => ProtoTeeBackpressure::Block,
250    }
251}
252
253#[cfg(unix)]
254fn encode_os_path(path: &Path) -> Vec<u8> {
255    path.as_os_str().as_bytes().to_vec()
256}
257
258#[cfg(windows)]
259fn encode_os_path(path: &Path) -> Vec<u8> {
260    path.as_os_str()
261        .encode_wide()
262        .flat_map(u16::to_le_bytes)
263        .collect()
264}