Skip to main content

objectiveai_sdk/cli/command/
command_executor.rs

1use futures::Stream;
2
3use crate::cli::command::CommandRequest;
4use crate::cli::command::CommandResponse;
5
6pub mod binary;
7pub mod plugin;
8
9/// Run a [`CommandRequest`] against some backend (subprocess of the cli
10/// binary, in-process router, mock, …) and surface its output as a
11/// stream of typed items.
12///
13/// `T` is left to the caller: pick a concrete leaf response type
14/// (`agents::spawn::Response`, `functions::executions::create::standard::ResponseItem`,
15/// …) or a more general `serde_json::Value` for opaque consumption.
16///
17/// Every call accepts an optional [`AgentArguments`] bag controlling
18/// per-call identity. When `Some`, subprocess-spawning executors stamp
19/// each `Some(v)` field on the child env and `env_remove` each `None`
20/// — atomic per-call override. When `None`, the child inherits parent
21/// env unchanged. In-process executors (e.g. the plugin executor)
22/// accept the parameter for trait-signature symmetry and ignore it.
23pub trait CommandExecutor {
24    type Error: Send + 'static;
25    type Stream<T>: Stream<Item = Result<T, Self::Error>> + Send + 'static
26    where
27        T: Send + 'static;
28
29    fn execute<R, T>(
30        &self,
31        request: R,
32        agent_arguments: Option<&AgentArguments>,
33    ) -> impl Future<Output = Result<Self::Stream<T>, Self::Error>> + Send
34    where
35        R: CommandRequest + Send,
36        T: CommandResponse + serde::de::DeserializeOwned + Send + 'static;
37
38    /// Convenience for unary commands: run the request and resolve the
39    /// first item from the stream. Implementations should error with
40    /// their own "empty stream" variant if the stream closes without
41    /// producing an item.
42    fn execute_one<R, T>(
43        &self,
44        request: R,
45        agent_arguments: Option<&AgentArguments>,
46    ) -> impl Future<Output = Result<T, Self::Error>> + Send
47    where
48        R: CommandRequest + Send,
49        T: CommandResponse + serde::de::DeserializeOwned + Send + 'static;
50}
51
52/// Agent identity + response routing args carried per call.
53///
54/// When passed as `Some(&AgentArguments)` to a [`CommandExecutor`],
55/// subprocess-spawning executors (e.g. [`binary::BinaryExecutor`])
56/// apply ALL six fields to the spawned child's env atomically —
57/// `Some(v)` → set, `None` → `env_remove` so the parent's value for
58/// that var can't leak through. `None` for the whole bag means
59/// "inherit parent env unmodified". In-process executors (e.g.
60/// [`plugin::PluginExecutor`]) ignore the bag.
61///
62/// Field ↔ env-var mapping (same as `EnvConfigBuilder` in
63/// `objectiveai-cli/src/run.rs`):
64///
65/// - `agent_instance_hierarchy` ↔ `OBJECTIVEAI_AGENT_INSTANCE_HIERARCHY`
66/// - `agent_id` ↔ `OBJECTIVEAI_AGENT_ID`
67/// - `agent_full_id` ↔ `OBJECTIVEAI_AGENT_FULL_ID`
68/// - `agent_remote` ↔ `OBJECTIVEAI_AGENT_REMOTE`
69/// - `response_id` ↔ `OBJECTIVEAI_RESPONSE_ID`
70/// - `response_ids` ↔ `OBJECTIVEAI_RESPONSE_IDS`
71/// - `mcp_session_id` ↔ `MCP_SESSION_ID` (the MCP transport
72///   session id minted by the MCP server, NOT an objectiveai-scoped
73///   identifier — same env-var convention as
74///   [`crate::mcp::MCP_SESSION_ID_ENV`])
75#[derive(
76    Debug,
77    Clone,
78    Default,
79    PartialEq,
80    Eq,
81    serde::Serialize,
82    serde::Deserialize,
83    schemars::JsonSchema,
84)]
85#[schemars(rename = "cli.command.AgentArguments")]
86pub struct AgentArguments {
87    #[serde(default, skip_serializing_if = "Option::is_none")]
88    #[schemars(extend("omitempty" = true))]
89    pub agent_instance_hierarchy: Option<String>,
90    #[serde(default, skip_serializing_if = "Option::is_none")]
91    #[schemars(extend("omitempty" = true))]
92    pub agent_id: Option<String>,
93    #[serde(default, skip_serializing_if = "Option::is_none")]
94    #[schemars(extend("omitempty" = true))]
95    pub agent_full_id: Option<String>,
96    #[serde(default, skip_serializing_if = "Option::is_none")]
97    #[schemars(extend("omitempty" = true))]
98    pub agent_remote: Option<String>,
99    #[serde(default, skip_serializing_if = "Option::is_none")]
100    #[schemars(extend("omitempty" = true))]
101    pub response_id: Option<String>,
102    #[serde(default, skip_serializing_if = "Option::is_none")]
103    #[schemars(extend("omitempty" = true))]
104    pub response_ids: Option<String>,
105    #[serde(default, skip_serializing_if = "Option::is_none")]
106    #[schemars(extend("omitempty" = true))]
107    pub mcp_session_id: Option<String>,
108}
109
110impl AgentArguments {
111    /// Apply this bag to a child-process command: every `Some(v)`
112    /// stamps the matching env var, every `None` env-removes it so
113    /// the parent's value can't leak through. Called by
114    /// [`binary::BinaryExecutor`]; available for any executor that
115    /// spawns a subprocess.
116    #[cfg(feature = "cli-executor")]
117    pub fn apply_to_command(&self, command: &mut tokio::process::Command) {
118        let pairs: [(&str, &Option<String>); 7] = [
119            (
120                "OBJECTIVEAI_AGENT_INSTANCE_HIERARCHY",
121                &self.agent_instance_hierarchy,
122            ),
123            ("OBJECTIVEAI_AGENT_ID", &self.agent_id),
124            ("OBJECTIVEAI_AGENT_FULL_ID", &self.agent_full_id),
125            ("OBJECTIVEAI_AGENT_REMOTE", &self.agent_remote),
126            ("OBJECTIVEAI_RESPONSE_ID", &self.response_id),
127            ("OBJECTIVEAI_RESPONSE_IDS", &self.response_ids),
128            (crate::mcp::MCP_SESSION_ID_ENV, &self.mcp_session_id),
129        ];
130        for (name, value) in pairs {
131            match value {
132                Some(v) => {
133                    command.env(name, v);
134                }
135                None => {
136                    command.env_remove(name);
137                }
138            }
139        }
140    }
141}