Skip to main content

objectiveai_sdk/http/
notifier.rs

1//! Client-side handle for sending `client_request::Notify` frames
2//! over a streaming WS connection and awaiting their matching
3//! `client_response::Response` replies.
4//!
5//! Returned from every `*_streaming_ws` method alongside the chunk
6//! Stream. Drop the Notifier to relinquish the write half — when
7//! both the Notifier and the Stream are dropped, the demux task
8//! exits and the WS closes cleanly.
9
10use crate::client_objectiveai_mcp::{client_request, client_response};
11use futures::stream::SplitSink;
12use futures::SinkExt;
13use std::sync::Arc;
14use tokio::net::TcpStream;
15use tokio::sync::{Mutex, oneshot};
16use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, tungstenite};
17
18/// Shared sender half of a split WebSocket. Both the Notifier and
19/// the demux task (which writes `server_response` frames after
20/// handler dispatch) hold a clone; the mutex serializes writes so
21/// frames don't interleave on the wire.
22pub(crate) type SharedSink = Arc<
23    Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>,
24>;
25
26/// Per-connection registry of outstanding notify ids and the
27/// oneshot senders the demux task fulfills when the matching
28/// `client_response::Response` arrives.
29pub(crate) type PendingNotifies =
30    Arc<dashmap::DashMap<String, oneshot::Sender<client_response::Response>>>;
31
32/// Client-side handle for sending notifies to a running agent
33/// completion (or any other streaming endpoint) over the same WS
34/// that carries the chunk stream.
35///
36/// Multiple in-flight notifies are supported; each gets a unique id
37/// and parks its own oneshot. Calls are independent and can run
38/// concurrently from multiple tasks.
39#[derive(Clone)]
40pub struct Notifier {
41    sink: SharedSink,
42    pending: PendingNotifies,
43}
44
45impl Notifier {
46    pub(crate) fn new(sink: SharedSink, pending: PendingNotifies) -> Self {
47        Self { sink, pending }
48    }
49
50    /// Push a user message at a running agent completion identified
51    /// by `params.response_id`. The message surfaces to the model
52    /// on its next natural inspection point.
53    ///
54    /// Returns `Ok(())` if the server accepted the notify. Returns
55    /// the server-supplied `code + message` if the server replied
56    /// with an `Error` variant (e.g. unknown response_id, MCP
57    /// channel down). Returns [`super::HttpError::NotifyChannelClosed`]
58    /// if the WS was torn down before the reply arrived.
59    pub async fn notify(
60        &self,
61        params: crate::agent::completions::request::AgentCompletionNotifyParams,
62    ) -> Result<(), super::HttpError> {
63        let id = uuid::Uuid::new_v4().to_string();
64        let (tx, rx) = oneshot::channel();
65        self.pending.insert(id.clone(), tx);
66
67        let request = client_request::Request {
68            id: id.clone(),
69            payload: client_request::Payload::AgentCompletionNotify(params),
70        };
71        let frame = match serde_json::to_string(&request) {
72            Ok(s) => s,
73            Err(e) => {
74                self.pending.remove(&id);
75                return Err(super::HttpError::NotifySerialize(e));
76            }
77        };
78
79        {
80            let mut guard = self.sink.lock().await;
81            if let Err(e) = guard
82                .send(tungstenite::Message::Text(frame.into()))
83                .await
84            {
85                drop(guard);
86                self.pending.remove(&id);
87                return Err(super::HttpError::NotifySend(e));
88            }
89        }
90
91        let response = match rx.await {
92            Ok(r) => r,
93            Err(_) => return Err(super::HttpError::NotifyChannelClosed),
94        };
95
96        match response {
97            client_response::Response::Ok { .. } => Ok(()),
98            client_response::Response::Error { code, message, .. } => {
99                Err(super::HttpError::NotifyRejected { code, message })
100            }
101        }
102    }
103}