// objectiveai_sdk/cli/command/agents/queue/mod.rs

//! `agents queue` — deferred prompts queue. Three top-level
//! subcommands:
//!
//! - `delete` — remove one queued prompt by id.
//! - `deliver` — wake every queue-pending strict descendant of the
//!   caller (try-lock each AIH; spawn the idle ones with empty
//!   messages so they drain their own queues).
//! - `read` (nested) — sub-tier whose only leaf today is `id`,
//!   which fetches one piece of queued content by its
//!   `prompt_contents.id`. The wire shape mirrors `RichContentPart`
//!   (tagged by `type`).
//!
//! Enqueue is no longer a CLI verb here — use `agents message`
//! instead; it handles persistence under the hood. Drain is also
//! gone — the API consumes queue rows directly via the WS reverse-
//! attach `read_message_queue` / `clear_message_queue` server
//! requests once a matching hierarchy comes online.
18
use crate::cli::command::CommandRequest;

pub mod delete;
pub mod deliver;
pub mod read;
24
25#[derive(clap::Subcommand)]
26pub enum Command {
27    /// Delete one queued prompt by id.
28    Delete(delete::Command),
29    /// Wake every queue-pending descendant agent of the caller.
30    Deliver(deliver::Command),
31    /// Read queued content — `read id <id>` for a single content
32    /// piece, `read pending [parent]` for the list of queued
33    /// prompts under a parent.
34    Read(ReadCommand),
35}
36
37/// Intermediate clap level for the `read` sub-tier. Splitting it
38/// into its own wrapper (rather than a fattened `ReadId` variant on
39/// [`Command`]) gives the CLI surface `agents queue read id <num>`
40/// to match the user's invocation style and keeps the door open for
41/// additional `read <…>` leaves later.
42#[derive(clap::Args)]
43pub struct ReadCommand {
44    #[command(subcommand)]
45    pub sub: read::Command,
46}
47
48#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
49#[serde(untagged)]
50#[schemars(rename = "cli.command.agents.queue.Request")]
51pub enum Request {
52    #[schemars(title = "Delete")]
53    Delete(delete::Request),
54    #[schemars(title = "DeleteRequestSchema")]
55    DeleteRequestSchema(delete::request_schema::Request),
56    #[schemars(title = "DeleteResponseSchema")]
57    DeleteResponseSchema(delete::response_schema::Request),
58    #[schemars(title = "Deliver")]
59    Deliver(deliver::Request),
60    #[schemars(title = "DeliverRequestSchema")]
61    DeliverRequestSchema(deliver::request_schema::Request),
62    #[schemars(title = "DeliverResponseSchema")]
63    DeliverResponseSchema(deliver::response_schema::Request),
64    #[schemars(title = "Read")]
65    Read(read::Request),
66}
67
68// Exempt from json-schema coverage: tier aggregate (see the root
69// `ResponseItem` in command.rs - TS7056).
70#[objectiveai_sdk_macros::json_schema_ignore]
71#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
72#[schemars(rename = "cli.command.agents.queue.ResponseItem")]
73#[serde(untagged)]
74pub enum ResponseItem {
75    #[schemars(title = "Delete")]
76    Delete(delete::Response),
77    #[schemars(title = "DeleteRequestSchema")]
78    DeleteRequestSchema(delete::request_schema::Response),
79    #[schemars(title = "DeleteResponseSchema")]
80    DeleteResponseSchema(delete::response_schema::Response),
81    #[schemars(title = "Deliver")]
82    Deliver(deliver::ResponseItem),
83    #[schemars(title = "DeliverRequestSchema")]
84    DeliverRequestSchema(deliver::request_schema::Response),
85    #[schemars(title = "DeliverResponseSchema")]
86    DeliverResponseSchema(deliver::response_schema::Response),
87    #[schemars(title = "Read")]
88    Read(read::ResponseItem),
89}
90
/// MCP conversion for the tier aggregate. Only compiled when the `mcp`
/// feature is enabled.
#[cfg(feature = "mcp")]
impl crate::cli::command::CommandResponse for ResponseItem {
    /// Consumes the item and returns whatever the wrapped payload's own
    /// `into_mcp` produces; this level adds nothing of its own.
    ///
    /// The match is deliberately exhaustive (no `_` arm) so that adding a
    /// variant to [`ResponseItem`] fails to compile until it is handled here.
    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
        match self {
            ResponseItem::Delete(v) => v.into_mcp(),
            ResponseItem::DeleteRequestSchema(v) => v.into_mcp(),
            ResponseItem::DeleteResponseSchema(v) => v.into_mcp(),
            ResponseItem::Deliver(v) => v.into_mcp(),
            ResponseItem::DeliverRequestSchema(v) => v.into_mcp(),
            ResponseItem::DeliverResponseSchema(v) => v.into_mcp(),
            ResponseItem::Read(v) => v.into_mcp(),
        }
    }
}
105
106impl TryFrom<Command> for Request {
107    type Error = crate::cli::command::FromArgsError;
108    fn try_from(command: Command) -> Result<Self, Self::Error> {
109        match command {
110            Command::Delete(cmd) => match cmd.schema {
111                None => Ok(Request::Delete(delete::Request::try_from(cmd.args)?)),
112                Some(delete::Schema::RequestSchema(args)) => Ok(
113                    Request::DeleteRequestSchema(delete::request_schema::Request::try_from(args)?),
114                ),
115                Some(delete::Schema::ResponseSchema(args)) => Ok(
116                    Request::DeleteResponseSchema(delete::response_schema::Request::try_from(args)?),
117                ),
118            },
119            Command::Deliver(cmd) => match cmd.schema {
120                None => Ok(Request::Deliver(deliver::Request::try_from(cmd.args)?)),
121                Some(deliver::Schema::RequestSchema(args)) => Ok(
122                    Request::DeliverRequestSchema(deliver::request_schema::Request::try_from(args)?),
123                ),
124                Some(deliver::Schema::ResponseSchema(args)) => Ok(
125                    Request::DeliverResponseSchema(deliver::response_schema::Request::try_from(args)?),
126                ),
127            },
128            Command::Read(rc) => Ok(Request::Read(read::Request::try_from(rc.sub)?)),
129        }
130    }
131}
132
133impl CommandRequest for Request {
134    fn into_command(&self) -> Vec<String> {
135        match self {
136            Request::Delete(inner) => inner.into_command(),
137            Request::DeleteRequestSchema(inner) => inner.into_command(),
138            Request::DeleteResponseSchema(inner) => inner.into_command(),
139            Request::Deliver(inner) => inner.into_command(),
140            Request::DeliverRequestSchema(inner) => inner.into_command(),
141            Request::DeliverResponseSchema(inner) => inner.into_command(),
142            Request::Read(inner) => inner.into_command(),
143        }
144    }
145
146    fn request_base(&self) -> &crate::cli::command::RequestBase {
147        match self {
148            Request::Delete(inner) => inner.request_base(),
149            Request::DeleteRequestSchema(inner) => inner.request_base(),
150            Request::DeleteResponseSchema(inner) => inner.request_base(),
151            Request::Deliver(inner) => inner.request_base(),
152            Request::DeliverRequestSchema(inner) => inner.request_base(),
153            Request::DeliverResponseSchema(inner) => inner.request_base(),
154            Request::Read(inner) => inner.request_base(),
155        }
156    }
157
158    fn request_base_mut(&mut self) -> Option<&mut crate::cli::command::RequestBase> {
159        match self {
160            Request::Delete(inner) => inner.request_base_mut(),
161            Request::DeleteRequestSchema(inner) => inner.request_base_mut(),
162            Request::DeleteResponseSchema(inner) => inner.request_base_mut(),
163            Request::Deliver(inner) => inner.request_base_mut(),
164            Request::DeliverRequestSchema(inner) => inner.request_base_mut(),
165            Request::DeliverResponseSchema(inner) => inner.request_base_mut(),
166            Request::Read(inner) => inner.request_base_mut(),
167        }
168    }
169}
170
/// Runs an `agents queue` [`Request`] on `executor` and returns its output as
/// a boxed stream of typed [`ResponseItem`]s. Only compiled when the
/// `cli-executor` feature is enabled.
///
/// Dispatch is by request variant:
///
/// - `delete` and all `*_schema` requests: the child `execute` is awaited to
///   completion and its single value is wrapped in `StreamOnce`
///   (presumably a stream that yields that one item - confirm against its
///   definition).
/// - `deliver` and `read`: the child `execute` already returns a stream; each
///   successful item is mapped into the matching [`ResponseItem`] variant and
///   per-item errors are passed through unchanged.
///
/// `agent_arguments` is forwarded to the child untouched.
///
/// # Errors
///
/// Returns `Err` immediately (before any stream exists) if the awaited child
/// call fails. Failures that happen later, while a `deliver` / `read` stream
/// is being consumed, surface as `Err` items inside the stream.
#[cfg(feature = "cli-executor")]
pub async fn execute<E: crate::cli::command::CommandExecutor>(
    executor: &E,
    request: Request,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<
    std::pin::Pin<Box<dyn futures::Stream<Item = Result<ResponseItem, E::Error>> + Send>>,
    E::Error,
> {
    // Brings `.map` on streams into scope for the `deliver` / `read` arms.
    use futures::StreamExt;
    // The explicit annotation gives every arm one common target type, so the
    // differently-typed `Box::pin(..)` values all coerce to the boxed trait
    // object.
    let stream: std::pin::Pin<
        Box<dyn futures::Stream<Item = Result<ResponseItem, E::Error>> + Send>,
    > = match request {
        Request::Delete(req) => {
            let value = delete::execute(executor, req, agent_arguments).await?;
            Box::pin(crate::cli::command::StreamOnce::new(Ok(
                ResponseItem::Delete(value),
            )))
        }
        Request::DeleteRequestSchema(req) => {
            let value =
                delete::request_schema::execute(executor, req, agent_arguments).await?;
            Box::pin(crate::cli::command::StreamOnce::new(Ok(
                ResponseItem::DeleteRequestSchema(value),
            )))
        }
        Request::DeleteResponseSchema(req) => {
            let value =
                delete::response_schema::execute(executor, req, agent_arguments).await?;
            Box::pin(crate::cli::command::StreamOnce::new(Ok(
                ResponseItem::DeleteResponseSchema(value),
            )))
        }
        Request::Deliver(req) => {
            let inner = deliver::execute(executor, req, agent_arguments).await?;
            // Outer `map` is the stream adaptor, inner `map` is `Result::map`:
            // only `Ok` items are re-wrapped, `Err` items pass through.
            Box::pin(inner.map(|r| r.map(ResponseItem::Deliver)))
        }
        Request::DeliverRequestSchema(req) => {
            let value =
                deliver::request_schema::execute(executor, req, agent_arguments).await?;
            Box::pin(crate::cli::command::StreamOnce::new(Ok(
                ResponseItem::DeliverRequestSchema(value),
            )))
        }
        Request::DeliverResponseSchema(req) => {
            let value =
                deliver::response_schema::execute(executor, req, agent_arguments).await?;
            Box::pin(crate::cli::command::StreamOnce::new(Ok(
                ResponseItem::DeliverResponseSchema(value),
            )))
        }
        Request::Read(req) => {
            let inner = read::execute(executor, req, agent_arguments).await?;
            Box::pin(inner.map(|r| r.map(ResponseItem::Read)))
        }
    };
    Ok(stream)
}
229
/// Variant of [`execute`] that applies `transform` and streams
/// `serde_json::Value`s instead of typed [`ResponseItem`]s. Only compiled
/// when the `cli-executor` feature is enabled.
///
/// The request is dispatched to the matching child `execute_transform`,
/// which receives `transform` and `agent_arguments` unchanged; what the
/// transform does to the output is decided by the child, not here.
///
/// - `delete` and all `*_schema` requests: the child returns one JSON value,
///   which is wrapped in `StreamOnce`.
/// - `deliver` and `read`: the child already returns a stream of JSON values,
///   which is boxed as-is. No re-wrapping is needed because the items are
///   untyped.
///
/// # Errors
///
/// Returns `Err` immediately if the awaited child call fails; errors raised
/// while a `deliver` / `read` stream is consumed appear as `Err` items.
#[cfg(feature = "cli-executor")]
pub async fn execute_transform<E: crate::cli::command::CommandExecutor>(
    executor: &E,
    request: Request,
    transform: crate::cli::command::Transform,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<
    std::pin::Pin<Box<dyn futures::Stream<Item = Result<serde_json::Value, E::Error>> + Send>>,
    E::Error,
> {
    // The explicit annotation gives every arm one common target type, so the
    // differently-typed `Box::pin(..)` values all coerce to the boxed trait
    // object.
    let stream: std::pin::Pin<
        Box<dyn futures::Stream<Item = Result<serde_json::Value, E::Error>> + Send>,
    > = match request {
        Request::Delete(req) => {
            let value =
                delete::execute_transform(executor, req, transform, agent_arguments).await?;
            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
        }
        Request::DeleteRequestSchema(req) => {
            let value = delete::request_schema::execute_transform(
                executor,
                req,
                transform,
                agent_arguments,
            )
            .await?;
            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
        }
        Request::DeleteResponseSchema(req) => {
            let value = delete::response_schema::execute_transform(
                executor,
                req,
                transform,
                agent_arguments,
            )
            .await?;
            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
        }
        Request::Deliver(req) => {
            let inner =
                deliver::execute_transform(executor, req, transform, agent_arguments).await?;
            Box::pin(inner)
        }
        Request::DeliverRequestSchema(req) => {
            let value = deliver::request_schema::execute_transform(
                executor,
                req,
                transform,
                agent_arguments,
            )
            .await?;
            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
        }
        Request::DeliverResponseSchema(req) => {
            let value = deliver::response_schema::execute_transform(
                executor,
                req,
                transform,
                agent_arguments,
            )
            .await?;
            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
        }
        Request::Read(req) => {
            let inner =
                read::execute_transform(executor, req, transform, agent_arguments).await?;
            Box::pin(inner)
        }
    };
    Ok(stream)
}