Skip to main content

objectiveai_sdk/cli/command/agents/logs/read/subscribe/
mod.rs

//! `agents logs read subscribe` — the live cousin of
//! `agents logs read pending`. Same `Vec<Target>` input + same
//! parts-grouped block output, but with a first-ping-or-go-inactive
//! wait loop. Returns either a real block (the EXACT same JSON
//! shape `read all` / `read pending` emit) OR the literal
//! string `"agents_inactive"` when no target has a live agent
//! to wait on.
//!
//! Optional kind filter: any subset of `--request` /
//! `--assistant` / `--tool` flags. `--request` covers BOTH
//! request blob rows AND `message_queue_*` client notification
//! rows (incoming work is one bucket regardless of source). No
//! flags set = no filter (all kinds).
14
15use crate::cli::command::CommandRequest;
16
17#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
18#[schemars(rename = "cli.command.agents.logs.read.subscribe.Request")]
19pub struct Request {
20    pub path_type: Path,
21    pub targets: Vec<Target>,
22    /// Filter to rows whose `MessageTable` falls in the selected
23    /// bucket(s). `None` on the wire = no filter (all kinds).
24    #[serde(default, skip_serializing_if = "Option::is_none")]
25    #[schemars(extend("omitempty" = true))]
26    pub kinds: Option<KindFilter>,
27    /// Skip rows with `logs.messages."index" <= after_id`.
28    #[serde(default, skip_serializing_if = "Option::is_none")]
29    #[schemars(extend("omitempty" = true))]
30    pub after_id: Option<i64>,
31    /// Cap on rows scanned per target.
32    #[serde(default, skip_serializing_if = "Option::is_none")]
33    #[schemars(extend("omitempty" = true))]
34    pub limit: Option<i64>,
35    pub jq: Option<String>,
36}
37
38/// 3-bool bitset for the type-filter flags. All independent and
39/// optional. If all 3 are false, the request's `kinds` field is
40/// serialized as `None` (no filter — wait for any kind).
41#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
42#[schemars(rename = "cli.command.agents.logs.read.subscribe.KindFilter")]
43pub struct KindFilter {
44    /// `--request`: covers all 3 request blob kinds
45    /// (`agent_completion_request` / `vector_completion_request`
46    /// / `function_execution_request`) AND the 5
47    /// `message_queue_*` client notification kinds. Treats
48    /// "incoming work" as one bucket regardless of whether it's
49    /// a new completion request or a queued message landing in
50    /// the agent's inbox.
51    pub request: bool,
52    /// `--assistant`: all 8 `assistant_response_*` kinds
53    /// (refusal / reasoning / tool_calls + 5 content kinds).
54    pub assistant: bool,
55    /// `--tool`: all 6 `tool_response*` kinds (1 container +
56    /// 5 content kinds).
57    pub tool: bool,
58}
59
60impl KindFilter {
61    /// Returns `None` when all 3 booleans are false (no filter
62    /// — `Request.kinds` stays `None` on the wire).
63    pub fn from_flags(request: bool, assistant: bool, tool: bool) -> Option<Self> {
64        if request || assistant || tool {
65            Some(Self { request, assistant, tool })
66        } else {
67            None
68        }
69    }
70}
71
72#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
73#[schemars(rename = "cli.command.agents.logs.read.subscribe.Path")]
74pub enum Path {
75    #[serde(rename = "agents/logs/read/subscribe")]
76    AgentsLogsReadSubscribe,
77}
78
79impl CommandRequest for Request {
80    fn into_command(&self) -> Vec<String> {
81        let mut argv = vec![
82            "agents".to_string(),
83            "logs".to_string(),
84            "read".to_string(),
85            "subscribe".to_string(),
86        ];
87        for target in &self.targets {
88            argv.push("--target".to_string());
89            argv.push(target.into_arg_string());
90        }
91        if let Some(kinds) = &self.kinds {
92            if kinds.request {
93                argv.push("--request".to_string());
94            }
95            if kinds.assistant {
96                argv.push("--assistant".to_string());
97            }
98            if kinds.tool {
99                argv.push("--tool".to_string());
100            }
101        }
102        if let Some(after_id) = self.after_id {
103            argv.push("--after-id".to_string());
104            argv.push(after_id.to_string());
105        }
106        if let Some(limit) = self.limit {
107            argv.push("--limit".to_string());
108            argv.push(limit.to_string());
109        }
110        if let Some(jq) = &self.jq {
111            argv.push("--jq".to_string());
112            argv.push(jq.clone());
113        }
114        argv
115    }
116}
117
118// Re-export `Target` from `logs::read::all` — single source of
119// truth for the docker-style `--target` parser. Same shape on
120// the wire as `agents logs read all` / `read pending`.
121pub use super::all::Target;
122
123/// Subscribe's wire shape. Either a real parts-grouped block
124/// (the EXACT same enum `read all` / `read pending` emit) OR
125/// the literal string `"agents_inactive"`. `#[serde(untagged)]`
126/// so the `Item` arm passes through transparently — JSONL
127/// consumers see either a block JSON object or a bare string.
128#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
129#[serde(untagged)]
130#[schemars(rename = "cli.command.agents.logs.read.subscribe.ResponseItem")]
131pub enum ResponseItem {
132    #[schemars(title = "Item")]
133    Item(super::all::ResponseItem),
134    #[schemars(title = "AgentsInactive")]
135    AgentsInactive(AgentsInactiveTag),
136}
137
138/// Single-variant enum whose lone variant serializes as the
139/// literal string `"agents_inactive"`. Construct as
140/// `AgentsInactiveTag::AgentsInactive`; the surrounding
141/// `ResponseItem::AgentsInactive(_)` carries it.
142#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
143#[schemars(rename = "cli.command.agents.logs.read.subscribe.AgentsInactiveTag")]
144pub enum AgentsInactiveTag {
145    #[serde(rename = "agents_inactive")]
146    AgentsInactive,
147}
148
// clap-facing argument set for the `agents logs read subscribe` leaf.
// `TryFrom<Args> for Request` converts it into the wire request. The `///`
// lines on the fields are read by the clap derive (presumably as `--help`
// text), so they are left exactly as written.
#[derive(clap::Args)]
pub struct Args {
    /// One or more `--target instance=L[,parent=P]` entries.
    /// `parent` defaults to the cli's own
    /// `Config.agent_instance_hierarchy` when omitted on an
    /// individual target. Also accepts `--target tag=T` and
    /// `--target me` (the caller's own AIH).
    // Kept as raw strings here; each entry is parsed into a `Target` during
    // the `TryFrom<Args>` conversion.
    #[arg(long = "target", required = true)]
    pub targets: Vec<String>,
    /// Wait for new request blob rows OR new `message_queue_*`
    /// client notification rows.
    #[arg(long)]
    pub request: bool,
    /// Wait for new `assistant_response_*` rows.
    #[arg(long)]
    pub assistant: bool,
    /// Wait for new `tool_response*` rows.
    #[arg(long)]
    pub tool: bool,
    /// Skip rows with `logs.messages."index" <= after_id`.
    #[arg(long)]
    pub after_id: Option<i64>,
    /// Cap on rows scanned per target.
    #[arg(long)]
    pub limit: Option<i64>,
    /// jq filter applied to the JSON output.
    #[arg(long)]
    pub jq: Option<String>,
}
178
// Top-level clap node for this leaf: the flattened run arguments plus an
// optional schema-dumping subcommand.
// NOTE(review): `args_conflicts_with_subcommands` presumably makes the
// flattened arguments and the subcommand mutually exclusive on the command
// line — confirm against the clap documentation.
#[derive(clap::Args)]
#[command(args_conflicts_with_subcommands = true)]
pub struct Command {
    // Arguments for an actual subscribe run.
    #[command(flatten)]
    pub args: Args,
    // `Some` when a schema subcommand was selected instead of a run.
    #[command(subcommand)]
    pub schema: Option<Schema>,
}
187
// Schema-dumping subcommands for this leaf. The payload types come from the
// `request_schema` / `response_schema` submodules declared at the bottom of
// this file. The `///` lines are read by the clap derive, so they are left
// exactly as written.
#[derive(clap::Subcommand)]
pub enum Schema {
    /// Emit the JSON Schema for this leaf's `Request` type and exit.
    RequestSchema(request_schema::Args),
    /// Emit the JSON Schema for this leaf's `Response` type and exit.
    ResponseSchema(response_schema::Args),
}
195
196impl TryFrom<Args> for Request {
197    type Error = crate::cli::command::FromArgsError;
198    fn try_from(args: Args) -> Result<Self, Self::Error> {
199        let targets = args
200            .targets
201            .iter()
202            .map(|s| {
203                s.parse::<Target>().map_err(|msg| {
204                    crate::cli::command::FromArgsError::path_parse("target", msg)
205                })
206            })
207            .collect::<Result<Vec<_>, _>>()?;
208        let kinds = KindFilter::from_flags(args.request, args.assistant, args.tool);
209        Ok(Self {
210            path_type: Path::AgentsLogsReadSubscribe,
211            targets,
212            kinds,
213            after_id: args.after_id,
214            limit: args.limit,
215            jq: args.jq,
216        })
217    }
218}
219
/// Runs the subscribe request through `executor` and yields a stream of
/// typed [`ResponseItem`]s.
///
/// Any `jq` filter already set on `request` is cleared before the request
/// is handed to the executor. `agent_arguments` is forwarded as-is.
#[cfg(feature = "cli-executor")]
pub async fn execute<E>(
    executor: &E,
    request: Request,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<E::Stream<ResponseItem>, E::Error>
where
    E: crate::cli::command::CommandExecutor,
{
    let unfiltered = Request { jq: None, ..request };
    executor.execute(unfiltered, agent_arguments).await
}
230
/// Runs the subscribe request through `executor` with a jq filter applied
/// and yields a stream of raw `serde_json::Value`s.
///
/// The `jq` argument replaces whatever filter `request` carried.
/// `agent_arguments` is forwarded as-is.
#[cfg(feature = "cli-executor")]
pub async fn execute_jq<E>(
    executor: &E,
    request: Request,
    jq: String,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<E::Stream<serde_json::Value>, E::Error>
where
    E: crate::cli::command::CommandExecutor,
{
    let filtered = Request { jq: Some(jq), ..request };
    executor.execute(filtered, agent_arguments).await
}
242
#[cfg(feature = "mcp")]
impl crate::cli::command::CommandResponse for ResponseItem {
    /// Converts this item into an MCP `JSONL` response by serializing it
    /// to a `serde_json::Value`.
    ///
    /// # Panics
    ///
    /// Panics if serialization fails, because the result is unwrapped.
    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
        use crate::cli::command::McpResponseItem;

        let value = serde_json::to_value(self).unwrap();
        McpResponseItem::JSONL(value)
    }
}
249
250pub mod request_schema;
251
252
253pub mod response_schema;