objectiveai_sdk/http/viewer/
client.rs1use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::sync::mpsc;
18
19use super::request::{
20 AgentCompletionCreateParams, AgentCompletionRequest,
21 FunctionExecutionCreateParams, FunctionExecutionRequest, Request,
22 ResponseError,
23};
24
/// Per-event viewer destination that is queued alongside each [`Request`].
///
/// When `address` is `Some`, the background worker posts the event to that
/// address and uses this `signature` exactly as given (even when it is
/// `None`). When `address` is `None`, the worker falls back to the client's
/// default address *and* default signature, ignoring this `signature`.
#[derive(Debug, Clone)]
pub(super) struct ViewerData {
    /// Base URL of the viewer for this event, or `None` to use the default.
    pub address: Option<Arc<String>>,
    /// Value sent in the `X-VIEWER-SIGNATURE` header, if any.
    pub signature: Option<Arc<String>>,
}
34
/// Fire-and-forget client that forwards agent-completion and
/// function-execution events to a viewer over HTTP.
///
/// Events are pushed onto an unbounded channel and delivered one at a time by
/// a background task spawned in `Client::new`; the `send_*` methods never
/// block and never report delivery failures.
pub struct Client {
    /// Sending half of the queue consumed by the background task.
    tx: mpsc::UnboundedSender<(ViewerData, Request)>,
    /// Handle of the background delivery task; awaited by `flush`.
    handle: tokio::task::JoinHandle<()>,
    /// Viewer base URL used for events enqueued without an explicit address.
    ///
    /// The background task captures its own clone of this value at
    /// construction, so reassigning the field afterwards does not change
    /// where events are delivered.
    pub default_address: Option<Arc<String>>,
    /// Signature paired with `default_address`; same capture caveat applies.
    pub default_signature: Option<Arc<String>>,
}
55
56impl Client {
57 pub fn new(
71 http_client: reqwest::Client,
72 address: Option<impl Into<String>>,
73 signature: Option<impl Into<String>>,
74 backoff_current_interval: Duration,
75 backoff_initial_interval: Duration,
76 backoff_randomization_factor: f64,
77 backoff_multiplier: f64,
78 backoff_max_interval: Duration,
79 backoff_max_elapsed_time: Duration,
80 ) -> Self {
81 let default_address = match address {
82 Some(s) => Some(Arc::new(s.into())),
83 #[cfg(feature = "env")]
84 None => std::env::var("VIEWER_ADDRESS").ok().map(Arc::new),
85 #[cfg(not(feature = "env"))]
86 None => None,
87 };
88 let default_signature = match signature {
89 Some(s) => Some(Arc::new(s.into())),
90 #[cfg(feature = "env")]
91 None => std::env::var("VIEWER_SIGNATURE").ok().map(Arc::new),
92 #[cfg(not(feature = "env"))]
93 None => None,
94 };
95
96 let (tx, mut rx) = mpsc::unbounded_channel::<(ViewerData, Request)>();
97
98 let bg_default_address = default_address.clone();
99 let bg_default_signature = default_signature.clone();
100
101 let handle = tokio::spawn(async move {
102 while let Some((viewer_data, request)) = rx.recv().await {
103 let (address, signature) = match viewer_data.address {
104 Some(addr) => (addr, viewer_data.signature),
105 None => match &bg_default_address {
106 Some(addr) => {
107 (addr.clone(), bg_default_signature.clone())
108 }
109 None => continue,
110 },
111 };
112
113 let url = match &request {
114 Request::AgentCompletion(_) => {
115 format!("{}/agent/completions", address)
116 }
117 Request::FunctionExecution(_) => {
118 format!("{}/functions/executions", address)
119 }
120 };
121
122 let body = match serde_json::to_vec(&request) {
123 Ok(body) => body,
124 Err(_) => continue,
125 };
126
127 let _ = backoff::future::retry(
128 backoff::ExponentialBackoff {
129 current_interval: backoff_current_interval,
130 initial_interval: backoff_initial_interval,
131 randomization_factor: backoff_randomization_factor,
132 multiplier: backoff_multiplier,
133 max_interval: backoff_max_interval,
134 max_elapsed_time: Some(backoff_max_elapsed_time),
135 start_time: std::time::Instant::now(),
136 clock: backoff::SystemClock::default(),
137 },
138 || {
139 let http_client = &http_client;
140 let url = &url;
141 let body = &body;
142 let signature = &signature;
143 async move {
144 let mut req = http_client
145 .post(url.as_str())
146 .header("Content-Type", "application/json")
147 .body(body.clone());
148
149 if let Some(sig) = signature {
150 req = req
151 .header("X-VIEWER-SIGNATURE", sig.as_str());
152 }
153
154 let response = req
155 .send()
156 .await
157 .map_err(backoff::Error::transient)?;
158
159 if response.status().is_success() {
160 Ok(())
161 } else {
162 Err(backoff::Error::transient(
163 response.error_for_status().unwrap_err(),
164 ))
165 }
166 }
167 },
168 )
169 .await;
170 }
171 });
172
173 Self {
174 tx,
175 handle,
176 default_address,
177 default_signature,
178 }
179 }
180
181 pub async fn flush(self) {
193 let Self { tx, handle, .. } = self;
194 drop(tx);
195 let _ = handle.await;
196 }
197
198 fn enqueue(
201 &self,
202 address: Option<Arc<String>>,
203 signature: Option<Arc<String>>,
204 request: Request,
205 ) {
206 let _ = self.tx.send((ViewerData { address, signature }, request));
207 }
208
209 pub fn send_agent_completion_begin(
210 &self,
211 address: Option<Arc<String>>,
212 signature: Option<Arc<String>>,
213 id: String,
214 request: Arc<
215 crate::agent::completions::request::AgentCompletionCreateParams,
216 >,
217 ) {
218 self.enqueue(
219 address,
220 signature,
221 Request::AgentCompletion(AgentCompletionRequest::Begin(
222 AgentCompletionCreateParams { id, inner: request },
223 )),
224 );
225 }
226
227 pub fn send_agent_completion_continue(
228 &self,
229 address: Option<Arc<String>>,
230 signature: Option<Arc<String>>,
231 chunk: crate::agent::completions::response::streaming::AgentCompletionChunk,
232 ) {
233 self.enqueue(
234 address,
235 signature,
236 Request::AgentCompletion(AgentCompletionRequest::Continue(chunk)),
237 );
238 }
239
240 pub fn send_agent_completion_error(
241 &self,
242 address: Option<Arc<String>>,
243 signature: Option<Arc<String>>,
244 id: String,
245 error: crate::error::ResponseError,
246 ) {
247 self.enqueue(
248 address,
249 signature,
250 Request::AgentCompletion(AgentCompletionRequest::Error(
251 ResponseError { id, inner: error },
252 )),
253 );
254 }
255
256 pub fn send_function_execution_begin(
257 &self,
258 address: Option<Arc<String>>,
259 signature: Option<Arc<String>>,
260 id: String,
261 request: Arc<crate::functions::executions::request::FunctionExecutionCreateParams>,
262 ) {
263 self.enqueue(
264 address,
265 signature,
266 Request::FunctionExecution(FunctionExecutionRequest::Begin(
267 FunctionExecutionCreateParams { id, inner: request },
268 )),
269 );
270 }
271
272 pub fn send_function_execution_continue(
273 &self,
274 address: Option<Arc<String>>,
275 signature: Option<Arc<String>>,
276 chunk: crate::functions::executions::response::streaming::FunctionExecutionChunk,
277 ) {
278 self.enqueue(
279 address,
280 signature,
281 Request::FunctionExecution(FunctionExecutionRequest::Continue(
282 chunk,
283 )),
284 );
285 }
286
287 pub fn send_function_execution_error(
288 &self,
289 address: Option<Arc<String>>,
290 signature: Option<Arc<String>>,
291 id: String,
292 error: crate::error::ResponseError,
293 ) {
294 self.enqueue(
295 address,
296 signature,
297 Request::FunctionExecution(FunctionExecutionRequest::Error(
298 ResponseError { id, inner: error },
299 )),
300 );
301 }
302
303}