Skip to main content

objectiveai_sdk/cli/command/agents/queue/deliver/
mod.rs

1//! `agents queue deliver` — wake every queue-pending descendant
2//! agent of the caller.
3//!
4//! The handler enumerates two kinds of targets with active queued
5//! prompts in the caller's subtree: unique AIHs that are STRICT
6//! descendants of the caller (direct rows + rows against BOUND
7//! tags), and un-upgraded (GROUPED) tags whose group parent sits in
8//! the subtree. Per target it try-acquires the agent's (or tag's)
9//! lock with no waiting: a live owner yields
10//! [`AgentActiveResponseItem`] / [`TagActiveResponseItem`]; winning
11//! the lock yields [`AgentSpawnedResponseItem`] /
12//! [`TagSpawnedResponseItem`] and runs the same spawn machinery
13//! `agents spawn` / `agents message` use (empty messages, plus the
14//! stored continuation for AIHs or the group's stored agent spec for
15//! tags), streaming each spawn item as a [`ValueResponseItem`] and
16//! releasing the lock when that task's stream ends. Once EVERY
17//! target has resolved (active or spawned), the bare string
18//! `"AllAgentsActive"` is emitted.
19//!
20//! Two modes, selected by `dangerous_advanced.stream_spawns`:
21//! * unset/false (the default, user-facing): re-exec the cli binary
22//!   as a detached orphan with `stream_spawns = true` and emit the
23//!   child's items up to and including `AllAgentsActive`, then
24//!   return — the orphan keeps running the spawns to completion.
25//! * true (the re-exec'd child): run the full delivery in-process and
26//!   stream everything, spawn output included.
27
28use crate::cli::command::CommandRequest;
29
// Request payload for the `agents queue deliver` leaf. Notes here are
// plain `//` comments on purpose: `///` text on a `JsonSchema` type
// would flow into the generated schema description.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.Request")]
pub struct Request {
    // Command-path marker; its only variant serializes as
    // "agents/queue/deliver".
    pub path_type: Path,
    // Advanced switches; `None` when the caller supplied none.
    pub dangerous_advanced: Option<RequestDangerousAdvanced>,
    // `RequestBase` fields, flattened into this object on the wire.
    #[serde(flatten)]
    pub base: crate::cli::command::RequestBase,
}
38
// Path marker for this leaf: a one-variant enum, so the field can only
// ever hold this command's path string. (`//` rather than `///` so the
// generated schema description is left untouched.)
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.Path")]
pub enum Path {
    // Serialized as the literal string "agents/queue/deliver".
    #[serde(rename = "agents/queue/deliver")]
    AgentsQueueDeliver,
}
45
// Advanced knobs, carried on the command line as one JSON object after
// `--dangerous-advanced`. The derived `Default` leaves every field
// `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.RequestDangerousAdvanced")]
pub struct RequestDangerousAdvanced {
    /// Run the delivery in-process and stream every spawned agent's
    /// output to completion. Unset/false re-execs a detached child
    /// with this set and returns at its `AllAgentsActive` marker.
    // `default` lets the key be absent on input; `skip_serializing_if`
    // drops it from the output when unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    // NOTE(review): presumably a hint to schema consumers that the key
    // may be omitted — confirm against the schema tooling.
    #[schemars(extend("omitempty" = true))]
    pub stream_spawns: Option<bool>,
}
56
57impl CommandRequest for Request {
58    fn into_command(&self) -> Vec<String> {
59        let mut argv = vec![
60            "agents".to_string(),
61            "queue".to_string(),
62            "deliver".to_string(),
63        ];
64        if let Some(advanced) = &self.dangerous_advanced {
65            argv.push("--dangerous-advanced".to_string());
66            argv.push(
67                serde_json::to_string(advanced)
68                    .expect("RequestDangerousAdvanced serializes"),
69            );
70        }
71        self.base.push_flags(&mut argv);
72        argv
73    }
74
75    fn request_base(&self) -> &crate::cli::command::RequestBase {
76        &self.base
77    }
78
79    fn request_base_mut(&mut self) -> Option<&mut crate::cli::command::RequestBase> {
80        Some(&mut self.base)
81    }
82}
83
/// One stream item from `agents queue deliver`. Untagged — the
/// variants are disjoint on the wire: `Value` requires `value`,
/// `AgentActive` / `AgentSpawned` / `TagActive` / `TagSpawned` carry
/// distinct `type` markers, and `AllAgentsActive` is the bare string
/// `"AllAgentsActive"`.
// With `untagged`, each variant is written as its inner value with no
// wrapper, and on input serde tries the variants in declaration order
// and keeps the first one that parses.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[serde(untagged)]
#[schemars(rename = "cli.command.agents.queue.deliver.ResponseItem")]
pub enum ResponseItem {
    // One output item forwarded from a delivered agent's spawn stream.
    #[schemars(title = "Value")]
    Value(ValueResponseItem),
    // The agent's lock was held by a live owner; nothing was spawned.
    #[schemars(title = "AgentActive")]
    AgentActive(AgentActiveResponseItem),
    // The agent's lock was won and its spawn has started.
    #[schemars(title = "AgentSpawned")]
    AgentSpawned(AgentSpawnedResponseItem),
    // The un-upgraded tag's lock was held by a live owner.
    #[schemars(title = "TagActive")]
    TagActive(TagActiveResponseItem),
    // The un-upgraded tag's lock was won and a fresh spawn has started.
    #[schemars(title = "TagSpawned")]
    TagSpawned(TagSpawnedResponseItem),
    // Marker emitted once every target has resolved.
    #[schemars(title = "AllAgentsActive")]
    AllAgentsActive(AllAgentsActive),
}
106
/// One output item from one delivered agent's spawn stream. `value`
/// is the typed root [`crate::cli::command::ResponseItem`] (the spawn
/// item wrapped at the root) — boxed because the root union
/// transitively contains *this* type (`agents → queue → deliver`),
/// and boxing is what makes the recursion sized.
///
/// The `value` field's JSON schema is opaqued to `serde_json::Value`
/// (renders as bare `{}` aka JsonValue) so the published schema
/// doesn't inline the entire root union — that's the TS7056 blowup
/// the root and tier aggregates dodge by being `json_schema_ignore`.
/// Downstream SDKs see `value: JsonValue` on the typed `execute`
/// path; consumers that want to peer inside parse it case-by-case.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.ValueResponseItem")]
pub struct ValueResponseItem {
    /// The delivered agent's `agent_instance_hierarchy`.
    pub agent_instance_hierarchy: String,
    /// The typed root item the spawn emitted.
    // Only the schema is opaqued here; serde still (de)serializes
    // `value` as the typed root item.
    #[schemars(with = "serde_json::Value")]
    pub value: Box<crate::cli::command::ResponseItem>,
}
128
/// This agent's lock was held by a live owner — it is already active
/// and will drain its own queue; nothing was spawned for it.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.AgentActiveResponseItem")]
pub struct AgentActiveResponseItem {
    // Constant marker: always the string "AgentActive" on the wire.
    pub r#type: AgentActiveType,
    // Hierarchy identifier of the agent whose lock was held.
    pub agent_instance_hierarchy: String,
}
137
// One-variant enum that pins the `type` field of
// `AgentActiveResponseItem` to a single string. (`//` rather than
// `///` so the generated schema description is left untouched.)
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.AgentActiveType")]
pub enum AgentActiveType {
    // Serialized as the literal string "AgentActive".
    #[serde(rename = "AgentActive")]
    AgentActive,
}
144
/// This agent's lock was won and its spawn has started; its output
/// follows as [`ValueResponseItem`]s (in `stream_spawns` mode).
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.AgentSpawnedResponseItem")]
pub struct AgentSpawnedResponseItem {
    // Constant marker: always the string "AgentSpawned" on the wire.
    pub r#type: AgentSpawnedType,
    // Hierarchy identifier of the agent whose spawn was started.
    pub agent_instance_hierarchy: String,
}
153
// One-variant enum that pins the `type` field of
// `AgentSpawnedResponseItem` to a single string. (`//` rather than
// `///` so the generated schema description is left untouched.)
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.AgentSpawnedType")]
pub enum AgentSpawnedType {
    // Serialized as the literal string "AgentSpawned".
    #[serde(rename = "AgentSpawned")]
    AgentSpawned,
}
160
/// This un-upgraded tag's lock was held by a live owner — another
/// process is already materializing it; the queued rows will reach
/// the agent it mints. Nothing was spawned for it.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.TagActiveResponseItem")]
pub struct TagActiveResponseItem {
    // Constant marker: always the string "TagActive" on the wire.
    pub r#type: TagActiveType,
    // The tag whose lock was held.
    pub agent_tag: String,
}
170
// One-variant enum that pins the `type` field of
// `TagActiveResponseItem` to a single string. (`//` rather than `///`
// so the generated schema description is left untouched.)
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.TagActiveType")]
pub enum TagActiveType {
    // Serialized as the literal string "TagActive".
    #[serde(rename = "TagActive")]
    TagActive,
}
177
/// This un-upgraded tag's lock was won and a fresh spawn of the
/// group's stored agent spec has started. The minted
/// `agent_instance_hierarchy` isn't known yet at this point — it
/// arrives as the FIRST inner item (the spawn `Id`) of the
/// [`ValueResponseItem`]s that follow (in `stream_spawns` mode).
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.TagSpawnedResponseItem")]
pub struct TagSpawnedResponseItem {
    // Constant marker: always the string "TagSpawned" on the wire.
    pub r#type: TagSpawnedType,
    // The tag whose spawn was started.
    pub agent_tag: String,
}
189
// One-variant enum that pins the `type` field of
// `TagSpawnedResponseItem` to a single string. (`//` rather than `///`
// so the generated schema description is left untouched.)
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.TagSpawnedType")]
pub enum TagSpawnedType {
    // Serialized as the literal string "TagSpawned".
    #[serde(rename = "TagSpawned")]
    TagSpawned,
}
196
/// Every target has resolved to active-or-spawned. Wire shape is the
/// bare string `"AllAgentsActive"` (a one-variant enum — a unit
/// variant in the untagged [`ResponseItem`] would serialize as
/// `null`, not the marker string). The detached default mode stops
/// reading its child at this item.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.agents.queue.deliver.AllAgentsActive")]
pub enum AllAgentsActive {
    // No `rename` needed: the variant name is already the wire string.
    AllAgentsActive,
}
207
// Command-line flags for `agents queue deliver`. Field-level `///`
// lines double as clap help text, so extra notes here use `//`.
#[derive(clap::Args)]
pub struct Args {
    /// Raw JSON for `RequestDangerousAdvanced` (e.g.
    /// `{"stream_spawns":true}`).
    // Held as an unparsed string; `TryFrom<Args> for Request` does the
    // JSON decoding.
    #[arg(long)]
    pub dangerous_advanced: Option<String>,
    // Flags contributed by `RequestBaseArgs`, merged into this arg set.
    #[command(flatten)]
    pub base: crate::cli::command::RequestBaseArgs,
}
217
// Clap entry for this leaf: the leaf's own flags plus an optional
// schema subcommand.
#[derive(clap::Args)]
// The leaf's flags and a subcommand cannot be combined in one
// invocation.
#[command(args_conflicts_with_subcommands = true)]
pub struct Command {
    // Flags for the delivery itself.
    #[command(flatten)]
    pub args: Args,
    // `None` when no schema subcommand was given.
    #[command(subcommand)]
    pub schema: Option<Schema>,
}
226
// Schema-dump subcommands of this leaf. The `///` lines on the
// variants are what clap shows as each subcommand's help, so they are
// user-facing text.
#[derive(clap::Subcommand)]
pub enum Schema {
    /// Emit the JSON Schema for this leaf's `Request` type and exit.
    RequestSchema(request_schema::Args),
    /// Emit the JSON Schema for this leaf's `Response` type and exit.
    ResponseSchema(response_schema::Args),
}
234
235impl TryFrom<Args> for Request {
236    type Error = crate::cli::command::FromArgsError;
237    fn try_from(args: Args) -> Result<Self, Self::Error> {
238        let dangerous_advanced: Option<RequestDangerousAdvanced> =
239            if let Some(s) = args.dangerous_advanced {
240                let mut de = serde_json::Deserializer::from_str(&s);
241                let v = serde_path_to_error::deserialize(&mut de).map_err(|source| {
242                    crate::cli::command::FromArgsError {
243                        field: "dangerous_advanced",
244                        source: source.into(),
245                    }
246                })?;
247                Some(v)
248            } else {
249                None
250            };
251        Ok(Self {
252            path_type: Path::AgentsQueueDeliver,
253            dangerous_advanced,
254            base: args.base.into(),
255        })
256    }
257}
258
/// Runs `request` through `executor` and returns the resulting stream
/// typed as [`ResponseItem`].
///
/// Any transform on the request base is cleared before execution —
/// presumably so the items arrive in their typed, untransformed shape.
#[cfg(feature = "cli-executor")]
pub async fn execute<E>(
    executor: &E,
    request: Request,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<E::Stream<ResponseItem>, E::Error>
where
    E: crate::cli::command::CommandExecutor,
{
    let mut typed_request = request;
    typed_request.base.clear_transform();
    executor.execute(typed_request, agent_arguments).await
}
268
/// Runs `request` through `executor` with `transform` installed on the
/// request base, returning the stream as untyped `serde_json::Value`
/// items.
#[cfg(feature = "cli-executor")]
pub async fn execute_transform<E>(
    executor: &E,
    request: Request,
    transform: crate::cli::command::Transform,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<E::Stream<serde_json::Value>, E::Error>
where
    E: crate::cli::command::CommandExecutor,
{
    let mut transformed_request = request;
    transformed_request.base.set_transform(transform);
    executor.execute(transformed_request, agent_arguments).await
}
279
#[cfg(feature = "mcp")]
impl crate::cli::command::CommandResponse for ResponseItem {
    /// Converts this item to its JSON value and wraps it as a `JSONL`
    /// MCP response item.
    ///
    /// # Panics
    ///
    /// Panics if the item cannot be serialized to a JSON value.
    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
        let json = serde_json::to_value(self).unwrap();
        crate::cli::command::McpResponseItem::JSONL(json)
    }
}
286
287pub mod request_schema;
288
289pub mod response_schema;