Skip to main content

objectiveai_sdk/cli/command/agents/logs/read/all/
mod.rs

1//! `agents logs read all` — fetch every log row for a target AIH,
2//! coalesced into [`ResponseItem`] blocks.
3//!
//! Each yielded `ResponseItem` is either a single-row request blob
//! (`AgentCompletionRequest` / `VectorCompletionRequest` /
//! `FunctionExecutionRequest`) or a multi-row block
//! (`ClientNotification` / `AssistantResponse` / `ToolResponse`)
//! formed by joining consecutive `logs.messages` rows that share
//! `(block_class, agent_instance_hierarchy, response_id)`;
//! `ClientNotification` blocks additionally split on the sender and
//! `message_queue_id`, so one block maps to one consumed queue row.
10//!
11//! `response_id` is carried on every variant — for the three
12//! request-blob variants it's the chunk's own id, for the three
13//! block variants it's the immediately-enclosing
14//! agent-completion's id (even when the underlying rows were
15//! emitted inside a function execution or vector completion
16//! chunk).
17
18use std::str::FromStr;
19
20use crate::cli::command::CommandRequest;
21use crate::cli::command::path_ref::tokenize;
22
23/// One queue-read target. Either direct `(parent, instance)` (parent
24/// defaults to the cli's own `Config.agent_instance_hierarchy` when
25/// omitted), a tag name the cli resolves at handler time, or `me`
26/// (the caller's own `Config.agent_instance_hierarchy`, with no child
27/// leaf appended). Shared with `agents read pending` via re-export.
28///
29/// Docker-style `key=value,key=value` wire form on the CLI:
30///   `--target instance=L`           (direct; parent defaults to ctx)
31///   `--target instance=L,parent=P`  (direct; explicit parent)
32///   `--target tag=T`                (tag; cli resolves via the tags tier)
33///   `--target me`                   (the caller's own AIH; bare valueless key)
34#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
35#[serde(tag = "by", rename_all = "snake_case")]
36#[schemars(rename = "cli.command.agents.logs.read.all.Target")]
37pub enum Target {
38    #[schemars(title = "Direct")]
39    Direct {
40        /// Optional lineage prefix. `None` ⇒ cli substitutes
41        /// `Config.agent_instance_hierarchy`.
42        #[serde(default, skip_serializing_if = "Option::is_none")]
43        #[schemars(extend("omitempty" = true))]
44        parent_agent_instance_hierarchy: Option<String>,
45        /// Leaf id of the target agent.
46        agent_instance: String,
47    },
48    #[schemars(title = "Tag")]
49    Tag { agent_tag: String },
50    /// The caller's own `Config.agent_instance_hierarchy`, verbatim.
51    /// CLI wire form is the bare valueless key `me` (docker
52    /// `readonly`-style), mutually exclusive with the other keys.
53    #[schemars(title = "Me")]
54    Me,
55}
56
57impl FromStr for Target {
58    type Err = String;
59    /// Parse a `--target` arg. Accepted forms: the bare valueless key
60    /// `me`; `instance` + optional `parent`; or `tag` alone. `tag` is
61    /// mutually exclusive with the other two keys.
62    fn from_str(s: &str) -> Result<Self, Self::Err> {
63        // Bare valueless key (docker `readonly`-style). Handled before
64        // `tokenize`, which requires every token to be `key=value`.
65        if s.trim() == "me" {
66            return Ok(Target::Me);
67        }
68        let mut tag: Option<String> = None;
69        let mut parent: Option<String> = None;
70        let mut instance: Option<String> = None;
71        for (k, v) in tokenize(s)? {
72            match k {
73                "tag" => tag = Some(v.to_string()),
74                "instance" => instance = Some(v.to_string()),
75                "parent" => parent = Some(v.to_string()),
76                other => return Err(format!("unknown key: {other}")),
77            }
78        }
79        match (tag, instance, parent) {
80            (Some(t), None, None) => Ok(Target::Tag { agent_tag: t }),
81            (Some(_), _, _) => Err(
82                "tag is mutually exclusive with instance and parent".to_string(),
83            ),
84            (None, Some(i), p) => Ok(Target::Direct {
85                parent_agent_instance_hierarchy: p,
86                agent_instance: i,
87            }),
88            (None, None, _) => Err("instance or tag is required".to_string()),
89        }
90    }
91}
92
93impl Target {
94    /// Inverse of [`FromStr::from_str`]: emit the docker-style
95    /// `key=value,key=value` wire form for round-tripping.
96    pub fn into_arg_string(&self) -> String {
97        match self {
98            Target::Me => "me".to_string(),
99            Target::Tag { agent_tag } => format!("tag={agent_tag}"),
100            Target::Direct {
101                parent_agent_instance_hierarchy: None,
102                agent_instance,
103            } => format!("instance={agent_instance}"),
104            Target::Direct {
105                parent_agent_instance_hierarchy: Some(p),
106                agent_instance,
107            } => format!("instance={agent_instance},parent={p}"),
108        }
109    }
110}
111
112#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
113#[schemars(rename = "cli.command.agents.logs.read.all.Request")]
114pub struct Request {
115    pub path_type: Path,
116    pub targets: Vec<Target>,
117    /// Skip rows with `logs.messages."index" <= after_id`. Use the
118    /// highest `id` from a previous page to paginate forward.
119    #[serde(default, skip_serializing_if = "Option::is_none")]
120    #[schemars(extend("omitempty" = true))]
121    pub after_id: Option<i64>,
122    /// Cap on rows scanned per target. `None` = unlimited.
123    #[serde(default, skip_serializing_if = "Option::is_none")]
124    #[schemars(extend("omitempty" = true))]
125    pub limit: Option<i64>,
126    #[serde(flatten)]
127    pub base: crate::cli::command::RequestBase,
128}
129
130#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
131#[schemars(rename = "cli.command.agents.logs.read.all.Path")]
132pub enum Path {
133    #[serde(rename = "agents/logs/read/all")]
134    AgentsLogsReadAll,
135}
136
137impl CommandRequest for Request {
138    fn into_command(&self) -> Vec<String> {
139        let mut argv = vec![
140            "agents".to_string(),
141            "logs".to_string(),
142            "read".to_string(),
143            "all".to_string(),
144        ];
145        for target in &self.targets {
146            argv.push("--target".to_string());
147            argv.push(target.into_arg_string());
148        }
149        if let Some(after_id) = self.after_id {
150            argv.push("--after-id".to_string());
151            argv.push(after_id.to_string());
152        }
153        if let Some(limit) = self.limit {
154            argv.push("--limit".to_string());
155            argv.push(limit.to_string());
156        }
157        self.base.push_flags(&mut argv);
158        argv
159    }
160
161    fn request_base(&self) -> &crate::cli::command::RequestBase {
162        &self.base
163    }
164
165    fn request_base_mut(&mut self) -> Option<&mut crate::cli::command::RequestBase> {
166        Some(&mut self.base)
167    }
168}
169
170/// Type tag for one `ClientNotification` part — the table-kind of
171/// the underlying `message_queue_*` content row.
172#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
173#[serde(rename_all = "snake_case")]
174#[schemars(rename = "cli.command.agents.logs.read.all.ClientNotificationPartType")]
175pub enum ClientNotificationPartType {
176    Text,
177    Image,
178    Audio,
179    Video,
180    File,
181}
182
183/// One row inside a `ClientNotification` block — a consumed
184/// `message_queue_contents` entry. `timestamp_queued` is on the
185/// enclosing block (it lives on `message_queue.enqueued_at`, not
186/// per-content); only the per-row consumption timestamp is here.
187#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
188#[schemars(rename = "cli.command.agents.logs.read.all.ClientNotificationPart")]
189pub struct ClientNotificationPart {
190    /// `logs.messages."index"` for this row. Pass to
191    /// `agents logs read id <n>` to fetch the consumed
192    /// `message_queue_contents` body.
193    pub id: i64,
194    /// `logs.messages."timestamp"` — when the receiver consumed
195    /// this content row and the LogWriter committed the
196    /// consumption event.
197    pub timestamp_delivered: i64,
198    pub r#type: ClientNotificationPartType,
199}
200
201/// Type tag for one `AssistantResponse` part — the table-kind of
202/// the underlying `assistant_response_*` row.
203#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
204#[serde(rename_all = "snake_case")]
205#[schemars(rename = "cli.command.agents.logs.read.all.AssistantResponsePartType")]
206pub enum AssistantResponsePartType {
207    Refusal,
208    Reasoning,
209    ToolCall,
210    Text,
211    Image,
212    Audio,
213    Video,
214    File,
215}
216
217/// One row inside an `AssistantResponse` block.
218#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
219#[schemars(rename = "cli.command.agents.logs.read.all.AssistantResponsePart")]
220pub struct AssistantResponsePart {
221    /// `logs.messages."index"` for this row. Pass to
222    /// `agents logs read id <n>` for the typed body.
223    pub id: i64,
224    pub timestamp_delivered: i64,
225    pub r#type: AssistantResponsePartType,
226    /// `function.name`, present only for `type = tool_call` rows
227    /// (`objectiveai.assistant_response_tool_calls.function_name`);
228    /// absent for every other part type. Surfaced here so callers
229    /// can dedupe tool calls by name without a per-row
230    /// `agents logs read id` round-trip.
231    #[serde(default, skip_serializing_if = "Option::is_none")]
232    #[schemars(extend("omitempty" = true))]
233    pub function_name: Option<String>,
234}
235
236/// Type tag for one `ToolResponse` part. `Container` is the
237/// `tool_response` head row (carries `tool_call_id`); the other
238/// five are content slots.
239#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
240#[serde(rename_all = "snake_case")]
241#[schemars(rename = "cli.command.agents.logs.read.all.ToolResponsePartType")]
242pub enum ToolResponsePartType {
243    Container,
244    Text,
245    Image,
246    Audio,
247    Video,
248    File,
249}
250
251/// One row inside a `ToolResponse` block.
252#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
253#[schemars(rename = "cli.command.agents.logs.read.all.ToolResponsePart")]
254pub struct ToolResponsePart {
255    /// `logs.messages."index"` for this row. Pass to
256    /// `agents logs read id <n>` for the typed body.
257    pub id: i64,
258    pub timestamp_delivered: i64,
259    pub r#type: ToolResponsePartType,
260}
261
262/// One yielded item. Three single-row request blobs +
263/// three multi-row blocks. Every variant carries `response_id`.
264/// `sender_agent_instance_hierarchy` appears only on the four
265/// variants that have a sender ≠ producer: the three request
266/// variants (caller AIH) and `ClientNotification` (enqueuer
267/// AIH). `AssistantResponse` and `ToolResponse` are emitted BY
268/// the agent itself — their `agent_instance_hierarchy` IS the
269/// producer, so no separate sender field exists.
270///
271/// Block-coalescing boundary tuple: `(class,
272/// agent_instance_hierarchy, response_id)` for assistant/tool
273/// blocks; `(class, agent_instance_hierarchy, response_id,
274/// sender, message_queue_id)` for `ClientNotification` blocks.
275/// One `ClientNotification` block = one consumed
276/// `message_queue` parent row, so `timestamp_queued` and
277/// `sender_agent_instance_hierarchy` are well-defined
278/// block-level.
279#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
280#[serde(tag = "type", rename_all = "snake_case")]
281#[schemars(rename = "cli.command.agents.logs.read.all.ResponseItem")]
282pub enum ResponseItem {
283    #[schemars(title = "AgentCompletionRequest")]
284    AgentCompletionRequest {
285        id: i64,
286        agent_instance_hierarchy: String,
287        /// AIH of the caller who issued the request — from
288        /// `logs.agent_completion_requests.sender_*`.
289        sender_agent_instance_hierarchy: String,
290        timestamp_delivered: i64,
291        response_id: String,
292    },
293    #[schemars(title = "VectorCompletionRequest")]
294    VectorCompletionRequest {
295        id: i64,
296        agent_instance_hierarchy: String,
297        sender_agent_instance_hierarchy: String,
298        timestamp_delivered: i64,
299        response_id: String,
300    },
301    #[schemars(title = "FunctionExecutionRequest")]
302    FunctionExecutionRequest {
303        id: i64,
304        agent_instance_hierarchy: String,
305        sender_agent_instance_hierarchy: String,
306        timestamp_delivered: i64,
307        response_id: String,
308    },
309    #[schemars(title = "ClientNotification")]
310    ClientNotification {
311        agent_instance_hierarchy: String,
312        /// AIH of the enqueuer — from `message_queue.sender_*`
313        /// joined through `message_queue_contents.id`.
314        sender_agent_instance_hierarchy: String,
315        response_id: String,
316        /// `message_queue.enqueued_at` of the consumed parent
317        /// queue row. One block = one parent queue row, so this
318        /// is well-defined block-level (each part's individual
319        /// `timestamp_delivered` still records its own
320        /// consumption moment).
321        timestamp_queued: i64,
322        /// Idempotency token, if the row was enqueued with
323        /// `--key` via `agents message --enqueue-with-key`.
324        /// Surfacing it lets readers attribute a notification
325        /// to a specific enqueue beyond just the sender AIH.
326        #[serde(default, skip_serializing_if = "Option::is_none")]
327        #[schemars(extend("omitempty" = true))]
328        key: Option<String>,
329        parts: Vec<ClientNotificationPart>,
330    },
331    /// Agent emissions — the agent IS the producer of these
332    /// rows, so there's no separate sender. The
333    /// `agent_instance_hierarchy` field IS the sender.
334    #[schemars(title = "AssistantResponse")]
335    AssistantResponse {
336        agent_instance_hierarchy: String,
337        response_id: String,
338        parts: Vec<AssistantResponsePart>,
339    },
340    #[schemars(title = "ToolResponse")]
341    ToolResponse {
342        agent_instance_hierarchy: String,
343        response_id: String,
344        parts: Vec<ToolResponsePart>,
345    },
346}
347
// Raw clap argument surface for `agents logs read all`. Converted
// into a typed `Request` by `TryFrom<Args> for Request`.
#[derive(clap::Args)]
pub struct Args {
    /// One or more `--target instance=L[,parent=P]` entries. `parent`
    /// defaults to the cli's own `Config.agent_instance_hierarchy`
    /// when omitted on an individual target. Also accepts
    /// `--target tag=T` and `--target me` (the caller's own AIH).
    // Kept as raw strings here; each entry is parsed into a `Target`
    // when the args are converted into a `Request`.
    #[arg(long = "target", required = true)]
    pub targets: Vec<String>,
    /// Skip rows with `logs.messages."index" <= after_id` per target.
    #[arg(long)]
    pub after_id: Option<i64>,
    /// Cap on rows scanned per target.
    #[arg(long)]
    pub limit: Option<i64>,
    // Shared base flags; turned into `RequestBase` via `.into()`
    // during the `Request` conversion.
    #[command(flatten)]
    pub base: crate::cli::command::RequestBaseArgs,
}
365
// Clap entry point for this leaf: the flattened read arguments plus
// an optional schema subcommand. `args_conflicts_with_subcommands`
// keeps the two from being combined on one invocation.
#[derive(clap::Args)]
#[command(args_conflicts_with_subcommands = true)]
pub struct Command {
    #[command(flatten)]
    pub args: Args,
    // `None` when the leaf is invoked without a schema subcommand.
    #[command(subcommand)]
    pub schema: Option<Schema>,
}
374
// Schema-introspection subcommands for this leaf. Their argument
// types live in the `request_schema` / `response_schema` submodules.
#[derive(clap::Subcommand)]
pub enum Schema {
    /// Emit the JSON Schema for this leaf's `Request` type and exit.
    RequestSchema(request_schema::Args),
    /// Emit the JSON Schema for this leaf's `Response` type and exit.
    ResponseSchema(response_schema::Args),
}
382
383impl TryFrom<Args> for Request {
384    type Error = crate::cli::command::FromArgsError;
385    fn try_from(args: Args) -> Result<Self, Self::Error> {
386        let targets = args
387            .targets
388            .iter()
389            .map(|s| {
390                s.parse::<Target>().map_err(|msg| {
391                    crate::cli::command::FromArgsError::path_parse("target", msg)
392                })
393            })
394            .collect::<Result<Vec<_>, _>>()?;
395        Ok(Self {
396            path_type: Path::AgentsLogsReadAll,
397            targets,
398            after_id: args.after_id,
399            limit: args.limit,
400            base: args.base.into(),
401        })
402    }
403}
404
/// Run this command through `executor` and return its stream of typed
/// [`ResponseItem`]s.
///
/// Any transform on `request.base` is cleared before dispatch; the
/// request and `agent_arguments` are then handed to the executor
/// unchanged.
#[cfg(feature = "cli-executor")]
pub async fn execute<E>(
    executor: &E,
    mut request: Request,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<E::Stream<ResponseItem>, E::Error>
where
    E: crate::cli::command::CommandExecutor,
{
    request.base.clear_transform();
    executor.execute(request, agent_arguments).await
}
415
/// Run this command through `executor` with `transform` applied,
/// returning a stream of untyped `serde_json::Value`s.
///
/// `transform` is stored on `request.base` (replacing whatever was
/// set) before the request and `agent_arguments` are handed to the
/// executor.
#[cfg(feature = "cli-executor")]
pub async fn execute_transform<E>(
    executor: &E,
    mut request: Request,
    transform: crate::cli::command::Transform,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<E::Stream<serde_json::Value>, E::Error>
where
    E: crate::cli::command::CommandExecutor,
{
    request.base.set_transform(transform);
    executor.execute(request, agent_arguments).await
}
427
#[cfg(feature = "mcp")]
impl crate::cli::command::CommandResponse for ResponseItem {
    /// Convert one yielded item into an MCP JSONL response item by
    /// serializing it to a `serde_json::Value`.
    ///
    /// # Panics
    ///
    /// Panics if serialization fails. That is not expected: every
    /// field of `ResponseItem` and its part types is a string, an
    /// integer, an optional string, a unit-variant enum, or a vector
    /// of such structs, and there are no map keys involved.
    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
        let value = serde_json::to_value(self).expect(
            "ResponseItem must serialize to JSON: it holds only strings, integers and tagged enums",
        );
        crate::cli::command::McpResponseItem::JSONL(value)
    }
}
434
435pub mod request_schema;
436
437
438pub mod response_schema;
439
#[cfg(test)]
mod tests {
    use super::Target;

    /// `me` parses to `Target::Me`, with or without surrounding
    /// whitespace, and renders back to the same bare word.
    #[test]
    fn me_target_parses_and_round_trips() {
        let bare: Result<Target, String> = "me".parse();
        assert_eq!(bare, Ok(Target::Me));

        // Surrounding whitespace is trimmed, same as the other forms.
        let padded: Result<Target, String> = "  me  ".parse();
        assert_eq!(padded, Ok(Target::Me));

        let rendered = Target::Me.into_arg_string();
        assert_eq!(rendered, "me");
    }

    /// `me` next to any other key is not the bare form, so it goes to
    /// the `key=value` tokenizer and must be rejected.
    #[test]
    fn me_is_mutually_exclusive_with_other_keys() {
        let combined = "me,instance=x".parse::<Target>();
        assert!(combined.is_err());
    }

    /// The JSON form of `Target::Me` is exactly `{"by": "me"}`.
    #[test]
    fn json_wire_shape_is_by_me() {
        let wire = serde_json::to_value(Target::Me).unwrap();
        let expected = serde_json::json!({ "by": "me" });
        assert_eq!(wire, expected);
    }
}