1use std::sync::mpsc;
18
19use crate::client::{ClientError, DaemonClient};
20use crate::observer::{EventCategory, ObserverEvent, ObserverSubscriber};
21use crate::proto::daemon::{
22 DaemonRequest, GetSessionObserverStatusRequest,
23 ObserverBackpressure as ProtoObserverBackpressure,
24 ObserverSessionKind as ProtoObserverSessionKind, RegisterSessionObserverRequest, RequestType,
25 StatusCode, UnregisterSessionObserverRequest,
26};
27
28#[derive(Clone, Copy, Debug, Eq, PartialEq)]
30pub enum SessionObserverKind {
31 Pty,
33 Pipe,
35}
36
37#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
39pub enum SessionObserverBackpressure {
40 #[default]
42 DropOldest,
43 Block,
45}
46
47#[derive(Clone, Debug, Eq, PartialEq)]
50pub struct SessionObserverRequest {
51 pub session_id: String,
53 pub session_kind: SessionObserverKind,
55 pub categories: Vec<EventCategory>,
58 pub ring_capacity_events: u32,
60 pub backpressure: SessionObserverBackpressure,
62}
63
64impl SessionObserverRequest {
65 pub fn new(session_id: impl Into<String>, session_kind: SessionObserverKind) -> Self {
68 Self {
69 session_id: session_id.into(),
70 session_kind,
71 categories: vec![EventCategory::Lifecycle],
72 ring_capacity_events: 0,
73 backpressure: SessionObserverBackpressure::DropOldest,
74 }
75 }
76
77 pub fn categories(mut self, categories: impl IntoIterator<Item = EventCategory>) -> Self {
79 self.categories = categories.into_iter().collect();
80 self
81 }
82
83 pub fn ring_capacity_events(mut self, capacity: u32) -> Self {
85 self.ring_capacity_events = capacity;
86 self
87 }
88
89 pub fn backpressure(mut self, backpressure: SessionObserverBackpressure) -> Self {
91 self.backpressure = backpressure;
92 self
93 }
94}
95
96pub struct RemoteObserverSubscription {
106 pub subscriber_id: String,
110 pub subscriber: ObserverSubscriber,
117 _local_sender: mpsc::Sender<ObserverEvent>,
120}
121
122#[derive(Clone, Copy, Debug, Eq, PartialEq)]
124pub struct SessionObserverStatus {
125 pub missed_events: u64,
127 pub disconnected: bool,
129 pub delivered_events: u64,
131}
132
133impl DaemonClient {
134 pub fn register_session_observer(
136 &mut self,
137 request: &SessionObserverRequest,
138 ) -> Result<RemoteObserverSubscription, ClientError> {
139 let daemon_request = DaemonRequest {
140 id: self.next_request_id(),
141 r#type: RequestType::RegisterSessionObserver.into(),
142 protocol_version: 1,
143 register_session_observer: Some(RegisterSessionObserverRequest {
144 session_id: request.session_id.clone(),
145 session_kind: proto_session_kind(request.session_kind) as i32,
146 categories: request
147 .categories
148 .iter()
149 .map(|c| event_category_to_u32(*c))
150 .collect(),
151 ring_capacity_events: request.ring_capacity_events,
152 backpressure: proto_backpressure(request.backpressure) as i32,
153 }),
154 ..Default::default()
155 };
156 let response = self.send_request(daemon_request)?;
157 ensure_ok(&response)?;
158 let payload = response
159 .register_session_observer
160 .ok_or_else(|| ClientError::Server {
161 code: StatusCode::Internal,
162 message: "register_session_observer response missing payload".into(),
163 })?;
164 let (tx, rx) = mpsc::channel();
167 Ok(RemoteObserverSubscription {
168 subscriber_id: payload.subscriber_id,
169 subscriber: ObserverSubscriber::from_receiver(rx),
170 _local_sender: tx,
171 })
172 }
173
174 pub fn unregister_session_observer(
176 &mut self,
177 session_kind: SessionObserverKind,
178 session_id: &str,
179 subscriber_id: &str,
180 ) -> Result<(), ClientError> {
181 let daemon_request = DaemonRequest {
182 id: self.next_request_id(),
183 r#type: RequestType::UnregisterSessionObserver.into(),
184 protocol_version: 1,
185 unregister_session_observer: Some(UnregisterSessionObserverRequest {
186 session_id: session_id.to_string(),
187 session_kind: proto_session_kind(session_kind) as i32,
188 subscriber_id: subscriber_id.to_string(),
189 }),
190 ..Default::default()
191 };
192 let response = self.send_request(daemon_request)?;
193 ensure_ok(&response)
194 }
195
196 pub fn get_session_observer_status(
198 &mut self,
199 session_kind: SessionObserverKind,
200 session_id: &str,
201 subscriber_id: &str,
202 ) -> Result<SessionObserverStatus, ClientError> {
203 let daemon_request = DaemonRequest {
204 id: self.next_request_id(),
205 r#type: RequestType::GetSessionObserverStatus.into(),
206 protocol_version: 1,
207 get_session_observer_status: Some(GetSessionObserverStatusRequest {
208 session_id: session_id.to_string(),
209 session_kind: proto_session_kind(session_kind) as i32,
210 subscriber_id: subscriber_id.to_string(),
211 }),
212 ..Default::default()
213 };
214 let response = self.send_request(daemon_request)?;
215 ensure_ok(&response)?;
216 let payload = response
217 .get_session_observer_status
218 .ok_or_else(|| ClientError::Server {
219 code: StatusCode::Internal,
220 message: "get_session_observer_status response missing payload".into(),
221 })?;
222 Ok(SessionObserverStatus {
223 missed_events: payload.missed_events,
224 disconnected: payload.disconnected,
225 delivered_events: payload.delivered_events,
226 })
227 }
228}
229
230fn ensure_ok(response: &crate::proto::daemon::DaemonResponse) -> Result<(), ClientError> {
231 if response.code == StatusCode::Ok as i32 {
232 return Ok(());
233 }
234 let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
235 Err(ClientError::Server {
236 code,
237 message: response.message.clone(),
238 })
239}
240
241fn proto_session_kind(kind: SessionObserverKind) -> ProtoObserverSessionKind {
242 match kind {
243 SessionObserverKind::Pty => ProtoObserverSessionKind::Pty,
244 SessionObserverKind::Pipe => ProtoObserverSessionKind::Pipe,
245 }
246}
247
248fn proto_backpressure(b: SessionObserverBackpressure) -> ProtoObserverBackpressure {
249 match b {
250 SessionObserverBackpressure::DropOldest => ProtoObserverBackpressure::DropOldest,
251 SessionObserverBackpressure::Block => ProtoObserverBackpressure::Block,
252 }
253}
254
255fn event_category_to_u32(category: EventCategory) -> u32 {
256 match category {
257 EventCategory::Lifecycle => 0,
258 EventCategory::File => 1,
259 EventCategory::Network => 2,
260 EventCategory::Process => 3,
261 }
262}