use std::sync::mpsc;
use crate::client::{ClientError, DaemonClient};
use crate::observer::{EventCategory, ObserverEvent, ObserverSubscriber};
use crate::proto::daemon::{
DaemonRequest, GetSessionObserverStatusRequest,
ObserverBackpressure as ProtoObserverBackpressure,
ObserverSessionKind as ProtoObserverSessionKind, RegisterSessionObserverRequest, RequestType,
StatusCode, UnregisterSessionObserverRequest,
};
/// Selects which kind of daemon session an observer request refers to.
///
/// The value is translated to the wire-level enum by `proto_session_kind`
/// before it is placed in a request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionObserverKind {
    /// Sent to the daemon as the proto `Pty` session kind.
    Pty,
    /// Sent to the daemon as the proto `Pipe` session kind.
    Pipe,
}
/// Backpressure policy a client requests when registering an observer.
///
/// `Default::default()` yields [`SessionObserverBackpressure::DropOldest`].
/// The value is translated to the wire-level enum by `proto_backpressure`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SessionObserverBackpressure {
    /// The default policy.
    ///
    /// NOTE(review): presumably the oldest buffered events are discarded when
    /// the ring is full; confirm against the daemon implementation.
    #[default]
    DropOldest,
    /// NOTE(review): presumably the producer waits for space instead of
    /// discarding events; confirm against the daemon implementation.
    Block,
}
/// Client-side description of an observer registration.
///
/// Build one with [`SessionObserverRequest::new`] and adjust it with the
/// chained setters, then pass it to `DaemonClient::register_session_observer`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SessionObserverRequest {
    /// Identifier of the session to observe; sent to the daemon unchanged.
    pub session_id: String,
    /// Kind of the session named by `session_id`.
    pub session_kind: SessionObserverKind,
    /// Event categories requested for the observer.
    pub categories: Vec<EventCategory>,
    /// Ring capacity, in events, forwarded to the daemon unchanged.
    ///
    /// NOTE(review): `new` sets this to `0`; presumably that asks the daemon
    /// to pick its own capacity. Confirm against the daemon.
    pub ring_capacity_events: u32,
    /// Backpressure policy requested for the observer.
    pub backpressure: SessionObserverBackpressure,
}
impl SessionObserverRequest {
    /// Creates a request for the given session with baseline settings.
    ///
    /// The baseline is: only [`EventCategory::Lifecycle`] events, a ring
    /// capacity of `0`, and [`SessionObserverBackpressure::DropOldest`].
    pub fn new(session_id: impl Into<String>, session_kind: SessionObserverKind) -> Self {
        let session_id: String = session_id.into();
        let categories = Vec::from([EventCategory::Lifecycle]);
        Self {
            session_id,
            session_kind,
            categories,
            ring_capacity_events: 0,
            backpressure: SessionObserverBackpressure::DropOldest,
        }
    }

    /// Replaces the requested categories with the items of `categories`.
    ///
    /// The previous list is discarded, not extended.
    pub fn categories(self, categories: impl IntoIterator<Item = EventCategory>) -> Self {
        Self {
            categories: categories.into_iter().collect(),
            ..self
        }
    }

    /// Sets the ring capacity (in events) carried by the request.
    pub fn ring_capacity_events(self, capacity: u32) -> Self {
        Self {
            ring_capacity_events: capacity,
            ..self
        }
    }

    /// Sets the backpressure policy carried by the request.
    pub fn backpressure(self, backpressure: SessionObserverBackpressure) -> Self {
        Self {
            backpressure,
            ..self
        }
    }
}
/// Result of a successful `DaemonClient::register_session_observer` call.
pub struct RemoteObserverSubscription {
/// Subscriber identifier taken from the daemon's registration response.
/// The unregister and status calls take it as their `subscriber_id` argument.
pub subscriber_id: String,
/// Subscriber built from the receiving half of a local `mpsc` channel.
pub subscriber: ObserverSubscriber,
// Sending half of that same local channel. It is stored so the channel's
// only sender is not dropped while the subscription exists (a receiver with
// no remaining senders reports disconnection). Nothing in the code visible
// in this file sends events through it.
_local_sender: mpsc::Sender<ObserverEvent>,
}
/// Snapshot of a subscriber's state as returned by
/// `DaemonClient::get_session_observer_status`.
///
/// Each field is copied verbatim from the same-named field of the daemon's
/// status response.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SessionObserverStatus {
    /// `missed_events` value reported by the daemon.
    pub missed_events: u64,
    /// `disconnected` flag reported by the daemon.
    pub disconnected: bool,
    /// `delivered_events` value reported by the daemon.
    pub delivered_events: u64,
}
impl DaemonClient {
pub fn register_session_observer(
&mut self,
request: &SessionObserverRequest,
) -> Result<RemoteObserverSubscription, ClientError> {
let daemon_request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::RegisterSessionObserver.into(),
protocol_version: 1,
register_session_observer: Some(RegisterSessionObserverRequest {
session_id: request.session_id.clone(),
session_kind: proto_session_kind(request.session_kind) as i32,
categories: request
.categories
.iter()
.map(|c| event_category_to_u32(*c))
.collect(),
ring_capacity_events: request.ring_capacity_events,
backpressure: proto_backpressure(request.backpressure) as i32,
}),
..Default::default()
};
let response = self.send_request(daemon_request)?;
ensure_ok(&response)?;
let payload = response
.register_session_observer
.ok_or_else(|| ClientError::Server {
code: StatusCode::Internal,
message: "register_session_observer response missing payload".into(),
})?;
let (tx, rx) = mpsc::channel();
Ok(RemoteObserverSubscription {
subscriber_id: payload.subscriber_id,
subscriber: ObserverSubscriber::from_receiver(rx),
_local_sender: tx,
})
}
pub fn unregister_session_observer(
&mut self,
session_kind: SessionObserverKind,
session_id: &str,
subscriber_id: &str,
) -> Result<(), ClientError> {
let daemon_request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::UnregisterSessionObserver.into(),
protocol_version: 1,
unregister_session_observer: Some(UnregisterSessionObserverRequest {
session_id: session_id.to_string(),
session_kind: proto_session_kind(session_kind) as i32,
subscriber_id: subscriber_id.to_string(),
}),
..Default::default()
};
let response = self.send_request(daemon_request)?;
ensure_ok(&response)
}
pub fn get_session_observer_status(
&mut self,
session_kind: SessionObserverKind,
session_id: &str,
subscriber_id: &str,
) -> Result<SessionObserverStatus, ClientError> {
let daemon_request = DaemonRequest {
id: self.next_request_id(),
r#type: RequestType::GetSessionObserverStatus.into(),
protocol_version: 1,
get_session_observer_status: Some(GetSessionObserverStatusRequest {
session_id: session_id.to_string(),
session_kind: proto_session_kind(session_kind) as i32,
subscriber_id: subscriber_id.to_string(),
}),
..Default::default()
};
let response = self.send_request(daemon_request)?;
ensure_ok(&response)?;
let payload = response
.get_session_observer_status
.ok_or_else(|| ClientError::Server {
code: StatusCode::Internal,
message: "get_session_observer_status response missing payload".into(),
})?;
Ok(SessionObserverStatus {
missed_events: payload.missed_events,
disconnected: payload.disconnected,
delivered_events: payload.delivered_events,
})
}
}
/// Turns the status carried by a daemon response into a `Result`.
///
/// Returns `Ok(())` when the response code equals `StatusCode::Ok`. Any other
/// code becomes [`ClientError::Server`] carrying a copy of the response
/// message. A numeric code that does not convert to a [`StatusCode`] is
/// reported as [`StatusCode::UnknownRequest`].
fn ensure_ok(response: &crate::proto::daemon::DaemonResponse) -> Result<(), ClientError> {
    if response.code != StatusCode::Ok as i32 {
        let code = match StatusCode::try_from(response.code) {
            Ok(known) => known,
            // Unrecognised numeric codes fall back to a fixed variant.
            Err(_) => StatusCode::UnknownRequest,
        };
        return Err(ClientError::Server {
            code,
            message: response.message.clone(),
        });
    }
    Ok(())
}
fn proto_session_kind(kind: SessionObserverKind) -> ProtoObserverSessionKind {
match kind {
SessionObserverKind::Pty => ProtoObserverSessionKind::Pty,
SessionObserverKind::Pipe => ProtoObserverSessionKind::Pipe,
}
}
fn proto_backpressure(b: SessionObserverBackpressure) -> ProtoObserverBackpressure {
match b {
SessionObserverBackpressure::DropOldest => ProtoObserverBackpressure::DropOldest,
SessionObserverBackpressure::Block => ProtoObserverBackpressure::Block,
}
}
fn event_category_to_u32(category: EventCategory) -> u32 {
match category {
EventCategory::Lifecycle => 0,
EventCategory::File => 1,
EventCategory::Network => 2,
EventCategory::Process => 3,
}
}