Skip to main content

objectiveai_sdk/cli/command/agents/logs/subscribe/
mod.rs

1//! `agents logs read subscribe` — the live cousin of
2//! `agents logs read pending`. Same `Vec<Target>` input + same
3//! parts-grouped block output, but with a first-ping-or-go-inactive
4//! wait loop. Returns either a real block (the EXACT same JSON
5//! shape `read all` / `read pending` emit) OR the literal
6//! string `"agents_inactive"` when no target has a live agent
7//! to wait on.
8//!
9//! Optional kind filter: any subset of `--request` /
10//! `--assistant` / `--tool` flags. `--request` covers BOTH
11//! request blob rows AND `message_queue_*` client notification
12//! rows (incoming work is one bucket regardless of source). No
13//! flags set = no filter (all kinds).
14
15use crate::cli::command::CommandRequest;
16
17#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
18#[schemars(rename = "cli.command.agents.logs.subscribe.Request")]
19pub struct Request {
20    pub path_type: Path,
21    pub targets: Vec<Target>,
22    /// Filter to rows whose `MessageTable` falls in the selected
23    /// bucket(s). `None` on the wire = no filter (all kinds).
24    #[serde(default, skip_serializing_if = "Option::is_none")]
25    #[schemars(extend("omitempty" = true))]
26    pub kinds: Option<KindFilter>,
27    /// Skip rows with `logs.messages."index" <= after_id`.
28    #[serde(default, skip_serializing_if = "Option::is_none")]
29    #[schemars(extend("omitempty" = true))]
30    pub after_id: Option<i64>,
31    /// Cap on rows scanned per target.
32    #[serde(default, skip_serializing_if = "Option::is_none")]
33    #[schemars(extend("omitempty" = true))]
34    pub limit: Option<i64>,
35    #[serde(flatten)]
36    pub base: crate::cli::command::RequestBase,
37}
38
39/// 3-bool bitset for the type-filter flags. All independent and
40/// optional. If all 3 are false, the request's `kinds` field is
41/// serialized as `None` (no filter — wait for any kind).
42#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
43#[schemars(rename = "cli.command.agents.logs.subscribe.KindFilter")]
44pub struct KindFilter {
45    /// `--request`: covers all 3 request blob kinds
46    /// (`agent_completion_request` / `vector_completion_request`
47    /// / `function_execution_request`) AND the 5
48    /// `message_queue_*` client notification kinds. Treats
49    /// "incoming work" as one bucket regardless of whether it's
50    /// a new completion request or a queued message landing in
51    /// the agent's inbox.
52    pub request: bool,
53    /// `--assistant`: all 8 `assistant_response_*` kinds
54    /// (refusal / reasoning / tool_calls + 5 content kinds).
55    pub assistant: bool,
56    /// `--tool`: all 6 `tool_response*` kinds (1 container +
57    /// 5 content kinds).
58    pub tool: bool,
59}
60
61impl KindFilter {
62    /// Returns `None` when all 3 booleans are false (no filter
63    /// — `Request.kinds` stays `None` on the wire).
64    pub fn from_flags(request: bool, assistant: bool, tool: bool) -> Option<Self> {
65        if request || assistant || tool {
66            Some(Self { request, assistant, tool })
67        } else {
68            None
69        }
70    }
71}
72
73#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
74#[schemars(rename = "cli.command.agents.logs.subscribe.Path")]
75pub enum Path {
76    #[serde(rename = "agents/logs/subscribe")]
77    AgentsLogsSubscribe,
78}
79
80impl CommandRequest for Request {
81    fn request_base(&self) -> &crate::cli::command::RequestBase {
82        &self.base
83    }
84
85    fn request_base_mut(&mut self) -> Option<&mut crate::cli::command::RequestBase> {
86        Some(&mut self.base)
87    }
88}
89
90// Re-export `Target` from `logs::list` — single source of
91// truth for the docker-style `--target` parser. Same shape on
92// the wire as `agents logs read all` / `read pending`.
93pub use super::list::Target;
94
95/// Subscribe's wire shape. Either a real parts-grouped block
96/// (the EXACT same enum `read all` / `read pending` emit) OR
97/// the literal string `"agents_inactive"`. `#[serde(untagged)]`
98/// so the `Item` arm passes through transparently — JSONL
99/// consumers see either a block JSON object or a bare string.
100#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
101#[serde(untagged)]
102#[schemars(rename = "cli.command.agents.logs.subscribe.ResponseItem")]
103pub enum ResponseItem {
104    #[schemars(title = "Item")]
105    Item(super::list::ResponseItem),
106    #[schemars(title = "AgentsInactive")]
107    AgentsInactive(AgentsInactiveTag),
108}
109
110/// Single-variant enum whose lone variant serializes as the
111/// literal string `"agents_inactive"`. Construct as
112/// `AgentsInactiveTag::AgentsInactive`; the surrounding
113/// `ResponseItem::AgentsInactive(_)` carries it.
114#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
115#[schemars(rename = "cli.command.agents.logs.subscribe.AgentsInactiveTag")]
116pub enum AgentsInactiveTag {
117    #[serde(rename = "agents_inactive")]
118    AgentsInactive,
119}
120
121#[derive(clap::Args)]
122pub struct Args {
123    /// One or more `--target instance=L[,parent=P]` entries.
124    /// `parent` defaults to the cli's own
125    /// `Config.agent_instance_hierarchy` when omitted on an
126    /// individual target. Also accepts `--target tag=T` and
127    /// `--target me` (the caller's own AIH).
128    #[arg(long = "target", required = true)]
129    pub targets: Vec<String>,
130    /// Wait for new request blob rows OR new `message_queue_*`
131    /// client notification rows.
132    #[arg(long)]
133    pub request: bool,
134    /// Wait for new `assistant_response_*` rows.
135    #[arg(long)]
136    pub assistant: bool,
137    /// Wait for new `tool_response*` rows.
138    #[arg(long)]
139    pub tool: bool,
140    /// Skip rows with `logs.messages."index" <= after_id`.
141    #[arg(long)]
142    pub after_id: Option<i64>,
143    /// Cap on rows scanned per target.
144    #[arg(long)]
145    pub limit: Option<i64>,
146    #[command(flatten)]
147    pub base: crate::cli::command::RequestBaseArgs,
148}
149
150#[derive(clap::Args)]
151#[command(args_conflicts_with_subcommands = true)]
152pub struct Command {
153    #[command(flatten)]
154    pub args: Args,
155    #[command(subcommand)]
156    pub schema: Option<Schema>,
157}
158
159#[derive(clap::Subcommand)]
160pub enum Schema {
161    /// Emit the JSON Schema for this leaf's `Request` type and exit.
162    RequestSchema(request_schema::Args),
163    /// Emit the JSON Schema for this leaf's `Response` type and exit.
164    ResponseSchema(response_schema::Args),
165}
166
167impl TryFrom<Args> for Request {
168    type Error = crate::cli::command::FromArgsError;
169    fn try_from(args: Args) -> Result<Self, Self::Error> {
170        let targets = args
171            .targets
172            .iter()
173            .map(|s| {
174                s.parse::<Target>().map_err(|msg| {
175                    crate::cli::command::FromArgsError::path_parse("target", msg)
176                })
177            })
178            .collect::<Result<Vec<_>, _>>()?;
179        let kinds = KindFilter::from_flags(args.request, args.assistant, args.tool);
180        Ok(Self {
181            path_type: Path::AgentsLogsSubscribe,
182            targets,
183            kinds,
184            after_id: args.after_id,
185            limit: args.limit,
186            base: args.base.into(),
187        })
188    }
189}
190
191#[cfg(feature = "cli-executor")]
192pub async fn execute<E: crate::cli::command::CommandExecutor>(
193    executor: &E,
194    mut request: Request,
195
196        agent_arguments: Option<&crate::cli::command::AgentArguments>,
197    ) -> Result<E::Stream<ResponseItem>, E::Error> {
198    request.base.clear_transform();
199    executor.execute(request, agent_arguments).await
200}
201
202#[cfg(feature = "cli-executor")]
203pub async fn execute_transform<E: crate::cli::command::CommandExecutor>(
204    executor: &E,
205    mut request: Request,
206    transform: crate::cli::command::Transform,
207
208        agent_arguments: Option<&crate::cli::command::AgentArguments>,
209    ) -> Result<E::Stream<serde_json::Value>, E::Error> {
210    request.base.set_transform(transform);
211    executor.execute(request, agent_arguments).await
212}
213
214#[cfg(feature = "mcp")]
215impl crate::cli::command::CommandResponse for ResponseItem {
216    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
217        crate::cli::command::McpResponseItem::JSONL(serde_json::to_value(self).unwrap())
218    }
219}
220
221pub mod request_schema;
222
223
224pub mod response_schema;