Skip to main content

objectiveai_sdk/cli/command/agents/message/
mod.rs

1//! `agents message` — stream-aware delivery primitive.
2//!
3//! Resolves the target, decides whether to enqueue / race delivery
4//! against a live agent / take over and spawn, all driven by
5//! `dangerous_advanced.stream` (mirror of `agents instances
6//! spawn`'s same flag).
7//!
8//! Tag resolution is the first step: a `MessageTarget::Tag` lookup
9//! against `tags` either yields a BOUND hierarchy (which makes the
10//! call act like a Direct target) or fails (PENDING / ABSENT), in
11//! which case the call falls back to a pure enqueue.
12//!
13//! Once we have a resolved hierarchy, the path splits by stream
14//! mode:
15//!
16//! - **stream=false** (default): non-acquiring lock-file check.
17//!   If a live agent holds it: enqueue + race DB-delivery against
18//!   lock-file release. If no live agent: re-exec ourselves as a
19//!   detached subprocess with stream=true so the new process
20//!   becomes the agent.
21//! - **stream=true**: try to acquire the lock-file. On success:
22//!   skip enqueue, run `spawn::run_multi_pass` in-process. On
23//!   failure: enqueue + race DB-delivery against lock acquisition.
24
25use crate::agent::completions::message::RichContent;
26use crate::agent::completions::response::streaming::AgentCompletionChunk;
27use crate::cli::command::CommandRequest;
28
29#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
30#[schemars(rename = "cli.command.agents.message.Request")]
31pub struct Request {
32    pub path_type: Path,
33    pub target: MessageTarget,
34    /// Required payload. The eventual enqueue / delivery / spawn
35    /// always carries this exact `RichContent` as its single
36    /// user message.
37    pub message: RequestMessage,
38    /// `None` (default) → run the full delivery flow (resolve
39    /// target, lock-race, spawn-takeover). `Some(_)` →
40    /// short-circuit straight into the queue against the target;
41    /// no lookup, no race, no spawn. With `Keyed { key }`, any
42    /// pre-existing row scoped to the same target + key is deleted
43    /// before insert.
44    #[serde(default, skip_serializing_if = "Option::is_none")]
45    #[schemars(extend("omitempty" = true))]
46    pub enqueue: Option<EnqueueMode>,
47    /// `Some(true)` → in-process streaming delivery / spawn.
48    /// `None | Some(false)` → detached subprocess re-exec for the
49    /// spawn-take-over case; the call returns the first item of
50    /// that child's stream. Ignored when `enqueue.is_some()` — the
51    /// enqueue path yields a single-item stream identical to its
52    /// unary response.
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    #[schemars(extend("omitempty" = true))]
55    pub dangerous_advanced: Option<RequestDangerousAdvanced>,
56    pub jq: Option<String>,
57}
58
59/// Mutually-exclusive addressing for an `agents message` call.
60///
61/// `Direct` composes `{parent}/{agent_instance}` (parent defaults to
62/// `Config.agent_instance_hierarchy` when omitted) and operates
63/// against that hierarchy. `Tag` is resolved against the tags DB at
64/// call time: a BOUND tag becomes effectively a Direct target,
65/// while PENDING / ABSENT falls back to pure enqueue against the
66/// tag name.
67#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
68#[serde(tag = "by", rename_all = "snake_case")]
69#[schemars(rename = "cli.command.agents.message.MessageTarget")]
70pub enum MessageTarget {
71    #[schemars(title = "Direct")]
72    Direct {
73        /// Lineage prefix to prepend to `agent_instance`. When
74        /// `None`, the CLI substitutes its own
75        /// `Config.agent_instance_hierarchy`.
76        #[serde(default, skip_serializing_if = "Option::is_none")]
77        #[schemars(extend("omitempty" = true))]
78        parent_agent_instance_hierarchy: Option<String>,
79        /// Leaf id of the target agent.
80        agent_instance: String,
81    },
82    #[schemars(title = "Tag")]
83    Tag { agent_tag: String },
84}
85
86#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
87#[schemars(rename = "cli.command.agents.message.Path")]
88pub enum Path {
89    #[serde(rename = "agents/message")]
90    AgentsMessage,
91}
92
93#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
94#[schemars(rename = "cli.command.agents.message.RequestMessage")]
95pub enum RequestMessage {
96    #[schemars(title = "Inline")]
97    Inline(RichContent),
98    #[schemars(title = "Simple")]
99    Simple(String),
100    #[schemars(title = "File")]
101    File(std::path::PathBuf),
102    #[schemars(title = "PythonInline")]
103    PythonInline(String),
104    #[schemars(title = "PythonFile")]
105    PythonFile(std::path::PathBuf),
106}
107
108impl RequestMessage {
109    /// Append the flag pair (`--simple <s>` / `--inline <json>` /
110    /// `--file <path>` / `--python-inline <code>` /
111    /// `--python-file <path>`) for this variant to `out`. Used by
112    /// both this leaf's [`CommandRequest::into_command`] and by
113    /// `agents queue add`'s — same wire shape, same five flags.
114    pub fn push_flags(&self, out: &mut Vec<String>) {
115        match self {
116            RequestMessage::Inline(rich) => {
117                out.push("--inline".to_string());
118                out.push(
119                    serde_json::to_string(rich)
120                        .expect("RichContent serializes to JSON cleanly"),
121                );
122            }
123            RequestMessage::Simple(s) => {
124                out.push("--simple".to_string());
125                out.push(s.clone());
126            }
127            RequestMessage::File(p) => {
128                out.push("--file".to_string());
129                out.push(p.to_string_lossy().into_owned());
130            }
131            RequestMessage::PythonInline(code) => {
132                out.push("--python-inline".to_string());
133                out.push(code.clone());
134            }
135            RequestMessage::PythonFile(p) => {
136                out.push("--python-file".to_string());
137                out.push(p.to_string_lossy().into_owned());
138            }
139        }
140    }
141}
142
143#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
144#[schemars(rename = "cli.command.agents.message.RequestDangerousAdvanced")]
145pub struct RequestDangerousAdvanced {
146    #[serde(default, skip_serializing_if = "Option::is_none")]
147    #[schemars(extend("omitempty" = true))]
148    pub stream: Option<bool>,
149    /// Deterministic seed for the upstream model's RNG. Plumbed
150    /// onto `AgentCompletionCreateParams.seed` on the
151    /// spawn-takeover path. `None` here ⇒ the api picks; tests
152    /// should always pin a value to keep continuation turns
153    /// reproducible.
154    #[serde(default, skip_serializing_if = "Option::is_none")]
155    #[schemars(extend("omitempty" = true))]
156    pub seed: Option<i64>,
157}
158
159/// "Fire and forget into the queue" mode. When attached to a
160/// [`Request`] via [`Request::enqueue`], the handler short-circuits:
161/// no tag lookup, no lock-file race, no spawn-takeover, no detached
162/// respawn — just one INSERT (preceded by a key-collision DELETE
163/// when `Keyed`) and a [`Response::Enqueued`] reply.
164///
165/// Key scope is per-target: the row's `(agent_instance_hierarchy,
166/// key)` or `(agent_tag, key)` pair is unique. Replacing an existing
167/// row scoped to one target leaves rows under other targets alone.
168#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
169#[serde(tag = "by", rename_all = "snake_case")]
170#[schemars(rename = "cli.command.agents.message.EnqueueMode")]
171pub enum EnqueueMode {
172    /// `--enqueue` — bare enqueue, no idempotency key.
173    #[schemars(title = "Plain")]
174    Plain,
175    /// `--enqueue-with-key <KEY>` — idempotent enqueue. Existing
176    /// queue rows scoped to the same target (AIH or tag) AND key
177    /// are deleted before the insert lands.
178    #[schemars(title = "Keyed")]
179    Keyed { key: String },
180}
181
182impl CommandRequest for Request {
183    fn into_command(&self) -> Vec<String> {
184        let mut argv = vec!["agents".to_string(), "message".to_string()];
185        match &self.target {
186            MessageTarget::Direct {
187                parent_agent_instance_hierarchy,
188                agent_instance,
189            } => {
190                argv.push(agent_instance.clone());
191                if let Some(parent) = parent_agent_instance_hierarchy {
192                    argv.push("--parent-agent-instance-hierarchy".to_string());
193                    argv.push(parent.clone());
194                }
195            }
196            MessageTarget::Tag { agent_tag } => {
197                argv.push("--agent-tag".to_string());
198                argv.push(agent_tag.clone());
199            }
200        }
201        self.message.push_flags(&mut argv);
202        if let Some(advanced) = &self.dangerous_advanced {
203            argv.push("--dangerous-advanced".to_string());
204            argv.push(
205                serde_json::to_string(advanced)
206                    .expect("RequestDangerousAdvanced serializes"),
207            );
208        }
209        match &self.enqueue {
210            None => {}
211            Some(EnqueueMode::Plain) => argv.push("--enqueue".to_string()),
212            Some(EnqueueMode::Keyed { key }) => {
213                argv.push("--enqueue-with-key".to_string());
214                argv.push(key.clone());
215            }
216        }
217        if let Some(jq) = &self.jq {
218            argv.push("--jq".to_string());
219            argv.push(jq.clone());
220        }
221        argv
222    }
223}
224
225/// Unary response (stream=false). Exactly one of these per call.
226/// Internally tagged via `type`; bare unit variant `Delivered`
227/// serializes as `{"type":"delivered"}`.
228#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
229#[serde(tag = "type", rename_all = "snake_case")]
230#[schemars(rename = "cli.command.agents.message.Response")]
231pub enum Response {
232    /// The queue row reached a live agent (the API stamped its id
233    /// onto an assistant chunk's `request_message_ids`) before
234    /// any other race finalized.
235    #[schemars(title = "Delivered")]
236    Delivered,
237    /// The target's tag wasn't bound at call time (PENDING /
238    /// ABSENT). The message was deferred into the queue.
239    #[schemars(title = "Enqueued")]
240    Enqueued {
241        id: i64,
242        #[serde(default, skip_serializing_if = "Option::is_none")]
243        #[schemars(extend("omitempty" = true))]
244        agent_instance_hierarchy: Option<String>,
245        #[serde(default, skip_serializing_if = "Option::is_none")]
246        #[schemars(extend("omitempty" = true))]
247        agent_tag: Option<String>,
248    },
249    /// The stream=false path re-execed itself as a detached
250    /// subprocess (stream=true) and the subprocess yielded a
251    /// `ResponseItem::Id` first. Same payload as spawn's
252    /// `ResponseItem::Id(String)` — the bare
253    /// `agent_instance_hierarchy` string the runner just minted.
254    #[schemars(title = "Id")]
255    Id { agent_instance_hierarchy: String },
256}
257
258/// Streamed response (stream=true). The cli yields a sequence of
259/// these. Same `Delivered` / `Enqueued` / `Id` first-item
260/// semantics as [`Response`]; the spawn-take-over branch adds
261/// streaming `Chunk` items after the initial `Id`.
262#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
263#[serde(tag = "type", rename_all = "snake_case")]
264#[schemars(rename = "cli.command.agents.message.ResponseItem")]
265pub enum ResponseItem {
266    #[schemars(title = "Delivered")]
267    Delivered,
268    #[schemars(title = "Enqueued")]
269    Enqueued {
270        id: i64,
271        #[serde(default, skip_serializing_if = "Option::is_none")]
272        #[schemars(extend("omitempty" = true))]
273        agent_instance_hierarchy: Option<String>,
274        #[serde(default, skip_serializing_if = "Option::is_none")]
275        #[schemars(extend("omitempty" = true))]
276        agent_tag: Option<String>,
277    },
278    #[schemars(title = "Id")]
279    Id { agent_instance_hierarchy: String },
280    /// Newtype-of-struct under an internally-tagged enum: the
281    /// chunk's own fields land at the top level of the JSON, with
282    /// `"type":"chunk"` injected. Wire shape equivalent to spawn's
283    /// `ResponseItem::Chunk(AgentCompletionChunk)` plus the `type`
284    /// discriminator.
285    #[schemars(title = "Chunk")]
286    Chunk(AgentCompletionChunk),
287}
288
289impl From<Response> for ResponseItem {
290    /// Lift the unary [`Response`] into the streaming
291    /// [`ResponseItem`] shape. Lossless — every `Response`
292    /// variant maps 1-to-1 onto a `ResponseItem` variant of the
293    /// same name; streaming-only variants (`Chunk`) are never
294    /// produced from a `Response`.
295    fn from(r: Response) -> Self {
296        match r {
297            Response::Delivered => ResponseItem::Delivered,
298            Response::Enqueued {
299                id,
300                agent_instance_hierarchy,
301                agent_tag,
302            } => ResponseItem::Enqueued {
303                id,
304                agent_instance_hierarchy,
305                agent_tag,
306            },
307            Response::Id {
308                agent_instance_hierarchy,
309            } => ResponseItem::Id {
310                agent_instance_hierarchy,
311            },
312        }
313    }
314}
315
316#[derive(clap::Args)]
317#[command(group(
318    clap::ArgGroup::new("message_target")
319        .required(true)
320        .multiple(false)
321        .args(["agent_instance", "agent_tag"])
322))]
323pub struct Args {
324    /// Leaf id of the target agent. Combined with `--parent` (or
325    /// the cli's own `Config.agent_instance_hierarchy` when
326    /// `--parent` is omitted) to form the full lineage. Mutually
327    /// exclusive with `--agent-tag`.
328    pub agent_instance: Option<String>,
329    /// Optional lineage prefix to prepend to `agent_instance`.
330    /// When omitted, the cli substitutes its own
331    /// `Config.agent_instance_hierarchy`. Only valid alongside a
332    /// positional `agent_instance`.
333    #[arg(long = "parent-agent-instance-hierarchy", requires = "agent_instance")]
334    pub parent_agent_instance_hierarchy: Option<String>,
335    #[command(flatten)]
336    pub message: MessageArgs,
337    /// Tag name to enqueue against. Stored verbatim — the cli does
338    /// NOT resolve the tag at enqueue time. Mutually exclusive with
339    /// `--agent-instance`.
340    #[arg(long = "agent-tag")]
341    pub agent_tag: Option<String>,
342    /// Raw JSON for [`RequestDangerousAdvanced`] (e.g.
343    /// `{"stream":true,"seed":42}`).
344    #[arg(long)]
345    pub dangerous_advanced: Option<String>,
346    /// Persist the message into the queue against the target and
347    /// return immediately. No tag lookup, no delivery race, no
348    /// spawn-takeover. Mutually exclusive with
349    /// `--enqueue-with-key`.
350    #[arg(long, conflicts_with = "enqueue_with_key")]
351    pub enqueue: bool,
352    /// Persist with an idempotency key — existing queue rows
353    /// scoped to the same target + key are deleted before insert.
354    /// Mutually exclusive with `--enqueue`.
355    #[arg(long)]
356    pub enqueue_with_key: Option<String>,
357    /// jq filter applied to the JSON output.
358    #[arg(long)]
359    pub jq: Option<String>,
360}
361
362#[derive(clap::Args)]
363#[group(required = true, multiple = false)]
364pub struct MessageArgs {
365    /// Plain text — becomes one user message.
366    #[arg(long)]
367    pub simple: Option<String>,
368    /// Inline JSON `RichContent`.
369    #[arg(long)]
370    pub inline: Option<String>,
371    /// Path to a JSON file containing the rich content.
372    #[arg(long)]
373    pub file: Option<std::path::PathBuf>,
374    /// Inline Python code that produces the rich content.
375    #[arg(long)]
376    pub python_inline: Option<String>,
377    /// Path to a Python file that produces the rich content.
378    #[arg(long)]
379    pub python_file: Option<std::path::PathBuf>,
380}
381
382#[derive(clap::Args)]
383#[command(args_conflicts_with_subcommands = true)]
384pub struct Command {
385    #[command(flatten)]
386    pub args: Args,
387    #[command(subcommand)]
388    pub schema: Option<Schema>,
389}
390
391#[derive(clap::Subcommand)]
392pub enum Schema {
393    /// Emit the JSON Schema for this leaf's `Request` type and exit.
394    RequestSchema(request_schema::Args),
395    /// Emit the JSON Schema for this leaf's `Response` type and exit.
396    ResponseSchema(response_schema::Args),
397}
398
399impl TryFrom<Args> for Request {
400    type Error = crate::cli::command::FromArgsError;
401    fn try_from(args: Args) -> Result<Self, Self::Error> {
402        let message = if let Some(s) = args.message.simple {
403            RequestMessage::Simple(s)
404        } else if let Some(s) = args.message.inline {
405            let mut de = serde_json::Deserializer::from_str(&s);
406            let v = serde_path_to_error::deserialize(&mut de).map_err(|source| {
407                crate::cli::command::FromArgsError {
408                    field: "inline",
409                    source: source.into(),
410                }
411            })?;
412            RequestMessage::Inline(v)
413        } else if let Some(p) = args.message.file {
414            RequestMessage::File(p)
415        } else if let Some(s) = args.message.python_inline {
416            RequestMessage::PythonInline(s)
417        } else {
418            // Clap `required = true` on `MessageArgs` guarantees
419            // exactly one of the five flags is set.
420            RequestMessage::PythonFile(args.message.python_file.unwrap())
421        };
422        let target = match (args.agent_instance, args.agent_tag) {
423            (Some(agent_instance), None) => MessageTarget::Direct {
424                parent_agent_instance_hierarchy: args.parent_agent_instance_hierarchy,
425                agent_instance,
426            },
427            (None, Some(agent_tag)) => MessageTarget::Tag { agent_tag },
428            _ => unreachable!(
429                "clap group `message_target` ensures exactly one of agent_instance | agent_tag"
430            ),
431        };
432        let dangerous_advanced: Option<RequestDangerousAdvanced> =
433            if let Some(s) = args.dangerous_advanced {
434                let mut de = serde_json::Deserializer::from_str(&s);
435                let v = serde_path_to_error::deserialize(&mut de).map_err(|source| {
436                    crate::cli::command::FromArgsError {
437                        field: "dangerous_advanced",
438                        source: source.into(),
439                    }
440                })?;
441                Some(v)
442            } else {
443                None
444            };
445        let enqueue = match (args.enqueue, args.enqueue_with_key) {
446            (false, None) => None,
447            (true, None) => Some(EnqueueMode::Plain),
448            (false, Some(key)) => Some(EnqueueMode::Keyed { key }),
449            (true, Some(_)) => unreachable!(
450                "clap `conflicts_with` prevents --enqueue + --enqueue-with-key"
451            ),
452        };
453        Ok(Self {
454            path_type: Path::AgentsMessage,
455            target,
456            message,
457            enqueue,
458            dangerous_advanced,
459            jq: args.jq,
460        })
461    }
462}
463
464#[cfg(feature = "cli-executor")]
465pub async fn execute_streaming<E: crate::cli::command::CommandExecutor>(
466    executor: &E,
467    mut request: Request,
468
469        agent_arguments: Option<&crate::cli::command::AgentArguments>,
470    ) -> Result<E::Stream<ResponseItem>, E::Error> {
471    request.jq = None;
472    let mut advanced = request.dangerous_advanced.unwrap_or_default();
473    advanced.stream = Some(true);
474    request.dangerous_advanced = Some(advanced);
475    executor.execute(request, agent_arguments).await
476}
477
478#[cfg(feature = "cli-executor")]
479pub async fn execute_streaming_jq<E: crate::cli::command::CommandExecutor>(
480    executor: &E,
481    mut request: Request,
482    jq: String,
483
484        agent_arguments: Option<&crate::cli::command::AgentArguments>,
485    ) -> Result<E::Stream<serde_json::Value>, E::Error> {
486    request.jq = Some(jq);
487    let mut advanced = request.dangerous_advanced.unwrap_or_default();
488    advanced.stream = Some(true);
489    request.dangerous_advanced = Some(advanced);
490    executor.execute(request, agent_arguments).await
491}
492
493#[cfg(feature = "cli-executor")]
494pub async fn execute<E: crate::cli::command::CommandExecutor>(
495    executor: &E,
496    mut request: Request,
497
498        agent_arguments: Option<&crate::cli::command::AgentArguments>,
499    ) -> Result<Response, E::Error> {
500    request.jq = None;
501    if let Some(advanced) = request.dangerous_advanced.as_mut() {
502        advanced.stream = None;
503    }
504    executor.execute_one(request, agent_arguments).await
505}
506
507#[cfg(feature = "cli-executor")]
508pub async fn execute_jq<E: crate::cli::command::CommandExecutor>(
509    executor: &E,
510    mut request: Request,
511    jq: String,
512
513        agent_arguments: Option<&crate::cli::command::AgentArguments>,
514    ) -> Result<serde_json::Value, E::Error> {
515    request.jq = Some(jq);
516    if let Some(advanced) = request.dangerous_advanced.as_mut() {
517        advanced.stream = None;
518    }
519    executor.execute_one(request, agent_arguments).await
520}
521
522#[cfg(feature = "mcp")]
523impl crate::cli::command::CommandResponse for Response {
524    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
525        crate::cli::command::McpResponseItem::JSONL(serde_json::to_value(self).unwrap())
526    }
527}
528
529#[cfg(feature = "mcp")]
530impl crate::cli::command::CommandResponse for ResponseItem {
531    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
532        crate::cli::command::McpResponseItem::JSONL(serde_json::to_value(self).unwrap())
533    }
534}
535
536pub mod request_schema;
537
538
539pub mod response_schema;