Skip to main content

objectiveai_sdk/http/viewer/
client.rs

1//! Viewer HTTP client. Fire-and-forget event publisher with exponential
2//! backoff retries.
3//!
4//! The client owns an unbounded mpsc channel + a background tokio task.
//! Each `send_*` method is synchronous — it wraps its arguments in a
//! `Request` and pushes it onto the channel; the background task
//! serializes the request to JSON and POSTs it to the viewer's HTTP
//! endpoint with retry. Callers don't await network I/O.
8//!
9//! This module is intentionally ctx-free. Per-request `address` and
10//! `signature` overrides are passed explicitly to each `send_*` call.
11//! Higher-level consumers (e.g. `objectiveai-api`) can wrap this client
12//! and do per-request context resolution on top.
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::sync::mpsc;
18
19use super::request::{
20    AgentCompletionCreateParams, AgentCompletionRequest,
21    FunctionExecutionCreateParams, FunctionExecutionRequest,
22    FunctionInventionRecursiveCreateParams, FunctionInventionRecursiveRequest,
23    LaboratoryExecutionCreateParams, LaboratoryExecutionRequest, Request,
24    ResponseError,
25};
26
27/// Resolved per-request override pair.
28///
29/// `None` on either field means "fall back to the [`Client`]'s
30/// `default_*`".
31#[derive(Debug, Clone)]
32pub(super) struct ViewerData {
33    pub address: Option<Arc<String>>,
34    pub signature: Option<Arc<String>>,
35}
36
37/// The viewer HTTP client. Constructed once at process startup; all
38/// `send_*` methods take `&self` so the client is shared across the
39/// app via `Arc<Client>`.
40pub struct Client {
41    tx: mpsc::UnboundedSender<(ViewerData, Request)>,
42    /// Background task handle. `flush(self)` drops `tx` and `.await`s
43    /// this so every enqueued request lands (or exhausts its retry
44    /// budget) before the consumer drops the runtime.
45    handle: tokio::task::JoinHandle<()>,
46    /// Fallback address used when a `send_*` call's `address` is
47    /// `None`. Set from the constructor's `address` argument, or from
48    /// `VIEWER_ADDRESS` if `address` is `None` and the `env` feature
49    /// is enabled.
50    pub default_address: Option<Arc<String>>,
51    /// Fallback signature used when a `send_*` call's `signature` is
52    /// `None`. Set from the constructor's `signature` argument, or
53    /// from `VIEWER_SIGNATURE` if `signature` is `None` and the `env`
54    /// feature is enabled.
55    pub default_signature: Option<Arc<String>>,
56}
57
58impl Client {
59    /// Construct a new client and spawn its background task.
60    ///
61    /// `address` / `signature` defaults: an explicit `Some(...)` wins.
62    /// When `None`, and the `env` feature is enabled, falls back to
63    /// the `VIEWER_ADDRESS` / `VIEWER_SIGNATURE` env vars. When both
64    /// the argument and the env var are missing, the client has no
65    /// default — every `send_*` call must supply its own `address`,
66    /// or the request is dropped.
67    ///
68    /// Backoff parameters tune the [`backoff::ExponentialBackoff`]
69    /// the background task uses for retry. `max_elapsed_time` caps
70    /// how long the task keeps retrying a single failed request
71    /// before giving up.
72    pub fn new(
73        http_client: reqwest::Client,
74        address: Option<impl Into<String>>,
75        signature: Option<impl Into<String>>,
76        backoff_current_interval: Duration,
77        backoff_initial_interval: Duration,
78        backoff_randomization_factor: f64,
79        backoff_multiplier: f64,
80        backoff_max_interval: Duration,
81        backoff_max_elapsed_time: Duration,
82    ) -> Self {
83        let default_address = match address {
84            Some(s) => Some(Arc::new(s.into())),
85            #[cfg(feature = "env")]
86            None => std::env::var("VIEWER_ADDRESS").ok().map(Arc::new),
87            #[cfg(not(feature = "env"))]
88            None => None,
89        };
90        let default_signature = match signature {
91            Some(s) => Some(Arc::new(s.into())),
92            #[cfg(feature = "env")]
93            None => std::env::var("VIEWER_SIGNATURE").ok().map(Arc::new),
94            #[cfg(not(feature = "env"))]
95            None => None,
96        };
97
98        let (tx, mut rx) = mpsc::unbounded_channel::<(ViewerData, Request)>();
99
100        let bg_default_address = default_address.clone();
101        let bg_default_signature = default_signature.clone();
102
103        let handle = tokio::spawn(async move {
104            while let Some((viewer_data, request)) = rx.recv().await {
105                let (address, signature) = match viewer_data.address {
106                    Some(addr) => (addr, viewer_data.signature),
107                    None => match &bg_default_address {
108                        Some(addr) => {
109                            (addr.clone(), bg_default_signature.clone())
110                        }
111                        None => continue,
112                    },
113                };
114
115                let url = match &request {
116                    Request::AgentCompletion(_) => {
117                        format!("{}/agent/completions", address)
118                    }
119                    Request::FunctionExecution(_) => {
120                        format!("{}/functions/executions", address)
121                    }
122                    Request::FunctionInventionRecursive(_) => {
123                        format!("{}/functions/inventions/recursive", address)
124                    }
125                    Request::LaboratoryExecution(_) => {
126                        format!("{}/laboratories/executions", address)
127                    }
128                    Request::AgentsFavoritesChanged(_) => {
129                        format!("{}/agents/favorites/changed", address)
130                    }
131                };
132
133                let body = match serde_json::to_vec(&request) {
134                    Ok(body) => body,
135                    Err(_) => continue,
136                };
137
138                let _ = backoff::future::retry(
139                    backoff::ExponentialBackoff {
140                        current_interval: backoff_current_interval,
141                        initial_interval: backoff_initial_interval,
142                        randomization_factor: backoff_randomization_factor,
143                        multiplier: backoff_multiplier,
144                        max_interval: backoff_max_interval,
145                        max_elapsed_time: Some(backoff_max_elapsed_time),
146                        start_time: std::time::Instant::now(),
147                        clock: backoff::SystemClock::default(),
148                    },
149                    || {
150                        let http_client = &http_client;
151                        let url = &url;
152                        let body = &body;
153                        let signature = &signature;
154                        async move {
155                            let mut req = http_client
156                                .post(url.as_str())
157                                .header("Content-Type", "application/json")
158                                .body(body.clone());
159
160                            if let Some(sig) = signature {
161                                req = req
162                                    .header("X-VIEWER-SIGNATURE", sig.as_str());
163                            }
164
165                            let response = req
166                                .send()
167                                .await
168                                .map_err(backoff::Error::transient)?;
169
170                            if response.status().is_success() {
171                                Ok(())
172                            } else {
173                                Err(backoff::Error::transient(
174                                    response.error_for_status().unwrap_err(),
175                                ))
176                            }
177                        }
178                    },
179                )
180                .await;
181            }
182        });
183
184        Self {
185            tx,
186            handle,
187            default_address,
188            default_signature,
189        }
190    }
191
192    /// Closes the channel and awaits the background task. After this
193    /// returns, every enqueued request has either succeeded or
194    /// exhausted its retry budget. Takes `self` so the
195    /// can't-send-after-flush invariant is enforced by the type
196    /// system.
197    ///
198    /// Intended for short-lived consumers (e.g. the cli) that must
199    /// drain in-flight POSTs before dropping the tokio runtime. Long-
200    /// lived consumers (api server) can rely on the natural drop
201    /// behavior — when the `Client` drops, `tx` drops, the bg task's
202    /// `rx.recv()` returns `None`, and the loop exits.
203    pub async fn flush(self) {
204        let Self { tx, handle, .. } = self;
205        drop(tx);
206        let _ = handle.await;
207    }
208
209    /// Internal helper. Pushes one `(ViewerData, Request)` onto the
210    /// channel; the background task takes it from there.
211    fn enqueue(
212        &self,
213        address: Option<Arc<String>>,
214        signature: Option<Arc<String>>,
215        request: Request,
216    ) {
217        let _ = self.tx.send((ViewerData { address, signature }, request));
218    }
219
220    pub fn send_agent_completion_begin(
221        &self,
222        address: Option<Arc<String>>,
223        signature: Option<Arc<String>>,
224        id: String,
225        request: Arc<
226            crate::agent::completions::request::AgentCompletionCreateParams,
227        >,
228    ) {
229        self.enqueue(
230            address,
231            signature,
232            Request::AgentCompletion(AgentCompletionRequest::Begin(
233                AgentCompletionCreateParams { id, inner: request },
234            )),
235        );
236    }
237
238    pub fn send_agent_completion_continue(
239        &self,
240        address: Option<Arc<String>>,
241        signature: Option<Arc<String>>,
242        chunk: crate::agent::completions::response::streaming::AgentCompletionChunk,
243    ) {
244        self.enqueue(
245            address,
246            signature,
247            Request::AgentCompletion(AgentCompletionRequest::Continue(chunk)),
248        );
249    }
250
251    pub fn send_agent_completion_error(
252        &self,
253        address: Option<Arc<String>>,
254        signature: Option<Arc<String>>,
255        id: String,
256        error: crate::error::ResponseError,
257    ) {
258        self.enqueue(
259            address,
260            signature,
261            Request::AgentCompletion(AgentCompletionRequest::Error(
262                ResponseError { id, inner: error },
263            )),
264        );
265    }
266
267    pub fn send_function_execution_begin(
268        &self,
269        address: Option<Arc<String>>,
270        signature: Option<Arc<String>>,
271        id: String,
272        request: Arc<crate::functions::executions::request::FunctionExecutionCreateParams>,
273    ) {
274        self.enqueue(
275            address,
276            signature,
277            Request::FunctionExecution(FunctionExecutionRequest::Begin(
278                FunctionExecutionCreateParams { id, inner: request },
279            )),
280        );
281    }
282
283    pub fn send_function_execution_continue(
284        &self,
285        address: Option<Arc<String>>,
286        signature: Option<Arc<String>>,
287        chunk: crate::functions::executions::response::streaming::FunctionExecutionChunk,
288    ) {
289        self.enqueue(
290            address,
291            signature,
292            Request::FunctionExecution(FunctionExecutionRequest::Continue(
293                chunk,
294            )),
295        );
296    }
297
298    pub fn send_function_execution_error(
299        &self,
300        address: Option<Arc<String>>,
301        signature: Option<Arc<String>>,
302        id: String,
303        error: crate::error::ResponseError,
304    ) {
305        self.enqueue(
306            address,
307            signature,
308            Request::FunctionExecution(FunctionExecutionRequest::Error(
309                ResponseError { id, inner: error },
310            )),
311        );
312    }
313
314    pub fn send_function_invention_recursive_begin(
315        &self,
316        address: Option<Arc<String>>,
317        signature: Option<Arc<String>>,
318        id: String,
319        request: Arc<
320            crate::functions::inventions::recursive::request::FunctionInventionRecursiveCreateParams,
321        >,
322    ) {
323        self.enqueue(
324            address,
325            signature,
326            Request::FunctionInventionRecursive(
327                FunctionInventionRecursiveRequest::Begin(
328                    FunctionInventionRecursiveCreateParams {
329                        id,
330                        inner: request,
331                    },
332                ),
333            ),
334        );
335    }
336
337    pub fn send_function_invention_recursive_continue(
338        &self,
339        address: Option<Arc<String>>,
340        signature: Option<Arc<String>>,
341        chunk: crate::functions::inventions::recursive::response::streaming::FunctionInventionRecursiveChunk,
342    ) {
343        self.enqueue(
344            address,
345            signature,
346            Request::FunctionInventionRecursive(
347                FunctionInventionRecursiveRequest::Continue(chunk),
348            ),
349        );
350    }
351
352    pub fn send_function_invention_recursive_error(
353        &self,
354        address: Option<Arc<String>>,
355        signature: Option<Arc<String>>,
356        id: String,
357        error: crate::error::ResponseError,
358    ) {
359        self.enqueue(
360            address,
361            signature,
362            Request::FunctionInventionRecursive(
363                FunctionInventionRecursiveRequest::Error(ResponseError {
364                    id,
365                    inner: error,
366                }),
367            ),
368        );
369    }
370
371    pub fn send_laboratory_execution_begin(
372        &self,
373        address: Option<Arc<String>>,
374        signature: Option<Arc<String>>,
375        id: String,
376        request: Arc<crate::laboratories::executions::request::LaboratoryExecutionCreateParams>,
377    ) {
378        self.enqueue(
379            address,
380            signature,
381            Request::LaboratoryExecution(LaboratoryExecutionRequest::Begin(
382                LaboratoryExecutionCreateParams { id, inner: request },
383            )),
384        );
385    }
386
387    pub fn send_laboratory_execution_continue(
388        &self,
389        address: Option<Arc<String>>,
390        signature: Option<Arc<String>>,
391        chunk: crate::laboratories::executions::response::streaming::LaboratoryExecutionChunk,
392    ) {
393        self.enqueue(
394            address,
395            signature,
396            Request::LaboratoryExecution(LaboratoryExecutionRequest::Continue(
397                chunk,
398            )),
399        );
400    }
401
402    pub fn send_laboratory_execution_error(
403        &self,
404        address: Option<Arc<String>>,
405        signature: Option<Arc<String>>,
406        id: String,
407        error: crate::error::ResponseError,
408    ) {
409        self.enqueue(
410            address,
411            signature,
412            Request::LaboratoryExecution(LaboratoryExecutionRequest::Error(
413                ResponseError { id, inner: error },
414            )),
415        );
416    }
417
418    /// Enqueue an `agents_favorites_changed` notification. Posts to
419    /// `<address>/agents/favorites/changed`. Fired by the cli's
420    /// `agents favorites config {add,del,edit}` handlers so any
421    /// running viewer that exposes a `useFavoriteAgents()`-style hook
422    /// can refresh its favorites list.
423    pub fn send_agents_favorites_changed(
424        &self,
425        address: Option<Arc<String>>,
426        signature: Option<Arc<String>>,
427        notification: crate::agent::favorites::ChangedNotification,
428    ) {
429        self.enqueue(
430            address,
431            signature,
432            Request::AgentsFavoritesChanged(notification),
433        );
434    }
435}