Skip to main content

objectiveai_sdk/cli/command/agents/logs/read/subscribe/
mod.rs

1//! `agents logs read subscribe` — the live cousin of
2//! `agents logs read pending`. Same `Vec<Target>` input + same
3//! parts-grouped block output, but with a first-ping-or-go-inactive
4//! wait loop. Returns either a real block (the EXACT same JSON
5//! shape `read all` / `read pending` emit) OR the literal
6//! string `"agents_inactive"` when no target has a live agent
7//! to wait on.
8//!
9//! Optional kind filter: any subset of `--request` /
10//! `--assistant` / `--tool` flags. `--request` covers BOTH
11//! request blob rows AND `message_queue_*` client notification
12//! rows (incoming work is one bucket regardless of source). No
13//! flags set = no filter (all kinds).
14
15use crate::cli::command::CommandRequest;
16
17#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
18#[schemars(rename = "cli.command.agents.logs.read.subscribe.Request")]
19pub struct Request {
20    pub path_type: Path,
21    pub targets: Vec<Target>,
22    /// Filter to rows whose `MessageTable` falls in the selected
23    /// bucket(s). `None` on the wire = no filter (all kinds).
24    #[serde(default, skip_serializing_if = "Option::is_none")]
25    #[schemars(extend("omitempty" = true))]
26    pub kinds: Option<KindFilter>,
27    /// Skip rows with `logs.messages."index" <= after_id`.
28    #[serde(default, skip_serializing_if = "Option::is_none")]
29    #[schemars(extend("omitempty" = true))]
30    pub after_id: Option<i64>,
31    /// Cap on rows scanned per target.
32    #[serde(default, skip_serializing_if = "Option::is_none")]
33    #[schemars(extend("omitempty" = true))]
34    pub limit: Option<i64>,
35    #[serde(flatten)]
36    pub base: crate::cli::command::RequestBase,
37}
38
39/// 3-bool bitset for the type-filter flags. All independent and
40/// optional. If all 3 are false, the request's `kinds` field is
41/// serialized as `None` (no filter — wait for any kind).
42#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
43#[schemars(rename = "cli.command.agents.logs.read.subscribe.KindFilter")]
44pub struct KindFilter {
45    /// `--request`: covers all 3 request blob kinds
46    /// (`agent_completion_request` / `vector_completion_request`
47    /// / `function_execution_request`) AND the 5
48    /// `message_queue_*` client notification kinds. Treats
49    /// "incoming work" as one bucket regardless of whether it's
50    /// a new completion request or a queued message landing in
51    /// the agent's inbox.
52    pub request: bool,
53    /// `--assistant`: all 8 `assistant_response_*` kinds
54    /// (refusal / reasoning / tool_calls + 5 content kinds).
55    pub assistant: bool,
56    /// `--tool`: all 6 `tool_response*` kinds (1 container +
57    /// 5 content kinds).
58    pub tool: bool,
59}
60
61impl KindFilter {
62    /// Returns `None` when all 3 booleans are false (no filter
63    /// — `Request.kinds` stays `None` on the wire).
64    pub fn from_flags(request: bool, assistant: bool, tool: bool) -> Option<Self> {
65        if request || assistant || tool {
66            Some(Self { request, assistant, tool })
67        } else {
68            None
69        }
70    }
71}
72
73#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
74#[schemars(rename = "cli.command.agents.logs.read.subscribe.Path")]
75pub enum Path {
76    #[serde(rename = "agents/logs/read/subscribe")]
77    AgentsLogsReadSubscribe,
78}
79
80impl CommandRequest for Request {
81    fn into_command(&self) -> Vec<String> {
82        let mut argv = vec![
83            "agents".to_string(),
84            "logs".to_string(),
85            "read".to_string(),
86            "subscribe".to_string(),
87        ];
88        for target in &self.targets {
89            argv.push("--target".to_string());
90            argv.push(target.into_arg_string());
91        }
92        if let Some(kinds) = &self.kinds {
93            if kinds.request {
94                argv.push("--request".to_string());
95            }
96            if kinds.assistant {
97                argv.push("--assistant".to_string());
98            }
99            if kinds.tool {
100                argv.push("--tool".to_string());
101            }
102        }
103        if let Some(after_id) = self.after_id {
104            argv.push("--after-id".to_string());
105            argv.push(after_id.to_string());
106        }
107        if let Some(limit) = self.limit {
108            argv.push("--limit".to_string());
109            argv.push(limit.to_string());
110        }
111        self.base.push_flags(&mut argv);
112        argv
113    }
114
115    fn request_base(&self) -> &crate::cli::command::RequestBase {
116        &self.base
117    }
118
119    fn request_base_mut(&mut self) -> Option<&mut crate::cli::command::RequestBase> {
120        Some(&mut self.base)
121    }
122}
123
124// Re-export `Target` from `logs::read::all` — single source of
125// truth for the docker-style `--target` parser. Same shape on
126// the wire as `agents logs read all` / `read pending`.
127pub use super::all::Target;
128
129/// Subscribe's wire shape. Either a real parts-grouped block
130/// (the EXACT same enum `read all` / `read pending` emit) OR
131/// the literal string `"agents_inactive"`. `#[serde(untagged)]`
132/// so the `Item` arm passes through transparently — JSONL
133/// consumers see either a block JSON object or a bare string.
134#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
135#[serde(untagged)]
136#[schemars(rename = "cli.command.agents.logs.read.subscribe.ResponseItem")]
137pub enum ResponseItem {
138    #[schemars(title = "Item")]
139    Item(super::all::ResponseItem),
140    #[schemars(title = "AgentsInactive")]
141    AgentsInactive(AgentsInactiveTag),
142}
143
144/// Single-variant enum whose lone variant serializes as the
145/// literal string `"agents_inactive"`. Construct as
146/// `AgentsInactiveTag::AgentsInactive`; the surrounding
147/// `ResponseItem::AgentsInactive(_)` carries it.
148#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
149#[schemars(rename = "cli.command.agents.logs.read.subscribe.AgentsInactiveTag")]
150pub enum AgentsInactiveTag {
151    #[serde(rename = "agents_inactive")]
152    AgentsInactive,
153}
154
155#[derive(clap::Args)]
156pub struct Args {
157    /// One or more `--target instance=L[,parent=P]` entries.
158    /// `parent` defaults to the cli's own
159    /// `Config.agent_instance_hierarchy` when omitted on an
160    /// individual target. Also accepts `--target tag=T` and
161    /// `--target me` (the caller's own AIH).
162    #[arg(long = "target", required = true)]
163    pub targets: Vec<String>,
164    /// Wait for new request blob rows OR new `message_queue_*`
165    /// client notification rows.
166    #[arg(long)]
167    pub request: bool,
168    /// Wait for new `assistant_response_*` rows.
169    #[arg(long)]
170    pub assistant: bool,
171    /// Wait for new `tool_response*` rows.
172    #[arg(long)]
173    pub tool: bool,
174    /// Skip rows with `logs.messages."index" <= after_id`.
175    #[arg(long)]
176    pub after_id: Option<i64>,
177    /// Cap on rows scanned per target.
178    #[arg(long)]
179    pub limit: Option<i64>,
180    #[command(flatten)]
181    pub base: crate::cli::command::RequestBaseArgs,
182}
183
184#[derive(clap::Args)]
185#[command(args_conflicts_with_subcommands = true)]
186pub struct Command {
187    #[command(flatten)]
188    pub args: Args,
189    #[command(subcommand)]
190    pub schema: Option<Schema>,
191}
192
193#[derive(clap::Subcommand)]
194pub enum Schema {
195    /// Emit the JSON Schema for this leaf's `Request` type and exit.
196    RequestSchema(request_schema::Args),
197    /// Emit the JSON Schema for this leaf's `Response` type and exit.
198    ResponseSchema(response_schema::Args),
199}
200
201impl TryFrom<Args> for Request {
202    type Error = crate::cli::command::FromArgsError;
203    fn try_from(args: Args) -> Result<Self, Self::Error> {
204        let targets = args
205            .targets
206            .iter()
207            .map(|s| {
208                s.parse::<Target>().map_err(|msg| {
209                    crate::cli::command::FromArgsError::path_parse("target", msg)
210                })
211            })
212            .collect::<Result<Vec<_>, _>>()?;
213        let kinds = KindFilter::from_flags(args.request, args.assistant, args.tool);
214        Ok(Self {
215            path_type: Path::AgentsLogsReadSubscribe,
216            targets,
217            kinds,
218            after_id: args.after_id,
219            limit: args.limit,
220            base: args.base.into(),
221        })
222    }
223}
224
225#[cfg(feature = "cli-executor")]
226pub async fn execute<E: crate::cli::command::CommandExecutor>(
227    executor: &E,
228    mut request: Request,
229
230        agent_arguments: Option<&crate::cli::command::AgentArguments>,
231    ) -> Result<E::Stream<ResponseItem>, E::Error> {
232    request.base.clear_transform();
233    executor.execute(request, agent_arguments).await
234}
235
236#[cfg(feature = "cli-executor")]
237pub async fn execute_transform<E: crate::cli::command::CommandExecutor>(
238    executor: &E,
239    mut request: Request,
240    transform: crate::cli::command::Transform,
241
242        agent_arguments: Option<&crate::cli::command::AgentArguments>,
243    ) -> Result<E::Stream<serde_json::Value>, E::Error> {
244    request.base.set_transform(transform);
245    executor.execute(request, agent_arguments).await
246}
247
248#[cfg(feature = "mcp")]
249impl crate::cli::command::CommandResponse for ResponseItem {
250    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
251        crate::cli::command::McpResponseItem::JSONL(serde_json::to_value(self).unwrap())
252    }
253}
254
255pub mod request_schema;
256
257
258pub mod response_schema;