lattice_sql_client/lib.rs
1//! # lattice-sql-client
2//!
3//! Typed Rust SDK for [lattice-sql](https://github.com/Taika-3D-Oy/lattice-db) —
4//! the SQL frontend for lattice-db.
5//!
6//! Sends SQL strings to the `ldb.sql.query` NATS subject and maps the JSON
7//! responses back to strongly-typed Rust values.
8//!
9//! ## Quick start
10//!
11//! ```rust,no_run
//! use nats_wasi::client::{Client, ConnectConfig};
13//! use lattice_sql_client::LatticeSql;
14//!
15//! # async fn example() -> Result<(), lattice_sql_client::Error> {
16//! let client = Client::connect(ConnectConfig::default()).await?;
17//! let db = LatticeSql::new(client)
18//! .with_auth("my-token"); // matches LDB_AUTH_TOKEN on the server
19//!
20//! // DDL — create a table
21//! db.ddl("CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT NOT NULL, age INTEGER)").await?;
22//!
23//! // DML — insert rows
24//! let affected = db.exec("INSERT INTO users (id, name, age) VALUES ('alice', 'Alice', 30)").await?;
25//! assert_eq!(affected, 1);
26//!
27//! // Query — SELECT with filters, sorting, pagination
28//! let result = db.query("SELECT * FROM users WHERE age >= 25 ORDER BY name ASC LIMIT 10").await?;
29//! println!("columns: {:?}", result.columns);
30//! for row in &result.rows {
31//! println!("{:?}", row);
32//! }
33//!
34//! // Lookup a cell by column name
35//! if let Some(name) = result.cell(0, "name") {
36//! println!("first row name: {name}");
37//! }
38//!
39//! // Aggregates
40//! let agg = db.query("SELECT COUNT(*), SUM(age) FROM users").await?;
41//! let count = &agg.rows[0][0]; // COUNT(*)
42//!
43//! // Cleanup
44//! db.ddl("DROP TABLE users").await?;
45//! # Ok(())
46//! # }
47//! ```
48
49use nats_wasi::client::{Client, Duration, secs};
50use serde::{Deserialize, Serialize, de::DeserializeOwned};
51
52// ── Error ──────────────────────────────────────────────────────────
53
54/// Errors returned by the lattice-sql client.
55#[derive(Debug)]
56pub enum Error {
57 /// NATS transport error (connection refused, timeout, etc.).
58 Nats(nats_wasi::Error),
59 /// lattice-sql returned an application-level error in its response.
60 ///
61 /// This includes SQL parse errors, missing tables, constraint violations,
62 /// and any other error the service returns in `{"error":"..."}`.
63 Db(String),
64 /// JSON serialisation / deserialisation error.
65 Json(String),
66 /// A typed helper (`query`, `exec`, `ddl`) received the wrong response shape.
67 ///
68 /// For example, calling [`LatticeSql::query`] with an INSERT statement.
69 WrongResultType(String),
70}
71
72impl std::fmt::Display for Error {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 match self {
75 Error::Nats(e) => write!(f, "nats: {e}"),
76 Error::Db(e) => write!(f, "lattice-sql: {e}"),
77 Error::Json(e) => write!(f, "json: {e}"),
78 Error::WrongResultType(e) => write!(f, "wrong result type: {e}"),
79 }
80 }
81}
82
83impl std::error::Error for Error {}
84
85impl From<nats_wasi::Error> for Error {
86 fn from(e: nats_wasi::Error) -> Self {
87 Error::Nats(e)
88 }
89}
90
91// ── Public types ───────────────────────────────────────────────────
92
93/// The result of a SELECT query.
94///
95/// Rows are ordered as returned by the engine. Each row is a `Vec` of
96/// [`serde_json::Value`] aligned with [`columns`](QueryResult::columns).
97#[derive(Debug, Clone)]
98pub struct QueryResult {
99 /// Column names in projection order.
100 pub columns: Vec<String>,
101 /// Rows as arrays of JSON values, one per projected column.
102 pub rows: Vec<Vec<serde_json::Value>>,
103}
104
105impl QueryResult {
106 /// Number of rows in the result set.
107 pub fn row_count(&self) -> usize {
108 self.rows.len()
109 }
110
111 /// Find the zero-based index of a column by name.
112 /// Returns `None` if the column is not in the projection.
113 pub fn col_index(&self, name: &str) -> Option<usize> {
114 self.columns.iter().position(|c| c == name)
115 }
116
117 /// Get a single cell value by row index and column name.
118 ///
119 /// Returns `None` if the row index is out of bounds or the column name
120 /// is not in the projection.
121 pub fn cell(&self, row: usize, col: &str) -> Option<&serde_json::Value> {
122 let col_idx = self.col_index(col)?;
123 self.rows.get(row)?.get(col_idx)
124 }
125
126 /// Deserialise every row into a typed Rust struct.
127 ///
128 /// Each row is first converted to a JSON object keyed by column name,
129 /// then deserialized via `serde_json`. Useful when the columns match a
130 /// `#[derive(Deserialize)]` struct.
131 ///
132 /// ```rust,ignore
133 /// #[derive(serde::Deserialize)]
134 /// struct User { id: String, name: String, age: i64 }
135 ///
136 /// let users: Vec<User> = result.deserialize_rows().map_err(|e| Error::Json(e))?;
137 /// ```
138 pub fn deserialize_rows<T: DeserializeOwned>(&self) -> Result<Vec<T>, String> {
139 self.rows
140 .iter()
141 .map(|row| {
142 let mut map = serde_json::Map::new();
143 for (i, col) in self.columns.iter().enumerate() {
144 map.insert(
145 col.clone(),
146 row.get(i).cloned().unwrap_or(serde_json::Value::Null),
147 );
148 }
149 serde_json::from_value(serde_json::Value::Object(map))
150 .map_err(|e| e.to_string())
151 })
152 .collect()
153 }
154}
155
156/// The auto-detected result of any SQL statement sent via [`LatticeSql::sql`].
157#[derive(Debug, Clone)]
158pub enum SqlResult {
159 /// A SELECT query returned columnar data.
160 Query(QueryResult),
161 /// An INSERT, UPDATE, DELETE, or DDL statement completed.
162 Exec {
163 /// Rows modified. Zero for DDL statements.
164 affected_rows: u64,
165 },
166}
167
168impl SqlResult {
169 /// Unwrap as a [`QueryResult`], returning [`Error::WrongResultType`] if
170 /// the statement was DML/DDL.
171 pub fn into_query(self) -> Result<QueryResult, Error> {
172 match self {
173 SqlResult::Query(r) => Ok(r),
174 SqlResult::Exec { .. } => Err(Error::WrongResultType(
175 "expected SELECT result but got exec/DDL result".into(),
176 )),
177 }
178 }
179
180 /// Unwrap `affected_rows`, returning [`Error::WrongResultType`] if the
181 /// statement was a SELECT.
182 pub fn into_affected_rows(self) -> Result<u64, Error> {
183 match self {
184 SqlResult::Exec { affected_rows } => Ok(affected_rows),
185 SqlResult::Query(_) => Err(Error::WrongResultType(
186 "expected exec/DDL result but got SELECT result".into(),
187 )),
188 }
189 }
190
191 /// Returns `true` if this is a SELECT result.
192 pub fn is_query(&self) -> bool {
193 matches!(self, SqlResult::Query(_))
194 }
195
196 /// Returns `true` if this is a DML or DDL result.
197 pub fn is_exec(&self) -> bool {
198 matches!(self, SqlResult::Exec { .. })
199 }
200}
201
202// ── Wire types (private) ───────────────────────────────────────────
203
204/// Outgoing request payload.
205#[derive(Serialize)]
206struct SqlReq<'a> {
207 sql: &'a str,
208 #[serde(rename = "_auth", skip_serializing_if = "Option::is_none")]
209 auth: Option<&'a str>,
210}
211
212/// Unified incoming response.
213///
214/// lattice-sql sends one of three JSON shapes depending on what the SQL did:
215/// - `{"columns":[…],"rows":[[…],…]}` — SELECT result
216/// - `{"affected_rows":N}` — DML/DDL result
217/// - `{"error":"…"}` — any error
218///
219/// We deserialise into this single struct first, then dispatch.
220#[derive(Deserialize)]
221struct AnyResp {
222 columns: Option<Vec<String>>,
223 rows: Option<Vec<Vec<serde_json::Value>>>,
224 affected_rows: Option<u64>,
225 error: Option<String>,
226}
227
228// ── Client ─────────────────────────────────────────────────────────
229
230/// Typed client for the lattice-sql SQL frontend.
231///
232/// All SQL statements are forwarded to [`SUBJECT`](LatticeSql::SUBJECT) as
233/// `{"sql":"…"}` JSON payloads. The response is parsed and returned as a
234/// strongly typed Rust value.
235///
236/// # Requirements
237///
238/// - A running NATS server (JetStream not required for SQL queries)
239/// - `storage-service` running and listening on `ldb.*`
240/// - `lattice-sql` running and subscribed to `ldb.sql.>`
241///
242/// # Example
243///
244/// ```rust,no_run
245/// use lattice_sql_client::LatticeSql;
246/// use nats_wasip3::client::{Client, ConnectConfig};
247///
248/// # async fn example() -> Result<(), lattice_sql_client::Error> {
249/// let client = Client::connect(ConnectConfig::default()).await?;
250/// let db = LatticeSql::new(client);
251/// db.ddl("CREATE TABLE items (id TEXT PRIMARY KEY, label TEXT)").await?;
252/// # Ok(())
253/// # }
254/// ```
255pub struct LatticeSql {
256 client: Client,
257 timeout: Duration,
258 /// Value sent as `_auth` on every request. Must match `LDB_AUTH_TOKEN`.
259 auth_token: Option<String>,
260}
261
262impl LatticeSql {
263 /// NATS subject that lattice-sql subscribes to.
264 ///
265 /// The service accepts any subject matching `ldb.sql.>` and treats them
266 /// all identically. `ldb.sql.query` is used by convention.
267 pub const SUBJECT: &'static str = "ldb.sql.query";
268
269 /// Create a new client with the default 10-second timeout.
270 ///
271 /// The timeout is higher than `LatticeDb`'s default (5s) because a single
272 /// SQL query may require several round-trips to the storage service.
273 pub fn new(client: Client) -> Self {
274 Self { client, timeout: secs(10), auth_token: None }
275 }
276
277 /// Create a new client with a custom timeout (nanoseconds).
278 pub fn with_timeout(client: Client, timeout: Duration) -> Self {
279 Self { client, timeout, auth_token: None }
280 }
281
282 /// Attach a shared auth token. Sent as `_auth` in every request.
283 ///
284 /// Required when the server is configured with `LDB_AUTH_TOKEN`.
285 pub fn with_auth(mut self, token: impl Into<String>) -> Self {
286 self.auth_token = Some(token.into());
287 self
288 }
289
290 // ── Typed shortcuts ────────────────────────────────────────
291
292 /// Execute a SELECT statement and return [`QueryResult`].
293 ///
294 /// Returns [`Error::WrongResultType`] if the SQL is not a SELECT.
295 pub async fn query(&self, sql: &str) -> Result<QueryResult, Error> {
296 self.sql(sql).await?.into_query()
297 }
298
299 /// Execute an INSERT, UPDATE, or DELETE and return the affected row count.
300 ///
301 /// Returns [`Error::WrongResultType`] if the SQL is a SELECT.
302 pub async fn exec(&self, sql: &str) -> Result<u64, Error> {
303 self.sql(sql).await?.into_affected_rows()
304 }
305
306 /// Execute a DDL statement (CREATE TABLE, DROP TABLE, CREATE INDEX, …).
307 ///
308 /// DDL always reports zero affected rows, so the count is discarded.
309 /// Returns `()` on success, [`Error::Db`] if the DDL fails.
310 pub async fn ddl(&self, sql: &str) -> Result<(), Error> {
311 self.exec(sql).await?;
312 Ok(())
313 }
314
315 // ── Auto-detect ────────────────────────────────────────────
316
317 /// Send any SQL statement and return an auto-detected [`SqlResult`].
318 ///
319 /// Response shapes:
320 /// - `{"columns":[…],"rows":[[…],…]}` → [`SqlResult::Query`]
321 /// - `{"affected_rows":N}` → [`SqlResult::Exec`]
322 /// - `{"error":"…"}` → [`Error::Db`]
323 pub async fn sql(&self, sql: &str) -> Result<SqlResult, Error> {
324 let resp: AnyResp = self.send(sql).await?;
325
326 if let Some(msg) = resp.error {
327 return Err(Error::Db(msg));
328 }
329
330 // SELECT responses always carry a `columns` array.
331 if let Some(columns) = resp.columns {
332 return Ok(SqlResult::Query(QueryResult {
333 columns,
334 rows: resp.rows.unwrap_or_default(),
335 }));
336 }
337
338 // DML / DDL responses carry `affected_rows` (may be 0 for DDL).
339 Ok(SqlResult::Exec {
340 affected_rows: resp.affected_rows.unwrap_or(0),
341 })
342 }
343
344 // ── Internal ───────────────────────────────────────────────
345
346 async fn send<R: DeserializeOwned>(&self, sql: &str) -> Result<R, Error> {
347 let body =
348 serde_json::to_vec(&SqlReq { sql, auth: self.auth_token.as_deref() })
349 .map_err(|e| Error::Json(e.to_string()))?;
350 let reply = self
351 .client
352 .request(Self::SUBJECT, &body, self.timeout)
353 .await?;
354 serde_json::from_slice(&reply.payload).map_err(|e| Error::Json(e.to_string()))
355 }
356}