1use std::path::PathBuf;
2
3use super::super::{Client, Error};
4use super::ListItem;
5
6#[derive(Debug)]
8pub enum LogContent {
9 Json(serde_json::Value),
10 DataUrl(String),
12}
13
14impl Client {
15 fn endpoint_dir(&self, endpoint: &str) -> PathBuf {
16 let mut dir = self.logs_dir();
17 for segment in endpoint.split('/') {
18 dir = dir.join(segment);
19 }
20 dir
21 }
22
23 async fn list_endpoint(
24 &self,
25 endpoint: &str,
26 offset: usize,
27 limit: usize,
28 ) -> Result<Vec<ListItem>, Error> {
29 let dir = self.endpoint_dir(endpoint);
30 match tokio::fs::metadata(&dir).await {
31 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
32 Err(e) => return Err(Error::ReadDir(dir, e)),
33 Ok(_) => {}
34 }
35 let mut read_dir = tokio::fs::read_dir(&dir)
36 .await
37 .map_err(|e| Error::ReadDir(dir.clone(), e))?;
38 let mut items = Vec::new();
39 while let Some(entry) = read_dir
40 .next_entry()
41 .await
42 .map_err(|e| Error::ReadDir(dir.clone(), e))?
43 {
44 let path = entry.path();
45 if path.extension().and_then(|e| e.to_str()) != Some("json") {
46 continue;
47 }
48 let stem = match path.file_stem().and_then(|s| s.to_str()) {
49 Some(s) => s.to_string(),
50 None => continue,
51 };
52 let metadata = tokio::fs::metadata(&path)
53 .await
54 .map_err(|e| Error::Read(path.clone(), e))?;
55 let created = metadata
56 .modified()
57 .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
58 .duration_since(std::time::SystemTime::UNIX_EPOCH)
59 .unwrap_or_default()
60 .as_secs();
61 items.push(ListItem { id: stem, created });
62 }
63 items.sort_by(|a, b| b.created.cmp(&a.created));
64 if offset > 0 || limit < usize::MAX {
65 items = items.into_iter().skip(offset).take(limit).collect();
66 }
67 Ok(items)
68 }
69
70 pub async fn list_agent_completions(&self, offset: usize, limit: usize) -> Result<Vec<ListItem>, Error> {
73 self.list_endpoint("agents/completions", offset, limit).await
74 }
75 pub async fn list_vector_completions(&self, offset: usize, limit: usize) -> Result<Vec<ListItem>, Error> {
76 self.list_endpoint("vector/completions", offset, limit).await
77 }
78 pub async fn list_function_executions(&self, offset: usize, limit: usize) -> Result<Vec<ListItem>, Error> {
79 self.list_endpoint("functions/executions", offset, limit).await
80 }
81 pub async fn list_function_inventions(&self, offset: usize, limit: usize) -> Result<Vec<ListItem>, Error> {
82 self.list_endpoint("functions/inventions", offset, limit).await
83 }
84 pub async fn list_function_inventions_recursive(&self, offset: usize, limit: usize) -> Result<Vec<ListItem>, Error> {
85 self.list_endpoint("functions/inventions/recursive", offset, limit).await
86 }
87 pub async fn list_laboratory_executions(&self, offset: usize, limit: usize) -> Result<Vec<ListItem>, Error> {
88 self.list_endpoint("laboratories/executions", offset, limit).await
89 }
90
91 async fn clear_endpoint(&self, endpoint: &str) -> Result<u64, Error> {
95 let dir = self.endpoint_dir(endpoint);
96 match tokio::fs::metadata(&dir).await {
97 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
98 Err(e) => return Err(Error::ReadDir(dir, e)),
99 Ok(_) => {}
100 }
101 let mut read_dir = tokio::fs::read_dir(&dir)
102 .await
103 .map_err(|e| Error::ReadDir(dir.clone(), e))?;
104 let mut count = 0u64;
105 while let Some(entry) = read_dir
106 .next_entry()
107 .await
108 .map_err(|e| Error::ReadDir(dir.clone(), e))?
109 {
110 let path = entry.path();
111 if path.is_file() {
112 tokio::fs::remove_file(&path)
113 .await
114 .map_err(|e| Error::Read(path, e))?;
115 count += 1;
116 }
117 }
118 Ok(count)
119 }
120
121 pub async fn clear_agent_completions(&self) -> Result<u64, Error> {
122 self.clear_endpoint("agents/completions").await
123 }
124 pub async fn clear_agent_completion_continuations(&self) -> Result<u64, Error> {
125 self.clear_endpoint("agent/completions/continuation").await
126 }
127 pub async fn clear_agent_completion_messages(&self) -> Result<u64, Error> {
128 self.clear_endpoint("agent/completions/messages").await
129 }
130 pub async fn clear_agent_completion_message_logprobs(&self) -> Result<u64, Error> {
131 self.clear_endpoint("agent/completions/messages/logprobs").await
132 }
133 pub async fn clear_agent_completion_message_images(&self) -> Result<u64, Error> {
134 self.clear_endpoint("agent/completions/messages/image").await
135 }
136 pub async fn clear_agent_completion_message_audio(&self) -> Result<u64, Error> {
137 self.clear_endpoint("agent/completions/messages/audio").await
138 }
139 pub async fn clear_agent_completion_message_video(&self) -> Result<u64, Error> {
140 self.clear_endpoint("agent/completions/messages/video").await
141 }
142 pub async fn clear_agent_completion_message_files(&self) -> Result<u64, Error> {
143 self.clear_endpoint("agent/completions/messages/file").await
144 }
145 pub async fn clear_vector_completions(&self) -> Result<u64, Error> {
146 self.clear_endpoint("vector/completions").await
147 }
148 pub async fn clear_function_executions(&self) -> Result<u64, Error> {
149 self.clear_endpoint("functions/executions").await
150 }
151 pub async fn clear_function_execution_retry_tokens(&self) -> Result<u64, Error> {
152 self.clear_endpoint("functions/executions/retry_token").await
153 }
154 pub async fn clear_function_inventions(&self) -> Result<u64, Error> {
155 self.clear_endpoint("functions/inventions").await
156 }
157 pub async fn clear_function_inventions_recursive(&self) -> Result<u64, Error> {
158 self.clear_endpoint("functions/inventions/recursive").await
159 }
160 pub async fn clear_laboratory_executions(&self) -> Result<u64, Error> {
161 self.clear_endpoint("laboratories/executions").await
162 }
163
164 pub fn write_agent_completion(&self) -> super::LogWriter<crate::agent::completions::response::streaming::AgentCompletionChunk> {
167 super::LogWriter::new(self.logs_dir(), |chunk| chunk.produce_files().map(|(_, files)| files))
168 }
169 pub fn write_vector_completion(&self) -> super::LogWriter<crate::vector::completions::response::streaming::VectorCompletionChunk> {
170 super::LogWriter::new(self.logs_dir(), |chunk| chunk.produce_files().map(|(_, files)| files))
171 }
172 pub fn write_function_execution(&self) -> super::LogWriter<crate::functions::executions::response::streaming::FunctionExecutionChunk> {
173 super::LogWriter::new(self.logs_dir(), |chunk| chunk.produce_files().map(|(_, files)| files))
174 }
175 pub fn write_function_invention(&self) -> super::LogWriter<crate::functions::inventions::response::streaming::FunctionInventionChunk> {
176 super::LogWriter::new(self.logs_dir(), |chunk| chunk.produce_files().map(|(_, files)| files))
177 }
178 pub fn write_function_invention_recursive(&self) -> super::LogWriter<crate::functions::inventions::recursive::response::streaming::FunctionInventionRecursiveChunk> {
179 super::LogWriter::new(self.logs_dir(), |chunk| chunk.produce_files().map(|(_, files)| files))
180 }
181 pub fn write_laboratory_execution(&self) -> super::LogWriter<crate::laboratories::executions::response::streaming::LaboratoryExecutionChunk> {
182 super::LogWriter::new(self.logs_dir(), |chunk| chunk.produce_files().map(|(_, files)| files))
183 }
184
185 async fn read_json(&self, dir: &str, stem: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
188 let full = self.logs_dir().join(dir).join(format!("{stem}.json"));
189 let bytes = tokio::fs::read(&full)
190 .await
191 .map_err(|e| Error::Read(full.clone(), e))?;
192 let value: serde_json::Value =
193 serde_json::from_slice(&bytes).map_err(|e| Error::Parse(full, e))?;
194 apply_jq(value, jq)
195 }
196
197 async fn read_data_url_by_stem(&self, dir: &str, stem: &str) -> Result<String, Error> {
200 use base64::Engine;
201 let dir_path = self.logs_dir().join(dir);
202 let prefix = format!("{stem}.");
203 let mut read_dir = tokio::fs::read_dir(&dir_path)
204 .await
205 .map_err(|e| Error::ReadDir(dir_path.clone(), e))?;
206 while let Some(entry) = read_dir
207 .next_entry()
208 .await
209 .map_err(|e| Error::ReadDir(dir_path.clone(), e))?
210 {
211 let path = entry.path();
212 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
213 if name.starts_with(&prefix) {
214 let bytes = tokio::fs::read(&path)
215 .await
216 .map_err(|e| Error::Read(path.clone(), e))?;
217 let mime = mime_guess::from_path(&path)
218 .first_or_octet_stream()
219 .to_string();
220 let b64 = base64::engine::general_purpose::STANDARD.encode(&bytes);
221 return Ok(format!("data:{mime};base64,{b64}"));
222 }
223 }
224 }
225 Err(Error::Read(
226 dir_path.join(&prefix),
227 std::io::Error::new(std::io::ErrorKind::NotFound, "no matching file"),
228 ))
229 }
230
231 pub async fn read_agent_completion(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
232 self.read_json("agents/completions", id, jq).await
233 }
234 pub async fn read_agent_completion_continuation(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
235 self.read_json("agents/completions/continuation", id, jq).await
236 }
237 pub async fn read_agent_completion_message(&self, id: &str, message_index: u64, jq: Option<&str>) -> Result<serde_json::Value, Error> {
238 self.read_json("agents/completions/messages", &format!("{id}_{message_index}"), jq).await
239 }
240 pub async fn read_agent_completion_message_logprobs(&self, id: &str, message_index: u64, jq: Option<&str>) -> Result<serde_json::Value, Error> {
241 self.read_json("agents/completions/messages/logprobs", &format!("{id}_{message_index}"), jq).await
242 }
243 pub async fn read_agent_completion_message_image(&self, id: &str, message_index: u64, media_index: u64) -> Result<String, Error> {
244 self.read_data_url_by_stem("agents/completions/messages/image", &format!("{id}_{message_index}_{media_index}")).await
245 }
246 pub async fn read_agent_completion_message_audio(&self, id: &str, message_index: u64, media_index: u64) -> Result<String, Error> {
247 self.read_data_url_by_stem("agents/completions/messages/audio", &format!("{id}_{message_index}_{media_index}")).await
248 }
249 pub async fn read_agent_completion_message_video(&self, id: &str, message_index: u64, media_index: u64) -> Result<String, Error> {
250 self.read_data_url_by_stem("agents/completions/messages/video", &format!("{id}_{message_index}_{media_index}")).await
251 }
252 pub async fn read_agent_completion_message_file(&self, id: &str, message_index: u64, media_index: u64) -> Result<String, Error> {
253 self.read_data_url_by_stem("agents/completions/messages/file", &format!("{id}_{message_index}_{media_index}")).await
254 }
255 pub async fn read_vector_completion(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
256 self.read_json("vector/completions", id, jq).await
257 }
258 pub async fn read_function_execution(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
259 self.read_json("functions/executions", id, jq).await
260 }
261 pub async fn read_function_execution_retry_token(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
262 self.read_json("functions/executions/retry_token", id, jq).await
263 }
264 pub async fn read_function_invention(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
265 self.read_json("functions/inventions", id, jq).await
266 }
267 pub async fn read_function_invention_recursive(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
268 self.read_json("functions/inventions/recursive", id, jq).await
269 }
270 pub async fn read_laboratory_execution(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
271 self.read_json("laboratories/executions", id, jq).await
272 }
273
274 async fn subscribe_json(
281 &self,
282 dir: &str,
283 stem: &str,
284 timeout: std::time::Duration,
285 require_modification: bool,
286 jq: Option<&str>,
287 ) -> Result<Option<serde_json::Value>, Error> {
288 let full = self.logs_dir().join(dir).join(format!("{stem}.json"));
289 if poll_file(&full, timeout, require_modification).await.is_none() {
290 return Ok(None);
291 }
292 let bytes = match tokio::fs::read(&full).await {
293 Ok(b) => b,
294 Err(_) => return Ok(None),
295 };
296 let value: serde_json::Value = match serde_json::from_slice(&bytes) {
297 Ok(v) => v,
298 Err(_) => return Ok(None),
299 };
300 apply_jq(value, jq).map(Some)
301 }
302
303 async fn subscribe_data_url_by_stem(
308 &self,
309 dir: &str,
310 stem: &str,
311 timeout: std::time::Duration,
312 require_modification: bool,
313 ) -> Option<String> {
314 use base64::Engine;
315 let dir_path = self.logs_dir().join(dir);
316 let prefix = format!("{stem}.");
317
318 let deadline = tokio::time::Instant::now() + timeout;
319 let initial_mtime = find_file_mtime_by_prefix(&dir_path, &prefix).await;
320
321 if !require_modification {
322 if let Some((path, _)) = &initial_mtime {
323 let bytes = tokio::fs::read(path).await.ok()?;
324 let mime = mime_guess::from_path(path).first_or_octet_stream().to_string();
325 let b64 = base64::engine::general_purpose::STANDARD.encode(&bytes);
326 return Some(format!("data:{mime};base64,{b64}"));
327 }
328 }
329
330 loop {
331 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
332 if tokio::time::Instant::now() >= deadline {
333 return None;
334 }
335
336 let current_mtime = find_file_mtime_by_prefix(&dir_path, &prefix).await;
337 match (&initial_mtime, ¤t_mtime) {
338 (None, Some((path, _))) => {
339 let bytes = tokio::fs::read(path).await.ok()?;
340 let mime = mime_guess::from_path(path).first_or_octet_stream().to_string();
341 let b64 = base64::engine::general_purpose::STANDARD.encode(&bytes);
342 return Some(format!("data:{mime};base64,{b64}"));
343 }
344 (Some((_, old_t)), Some((path, new_t))) if new_t > old_t => {
345 let bytes = tokio::fs::read(path).await.ok()?;
346 let mime = mime_guess::from_path(path).first_or_octet_stream().to_string();
347 let b64 = base64::engine::general_purpose::STANDARD.encode(&bytes);
348 return Some(format!("data:{mime};base64,{b64}"));
349 }
350 (Some(_), None) => return None,
351 _ => continue,
352 }
353 }
354 }
355
356 pub async fn subscribe_agent_completion(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
357 self.subscribe_json("agents/completions", id, timeout, require_modification, jq).await
358 }
359 pub async fn subscribe_agent_completion_continuation(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
360 self.subscribe_json("agents/completions/continuation", id, timeout, require_modification, jq).await
361 }
362 pub async fn subscribe_agent_completion_message(&self, id: &str, message_index: u64, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
363 self.subscribe_json("agents/completions/messages", &format!("{id}_{message_index}"), timeout, require_modification, jq).await
364 }
365 pub async fn subscribe_agent_completion_message_logprobs(&self, id: &str, message_index: u64, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
366 self.subscribe_json("agents/completions/messages/logprobs", &format!("{id}_{message_index}"), timeout, require_modification, jq).await
367 }
368 pub async fn subscribe_agent_completion_message_image(&self, id: &str, message_index: u64, media_index: u64, timeout: std::time::Duration, require_modification: bool) -> Option<String> {
369 self.subscribe_data_url_by_stem("agents/completions/messages/image", &format!("{id}_{message_index}_{media_index}"), timeout, require_modification).await
370 }
371 pub async fn subscribe_agent_completion_message_audio(&self, id: &str, message_index: u64, media_index: u64, timeout: std::time::Duration, require_modification: bool) -> Option<String> {
372 self.subscribe_data_url_by_stem("agents/completions/messages/audio", &format!("{id}_{message_index}_{media_index}"), timeout, require_modification).await
373 }
374 pub async fn subscribe_agent_completion_message_video(&self, id: &str, message_index: u64, media_index: u64, timeout: std::time::Duration, require_modification: bool) -> Option<String> {
375 self.subscribe_data_url_by_stem("agents/completions/messages/video", &format!("{id}_{message_index}_{media_index}"), timeout, require_modification).await
376 }
377 pub async fn subscribe_agent_completion_message_file(&self, id: &str, message_index: u64, media_index: u64, timeout: std::time::Duration, require_modification: bool) -> Option<String> {
378 self.subscribe_data_url_by_stem("agents/completions/messages/file", &format!("{id}_{message_index}_{media_index}"), timeout, require_modification).await
379 }
380 pub async fn subscribe_vector_completion(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
381 self.subscribe_json("vector/completions", id, timeout, require_modification, jq).await
382 }
383 pub async fn subscribe_function_execution(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
384 self.subscribe_json("functions/executions", id, timeout, require_modification, jq).await
385 }
386 pub async fn subscribe_function_execution_retry_token(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
387 self.subscribe_json("functions/executions/retry_token", id, timeout, require_modification, jq).await
388 }
389 pub async fn subscribe_function_invention(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
390 self.subscribe_json("functions/inventions", id, timeout, require_modification, jq).await
391 }
392 pub async fn subscribe_function_invention_recursive(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
393 self.subscribe_json("functions/inventions/recursive", id, timeout, require_modification, jq).await
394 }
395 pub async fn subscribe_laboratory_execution(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
396 self.subscribe_json("laboratories/executions", id, timeout, require_modification, jq).await
397 }
398}
399
400fn apply_jq(value: serde_json::Value, jq: Option<&str>) -> Result<serde_json::Value, Error> {
407 let Some(filter) = jq else { return Ok(value); };
408 let mut results = super::super::run_jq(&value, filter)?;
409 Ok(match results.len() {
410 0 => serde_json::Value::Null,
411 1 => results.remove(0),
412 _ => serde_json::Value::Array(results),
413 })
414}
415
416async fn poll_file(
420 path: &std::path::Path,
421 timeout: std::time::Duration,
422 require_modification: bool,
423) -> Option<()> {
424 let deadline = tokio::time::Instant::now() + timeout;
425 let initial_mtime = file_mtime(path).await;
426
427 if !require_modification && initial_mtime.is_some() {
428 return Some(());
429 }
430
431 loop {
432 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
433 if tokio::time::Instant::now() >= deadline {
434 return None;
435 }
436
437 let current_mtime = file_mtime(path).await;
438 match (&initial_mtime, ¤t_mtime) {
439 (None, Some(_)) => return Some(()),
440 (Some(old), Some(new)) if new > old => return Some(()),
441 (Some(_), None) => return None,
442 _ => continue,
443 }
444 }
445}
446
447async fn file_mtime(path: &std::path::Path) -> Option<std::time::SystemTime> {
448 tokio::fs::metadata(path).await.ok()?.modified().ok()
449}
450
451async fn find_file_mtime_by_prefix(
452 dir: &std::path::Path,
453 prefix: &str,
454) -> Option<(std::path::PathBuf, std::time::SystemTime)> {
455 let mut read_dir = tokio::fs::read_dir(dir).await.ok()?;
456 while let Some(entry) = read_dir.next_entry().await.ok()? {
457 let path = entry.path();
458 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
459 if name.starts_with(prefix) {
460 let mtime = tokio::fs::metadata(&path).await.ok()?.modified().ok()?;
461 return Some((path, mtime));
462 }
463 }
464 }
465 None
466}