#![cfg(feature = "monitor")]
use std::sync::Arc;
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
use futures::StreamExt;
use rmcp::ErrorData;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use zendriver::{FrameDirection, NetworkEvent};
use crate::errors::map_error;
use crate::state::{MONITOR_BUFFER_CAP, MonitorBuffer, MonitorEvent, MonitorState, SessionState};
use crate::tools::common::current_tab;
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StartInput {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub url_pattern: Option<String>,
#[serde(default)]
pub capture_bodies: bool,
}
#[derive(Debug, Serialize, JsonSchema)]
pub struct StartOutput {
pub handle: String,
}
pub async fn start(
state: Arc<Mutex<SessionState>>,
input: StartInput,
) -> Result<StartOutput, ErrorData> {
let tab = {
let s = state.lock().await;
current_tab(&s).await?
};
let mut mb = tab.monitor();
if let Some(p) = &input.url_pattern {
mb = mb.url_pattern(p.clone());
}
let mut stream = mb.start().await.map_err(map_error)?;
let buffer = Arc::new(Mutex::new(MonitorBuffer::default()));
let cancel = CancellationToken::new();
let capture = input.capture_bodies;
let task = {
let buffer = buffer.clone();
let cancel = cancel.clone();
tokio::spawn(async move {
loop {
tokio::select! {
() = cancel.cancelled() => break,
next = stream.next() => {
let Some(ev) = next else { break };
let wire = convert(ev, capture).await;
let mut buf = buffer.lock().await;
push_capped(&mut buf, wire);
}
}
}
})
};
let handle = uuid::Uuid::new_v4().to_string();
let mon = Arc::new(Mutex::new(MonitorState {
buffer,
cancel,
task,
}));
state.lock().await.monitors.insert(handle.clone(), mon);
Ok(StartOutput { handle })
}
fn push_capped(buf: &mut MonitorBuffer, event: MonitorEvent) {
if buf.events.len() >= MONITOR_BUFFER_CAP {
buf.events.pop_front();
buf.dropped += 1;
}
buf.events.push_back(event);
}
async fn convert(ev: NetworkEvent, capture: bool) -> MonitorEvent {
match ev {
NetworkEvent::Http(ex) => {
let (body, body_base64) = if capture {
match ex.body().await {
Ok(bytes) => (
Some(String::from_utf8_lossy(&bytes).into_owned()),
Some(BASE64.encode(&bytes)),
),
Err(_) => (None, None),
}
} else {
(None, None)
};
MonitorEvent::Http {
url: ex.request.url.clone(),
method: ex.request.method.clone(),
status: ex.status(),
error: ex.error.clone(),
body,
body_base64,
}
}
NetworkEvent::WebSocketOpen { request_id, url } => {
MonitorEvent::WebSocketOpen { request_id, url }
}
NetworkEvent::WebSocketFrame {
request_id,
direction,
opcode,
payload,
} => MonitorEvent::WebSocketFrame {
request_id,
direction: match direction {
FrameDirection::Sent => "sent",
FrameDirection::Received => "received",
}
.to_string(),
opcode,
payload,
},
NetworkEvent::WebSocketClose { request_id } => MonitorEvent::WebSocketClose { request_id },
NetworkEvent::EventSourceMessage {
request_id,
event_name,
event_id,
data,
} => MonitorEvent::EventSourceMessage {
request_id,
event_name,
event_id,
data,
},
}
}
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct ReadInput {
pub handle: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max: Option<usize>,
}
#[derive(Debug, Serialize, JsonSchema)]
pub struct ReadOutput {
pub events: Vec<MonitorEvent>,
pub dropped: usize,
}
pub async fn read(
state: Arc<Mutex<SessionState>>,
input: ReadInput,
) -> Result<ReadOutput, ErrorData> {
let mon = {
let s = state.lock().await;
s.monitors.get(&input.handle).cloned()
}
.ok_or_else(|| unknown_handle(&input.handle))?;
let buffer = {
let m = mon.lock().await;
m.buffer.clone()
};
let mut buf = buffer.lock().await;
let (events, dropped) = drain(&mut buf, input.max.unwrap_or(usize::MAX));
Ok(ReadOutput { events, dropped })
}
fn drain(buf: &mut MonitorBuffer, max: usize) -> (Vec<MonitorEvent>, usize) {
let take = max.min(buf.events.len());
let events: Vec<MonitorEvent> = buf.events.drain(..take).collect();
let dropped = std::mem::take(&mut buf.dropped);
(events, dropped)
}
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct StopInput {
pub handle: String,
}
#[derive(Debug, Serialize, JsonSchema)]
pub struct StopOutput {
pub stopped: bool,
}
pub async fn stop(
state: Arc<Mutex<SessionState>>,
input: StopInput,
) -> Result<StopOutput, ErrorData> {
let Some(mon) = state.lock().await.monitors.remove(&input.handle) else {
return Ok(StopOutput { stopped: false });
};
let m = mon.lock().await;
m.cancel.cancel();
m.task.abort();
Ok(StopOutput { stopped: true })
}
fn unknown_handle(handle: &str) -> ErrorData {
ErrorData::invalid_params(
format!(
"unknown monitor handle `{handle}`. Start one with `browser_monitor_start`; the handle may have already been stopped."
),
None,
)
}
#[cfg(test)]
#[allow(clippy::panic, clippy::unwrap_used)]
mod tests {
use std::collections::VecDeque;
use super::*;
fn ev(n: usize) -> MonitorEvent {
MonitorEvent::Http {
url: format!("https://example.com/{n}"),
method: "GET".into(),
status: Some(200),
error: None,
body: None,
body_base64: None,
}
}
#[test]
fn drain_returns_up_to_max_and_clears_them() {
let mut buf = MonitorBuffer {
events: (0..5).map(ev).collect(),
dropped: 0,
};
let (events, dropped) = drain(&mut buf, 3);
assert_eq!(events.len(), 3, "drains exactly `max`");
assert_eq!(events[0], ev(0), "oldest first");
assert_eq!(events[2], ev(2));
assert_eq!(dropped, 0);
assert_eq!(buf.events.len(), 2);
assert_eq!(buf.events.front().unwrap(), &ev(3));
}
#[test]
fn drain_caps_at_buffer_len_when_max_exceeds_it() {
let mut buf = MonitorBuffer {
events: (0..2).map(ev).collect(),
dropped: 0,
};
let (events, _) = drain(&mut buf, usize::MAX);
assert_eq!(events.len(), 2, "never over-drains past what's buffered");
assert!(buf.events.is_empty());
}
#[test]
fn drain_returns_and_resets_dropped() {
let mut buf = MonitorBuffer {
events: VecDeque::new(),
dropped: 3,
};
let (events, dropped) = drain(&mut buf, usize::MAX);
assert!(events.is_empty());
assert_eq!(dropped, 3, "reports the running dropped count");
assert_eq!(buf.dropped, 0, "and resets it to zero");
}
#[test]
fn push_capped_evicts_oldest_and_counts_dropped_over_cap() {
let mut buf = MonitorBuffer::default();
for n in 0..MONITOR_BUFFER_CAP {
push_capped(&mut buf, ev(n));
}
assert_eq!(buf.events.len(), MONITOR_BUFFER_CAP);
assert_eq!(buf.dropped, 0);
push_capped(&mut buf, ev(MONITOR_BUFFER_CAP));
assert_eq!(buf.events.len(), MONITOR_BUFFER_CAP, "stays at the cap");
assert_eq!(buf.dropped, 1, "the evicted event is counted");
assert_eq!(
buf.events.front().unwrap(),
&ev(1),
"event 0 was evicted; event 1 is now oldest"
);
assert_eq!(
buf.events.back().unwrap(),
&ev(MONITOR_BUFFER_CAP),
"the newest push is retained"
);
}
#[tokio::test]
async fn read_unknown_handle_is_invalid_params() {
let state = Arc::new(Mutex::new(SessionState::new()));
let err = read(
state,
ReadInput {
handle: "nope".into(),
max: None,
},
)
.await
.expect_err("expected unknown-handle error");
assert!(err.message.contains("unknown monitor handle"));
}
#[tokio::test]
async fn stop_unknown_handle_reports_not_stopped() {
let state = Arc::new(Mutex::new(SessionState::new()));
let out = stop(
state,
StopInput {
handle: "nope".into(),
},
)
.await
.expect("stop is idempotent for unknown handles");
assert!(!out.stopped, "unknown handle reports stopped=false");
}
}