objectiveai_sdk/cli/command/command_executor.rs
1use futures::Stream;
2
3use crate::cli::command::CommandRequest;
4use crate::cli::command::CommandResponse;
5
6pub mod binary;
7pub mod plugin;
8
9/// Run a [`CommandRequest`] against some backend (subprocess of the cli
10/// binary, in-process router, mock, …) and surface its output as a
11/// stream of typed items.
12///
13/// `T` is left to the caller: pick a concrete leaf response type
14/// (`agents::spawn::Response`, `functions::executions::create::standard::ResponseItem`,
15/// …) or a more general `serde_json::Value` for opaque consumption.
16///
17/// Every call accepts an optional [`AgentArguments`] bag controlling
18/// per-call identity. When `Some`, subprocess-spawning executors stamp
19/// each `Some(v)` field on the child env and `env_remove` each `None`
20/// — atomic per-call override. When `None`, the child inherits parent
21/// env unchanged. In-process executors (e.g. the plugin executor)
22/// accept the parameter for trait-signature symmetry and ignore it.
23pub trait CommandExecutor {
24 type Error: Send + 'static;
25 type Stream<T>: Stream<Item = Result<T, Self::Error>> + Send + 'static
26 where
27 T: Send + 'static;
28
29 fn execute<R, T>(
30 &self,
31 request: R,
32 agent_arguments: Option<&AgentArguments>,
33 ) -> impl Future<Output = Result<Self::Stream<T>, Self::Error>> + Send
34 where
35 R: CommandRequest + Send,
36 T: CommandResponse + serde::de::DeserializeOwned + Send + 'static;
37
38 /// Convenience for unary commands: run the request and resolve the
39 /// first item from the stream. Implementations should error with
40 /// their own "empty stream" variant if the stream closes without
41 /// producing an item.
42 fn execute_one<R, T>(
43 &self,
44 request: R,
45 agent_arguments: Option<&AgentArguments>,
46 ) -> impl Future<Output = Result<T, Self::Error>> + Send
47 where
48 R: CommandRequest + Send,
49 T: CommandResponse + serde::de::DeserializeOwned + Send + 'static;
50}
51
/// Agent identity + response routing args carried per call.
///
/// When passed as `Some(&AgentArguments)` to a [`CommandExecutor`],
/// subprocess-spawning executors (e.g. [`binary::BinaryExecutor`])
/// apply ALL seven fields to the spawned child's env atomically —
/// `Some(v)` → set, `None` → `env_remove` so the parent's value for
/// that var can't leak through. `None` for the whole bag means
/// "inherit parent env unmodified". In-process executors (e.g.
/// [`plugin::PluginExecutor`]) ignore the bag.
///
/// Field ↔ env-var mapping (same as `EnvConfigBuilder` in
/// `objectiveai-cli/src/run.rs`):
///
/// - `agent_instance_hierarchy` ↔ `OBJECTIVEAI_AGENT_INSTANCE_HIERARCHY`
/// - `agent_id` ↔ `OBJECTIVEAI_AGENT_ID`
/// - `agent_full_id` ↔ `OBJECTIVEAI_AGENT_FULL_ID`
/// - `agent_remote` ↔ `OBJECTIVEAI_AGENT_REMOTE`
/// - `response_id` ↔ `OBJECTIVEAI_RESPONSE_ID`
/// - `response_ids` ↔ `OBJECTIVEAI_RESPONSE_IDS`
/// - `mcp_session_id` ↔ `MCP_SESSION_ID` (the MCP transport
///   session id minted by the MCP server, NOT an objectiveai-scoped
///   identifier — same env-var convention as
///   [`crate::mcp::MCP_SESSION_ID_ENV`])
#[derive(
    Debug,
    Clone,
    Default,
    PartialEq,
    Eq,
    serde::Serialize,
    serde::Deserialize,
    schemars::JsonSchema,
)]
#[schemars(rename = "cli.command.AgentArguments")]
pub struct AgentArguments {
    // NOTE: the per-field notes below are plain `//` comments on purpose.
    // `///` doc comments would presumably be picked up by the `JsonSchema`
    // derive as schema descriptions and alter the generated schema.
    //
    // Every field shares the same serde shape: a missing key deserializes
    // to `None` (`default`), and `None` is omitted when serializing
    // (`skip_serializing_if`).

    // Exported as `OBJECTIVEAI_AGENT_INSTANCE_HIERARCHY`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub agent_instance_hierarchy: Option<String>,
    // Exported as `OBJECTIVEAI_AGENT_ID`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub agent_id: Option<String>,
    // Exported as `OBJECTIVEAI_AGENT_FULL_ID`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub agent_full_id: Option<String>,
    // Exported as `OBJECTIVEAI_AGENT_REMOTE`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub agent_remote: Option<String>,
    // Exported as `OBJECTIVEAI_RESPONSE_ID`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub response_id: Option<String>,
    // Exported as `OBJECTIVEAI_RESPONSE_IDS`. Carried as a single string;
    // its internal format is not defined in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub response_ids: Option<String>,
    // Exported under the name held in `crate::mcp::MCP_SESSION_ID_ENV`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub mcp_session_id: Option<String>,
}
109
110impl AgentArguments {
111 /// Apply this bag to a child-process command: every `Some(v)`
112 /// stamps the matching env var, every `None` env-removes it so
113 /// the parent's value can't leak through. Called by
114 /// [`binary::BinaryExecutor`]; available for any executor that
115 /// spawns a subprocess.
116 #[cfg(feature = "cli-executor")]
117 pub fn apply_to_command(&self, command: &mut tokio::process::Command) {
118 let pairs: [(&str, &Option<String>); 7] = [
119 (
120 "OBJECTIVEAI_AGENT_INSTANCE_HIERARCHY",
121 &self.agent_instance_hierarchy,
122 ),
123 ("OBJECTIVEAI_AGENT_ID", &self.agent_id),
124 ("OBJECTIVEAI_AGENT_FULL_ID", &self.agent_full_id),
125 ("OBJECTIVEAI_AGENT_REMOTE", &self.agent_remote),
126 ("OBJECTIVEAI_RESPONSE_ID", &self.response_id),
127 ("OBJECTIVEAI_RESPONSE_IDS", &self.response_ids),
128 (crate::mcp::MCP_SESSION_ID_ENV, &self.mcp_session_id),
129 ];
130 for (name, value) in pairs {
131 match value {
132 Some(v) => {
133 command.env(name, v);
134 }
135 None => {
136 command.env_remove(name);
137 }
138 }
139 }
140 }
141}