Skip to main content

objectiveai_sdk/cli/command/agents/queue/read/pending/
mod.rs

1//! `agents queue read pending` — stream every pending (i.e.
2//! `active = TRUE`) `message_queue` row for the resolved targets,
3//! coalesced into parts-grouped `ResponseItem` blocks. Symmetric
4//! with `agents logs read all`'s `ClientNotification` shape — same
5//! `Target` input, same per-part type tag, same `id` shape (an i64
6//! you pass to the matching `read id` verb to drill into the body).
7
8use crate::cli::command::CommandRequest;
9
10#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
11#[schemars(rename = "cli.command.agents.queue.read.pending.Request")]
12pub struct Request {
13    pub path_type: Path,
14    pub targets: Vec<Target>,
15    /// Skip rows with `message_queue_contents.id <= after_id`. Use
16    /// the highest `id` from a previous page to paginate forward.
17    #[serde(default, skip_serializing_if = "Option::is_none")]
18    #[schemars(extend("omitempty" = true))]
19    pub after_id: Option<i64>,
20    /// Cap on rows scanned per target. `None` = unlimited.
21    #[serde(default, skip_serializing_if = "Option::is_none")]
22    #[schemars(extend("omitempty" = true))]
23    pub limit: Option<i64>,
24    pub jq: Option<String>,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
28#[schemars(rename = "cli.command.agents.queue.read.pending.Path")]
29pub enum Path {
30    #[serde(rename = "agents/queue/read/pending")]
31    AgentsQueueReadPending,
32}
33
34impl CommandRequest for Request {
35    fn into_command(&self) -> Vec<String> {
36        let mut argv = vec![
37            "agents".to_string(),
38            "queue".to_string(),
39            "read".to_string(),
40            "pending".to_string(),
41        ];
42        for target in &self.targets {
43            argv.push("--target".to_string());
44            argv.push(target.into_arg_string());
45        }
46        if let Some(after_id) = self.after_id {
47            argv.push("--after-id".to_string());
48            argv.push(after_id.to_string());
49        }
50        if let Some(limit) = self.limit {
51            argv.push("--limit".to_string());
52            argv.push(limit.to_string());
53        }
54        if let Some(jq) = &self.jq {
55            argv.push("--jq".to_string());
56            argv.push(jq.clone());
57        }
58        argv
59    }
60}
61
62// Share `Target` + `ClientNotificationPartType` with
63// `agents logs read all` — same per-target input shape, same
64// per-part 5-variant kind discriminator. `agents queue read pending`
65// is the pre-execution mirror of logs read all's
66// `ClientNotification` block.
67pub use super::super::super::logs::read::all::{
68    ClientNotificationPartType as QueuePartType, Target,
69};
70
71/// One row inside a `ResponseItem` block — a
72/// `message_queue_contents` entry. The `id` is the
73/// `message_queue_contents.id`, which you pass to
74/// `agents queue read id <n>` to drill into the body.
75/// `timestamp_queued` is on the enclosing block, not here
76/// (one block = one `message_queue` parent row, sharing one
77/// `enqueued_at`).
78#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
79#[schemars(rename = "cli.command.agents.queue.read.pending.QueuePart")]
80pub struct QueuePart {
81    pub id: i64,
82    pub r#type: QueuePartType,
83}
84
85/// One pending `message_queue` row, with its content rows
86/// grouped as `parts`. Two variants — direct AIH target or tag
87/// target — both flat (no nested `LookupState`).
88#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
89#[serde(tag = "by", rename_all = "snake_case")]
90#[schemars(rename = "cli.command.agents.queue.read.pending.ResponseItem")]
91pub enum ResponseItem {
92    #[schemars(title = "AgentInstanceHierarchy")]
93    AgentInstanceHierarchy {
94        /// `message_queue.id` — the row-level id this block
95        /// represents. Pass to `agents queue delete <id>` to
96        /// soft-flip the entire row (all parts) in one call.
97        /// Distinct from each `QueuePart.id` (which is a
98        /// `message_queue_contents.id` for drilling into one
99        /// content slot via `agents queue read id`).
100        delete_id: i64,
101        agent_instance_hierarchy: String,
102        /// AIH of the caller who enqueued — from
103        /// `message_queue.sender_*`.
104        sender_agent_instance_hierarchy: String,
105        /// `message_queue.enqueued_at`. One block = one parent
106        /// `message_queue` row, so this is well-defined
107        /// block-level.
108        timestamp_queued: i64,
109        /// Idempotency token, if the row was enqueued with `--key`.
110        #[serde(default, skip_serializing_if = "Option::is_none")]
111        #[schemars(extend("omitempty" = true))]
112        key: Option<String>,
113        parts: Vec<QueuePart>,
114    },
115    #[schemars(title = "Tag")]
116    Tag {
117        /// `message_queue.id`. Pass to `agents queue delete <id>`.
118        delete_id: i64,
119        agent_tag: String,
120        sender_agent_instance_hierarchy: String,
121        timestamp_queued: i64,
122        #[serde(default, skip_serializing_if = "Option::is_none")]
123        #[schemars(extend("omitempty" = true))]
124        key: Option<String>,
125        parts: Vec<QueuePart>,
126    },
127}
128
129#[derive(clap::Args)]
130pub struct Args {
131    /// One or more `--target instance=L[,parent=P]` entries.
132    /// `parent` defaults to the cli's own
133    /// `Config.agent_instance_hierarchy` when omitted on an
134    /// individual target. Also accepts `--target tag=T` and
135    /// `--target me` (the caller's own AIH).
136    #[arg(long = "target", required = true)]
137    pub targets: Vec<String>,
138    /// Skip rows with `message_queue_contents.id <= after_id`.
139    #[arg(long)]
140    pub after_id: Option<i64>,
141    /// Cap on rows scanned per target.
142    #[arg(long)]
143    pub limit: Option<i64>,
144    /// jq filter applied to the JSON output.
145    #[arg(long)]
146    pub jq: Option<String>,
147}
148
149#[derive(clap::Args)]
150#[command(args_conflicts_with_subcommands = true)]
151pub struct Command {
152    #[command(flatten)]
153    pub args: Args,
154    #[command(subcommand)]
155    pub schema: Option<Schema>,
156}
157
158#[derive(clap::Subcommand)]
159pub enum Schema {
160    /// Emit the JSON Schema for this leaf's `Request` type and exit.
161    RequestSchema(request_schema::Args),
162    /// Emit the JSON Schema for this leaf's `Response` type and exit.
163    ResponseSchema(response_schema::Args),
164}
165
166impl TryFrom<Args> for Request {
167    type Error = crate::cli::command::FromArgsError;
168    fn try_from(args: Args) -> Result<Self, Self::Error> {
169        let targets = args
170            .targets
171            .iter()
172            .map(|s| {
173                s.parse::<Target>().map_err(|msg| {
174                    crate::cli::command::FromArgsError::path_parse("target", msg)
175                })
176            })
177            .collect::<Result<Vec<_>, _>>()?;
178        Ok(Self {
179            path_type: Path::AgentsQueueReadPending,
180            targets,
181            after_id: args.after_id,
182            limit: args.limit,
183            jq: args.jq,
184        })
185    }
186}
187
188#[cfg(feature = "cli-executor")]
189pub async fn execute<E: crate::cli::command::CommandExecutor>(
190    executor: &E,
191    mut request: Request,
192    agent_arguments: Option<&crate::cli::command::AgentArguments>,
193) -> Result<E::Stream<ResponseItem>, E::Error> {
194    request.jq = None;
195    executor.execute(request, agent_arguments).await
196}
197
198#[cfg(feature = "cli-executor")]
199pub async fn execute_jq<E: crate::cli::command::CommandExecutor>(
200    executor: &E,
201    mut request: Request,
202    jq: String,
203    agent_arguments: Option<&crate::cli::command::AgentArguments>,
204) -> Result<E::Stream<serde_json::Value>, E::Error> {
205    request.jq = Some(jq);
206    executor.execute(request, agent_arguments).await
207}
208
209#[cfg(feature = "mcp")]
210impl crate::cli::command::CommandResponse for ResponseItem {
211    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
212        crate::cli::command::McpResponseItem::JSONL(serde_json::to_value(self).unwrap())
213    }
214}
215
216pub mod request_schema;
217
218pub mod response_schema;