pub use car_server_types::host::*;
use crate::session::WsChannel;
use futures::SinkExt;
use std::sync::Arc;
use std::time::Duration;
use tokio_tungstenite::tungstenite::Message;
pub const RUN_TRACE_CHANNEL_CAP: usize = 256;
pub struct RunTraceSubscriber {
pub host_client_id: String,
tx: tokio::sync::mpsc::Sender<car_proto::RunTraceEvent>,
}
impl RunTraceSubscriber {
pub fn spawn(host_client_id: String, channel: Arc<WsChannel>) -> Self {
let (tx, mut rx) =
tokio::sync::mpsc::channel::<car_proto::RunTraceEvent>(RUN_TRACE_CHANNEL_CAP);
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
let Ok(json) = serde_json::to_string(&serde_json::json!({
"jsonrpc": "2.0",
"method": "runs.trace.event",
"params": event,
})) else {
continue;
};
let mut guard = channel.write.lock().await;
let send = tokio::time::timeout(
Duration::from_secs(10),
guard.send(Message::Text(json.into())),
)
.await;
drop(guard);
match send {
Ok(Ok(())) => {}
_ => break,
}
}
});
Self { host_client_id, tx }
}
pub fn push(&self, event: car_proto::RunTraceEvent) -> bool {
self.tx.try_send(event).is_ok()
}
#[cfg(test)]
pub fn is_closed(&self) -> bool {
self.tx.is_closed()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use tokio::sync::Mutex;
fn channel_with_capture() -> (
Arc<WsChannel>,
futures::channel::mpsc::UnboundedReceiver<Message>,
) {
use futures::sink::SinkExt;
let (tx, rx) = futures::channel::mpsc::unbounded::<Message>();
let sink: crate::session::WsSink =
Box::pin(tx.sink_map_err(|_| tokio_tungstenite::tungstenite::Error::ConnectionClosed));
let channel = Arc::new(WsChannel {
write: Mutex::new(sink),
pending: Mutex::new(HashMap::new()),
next_id: std::sync::atomic::AtomicU64::new(0),
});
(channel, rx)
}
#[tokio::test]
async fn run_trace_subscriber_drains_event_to_socket() {
let (channel, mut rx) = channel_with_capture();
let sub = RunTraceSubscriber::spawn("host-1".to_string(), channel);
let event = car_proto::RunTraceEvent {
run_id: "run-1".to_string(),
agent_id: "agent-a".to_string(),
record: car_proto::RunRecord::Started(car_proto::RunStarted {
run_id: "run-1".to_string(),
agent_id: "agent-a".to_string(),
intent: "go".to_string(),
outcome_description: None,
started_at: chrono::Utc::now(),
}),
cursor: 0,
status: car_proto::RunLiveStatus::InProgress,
};
assert!(sub.push(event), "push onto a fresh channel succeeds");
use futures::StreamExt;
let frame = tokio::time::timeout(Duration::from_secs(2), rx.next())
.await
.expect("drain task wrote within the deadline")
.expect("a frame");
let text = match frame {
Message::Text(t) => t.to_string(),
other => panic!("expected a text frame, got {other:?}"),
};
let json: serde_json::Value = serde_json::from_str(&text).unwrap();
assert_eq!(json["method"], "runs.trace.event");
assert_eq!(json["params"]["run_id"], "run-1");
assert_eq!(json["params"]["status"], "in_progress");
}
#[tokio::test]
async fn dropping_subscriber_ends_its_drain_task() {
let (channel, _rx) = channel_with_capture();
let sub = RunTraceSubscriber::spawn("host-1".to_string(), channel);
assert!(!sub.is_closed());
drop(sub);
}
}