Skip to main content

objectiveai_sdk/cli/command/agents/queue/read/pending/
mod.rs

1//! `agents queue read pending` — stream every pending (i.e.
2//! `active = TRUE`) `message_queue` row for the resolved targets,
3//! coalesced into parts-grouped `ResponseItem` blocks. Symmetric
4//! with `agents logs read all`'s `ClientNotification` shape — same
5//! `Target` input, same per-part type tag, same `id` shape (an i64
6//! you pass to the matching `read id` verb to drill into the body).
7
8use crate::cli::command::CommandRequest;
9
10#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
11#[schemars(rename = "cli.command.agents.queue.read.pending.Request")]
12pub struct Request {
13    pub path_type: Path,
14    pub targets: Vec<Target>,
15    /// Skip rows with `message_queue_contents.id <= after_id`. Use
16    /// the highest `id` from a previous page to paginate forward.
17    #[serde(default, skip_serializing_if = "Option::is_none")]
18    #[schemars(extend("omitempty" = true))]
19    pub after_id: Option<i64>,
20    /// Cap on rows scanned per target. `None` = unlimited.
21    #[serde(default, skip_serializing_if = "Option::is_none")]
22    #[schemars(extend("omitempty" = true))]
23    pub limit: Option<i64>,
24    #[serde(flatten)]
25    pub base: crate::cli::command::RequestBase,
26}
27
28#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
29#[schemars(rename = "cli.command.agents.queue.read.pending.Path")]
30pub enum Path {
31    #[serde(rename = "agents/queue/read/pending")]
32    AgentsQueueReadPending,
33}
34
35impl CommandRequest for Request {
36    fn into_command(&self) -> Vec<String> {
37        let mut argv = vec![
38            "agents".to_string(),
39            "queue".to_string(),
40            "read".to_string(),
41            "pending".to_string(),
42        ];
43        for target in &self.targets {
44            argv.push("--target".to_string());
45            argv.push(target.into_arg_string());
46        }
47        if let Some(after_id) = self.after_id {
48            argv.push("--after-id".to_string());
49            argv.push(after_id.to_string());
50        }
51        if let Some(limit) = self.limit {
52            argv.push("--limit".to_string());
53            argv.push(limit.to_string());
54        }
55        self.base.push_flags(&mut argv);
56        argv
57    }
58
59    fn request_base(&self) -> &crate::cli::command::RequestBase {
60        &self.base
61    }
62
63    fn request_base_mut(&mut self) -> Option<&mut crate::cli::command::RequestBase> {
64        Some(&mut self.base)
65    }
66}
67
68// Share `Target` + `ClientNotificationPartType` with
69// `agents logs read all` — same per-target input shape, same
70// per-part 5-variant kind discriminator. `agents queue read pending`
71// is the pre-execution mirror of logs read all's
72// `ClientNotification` block.
73pub use super::super::super::logs::read::all::{
74    ClientNotificationPartType as QueuePartType, Target,
75};
76
77/// One row inside a `ResponseItem` block — a
78/// `message_queue_contents` entry. The `id` is the
79/// `message_queue_contents.id`, which you pass to
80/// `agents queue read id <n>` to drill into the body.
81/// `timestamp_queued` is on the enclosing block, not here
82/// (one block = one `message_queue` parent row, sharing one
83/// `enqueued_at`).
84#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
85#[schemars(rename = "cli.command.agents.queue.read.pending.QueuePart")]
86pub struct QueuePart {
87    pub id: i64,
88    pub r#type: QueuePartType,
89}
90
91/// One pending `message_queue` row, with its content rows
92/// grouped as `parts`. Two variants — direct AIH target or tag
93/// target — both flat (no nested `LookupState`).
94#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
95#[serde(tag = "by", rename_all = "snake_case")]
96#[schemars(rename = "cli.command.agents.queue.read.pending.ResponseItem")]
97pub enum ResponseItem {
98    #[schemars(title = "AgentInstanceHierarchy")]
99    AgentInstanceHierarchy {
100        /// `message_queue.id` — the row-level id this block
101        /// represents. Pass to `agents queue delete <id>` to
102        /// soft-flip the entire row (all parts) in one call.
103        /// Distinct from each `QueuePart.id` (which is a
104        /// `message_queue_contents.id` for drilling into one
105        /// content slot via `agents queue read id`).
106        delete_id: i64,
107        agent_instance_hierarchy: String,
108        /// AIH of the caller who enqueued — from
109        /// `message_queue.sender_*`.
110        sender_agent_instance_hierarchy: String,
111        /// `message_queue.enqueued_at`. One block = one parent
112        /// `message_queue` row, so this is well-defined
113        /// block-level.
114        timestamp_queued: i64,
115        /// Idempotency token, if the row was enqueued with `--key`.
116        #[serde(default, skip_serializing_if = "Option::is_none")]
117        #[schemars(extend("omitempty" = true))]
118        key: Option<String>,
119        parts: Vec<QueuePart>,
120    },
121    #[schemars(title = "Tag")]
122    Tag {
123        /// `message_queue.id`. Pass to `agents queue delete <id>`.
124        delete_id: i64,
125        agent_tag: String,
126        sender_agent_instance_hierarchy: String,
127        timestamp_queued: i64,
128        #[serde(default, skip_serializing_if = "Option::is_none")]
129        #[schemars(extend("omitempty" = true))]
130        key: Option<String>,
131        parts: Vec<QueuePart>,
132    },
133}
134
135#[derive(clap::Args)]
136pub struct Args {
137    /// One or more `--target instance=L[,parent=P]` entries.
138    /// `parent` defaults to the cli's own
139    /// `Config.agent_instance_hierarchy` when omitted on an
140    /// individual target. Also accepts `--target tag=T` and
141    /// `--target me` (the caller's own AIH).
142    #[arg(long = "target", required = true)]
143    pub targets: Vec<String>,
144    /// Skip rows with `message_queue_contents.id <= after_id`.
145    #[arg(long)]
146    pub after_id: Option<i64>,
147    /// Cap on rows scanned per target.
148    #[arg(long)]
149    pub limit: Option<i64>,
150    #[command(flatten)]
151    pub base: crate::cli::command::RequestBaseArgs,
152}
153
154#[derive(clap::Args)]
155#[command(args_conflicts_with_subcommands = true)]
156pub struct Command {
157    #[command(flatten)]
158    pub args: Args,
159    #[command(subcommand)]
160    pub schema: Option<Schema>,
161}
162
163#[derive(clap::Subcommand)]
164pub enum Schema {
165    /// Emit the JSON Schema for this leaf's `Request` type and exit.
166    RequestSchema(request_schema::Args),
167    /// Emit the JSON Schema for this leaf's `Response` type and exit.
168    ResponseSchema(response_schema::Args),
169}
170
171impl TryFrom<Args> for Request {
172    type Error = crate::cli::command::FromArgsError;
173    fn try_from(args: Args) -> Result<Self, Self::Error> {
174        let targets = args
175            .targets
176            .iter()
177            .map(|s| {
178                s.parse::<Target>().map_err(|msg| {
179                    crate::cli::command::FromArgsError::path_parse("target", msg)
180                })
181            })
182            .collect::<Result<Vec<_>, _>>()?;
183        Ok(Self {
184            path_type: Path::AgentsQueueReadPending,
185            targets,
186            after_id: args.after_id,
187            limit: args.limit,
188            base: args.base.into(),
189        })
190    }
191}
192
193#[cfg(feature = "cli-executor")]
194pub async fn execute<E: crate::cli::command::CommandExecutor>(
195    executor: &E,
196    mut request: Request,
197    agent_arguments: Option<&crate::cli::command::AgentArguments>,
198) -> Result<E::Stream<ResponseItem>, E::Error> {
199    request.base.clear_transform();
200    executor.execute(request, agent_arguments).await
201}
202
203#[cfg(feature = "cli-executor")]
204pub async fn execute_transform<E: crate::cli::command::CommandExecutor>(
205    executor: &E,
206    mut request: Request,
207    transform: crate::cli::command::Transform,
208    agent_arguments: Option<&crate::cli::command::AgentArguments>,
209) -> Result<E::Stream<serde_json::Value>, E::Error> {
210    request.base.set_transform(transform);
211    executor.execute(request, agent_arguments).await
212}
213
214#[cfg(feature = "mcp")]
215impl crate::cli::command::CommandResponse for ResponseItem {
216    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
217        crate::cli::command::McpResponseItem::JSONL(serde_json::to_value(self).unwrap())
218    }
219}
220
221pub mod request_schema;
222
223pub mod response_schema;