Skip to main content

objectiveai_sdk/http/viewer/
client.rs

1//! Viewer HTTP client. Fire-and-forget event publisher with exponential
2//! backoff retries.
3//!
4//! The client owns an unbounded mpsc channel + a background tokio task.
5//! Each `send_*` method is synchronous — it serializes the request and
6//! pushes it onto the channel; the background task POSTs to the
7//! viewer's HTTP endpoint with retry. Callers don't await network I/O.
8//!
9//! This module is intentionally ctx-free. Per-request `address` and
10//! `signature` overrides are passed explicitly to each `send_*` call.
11//! Higher-level consumers (e.g. `objectiveai-api`) can wrap this client
12//! and do per-request context resolution on top.
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::sync::mpsc;
18
19use super::request::{
20    AgentCompletionCreateParams, AgentCompletionRequest,
21    FunctionExecutionCreateParams, FunctionExecutionRequest, Request,
22    ResponseError,
23};
24
25/// Resolved per-request override pair.
26///
27/// `None` on either field means "fall back to the [`Client`]'s
28/// `default_*`".
29#[derive(Debug, Clone)]
30pub(super) struct ViewerData {
31    pub address: Option<Arc<String>>,
32    pub signature: Option<Arc<String>>,
33}
34
35/// The viewer HTTP client. Constructed once at process startup; all
36/// `send_*` methods take `&self` so the client is shared across the
37/// app via `Arc<Client>`.
38pub struct Client {
39    tx: mpsc::UnboundedSender<(ViewerData, Request)>,
40    /// Background task handle. `flush(self)` drops `tx` and `.await`s
41    /// this so every enqueued request lands (or exhausts its retry
42    /// budget) before the consumer drops the runtime.
43    handle: tokio::task::JoinHandle<()>,
44    /// Fallback address used when a `send_*` call's `address` is
45    /// `None`. Set from the constructor's `address` argument, or from
46    /// `VIEWER_ADDRESS` if `address` is `None` and the `env` feature
47    /// is enabled.
48    pub default_address: Option<Arc<String>>,
49    /// Fallback signature used when a `send_*` call's `signature` is
50    /// `None`. Set from the constructor's `signature` argument, or
51    /// from `VIEWER_SIGNATURE` if `signature` is `None` and the `env`
52    /// feature is enabled.
53    pub default_signature: Option<Arc<String>>,
54}
55
56impl Client {
57    /// Construct a new client and spawn its background task.
58    ///
59    /// `address` / `signature` defaults: an explicit `Some(...)` wins.
60    /// When `None`, and the `env` feature is enabled, falls back to
61    /// the `VIEWER_ADDRESS` / `VIEWER_SIGNATURE` env vars. When both
62    /// the argument and the env var are missing, the client has no
63    /// default — every `send_*` call must supply its own `address`,
64    /// or the request is dropped.
65    ///
66    /// Backoff parameters tune the [`backoff::ExponentialBackoff`]
67    /// the background task uses for retry. `max_elapsed_time` caps
68    /// how long the task keeps retrying a single failed request
69    /// before giving up.
70    pub fn new(
71        http_client: reqwest::Client,
72        address: Option<impl Into<String>>,
73        signature: Option<impl Into<String>>,
74        backoff_current_interval: Duration,
75        backoff_initial_interval: Duration,
76        backoff_randomization_factor: f64,
77        backoff_multiplier: f64,
78        backoff_max_interval: Duration,
79        backoff_max_elapsed_time: Duration,
80    ) -> Self {
81        let default_address = match address {
82            Some(s) => Some(Arc::new(s.into())),
83            #[cfg(feature = "env")]
84            None => std::env::var("VIEWER_ADDRESS").ok().map(Arc::new),
85            #[cfg(not(feature = "env"))]
86            None => None,
87        };
88        let default_signature = match signature {
89            Some(s) => Some(Arc::new(s.into())),
90            #[cfg(feature = "env")]
91            None => std::env::var("VIEWER_SIGNATURE").ok().map(Arc::new),
92            #[cfg(not(feature = "env"))]
93            None => None,
94        };
95
96        let (tx, mut rx) = mpsc::unbounded_channel::<(ViewerData, Request)>();
97
98        let bg_default_address = default_address.clone();
99        let bg_default_signature = default_signature.clone();
100
101        let handle = tokio::spawn(async move {
102            while let Some((viewer_data, request)) = rx.recv().await {
103                let (address, signature) = match viewer_data.address {
104                    Some(addr) => (addr, viewer_data.signature),
105                    None => match &bg_default_address {
106                        Some(addr) => {
107                            (addr.clone(), bg_default_signature.clone())
108                        }
109                        None => continue,
110                    },
111                };
112
113                let url = match &request {
114                    Request::AgentCompletion(_) => {
115                        format!("{}/agent/completions", address)
116                    }
117                    Request::FunctionExecution(_) => {
118                        format!("{}/functions/executions", address)
119                    }
120                };
121
122                let body = match serde_json::to_vec(&request) {
123                    Ok(body) => body,
124                    Err(_) => continue,
125                };
126
127                let _ = backoff::future::retry(
128                    backoff::ExponentialBackoff {
129                        current_interval: backoff_current_interval,
130                        initial_interval: backoff_initial_interval,
131                        randomization_factor: backoff_randomization_factor,
132                        multiplier: backoff_multiplier,
133                        max_interval: backoff_max_interval,
134                        max_elapsed_time: Some(backoff_max_elapsed_time),
135                        start_time: std::time::Instant::now(),
136                        clock: backoff::SystemClock::default(),
137                    },
138                    || {
139                        let http_client = &http_client;
140                        let url = &url;
141                        let body = &body;
142                        let signature = &signature;
143                        async move {
144                            let mut req = http_client
145                                .post(url.as_str())
146                                .header("Content-Type", "application/json")
147                                .body(body.clone());
148
149                            if let Some(sig) = signature {
150                                req = req
151                                    .header("X-VIEWER-SIGNATURE", sig.as_str());
152                            }
153
154                            let response = req
155                                .send()
156                                .await
157                                .map_err(backoff::Error::transient)?;
158
159                            if response.status().is_success() {
160                                Ok(())
161                            } else {
162                                Err(backoff::Error::transient(
163                                    response.error_for_status().unwrap_err(),
164                                ))
165                            }
166                        }
167                    },
168                )
169                .await;
170            }
171        });
172
173        Self {
174            tx,
175            handle,
176            default_address,
177            default_signature,
178        }
179    }
180
181    /// Closes the channel and awaits the background task. After this
182    /// returns, every enqueued request has either succeeded or
183    /// exhausted its retry budget. Takes `self` so the
184    /// can't-send-after-flush invariant is enforced by the type
185    /// system.
186    ///
187    /// Intended for short-lived consumers (e.g. the cli) that must
188    /// drain in-flight POSTs before dropping the tokio runtime. Long-
189    /// lived consumers (api server) can rely on the natural drop
190    /// behavior — when the `Client` drops, `tx` drops, the bg task's
191    /// `rx.recv()` returns `None`, and the loop exits.
192    pub async fn flush(self) {
193        let Self { tx, handle, .. } = self;
194        drop(tx);
195        let _ = handle.await;
196    }
197
198    /// Internal helper. Pushes one `(ViewerData, Request)` onto the
199    /// channel; the background task takes it from there.
200    fn enqueue(
201        &self,
202        address: Option<Arc<String>>,
203        signature: Option<Arc<String>>,
204        request: Request,
205    ) {
206        let _ = self.tx.send((ViewerData { address, signature }, request));
207    }
208
209    pub fn send_agent_completion_begin(
210        &self,
211        address: Option<Arc<String>>,
212        signature: Option<Arc<String>>,
213        id: String,
214        request: Arc<
215            crate::agent::completions::request::AgentCompletionCreateParams,
216        >,
217    ) {
218        self.enqueue(
219            address,
220            signature,
221            Request::AgentCompletion(AgentCompletionRequest::Begin(
222                AgentCompletionCreateParams { id, inner: request },
223            )),
224        );
225    }
226
227    pub fn send_agent_completion_continue(
228        &self,
229        address: Option<Arc<String>>,
230        signature: Option<Arc<String>>,
231        chunk: crate::agent::completions::response::streaming::AgentCompletionChunk,
232    ) {
233        self.enqueue(
234            address,
235            signature,
236            Request::AgentCompletion(AgentCompletionRequest::Continue(chunk)),
237        );
238    }
239
240    pub fn send_agent_completion_error(
241        &self,
242        address: Option<Arc<String>>,
243        signature: Option<Arc<String>>,
244        id: String,
245        error: crate::error::ResponseError,
246    ) {
247        self.enqueue(
248            address,
249            signature,
250            Request::AgentCompletion(AgentCompletionRequest::Error(
251                ResponseError { id, inner: error },
252            )),
253        );
254    }
255
256    pub fn send_function_execution_begin(
257        &self,
258        address: Option<Arc<String>>,
259        signature: Option<Arc<String>>,
260        id: String,
261        request: Arc<crate::functions::executions::request::FunctionExecutionCreateParams>,
262    ) {
263        self.enqueue(
264            address,
265            signature,
266            Request::FunctionExecution(FunctionExecutionRequest::Begin(
267                FunctionExecutionCreateParams { id, inner: request },
268            )),
269        );
270    }
271
272    pub fn send_function_execution_continue(
273        &self,
274        address: Option<Arc<String>>,
275        signature: Option<Arc<String>>,
276        chunk: crate::functions::executions::response::streaming::FunctionExecutionChunk,
277    ) {
278        self.enqueue(
279            address,
280            signature,
281            Request::FunctionExecution(FunctionExecutionRequest::Continue(
282                chunk,
283            )),
284        );
285    }
286
287    pub fn send_function_execution_error(
288        &self,
289        address: Option<Arc<String>>,
290        signature: Option<Arc<String>>,
291        id: String,
292        error: crate::error::ResponseError,
293    ) {
294        self.enqueue(
295            address,
296            signature,
297            Request::FunctionExecution(FunctionExecutionRequest::Error(
298                ResponseError { id, inner: error },
299            )),
300        );
301    }
302
303}