1use std::path::{Path, PathBuf};
4
5#[cfg(unix)]
6use std::os::unix::ffi::OsStrExt;
7#[cfg(windows)]
8use std::os::windows::ffi::OsStrExt;
9
10use crate::client::{ClientError, DaemonClient};
11use crate::proto::daemon::{
12 DaemonRequest, GetSessionTeeStatusRequest, RegisterSessionTeeRequest, RequestType, StatusCode,
13 TeeBackpressure as ProtoTeeBackpressure, TeeFileMode as ProtoTeeFileMode,
14 TeeSessionKind as ProtoTeeSessionKind, TeeSinkKind, TeeStreamKind as ProtoTeeStreamKind,
15 UnregisterSessionTeeRequest,
16};
17
18#[derive(Clone, Copy, Debug, Eq, PartialEq)]
19pub enum SessionTeeKind {
21 Pty,
23 Pipe,
25}
26
27#[derive(Clone, Copy, Debug, Eq, PartialEq)]
28pub enum SessionTeeStream {
30 PtyOutput,
32 Stdout,
34 Stderr,
36 Stdin,
38}
39
40#[derive(Clone, Copy, Debug, Eq, PartialEq)]
41pub enum SessionTeeFileMode {
43 Append,
45 Truncate,
47}
48
49#[derive(Clone, Copy, Debug, Eq, PartialEq)]
50pub enum SessionTeeBackpressure {
52 DropOldest,
54 Block,
56}
57
58#[derive(Clone, Debug, Eq, PartialEq)]
59pub struct SessionTeeFileRequest {
61 pub session_id: String,
63 pub session_kind: SessionTeeKind,
65 pub stream: SessionTeeStream,
67 pub path: PathBuf,
69 pub mode: SessionTeeFileMode,
71 pub queue_capacity: u32,
73 pub write_missed_markers: bool,
75 pub backpressure: SessionTeeBackpressure,
77}
78
79impl SessionTeeFileRequest {
80 pub fn new<P>(
82 session_id: impl Into<String>,
83 session_kind: SessionTeeKind,
84 stream: SessionTeeStream,
85 path: P,
86 ) -> Self
87 where
88 P: AsRef<Path>,
89 {
90 Self {
91 session_id: session_id.into(),
92 session_kind,
93 stream,
94 path: path.as_ref().to_path_buf(),
95 mode: SessionTeeFileMode::Append,
96 queue_capacity: 0,
97 write_missed_markers: true,
98 backpressure: SessionTeeBackpressure::DropOldest,
99 }
100 }
101
102 pub fn truncate(mut self) -> Self {
104 self.mode = SessionTeeFileMode::Truncate;
105 self
106 }
107
108 pub fn queue_capacity(mut self, capacity: u32) -> Self {
110 self.queue_capacity = capacity;
111 self
112 }
113
114 pub fn suppress_missed_markers(mut self) -> Self {
116 self.write_missed_markers = false;
117 self
118 }
119
120 pub fn backpressure(mut self, backpressure: SessionTeeBackpressure) -> Self {
122 self.backpressure = backpressure;
123 self
124 }
125}
126
127#[derive(Clone, Copy, Debug, Eq, PartialEq)]
128pub struct SessionTeeStatus {
130 pub stream: SessionTeeStream,
132 pub missed_bytes: u64,
134 pub disconnected: bool,
136}
137
138impl DaemonClient {
139 pub fn register_session_file_tee(
141 &mut self,
142 request: &SessionTeeFileRequest,
143 ) -> Result<u64, ClientError> {
144 let daemon_request = DaemonRequest {
145 id: self.next_request_id(),
146 r#type: RequestType::RegisterSessionTee.into(),
147 protocol_version: 1,
148 register_session_tee: Some(RegisterSessionTeeRequest {
149 session_id: request.session_id.clone(),
150 session_kind: proto_session_kind(request.session_kind) as i32,
151 stream: proto_stream_kind(request.stream) as i32,
152 sink_kind: TeeSinkKind::File as i32,
153 file_path: encode_os_path(&request.path),
154 file_mode: proto_file_mode(request.mode) as i32,
155 queue_capacity: request.queue_capacity,
156 suppress_missed_markers: !request.write_missed_markers,
157 backpressure: proto_backpressure(request.backpressure) as i32,
158 }),
159 ..Default::default()
160 };
161 let response = self.send_request(daemon_request)?;
162 ensure_ok(&response)?;
163 let payload = response
164 .register_session_tee
165 .ok_or_else(|| ClientError::Server {
166 code: StatusCode::Internal,
167 message: "register_session_tee response missing payload".into(),
168 })?;
169 Ok(payload.tee_handle)
170 }
171
172 pub fn unregister_session_tee(
174 &mut self,
175 session_kind: SessionTeeKind,
176 session_id: &str,
177 tee_handle: u64,
178 ) -> Result<(), ClientError> {
179 let daemon_request = DaemonRequest {
180 id: self.next_request_id(),
181 r#type: RequestType::UnregisterSessionTee.into(),
182 protocol_version: 1,
183 unregister_session_tee: Some(UnregisterSessionTeeRequest {
184 session_id: session_id.to_string(),
185 session_kind: proto_session_kind(session_kind) as i32,
186 tee_handle,
187 }),
188 ..Default::default()
189 };
190 let response = self.send_request(daemon_request)?;
191 ensure_ok(&response)
192 }
193
194 pub fn get_session_tee_status(
196 &mut self,
197 session_kind: SessionTeeKind,
198 session_id: &str,
199 tee_handle: u64,
200 ) -> Result<SessionTeeStatus, ClientError> {
201 let daemon_request = DaemonRequest {
202 id: self.next_request_id(),
203 r#type: RequestType::GetSessionTeeStatus.into(),
204 protocol_version: 1,
205 get_session_tee_status: Some(GetSessionTeeStatusRequest {
206 session_id: session_id.to_string(),
207 session_kind: proto_session_kind(session_kind) as i32,
208 tee_handle,
209 }),
210 ..Default::default()
211 };
212 let response = self.send_request(daemon_request)?;
213 ensure_ok(&response)?;
214 let payload = response
215 .get_session_tee_status
216 .ok_or_else(|| ClientError::Server {
217 code: StatusCode::Internal,
218 message: "get_session_tee_status response missing payload".into(),
219 })?;
220 let stream =
221 ProtoTeeStreamKind::try_from(payload.stream).map_err(|_| ClientError::Server {
222 code: StatusCode::Internal,
223 message: "get_session_tee_status response has invalid stream".into(),
224 })?;
225 Ok(SessionTeeStatus {
226 stream: client_stream_kind(stream)?,
227 missed_bytes: payload.missed_bytes,
228 disconnected: payload.disconnected,
229 })
230 }
231}
232
233fn ensure_ok(response: &crate::proto::daemon::DaemonResponse) -> Result<(), ClientError> {
234 if response.code == StatusCode::Ok as i32 {
235 return Ok(());
236 }
237 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
238 Err(ClientError::Server {
239 code,
240 message: response.message.clone(),
241 })
242}
243
244fn proto_session_kind(kind: SessionTeeKind) -> ProtoTeeSessionKind {
245 match kind {
246 SessionTeeKind::Pty => ProtoTeeSessionKind::Pty,
247 SessionTeeKind::Pipe => ProtoTeeSessionKind::Pipe,
248 }
249}
250
251fn proto_stream_kind(stream: SessionTeeStream) -> ProtoTeeStreamKind {
252 match stream {
253 SessionTeeStream::PtyOutput => ProtoTeeStreamKind::PtyOutput,
254 SessionTeeStream::Stdout => ProtoTeeStreamKind::Stdout,
255 SessionTeeStream::Stderr => ProtoTeeStreamKind::Stderr,
256 SessionTeeStream::Stdin => ProtoTeeStreamKind::Stdin,
257 }
258}
259
260fn client_stream_kind(stream: ProtoTeeStreamKind) -> Result<SessionTeeStream, ClientError> {
261 match stream {
262 ProtoTeeStreamKind::PtyOutput => Ok(SessionTeeStream::PtyOutput),
263 ProtoTeeStreamKind::Stdout => Ok(SessionTeeStream::Stdout),
264 ProtoTeeStreamKind::Stderr => Ok(SessionTeeStream::Stderr),
265 ProtoTeeStreamKind::Stdin => Ok(SessionTeeStream::Stdin),
266 ProtoTeeStreamKind::Unspecified => Err(ClientError::Server {
267 code: StatusCode::Internal,
268 message: "get_session_tee_status response has unspecified stream".into(),
269 }),
270 }
271}
272
273fn proto_file_mode(mode: SessionTeeFileMode) -> ProtoTeeFileMode {
274 match mode {
275 SessionTeeFileMode::Append => ProtoTeeFileMode::Append,
276 SessionTeeFileMode::Truncate => ProtoTeeFileMode::Truncate,
277 }
278}
279
280fn proto_backpressure(backpressure: SessionTeeBackpressure) -> ProtoTeeBackpressure {
281 match backpressure {
282 SessionTeeBackpressure::DropOldest => ProtoTeeBackpressure::DropOldest,
283 SessionTeeBackpressure::Block => ProtoTeeBackpressure::Block,
284 }
285}
286
287#[cfg(unix)]
288fn encode_os_path(path: &Path) -> Vec<u8> {
289 path.as_os_str().as_bytes().to_vec()
290}
291
292#[cfg(windows)]
293fn encode_os_path(path: &Path) -> Vec<u8> {
294 path.as_os_str()
295 .encode_wide()
296 .flat_map(u16::to_le_bytes)
297 .collect()
298}