// objectiveai_sdk/cli/command/db/query/mod.rs

//! `db query` — execute arbitrary single-statement, read-only SQL
//! against the CLI's local Postgres pool. Returns the row set as
//! typed JSON cells (Postgres → JSON via a per-cell decoder),
//! together with column metadata and the wire-protocol command tag.
//!
//! Constraints:
//! - **Read-only**: wrapped in `SET LOCAL TRANSACTION READ ONLY`
//!   server-side. Write attempts come back as
//!   `Error::QueryReadOnlyViolation`.
//! - **Timeout**: required, humantime-parsed (`30s`, `5m`,
//!   `1h30m`) and carried on the wire as whole seconds. `0` is
//!   rejected at `TryFrom<Args>` time. The CLI threads the value
//!   into both `SET LOCAL statement_timeout` (server) and
//!   `tokio::time::timeout` (client), so cancellation cleanly
//!   drops the connection without poisoning the pool.
//! - **Token budget**: optional `--max-tokens N`. When set, the
//!   CLI tokenizes the response part by part (`command_tag`,
//!   `columns`, each row) with `tiktoken-rs` `o200k_base` and sums
//!   the counts. Over-budget responses fail with
//!   `Error::TokenBudgetExceeded { limit, actual }`, which carries
//!   the exact count and a refinement suggestion. `0` is rejected
//!   at `TryFrom<Args>` time: "no limit" is expressed by omitting
//!   the flag, so a literal `0` (which would read as "no tokens")
//!   is never accepted.
//!
//! Multi-statement queries, `COPY … TO STDOUT` / `COPY … FROM STDIN`,
//! and transaction-control verbs (`BEGIN` / `COMMIT` / `ROLLBACK`)
//! are rejected up front by a cheap leading-token scan on the
//! CLI side, before the query reaches the database.

use crate::cli::command::CommandRequest;

30#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
31#[schemars(rename = "cli.command.db.query.Request")]
32pub struct Request {
33    pub path_type: Path,
34    /// SQL statement to execute. Single statement only (multi-
35    /// statement input is rejected by the CLI handler).
36    pub query: String,
37    /// Wall-clock execution cap, in whole seconds. Parsed from
38    /// `--timeout` (humantime), `> 0` enforced at
39    /// `TryFrom<Args>` time so this never carries 0 on the wire.
40    pub timeout_seconds: u64,
41    /// Optional response token budget. `None` ⇒ no limit;
42    /// `Some(n)` requires `n >= 1`. When set, the CLI errors
43    /// with `TokenBudgetExceeded` if the per-part token sum
44    /// exceeds the limit. Skip-serialized when None.
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    #[schemars(extend("omitempty" = true))]
47    pub max_tokens: Option<u64>,
48    pub jq: Option<String>,
49}
50
51#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
52#[schemars(rename = "cli.command.db.query.Path")]
53pub enum Path {
54    #[serde(rename = "db/query")]
55    DbQuery,
56}
57
58impl CommandRequest for Request {
59    fn into_command(&self) -> Vec<String> {
60        let mut argv = vec![
61            "db".to_string(),
62            "query".to_string(),
63            "--query".to_string(),
64            self.query.clone(),
65            "--timeout".to_string(),
66            humantime::format_duration(
67                std::time::Duration::from_secs(self.timeout_seconds),
68            )
69            .to_string(),
70        ];
71        if let Some(n) = self.max_tokens {
72            argv.push("--max-tokens".to_string());
73            argv.push(n.to_string());
74        }
75        if let Some(jq) = &self.jq {
76            argv.push("--jq".to_string());
77            argv.push(jq.clone());
78        }
79        argv
80    }
81}
82
83/// One result column. `r#type` is the Postgres `pg_type.typname`
84/// (e.g. `"int8"`, `"text"`, `"jsonb"`, `"timestamptz"`). Callers
85/// needing precision/scale/array-element-type can inspect the
86/// name; richer typeinfo is deferred.
87#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
88#[schemars(rename = "cli.command.db.query.Column")]
89pub struct Column {
90    pub name: String,
91    pub r#type: String,
92}
93
94/// Unary response from `db query`.
95#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
96#[schemars(rename = "cli.command.db.query.Response")]
97pub struct Response {
98    /// Wire-protocol-style command tag — e.g. `"SELECT 5"`,
99    /// `"SHOW 1"`, `"EXPLAIN 12"`, `"SET 0"`. Synthesized from
100    /// `{leading_keyword} {row_count}` since sqlx 0.8 doesn't
101    /// expose the wire-protocol `CommandComplete` tag; close
102    /// enough for telemetry / display.
103    pub command_tag: String,
104    /// Result columns in select-list order. Empty for no-row
105    /// statements (SET / LISTEN / DO).
106    pub columns: Vec<Column>,
107    /// One inner Vec per row, length matches `columns.len()`.
108    /// Each cell is a `serde_json::Value` produced by the CLI's
109    /// per-cell `pg_value_to_json` decoder. Common type
110    /// encodings: text/uuid/timestamps as JSON String, numeric
111    /// as String (preserving precision), bytea as base64 String,
112    /// json/jsonb passthrough as Value, arrays recurse. Empty
113    /// when the statement returns no rows (no-row command OR a
114    /// SELECT with no matches).
115    pub rows: Vec<Vec<serde_json::Value>>,
116    /// Always `false` in the current design — over-budget
117    /// responses return `TokenBudgetExceeded` rather than a
118    /// partial result. Reserved on the wire so a future "soft
119    /// truncation" mode can be added without a shape break.
120    pub truncated: bool,
121}
122
123#[derive(clap::Args)]
124pub struct Args {
125    /// SQL statement to execute. Single statement only.
126    #[arg(long)]
127    pub query: String,
128    /// Wall-clock execution cap. Humantime — `100ms`, `30s`,
129    /// `5m`, `1h30m`. Must be `> 0`.
130    #[arg(long)]
131    pub timeout: String,
132    /// Optional response token budget. Must be `>= 1` if set
133    /// (`0` is rejected; omit the flag entirely to disable
134    /// limiting).
135    #[arg(long)]
136    pub max_tokens: Option<u64>,
137    /// jq filter applied to the JSON output.
138    #[arg(long)]
139    pub jq: Option<String>,
140}
141
142#[derive(clap::Args)]
143#[command(args_conflicts_with_subcommands = true)]
144pub struct Command {
145    #[command(flatten)]
146    pub args: Args,
147    #[command(subcommand)]
148    pub schema: Option<Schema>,
149}
150
151#[derive(clap::Subcommand)]
152pub enum Schema {
153    /// Emit the JSON Schema for this leaf's `Request` type and exit.
154    RequestSchema(request_schema::Args),
155    /// Emit the JSON Schema for this leaf's `Response` type and exit.
156    ResponseSchema(response_schema::Args),
157}
158
159impl TryFrom<Args> for Request {
160    type Error = crate::cli::command::FromArgsError;
161    fn try_from(args: Args) -> Result<Self, Self::Error> {
162        let parsed_timeout = humantime::parse_duration(&args.timeout)
163            .map_err(|source| crate::cli::command::FromArgsError {
164                field: "timeout",
165                source: source.to_string().into(),
166            })?;
167        let timeout_seconds = parsed_timeout.as_secs();
168        if timeout_seconds == 0 {
169            // Treats sub-second timeouts as 0 too — humantime
170            // accepts `100ms` and we'd `.as_secs()` it down to 0.
171            // Sub-second is a real use case (cancellation
172            // testing); accept it here by rounding up to 1s OR
173            // by switching the storage unit. For now, the
174            // simplest spec-respecting reject path: require ≥
175            // 1s.
176            return Err(crate::cli::command::FromArgsError {
177                field: "timeout",
178                source: "must be >= 1s".to_string().into(),
179            });
180        }
181        if let Some(0) = args.max_tokens {
182            return Err(crate::cli::command::FromArgsError {
183                field: "max_tokens",
184                source: "must be >= 1 (omit the flag for unlimited)"
185                    .to_string()
186                    .into(),
187            });
188        }
189        Ok(Self {
190            path_type: Path::DbQuery,
191            query: args.query,
192            timeout_seconds,
193            max_tokens: args.max_tokens,
194            jq: args.jq,
195        })
196    }
197}
198
/// Runs `db query` through `executor` and returns the typed [`Response`].
///
/// Any `jq` filter carried by `request` is cleared first, so the output
/// matches the typed return. Use [`execute_jq`] for filtered output.
#[cfg(feature = "cli-executor")]
pub async fn execute<E: crate::cli::command::CommandExecutor>(
    executor: &E,
    request: Request,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<Response, E::Error> {
    let unfiltered = Request { jq: None, ..request };
    executor.execute_one(unfiltered, agent_arguments).await
}

/// Runs `db query` with the `jq` filter applied to its JSON output and
/// returns the filtered result as an untyped `serde_json::Value`.
///
/// Any filter already present on `request` is replaced by `jq`.
#[cfg(feature = "cli-executor")]
pub async fn execute_jq<E: crate::cli::command::CommandExecutor>(
    executor: &E,
    request: Request,
    jq: String,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<serde_json::Value, E::Error> {
    let filtered = Request {
        jq: Some(jq),
        ..request
    };
    executor.execute_one(filtered, agent_arguments).await
}

#[cfg(feature = "mcp")]
impl crate::cli::command::CommandResponse for Response {
    /// Serializes the response and wraps it in `McpResponseItem::JSONL`.
    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
        // Every field is a String, bool, Vec, `Column`, or `serde_json::Value`.
        // None of these can fail to convert into a `serde_json::Value` (no map
        // with non-string keys), so a failure here is an invariant violation.
        let value = serde_json::to_value(self)
            .expect("db query Response always serializes to a JSON value");
        crate::cli::command::McpResponseItem::JSONL(value)
    }
}

227pub mod request_schema;
228pub mod response_schema;