Skip to main content

running_process/client/
observer.rs

1//! Client helpers for daemon-owned session observer subscriptions
2//! (#221 Phase 2 / #429).
3//!
4//! Mirrors `client::telemetry` but for event-stream observer payloads
5//! (process lifecycle Started/Exited, eventually file/network/process
6//! events in Phase 3). The client wraps the three IPC round trips:
7//!
8//! - [`DaemonClient::register_session_observer`]
9//! - [`DaemonClient::unregister_session_observer`]
10//! - [`DaemonClient::get_session_observer_status`]
11//!
12//! Registrations live on the per-session struct on the daemon side, so
13//! they survive the IPC connection going away. Events emitted while no
14//! consumer is draining the bounded channel are accounted for via
15//! [`SessionObserverStatus::missed_events`] under `DropOldest` policy.
16
17use std::sync::mpsc;
18
19use crate::client::{ClientError, DaemonClient};
20use crate::observer::{EventCategory, ObserverEvent, ObserverSubscriber};
21use crate::proto::daemon::{
22    DaemonRequest, GetSessionObserverStatusRequest,
23    ObserverBackpressure as ProtoObserverBackpressure,
24    ObserverSessionKind as ProtoObserverSessionKind, RegisterSessionObserverRequest, RequestType,
25    StatusCode, UnregisterSessionObserverRequest,
26};
27
28/// Session transport that owns the observed process.
29#[derive(Clone, Copy, Debug, Eq, PartialEq)]
30pub enum SessionObserverKind {
31    /// Daemon-owned pseudo-terminal session.
32    Pty,
33    /// Daemon-owned pipe-backed session.
34    Pipe,
35}
36
37/// Backpressure policy for the daemon-side bounded sink.
38#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
39pub enum SessionObserverBackpressure {
40    /// Drop the newest event and increment `missed_events` when the sink is full.
41    #[default]
42    DropOldest,
43    /// Block the emitter (daemon-side) until the sink has room.
44    Block,
45}
46
47/// Request used to register a daemon-managed observer subscription on a
48/// session.
49#[derive(Clone, Debug, Eq, PartialEq)]
50pub struct SessionObserverRequest {
51    /// Session identifier returned by the spawn API.
52    pub session_id: String,
53    /// Kind of session that owns `session_id`.
54    pub session_kind: SessionObserverKind,
55    /// Requested event categories. Empty defaults to `[Lifecycle]` on the
56    /// server, matching the Phase 1 in-process default.
57    pub categories: Vec<EventCategory>,
58    /// Bounded sink capacity. `0` means use the daemon default (1024).
59    pub ring_capacity_events: u32,
60    /// Sink backpressure policy.
61    pub backpressure: SessionObserverBackpressure,
62}
63
64impl SessionObserverRequest {
65    /// Construct a request with default `[Lifecycle]` categories, daemon
66    /// default capacity, and `DropOldest` backpressure.
67    pub fn new(session_id: impl Into<String>, session_kind: SessionObserverKind) -> Self {
68        Self {
69            session_id: session_id.into(),
70            session_kind,
71            categories: vec![EventCategory::Lifecycle],
72            ring_capacity_events: 0,
73            backpressure: SessionObserverBackpressure::DropOldest,
74        }
75    }
76
77    /// Override the requested category set.
78    pub fn categories(mut self, categories: impl IntoIterator<Item = EventCategory>) -> Self {
79        self.categories = categories.into_iter().collect();
80        self
81    }
82
83    /// Override the bounded sink capacity. `0` keeps the daemon default.
84    pub fn ring_capacity_events(mut self, capacity: u32) -> Self {
85        self.ring_capacity_events = capacity;
86        self
87    }
88
89    /// Override the backpressure policy.
90    pub fn backpressure(mut self, backpressure: SessionObserverBackpressure) -> Self {
91        self.backpressure = backpressure;
92        self
93    }
94}
95
96/// Outcome of [`DaemonClient::register_session_observer`].
97///
98/// Holds the server-assigned [`subscriber_id`](Self::subscriber_id) plus a
99/// local [`ObserverSubscriber`] handle. The local subscriber is wired to a
100/// connection-local `mpsc` channel: future work will pump
101/// [`crate::observer::ObserverEvent`] frames into that channel from a
102/// streaming IPC attach. In this PR no streaming attach exists yet, so the
103/// channel stays empty — clients can still observe activity by polling
104/// [`DaemonClient::get_session_observer_status`].
105pub struct RemoteObserverSubscription {
106    /// Server-assigned UUID identifying this subscription. Pass back to
107    /// [`DaemonClient::unregister_session_observer`] /
108    /// [`DaemonClient::get_session_observer_status`].
109    pub subscriber_id: String,
110    /// Local subscriber whose channel is currently inert.
111    ///
112    /// Held here so the surface matches the Phase 1 in-process API and so
113    /// callers can pass it to existing code that expects an
114    /// `ObserverSubscriber`. Once a streaming-attach frame is added the
115    /// sender half (held internally) will start feeding this receiver.
116    pub subscriber: ObserverSubscriber,
117    // Local sender held so the channel does not disconnect immediately.
118    // Future work will use this to dispatch IPC-delivered events.
119    _local_sender: mpsc::Sender<ObserverEvent>,
120}
121
122/// Current daemon-side counters for a registered subscription.
123#[derive(Clone, Copy, Debug, Eq, PartialEq)]
124pub struct SessionObserverStatus {
125    /// Cumulative events that arrived while the bounded sink was full.
126    pub missed_events: u64,
127    /// True once the downstream receiver has disconnected.
128    pub disconnected: bool,
129    /// Cumulative events successfully delivered into the bounded sink.
130    pub delivered_events: u64,
131}
132
133impl DaemonClient {
134    /// Register a session-scoped observer subscription on the daemon.
135    pub fn register_session_observer(
136        &mut self,
137        request: &SessionObserverRequest,
138    ) -> Result<RemoteObserverSubscription, ClientError> {
139        let daemon_request = DaemonRequest {
140            id: self.next_request_id(),
141            r#type: RequestType::RegisterSessionObserver.into(),
142            protocol_version: 1,
143            register_session_observer: Some(RegisterSessionObserverRequest {
144                session_id: request.session_id.clone(),
145                session_kind: proto_session_kind(request.session_kind) as i32,
146                categories: request
147                    .categories
148                    .iter()
149                    .map(|c| event_category_to_u32(*c))
150                    .collect(),
151                ring_capacity_events: request.ring_capacity_events,
152                backpressure: proto_backpressure(request.backpressure) as i32,
153            }),
154            ..Default::default()
155        };
156        let response = self.send_request(daemon_request)?;
157        ensure_ok(&response)?;
158        let payload = response
159            .register_session_observer
160            .ok_or_else(|| ClientError::Server {
161                code: StatusCode::Internal,
162                message: "register_session_observer response missing payload".into(),
163            })?;
164        // Create a local inert channel for the surface
165        // — future streaming attach will pump events into the sender.
166        let (tx, rx) = mpsc::channel();
167        Ok(RemoteObserverSubscription {
168            subscriber_id: payload.subscriber_id,
169            subscriber: ObserverSubscriber::from_receiver(rx),
170            _local_sender: tx,
171        })
172    }
173
174    /// Unregister a previously registered subscription.
175    pub fn unregister_session_observer(
176        &mut self,
177        session_kind: SessionObserverKind,
178        session_id: &str,
179        subscriber_id: &str,
180    ) -> Result<(), ClientError> {
181        let daemon_request = DaemonRequest {
182            id: self.next_request_id(),
183            r#type: RequestType::UnregisterSessionObserver.into(),
184            protocol_version: 1,
185            unregister_session_observer: Some(UnregisterSessionObserverRequest {
186                session_id: session_id.to_string(),
187                session_kind: proto_session_kind(session_kind) as i32,
188                subscriber_id: subscriber_id.to_string(),
189            }),
190            ..Default::default()
191        };
192        let response = self.send_request(daemon_request)?;
193        ensure_ok(&response)
194    }
195
196    /// Fetch current counters for a registered subscription.
197    pub fn get_session_observer_status(
198        &mut self,
199        session_kind: SessionObserverKind,
200        session_id: &str,
201        subscriber_id: &str,
202    ) -> Result<SessionObserverStatus, ClientError> {
203        let daemon_request = DaemonRequest {
204            id: self.next_request_id(),
205            r#type: RequestType::GetSessionObserverStatus.into(),
206            protocol_version: 1,
207            get_session_observer_status: Some(GetSessionObserverStatusRequest {
208                session_id: session_id.to_string(),
209                session_kind: proto_session_kind(session_kind) as i32,
210                subscriber_id: subscriber_id.to_string(),
211            }),
212            ..Default::default()
213        };
214        let response = self.send_request(daemon_request)?;
215        ensure_ok(&response)?;
216        let payload = response
217            .get_session_observer_status
218            .ok_or_else(|| ClientError::Server {
219                code: StatusCode::Internal,
220                message: "get_session_observer_status response missing payload".into(),
221            })?;
222        Ok(SessionObserverStatus {
223            missed_events: payload.missed_events,
224            disconnected: payload.disconnected,
225            delivered_events: payload.delivered_events,
226        })
227    }
228}
229
230fn ensure_ok(response: &crate::proto::daemon::DaemonResponse) -> Result<(), ClientError> {
231    if response.code == StatusCode::Ok as i32 {
232        return Ok(());
233    }
234    let code = StatusCode::try_from(response.code).unwrap_or(StatusCode::UnknownRequest);
235    Err(ClientError::Server {
236        code,
237        message: response.message.clone(),
238    })
239}
240
241fn proto_session_kind(kind: SessionObserverKind) -> ProtoObserverSessionKind {
242    match kind {
243        SessionObserverKind::Pty => ProtoObserverSessionKind::Pty,
244        SessionObserverKind::Pipe => ProtoObserverSessionKind::Pipe,
245    }
246}
247
248fn proto_backpressure(b: SessionObserverBackpressure) -> ProtoObserverBackpressure {
249    match b {
250        SessionObserverBackpressure::DropOldest => ProtoObserverBackpressure::DropOldest,
251        SessionObserverBackpressure::Block => ProtoObserverBackpressure::Block,
252    }
253}
254
255fn event_category_to_u32(category: EventCategory) -> u32 {
256    match category {
257        EventCategory::Lifecycle => 0,
258        EventCategory::File => 1,
259        EventCategory::Network => 2,
260        EventCategory::Process => 3,
261    }
262}