1use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::sync::mpsc;
18
19use super::request::{
20 AgentCompletionCreateParams, AgentCompletionRequest, FunctionExecutionCreateParams,
21 FunctionExecutionRequest, FunctionInventionRecursiveCreateParams,
22 FunctionInventionRecursiveRequest, LaboratoryExecutionCreateParams,
23 LaboratoryExecutionRequest, Request, ResponseError,
24};
25
/// Per-event destination data queued together with a `Request`.
///
/// The background task spawned by `Client::new` reads these fields to decide
/// where an event is posted and which signature header accompanies it.
#[derive(Debug, Clone)]
pub(super) struct ViewerData {
    /// Base address the event is posted to. When `None`, the background task
    /// falls back to the client's default address (and default signature).
    pub address: Option<Arc<String>>,
    /// Value sent in the `X-VIEWER-SIGNATURE` header. Only used when `address`
    /// is `Some`; no header is sent if this is `None` in that case.
    pub signature: Option<Arc<String>>,
}
35
/// Handle used to queue viewer events; a background task spawned in
/// `Client::new` drains the queue and posts each event over HTTP.
pub struct Client {
    /// Sending half of the unbounded channel consumed by the background task.
    tx: mpsc::UnboundedSender<(ViewerData, Request)>,
    /// Address used for events that are queued without an address of their own.
    pub default_address: Option<Arc<String>>,
    /// Signature used together with `default_address` for events that are
    /// queued without an address of their own.
    pub default_signature: Option<Arc<String>>,
}
52
53impl Client {
54 pub fn new(
68 http_client: reqwest::Client,
69 address: Option<impl Into<String>>,
70 signature: Option<impl Into<String>>,
71 backoff_current_interval: Duration,
72 backoff_initial_interval: Duration,
73 backoff_randomization_factor: f64,
74 backoff_multiplier: f64,
75 backoff_max_interval: Duration,
76 backoff_max_elapsed_time: Duration,
77 ) -> Self {
78 let default_address = match address {
79 Some(s) => Some(Arc::new(s.into())),
80 #[cfg(feature = "env")]
81 None => std::env::var("VIEWER_ADDRESS").ok().map(Arc::new),
82 #[cfg(not(feature = "env"))]
83 None => None,
84 };
85 let default_signature = match signature {
86 Some(s) => Some(Arc::new(s.into())),
87 #[cfg(feature = "env")]
88 None => std::env::var("VIEWER_SIGNATURE").ok().map(Arc::new),
89 #[cfg(not(feature = "env"))]
90 None => None,
91 };
92
93 let (tx, mut rx) = mpsc::unbounded_channel::<(ViewerData, Request)>();
94
95 let bg_default_address = default_address.clone();
96 let bg_default_signature = default_signature.clone();
97
98 tokio::spawn(async move {
99 while let Some((viewer_data, request)) = rx.recv().await {
100 let (address, signature) = match viewer_data.address {
101 Some(addr) => (addr, viewer_data.signature),
102 None => match &bg_default_address {
103 Some(addr) => (addr.clone(), bg_default_signature.clone()),
104 None => continue,
105 },
106 };
107
108 let url = match &request {
109 Request::AgentCompletion(_) => {
110 format!("{}/agent/completions", address)
111 }
112 Request::FunctionExecution(_) => {
113 format!("{}/functions/executions", address)
114 }
115 Request::FunctionInventionRecursive(_) => {
116 format!("{}/functions/inventions/recursive", address)
117 }
118 Request::LaboratoryExecution(_) => {
119 format!("{}/laboratories/executions", address)
120 }
121 };
122
123 let body = match serde_json::to_vec(&request) {
124 Ok(body) => body,
125 Err(_) => continue,
126 };
127
128 let _ = backoff::future::retry(
129 backoff::ExponentialBackoff {
130 current_interval: backoff_current_interval,
131 initial_interval: backoff_initial_interval,
132 randomization_factor: backoff_randomization_factor,
133 multiplier: backoff_multiplier,
134 max_interval: backoff_max_interval,
135 max_elapsed_time: Some(backoff_max_elapsed_time),
136 start_time: std::time::Instant::now(),
137 clock: backoff::SystemClock::default(),
138 },
139 || {
140 let http_client = &http_client;
141 let url = &url;
142 let body = &body;
143 let signature = &signature;
144 async move {
145 let mut req = http_client
146 .post(url.as_str())
147 .header("Content-Type", "application/json")
148 .body(body.clone());
149
150 if let Some(sig) = signature {
151 req = req.header("X-VIEWER-SIGNATURE", sig.as_str());
152 }
153
154 let response = req.send().await.map_err(backoff::Error::transient)?;
155
156 if response.status().is_success() {
157 Ok(())
158 } else {
159 Err(backoff::Error::transient(
160 response.error_for_status().unwrap_err(),
161 ))
162 }
163 }
164 },
165 )
166 .await;
167 }
168 });
169
170 Self {
171 tx,
172 default_address,
173 default_signature,
174 }
175 }
176
177 fn enqueue(
180 &self,
181 address: Option<Arc<String>>,
182 signature: Option<Arc<String>>,
183 request: Request,
184 ) {
185 let _ = self.tx.send((ViewerData { address, signature }, request));
186 }
187
188 pub fn send_agent_completion_begin(
189 &self,
190 address: Option<Arc<String>>,
191 signature: Option<Arc<String>>,
192 id: String,
193 request: Arc<crate::agent::completions::request::AgentCompletionCreateParams>,
194 ) {
195 self.enqueue(
196 address,
197 signature,
198 Request::AgentCompletion(AgentCompletionRequest::Begin(
199 AgentCompletionCreateParams { id, inner: request },
200 )),
201 );
202 }
203
204 pub fn send_agent_completion_continue(
205 &self,
206 address: Option<Arc<String>>,
207 signature: Option<Arc<String>>,
208 chunk: crate::agent::completions::response::streaming::AgentCompletionChunk,
209 ) {
210 self.enqueue(
211 address,
212 signature,
213 Request::AgentCompletion(AgentCompletionRequest::Continue(chunk)),
214 );
215 }
216
217 pub fn send_agent_completion_error(
218 &self,
219 address: Option<Arc<String>>,
220 signature: Option<Arc<String>>,
221 id: String,
222 error: crate::error::ResponseError,
223 ) {
224 self.enqueue(
225 address,
226 signature,
227 Request::AgentCompletion(AgentCompletionRequest::Error(ResponseError {
228 id,
229 inner: error,
230 })),
231 );
232 }
233
234 pub fn send_function_execution_begin(
235 &self,
236 address: Option<Arc<String>>,
237 signature: Option<Arc<String>>,
238 id: String,
239 request: Arc<crate::functions::executions::request::FunctionExecutionCreateParams>,
240 ) {
241 self.enqueue(
242 address,
243 signature,
244 Request::FunctionExecution(FunctionExecutionRequest::Begin(
245 FunctionExecutionCreateParams { id, inner: request },
246 )),
247 );
248 }
249
250 pub fn send_function_execution_continue(
251 &self,
252 address: Option<Arc<String>>,
253 signature: Option<Arc<String>>,
254 chunk: crate::functions::executions::response::streaming::FunctionExecutionChunk,
255 ) {
256 self.enqueue(
257 address,
258 signature,
259 Request::FunctionExecution(FunctionExecutionRequest::Continue(chunk)),
260 );
261 }
262
263 pub fn send_function_execution_error(
264 &self,
265 address: Option<Arc<String>>,
266 signature: Option<Arc<String>>,
267 id: String,
268 error: crate::error::ResponseError,
269 ) {
270 self.enqueue(
271 address,
272 signature,
273 Request::FunctionExecution(FunctionExecutionRequest::Error(ResponseError {
274 id,
275 inner: error,
276 })),
277 );
278 }
279
280 pub fn send_function_invention_recursive_begin(
281 &self,
282 address: Option<Arc<String>>,
283 signature: Option<Arc<String>>,
284 id: String,
285 request: Arc<
286 crate::functions::inventions::recursive::request::FunctionInventionRecursiveCreateParams,
287 >,
288 ) {
289 self.enqueue(
290 address,
291 signature,
292 Request::FunctionInventionRecursive(FunctionInventionRecursiveRequest::Begin(
293 FunctionInventionRecursiveCreateParams { id, inner: request },
294 )),
295 );
296 }
297
298 pub fn send_function_invention_recursive_continue(
299 &self,
300 address: Option<Arc<String>>,
301 signature: Option<Arc<String>>,
302 chunk: crate::functions::inventions::recursive::response::streaming::FunctionInventionRecursiveChunk,
303 ) {
304 self.enqueue(
305 address,
306 signature,
307 Request::FunctionInventionRecursive(FunctionInventionRecursiveRequest::Continue(chunk)),
308 );
309 }
310
311 pub fn send_function_invention_recursive_error(
312 &self,
313 address: Option<Arc<String>>,
314 signature: Option<Arc<String>>,
315 id: String,
316 error: crate::error::ResponseError,
317 ) {
318 self.enqueue(
319 address,
320 signature,
321 Request::FunctionInventionRecursive(FunctionInventionRecursiveRequest::Error(
322 ResponseError { id, inner: error },
323 )),
324 );
325 }
326
327 pub fn send_laboratory_execution_begin(
328 &self,
329 address: Option<Arc<String>>,
330 signature: Option<Arc<String>>,
331 id: String,
332 request: Arc<crate::laboratories::executions::request::LaboratoryExecutionCreateParams>,
333 ) {
334 self.enqueue(
335 address,
336 signature,
337 Request::LaboratoryExecution(LaboratoryExecutionRequest::Begin(
338 LaboratoryExecutionCreateParams { id, inner: request },
339 )),
340 );
341 }
342
343 pub fn send_laboratory_execution_continue(
344 &self,
345 address: Option<Arc<String>>,
346 signature: Option<Arc<String>>,
347 chunk: crate::laboratories::executions::response::streaming::LaboratoryExecutionChunk,
348 ) {
349 self.enqueue(
350 address,
351 signature,
352 Request::LaboratoryExecution(LaboratoryExecutionRequest::Continue(chunk)),
353 );
354 }
355
356 pub fn send_laboratory_execution_error(
357 &self,
358 address: Option<Arc<String>>,
359 signature: Option<Arc<String>>,
360 id: String,
361 error: crate::error::ResponseError,
362 ) {
363 self.enqueue(
364 address,
365 signature,
366 Request::LaboratoryExecution(LaboratoryExecutionRequest::Error(ResponseError {
367 id,
368 inner: error,
369 })),
370 );
371 }
372}