Skip to main content

objectiveai_sdk/http/viewer/
client.rs

//! Viewer HTTP client. Fire-and-forget event publisher with exponential
//! backoff retries.
//!
//! The client owns an unbounded mpsc channel + a background tokio task.
//! Each `send_*` method is synchronous — it wraps its arguments in a
//! `Request` and pushes it onto the channel; the background task
//! serializes the request to JSON and POSTs it to the viewer's HTTP
//! endpoint with retry. Callers don't await network I/O.
//!
//! This module is intentionally ctx-free. Per-request `address` and
//! `signature` overrides are passed explicitly to each `send_*` call.
//! Higher-level consumers (e.g. `objectiveai-api`) can wrap this client
//! and do per-request context resolution on top.
13
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::sync::mpsc;
18
19use super::request::{
20    AgentCompletionCreateParams, AgentCompletionRequest, FunctionExecutionCreateParams,
21    FunctionExecutionRequest, FunctionInventionRecursiveCreateParams,
22    FunctionInventionRecursiveRequest, LaboratoryExecutionCreateParams,
23    LaboratoryExecutionRequest, Request, ResponseError,
24};
25
26/// Resolved per-request override pair.
27///
28/// `None` on either field means "fall back to the [`Client`]'s
29/// `default_*`".
30#[derive(Debug, Clone)]
31pub(super) struct ViewerData {
32    pub address: Option<Arc<String>>,
33    pub signature: Option<Arc<String>>,
34}
35
36/// The viewer HTTP client. Constructed once at process startup; all
37/// `send_*` methods take `&self` so the client is shared across the
38/// app via `Arc<Client>`.
39pub struct Client {
40    tx: mpsc::UnboundedSender<(ViewerData, Request)>,
41    /// Fallback address used when a `send_*` call's `address` is
42    /// `None`. Set from the constructor's `address` argument, or from
43    /// `VIEWER_ADDRESS` if `address` is `None` and the `env` feature
44    /// is enabled.
45    pub default_address: Option<Arc<String>>,
46    /// Fallback signature used when a `send_*` call's `signature` is
47    /// `None`. Set from the constructor's `signature` argument, or
48    /// from `VIEWER_SIGNATURE` if `signature` is `None` and the `env`
49    /// feature is enabled.
50    pub default_signature: Option<Arc<String>>,
51}
52
53impl Client {
54    /// Construct a new client and spawn its background task.
55    ///
56    /// `address` / `signature` defaults: an explicit `Some(...)` wins.
57    /// When `None`, and the `env` feature is enabled, falls back to
58    /// the `VIEWER_ADDRESS` / `VIEWER_SIGNATURE` env vars. When both
59    /// the argument and the env var are missing, the client has no
60    /// default — every `send_*` call must supply its own `address`,
61    /// or the request is dropped.
62    ///
63    /// Backoff parameters tune the [`backoff::ExponentialBackoff`]
64    /// the background task uses for retry. `max_elapsed_time` caps
65    /// how long the task keeps retrying a single failed request
66    /// before giving up.
67    pub fn new(
68        http_client: reqwest::Client,
69        address: Option<impl Into<String>>,
70        signature: Option<impl Into<String>>,
71        backoff_current_interval: Duration,
72        backoff_initial_interval: Duration,
73        backoff_randomization_factor: f64,
74        backoff_multiplier: f64,
75        backoff_max_interval: Duration,
76        backoff_max_elapsed_time: Duration,
77    ) -> Self {
78        let default_address = match address {
79            Some(s) => Some(Arc::new(s.into())),
80            #[cfg(feature = "env")]
81            None => std::env::var("VIEWER_ADDRESS").ok().map(Arc::new),
82            #[cfg(not(feature = "env"))]
83            None => None,
84        };
85        let default_signature = match signature {
86            Some(s) => Some(Arc::new(s.into())),
87            #[cfg(feature = "env")]
88            None => std::env::var("VIEWER_SIGNATURE").ok().map(Arc::new),
89            #[cfg(not(feature = "env"))]
90            None => None,
91        };
92
93        let (tx, mut rx) = mpsc::unbounded_channel::<(ViewerData, Request)>();
94
95        let bg_default_address = default_address.clone();
96        let bg_default_signature = default_signature.clone();
97
98        tokio::spawn(async move {
99            while let Some((viewer_data, request)) = rx.recv().await {
100                let (address, signature) = match viewer_data.address {
101                    Some(addr) => (addr, viewer_data.signature),
102                    None => match &bg_default_address {
103                        Some(addr) => (addr.clone(), bg_default_signature.clone()),
104                        None => continue,
105                    },
106                };
107
108                let url = match &request {
109                    Request::AgentCompletion(_) => {
110                        format!("{}/agent/completions", address)
111                    }
112                    Request::FunctionExecution(_) => {
113                        format!("{}/functions/executions", address)
114                    }
115                    Request::FunctionInventionRecursive(_) => {
116                        format!("{}/functions/inventions/recursive", address)
117                    }
118                    Request::LaboratoryExecution(_) => {
119                        format!("{}/laboratories/executions", address)
120                    }
121                };
122
123                let body = match serde_json::to_vec(&request) {
124                    Ok(body) => body,
125                    Err(_) => continue,
126                };
127
128                let _ = backoff::future::retry(
129                    backoff::ExponentialBackoff {
130                        current_interval: backoff_current_interval,
131                        initial_interval: backoff_initial_interval,
132                        randomization_factor: backoff_randomization_factor,
133                        multiplier: backoff_multiplier,
134                        max_interval: backoff_max_interval,
135                        max_elapsed_time: Some(backoff_max_elapsed_time),
136                        start_time: std::time::Instant::now(),
137                        clock: backoff::SystemClock::default(),
138                    },
139                    || {
140                        let http_client = &http_client;
141                        let url = &url;
142                        let body = &body;
143                        let signature = &signature;
144                        async move {
145                            let mut req = http_client
146                                .post(url.as_str())
147                                .header("Content-Type", "application/json")
148                                .body(body.clone());
149
150                            if let Some(sig) = signature {
151                                req = req.header("X-VIEWER-SIGNATURE", sig.as_str());
152                            }
153
154                            let response = req.send().await.map_err(backoff::Error::transient)?;
155
156                            if response.status().is_success() {
157                                Ok(())
158                            } else {
159                                Err(backoff::Error::transient(
160                                    response.error_for_status().unwrap_err(),
161                                ))
162                            }
163                        }
164                    },
165                )
166                .await;
167            }
168        });
169
170        Self {
171            tx,
172            default_address,
173            default_signature,
174        }
175    }
176
177    /// Internal helper. Pushes one `(ViewerData, Request)` onto the
178    /// channel; the background task takes it from there.
179    fn enqueue(
180        &self,
181        address: Option<Arc<String>>,
182        signature: Option<Arc<String>>,
183        request: Request,
184    ) {
185        let _ = self.tx.send((ViewerData { address, signature }, request));
186    }
187
188    pub fn send_agent_completion_begin(
189        &self,
190        address: Option<Arc<String>>,
191        signature: Option<Arc<String>>,
192        id: String,
193        request: Arc<crate::agent::completions::request::AgentCompletionCreateParams>,
194    ) {
195        self.enqueue(
196            address,
197            signature,
198            Request::AgentCompletion(AgentCompletionRequest::Begin(
199                AgentCompletionCreateParams { id, inner: request },
200            )),
201        );
202    }
203
204    pub fn send_agent_completion_continue(
205        &self,
206        address: Option<Arc<String>>,
207        signature: Option<Arc<String>>,
208        chunk: crate::agent::completions::response::streaming::AgentCompletionChunk,
209    ) {
210        self.enqueue(
211            address,
212            signature,
213            Request::AgentCompletion(AgentCompletionRequest::Continue(chunk)),
214        );
215    }
216
217    pub fn send_agent_completion_error(
218        &self,
219        address: Option<Arc<String>>,
220        signature: Option<Arc<String>>,
221        id: String,
222        error: crate::error::ResponseError,
223    ) {
224        self.enqueue(
225            address,
226            signature,
227            Request::AgentCompletion(AgentCompletionRequest::Error(ResponseError {
228                id,
229                inner: error,
230            })),
231        );
232    }
233
234    pub fn send_function_execution_begin(
235        &self,
236        address: Option<Arc<String>>,
237        signature: Option<Arc<String>>,
238        id: String,
239        request: Arc<crate::functions::executions::request::FunctionExecutionCreateParams>,
240    ) {
241        self.enqueue(
242            address,
243            signature,
244            Request::FunctionExecution(FunctionExecutionRequest::Begin(
245                FunctionExecutionCreateParams { id, inner: request },
246            )),
247        );
248    }
249
250    pub fn send_function_execution_continue(
251        &self,
252        address: Option<Arc<String>>,
253        signature: Option<Arc<String>>,
254        chunk: crate::functions::executions::response::streaming::FunctionExecutionChunk,
255    ) {
256        self.enqueue(
257            address,
258            signature,
259            Request::FunctionExecution(FunctionExecutionRequest::Continue(chunk)),
260        );
261    }
262
263    pub fn send_function_execution_error(
264        &self,
265        address: Option<Arc<String>>,
266        signature: Option<Arc<String>>,
267        id: String,
268        error: crate::error::ResponseError,
269    ) {
270        self.enqueue(
271            address,
272            signature,
273            Request::FunctionExecution(FunctionExecutionRequest::Error(ResponseError {
274                id,
275                inner: error,
276            })),
277        );
278    }
279
280    pub fn send_function_invention_recursive_begin(
281        &self,
282        address: Option<Arc<String>>,
283        signature: Option<Arc<String>>,
284        id: String,
285        request: Arc<
286            crate::functions::inventions::recursive::request::FunctionInventionRecursiveCreateParams,
287        >,
288    ) {
289        self.enqueue(
290            address,
291            signature,
292            Request::FunctionInventionRecursive(FunctionInventionRecursiveRequest::Begin(
293                FunctionInventionRecursiveCreateParams { id, inner: request },
294            )),
295        );
296    }
297
298    pub fn send_function_invention_recursive_continue(
299        &self,
300        address: Option<Arc<String>>,
301        signature: Option<Arc<String>>,
302        chunk: crate::functions::inventions::recursive::response::streaming::FunctionInventionRecursiveChunk,
303    ) {
304        self.enqueue(
305            address,
306            signature,
307            Request::FunctionInventionRecursive(FunctionInventionRecursiveRequest::Continue(chunk)),
308        );
309    }
310
311    pub fn send_function_invention_recursive_error(
312        &self,
313        address: Option<Arc<String>>,
314        signature: Option<Arc<String>>,
315        id: String,
316        error: crate::error::ResponseError,
317    ) {
318        self.enqueue(
319            address,
320            signature,
321            Request::FunctionInventionRecursive(FunctionInventionRecursiveRequest::Error(
322                ResponseError { id, inner: error },
323            )),
324        );
325    }
326
327    pub fn send_laboratory_execution_begin(
328        &self,
329        address: Option<Arc<String>>,
330        signature: Option<Arc<String>>,
331        id: String,
332        request: Arc<crate::laboratories::executions::request::LaboratoryExecutionCreateParams>,
333    ) {
334        self.enqueue(
335            address,
336            signature,
337            Request::LaboratoryExecution(LaboratoryExecutionRequest::Begin(
338                LaboratoryExecutionCreateParams { id, inner: request },
339            )),
340        );
341    }
342
343    pub fn send_laboratory_execution_continue(
344        &self,
345        address: Option<Arc<String>>,
346        signature: Option<Arc<String>>,
347        chunk: crate::laboratories::executions::response::streaming::LaboratoryExecutionChunk,
348    ) {
349        self.enqueue(
350            address,
351            signature,
352            Request::LaboratoryExecution(LaboratoryExecutionRequest::Continue(chunk)),
353        );
354    }
355
356    pub fn send_laboratory_execution_error(
357        &self,
358        address: Option<Arc<String>>,
359        signature: Option<Arc<String>>,
360        id: String,
361        error: crate::error::ResponseError,
362    ) {
363        self.enqueue(
364            address,
365            signature,
366            Request::LaboratoryExecution(LaboratoryExecutionRequest::Error(ResponseError {
367                id,
368                inner: error,
369            })),
370        );
371    }
372}