Skip to main content

objectiveai_sdk/cli/command/agents/queue/deliver/
mod.rs

1//! `agents queue deliver` — wake every queue-pending descendant
2//! agent of the caller.
3//!
4//! The handler enumerates two kinds of targets with active queued
5//! prompts in the caller's subtree: unique AIHs that are STRICT
6//! descendants of the caller (direct rows + rows against BOUND
7//! tags), and un-upgraded (GROUPED) tags whose group parent sits in
8//! the subtree. Per target it try-acquires the agent's (or tag's)
9//! lock with no waiting: a live owner yields
10//! [`AgentActiveResponseItem`] / [`TagActiveResponseItem`]; winning
11//! the lock yields [`AgentSpawnedResponseItem`] /
12//! [`TagSpawnedResponseItem`] and runs the same spawn machinery
13//! `agents spawn` / `agents message` use (empty messages, plus the
14//! stored continuation for AIHs or the group's stored agent spec for
15//! tags), streaming each spawn item as a [`ValueResponseItem`] and
16//! releasing the lock when that task's stream ends. Once EVERY
17//! target has resolved (active or spawned), the bare string
18//! `"AllAgentsActive"` is emitted.
19//!
20//! Two modes, selected by `dangerous_advanced.stream_spawns`:
21//! * unset/false (the default, user-facing): re-exec the cli binary
22//!   as a detached orphan with `stream_spawns = true` and emit the
23//!   child's items up to and including `AllAgentsActive`, then
24//!   return — the orphan keeps running the spawns to completion.
25//! * true (the re-exec'd child): run the full delivery in-process and
26//!   stream everything, spawn output included.
27
28use crate::cli::command::CommandRequest;
29
30#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
31#[schemars(rename = "cli.command.agents.queue.deliver.Request")]
32pub struct Request {
33    pub path_type: Path,
34    pub dangerous_advanced: Option<RequestDangerousAdvanced>,
35    #[serde(flatten)]
36    pub base: crate::cli::command::RequestBase,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
40#[schemars(rename = "cli.command.agents.queue.deliver.Path")]
41pub enum Path {
42    #[serde(rename = "agents/queue/deliver")]
43    AgentsQueueDeliver,
44}
45
46#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
47#[schemars(rename = "cli.command.agents.queue.deliver.RequestDangerousAdvanced")]
48pub struct RequestDangerousAdvanced {
49    /// Run the delivery in-process and stream every spawned agent's
50    /// output to completion. Unset/false re-execs a detached child
51    /// with this set and returns at its `AllAgentsActive` marker.
52    #[serde(default, skip_serializing_if = "Option::is_none")]
53    #[schemars(extend("omitempty" = true))]
54    pub stream_spawns: Option<bool>,
55}
56
57impl CommandRequest for Request {
58    fn request_base(&self) -> &crate::cli::command::RequestBase {
59        &self.base
60    }
61
62    fn request_base_mut(&mut self) -> Option<&mut crate::cli::command::RequestBase> {
63        Some(&mut self.base)
64    }
65}
66
67/// One stream item from `agents queue deliver`. Untagged — the
68/// variants are disjoint on the wire: `Value` requires `value`,
69/// `AgentActive` / `AgentSpawned` / `TagActive` / `TagSpawned` carry
70/// distinct `type` markers, and `AllAgentsActive` is the bare string
71/// `"AllAgentsActive"`.
72#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
73#[serde(untagged)]
74#[schemars(rename = "cli.command.agents.queue.deliver.ResponseItem")]
75pub enum ResponseItem {
76    #[schemars(title = "Value")]
77    Value(ValueResponseItem),
78    #[schemars(title = "AgentActive")]
79    AgentActive(AgentActiveResponseItem),
80    #[schemars(title = "AgentSpawned")]
81    AgentSpawned(AgentSpawnedResponseItem),
82    #[schemars(title = "TagActive")]
83    TagActive(TagActiveResponseItem),
84    #[schemars(title = "TagSpawned")]
85    TagSpawned(TagSpawnedResponseItem),
86    #[schemars(title = "AllAgentsActive")]
87    AllAgentsActive(AllAgentsActive),
88}
89
90/// One output item from one delivered agent's spawn stream. `value`
91/// is the typed root [`crate::cli::command::ResponseItem`] (the spawn
92/// item wrapped at the root) — boxed because the root union
93/// transitively contains *this* type (`agents → queue → deliver`),
94/// and boxing is what makes the recursion sized.
95///
96/// The `value` field's JSON schema is opaqued to `serde_json::Value`
97/// (renders as bare `{}` aka JsonValue) so the published schema
98/// doesn't inline the entire root union — that's the TS7056 blowup
99/// the root and tier aggregates dodge by being `json_schema_ignore`.
100/// Downstream SDKs see `value: JsonValue` on the typed `execute`
101/// path; consumers that want to peer inside parse it case-by-case.
102#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
103#[schemars(rename = "cli.command.agents.queue.deliver.ValueResponseItem")]
104pub struct ValueResponseItem {
105    /// The delivered agent's `agent_instance_hierarchy`.
106    pub agent_instance_hierarchy: String,
107    /// The typed root item the spawn emitted.
108    #[schemars(with = "serde_json::Value")]
109    pub value: Box<crate::cli::command::ResponseItem>,
110}
111
112/// This agent's lock was held by a live owner — it is already active
113/// and will drain its own queue; nothing was spawned for it.
114#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
115#[schemars(rename = "cli.command.agents.queue.deliver.AgentActiveResponseItem")]
116pub struct AgentActiveResponseItem {
117    pub r#type: AgentActiveType,
118    pub agent_instance_hierarchy: String,
119}
120
121#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
122#[schemars(rename = "cli.command.agents.queue.deliver.AgentActiveType")]
123pub enum AgentActiveType {
124    #[serde(rename = "AgentActive")]
125    AgentActive,
126}
127
128/// This agent's lock was won and its spawn has started; its output
129/// follows as [`ValueResponseItem`]s (in `stream_spawns` mode).
130#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
131#[schemars(rename = "cli.command.agents.queue.deliver.AgentSpawnedResponseItem")]
132pub struct AgentSpawnedResponseItem {
133    pub r#type: AgentSpawnedType,
134    pub agent_instance_hierarchy: String,
135}
136
137#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
138#[schemars(rename = "cli.command.agents.queue.deliver.AgentSpawnedType")]
139pub enum AgentSpawnedType {
140    #[serde(rename = "AgentSpawned")]
141    AgentSpawned,
142}
143
144/// This un-upgraded tag's lock was held by a live owner — another
145/// process is already materializing it; the queued rows will reach
146/// the agent it mints. Nothing was spawned for it.
147#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
148#[schemars(rename = "cli.command.agents.queue.deliver.TagActiveResponseItem")]
149pub struct TagActiveResponseItem {
150    pub r#type: TagActiveType,
151    pub agent_tag: String,
152}
153
154#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
155#[schemars(rename = "cli.command.agents.queue.deliver.TagActiveType")]
156pub enum TagActiveType {
157    #[serde(rename = "TagActive")]
158    TagActive,
159}
160
161/// This un-upgraded tag's lock was won and a fresh spawn of the
162/// group's stored agent spec has started. The minted
163/// `agent_instance_hierarchy` isn't known yet at this point — it
164/// arrives as the FIRST inner item (the spawn `Id`) of the
165/// [`ValueResponseItem`]s that follow (in `stream_spawns` mode).
166#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
167#[schemars(rename = "cli.command.agents.queue.deliver.TagSpawnedResponseItem")]
168pub struct TagSpawnedResponseItem {
169    pub r#type: TagSpawnedType,
170    pub agent_tag: String,
171}
172
173#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
174#[schemars(rename = "cli.command.agents.queue.deliver.TagSpawnedType")]
175pub enum TagSpawnedType {
176    #[serde(rename = "TagSpawned")]
177    TagSpawned,
178}
179
180/// Every target has resolved to active-or-spawned. Wire shape is the
181/// bare string `"AllAgentsActive"` (a one-variant enum — a unit
182/// variant in the untagged [`ResponseItem`] would serialize as
183/// `null`, not the marker string). The detached default mode stops
184/// reading its child at this item.
185#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
186#[schemars(rename = "cli.command.agents.queue.deliver.AllAgentsActive")]
187pub enum AllAgentsActive {
188    AllAgentsActive,
189}
190
191#[derive(clap::Args)]
192pub struct Args {
193    /// Raw JSON for `RequestDangerousAdvanced` (e.g.
194    /// `{"stream_spawns":true}`).
195    #[arg(long)]
196    pub dangerous_advanced: Option<String>,
197    #[command(flatten)]
198    pub base: crate::cli::command::RequestBaseArgs,
199}
200
201#[derive(clap::Args)]
202#[command(args_conflicts_with_subcommands = true)]
203pub struct Command {
204    #[command(flatten)]
205    pub args: Args,
206    #[command(subcommand)]
207    pub schema: Option<Schema>,
208}
209
210#[derive(clap::Subcommand)]
211pub enum Schema {
212    /// Emit the JSON Schema for this leaf's `Request` type and exit.
213    RequestSchema(request_schema::Args),
214    /// Emit the JSON Schema for this leaf's `Response` type and exit.
215    ResponseSchema(response_schema::Args),
216}
217
218impl TryFrom<Args> for Request {
219    type Error = crate::cli::command::FromArgsError;
220    fn try_from(args: Args) -> Result<Self, Self::Error> {
221        let dangerous_advanced: Option<RequestDangerousAdvanced> =
222            if let Some(s) = args.dangerous_advanced {
223                let mut de = serde_json::Deserializer::from_str(&s);
224                let v = serde_path_to_error::deserialize(&mut de).map_err(|source| {
225                    crate::cli::command::FromArgsError {
226                        field: "dangerous_advanced",
227                        source: source.into(),
228                    }
229                })?;
230                Some(v)
231            } else {
232                None
233            };
234        Ok(Self {
235            path_type: Path::AgentsQueueDeliver,
236            dangerous_advanced,
237            base: args.base.into(),
238        })
239    }
240}
241
242#[cfg(feature = "cli-executor")]
243pub async fn execute<E: crate::cli::command::CommandExecutor>(
244    executor: &E,
245    mut request: Request,
246    agent_arguments: Option<&crate::cli::command::AgentArguments>,
247) -> Result<E::Stream<ResponseItem>, E::Error> {
248    request.base.clear_transform();
249    executor.execute(request, agent_arguments).await
250}
251
252#[cfg(feature = "cli-executor")]
253pub async fn execute_transform<E: crate::cli::command::CommandExecutor>(
254    executor: &E,
255    mut request: Request,
256    transform: crate::cli::command::Transform,
257    agent_arguments: Option<&crate::cli::command::AgentArguments>,
258) -> Result<E::Stream<serde_json::Value>, E::Error> {
259    request.base.set_transform(transform);
260    executor.execute(request, agent_arguments).await
261}
262
263#[cfg(feature = "mcp")]
264impl crate::cli::command::CommandResponse for ResponseItem {
265    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
266        crate::cli::command::McpResponseItem::JSONL(serde_json::to_value(self).unwrap())
267    }
268}
269
270pub mod request_schema;
271
272pub mod response_schema;