objectiveai_sdk/http/notifier.rs
1//! Client-side handle for sending `client_request::Notify` frames
2//! over a streaming WS connection and awaiting their matching
3//! `client_response::Response` replies.
4//!
5//! Returned from every `*_streaming_ws` method alongside the chunk
6//! Stream. Drop the Notifier to relinquish the write half — when
7//! both the Notifier and the Stream are dropped, the demux task
8//! exits and the WS closes cleanly.
9
10use crate::client_objectiveai_mcp::{client_request, client_response};
11use futures::stream::SplitSink;
12use futures::SinkExt;
13use std::sync::Arc;
14use tokio::net::TcpStream;
15use tokio::sync::{Mutex, oneshot};
16use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, tungstenite};
17
18/// Shared sender half of a split WebSocket. Both the Notifier and
19/// the demux task (which writes `server_response` frames after
20/// handler dispatch) hold a clone; the mutex serializes writes so
21/// frames don't interleave on the wire.
22pub(crate) type SharedSink = Arc<
23 Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>,
24>;
25
26/// Per-connection registry of outstanding notify ids and the
27/// oneshot senders the demux task fulfills when the matching
28/// `client_response::Response` arrives.
29pub(crate) type PendingNotifies =
30 Arc<dashmap::DashMap<String, oneshot::Sender<client_response::Response>>>;
31
32/// Client-side handle for sending notifies to a running agent
33/// completion (or any other streaming endpoint) over the same WS
34/// that carries the chunk stream.
35///
36/// Multiple in-flight notifies are supported; each gets a unique id
37/// and parks its own oneshot. Calls are independent and can run
38/// concurrently from multiple tasks.
39#[derive(Clone)]
40pub struct Notifier {
41 sink: SharedSink,
42 pending: PendingNotifies,
43}
44
45impl Notifier {
46 pub(crate) fn new(sink: SharedSink, pending: PendingNotifies) -> Self {
47 Self { sink, pending }
48 }
49
50 /// Push a user message at a running agent completion identified
51 /// by `params.response_id`. The message surfaces to the model
52 /// on its next natural inspection point.
53 ///
54 /// Returns `Ok(())` if the server accepted the notify. Returns
55 /// the server-supplied `code + message` if the server replied
56 /// with an `Error` variant (e.g. unknown response_id, MCP
57 /// channel down). Returns [`super::HttpError::NotifyChannelClosed`]
58 /// if the WS was torn down before the reply arrived.
59 pub async fn notify(
60 &self,
61 params: crate::agent::completions::request::AgentCompletionNotifyParams,
62 ) -> Result<(), super::HttpError> {
63 let id = uuid::Uuid::new_v4().to_string();
64 let (tx, rx) = oneshot::channel();
65 self.pending.insert(id.clone(), tx);
66
67 let request = client_request::Request {
68 id: id.clone(),
69 payload: client_request::Payload::AgentCompletionNotify(params),
70 };
71 let frame = match serde_json::to_string(&request) {
72 Ok(s) => s,
73 Err(e) => {
74 self.pending.remove(&id);
75 return Err(super::HttpError::NotifySerialize(e));
76 }
77 };
78
79 {
80 let mut guard = self.sink.lock().await;
81 if let Err(e) = guard
82 .send(tungstenite::Message::Text(frame.into()))
83 .await
84 {
85 drop(guard);
86 self.pending.remove(&id);
87 return Err(super::HttpError::NotifySend(e));
88 }
89 }
90
91 let response = match rx.await {
92 Ok(r) => r,
93 Err(_) => return Err(super::HttpError::NotifyChannelClosed),
94 };
95
96 match response {
97 client_response::Response::Ok { .. } => Ok(()),
98 client_response::Response::Error { code, message, .. } => {
99 Err(super::HttpError::NotifyRejected { code, message })
100 }
101 }
102 }
103}