Skip to main content

objectiveai_sdk/cli/command/agents/queue/
mod.rs

1//! `agents queue` — deferred prompts queue. Top-level subcommands:
2//!
3//! - `open --id <id>` — fetch one piece of queued content by its
4//!   `prompt_contents.id`. The wire shape mirrors `RichContentPart`
5//!   (tagged by `type`).
6//! - `list --pending` — stream the queued prompts pending delivery
7//!   under the resolved targets.
8//! - `delete --id <id>` — remove one queued prompt by id.
9//! - `deliver` — wake every queue-pending strict descendant of the
10//!   caller (try-lock each AIH; spawn the idle ones with empty
11//!   messages so they drain their own queues).
12//!
13//! Enqueue is no longer a CLI verb here — use `agents message`
14//! instead; it handles persistence under the hood.
15
16use crate::cli::command::CommandRequest;
17
18pub mod delete;
19pub mod deliver;
20pub mod list;
21pub mod open;
22
23#[derive(clap::Subcommand)]
24pub enum Command {
25    /// Delete one queued prompt by id.
26    Delete(delete::Command),
27    /// Wake every queue-pending descendant agent of the caller.
28    Deliver(deliver::Command),
29    /// List queued prompts pending delivery under the targets.
30    List(list::Command),
31    /// Fetch one piece of queued content by `prompt_contents.id`.
32    Open(open::Command),
33}
34
35#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
36#[serde(untagged)]
37#[schemars(rename = "cli.command.agents.queue.Request")]
38pub enum Request {
39    #[schemars(title = "Delete")]
40    Delete(delete::Request),
41    #[schemars(title = "DeleteRequestSchema")]
42    DeleteRequestSchema(delete::request_schema::Request),
43    #[schemars(title = "DeleteResponseSchema")]
44    DeleteResponseSchema(delete::response_schema::Request),
45    #[schemars(title = "Deliver")]
46    Deliver(deliver::Request),
47    #[schemars(title = "DeliverRequestSchema")]
48    DeliverRequestSchema(deliver::request_schema::Request),
49    #[schemars(title = "DeliverResponseSchema")]
50    DeliverResponseSchema(deliver::response_schema::Request),
51    #[schemars(title = "List")]
52    List(list::Request),
53    #[schemars(title = "ListRequestSchema")]
54    ListRequestSchema(list::request_schema::Request),
55    #[schemars(title = "ListResponseSchema")]
56    ListResponseSchema(list::response_schema::Request),
57    #[schemars(title = "Open")]
58    Open(open::Request),
59    #[schemars(title = "OpenRequestSchema")]
60    OpenRequestSchema(open::request_schema::Request),
61    #[schemars(title = "OpenResponseSchema")]
62    OpenResponseSchema(open::response_schema::Request),
63}
64
65// Exempt from json-schema coverage: tier aggregate (see the root
66// `ResponseItem` in command.rs - TS7056).
67#[objectiveai_sdk_macros::json_schema_ignore]
68#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
69#[schemars(rename = "cli.command.agents.queue.ResponseItem")]
70#[serde(untagged)]
71pub enum ResponseItem {
72    #[schemars(title = "Delete")]
73    Delete(delete::Response),
74    #[schemars(title = "DeleteRequestSchema")]
75    DeleteRequestSchema(delete::request_schema::Response),
76    #[schemars(title = "DeleteResponseSchema")]
77    DeleteResponseSchema(delete::response_schema::Response),
78    #[schemars(title = "Deliver")]
79    Deliver(deliver::ResponseItem),
80    #[schemars(title = "DeliverRequestSchema")]
81    DeliverRequestSchema(deliver::request_schema::Response),
82    #[schemars(title = "DeliverResponseSchema")]
83    DeliverResponseSchema(deliver::response_schema::Response),
84    #[schemars(title = "List")]
85    List(list::ResponseItem),
86    #[schemars(title = "ListRequestSchema")]
87    ListRequestSchema(list::request_schema::Response),
88    #[schemars(title = "ListResponseSchema")]
89    ListResponseSchema(list::response_schema::Response),
90    #[schemars(title = "Open")]
91    Open(open::Response),
92    #[schemars(title = "OpenRequestSchema")]
93    OpenRequestSchema(open::request_schema::Response),
94    #[schemars(title = "OpenResponseSchema")]
95    OpenResponseSchema(open::response_schema::Response),
96}
97
98#[cfg(feature = "mcp")]
99impl crate::cli::command::CommandResponse for ResponseItem {
100    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
101        match self {
102            ResponseItem::Delete(v) => v.into_mcp(),
103            ResponseItem::DeleteRequestSchema(v) => v.into_mcp(),
104            ResponseItem::DeleteResponseSchema(v) => v.into_mcp(),
105            ResponseItem::Deliver(v) => v.into_mcp(),
106            ResponseItem::DeliverRequestSchema(v) => v.into_mcp(),
107            ResponseItem::DeliverResponseSchema(v) => v.into_mcp(),
108            ResponseItem::List(v) => v.into_mcp(),
109            ResponseItem::ListRequestSchema(v) => v.into_mcp(),
110            ResponseItem::ListResponseSchema(v) => v.into_mcp(),
111            ResponseItem::Open(v) => v.into_mcp(),
112            ResponseItem::OpenRequestSchema(v) => v.into_mcp(),
113            ResponseItem::OpenResponseSchema(v) => v.into_mcp(),
114        }
115    }
116}
117
118impl TryFrom<Command> for Request {
119    type Error = crate::cli::command::FromArgsError;
120    fn try_from(command: Command) -> Result<Self, Self::Error> {
121        match command {
122            Command::Delete(cmd) => match cmd.schema {
123                None => Ok(Request::Delete(delete::Request::try_from(cmd.args)?)),
124                Some(delete::Schema::RequestSchema(args)) => Ok(
125                    Request::DeleteRequestSchema(delete::request_schema::Request::try_from(args)?),
126                ),
127                Some(delete::Schema::ResponseSchema(args)) => Ok(
128                    Request::DeleteResponseSchema(delete::response_schema::Request::try_from(args)?),
129                ),
130            },
131            Command::Deliver(cmd) => match cmd.schema {
132                None => Ok(Request::Deliver(deliver::Request::try_from(cmd.args)?)),
133                Some(deliver::Schema::RequestSchema(args)) => Ok(
134                    Request::DeliverRequestSchema(deliver::request_schema::Request::try_from(args)?),
135                ),
136                Some(deliver::Schema::ResponseSchema(args)) => Ok(
137                    Request::DeliverResponseSchema(deliver::response_schema::Request::try_from(args)?),
138                ),
139            },
140            Command::List(cmd) => match cmd.schema {
141                None => Ok(Request::List(list::Request::try_from(cmd.args)?)),
142                Some(list::Schema::RequestSchema(args)) => Ok(
143                    Request::ListRequestSchema(list::request_schema::Request::try_from(args)?),
144                ),
145                Some(list::Schema::ResponseSchema(args)) => Ok(
146                    Request::ListResponseSchema(list::response_schema::Request::try_from(args)?),
147                ),
148            },
149            Command::Open(cmd) => match cmd.schema {
150                None => Ok(Request::Open(open::Request::try_from(cmd.args)?)),
151                Some(open::Schema::RequestSchema(args)) => Ok(
152                    Request::OpenRequestSchema(open::request_schema::Request::try_from(args)?),
153                ),
154                Some(open::Schema::ResponseSchema(args)) => Ok(
155                    Request::OpenResponseSchema(open::response_schema::Request::try_from(args)?),
156                ),
157            },
158        }
159    }
160}
161
162impl CommandRequest for Request {
163    fn request_base(&self) -> &crate::cli::command::RequestBase {
164        match self {
165            Request::Delete(inner) => inner.request_base(),
166            Request::DeleteRequestSchema(inner) => inner.request_base(),
167            Request::DeleteResponseSchema(inner) => inner.request_base(),
168            Request::Deliver(inner) => inner.request_base(),
169            Request::DeliverRequestSchema(inner) => inner.request_base(),
170            Request::DeliverResponseSchema(inner) => inner.request_base(),
171            Request::List(inner) => inner.request_base(),
172            Request::ListRequestSchema(inner) => inner.request_base(),
173            Request::ListResponseSchema(inner) => inner.request_base(),
174            Request::Open(inner) => inner.request_base(),
175            Request::OpenRequestSchema(inner) => inner.request_base(),
176            Request::OpenResponseSchema(inner) => inner.request_base(),
177        }
178    }
179
180    fn request_base_mut(&mut self) -> Option<&mut crate::cli::command::RequestBase> {
181        match self {
182            Request::Delete(inner) => inner.request_base_mut(),
183            Request::DeleteRequestSchema(inner) => inner.request_base_mut(),
184            Request::DeleteResponseSchema(inner) => inner.request_base_mut(),
185            Request::Deliver(inner) => inner.request_base_mut(),
186            Request::DeliverRequestSchema(inner) => inner.request_base_mut(),
187            Request::DeliverResponseSchema(inner) => inner.request_base_mut(),
188            Request::List(inner) => inner.request_base_mut(),
189            Request::ListRequestSchema(inner) => inner.request_base_mut(),
190            Request::ListResponseSchema(inner) => inner.request_base_mut(),
191            Request::Open(inner) => inner.request_base_mut(),
192            Request::OpenRequestSchema(inner) => inner.request_base_mut(),
193            Request::OpenResponseSchema(inner) => inner.request_base_mut(),
194        }
195    }
196}
197
198#[cfg(feature = "cli-executor")]
199pub async fn execute<E: crate::cli::command::CommandExecutor>(
200    executor: &E,
201    request: Request,
202    agent_arguments: Option<&crate::cli::command::AgentArguments>,
203) -> Result<
204    std::pin::Pin<Box<dyn futures::Stream<Item = Result<ResponseItem, E::Error>> + Send>>,
205    E::Error,
206> {
207    use futures::StreamExt;
208    let stream: std::pin::Pin<
209        Box<dyn futures::Stream<Item = Result<ResponseItem, E::Error>> + Send>,
210    > = match request {
211        Request::Delete(req) => {
212            let value = delete::execute(executor, req, agent_arguments).await?;
213            Box::pin(crate::cli::command::StreamOnce::new(Ok(
214                ResponseItem::Delete(value),
215            )))
216        }
217        Request::DeleteRequestSchema(req) => {
218            let value =
219                delete::request_schema::execute(executor, req, agent_arguments).await?;
220            Box::pin(crate::cli::command::StreamOnce::new(Ok(
221                ResponseItem::DeleteRequestSchema(value),
222            )))
223        }
224        Request::DeleteResponseSchema(req) => {
225            let value =
226                delete::response_schema::execute(executor, req, agent_arguments).await?;
227            Box::pin(crate::cli::command::StreamOnce::new(Ok(
228                ResponseItem::DeleteResponseSchema(value),
229            )))
230        }
231        Request::Deliver(req) => {
232            let inner = deliver::execute(executor, req, agent_arguments).await?;
233            Box::pin(inner.map(|r| r.map(ResponseItem::Deliver)))
234        }
235        Request::DeliverRequestSchema(req) => {
236            let value =
237                deliver::request_schema::execute(executor, req, agent_arguments).await?;
238            Box::pin(crate::cli::command::StreamOnce::new(Ok(
239                ResponseItem::DeliverRequestSchema(value),
240            )))
241        }
242        Request::DeliverResponseSchema(req) => {
243            let value =
244                deliver::response_schema::execute(executor, req, agent_arguments).await?;
245            Box::pin(crate::cli::command::StreamOnce::new(Ok(
246                ResponseItem::DeliverResponseSchema(value),
247            )))
248        }
249        Request::List(req) => {
250            let inner = list::execute(executor, req, agent_arguments).await?;
251            Box::pin(inner.map(|r| r.map(ResponseItem::List)))
252        }
253        Request::ListRequestSchema(req) => {
254            let value =
255                list::request_schema::execute(executor, req, agent_arguments).await?;
256            Box::pin(crate::cli::command::StreamOnce::new(Ok(
257                ResponseItem::ListRequestSchema(value),
258            )))
259        }
260        Request::ListResponseSchema(req) => {
261            let value =
262                list::response_schema::execute(executor, req, agent_arguments).await?;
263            Box::pin(crate::cli::command::StreamOnce::new(Ok(
264                ResponseItem::ListResponseSchema(value),
265            )))
266        }
267        Request::Open(req) => {
268            let value = open::execute(executor, req, agent_arguments).await?;
269            Box::pin(crate::cli::command::StreamOnce::new(Ok(
270                ResponseItem::Open(value),
271            )))
272        }
273        Request::OpenRequestSchema(req) => {
274            let value =
275                open::request_schema::execute(executor, req, agent_arguments).await?;
276            Box::pin(crate::cli::command::StreamOnce::new(Ok(
277                ResponseItem::OpenRequestSchema(value),
278            )))
279        }
280        Request::OpenResponseSchema(req) => {
281            let value =
282                open::response_schema::execute(executor, req, agent_arguments).await?;
283            Box::pin(crate::cli::command::StreamOnce::new(Ok(
284                ResponseItem::OpenResponseSchema(value),
285            )))
286        }
287    };
288    Ok(stream)
289}
290
291#[cfg(feature = "cli-executor")]
292pub async fn execute_transform<E: crate::cli::command::CommandExecutor>(
293    executor: &E,
294    request: Request,
295    transform: crate::cli::command::Transform,
296    agent_arguments: Option<&crate::cli::command::AgentArguments>,
297) -> Result<
298    std::pin::Pin<Box<dyn futures::Stream<Item = Result<serde_json::Value, E::Error>> + Send>>,
299    E::Error,
300> {
301    let stream: std::pin::Pin<
302        Box<dyn futures::Stream<Item = Result<serde_json::Value, E::Error>> + Send>,
303    > = match request {
304        Request::Delete(req) => {
305            let value = delete::execute_transform(executor, req, transform, agent_arguments).await?;
306            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
307        }
308        Request::DeleteRequestSchema(req) => {
309            let value =
310                delete::request_schema::execute_transform(executor, req, transform, agent_arguments).await?;
311            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
312        }
313        Request::DeleteResponseSchema(req) => {
314            let value =
315                delete::response_schema::execute_transform(executor, req, transform, agent_arguments).await?;
316            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
317        }
318        Request::Deliver(req) => {
319            let inner = deliver::execute_transform(executor, req, transform, agent_arguments).await?;
320            Box::pin(inner)
321        }
322        Request::DeliverRequestSchema(req) => {
323            let value =
324                deliver::request_schema::execute_transform(executor, req, transform, agent_arguments).await?;
325            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
326        }
327        Request::DeliverResponseSchema(req) => {
328            let value =
329                deliver::response_schema::execute_transform(executor, req, transform, agent_arguments).await?;
330            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
331        }
332        Request::List(req) => {
333            let inner = list::execute_transform(executor, req, transform, agent_arguments).await?;
334            Box::pin(inner)
335        }
336        Request::ListRequestSchema(req) => {
337            let value =
338                list::request_schema::execute_transform(executor, req, transform, agent_arguments).await?;
339            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
340        }
341        Request::ListResponseSchema(req) => {
342            let value =
343                list::response_schema::execute_transform(executor, req, transform, agent_arguments).await?;
344            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
345        }
346        Request::Open(req) => {
347            let value = open::execute_transform(executor, req, transform, agent_arguments).await?;
348            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
349        }
350        Request::OpenRequestSchema(req) => {
351            let value =
352                open::request_schema::execute_transform(executor, req, transform, agent_arguments).await?;
353            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
354        }
355        Request::OpenResponseSchema(req) => {
356            let value =
357                open::response_schema::execute_transform(executor, req, transform, agent_arguments).await?;
358            Box::pin(crate::cli::command::StreamOnce::new(Ok(value)))
359        }
360    };
361    Ok(stream)
362}