1use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::sync::mpsc;
18
19use super::request::{
20 AgentCompletionCreateParams, AgentCompletionRequest,
21 FunctionExecutionCreateParams, FunctionExecutionRequest,
22 FunctionInventionRecursiveCreateParams, FunctionInventionRecursiveRequest,
23 LaboratoryExecutionCreateParams, LaboratoryExecutionRequest, Request,
24 ResponseError,
25};
26
27#[derive(Debug, Clone)]
32pub(super) struct ViewerData {
33 pub address: Option<Arc<String>>,
34 pub signature: Option<Arc<String>>,
35}
36
37pub struct Client {
41 tx: mpsc::UnboundedSender<(ViewerData, Request)>,
42 handle: tokio::task::JoinHandle<()>,
46 pub default_address: Option<Arc<String>>,
51 pub default_signature: Option<Arc<String>>,
56}
57
58impl Client {
59 pub fn new(
73 http_client: reqwest::Client,
74 address: Option<impl Into<String>>,
75 signature: Option<impl Into<String>>,
76 backoff_current_interval: Duration,
77 backoff_initial_interval: Duration,
78 backoff_randomization_factor: f64,
79 backoff_multiplier: f64,
80 backoff_max_interval: Duration,
81 backoff_max_elapsed_time: Duration,
82 ) -> Self {
83 let default_address = match address {
84 Some(s) => Some(Arc::new(s.into())),
85 #[cfg(feature = "env")]
86 None => std::env::var("VIEWER_ADDRESS").ok().map(Arc::new),
87 #[cfg(not(feature = "env"))]
88 None => None,
89 };
90 let default_signature = match signature {
91 Some(s) => Some(Arc::new(s.into())),
92 #[cfg(feature = "env")]
93 None => std::env::var("VIEWER_SIGNATURE").ok().map(Arc::new),
94 #[cfg(not(feature = "env"))]
95 None => None,
96 };
97
98 let (tx, mut rx) = mpsc::unbounded_channel::<(ViewerData, Request)>();
99
100 let bg_default_address = default_address.clone();
101 let bg_default_signature = default_signature.clone();
102
103 let handle = tokio::spawn(async move {
104 while let Some((viewer_data, request)) = rx.recv().await {
105 let (address, signature) = match viewer_data.address {
106 Some(addr) => (addr, viewer_data.signature),
107 None => match &bg_default_address {
108 Some(addr) => {
109 (addr.clone(), bg_default_signature.clone())
110 }
111 None => continue,
112 },
113 };
114
115 let url = match &request {
116 Request::AgentCompletion(_) => {
117 format!("{}/agent/completions", address)
118 }
119 Request::FunctionExecution(_) => {
120 format!("{}/functions/executions", address)
121 }
122 Request::FunctionInventionRecursive(_) => {
123 format!("{}/functions/inventions/recursive", address)
124 }
125 Request::LaboratoryExecution(_) => {
126 format!("{}/laboratories/executions", address)
127 }
128 Request::AgentsFavoritesChanged(_) => {
129 format!("{}/agents/favorites/changed", address)
130 }
131 };
132
133 let body = match serde_json::to_vec(&request) {
134 Ok(body) => body,
135 Err(_) => continue,
136 };
137
138 let _ = backoff::future::retry(
139 backoff::ExponentialBackoff {
140 current_interval: backoff_current_interval,
141 initial_interval: backoff_initial_interval,
142 randomization_factor: backoff_randomization_factor,
143 multiplier: backoff_multiplier,
144 max_interval: backoff_max_interval,
145 max_elapsed_time: Some(backoff_max_elapsed_time),
146 start_time: std::time::Instant::now(),
147 clock: backoff::SystemClock::default(),
148 },
149 || {
150 let http_client = &http_client;
151 let url = &url;
152 let body = &body;
153 let signature = &signature;
154 async move {
155 let mut req = http_client
156 .post(url.as_str())
157 .header("Content-Type", "application/json")
158 .body(body.clone());
159
160 if let Some(sig) = signature {
161 req = req
162 .header("X-VIEWER-SIGNATURE", sig.as_str());
163 }
164
165 let response = req
166 .send()
167 .await
168 .map_err(backoff::Error::transient)?;
169
170 if response.status().is_success() {
171 Ok(())
172 } else {
173 Err(backoff::Error::transient(
174 response.error_for_status().unwrap_err(),
175 ))
176 }
177 }
178 },
179 )
180 .await;
181 }
182 });
183
184 Self {
185 tx,
186 handle,
187 default_address,
188 default_signature,
189 }
190 }
191
192 pub async fn flush(self) {
204 let Self { tx, handle, .. } = self;
205 drop(tx);
206 let _ = handle.await;
207 }
208
209 fn enqueue(
212 &self,
213 address: Option<Arc<String>>,
214 signature: Option<Arc<String>>,
215 request: Request,
216 ) {
217 let _ = self.tx.send((ViewerData { address, signature }, request));
218 }
219
220 pub fn send_agent_completion_begin(
221 &self,
222 address: Option<Arc<String>>,
223 signature: Option<Arc<String>>,
224 id: String,
225 request: Arc<
226 crate::agent::completions::request::AgentCompletionCreateParams,
227 >,
228 ) {
229 self.enqueue(
230 address,
231 signature,
232 Request::AgentCompletion(AgentCompletionRequest::Begin(
233 AgentCompletionCreateParams { id, inner: request },
234 )),
235 );
236 }
237
238 pub fn send_agent_completion_continue(
239 &self,
240 address: Option<Arc<String>>,
241 signature: Option<Arc<String>>,
242 chunk: crate::agent::completions::response::streaming::AgentCompletionChunk,
243 ) {
244 self.enqueue(
245 address,
246 signature,
247 Request::AgentCompletion(AgentCompletionRequest::Continue(chunk)),
248 );
249 }
250
251 pub fn send_agent_completion_error(
252 &self,
253 address: Option<Arc<String>>,
254 signature: Option<Arc<String>>,
255 id: String,
256 error: crate::error::ResponseError,
257 ) {
258 self.enqueue(
259 address,
260 signature,
261 Request::AgentCompletion(AgentCompletionRequest::Error(
262 ResponseError { id, inner: error },
263 )),
264 );
265 }
266
267 pub fn send_function_execution_begin(
268 &self,
269 address: Option<Arc<String>>,
270 signature: Option<Arc<String>>,
271 id: String,
272 request: Arc<crate::functions::executions::request::FunctionExecutionCreateParams>,
273 ) {
274 self.enqueue(
275 address,
276 signature,
277 Request::FunctionExecution(FunctionExecutionRequest::Begin(
278 FunctionExecutionCreateParams { id, inner: request },
279 )),
280 );
281 }
282
283 pub fn send_function_execution_continue(
284 &self,
285 address: Option<Arc<String>>,
286 signature: Option<Arc<String>>,
287 chunk: crate::functions::executions::response::streaming::FunctionExecutionChunk,
288 ) {
289 self.enqueue(
290 address,
291 signature,
292 Request::FunctionExecution(FunctionExecutionRequest::Continue(
293 chunk,
294 )),
295 );
296 }
297
298 pub fn send_function_execution_error(
299 &self,
300 address: Option<Arc<String>>,
301 signature: Option<Arc<String>>,
302 id: String,
303 error: crate::error::ResponseError,
304 ) {
305 self.enqueue(
306 address,
307 signature,
308 Request::FunctionExecution(FunctionExecutionRequest::Error(
309 ResponseError { id, inner: error },
310 )),
311 );
312 }
313
314 pub fn send_function_invention_recursive_begin(
315 &self,
316 address: Option<Arc<String>>,
317 signature: Option<Arc<String>>,
318 id: String,
319 request: Arc<
320 crate::functions::inventions::recursive::request::FunctionInventionRecursiveCreateParams,
321 >,
322 ) {
323 self.enqueue(
324 address,
325 signature,
326 Request::FunctionInventionRecursive(
327 FunctionInventionRecursiveRequest::Begin(
328 FunctionInventionRecursiveCreateParams {
329 id,
330 inner: request,
331 },
332 ),
333 ),
334 );
335 }
336
337 pub fn send_function_invention_recursive_continue(
338 &self,
339 address: Option<Arc<String>>,
340 signature: Option<Arc<String>>,
341 chunk: crate::functions::inventions::recursive::response::streaming::FunctionInventionRecursiveChunk,
342 ) {
343 self.enqueue(
344 address,
345 signature,
346 Request::FunctionInventionRecursive(
347 FunctionInventionRecursiveRequest::Continue(chunk),
348 ),
349 );
350 }
351
352 pub fn send_function_invention_recursive_error(
353 &self,
354 address: Option<Arc<String>>,
355 signature: Option<Arc<String>>,
356 id: String,
357 error: crate::error::ResponseError,
358 ) {
359 self.enqueue(
360 address,
361 signature,
362 Request::FunctionInventionRecursive(
363 FunctionInventionRecursiveRequest::Error(ResponseError {
364 id,
365 inner: error,
366 }),
367 ),
368 );
369 }
370
371 pub fn send_laboratory_execution_begin(
372 &self,
373 address: Option<Arc<String>>,
374 signature: Option<Arc<String>>,
375 id: String,
376 request: Arc<crate::laboratories::executions::request::LaboratoryExecutionCreateParams>,
377 ) {
378 self.enqueue(
379 address,
380 signature,
381 Request::LaboratoryExecution(LaboratoryExecutionRequest::Begin(
382 LaboratoryExecutionCreateParams { id, inner: request },
383 )),
384 );
385 }
386
387 pub fn send_laboratory_execution_continue(
388 &self,
389 address: Option<Arc<String>>,
390 signature: Option<Arc<String>>,
391 chunk: crate::laboratories::executions::response::streaming::LaboratoryExecutionChunk,
392 ) {
393 self.enqueue(
394 address,
395 signature,
396 Request::LaboratoryExecution(LaboratoryExecutionRequest::Continue(
397 chunk,
398 )),
399 );
400 }
401
402 pub fn send_laboratory_execution_error(
403 &self,
404 address: Option<Arc<String>>,
405 signature: Option<Arc<String>>,
406 id: String,
407 error: crate::error::ResponseError,
408 ) {
409 self.enqueue(
410 address,
411 signature,
412 Request::LaboratoryExecution(LaboratoryExecutionRequest::Error(
413 ResponseError { id, inner: error },
414 )),
415 );
416 }
417
418 pub fn send_agents_favorites_changed(
424 &self,
425 address: Option<Arc<String>>,
426 signature: Option<Arc<String>>,
427 notification: crate::agent::favorites::ChangedNotification,
428 ) {
429 self.enqueue(
430 address,
431 signature,
432 Request::AgentsFavoritesChanged(notification),
433 );
434 }
435}