Skip to main content

objectiveai_sdk/cli/command/agents/queue/
mod.rs

1//! `agents queue` — deferred prompts queue. Three top-level
2//! subcommands:
3//!
4//! - `delete` — remove one queued prompt by id.
5//! - `deliver` — wake every queue-pending strict descendant of the
6//!   caller (try-lock each AIH; spawn the idle ones with empty
7//!   messages so they drain their own queues).
8//! - `read` (nested) — sub-tier whose only leaf today is `id`,
9//!   which fetches one piece of queued content by its
10//!   `prompt_contents.id`. The wire shape mirrors `RichContentPart`
11//!   (tagged by `type`).
12//!
13//! Enqueue is no longer a CLI verb here — use `agents message`
14//! instead; it handles persistence under the hood. Drain is also
15//! gone — the API consumes queue rows directly via the WS reverse-
16//! attach `read_message_queue` / `clear_message_queue` server
17//! requests once a matching hierarchy comes online.
18
19use crate::cli::command::CommandRequest;
20
21pub mod delete;
22pub mod deliver;
23pub mod read;
24
25#[derive(clap::Subcommand)]
26pub enum Command {
27    /// Delete one queued prompt by id.
28    Delete(delete::Command),
29    /// Wake every queue-pending descendant agent of the caller.
30    Deliver(deliver::Command),
31    /// Read queued content — `read id <id>` for a single content
32    /// piece, `read pending [parent]` for the list of queued
33    /// prompts under a parent.
34    Read(ReadCommand),
35}
36
37/// Intermediate clap level for the `read` sub-tier. Splitting it
38/// into its own wrapper (rather than a fattened `ReadId` variant on
39/// [`Command`]) gives the CLI surface `agents queue read id <num>`
40/// to match the user's invocation style and keeps the door open for
41/// additional `read <…>` leaves later.
42#[derive(clap::Args)]
43pub struct ReadCommand {
44    #[command(subcommand)]
45    pub sub: read::Command,
46}
47
48#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
49#[serde(untagged)]
50#[schemars(rename = "cli.command.agents.queue.Request")]
51pub enum Request {
52    #[schemars(title = "Delete")]
53    Delete(delete::Request),
54    #[schemars(title = "DeleteRequestSchema")]
55    DeleteRequestSchema(delete::request_schema::Request),
56    #[schemars(title = "DeleteResponseSchema")]
57    DeleteResponseSchema(delete::response_schema::Request),
58    #[schemars(title = "Deliver")]
59    Deliver(deliver::Request),
60    #[schemars(title = "DeliverRequestSchema")]
61    DeliverRequestSchema(deliver::request_schema::Request),
62    #[schemars(title = "DeliverResponseSchema")]
63    DeliverResponseSchema(deliver::response_schema::Request),
64    #[schemars(title = "Read")]
65    Read(read::Request),
66}
67
68// Exempt from json-schema coverage: tier aggregate (see the root
69// `ResponseItem` in command.rs - TS7056).
70#[objectiveai_sdk_macros::json_schema_ignore]
71#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
72#[schemars(rename = "cli.command.agents.queue.ResponseItem")]
73#[serde(untagged)]
74pub enum ResponseItem {
75    #[schemars(title = "Delete")]
76    Delete(delete::Response),
77    #[schemars(title = "DeleteRequestSchema")]
78    DeleteRequestSchema(delete::request_schema::Response),
79    #[schemars(title = "DeleteResponseSchema")]
80    DeleteResponseSchema(delete::response_schema::Response),
81    #[schemars(title = "Deliver")]
82    Deliver(deliver::ResponseItem),
83    #[schemars(title = "DeliverRequestSchema")]
84    DeliverRequestSchema(deliver::request_schema::Response),
85    #[schemars(title = "DeliverResponseSchema")]
86    DeliverResponseSchema(deliver::response_schema::Response),
87    #[schemars(title = "Read")]
88    Read(read::ResponseItem),
89}
90
91#[cfg(feature = "mcp")]
92impl crate::cli::command::CommandResponse for ResponseItem {
93    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
94        match self {
95            ResponseItem::Delete(v) => v.into_mcp(),
96            ResponseItem::DeleteRequestSchema(v) => v.into_mcp(),
97            ResponseItem::DeleteResponseSchema(v) => v.into_mcp(),
98            ResponseItem::Deliver(v) => v.into_mcp(),
99            ResponseItem::DeliverRequestSchema(v) => v.into_mcp(),
100            ResponseItem::DeliverResponseSchema(v) => v.into_mcp(),
101            ResponseItem::Read(v) => v.into_mcp(),
102        }
103    }
104}
105
106impl TryFrom<Command> for Request {
107    type Error = crate::cli::command::FromArgsError;
108    fn try_from(command: Command) -> Result<Self, Self::Error> {
109        match command {
110            Command::Delete(cmd) => match cmd.schema {
111                None => Ok(Request::Delete(delete::Request::try_from(cmd.args)?)),
112                Some(delete::Schema::RequestSchema(args)) => Ok(
113                    Request::DeleteRequestSchema(delete::request_schema::Request::try_from(args)?),
114                ),
115                Some(delete::Schema::ResponseSchema(args)) => Ok(
116                    Request::DeleteResponseSchema(delete::response_schema::Request::try_from(args)?),
117                ),
118            },
119            Command::Deliver(cmd) => match cmd.schema {
120                None => Ok(Request::Deliver(deliver::Request::try_from(cmd.args)?)),
121                Some(deliver::Schema::RequestSchema(args)) => Ok(
122                    Request::DeliverRequestSchema(deliver::request_schema::Request::try_from(args)?),
123                ),
124                Some(deliver::Schema::ResponseSchema(args)) => Ok(
125                    Request::DeliverResponseSchema(deliver::response_schema::Request::try_from(args)?),
126                ),
127            },
128            Command::Read(rc) => Ok(Request::Read(read::Request::try_from(rc.sub)?)),
129        }
130    }
131}
132
133impl CommandRequest for Request {
134    fn into_command(&self) -> Vec<String> {
135        match self {
136            Request::Delete(inner) => inner.into_command(),
137            Request::DeleteRequestSchema(inner) => inner.into_command(),
138            Request::DeleteResponseSchema(inner) => inner.into_command(),
139            Request::Deliver(inner) => inner.into_command(),
140            Request::DeliverRequestSchema(inner) => inner.into_command(),
141            Request::DeliverResponseSchema(inner) => inner.into_command(),
142            Request::Read(inner) => inner.into_command(),
143        }
144    }
145}
146
147#[cfg(feature = "cli-executor")]
148pub async fn execute<E: crate::cli::command::CommandExecutor>(
149    executor: &E,
150    request: Request,
151    agent_arguments: Option<&crate::cli::command::AgentArguments>,
152) -> Result<
153    std::pin::Pin<Box<dyn futures::Stream<Item = Result<ResponseItem, E::Error>> + Send>>,
154    E::Error,
155> {
156    use futures::StreamExt;
157    let stream: std::pin::Pin<
158        Box<dyn futures::Stream<Item = Result<ResponseItem, E::Error>> + Send>,
159    > = match request {
160        Request::Delete(req) => {
161            let value = delete::execute(executor, req, agent_arguments).await?;
162            Box::pin(crate::cli::command::StreamOnce::new(Ok(
163                ResponseItem::Delete(value),
164            )))
165        }
166        Request::DeleteRequestSchema(req) => {
167            let value =
168                delete::request_schema::execute(executor, req, agent_arguments).await?;
169            Box::pin(crate::cli::command::StreamOnce::new(Ok(
170                ResponseItem::DeleteRequestSchema(value),
171            )))
172        }
173        Request::DeleteResponseSchema(req) => {
174            let value =
175                delete::response_schema::execute(executor, req, agent_arguments).await?;
176            Box::pin(crate::cli::command::StreamOnce::new(Ok(
177                ResponseItem::DeleteResponseSchema(value),
178            )))
179        }
180        Request::Deliver(req) => {
181            let inner = deliver::execute(executor, req, agent_arguments).await?;
182            Box::pin(inner.map(|r| r.map(ResponseItem::Deliver)))
183        }
184        Request::DeliverRequestSchema(req) => {
185            let value =
186                deliver::request_schema::execute(executor, req, agent_arguments).await?;
187            Box::pin(crate::cli::command::StreamOnce::new(Ok(
188                ResponseItem::DeliverRequestSchema(value),
189            )))
190        }
191        Request::DeliverResponseSchema(req) => {
192            let value =
193                deliver::response_schema::execute(executor, req, agent_arguments).await?;
194            Box::pin(crate::cli::command::StreamOnce::new(Ok(
195                ResponseItem::DeliverResponseSchema(value),
196            )))
197        }
198        Request::Read(req) => {
199            let inner = read::execute(executor, req, agent_arguments).await?;
200            Box::pin(inner.map(|r| r.map(ResponseItem::Read)))
201        }
202    };
203    Ok(stream)
204}
205
206#[cfg(feature = "cli-executor")]
207pub async fn execute_jq<E: crate::cli::command::CommandExecutor>(
208    executor: &E,
209    request: Request,
210    jq: String,
211    agent_arguments: Option<&crate::cli::command::AgentArguments>,
212) -> Result<
213    std::pin::Pin<Box<dyn futures::Stream<Item = Result<serde_json::Value, E::Error>> + Send>>,
214    E::Error,
215> {
216    let stream: std::pin::Pin<
217        Box<dyn futures::Stream<Item = Result<serde_json::Value, E::Error>> + Send>,
218    > = match request {
219        Request::Delete(req) => {
220            let value = delete::execute_jq(executor, req, jq, agent_arguments).await?;
221            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
222        }
223        Request::DeleteRequestSchema(req) => {
224            let value =
225                delete::request_schema::execute_jq(executor, req, jq, agent_arguments).await?;
226            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
227        }
228        Request::DeleteResponseSchema(req) => {
229            let value =
230                delete::response_schema::execute_jq(executor, req, jq, agent_arguments).await?;
231            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
232        }
233        Request::Deliver(req) => {
234            let inner = deliver::execute_jq(executor, req, jq, agent_arguments).await?;
235            Box::pin(inner)
236        }
237        Request::DeliverRequestSchema(req) => {
238            let value =
239                deliver::request_schema::execute_jq(executor, req, jq, agent_arguments).await?;
240            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
241        }
242        Request::DeliverResponseSchema(req) => {
243            let value =
244                deliver::response_schema::execute_jq(executor, req, jq, agent_arguments).await?;
245            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
246        }
247        Request::Read(req) => {
248            let inner = read::execute_jq(executor, req, jq, agent_arguments).await?;
249            Box::pin(inner)
250        }
251    };
252    Ok(stream)
253}