Skip to main content

lattice_sql_client/
lib.rs

1//! # lattice-sql-client
2//!
3//! Typed Rust SDK for [lattice-sql](https://github.com/Taika-3D-Oy/lattice-db) —
4//! the SQL frontend for lattice-db.
5//!
6//! Sends SQL strings to the `ldb.sql.query` NATS subject and maps the JSON
7//! responses back to strongly-typed Rust values.
8//!
9//! ## Quick start
10//!
11//! ```rust,no_run
12//! use nats_wasip3::client::{Client, ConnectConfig};
13//! use lattice_sql_client::LatticeSql;
14//!
15//! # async fn example() -> Result<(), lattice_sql_client::Error> {
16//! let client = Client::connect(ConnectConfig::default()).await?;
17//! let db = LatticeSql::new(client)
18//!     .with_auth("my-token"); // matches LDB_AUTH_TOKEN on the server
19//!
20//! // DDL — create a table
21//! db.ddl("CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT NOT NULL, age INTEGER)").await?;
22//!
23//! // DML — insert rows
24//! let affected = db.exec("INSERT INTO users (id, name, age) VALUES ('alice', 'Alice', 30)").await?;
25//! assert_eq!(affected, 1);
26//!
27//! // Query — SELECT with filters, sorting, pagination
28//! let result = db.query("SELECT * FROM users WHERE age >= 25 ORDER BY name ASC LIMIT 10").await?;
29//! println!("columns: {:?}", result.columns);
30//! for row in &result.rows {
31//!     println!("{:?}", row);
32//! }
33//!
34//! // Look up a cell by column name
35//! if let Some(name) = result.cell(0, "name") {
36//!     println!("first row name: {name}");
37//! }
38//!
39//! // Aggregates
40//! let agg = db.query("SELECT COUNT(*), SUM(age) FROM users").await?;
41//! let count = &agg.rows[0][0]; // COUNT(*)
42//!
43//! // Cleanup
44//! db.ddl("DROP TABLE users").await?;
45//! # Ok(())
46//! # }
47//! ```
48
49use nats_wasi::client::{Client, Duration, secs};
50use serde::{Deserialize, Serialize, de::DeserializeOwned};
51
52// ── Error ──────────────────────────────────────────────────────────
53
54/// Errors returned by the lattice-sql client.
55#[derive(Debug)]
56pub enum Error {
57    /// NATS transport error (connection refused, timeout, etc.).
58    Nats(nats_wasi::Error),
59    /// lattice-sql returned an application-level error in its response.
60    ///
61    /// This includes SQL parse errors, missing tables, constraint violations,
62    /// and any other error the service returns in `{"error":"..."}`.
63    Db(String),
64    /// JSON serialisation / deserialisation error.
65    Json(String),
66    /// A typed helper (`query`, `exec`, `ddl`) received the wrong response shape.
67    ///
68    /// For example, calling [`LatticeSql::query`] with an INSERT statement.
69    WrongResultType(String),
70}
71
72impl std::fmt::Display for Error {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        match self {
75            Error::Nats(e) => write!(f, "nats: {e}"),
76            Error::Db(e) => write!(f, "lattice-sql: {e}"),
77            Error::Json(e) => write!(f, "json: {e}"),
78            Error::WrongResultType(e) => write!(f, "wrong result type: {e}"),
79        }
80    }
81}
82
83impl std::error::Error for Error {}
84
85impl From<nats_wasi::Error> for Error {
86    fn from(e: nats_wasi::Error) -> Self {
87        Error::Nats(e)
88    }
89}
90
91// ── Public types ───────────────────────────────────────────────────
92
93/// The result of a SELECT query.
94///
95/// Rows are ordered as returned by the engine. Each row is a `Vec` of
96/// [`serde_json::Value`] aligned with [`columns`](QueryResult::columns).
97#[derive(Debug, Clone)]
98pub struct QueryResult {
99    /// Column names in projection order.
100    pub columns: Vec<String>,
101    /// Rows as arrays of JSON values, one per projected column.
102    pub rows: Vec<Vec<serde_json::Value>>,
103}
104
105impl QueryResult {
106    /// Number of rows in the result set.
107    pub fn row_count(&self) -> usize {
108        self.rows.len()
109    }
110
111    /// Find the zero-based index of a column by name.
112    /// Returns `None` if the column is not in the projection.
113    pub fn col_index(&self, name: &str) -> Option<usize> {
114        self.columns.iter().position(|c| c == name)
115    }
116
117    /// Get a single cell value by row index and column name.
118    ///
119    /// Returns `None` if the row index is out of bounds or the column name
120    /// is not in the projection.
121    pub fn cell(&self, row: usize, col: &str) -> Option<&serde_json::Value> {
122        let col_idx = self.col_index(col)?;
123        self.rows.get(row)?.get(col_idx)
124    }
125
126    /// Deserialise every row into a typed Rust struct.
127    ///
128    /// Each row is first converted to a JSON object keyed by column name,
129    /// then deserialized via `serde_json`. Useful when the columns match a
130    /// `#[derive(Deserialize)]` struct.
131    ///
132    /// ```rust,ignore
133    /// #[derive(serde::Deserialize)]
134    /// struct User { id: String, name: String, age: i64 }
135    ///
136    /// let users: Vec<User> = result.deserialize_rows().map_err(|e| Error::Json(e))?;
137    /// ```
138    pub fn deserialize_rows<T: DeserializeOwned>(&self) -> Result<Vec<T>, String> {
139        self.rows
140            .iter()
141            .map(|row| {
142                let mut map = serde_json::Map::new();
143                for (i, col) in self.columns.iter().enumerate() {
144                    map.insert(
145                        col.clone(),
146                        row.get(i).cloned().unwrap_or(serde_json::Value::Null),
147                    );
148                }
149                serde_json::from_value(serde_json::Value::Object(map))
150                    .map_err(|e| e.to_string())
151            })
152            .collect()
153    }
154}
155
156/// The auto-detected result of any SQL statement sent via [`LatticeSql::sql`].
157#[derive(Debug, Clone)]
158pub enum SqlResult {
159    /// A SELECT query returned columnar data.
160    Query(QueryResult),
161    /// An INSERT, UPDATE, DELETE, or DDL statement completed.
162    Exec {
163        /// Rows modified. Zero for DDL statements.
164        affected_rows: u64,
165    },
166}
167
168impl SqlResult {
169    /// Unwrap as a [`QueryResult`], returning [`Error::WrongResultType`] if
170    /// the statement was DML/DDL.
171    pub fn into_query(self) -> Result<QueryResult, Error> {
172        match self {
173            SqlResult::Query(r) => Ok(r),
174            SqlResult::Exec { .. } => Err(Error::WrongResultType(
175                "expected SELECT result but got exec/DDL result".into(),
176            )),
177        }
178    }
179
180    /// Unwrap `affected_rows`, returning [`Error::WrongResultType`] if the
181    /// statement was a SELECT.
182    pub fn into_affected_rows(self) -> Result<u64, Error> {
183        match self {
184            SqlResult::Exec { affected_rows } => Ok(affected_rows),
185            SqlResult::Query(_) => Err(Error::WrongResultType(
186                "expected exec/DDL result but got SELECT result".into(),
187            )),
188        }
189    }
190
191    /// Returns `true` if this is a SELECT result.
192    pub fn is_query(&self) -> bool {
193        matches!(self, SqlResult::Query(_))
194    }
195
196    /// Returns `true` if this is a DML or DDL result.
197    pub fn is_exec(&self) -> bool {
198        matches!(self, SqlResult::Exec { .. })
199    }
200}
201
202// ── Wire types (private) ───────────────────────────────────────────
203
204/// Outgoing request payload.
205#[derive(Serialize)]
206struct SqlReq<'a> {
207    sql: &'a str,
208    #[serde(rename = "_auth", skip_serializing_if = "Option::is_none")]
209    auth: Option<&'a str>,
210}
211
212/// Unified incoming response.
213///
214/// lattice-sql sends one of three JSON shapes depending on what the SQL did:
215/// - `{"columns":[…],"rows":[[…],…]}` — SELECT result
216/// - `{"affected_rows":N}` — DML/DDL result
217/// - `{"error":"…"}` — any error
218///
219/// We deserialise into this single struct first, then dispatch.
220#[derive(Deserialize)]
221struct AnyResp {
222    columns: Option<Vec<String>>,
223    rows: Option<Vec<Vec<serde_json::Value>>>,
224    affected_rows: Option<u64>,
225    error: Option<String>,
226}
227
228// ── Client ─────────────────────────────────────────────────────────
229
230/// Typed client for the lattice-sql SQL frontend.
231///
232/// All SQL statements are forwarded to [`SUBJECT`](LatticeSql::SUBJECT) as
233/// `{"sql":"…"}` JSON payloads. The response is parsed and returned as a
234/// strongly typed Rust value.
235///
236/// # Requirements
237///
238/// - A running NATS server (JetStream not required for SQL queries)
239/// - `storage-service` running and listening on `ldb.*`
240/// - `lattice-sql` running and subscribed to `ldb.sql.>`
241///
242/// # Example
243///
244/// ```rust,no_run
245/// use lattice_sql_client::LatticeSql;
246/// use nats_wasip3::client::{Client, ConnectConfig};
247///
248/// # async fn example() -> Result<(), lattice_sql_client::Error> {
249/// let client = Client::connect(ConnectConfig::default()).await?;
250/// let db = LatticeSql::new(client);
251/// db.ddl("CREATE TABLE items (id TEXT PRIMARY KEY, label TEXT)").await?;
252/// # Ok(())
253/// # }
254/// ```
255pub struct LatticeSql {
256    client: Client,
257    timeout: Duration,
258    /// Value sent as `_auth` on every request. Must match `LDB_AUTH_TOKEN`.
259    auth_token: Option<String>,
260}
261
262impl LatticeSql {
263    /// NATS subject that lattice-sql subscribes to.
264    ///
265    /// The service accepts any subject matching `ldb.sql.>` and treats them
266    /// all identically. `ldb.sql.query` is used by convention.
267    pub const SUBJECT: &'static str = "ldb.sql.query";
268
269    /// Create a new client with the default 10-second timeout.
270    ///
271    /// The timeout is higher than `LatticeDb`'s default (5s) because a single
272    /// SQL query may require several round-trips to the storage service.
273    pub fn new(client: Client) -> Self {
274        Self { client, timeout: secs(10), auth_token: None }
275    }
276
277    /// Create a new client with a custom timeout (nanoseconds).
278    pub fn with_timeout(client: Client, timeout: Duration) -> Self {
279        Self { client, timeout, auth_token: None }
280    }
281
282    /// Attach a shared auth token. Sent as `_auth` in every request.
283    ///
284    /// Required when the server is configured with `LDB_AUTH_TOKEN`.
285    pub fn with_auth(mut self, token: impl Into<String>) -> Self {
286        self.auth_token = Some(token.into());
287        self
288    }
289
290    // ── Typed shortcuts ────────────────────────────────────────
291
292    /// Execute a SELECT statement and return [`QueryResult`].
293    ///
294    /// Returns [`Error::WrongResultType`] if the SQL is not a SELECT.
295    pub async fn query(&self, sql: &str) -> Result<QueryResult, Error> {
296        self.sql(sql).await?.into_query()
297    }
298
299    /// Execute an INSERT, UPDATE, or DELETE and return the affected row count.
300    ///
301    /// Returns [`Error::WrongResultType`] if the SQL is a SELECT.
302    pub async fn exec(&self, sql: &str) -> Result<u64, Error> {
303        self.sql(sql).await?.into_affected_rows()
304    }
305
306    /// Execute a DDL statement (CREATE TABLE, DROP TABLE, CREATE INDEX, …).
307    ///
308    /// DDL always reports zero affected rows, so the count is discarded.
309    /// Returns `()` on success, [`Error::Db`] if the DDL fails.
310    pub async fn ddl(&self, sql: &str) -> Result<(), Error> {
311        self.exec(sql).await?;
312        Ok(())
313    }
314
315    // ── Auto-detect ────────────────────────────────────────────
316
317    /// Send any SQL statement and return an auto-detected [`SqlResult`].
318    ///
319    /// Response shapes:
320    /// - `{"columns":[…],"rows":[[…],…]}` → [`SqlResult::Query`]
321    /// - `{"affected_rows":N}`             → [`SqlResult::Exec`]
322    /// - `{"error":"…"}`                   → [`Error::Db`]
323    pub async fn sql(&self, sql: &str) -> Result<SqlResult, Error> {
324        let resp: AnyResp = self.send(sql).await?;
325
326        if let Some(msg) = resp.error {
327            return Err(Error::Db(msg));
328        }
329
330        // SELECT responses always carry a `columns` array.
331        if let Some(columns) = resp.columns {
332            return Ok(SqlResult::Query(QueryResult {
333                columns,
334                rows: resp.rows.unwrap_or_default(),
335            }));
336        }
337
338        // DML / DDL responses carry `affected_rows` (may be 0 for DDL).
339        Ok(SqlResult::Exec {
340            affected_rows: resp.affected_rows.unwrap_or(0),
341        })
342    }
343
344    // ── Internal ───────────────────────────────────────────────
345
346    async fn send<R: DeserializeOwned>(&self, sql: &str) -> Result<R, Error> {
347        let body =
348            serde_json::to_vec(&SqlReq { sql, auth: self.auth_token.as_deref() })
349                .map_err(|e| Error::Json(e.to_string()))?;
350        let reply = self
351            .client
352            .request(Self::SUBJECT, &body, self.timeout)
353            .await?;
354        serde_json::from_slice(&reply.payload).map_err(|e| Error::Json(e.to_string()))
355    }
356}