Skip to main content

objectiveai_sdk/http/
notifier.rs

1//! Client-side handle for sending `client_request::Notify` frames
2//! over a streaming WS connection and awaiting their matching
3//! `client_response::Response` replies.
4//!
5//! Returned from every `*_streaming_ws` method alongside the chunk
6//! Stream. Drop the Notifier to relinquish the write half — when
7//! both the Notifier and the Stream are dropped, the demux task
8//! exits and the WS closes cleanly.
9
10use crate::client_objectiveai_mcp::{client_request, client_response};
11use futures::SinkExt;
12use futures::stream::SplitSink;
13use std::sync::Arc;
14use tokio::net::TcpStream;
15use tokio::sync::{Mutex, oneshot};
16use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, tungstenite};
17
18/// Shared sender half of a split WebSocket. Both the Notifier and
19/// the demux task (which writes `server_response` frames after
20/// handler dispatch) hold a clone; the mutex serializes writes so
21/// frames don't interleave on the wire.
22pub(crate) type SharedSink = Arc<
23    Mutex<
24        SplitSink<
25            WebSocketStream<MaybeTlsStream<TcpStream>>,
26            tungstenite::Message,
27        >,
28    >,
29>;
30
31/// Per-connection registry of outstanding notify ids and the
32/// oneshot senders the demux task fulfills when the matching
33/// `client_response::Response` arrives.
34pub(crate) type PendingNotifies =
35    Arc<dashmap::DashMap<String, oneshot::Sender<client_response::Response>>>;
36
37/// Client-side handle for sending notifies to a running agent
38/// completion (or any other streaming endpoint) over the same WS
39/// that carries the chunk stream.
40///
41/// Multiple in-flight notifies are supported; each gets a unique id
42/// and parks its own oneshot. Calls are independent and can run
43/// concurrently from multiple tasks.
44#[derive(Clone)]
45pub struct Notifier {
46    sink: SharedSink,
47    pending: PendingNotifies,
48}
49
50impl Notifier {
51    pub(crate) fn new(sink: SharedSink, pending: PendingNotifies) -> Self {
52        Self { sink, pending }
53    }
54
55    /// Push a user message at a running agent completion identified
56    /// by `params.response_id`. The message surfaces to the model
57    /// on its next natural inspection point.
58    ///
59    /// Returns `Ok(())` if the server accepted the notify. Returns
60    /// the server-supplied `code + message` if the server replied
61    /// with an `Error` variant (e.g. unknown response_id, MCP
62    /// channel down). Returns [`super::HttpError::NotifyChannelClosed`]
63    /// if the WS was torn down before the reply arrived.
64    pub async fn notify(
65        &self,
66        params: crate::agent::completions::request::AgentCompletionNotifyParams,
67    ) -> Result<(), super::HttpError> {
68        self.send(client_request::Payload::AgentCompletionNotify(params))
69            .await
70    }
71
72    /// Forward a `notifications/{tools,resources}/list_changed`
73    /// observation from an upstream `mcp::Connection` up to the API,
74    /// which will fan it out as an SSE event on every matching
75    /// `/objectiveai-mcp/{ws_session_id}` GET stream subscribed to
76    /// the same `mcp_session_id`.
77    ///
78    /// Same ack semantics as [`Self::notify`].
79    pub async fn notify_list_changed(
80        &self,
81        change: client_request::McpListChanged,
82    ) -> Result<(), super::HttpError> {
83        self.send(client_request::Payload::McpListChanged(change))
84            .await
85    }
86
87    /// Common send-and-await-ack body shared by every `notify_*` method.
88    async fn send(
89        &self,
90        payload: client_request::Payload,
91    ) -> Result<(), super::HttpError> {
92        let id = uuid::Uuid::new_v4().to_string();
93        let (tx, rx) = oneshot::channel();
94        self.pending.insert(id.clone(), tx);
95
96        let request = client_request::Request {
97            id: id.clone(),
98            payload,
99        };
100        let frame = match serde_json::to_string(&request) {
101            Ok(s) => s,
102            Err(e) => {
103                self.pending.remove(&id);
104                return Err(super::HttpError::NotifySerialize(e));
105            }
106        };
107
108        {
109            let mut guard = self.sink.lock().await;
110            if let Err(e) =
111                guard.send(tungstenite::Message::Text(frame.into())).await
112            {
113                drop(guard);
114                self.pending.remove(&id);
115                return Err(super::HttpError::NotifySend(e));
116            }
117        }
118
119        let response = match rx.await {
120            Ok(r) => r,
121            Err(_) => return Err(super::HttpError::NotifyChannelClosed),
122        };
123
124        match response {
125            client_response::Response::Ok { .. } => Ok(()),
126            client_response::Response::Error { code, message, .. } => {
127                Err(super::HttpError::NotifyRejected { code, message })
128            }
129        }
130    }
131}