1use std::path::{Path, PathBuf};
4
5#[cfg(unix)]
6use std::os::unix::ffi::OsStrExt;
7#[cfg(windows)]
8use std::os::windows::ffi::OsStrExt;
9
10use crate::client::{ClientError, DaemonClient};
11use crate::proto::daemon::{
12 DaemonRequest, GetSessionTeeStatusRequest, RegisterSessionTeeRequest, RequestType, StatusCode,
13 TeeBackpressure as ProtoTeeBackpressure, TeeFileMode as ProtoTeeFileMode,
14 TeeSessionKind as ProtoTeeSessionKind, TeeSinkKind, TeeStreamKind as ProtoTeeStreamKind,
15 UnregisterSessionTeeRequest,
16};
17
/// Client-side selector for the kind of session a tee is attached to.
///
/// Converted to `ProtoTeeSessionKind` by `proto_session_kind` before being
/// placed in a daemon request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionTeeKind {
    /// Sent to the daemon as `ProtoTeeSessionKind::Pty`.
    Pty,
    /// Sent to the daemon as `ProtoTeeSessionKind::Pipe`.
    Pipe,
}
23
/// Client-side identifier of the session stream a tee observes.
///
/// Mirrors `ProtoTeeStreamKind` minus its `Unspecified` variant, which has no
/// client-side counterpart and is rejected when decoding daemon responses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionTeeStream {
    /// Corresponds to `ProtoTeeStreamKind::PtyOutput`.
    PtyOutput,
    /// Corresponds to `ProtoTeeStreamKind::Stdout`.
    Stdout,
    /// Corresponds to `ProtoTeeStreamKind::Stderr`.
    Stderr,
    /// Corresponds to `ProtoTeeStreamKind::Stdin`.
    Stdin,
}
31
/// How the daemon should open the tee's destination file.
///
/// Converted to `ProtoTeeFileMode` by `proto_file_mode` when a request is built.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionTeeFileMode {
    /// Sent as `ProtoTeeFileMode::Append`; the mode `SessionTeeFileRequest::new` starts with.
    Append,
    /// Sent as `ProtoTeeFileMode::Truncate`; selected by `SessionTeeFileRequest::truncate`.
    Truncate,
}
37
/// Policy requested from the daemon for a tee whose queue cannot keep up.
///
/// Converted to `ProtoTeeBackpressure` by `proto_backpressure`.
// NOTE(review): the exact drop/block semantics are implemented by the daemon
// and are not visible from this file — confirm against the daemon docs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionTeeBackpressure {
    /// Sent as `ProtoTeeBackpressure::DropOldest`; the policy `SessionTeeFileRequest::new` starts with.
    DropOldest,
    /// Sent as `ProtoTeeBackpressure::Block`.
    Block,
}
43
44#[derive(Clone, Debug, Eq, PartialEq)]
45pub struct SessionTeeFileRequest {
46 pub session_id: String,
47 pub session_kind: SessionTeeKind,
48 pub stream: SessionTeeStream,
49 pub path: PathBuf,
50 pub mode: SessionTeeFileMode,
51 pub queue_capacity: u32,
53 pub write_missed_markers: bool,
54 pub backpressure: SessionTeeBackpressure,
55}
56
57impl SessionTeeFileRequest {
58 pub fn new<P>(
59 session_id: impl Into<String>,
60 session_kind: SessionTeeKind,
61 stream: SessionTeeStream,
62 path: P,
63 ) -> Self
64 where
65 P: AsRef<Path>,
66 {
67 Self {
68 session_id: session_id.into(),
69 session_kind,
70 stream,
71 path: path.as_ref().to_path_buf(),
72 mode: SessionTeeFileMode::Append,
73 queue_capacity: 0,
74 write_missed_markers: true,
75 backpressure: SessionTeeBackpressure::DropOldest,
76 }
77 }
78
79 pub fn truncate(mut self) -> Self {
80 self.mode = SessionTeeFileMode::Truncate;
81 self
82 }
83
84 pub fn queue_capacity(mut self, capacity: u32) -> Self {
85 self.queue_capacity = capacity;
86 self
87 }
88
89 pub fn suppress_missed_markers(mut self) -> Self {
90 self.write_missed_markers = false;
91 self
92 }
93
94 pub fn backpressure(mut self, backpressure: SessionTeeBackpressure) -> Self {
95 self.backpressure = backpressure;
96 self
97 }
98}
99
100#[derive(Clone, Copy, Debug, Eq, PartialEq)]
101pub struct SessionTeeStatus {
102 pub stream: SessionTeeStream,
103 pub missed_bytes: u64,
104 pub disconnected: bool,
105}
106
107impl DaemonClient {
108 pub fn register_session_file_tee(
109 &mut self,
110 request: &SessionTeeFileRequest,
111 ) -> Result<u64, ClientError> {
112 let daemon_request = DaemonRequest {
113 id: self.next_request_id(),
114 r#type: RequestType::RegisterSessionTee.into(),
115 protocol_version: 1,
116 register_session_tee: Some(RegisterSessionTeeRequest {
117 session_id: request.session_id.clone(),
118 session_kind: proto_session_kind(request.session_kind) as i32,
119 stream: proto_stream_kind(request.stream) as i32,
120 sink_kind: TeeSinkKind::File as i32,
121 file_path: encode_os_path(&request.path),
122 file_mode: proto_file_mode(request.mode) as i32,
123 queue_capacity: request.queue_capacity,
124 suppress_missed_markers: !request.write_missed_markers,
125 backpressure: proto_backpressure(request.backpressure) as i32,
126 }),
127 ..Default::default()
128 };
129 let response = self.send_request(daemon_request)?;
130 ensure_ok(&response)?;
131 let payload = response
132 .register_session_tee
133 .ok_or_else(|| ClientError::Server {
134 code: StatusCode::Internal,
135 message: "register_session_tee response missing payload".into(),
136 })?;
137 Ok(payload.tee_handle)
138 }
139
140 pub fn unregister_session_tee(
141 &mut self,
142 session_kind: SessionTeeKind,
143 session_id: &str,
144 tee_handle: u64,
145 ) -> Result<(), ClientError> {
146 let daemon_request = DaemonRequest {
147 id: self.next_request_id(),
148 r#type: RequestType::UnregisterSessionTee.into(),
149 protocol_version: 1,
150 unregister_session_tee: Some(UnregisterSessionTeeRequest {
151 session_id: session_id.to_string(),
152 session_kind: proto_session_kind(session_kind) as i32,
153 tee_handle,
154 }),
155 ..Default::default()
156 };
157 let response = self.send_request(daemon_request)?;
158 ensure_ok(&response)
159 }
160
161 pub fn get_session_tee_status(
162 &mut self,
163 session_kind: SessionTeeKind,
164 session_id: &str,
165 tee_handle: u64,
166 ) -> Result<SessionTeeStatus, ClientError> {
167 let daemon_request = DaemonRequest {
168 id: self.next_request_id(),
169 r#type: RequestType::GetSessionTeeStatus.into(),
170 protocol_version: 1,
171 get_session_tee_status: Some(GetSessionTeeStatusRequest {
172 session_id: session_id.to_string(),
173 session_kind: proto_session_kind(session_kind) as i32,
174 tee_handle,
175 }),
176 ..Default::default()
177 };
178 let response = self.send_request(daemon_request)?;
179 ensure_ok(&response)?;
180 let payload = response
181 .get_session_tee_status
182 .ok_or_else(|| ClientError::Server {
183 code: StatusCode::Internal,
184 message: "get_session_tee_status response missing payload".into(),
185 })?;
186 let stream =
187 ProtoTeeStreamKind::try_from(payload.stream).map_err(|_| ClientError::Server {
188 code: StatusCode::Internal,
189 message: "get_session_tee_status response has invalid stream".into(),
190 })?;
191 Ok(SessionTeeStatus {
192 stream: client_stream_kind(stream)?,
193 missed_bytes: payload.missed_bytes,
194 disconnected: payload.disconnected,
195 })
196 }
197}
198
199fn ensure_ok(response: &crate::proto::daemon::DaemonResponse) -> Result<(), ClientError> {
200 if response.code == StatusCode::Ok as i32 {
201 return Ok(());
202 }
203 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
204 Err(ClientError::Server {
205 code,
206 message: response.message.clone(),
207 })
208}
209
210fn proto_session_kind(kind: SessionTeeKind) -> ProtoTeeSessionKind {
211 match kind {
212 SessionTeeKind::Pty => ProtoTeeSessionKind::Pty,
213 SessionTeeKind::Pipe => ProtoTeeSessionKind::Pipe,
214 }
215}
216
217fn proto_stream_kind(stream: SessionTeeStream) -> ProtoTeeStreamKind {
218 match stream {
219 SessionTeeStream::PtyOutput => ProtoTeeStreamKind::PtyOutput,
220 SessionTeeStream::Stdout => ProtoTeeStreamKind::Stdout,
221 SessionTeeStream::Stderr => ProtoTeeStreamKind::Stderr,
222 SessionTeeStream::Stdin => ProtoTeeStreamKind::Stdin,
223 }
224}
225
226fn client_stream_kind(stream: ProtoTeeStreamKind) -> Result<SessionTeeStream, ClientError> {
227 match stream {
228 ProtoTeeStreamKind::PtyOutput => Ok(SessionTeeStream::PtyOutput),
229 ProtoTeeStreamKind::Stdout => Ok(SessionTeeStream::Stdout),
230 ProtoTeeStreamKind::Stderr => Ok(SessionTeeStream::Stderr),
231 ProtoTeeStreamKind::Stdin => Ok(SessionTeeStream::Stdin),
232 ProtoTeeStreamKind::Unspecified => Err(ClientError::Server {
233 code: StatusCode::Internal,
234 message: "get_session_tee_status response has unspecified stream".into(),
235 }),
236 }
237}
238
239fn proto_file_mode(mode: SessionTeeFileMode) -> ProtoTeeFileMode {
240 match mode {
241 SessionTeeFileMode::Append => ProtoTeeFileMode::Append,
242 SessionTeeFileMode::Truncate => ProtoTeeFileMode::Truncate,
243 }
244}
245
246fn proto_backpressure(backpressure: SessionTeeBackpressure) -> ProtoTeeBackpressure {
247 match backpressure {
248 SessionTeeBackpressure::DropOldest => ProtoTeeBackpressure::DropOldest,
249 SessionTeeBackpressure::Block => ProtoTeeBackpressure::Block,
250 }
251}
252
/// Encodes `path` for the wire on Unix: the raw bytes of the underlying
/// `OsStr`, copied into a new vector without any UTF-8 validation.
#[cfg(unix)]
fn encode_os_path(path: &Path) -> Vec<u8> {
    let raw: &[u8] = path.as_os_str().as_bytes();
    Vec::from(raw)
}
257
/// Encodes `path` for the wire on Windows: each UTF-16 code unit produced by
/// `encode_wide` is emitted as two bytes in little-endian order.
#[cfg(windows)]
fn encode_os_path(path: &Path) -> Vec<u8> {
    let mut bytes = Vec::new();
    for unit in path.as_os_str().encode_wide() {
        bytes.extend_from_slice(&unit.to_le_bytes());
    }
    bytes
}