Skip to main content

objectiveai_sdk/cli/command/agents/queue/deliver/
mod.rs

1//! `agents queue deliver` — wake every queue-pending descendant
2//! agent of the caller.
3//!
4//! The handler enumerates the unique AIHs with active queued prompts
5//! that are STRICT descendants of the caller (the caller itself is
6//! excluded) and, per AIH, try-acquires the agent's file lock with no
7//! waiting: a live owner yields [`AgentActiveResponseItem`]; winning
8//! the lock yields [`AgentSpawnedResponseItem`] and runs the same
9//! spawn machinery `agents spawn` / `agents message` use (empty
10//! messages + the stored continuation), streaming each spawn item as
11//! a [`ValueResponseItem`] and releasing the lock when that task's
12//! stream ends. Once EVERY target has resolved (active or spawned),
13//! the bare string `"AllAgentsActive"` is emitted.
14//!
15//! Two modes, selected by `dangerous_advanced.stream_spawns`:
16//! * unset/false (the default, user-facing): re-exec the cli binary
17//!   as a detached orphan with `stream_spawns = true` and emit the
18//!   child's items up to and including `AllAgentsActive`, then
19//!   return — the orphan keeps running the spawns to completion.
20//! * true (the re-exec'd child): run the full delivery in-process and
21//!   stream everything, spawn output included.
22
23use crate::cli::command::CommandRequest;
24
25#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
26#[schemars(rename = "cli.command.agents.queue.deliver.Request")]
27pub struct Request {
28    pub path_type: Path,
29    pub dangerous_advanced: Option<RequestDangerousAdvanced>,
30    pub jq: Option<String>,
31}
32
33#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
34#[schemars(rename = "cli.command.agents.queue.deliver.Path")]
35pub enum Path {
36    #[serde(rename = "agents/queue/deliver")]
37    AgentsQueueDeliver,
38}
39
40#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
41#[schemars(rename = "cli.command.agents.queue.deliver.RequestDangerousAdvanced")]
42pub struct RequestDangerousAdvanced {
43    /// Run the delivery in-process and stream every spawned agent's
44    /// output to completion. Unset/false re-execs a detached child
45    /// with this set and returns at its `AllAgentsActive` marker.
46    #[serde(default, skip_serializing_if = "Option::is_none")]
47    #[schemars(extend("omitempty" = true))]
48    pub stream_spawns: Option<bool>,
49}
50
51impl CommandRequest for Request {
52    fn into_command(&self) -> Vec<String> {
53        let mut argv = vec![
54            "agents".to_string(),
55            "queue".to_string(),
56            "deliver".to_string(),
57        ];
58        if let Some(advanced) = &self.dangerous_advanced {
59            argv.push("--dangerous-advanced".to_string());
60            argv.push(
61                serde_json::to_string(advanced)
62                    .expect("RequestDangerousAdvanced serializes"),
63            );
64        }
65        if let Some(jq) = &self.jq {
66            argv.push("--jq".to_string());
67            argv.push(jq.clone());
68        }
69        argv
70    }
71}
72
73/// One stream item from `agents queue deliver`. Untagged — the
74/// variants are disjoint on the wire: `Value` requires `value`,
75/// `AgentActive` / `AgentSpawned` carry distinct `type` markers, and
76/// `AllAgentsActive` is the bare string `"AllAgentsActive"`.
77#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
78#[serde(untagged)]
79#[schemars(rename = "cli.command.agents.queue.deliver.ResponseItem")]
80pub enum ResponseItem {
81    #[schemars(title = "Value")]
82    Value(ValueResponseItem),
83    #[schemars(title = "AgentActive")]
84    AgentActive(AgentActiveResponseItem),
85    #[schemars(title = "AgentSpawned")]
86    AgentSpawned(AgentSpawnedResponseItem),
87    #[schemars(title = "AllAgentsActive")]
88    AllAgentsActive(AllAgentsActive),
89}
90
91/// One output item from one delivered agent's spawn stream. `value`
92/// is the typed root [`crate::cli::command::ResponseItem`] (the spawn
93/// item wrapped at the root) — boxed because the root union
94/// transitively contains *this* type (`agents → queue → deliver`),
95/// and boxing is what makes the recursion sized.
96///
97/// The `value` field's JSON schema is opaqued to `serde_json::Value`
98/// (renders as bare `{}` aka JsonValue) so the published schema
99/// doesn't inline the entire root union — that's the TS7056 blowup
100/// the root and tier aggregates dodge by being `json_schema_ignore`.
101/// Downstream SDKs see `value: JsonValue` on the typed `execute`
102/// path; consumers that want to peer inside parse it case-by-case.
103#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
104#[schemars(rename = "cli.command.agents.queue.deliver.ValueResponseItem")]
105pub struct ValueResponseItem {
106    /// The delivered agent's `agent_instance_hierarchy`.
107    pub agent_instance_hierarchy: String,
108    /// The typed root item the spawn emitted.
109    #[schemars(with = "serde_json::Value")]
110    pub value: Box<crate::cli::command::ResponseItem>,
111}
112
113/// This agent's lock was held by a live owner — it is already active
114/// and will drain its own queue; nothing was spawned for it.
115#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
116#[schemars(rename = "cli.command.agents.queue.deliver.AgentActiveResponseItem")]
117pub struct AgentActiveResponseItem {
118    pub r#type: AgentActiveType,
119    pub agent_instance_hierarchy: String,
120}
121
122#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
123#[schemars(rename = "cli.command.agents.queue.deliver.AgentActiveType")]
124pub enum AgentActiveType {
125    #[serde(rename = "AgentActive")]
126    AgentActive,
127}
128
129/// This agent's lock was won and its spawn has started; its output
130/// follows as [`ValueResponseItem`]s (in `stream_spawns` mode).
131#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
132#[schemars(rename = "cli.command.agents.queue.deliver.AgentSpawnedResponseItem")]
133pub struct AgentSpawnedResponseItem {
134    pub r#type: AgentSpawnedType,
135    pub agent_instance_hierarchy: String,
136}
137
138#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
139#[schemars(rename = "cli.command.agents.queue.deliver.AgentSpawnedType")]
140pub enum AgentSpawnedType {
141    #[serde(rename = "AgentSpawned")]
142    AgentSpawned,
143}
144
145/// Every target has resolved to active-or-spawned. Wire shape is the
146/// bare string `"AllAgentsActive"` (a one-variant enum — a unit
147/// variant in the untagged [`ResponseItem`] would serialize as
148/// `null`, not the marker string). The detached default mode stops
149/// reading its child at this item.
150#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
151#[schemars(rename = "cli.command.agents.queue.deliver.AllAgentsActive")]
152pub enum AllAgentsActive {
153    AllAgentsActive,
154}
155
156#[derive(clap::Args)]
157pub struct Args {
158    /// Raw JSON for `RequestDangerousAdvanced` (e.g.
159    /// `{"stream_spawns":true}`).
160    #[arg(long)]
161    pub dangerous_advanced: Option<String>,
162    /// jq filter applied to the JSON output.
163    #[arg(long)]
164    pub jq: Option<String>,
165}
166
167#[derive(clap::Args)]
168#[command(args_conflicts_with_subcommands = true)]
169pub struct Command {
170    #[command(flatten)]
171    pub args: Args,
172    #[command(subcommand)]
173    pub schema: Option<Schema>,
174}
175
176#[derive(clap::Subcommand)]
177pub enum Schema {
178    /// Emit the JSON Schema for this leaf's `Request` type and exit.
179    RequestSchema(request_schema::Args),
180    /// Emit the JSON Schema for this leaf's `Response` type and exit.
181    ResponseSchema(response_schema::Args),
182}
183
184impl TryFrom<Args> for Request {
185    type Error = crate::cli::command::FromArgsError;
186    fn try_from(args: Args) -> Result<Self, Self::Error> {
187        let dangerous_advanced: Option<RequestDangerousAdvanced> =
188            if let Some(s) = args.dangerous_advanced {
189                let mut de = serde_json::Deserializer::from_str(&s);
190                let v = serde_path_to_error::deserialize(&mut de).map_err(|source| {
191                    crate::cli::command::FromArgsError {
192                        field: "dangerous_advanced",
193                        source: source.into(),
194                    }
195                })?;
196                Some(v)
197            } else {
198                None
199            };
200        Ok(Self {
201            path_type: Path::AgentsQueueDeliver,
202            dangerous_advanced,
203            jq: args.jq,
204        })
205    }
206}
207
208#[cfg(feature = "cli-executor")]
209pub async fn execute<E: crate::cli::command::CommandExecutor>(
210    executor: &E,
211    mut request: Request,
212    agent_arguments: Option<&crate::cli::command::AgentArguments>,
213) -> Result<E::Stream<ResponseItem>, E::Error> {
214    request.jq = None;
215    executor.execute(request, agent_arguments).await
216}
217
218#[cfg(feature = "cli-executor")]
219pub async fn execute_jq<E: crate::cli::command::CommandExecutor>(
220    executor: &E,
221    mut request: Request,
222    jq: String,
223    agent_arguments: Option<&crate::cli::command::AgentArguments>,
224) -> Result<E::Stream<serde_json::Value>, E::Error> {
225    request.jq = Some(jq);
226    executor.execute(request, agent_arguments).await
227}
228
229#[cfg(feature = "mcp")]
230impl crate::cli::command::CommandResponse for ResponseItem {
231    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
232        crate::cli::command::McpResponseItem::JSONL(serde_json::to_value(self).unwrap())
233    }
234}
235
236pub mod request_schema;
237
238pub mod response_schema;