Skip to main content

objectiveai_sdk/filesystem/logs/
client.rs

1use std::path::PathBuf;
2
3use super::super::{Client, Error};
4use super::ListItem;
5
6/// Result of reading a log file — either parsed JSON or a data URL.
7#[derive(Debug)]
8pub enum LogContent {
9    Json(serde_json::Value),
10    /// A `data:{mime};base64,{payload}` string.
11    DataUrl(String),
12}
13
14impl Client {
15    fn endpoint_dir(&self, endpoint: &str) -> PathBuf {
16        let mut dir = self.logs_dir();
17        for segment in endpoint.split('/') {
18            dir = dir.join(segment);
19        }
20        dir
21    }
22
23    async fn list_endpoint(
24        &self,
25        endpoint: &str,
26        offset: usize,
27        limit: usize,
28    ) -> Result<Vec<ListItem>, Error> {
29        let dir = self.endpoint_dir(endpoint);
30        match tokio::fs::metadata(&dir).await {
31            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
32            Err(e) => return Err(Error::ReadDir(dir, e)),
33            Ok(_) => {}
34        }
35        let mut read_dir = tokio::fs::read_dir(&dir)
36            .await
37            .map_err(|e| Error::ReadDir(dir.clone(), e))?;
38        let mut items = Vec::new();
39        while let Some(entry) = read_dir
40            .next_entry()
41            .await
42            .map_err(|e| Error::ReadDir(dir.clone(), e))?
43        {
44            let path = entry.path();
45            if path.extension().and_then(|e| e.to_str()) != Some("json") {
46                continue;
47            }
48            let stem = match path.file_stem().and_then(|s| s.to_str()) {
49                Some(s) => s.to_string(),
50                None => continue,
51            };
52            let metadata = tokio::fs::metadata(&path)
53                .await
54                .map_err(|e| Error::Read(path.clone(), e))?;
55            let created = metadata
56                .modified()
57                .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
58                .duration_since(std::time::SystemTime::UNIX_EPOCH)
59                .unwrap_or_default()
60                .as_secs();
61            items.push(ListItem { id: stem, created });
62        }
63        items.sort_by(|a, b| b.created.cmp(&a.created));
64        if offset > 0 || limit < usize::MAX {
65            items = items.into_iter().skip(offset).take(limit).collect();
66        }
67        Ok(items)
68    }
69
70    // -- List methods --------------------------------------------------------
71
72    pub async fn list_agent_completions(&self, offset: usize, limit: usize) -> Result<Vec<ListItem>, Error> {
73        self.list_endpoint("agents/completions", offset, limit).await
74    }
75    pub async fn list_vector_completions(&self, offset: usize, limit: usize) -> Result<Vec<ListItem>, Error> {
76        self.list_endpoint("vector/completions", offset, limit).await
77    }
78    pub async fn list_function_executions(&self, offset: usize, limit: usize) -> Result<Vec<ListItem>, Error> {
79        self.list_endpoint("functions/executions", offset, limit).await
80    }
81    pub async fn list_function_inventions(&self, offset: usize, limit: usize) -> Result<Vec<ListItem>, Error> {
82        self.list_endpoint("functions/inventions", offset, limit).await
83    }
84    pub async fn list_function_inventions_recursive(&self, offset: usize, limit: usize) -> Result<Vec<ListItem>, Error> {
85        self.list_endpoint("functions/inventions/recursive", offset, limit).await
86    }
87    pub async fn list_laboratory_executions(&self, offset: usize, limit: usize) -> Result<Vec<ListItem>, Error> {
88        self.list_endpoint("laboratories/executions", offset, limit).await
89    }
90
91    // -- Clear helpers + methods --------------------------------------------
92
93    /// Deletes all files (not subdirectories) in the given endpoint directory.
94    async fn clear_endpoint(&self, endpoint: &str) -> Result<u64, Error> {
95        let dir = self.endpoint_dir(endpoint);
96        match tokio::fs::metadata(&dir).await {
97            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
98            Err(e) => return Err(Error::ReadDir(dir, e)),
99            Ok(_) => {}
100        }
101        let mut read_dir = tokio::fs::read_dir(&dir)
102            .await
103            .map_err(|e| Error::ReadDir(dir.clone(), e))?;
104        let mut count = 0u64;
105        while let Some(entry) = read_dir
106            .next_entry()
107            .await
108            .map_err(|e| Error::ReadDir(dir.clone(), e))?
109        {
110            let path = entry.path();
111            if path.is_file() {
112                tokio::fs::remove_file(&path)
113                    .await
114                    .map_err(|e| Error::Read(path, e))?;
115                count += 1;
116            }
117        }
118        Ok(count)
119    }
120
121    pub async fn clear_agent_completions(&self) -> Result<u64, Error> {
122        self.clear_endpoint("agents/completions").await
123    }
124    pub async fn clear_agent_completion_continuations(&self) -> Result<u64, Error> {
125        self.clear_endpoint("agent/completions/continuation").await
126    }
127    pub async fn clear_agent_completion_messages(&self) -> Result<u64, Error> {
128        self.clear_endpoint("agent/completions/messages").await
129    }
130    pub async fn clear_agent_completion_message_logprobs(&self) -> Result<u64, Error> {
131        self.clear_endpoint("agent/completions/messages/logprobs").await
132    }
133    pub async fn clear_agent_completion_message_images(&self) -> Result<u64, Error> {
134        self.clear_endpoint("agent/completions/messages/image").await
135    }
136    pub async fn clear_agent_completion_message_audio(&self) -> Result<u64, Error> {
137        self.clear_endpoint("agent/completions/messages/audio").await
138    }
139    pub async fn clear_agent_completion_message_video(&self) -> Result<u64, Error> {
140        self.clear_endpoint("agent/completions/messages/video").await
141    }
142    pub async fn clear_agent_completion_message_files(&self) -> Result<u64, Error> {
143        self.clear_endpoint("agent/completions/messages/file").await
144    }
145    pub async fn clear_vector_completions(&self) -> Result<u64, Error> {
146        self.clear_endpoint("vector/completions").await
147    }
148    pub async fn clear_function_executions(&self) -> Result<u64, Error> {
149        self.clear_endpoint("functions/executions").await
150    }
151    pub async fn clear_function_execution_retry_tokens(&self) -> Result<u64, Error> {
152        self.clear_endpoint("functions/executions/retry_token").await
153    }
154    pub async fn clear_function_inventions(&self) -> Result<u64, Error> {
155        self.clear_endpoint("functions/inventions").await
156    }
157    pub async fn clear_function_inventions_recursive(&self) -> Result<u64, Error> {
158        self.clear_endpoint("functions/inventions/recursive").await
159    }
160    pub async fn clear_laboratory_executions(&self) -> Result<u64, Error> {
161        self.clear_endpoint("laboratories/executions").await
162    }
163
164    // -- Write methods (LogWriter constructors) -----------------------------
165
166    pub fn write_agent_completion(&self) -> super::LogWriter<crate::agent::completions::response::streaming::AgentCompletionChunk> {
167        super::LogWriter::new(self.logs_dir(), |chunk| chunk.produce_files().map(|(_, files)| files))
168    }
169    pub fn write_vector_completion(&self) -> super::LogWriter<crate::vector::completions::response::streaming::VectorCompletionChunk> {
170        super::LogWriter::new(self.logs_dir(), |chunk| chunk.produce_files().map(|(_, files)| files))
171    }
172    pub fn write_function_execution(&self) -> super::LogWriter<crate::functions::executions::response::streaming::FunctionExecutionChunk> {
173        super::LogWriter::new(self.logs_dir(), |chunk| chunk.produce_files().map(|(_, files)| files))
174    }
175    pub fn write_function_invention(&self) -> super::LogWriter<crate::functions::inventions::response::streaming::FunctionInventionChunk> {
176        super::LogWriter::new(self.logs_dir(), |chunk| chunk.produce_files().map(|(_, files)| files))
177    }
178    pub fn write_function_invention_recursive(&self) -> super::LogWriter<crate::functions::inventions::recursive::response::streaming::FunctionInventionRecursiveChunk> {
179        super::LogWriter::new(self.logs_dir(), |chunk| chunk.produce_files().map(|(_, files)| files))
180    }
181    pub fn write_laboratory_execution(&self) -> super::LogWriter<crate::laboratories::executions::response::streaming::LaboratoryExecutionChunk> {
182        super::LogWriter::new(self.logs_dir(), |chunk| chunk.produce_files().map(|(_, files)| files))
183    }
184
185    // -- Read helpers + methods ---------------------------------------------
186
187    async fn read_json(&self, dir: &str, stem: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
188        let full = self.logs_dir().join(dir).join(format!("{stem}.json"));
189        let bytes = tokio::fs::read(&full)
190            .await
191            .map_err(|e| Error::Read(full.clone(), e))?;
192        let value: serde_json::Value =
193            serde_json::from_slice(&bytes).map_err(|e| Error::Parse(full, e))?;
194        apply_jq(value, jq)
195    }
196
197    /// Finds the first file in `dir` whose name starts with `stem.` (any extension)
198    /// and returns it as a data URL.
199    async fn read_data_url_by_stem(&self, dir: &str, stem: &str) -> Result<String, Error> {
200        use base64::Engine;
201        let dir_path = self.logs_dir().join(dir);
202        let prefix = format!("{stem}.");
203        let mut read_dir = tokio::fs::read_dir(&dir_path)
204            .await
205            .map_err(|e| Error::ReadDir(dir_path.clone(), e))?;
206        while let Some(entry) = read_dir
207            .next_entry()
208            .await
209            .map_err(|e| Error::ReadDir(dir_path.clone(), e))?
210        {
211            let path = entry.path();
212            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
213                if name.starts_with(&prefix) {
214                    let bytes = tokio::fs::read(&path)
215                        .await
216                        .map_err(|e| Error::Read(path.clone(), e))?;
217                    let mime = mime_guess::from_path(&path)
218                        .first_or_octet_stream()
219                        .to_string();
220                    let b64 = base64::engine::general_purpose::STANDARD.encode(&bytes);
221                    return Ok(format!("data:{mime};base64,{b64}"));
222                }
223            }
224        }
225        Err(Error::Read(
226            dir_path.join(&prefix),
227            std::io::Error::new(std::io::ErrorKind::NotFound, "no matching file"),
228        ))
229    }
230
231    pub async fn read_agent_completion(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
232        self.read_json("agents/completions", id, jq).await
233    }
234    pub async fn read_agent_completion_continuation(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
235        self.read_json("agents/completions/continuation", id, jq).await
236    }
237    pub async fn read_agent_completion_message(&self, id: &str, message_index: u64, jq: Option<&str>) -> Result<serde_json::Value, Error> {
238        self.read_json("agents/completions/messages", &format!("{id}_{message_index}"), jq).await
239    }
240    pub async fn read_agent_completion_message_logprobs(&self, id: &str, message_index: u64, jq: Option<&str>) -> Result<serde_json::Value, Error> {
241        self.read_json("agents/completions/messages/logprobs", &format!("{id}_{message_index}"), jq).await
242    }
243    pub async fn read_agent_completion_message_image(&self, id: &str, message_index: u64, media_index: u64) -> Result<String, Error> {
244        self.read_data_url_by_stem("agents/completions/messages/image", &format!("{id}_{message_index}_{media_index}")).await
245    }
246    pub async fn read_agent_completion_message_audio(&self, id: &str, message_index: u64, media_index: u64) -> Result<String, Error> {
247        self.read_data_url_by_stem("agents/completions/messages/audio", &format!("{id}_{message_index}_{media_index}")).await
248    }
249    pub async fn read_agent_completion_message_video(&self, id: &str, message_index: u64, media_index: u64) -> Result<String, Error> {
250        self.read_data_url_by_stem("agents/completions/messages/video", &format!("{id}_{message_index}_{media_index}")).await
251    }
252    pub async fn read_agent_completion_message_file(&self, id: &str, message_index: u64, media_index: u64) -> Result<String, Error> {
253        self.read_data_url_by_stem("agents/completions/messages/file", &format!("{id}_{message_index}_{media_index}")).await
254    }
255    pub async fn read_vector_completion(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
256        self.read_json("vector/completions", id, jq).await
257    }
258    pub async fn read_function_execution(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
259        self.read_json("functions/executions", id, jq).await
260    }
261    pub async fn read_function_execution_retry_token(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
262        self.read_json("functions/executions/retry_token", id, jq).await
263    }
264    pub async fn read_function_invention(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
265        self.read_json("functions/inventions", id, jq).await
266    }
267    pub async fn read_function_invention_recursive(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
268        self.read_json("functions/inventions/recursive", id, jq).await
269    }
270    pub async fn read_laboratory_execution(&self, id: &str, jq: Option<&str>) -> Result<serde_json::Value, Error> {
271        self.read_json("laboratories/executions", id, jq).await
272    }
273
274    // -- Subscribe helpers + methods ----------------------------------------
275
276    /// Polls for a JSON file. If `require_modification` is false, returns
277    /// immediately when the file exists. If true, waits for creation or
278    /// modification. Returns `Ok(None)` on deletion or timeout. When `jq` is
279    /// provided, the result is run through the filter before returning.
280    async fn subscribe_json(
281        &self,
282        dir: &str,
283        stem: &str,
284        timeout: std::time::Duration,
285        require_modification: bool,
286        jq: Option<&str>,
287    ) -> Result<Option<serde_json::Value>, Error> {
288        let full = self.logs_dir().join(dir).join(format!("{stem}.json"));
289        if poll_file(&full, timeout, require_modification).await.is_none() {
290            return Ok(None);
291        }
292        let bytes = match tokio::fs::read(&full).await {
293            Ok(b) => b,
294            Err(_) => return Ok(None),
295        };
296        let value: serde_json::Value = match serde_json::from_slice(&bytes) {
297            Ok(v) => v,
298            Err(_) => return Ok(None),
299        };
300        apply_jq(value, jq).map(Some)
301    }
302
303    /// Polls for a media file (any extension matching `stem.`). If
304    /// `require_modification` is false, returns immediately when the file
305    /// exists. If true, waits for creation or modification. Returns `None`
306    /// on deletion or timeout.
307    async fn subscribe_data_url_by_stem(
308        &self,
309        dir: &str,
310        stem: &str,
311        timeout: std::time::Duration,
312        require_modification: bool,
313    ) -> Option<String> {
314        use base64::Engine;
315        let dir_path = self.logs_dir().join(dir);
316        let prefix = format!("{stem}.");
317
318        let deadline = tokio::time::Instant::now() + timeout;
319        let initial_mtime = find_file_mtime_by_prefix(&dir_path, &prefix).await;
320
321        if !require_modification {
322            if let Some((path, _)) = &initial_mtime {
323                let bytes = tokio::fs::read(path).await.ok()?;
324                let mime = mime_guess::from_path(path).first_or_octet_stream().to_string();
325                let b64 = base64::engine::general_purpose::STANDARD.encode(&bytes);
326                return Some(format!("data:{mime};base64,{b64}"));
327            }
328        }
329
330        loop {
331            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
332            if tokio::time::Instant::now() >= deadline {
333                return None;
334            }
335
336            let current_mtime = find_file_mtime_by_prefix(&dir_path, &prefix).await;
337            match (&initial_mtime, &current_mtime) {
338                (None, Some((path, _))) => {
339                    let bytes = tokio::fs::read(path).await.ok()?;
340                    let mime = mime_guess::from_path(path).first_or_octet_stream().to_string();
341                    let b64 = base64::engine::general_purpose::STANDARD.encode(&bytes);
342                    return Some(format!("data:{mime};base64,{b64}"));
343                }
344                (Some((_, old_t)), Some((path, new_t))) if new_t > old_t => {
345                    let bytes = tokio::fs::read(path).await.ok()?;
346                    let mime = mime_guess::from_path(path).first_or_octet_stream().to_string();
347                    let b64 = base64::engine::general_purpose::STANDARD.encode(&bytes);
348                    return Some(format!("data:{mime};base64,{b64}"));
349                }
350                (Some(_), None) => return None,
351                _ => continue,
352            }
353        }
354    }
355
356    pub async fn subscribe_agent_completion(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
357        self.subscribe_json("agents/completions", id, timeout, require_modification, jq).await
358    }
359    pub async fn subscribe_agent_completion_continuation(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
360        self.subscribe_json("agents/completions/continuation", id, timeout, require_modification, jq).await
361    }
362    pub async fn subscribe_agent_completion_message(&self, id: &str, message_index: u64, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
363        self.subscribe_json("agents/completions/messages", &format!("{id}_{message_index}"), timeout, require_modification, jq).await
364    }
365    pub async fn subscribe_agent_completion_message_logprobs(&self, id: &str, message_index: u64, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
366        self.subscribe_json("agents/completions/messages/logprobs", &format!("{id}_{message_index}"), timeout, require_modification, jq).await
367    }
368    pub async fn subscribe_agent_completion_message_image(&self, id: &str, message_index: u64, media_index: u64, timeout: std::time::Duration, require_modification: bool) -> Option<String> {
369        self.subscribe_data_url_by_stem("agents/completions/messages/image", &format!("{id}_{message_index}_{media_index}"), timeout, require_modification).await
370    }
371    pub async fn subscribe_agent_completion_message_audio(&self, id: &str, message_index: u64, media_index: u64, timeout: std::time::Duration, require_modification: bool) -> Option<String> {
372        self.subscribe_data_url_by_stem("agents/completions/messages/audio", &format!("{id}_{message_index}_{media_index}"), timeout, require_modification).await
373    }
374    pub async fn subscribe_agent_completion_message_video(&self, id: &str, message_index: u64, media_index: u64, timeout: std::time::Duration, require_modification: bool) -> Option<String> {
375        self.subscribe_data_url_by_stem("agents/completions/messages/video", &format!("{id}_{message_index}_{media_index}"), timeout, require_modification).await
376    }
377    pub async fn subscribe_agent_completion_message_file(&self, id: &str, message_index: u64, media_index: u64, timeout: std::time::Duration, require_modification: bool) -> Option<String> {
378        self.subscribe_data_url_by_stem("agents/completions/messages/file", &format!("{id}_{message_index}_{media_index}"), timeout, require_modification).await
379    }
380    pub async fn subscribe_vector_completion(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
381        self.subscribe_json("vector/completions", id, timeout, require_modification, jq).await
382    }
383    pub async fn subscribe_function_execution(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
384        self.subscribe_json("functions/executions", id, timeout, require_modification, jq).await
385    }
386    pub async fn subscribe_function_execution_retry_token(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
387        self.subscribe_json("functions/executions/retry_token", id, timeout, require_modification, jq).await
388    }
389    pub async fn subscribe_function_invention(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
390        self.subscribe_json("functions/inventions", id, timeout, require_modification, jq).await
391    }
392    pub async fn subscribe_function_invention_recursive(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
393        self.subscribe_json("functions/inventions/recursive", id, timeout, require_modification, jq).await
394    }
395    pub async fn subscribe_laboratory_execution(&self, id: &str, timeout: std::time::Duration, require_modification: bool, jq: Option<&str>) -> Result<Option<serde_json::Value>, Error> {
396        self.subscribe_json("laboratories/executions", id, timeout, require_modification, jq).await
397    }
398}
399
400// -- Pure helpers (no &Client) ---------------------------------------------
401
402/// Applies a jq filter to a JSON value, collapsing the multi-result vector
403/// the same way the CLI `config get` command does: a single result is
404/// unwrapped, an empty result becomes JSON null, and multiple results are
405/// wrapped as an array. When `jq` is `None`, the value is returned as-is.
406fn apply_jq(value: serde_json::Value, jq: Option<&str>) -> Result<serde_json::Value, Error> {
407    let Some(filter) = jq else { return Ok(value); };
408    let mut results = super::super::run_jq(&value, filter)?;
409    Ok(match results.len() {
410        0 => serde_json::Value::Null,
411        1 => results.remove(0),
412        _ => serde_json::Value::Array(results),
413    })
414}
415
416/// Polls a specific file path. If `require_modification` is false,
417/// returns immediately when the file exists. If true, waits for
418/// creation or modification. Returns `None` on deletion or timeout.
419async fn poll_file(
420    path: &std::path::Path,
421    timeout: std::time::Duration,
422    require_modification: bool,
423) -> Option<()> {
424    let deadline = tokio::time::Instant::now() + timeout;
425    let initial_mtime = file_mtime(path).await;
426
427    if !require_modification && initial_mtime.is_some() {
428        return Some(());
429    }
430
431    loop {
432        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
433        if tokio::time::Instant::now() >= deadline {
434            return None;
435        }
436
437        let current_mtime = file_mtime(path).await;
438        match (&initial_mtime, &current_mtime) {
439            (None, Some(_)) => return Some(()),
440            (Some(old), Some(new)) if new > old => return Some(()),
441            (Some(_), None) => return None,
442            _ => continue,
443        }
444    }
445}
446
447async fn file_mtime(path: &std::path::Path) -> Option<std::time::SystemTime> {
448    tokio::fs::metadata(path).await.ok()?.modified().ok()
449}
450
451async fn find_file_mtime_by_prefix(
452    dir: &std::path::Path,
453    prefix: &str,
454) -> Option<(std::path::PathBuf, std::time::SystemTime)> {
455    let mut read_dir = tokio::fs::read_dir(dir).await.ok()?;
456    while let Some(entry) = read_dir.next_entry().await.ok()? {
457        let path = entry.path();
458        if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
459            if name.starts_with(prefix) {
460                let mtime = tokio::fs::metadata(&path).await.ok()?.modified().ok()?;
461                return Some((path, mtime));
462            }
463        }
464    }
465    None
466}