Skip to main content

objectiveai_sdk/cli/command/agents/logs/read/all/
mod.rs

1//! `agents logs read all` — fetch every log row for a target AIH,
2//! coalesced into [`ResponseItem`] blocks.
3//!
4//! Each yielded `ResponseItem` is either a single-row request blob
5//! (`AgentCompletionRequest` / `VectorCompletionRequest` /
6//! `FunctionExecutionRequest`) or a multi-row block
7//! (`ClientNotification` / `AssistantResponse` / `ToolResponse`)
8//! formed by joining consecutive `logs.messages` rows that share
9//! `(block_class, agent_instance_hierarchy, response_id)`.
10//!
11//! `response_id` is carried on every variant — for the three
12//! request-blob variants it's the chunk's own id, for the three
13//! block variants it's the immediately-enclosing
14//! agent-completion's id (even when the underlying rows were
15//! emitted inside a function execution or vector completion
16//! chunk).
17
18use std::str::FromStr;
19
20use crate::cli::command::CommandRequest;
21use crate::cli::command::path_ref::tokenize;
22
23/// One queue-read target. Either direct `(parent, instance)` (parent
24/// defaults to the cli's own `Config.agent_instance_hierarchy` when
25/// omitted), a tag name the cli resolves at handler time, or `me`
26/// (the caller's own `Config.agent_instance_hierarchy`, with no child
27/// leaf appended). Shared with `agents read pending` via re-export.
28///
29/// Docker-style `key=value,key=value` wire form on the CLI:
30///   `--target instance=L`           (direct; parent defaults to ctx)
31///   `--target instance=L,parent=P`  (direct; explicit parent)
32///   `--target tag=T`                (tag; cli resolves via the tags tier)
33///   `--target me`                   (the caller's own AIH; bare valueless key)
34#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
35#[serde(tag = "by", rename_all = "snake_case")]
36#[schemars(rename = "cli.command.agents.logs.read.all.Target")]
37pub enum Target {
38    #[schemars(title = "Direct")]
39    Direct {
40        /// Optional lineage prefix. `None` ⇒ cli substitutes
41        /// `Config.agent_instance_hierarchy`.
42        #[serde(default, skip_serializing_if = "Option::is_none")]
43        #[schemars(extend("omitempty" = true))]
44        parent_agent_instance_hierarchy: Option<String>,
45        /// Leaf id of the target agent.
46        agent_instance: String,
47    },
48    #[schemars(title = "Tag")]
49    Tag { agent_tag: String },
50    /// The caller's own `Config.agent_instance_hierarchy`, verbatim.
51    /// CLI wire form is the bare valueless key `me` (docker
52    /// `readonly`-style), mutually exclusive with the other keys.
53    #[schemars(title = "Me")]
54    Me,
55}
56
57impl FromStr for Target {
58    type Err = String;
59    /// Parse a `--target` arg. Accepted forms: the bare valueless key
60    /// `me`; `instance` + optional `parent`; or `tag` alone. `tag` is
61    /// mutually exclusive with the other two keys.
62    fn from_str(s: &str) -> Result<Self, Self::Err> {
63        // Bare valueless key (docker `readonly`-style). Handled before
64        // `tokenize`, which requires every token to be `key=value`.
65        if s.trim() == "me" {
66            return Ok(Target::Me);
67        }
68        let mut tag: Option<String> = None;
69        let mut parent: Option<String> = None;
70        let mut instance: Option<String> = None;
71        for (k, v) in tokenize(s)? {
72            match k {
73                "tag" => tag = Some(v.to_string()),
74                "instance" => instance = Some(v.to_string()),
75                "parent" => parent = Some(v.to_string()),
76                other => return Err(format!("unknown key: {other}")),
77            }
78        }
79        match (tag, instance, parent) {
80            (Some(t), None, None) => Ok(Target::Tag { agent_tag: t }),
81            (Some(_), _, _) => Err(
82                "tag is mutually exclusive with instance and parent".to_string(),
83            ),
84            (None, Some(i), p) => Ok(Target::Direct {
85                parent_agent_instance_hierarchy: p,
86                agent_instance: i,
87            }),
88            (None, None, _) => Err("instance or tag is required".to_string()),
89        }
90    }
91}
92
93impl Target {
94    /// Inverse of [`FromStr::from_str`]: emit the docker-style
95    /// `key=value,key=value` wire form for round-tripping.
96    pub fn into_arg_string(&self) -> String {
97        match self {
98            Target::Me => "me".to_string(),
99            Target::Tag { agent_tag } => format!("tag={agent_tag}"),
100            Target::Direct {
101                parent_agent_instance_hierarchy: None,
102                agent_instance,
103            } => format!("instance={agent_instance}"),
104            Target::Direct {
105                parent_agent_instance_hierarchy: Some(p),
106                agent_instance,
107            } => format!("instance={agent_instance},parent={p}"),
108        }
109    }
110}
111
112#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
113#[schemars(rename = "cli.command.agents.logs.read.all.Request")]
114pub struct Request {
115    pub path_type: Path,
116    pub targets: Vec<Target>,
117    /// Skip rows with `logs.messages."index" <= after_id`. Use the
118    /// highest `id` from a previous page to paginate forward.
119    #[serde(default, skip_serializing_if = "Option::is_none")]
120    #[schemars(extend("omitempty" = true))]
121    pub after_id: Option<i64>,
122    /// Cap on rows scanned per target. `None` = unlimited.
123    #[serde(default, skip_serializing_if = "Option::is_none")]
124    #[schemars(extend("omitempty" = true))]
125    pub limit: Option<i64>,
126    pub jq: Option<String>,
127}
128
129#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
130#[schemars(rename = "cli.command.agents.logs.read.all.Path")]
131pub enum Path {
132    #[serde(rename = "agents/logs/read/all")]
133    AgentsLogsReadAll,
134}
135
136impl CommandRequest for Request {
137    fn into_command(&self) -> Vec<String> {
138        let mut argv = vec![
139            "agents".to_string(),
140            "logs".to_string(),
141            "read".to_string(),
142            "all".to_string(),
143        ];
144        for target in &self.targets {
145            argv.push("--target".to_string());
146            argv.push(target.into_arg_string());
147        }
148        if let Some(after_id) = self.after_id {
149            argv.push("--after-id".to_string());
150            argv.push(after_id.to_string());
151        }
152        if let Some(limit) = self.limit {
153            argv.push("--limit".to_string());
154            argv.push(limit.to_string());
155        }
156        if let Some(jq) = &self.jq {
157            argv.push("--jq".to_string());
158            argv.push(jq.clone());
159        }
160        argv
161    }
162}
163
164/// Type tag for one `ClientNotification` part — the table-kind of
165/// the underlying `message_queue_*` content row.
166#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
167#[serde(rename_all = "snake_case")]
168#[schemars(rename = "cli.command.agents.logs.read.all.ClientNotificationPartType")]
169pub enum ClientNotificationPartType {
170    Text,
171    Image,
172    Audio,
173    Video,
174    File,
175}
176
177/// One row inside a `ClientNotification` block — a consumed
178/// `message_queue_contents` entry. `timestamp_queued` is on the
179/// enclosing block (it lives on `message_queue.enqueued_at`, not
180/// per-content); only the per-row consumption timestamp is here.
181#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
182#[schemars(rename = "cli.command.agents.logs.read.all.ClientNotificationPart")]
183pub struct ClientNotificationPart {
184    /// `logs.messages."index"` for this row. Pass to
185    /// `agents logs read id <n>` to fetch the consumed
186    /// `message_queue_contents` body.
187    pub id: i64,
188    /// `logs.messages."timestamp"` — when the receiver consumed
189    /// this content row and the LogWriter committed the
190    /// consumption event.
191    pub timestamp_delivered: i64,
192    pub r#type: ClientNotificationPartType,
193}
194
195/// Type tag for one `AssistantResponse` part — the table-kind of
196/// the underlying `assistant_response_*` row.
197#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
198#[serde(rename_all = "snake_case")]
199#[schemars(rename = "cli.command.agents.logs.read.all.AssistantResponsePartType")]
200pub enum AssistantResponsePartType {
201    Refusal,
202    Reasoning,
203    ToolCall,
204    Text,
205    Image,
206    Audio,
207    Video,
208    File,
209}
210
211/// One row inside an `AssistantResponse` block.
212#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
213#[schemars(rename = "cli.command.agents.logs.read.all.AssistantResponsePart")]
214pub struct AssistantResponsePart {
215    /// `logs.messages."index"` for this row. Pass to
216    /// `agents logs read id <n>` for the typed body.
217    pub id: i64,
218    pub timestamp_delivered: i64,
219    pub r#type: AssistantResponsePartType,
220    /// `function.name` for `type = tool_call` rows
221    /// (`logs.assistant_response_tool_calls.function_name`).
222    /// Empty string for non-tool-call rows. Surfaced here so
223    /// callers can dedupe tool calls by name without a per-row
224    /// `agents logs read id` round-trip.
225    pub function_name: String,
226}
227
228/// Type tag for one `ToolResponse` part. `Container` is the
229/// `tool_response` head row (carries `tool_call_id`); the other
230/// five are content slots.
231#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
232#[serde(rename_all = "snake_case")]
233#[schemars(rename = "cli.command.agents.logs.read.all.ToolResponsePartType")]
234pub enum ToolResponsePartType {
235    Container,
236    Text,
237    Image,
238    Audio,
239    Video,
240    File,
241}
242
243/// One row inside a `ToolResponse` block.
244#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
245#[schemars(rename = "cli.command.agents.logs.read.all.ToolResponsePart")]
246pub struct ToolResponsePart {
247    /// `logs.messages."index"` for this row. Pass to
248    /// `agents logs read id <n>` for the typed body.
249    pub id: i64,
250    pub timestamp_delivered: i64,
251    pub r#type: ToolResponsePartType,
252}
253
254/// One yielded item. Three single-row request blobs +
255/// three multi-row blocks. Every variant carries `response_id`.
256/// `sender_agent_instance_hierarchy` appears only on the four
257/// variants that have a sender ≠ producer: the three request
258/// variants (caller AIH) and `ClientNotification` (enqueuer
259/// AIH). `AssistantResponse` and `ToolResponse` are emitted BY
260/// the agent itself — their `agent_instance_hierarchy` IS the
261/// producer, so no separate sender field exists.
262///
263/// Block-coalescing boundary tuple: `(class,
264/// agent_instance_hierarchy, response_id)` for assistant/tool
265/// blocks; `(class, agent_instance_hierarchy, response_id,
266/// sender, message_queue_id)` for `ClientNotification` blocks.
267/// One `ClientNotification` block = one consumed
268/// `message_queue` parent row, so `timestamp_queued` and
269/// `sender_agent_instance_hierarchy` are well-defined
270/// block-level.
271#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
272#[serde(tag = "type", rename_all = "snake_case")]
273#[schemars(rename = "cli.command.agents.logs.read.all.ResponseItem")]
274pub enum ResponseItem {
275    #[schemars(title = "AgentCompletionRequest")]
276    AgentCompletionRequest {
277        id: i64,
278        agent_instance_hierarchy: String,
279        /// AIH of the caller who issued the request — from
280        /// `logs.agent_completion_requests.sender_*`.
281        sender_agent_instance_hierarchy: String,
282        timestamp_delivered: i64,
283        response_id: String,
284    },
285    #[schemars(title = "VectorCompletionRequest")]
286    VectorCompletionRequest {
287        id: i64,
288        agent_instance_hierarchy: String,
289        sender_agent_instance_hierarchy: String,
290        timestamp_delivered: i64,
291        response_id: String,
292    },
293    #[schemars(title = "FunctionExecutionRequest")]
294    FunctionExecutionRequest {
295        id: i64,
296        agent_instance_hierarchy: String,
297        sender_agent_instance_hierarchy: String,
298        timestamp_delivered: i64,
299        response_id: String,
300    },
301    #[schemars(title = "ClientNotification")]
302    ClientNotification {
303        agent_instance_hierarchy: String,
304        /// AIH of the enqueuer — from `message_queue.sender_*`
305        /// joined through `message_queue_contents.id`.
306        sender_agent_instance_hierarchy: String,
307        response_id: String,
308        /// `message_queue.enqueued_at` of the consumed parent
309        /// queue row. One block = one parent queue row, so this
310        /// is well-defined block-level (each part's individual
311        /// `timestamp_delivered` still records its own
312        /// consumption moment).
313        timestamp_queued: i64,
314        /// Idempotency token, if the row was enqueued with
315        /// `--key` via `agents message --enqueue-with-key`.
316        /// Surfacing it lets readers attribute a notification
317        /// to a specific enqueue beyond just the sender AIH.
318        #[serde(default, skip_serializing_if = "Option::is_none")]
319        #[schemars(extend("omitempty" = true))]
320        key: Option<String>,
321        parts: Vec<ClientNotificationPart>,
322    },
323    /// Agent emissions — the agent IS the producer of these
324    /// rows, so there's no separate sender. The
325    /// `agent_instance_hierarchy` field IS the sender.
326    #[schemars(title = "AssistantResponse")]
327    AssistantResponse {
328        agent_instance_hierarchy: String,
329        response_id: String,
330        parts: Vec<AssistantResponsePart>,
331    },
332    #[schemars(title = "ToolResponse")]
333    ToolResponse {
334        agent_instance_hierarchy: String,
335        response_id: String,
336        parts: Vec<ToolResponsePart>,
337    },
338}
339
340#[derive(clap::Args)]
341pub struct Args {
342    /// One or more `--target instance=L[,parent=P]` entries. `parent`
343    /// defaults to the cli's own `Config.agent_instance_hierarchy`
344    /// when omitted on an individual target. Also accepts
345    /// `--target tag=T` and `--target me` (the caller's own AIH).
346    #[arg(long = "target", required = true)]
347    pub targets: Vec<String>,
348    /// Skip rows with `logs.messages."index" <= after_id` per target.
349    #[arg(long)]
350    pub after_id: Option<i64>,
351    /// Cap on rows scanned per target.
352    #[arg(long)]
353    pub limit: Option<i64>,
354    /// jq filter applied to the JSON output.
355    #[arg(long)]
356    pub jq: Option<String>,
357}
358
// clap entry point for this leaf: the flattened read arguments plus an
// optional schema subcommand.
// `args_conflicts_with_subcommands` makes the two mutually exclusive.
// NOTE(review): `--target` is `required = true` on `Args`; confirm the
// schema subcommands are reachable without passing it.
#[derive(clap::Args)]
#[command(args_conflicts_with_subcommands = true)]
pub struct Command {
    // Fields of `Args` are spliced in as this command's own flags.
    #[command(flatten)]
    pub args: Args,
    // `None` when no schema subcommand was given.
    #[command(subcommand)]
    pub schema: Option<Schema>,
}
367
// Schema-introspection subcommands for this leaf. The `///` lines on
// the variants are clap help text, so they are user-facing strings.
// The argument types come from the `request_schema` / `response_schema`
// submodules declared further down in this file.
#[derive(clap::Subcommand)]
pub enum Schema {
    /// Emit the JSON Schema for this leaf's `Request` type and exit.
    RequestSchema(request_schema::Args),
    /// Emit the JSON Schema for this leaf's `Response` type and exit.
    ResponseSchema(response_schema::Args),
}
375
376impl TryFrom<Args> for Request {
377    type Error = crate::cli::command::FromArgsError;
378    fn try_from(args: Args) -> Result<Self, Self::Error> {
379        let targets = args
380            .targets
381            .iter()
382            .map(|s| {
383                s.parse::<Target>().map_err(|msg| {
384                    crate::cli::command::FromArgsError::path_parse("target", msg)
385                })
386            })
387            .collect::<Result<Vec<_>, _>>()?;
388        Ok(Self {
389            path_type: Path::AgentsLogsReadAll,
390            targets,
391            after_id: args.after_id,
392            limit: args.limit,
393            jq: args.jq,
394        })
395    }
396}
397
/// Runs the request through `executor` and returns its stream of typed
/// [`ResponseItem`]s.
///
/// Any `jq` filter already set on `request` is cleared before dispatch;
/// use [`execute_jq`] to run with a filter. `agent_arguments` is passed
/// to the executor unchanged.
///
/// # Errors
///
/// Returns whatever error the executor's `execute` call reports.
#[cfg(feature = "cli-executor")]
pub async fn execute<E>(
    executor: &E,
    mut request: Request,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<E::Stream<ResponseItem>, E::Error>
where
    E: crate::cli::command::CommandExecutor,
{
    request.jq = None;
    executor.execute(request, agent_arguments).await
}
408
/// Runs the request through `executor` with a jq filter and returns
/// its stream of untyped `serde_json::Value` items.
///
/// `jq` replaces whatever filter `request` already carried.
/// `agent_arguments` is passed to the executor unchanged.
///
/// # Errors
///
/// Returns whatever error the executor's `execute` call reports.
#[cfg(feature = "cli-executor")]
pub async fn execute_jq<E>(
    executor: &E,
    mut request: Request,
    jq: String,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<E::Stream<serde_json::Value>, E::Error>
where
    E: crate::cli::command::CommandExecutor,
{
    request.jq = Some(jq);
    executor.execute(request, agent_arguments).await
}
420
#[cfg(feature = "mcp")]
impl crate::cli::command::CommandResponse for ResponseItem {
    /// Wraps this item, serialized to a JSON value, in the `JSONL`
    /// variant of `McpResponseItem`.
    ///
    /// # Panics
    ///
    /// Panics if `serde_json::to_value` returns an error for this item.
    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
        let value = serde_json::to_value(self).unwrap();
        crate::cli::command::McpResponseItem::JSONL(value)
    }
}
427
428pub mod request_schema;
429
430
431pub mod response_schema;
432
#[cfg(test)]
mod tests {
    use super::Target;

    /// `me` parses with or without surrounding whitespace and renders
    /// back to the bare key.
    #[test]
    fn me_target_parses_and_round_trips() {
        for raw in ["me", "  me  "] {
            assert_eq!(raw.parse::<Target>(), Ok(Target::Me));
        }
        assert_eq!(Target::Me.into_arg_string(), "me");
    }

    /// `me` next to another key is not the bare form, so it goes
    /// through the `key=value` tokenizer and must be rejected.
    #[test]
    fn me_is_mutually_exclusive_with_other_keys() {
        let parsed = "me,instance=x".parse::<Target>();
        assert!(parsed.is_err());
    }

    /// The JSON form of `Target::Me` is just the `by` tag.
    #[test]
    fn json_wire_shape_is_by_me() {
        let actual = serde_json::to_value(Target::Me).unwrap();
        let expected = serde_json::json!({ "by": "me" });
        assert_eq!(actual, expected);
    }
}