Skip to main content

objectiveai_sdk/cli/command/agents/logs/read/all/
mod.rs

1//! `agents logs read all` — fetch every log row for a target AIH,
2//! coalesced into [`ResponseItem`] blocks.
3//!
4//! Each yielded `ResponseItem` is either a single-row request blob
5//! (`AgentCompletionRequest` / `VectorCompletionRequest` /
6//! `FunctionExecutionRequest`) or a multi-row block
7//! (`ClientNotification` / `AssistantResponse` / `ToolResponse`)
8//! formed by joining consecutive `logs.messages` rows that share
9//! `(block_class, agent_instance_hierarchy, response_id)` —
10//! `ToolResponse` additionally splits per `tool_call_id`.
11//!
12//! `response_id` is carried on every variant — for the three
13//! request-blob variants it's the chunk's own id, for the three
14//! block variants it's the immediately-enclosing
15//! agent-completion's id (even when the underlying rows were
16//! emitted inside a function execution or vector completion
17//! chunk).
18
19use std::str::FromStr;
20
21use crate::cli::command::CommandRequest;
22use crate::cli::command::path_ref::tokenize;
23
24/// One queue-read target. Either direct `(parent, instance)` (parent
25/// defaults to the cli's own `Config.agent_instance_hierarchy` when
26/// omitted), a tag name the cli resolves at handler time, or `me`
27/// (the caller's own `Config.agent_instance_hierarchy`, with no child
28/// leaf appended). Shared with `agents read pending` via re-export.
29///
30/// Docker-style `key=value,key=value` wire form on the CLI:
31///   `--target instance=L`           (direct; parent defaults to ctx)
32///   `--target instance=L,parent=P`  (direct; explicit parent)
33///   `--target tag=T`                (tag; cli resolves via the tags tier)
34///   `--target me`                   (the caller's own AIH; bare valueless key)
35#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
36#[serde(tag = "by", rename_all = "snake_case")]
37#[schemars(rename = "cli.command.agents.logs.read.all.Target")]
38pub enum Target {
39    #[schemars(title = "Direct")]
40    Direct {
41        /// Optional lineage prefix. `None` ⇒ cli substitutes
42        /// `Config.agent_instance_hierarchy`.
43        #[serde(default, skip_serializing_if = "Option::is_none")]
44        #[schemars(extend("omitempty" = true))]
45        parent_agent_instance_hierarchy: Option<String>,
46        /// Leaf id of the target agent.
47        agent_instance: String,
48    },
49    #[schemars(title = "Tag")]
50    Tag { agent_tag: String },
51    /// The caller's own `Config.agent_instance_hierarchy`, verbatim.
52    /// CLI wire form is the bare valueless key `me` (docker
53    /// `readonly`-style), mutually exclusive with the other keys.
54    #[schemars(title = "Me")]
55    Me,
56}
57
58impl FromStr for Target {
59    type Err = String;
60    /// Parse a `--target` arg. Accepted forms: the bare valueless key
61    /// `me`; `instance` + optional `parent`; or `tag` alone. `tag` is
62    /// mutually exclusive with the other two keys.
63    fn from_str(s: &str) -> Result<Self, Self::Err> {
64        // Bare valueless key (docker `readonly`-style). Handled before
65        // `tokenize`, which requires every token to be `key=value`.
66        if s.trim() == "me" {
67            return Ok(Target::Me);
68        }
69        let mut tag: Option<String> = None;
70        let mut parent: Option<String> = None;
71        let mut instance: Option<String> = None;
72        for (k, v) in tokenize(s)? {
73            match k {
74                "tag" => tag = Some(v.to_string()),
75                "instance" => instance = Some(v.to_string()),
76                "parent" => parent = Some(v.to_string()),
77                other => return Err(format!("unknown key: {other}")),
78            }
79        }
80        match (tag, instance, parent) {
81            (Some(t), None, None) => Ok(Target::Tag { agent_tag: t }),
82            (Some(_), _, _) => Err(
83                "tag is mutually exclusive with instance and parent".to_string(),
84            ),
85            (None, Some(i), p) => Ok(Target::Direct {
86                parent_agent_instance_hierarchy: p,
87                agent_instance: i,
88            }),
89            (None, None, _) => Err("instance or tag is required".to_string()),
90        }
91    }
92}
93
94impl Target {
95    /// Inverse of [`FromStr::from_str`]: emit the docker-style
96    /// `key=value,key=value` wire form for round-tripping.
97    pub fn into_arg_string(&self) -> String {
98        match self {
99            Target::Me => "me".to_string(),
100            Target::Tag { agent_tag } => format!("tag={agent_tag}"),
101            Target::Direct {
102                parent_agent_instance_hierarchy: None,
103                agent_instance,
104            } => format!("instance={agent_instance}"),
105            Target::Direct {
106                parent_agent_instance_hierarchy: Some(p),
107                agent_instance,
108            } => format!("instance={agent_instance},parent={p}"),
109        }
110    }
111}
112
113#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
114#[schemars(rename = "cli.command.agents.logs.read.all.Request")]
115pub struct Request {
116    pub path_type: Path,
117    pub targets: Vec<Target>,
118    /// Skip rows with `logs.messages."index" <= after_id`. Use the
119    /// highest `id` from a previous page to paginate forward.
120    #[serde(default, skip_serializing_if = "Option::is_none")]
121    #[schemars(extend("omitempty" = true))]
122    pub after_id: Option<i64>,
123    /// Cap on rows scanned per target. `None` = unlimited.
124    #[serde(default, skip_serializing_if = "Option::is_none")]
125    #[schemars(extend("omitempty" = true))]
126    pub limit: Option<i64>,
127    #[serde(flatten)]
128    pub base: crate::cli::command::RequestBase,
129}
130
131#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
132#[schemars(rename = "cli.command.agents.logs.read.all.Path")]
133pub enum Path {
134    #[serde(rename = "agents/logs/read/all")]
135    AgentsLogsReadAll,
136}
137
138impl CommandRequest for Request {
139    fn into_command(&self) -> Vec<String> {
140        let mut argv = vec![
141            "agents".to_string(),
142            "logs".to_string(),
143            "read".to_string(),
144            "all".to_string(),
145        ];
146        for target in &self.targets {
147            argv.push("--target".to_string());
148            argv.push(target.into_arg_string());
149        }
150        if let Some(after_id) = self.after_id {
151            argv.push("--after-id".to_string());
152            argv.push(after_id.to_string());
153        }
154        if let Some(limit) = self.limit {
155            argv.push("--limit".to_string());
156            argv.push(limit.to_string());
157        }
158        self.base.push_flags(&mut argv);
159        argv
160    }
161
162    fn request_base(&self) -> &crate::cli::command::RequestBase {
163        &self.base
164    }
165
166    fn request_base_mut(&mut self) -> Option<&mut crate::cli::command::RequestBase> {
167        Some(&mut self.base)
168    }
169}
170
171/// Type tag for one `ClientNotification` part — the table-kind of
172/// the underlying `message_queue_*` content row.
173#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
174#[serde(rename_all = "snake_case")]
175#[schemars(rename = "cli.command.agents.logs.read.all.ClientNotificationPartType")]
176pub enum ClientNotificationPartType {
177    Text,
178    Image,
179    Audio,
180    Video,
181    File,
182}
183
184/// One row inside a `ClientNotification` block — a consumed
185/// `message_queue_contents` entry. `queued_at` is on the
186/// enclosing block (it lives on `message_queue.enqueued_at`, not
187/// per-content); only the per-row consumption timestamp is here.
188#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
189#[schemars(rename = "cli.command.agents.logs.read.all.ClientNotificationPart")]
190pub struct ClientNotificationPart {
191    /// `logs.messages."index"` for this row. Pass to
192    /// `agents logs read id <n>` to fetch the consumed
193    /// `message_queue_contents` body.
194    pub id: i64,
195    /// `logs.messages."timestamp"` — when the receiver consumed
196    /// this content row and the LogWriter committed the
197    /// consumption event.
198    pub delivered_at: String,
199    pub r#type: ClientNotificationPartType,
200}
201
202/// One row inside an `AssistantResponse` block, tagged by the
203/// table-kind of the underlying `assistant_response_*` row.
204///
205/// The `ToolCall` variant inlines the call's metadata
206/// (`function_name` / `tool_call_id` / `tool_call_index`) so callers
207/// can dedupe and correlate without a per-row round-trip; its `id`
208/// addresses the same `assistant_response_tool_calls` row, which
209/// `agents logs read id <id>` returns as the call's `arguments`
210/// (text). Every other variant carries only `id` (the
211/// `logs.messages."index"` to pass to `agents logs read id`) and the
212/// delivery timestamp.
213#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
214#[serde(tag = "type", rename_all = "snake_case")]
215#[schemars(rename = "cli.command.agents.logs.read.all.AssistantResponsePart")]
216pub enum AssistantResponsePart {
217    #[schemars(title = "ToolCall")]
218    ToolCall {
219        /// `logs.messages."index"` for the tool-call row. Pass to
220        /// `agents logs read id <n>` to read the call's `arguments`
221        /// as text.
222        id: i64,
223        delivered_at: String,
224        /// `objectiveai.assistant_response_tool_calls.function_name`.
225        function_name: String,
226        /// The wire tool-call id this row carries.
227        tool_call_id: String,
228        /// The tool call's wire index within the assistant message's
229        /// `tool_calls[]`.
230        tool_call_index: i64,
231    },
232    #[schemars(title = "Refusal")]
233    Refusal { id: i64, delivered_at: String },
234    #[schemars(title = "Reasoning")]
235    Reasoning { id: i64, delivered_at: String },
236    #[schemars(title = "Text")]
237    Text { id: i64, delivered_at: String },
238    #[schemars(title = "Image")]
239    Image { id: i64, delivered_at: String },
240    #[schemars(title = "Audio")]
241    Audio { id: i64, delivered_at: String },
242    #[schemars(title = "Video")]
243    Video { id: i64, delivered_at: String },
244    #[schemars(title = "File")]
245    File { id: i64, delivered_at: String },
246}
247
248/// Type tag for one `ToolResponse` part — the table-kind of the
249/// underlying `tool_response_content_*` row. The tool-call linkage
250/// (`tool_call_id`) lives on the enclosing `ResponseItem::ToolResponse`
251/// block, not on the parts.
252#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
253#[serde(rename_all = "snake_case")]
254#[schemars(rename = "cli.command.agents.logs.read.all.ToolResponsePartType")]
255pub enum ToolResponsePartType {
256    Text,
257    Image,
258    Audio,
259    Video,
260    File,
261}
262
263/// One row inside a `ToolResponse` block.
264#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
265#[schemars(rename = "cli.command.agents.logs.read.all.ToolResponsePart")]
266pub struct ToolResponsePart {
267    /// `logs.messages."index"` for this row. Pass to
268    /// `agents logs read id <n>` for the typed body.
269    pub id: i64,
270    pub delivered_at: String,
271    pub r#type: ToolResponsePartType,
272}
273
274/// One yielded item. Three single-row request blobs +
275/// three multi-row blocks. Every variant carries `response_id`.
276/// `sender_agent_instance_hierarchy` appears only on the four
277/// variants that have a sender ≠ producer: the three request
278/// variants (caller AIH) and `ClientNotification` (enqueuer
279/// AIH). `AssistantResponse` and `ToolResponse` are emitted BY
280/// the agent itself — their `agent_instance_hierarchy` IS the
281/// producer, so no separate sender field exists.
282///
283/// Block-coalescing boundary tuple: `(class,
284/// agent_instance_hierarchy, response_id)` for `AssistantResponse`;
285/// `(class, agent_instance_hierarchy, response_id, tool_call_id)`
286/// for `ToolResponse` (one block per tool call); `(class,
287/// agent_instance_hierarchy, response_id, sender, message_queue_id)`
288/// for `ClientNotification` blocks.
289/// One `ClientNotification` block = one consumed
290/// `message_queue` parent row, so `queued_at` and
291/// `sender_agent_instance_hierarchy` are well-defined
292/// block-level.
293#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
294#[serde(tag = "type", rename_all = "snake_case")]
295#[schemars(rename = "cli.command.agents.logs.read.all.ResponseItem")]
296pub enum ResponseItem {
297    #[schemars(title = "AgentCompletionRequest")]
298    AgentCompletionRequest {
299        id: i64,
300        agent_instance_hierarchy: String,
301        /// AIH of the caller who issued the request — from
302        /// `logs.agent_completion_requests.sender_*`.
303        sender_agent_instance_hierarchy: String,
304        delivered_at: String,
305        response_id: String,
306    },
307    #[schemars(title = "VectorCompletionRequest")]
308    VectorCompletionRequest {
309        id: i64,
310        agent_instance_hierarchy: String,
311        sender_agent_instance_hierarchy: String,
312        delivered_at: String,
313        response_id: String,
314    },
315    #[schemars(title = "FunctionExecutionRequest")]
316    FunctionExecutionRequest {
317        id: i64,
318        agent_instance_hierarchy: String,
319        sender_agent_instance_hierarchy: String,
320        delivered_at: String,
321        response_id: String,
322    },
323    #[schemars(title = "ClientNotification")]
324    ClientNotification {
325        agent_instance_hierarchy: String,
326        /// AIH of the enqueuer — from `message_queue.sender_*`
327        /// joined through `message_queue_contents.id`.
328        sender_agent_instance_hierarchy: String,
329        response_id: String,
330        /// `message_queue.enqueued_at` of the consumed parent
331        /// queue row. One block = one parent queue row, so this
332        /// is well-defined block-level (each part's individual
333        /// `delivered_at` still records its own
334        /// consumption moment).
335        queued_at: String,
336        /// Idempotency token, if the row was enqueued with
337        /// `--key` via `agents message --enqueue-with-key`.
338        /// Surfacing it lets readers attribute a notification
339        /// to a specific enqueue beyond just the sender AIH.
340        #[serde(default, skip_serializing_if = "Option::is_none")]
341        #[schemars(extend("omitempty" = true))]
342        key: Option<String>,
343        parts: Vec<ClientNotificationPart>,
344    },
345    /// Agent emissions — the agent IS the producer of these
346    /// rows, so there's no separate sender. The
347    /// `agent_instance_hierarchy` field IS the sender.
348    #[schemars(title = "AssistantResponse")]
349    AssistantResponse {
350        agent_instance_hierarchy: String,
351        response_id: String,
352        parts: Vec<AssistantResponsePart>,
353    },
354    /// One tool call's response. Blocks are grouped per
355    /// `tool_call_id` (in addition to `agent_instance_hierarchy` +
356    /// `response_id`), so two responses in the same turn yield two
357    /// blocks.
358    #[schemars(title = "ToolResponse")]
359    ToolResponse {
360        agent_instance_hierarchy: String,
361        response_id: String,
362        /// The wire tool-call id this response answers.
363        tool_call_id: String,
364        parts: Vec<ToolResponsePart>,
365    },
366}
367
368#[derive(clap::Args)]
369pub struct Args {
370    /// One or more `--target instance=L[,parent=P]` entries. `parent`
371    /// defaults to the cli's own `Config.agent_instance_hierarchy`
372    /// when omitted on an individual target. Also accepts
373    /// `--target tag=T` and `--target me` (the caller's own AIH).
374    #[arg(long = "target", required = true)]
375    pub targets: Vec<String>,
376    /// Skip rows with `logs.messages."index" <= after_id` per target.
377    #[arg(long)]
378    pub after_id: Option<i64>,
379    /// Cap on rows scanned per target.
380    #[arg(long)]
381    pub limit: Option<i64>,
382    #[command(flatten)]
383    pub base: crate::cli::command::RequestBaseArgs,
384}
385
386#[derive(clap::Args)]
387#[command(args_conflicts_with_subcommands = true)]
388pub struct Command {
389    #[command(flatten)]
390    pub args: Args,
391    #[command(subcommand)]
392    pub schema: Option<Schema>,
393}
394
395#[derive(clap::Subcommand)]
396pub enum Schema {
397    /// Emit the JSON Schema for this leaf's `Request` type and exit.
398    RequestSchema(request_schema::Args),
399    /// Emit the JSON Schema for this leaf's `Response` type and exit.
400    ResponseSchema(response_schema::Args),
401}
402
403impl TryFrom<Args> for Request {
404    type Error = crate::cli::command::FromArgsError;
405    fn try_from(args: Args) -> Result<Self, Self::Error> {
406        let targets = args
407            .targets
408            .iter()
409            .map(|s| {
410                s.parse::<Target>().map_err(|msg| {
411                    crate::cli::command::FromArgsError::path_parse("target", msg)
412                })
413            })
414            .collect::<Result<Vec<_>, _>>()?;
415        Ok(Self {
416            path_type: Path::AgentsLogsReadAll,
417            targets,
418            after_id: args.after_id,
419            limit: args.limit,
420            base: args.base.into(),
421        })
422    }
423}
424
/// Runs `request` on `executor` and returns its stream of typed
/// [`ResponseItem`]s.
///
/// `clear_transform` is called on the request base before dispatch, so
/// this entry point never forwards a transform (presumably so the items
/// stay in their typed shape — confirm against `RequestBase`).
/// `agent_arguments` is passed through to the executor unchanged.
///
/// # Errors
///
/// Returns whatever error the executor produces.
#[cfg(feature = "cli-executor")]
pub async fn execute<E>(
    executor: &E,
    mut request: Request,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<E::Stream<ResponseItem>, E::Error>
where
    E: crate::cli::command::CommandExecutor,
{
    request.base.clear_transform();
    executor.execute(request, agent_arguments).await
}
435
/// Runs `request` on `executor` with `transform` applied, returning a
/// stream of untyped `serde_json::Value`s.
///
/// `transform` is stored on the request base via `set_transform` before
/// dispatch. `agent_arguments` is passed through to the executor
/// unchanged.
///
/// # Errors
///
/// Returns whatever error the executor produces.
#[cfg(feature = "cli-executor")]
pub async fn execute_transform<E>(
    executor: &E,
    mut request: Request,
    transform: crate::cli::command::Transform,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<E::Stream<serde_json::Value>, E::Error>
where
    E: crate::cli::command::CommandExecutor,
{
    request.base.set_transform(transform);
    executor.execute(request, agent_arguments).await
}
447
#[cfg(feature = "mcp")]
impl crate::cli::command::CommandResponse for ResponseItem {
    /// Converts this item into an MCP response by serializing it to a JSON
    /// value and wrapping it in `McpResponseItem::JSONL`.
    ///
    /// # Panics
    ///
    /// Only if serialization fails. That is not expected: every variant
    /// holds only strings, integers, options, and vectors of similarly
    /// plain derived types.
    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
        let value = serde_json::to_value(self)
            .expect("ResponseItem holds only plain JSON-representable fields; serialization cannot fail");
        crate::cli::command::McpResponseItem::JSONL(value)
    }
}
454
455pub mod request_schema;
456
457
458pub mod response_schema;
459
#[cfg(test)]
mod tests {
    use super::Target;

    /// `me` parses with or without surrounding whitespace and renders
    /// back to the same bare key.
    #[test]
    fn me_target_parses_and_round_trips() {
        for raw in ["me", "  me  "] {
            assert_eq!(raw.parse::<Target>(), Ok(Target::Me));
        }
        assert_eq!(Target::Me.into_arg_string(), "me");
    }

    /// Once `me` is combined with another key the input no longer matches
    /// the bare form, so it reaches the `key=value` tokenizer and fails.
    #[test]
    fn me_is_mutually_exclusive_with_other_keys() {
        let parsed = "me,instance=x".parse::<Target>();
        assert!(parsed.is_err());
    }

    /// The JSON form of `Target::Me` is the tag alone.
    #[test]
    fn json_wire_shape_is_by_me() {
        let expected = serde_json::json!({ "by": "me" });
        let actual = serde_json::to_value(Target::Me).unwrap();
        assert_eq!(actual, expected);
    }
}