objectiveai_sdk/cli/command/db/query/mod.rs
1//! `db query` — execute arbitrary single-statement read-only SQL
2//! against the CLI's local postgres pool. Returns the row set as
3//! typed JSON cells (Postgres → JSON via a per-cell decoder),
4//! with column metadata and the wire-protocol command tag.
5//!
6//! Constraints:
7//! - **Read-only**: wrapped in `SET LOCAL TRANSACTION READ ONLY`
8//! server-side. Write attempts come back as
9//! `Error::QueryReadOnlyViolation`.
//! - **Timeout**: required, humantime-parsed (`30s`, `5m`,
//! `1h30m`) and carried in whole seconds. Anything below `1s`
//! (`0`, or a sub-second input such as `100ms`) is rejected at
//! `TryFrom<Args>` time. The CLI threads it into both
//! `SET LOCAL statement_timeout` (server) and
//! `tokio::time::timeout` (client) so cancellation cleanly
//! drops the connection without leaving the pool poisoned.
15//! - **Token budget**: optional `--max-tokens N`. When set, the
16//! CLI tokenizes the response per-part (`command_tag`,
17//! `columns`, each row) via `tiktoken-rs` o200k_base and sums.
18//! Over-budget responses error with `Error::TokenBudgetExceeded
19//! { limit, actual }` carrying the exact count and a
//! refinement suggestion. `0` is rejected at `TryFrom<Args>`
//! time as ambiguous ("no limit" vs. "no tokens"); omit the
//! flag entirely for "no limit".
22//!
//! Multi-statement queries, `COPY … TO STDOUT`,
//! `COPY … FROM STDIN`, and transaction-control verbs
//! (`BEGIN` / `COMMIT` / `ROLLBACK`) are rejected up front by a
//! cheap leading-token scan on the CLI side before the query
//! reaches the database.
27
28use crate::cli::command::CommandRequest;
29
// Wire-level request for the `db query` leaf. Built from parsed flags by
// `TryFrom<Args>` and rendered back into argv by
// `CommandRequest::into_command`.
// NOTE(review): the additions in this block are plain `//` comments on
// purpose — `///` text on a `JsonSchema`-derived item presumably ends up in
// the emitted schema; confirm before promoting any of them to doc comments.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.db.query.Request")]
pub struct Request {
    // Leaf discriminator; its only value is `Path::DbQuery`, which is
    // serialized as "db/query".
    pub path_type: Path,
    /// SQL statement to execute. Single statement only (multi-
    /// statement input is rejected by the CLI handler).
    pub query: String,
    /// Wall-clock execution cap, in whole seconds. Parsed from
    /// `--timeout` (humantime), `> 0` enforced at
    /// `TryFrom<Args>` time so this never carries 0 on the wire.
    // NOTE(review): the `> 0` guarantee only covers the `TryFrom<Args>`
    // path; a `Request` deserialized directly is not checked here.
    pub timeout_seconds: u64,
    /// Optional response token budget. `None` ⇒ no limit;
    /// `Some(n)` requires `n >= 1`. When set, the CLI errors
    /// with `TokenBudgetExceeded` if the per-part token sum
    /// exceeds the limit. Skip-serialized when None.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[schemars(extend("omitempty" = true))]
    pub max_tokens: Option<u64>,
    // Optional jq filter. `into_command` forwards it as `--jq <filter>`;
    // `execute` clears it and `execute_jq` overwrites it.
    // NOTE(review): unlike `max_tokens` there is no `skip_serializing_if`
    // here, so `None` is presumably written out as `null` — confirm that
    // is intended.
    pub jq: Option<String>,
}
50
// Single-variant tag identifying this leaf on the wire; used as the type of
// `Request::path_type`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.db.query.Path")]
pub enum Path {
    // Serialized as the string "db/query" rather than the variant name.
    #[serde(rename = "db/query")]
    DbQuery,
}
57
58impl CommandRequest for Request {
59 fn into_command(&self) -> Vec<String> {
60 let mut argv = vec![
61 "db".to_string(),
62 "query".to_string(),
63 "--query".to_string(),
64 self.query.clone(),
65 "--timeout".to_string(),
66 humantime::format_duration(
67 std::time::Duration::from_secs(self.timeout_seconds),
68 )
69 .to_string(),
70 ];
71 if let Some(n) = self.max_tokens {
72 argv.push("--max-tokens".to_string());
73 argv.push(n.to_string());
74 }
75 if let Some(jq) = &self.jq {
76 argv.push("--jq".to_string());
77 argv.push(jq.clone());
78 }
79 argv
80 }
81}
82
/// One result column. `r#type` is the Postgres `pg_type.typname`
/// (e.g. `"int8"`, `"text"`, `"jsonb"`, `"timestamptz"`). Callers
/// needing precision/scale/array-element-type can inspect the
/// name; richer typeinfo is deferred.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.db.query.Column")]
pub struct Column {
    // Column name; `Response::columns` documents the ordering.
    pub name: String,
    // Written as a raw identifier only because `type` is a Rust keyword.
    pub r#type: String,
}
93
/// Unary response from `db query`.
// NOTE(review): derives `PartialEq` but not `Eq`, unlike `Request` and
// `Column` — confirm whether that is deliberate.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[schemars(rename = "cli.command.db.query.Response")]
pub struct Response {
    /// Wire-protocol-style command tag — e.g. `"SELECT 5"`,
    /// `"SHOW 1"`, `"EXPLAIN 12"`, `"SET 0"`. Synthesized from
    /// `{leading_keyword} {row_count}` since sqlx 0.8 doesn't
    /// expose the wire-protocol `CommandComplete` tag; close
    /// enough for telemetry / display.
    pub command_tag: String,
    /// Result columns in select-list order. Empty for no-row
    /// statements (SET / LISTEN / DO).
    pub columns: Vec<Column>,
    /// One inner Vec per row, length matches `columns.len()`.
    /// Each cell is a `serde_json::Value` produced by the CLI's
    /// per-cell `pg_value_to_json` decoder. Common type
    /// encodings: text/uuid/timestamps as JSON String, numeric
    /// as String (preserving precision), bytea as base64 String,
    /// json/jsonb passthrough as Value, arrays recurse. Empty
    /// when the statement returns no rows (no-row command OR a
    /// SELECT with no matches).
    // NOTE(review): the "length matches `columns.len()`" invariant is a
    // producer-side convention; nothing in this type enforces it.
    pub rows: Vec<Vec<serde_json::Value>>,
    /// Always `false` in the current design — over-budget
    /// responses return `TokenBudgetExceeded` rather than a
    /// partial result. Reserved on the wire so a future "soft
    /// truncation" mode can be added without a shape break.
    pub truncated: bool,
}
122
123#[derive(clap::Args)]
124pub struct Args {
125 /// SQL statement to execute. Single statement only.
126 #[arg(long)]
127 pub query: String,
128 /// Wall-clock execution cap. Humantime — `100ms`, `30s`,
129 /// `5m`, `1h30m`. Must be `> 0`.
130 #[arg(long)]
131 pub timeout: String,
132 /// Optional response token budget. Must be `>= 1` if set
133 /// (`0` is rejected; omit the flag entirely to disable
134 /// limiting).
135 #[arg(long)]
136 pub max_tokens: Option<u64>,
137 /// jq filter applied to the JSON output.
138 #[arg(long)]
139 pub jq: Option<String>,
140}
141
// clap entry point for the `db query` leaf: the query flags (flattened in
// from `Args`) plus an optional schema-dumping subcommand.
#[derive(clap::Args)]
// Using any of the flattened flags rules out a subcommand.
#[command(args_conflicts_with_subcommands = true)]
pub struct Command {
    // Flags for actually running a query.
    #[command(flatten)]
    pub args: Args,
    // `Schema::RequestSchema` / `Schema::ResponseSchema`; `None` when no
    // subcommand was given.
    #[command(subcommand)]
    pub schema: Option<Schema>,
}
150
// Schema-introspection subcommands of `db query`. The argument types come
// from the `request_schema` / `response_schema` submodules declared at the
// bottom of this file.
#[derive(clap::Subcommand)]
pub enum Schema {
    /// Emit the JSON Schema for this leaf's `Request` type and exit.
    RequestSchema(request_schema::Args),
    /// Emit the JSON Schema for this leaf's `Response` type and exit.
    ResponseSchema(response_schema::Args),
}
158
159impl TryFrom<Args> for Request {
160 type Error = crate::cli::command::FromArgsError;
161 fn try_from(args: Args) -> Result<Self, Self::Error> {
162 let parsed_timeout = humantime::parse_duration(&args.timeout)
163 .map_err(|source| crate::cli::command::FromArgsError {
164 field: "timeout",
165 source: source.to_string().into(),
166 })?;
167 let timeout_seconds = parsed_timeout.as_secs();
168 if timeout_seconds == 0 {
169 // Treats sub-second timeouts as 0 too — humantime
170 // accepts `100ms` and we'd `.as_secs()` it down to 0.
171 // Sub-second is a real use case (cancellation
172 // testing); accept it here by rounding up to 1s OR
173 // by switching the storage unit. For now, the
174 // simplest spec-respecting reject path: require ≥
175 // 1s.
176 return Err(crate::cli::command::FromArgsError {
177 field: "timeout",
178 source: "must be >= 1s".to_string().into(),
179 });
180 }
181 if let Some(0) = args.max_tokens {
182 return Err(crate::cli::command::FromArgsError {
183 field: "max_tokens",
184 source: "must be >= 1 (omit the flag for unlimited)"
185 .to_string()
186 .into(),
187 });
188 }
189 Ok(Self {
190 path_type: Path::DbQuery,
191 query: args.query,
192 timeout_seconds,
193 max_tokens: args.max_tokens,
194 jq: args.jq,
195 })
196 }
197}
198
/// Runs `request` through `executor` and returns the typed [`Response`].
///
/// Any `jq` filter already present on the request is cleared before it is
/// handed to `execute_one` — presumably because filtered output would no
/// longer have the `Response` shape; use `execute_jq` to apply a filter.
///
/// # Errors
///
/// Propagates whatever error `executor.execute_one` returns.
#[cfg(feature = "cli-executor")]
pub async fn execute<E>(
    executor: &E,
    request: Request,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<Response, E::Error>
where
    E: crate::cli::command::CommandExecutor,
{
    let unfiltered = Request {
        jq: None,
        ..request
    };
    executor.execute_one(unfiltered, agent_arguments).await
}
208
/// Runs `request` through `executor` with the jq filter `jq` applied and
/// returns the result as an untyped `serde_json::Value`.
///
/// The `jq` argument replaces whatever filter the request already carried.
///
/// # Errors
///
/// Propagates whatever error `executor.execute_one` returns.
#[cfg(feature = "cli-executor")]
pub async fn execute_jq<E>(
    executor: &E,
    request: Request,
    jq: String,
    agent_arguments: Option<&crate::cli::command::AgentArguments>,
) -> Result<serde_json::Value, E::Error>
where
    E: crate::cli::command::CommandExecutor,
{
    let filtered = Request {
        jq: Some(jq),
        ..request
    };
    executor.execute_one(filtered, agent_arguments).await
}
219
#[cfg(feature = "mcp")]
impl crate::cli::command::CommandResponse for Response {
    /// Converts the response into an MCP item by serializing it to a JSON
    /// value and wrapping that in the `JSONL` variant.
    ///
    /// # Panics
    ///
    /// Panics if `serde_json::to_value` fails; that is not expected for this
    /// struct of strings, a bool and already-built JSON values.
    fn into_mcp(self) -> crate::cli::command::McpResponseItem {
        let payload = serde_json::to_value(self).unwrap();
        crate::cli::command::McpResponseItem::JSONL(payload)
    }
}
226
227pub mod request_schema;
228pub mod response_schema;