// mtgjson_sdk/connection.rs

1//! DuckDB connection wrapper with view registration and query execution.
2//!
3//! Uses schema introspection to adapt views dynamically:
4//! - CSV VARCHAR columns are auto-detected and converted to arrays
5//! - Wide-format legalities are auto-UNPIVOTed to (uuid, format, status) rows
6
7use crate::cache::CacheManager;
8use crate::error::Result;
9use duckdb::{types::ValueRef, Connection as DuckDbConnection};
10use serde::de::DeserializeOwned;
11use std::cell::RefCell;
12use std::collections::{HashMap, HashSet};
13
/// Known list columns that don't follow the plural naming convention
/// (e.g. colorIdentity, availability, producedMana). Always converted
/// to arrays regardless of heuristic detection.
fn static_list_columns() -> HashMap<&'static str, HashSet<&'static str>> {
    // Columns on the `cards` view that always hold comma-separated lists.
    let card_lists = [
        "artistIds",
        "attractionLights",
        "availability",
        "boosterTypes",
        "cardParts",
        "colorIdentity",
        "colorIndicator",
        "colors",
        "finishes",
        "frameEffects",
        "keywords",
        "originalPrintings",
        "otherFaceIds",
        "printings",
        "producedMana",
        "promoTypes",
        "rebalancedPrintings",
        "subsets",
        "subtypes",
        "supertypes",
        "types",
        "variations",
    ];
    // Columns on the `tokens` view that always hold comma-separated lists.
    let token_lists = [
        "artistIds",
        "availability",
        "boosterTypes",
        "colorIdentity",
        "colorIndicator",
        "colors",
        "finishes",
        "frameEffects",
        "keywords",
        "otherFaceIds",
        "producedMana",
        "promoTypes",
        "reverseRelated",
        "subtypes",
        "supertypes",
        "types",
    ];
    HashMap::from([
        ("cards", card_lists.into_iter().collect()),
        ("tokens", token_lists.into_iter().collect()),
    ])
}
69
/// VARCHAR columns that are definitely NOT lists, even if they match the
/// plural-name heuristic. Prevents splitting text fields that contain commas,
/// JSON struct fields, and other scalar strings.
fn ignored_columns() -> HashSet<&'static str> {
    [
        "text",
        "originalText",
        "flavorText",
        "printedText",
        "identifiers",
        "legalities",
        "leadershipSkills",
        "purchaseUrls",
        "relatedCards",
        "rulings",
        "sourceProducts",
        "foreignData",
        "translations",
        "toughness",
        "status",
        "format",
        "uris",
        "scryfallUri",
    ]
    .into_iter()
    .collect()
}
95
/// VARCHAR columns containing JSON strings that should be cast to DuckDB's
/// JSON type. This enables SQL operators like ->>, json_extract(), etc.
fn json_cast_columns() -> HashSet<&'static str> {
    [
        "identifiers",
        "legalities",
        "leadershipSkills",
        "purchaseUrls",
        "relatedCards",
        "rulings",
        "sourceProducts",
        "foreignData",
        "translations",
    ]
    .into_iter()
    .collect()
}
111
/// Wraps a DuckDB connection and registers parquet files as views.
///
/// Uses schema introspection to adapt views dynamically:
/// - CSV VARCHAR columns are auto-detected and converted to arrays
/// - Wide-format legalities are auto-UNPIVOTed to (uuid, format, status) rows
pub struct Connection {
    /// In-memory DuckDB database handle all queries run against.
    conn: DuckDbConnection,
    /// The cache manager used to download/locate data files.
    pub cache: RefCell<CacheManager>,
    /// Names of views/tables already registered, consulted to keep
    /// registration idempotent. `RefCell` allows lazy registration
    /// through `&self`.
    registered_views: RefCell<HashSet<String>>,
}
123
124impl Connection {
125    /// Create a connection backed by the given cache.
126    ///
127    /// Opens an in-memory DuckDB database.
128    pub fn new(cache: CacheManager) -> Result<Self> {
129        let conn = DuckDbConnection::open_in_memory()?;
130        Ok(Self {
131            conn,
132            cache: RefCell::new(cache),
133            registered_views: RefCell::new(HashSet::new()),
134        })
135    }
136
137    /// Ensure one or more views are registered, downloading data if needed.
138    pub fn ensure_views(&self, views: &[&str]) -> Result<()> {
139        for name in views {
140            if !self.registered_views.borrow().contains(*name) {
141                self.ensure_view(name)?;
142            }
143        }
144        Ok(())
145    }
146
147    /// Execute SQL and return results as a `Vec` of `HashMap`s.
148    ///
149    /// Each row is represented as a `HashMap<String, serde_json::Value>`.
150    /// Automatically converts DuckDB types to `serde_json::Value`.
151    pub fn execute(
152        &self,
153        sql: &str,
154        params: &[String],
155    ) -> Result<Vec<HashMap<String, serde_json::Value>>> {
156        let mut stmt = self.conn.prepare(sql)?;
157
158        let param_values: Vec<&dyn duckdb::ToSql> = params
159            .iter()
160            .map(|p| p as &dyn duckdb::ToSql)
161            .collect();
162
163        let mut rows_result = stmt.query(param_values.as_slice())?;
164
165        // Get column metadata AFTER query execution (calling before panics in duckdb-rs)
166        let column_names: Vec<String> = rows_result
167            .as_ref()
168            .unwrap()
169            .column_names()
170            .into_iter()
171            .map(|s| s.to_string())
172            .collect();
173        let column_count = rows_result.as_ref().unwrap().column_count();
174
175        let mut out: Vec<HashMap<String, serde_json::Value>> = Vec::new();
176
177        while let Some(row) = rows_result.next()? {
178            let mut map = HashMap::new();
179            for i in 0..column_count {
180                let col_name = &column_names[i];
181                let value = convert_value_ref(row.get_ref(i)?);
182                map.insert(col_name.clone(), value);
183            }
184            out.push(map);
185        }
186
187        Ok(out)
188    }
189
190    /// Execute SQL and deserialize each row into type `T`.
191    ///
192    /// First executes the query as `HashMap` rows, then deserializes each
193    /// row using `serde_json`.
194    pub fn execute_into<T: DeserializeOwned>(
195        &self,
196        sql: &str,
197        params: &[String],
198    ) -> Result<Vec<T>> {
199        let rows = self.execute(sql, params)?;
200        let mut results = Vec::with_capacity(rows.len());
201        for row in rows {
202            let value = serde_json::Value::Object(
203                row.into_iter().collect::<serde_json::Map<String, serde_json::Value>>(),
204            );
205            let item: T = serde_json::from_value(value)?;
206            results.push(item);
207        }
208        Ok(results)
209    }
210
211    /// Execute SQL and return the first column of the first row.
212    ///
213    /// Returns `None` if the result set is empty.
214    pub fn execute_scalar(
215        &self,
216        sql: &str,
217        params: &[String],
218    ) -> Result<Option<serde_json::Value>> {
219        let mut stmt = self.conn.prepare(sql)?;
220        let param_values: Vec<&dyn duckdb::ToSql> = params
221            .iter()
222            .map(|p| p as &dyn duckdb::ToSql)
223            .collect();
224
225        let mut rows = stmt.query(param_values.as_slice())?;
226
227        if let Some(row) = rows.next()? {
228            let value = convert_value_ref(row.get_ref(0)?);
229            Ok(Some(value))
230        } else {
231            Ok(None)
232        }
233    }
234
235    /// Create a DuckDB table from a newline-delimited JSON file.
236    ///
237    /// More memory-efficient than loading data into a Rust structure first,
238    /// since data is streamed from disk by DuckDB.
239    pub fn register_table_from_ndjson(
240        &self,
241        table_name: &str,
242        ndjson_path: &str,
243    ) -> Result<()> {
244        let path_fwd = ndjson_path.replace('\\', "/");
245        self.conn.execute_batch(&format!(
246            "DROP TABLE IF EXISTS {}; \
247             CREATE TABLE {} AS SELECT * FROM read_json_auto('{}', format='newline_delimited')",
248            table_name, table_name, path_fwd
249        ))?;
250        self.registered_views.borrow_mut().insert(table_name.to_string());
251        Ok(())
252    }
253
254    /// Check whether a view has been registered.
255    pub fn has_view(&self, name: &str) -> bool {
256        self.registered_views.borrow().contains(name)
257    }
258
259    /// Return a list of all registered view names.
260    pub fn views(&self) -> Vec<String> {
261        self.registered_views.borrow().iter().cloned().collect()
262    }
263
264    /// Clear all registered views so they will be re-created on next access.
265    pub fn reset_views(&self) {
266        self.registered_views.borrow_mut().clear();
267    }
268
    /// Access the underlying DuckDB connection for advanced usage.
    ///
    /// Bypasses the view-registration bookkeeping; callers are responsible
    /// for ensuring any views they query have already been registered.
    pub fn raw(&self) -> &DuckDbConnection {
        &self.conn
    }
273
274    /// Export the in-memory DuckDB database to a directory on disk.
275    ///
276    /// Uses DuckDB's `EXPORT DATABASE` command.
277    pub fn export_db(&self, path: &std::path::Path) -> crate::error::Result<std::path::PathBuf> {
278        let path_str = path.to_string_lossy().replace('\\', "/");
279        self.conn
280            .execute_batch(&format!("EXPORT DATABASE '{}'", path_str))?;
281        Ok(path.to_path_buf())
282    }
283
284    /// Execute SQL and return the result as a Polars DataFrame.
285    ///
286    /// Requires the `polars` cargo feature.
287    #[cfg(feature = "polars")]
288    pub fn execute_df(
289        &self,
290        sql: &str,
291        params: &[String],
292    ) -> crate::error::Result<polars::frame::DataFrame> {
293        use polars::prelude::*;
294
295        let mut stmt = self.conn.prepare(sql)?;
296
297        let param_values: Vec<&dyn duckdb::ToSql> = params
298            .iter()
299            .map(|p| p as &dyn duckdb::ToSql)
300            .collect();
301
302        let polars_iter = stmt.query_polars(param_values.as_slice())?;
303        let frames: Vec<DataFrame> = polars_iter.collect();
304
305        if frames.is_empty() {
306            Ok(DataFrame::empty())
307        } else if frames.len() == 1 {
308            Ok(frames.into_iter().next().unwrap())
309        } else {
310            // Vertically concatenate all chunks
311            let mut result = frames[0].clone();
312            for frame in &frames[1..] {
313                result = result.vstack(frame).map_err(|e| {
314                    crate::error::MtgjsonError::Other(format!("Polars vstack failed: {}", e))
315                })?;
316            }
317            Ok(result)
318        }
319    }
320
321    /// Lazily register a parquet file as a DuckDB view.
322    ///
323    /// Introspects the parquet schema on first registration and builds
324    /// the view SQL dynamically, so the SDK adapts to upstream schema
325    /// changes without code updates.
326    fn ensure_view(&self, view_name: &str) -> Result<()> {
327        if self.registered_views.borrow().contains(view_name) {
328            return Ok(());
329        }
330
331        let path = self.cache.borrow_mut().ensure_parquet(view_name)?;
332        // Use forward slashes for DuckDB compatibility
333        let path_str = path.to_string_lossy().replace('\\', "/");
334
335        if view_name == "card_legalities" {
336            self.register_legalities_view(&path_str)?;
337            return Ok(());
338        }
339
340        // Hybrid CSV->array detection: static baseline + dynamic heuristic
341        let replace_clause = self.build_csv_replace(&path_str, view_name)?;
342
343        let view_sql = format!(
344            "CREATE OR REPLACE VIEW {} AS SELECT *{} FROM read_parquet('{}')",
345            view_name, replace_clause, path_str
346        );
347        self.conn.execute_batch(&view_sql)?;
348        self.registered_views.borrow_mut().insert(view_name.to_string());
349        eprintln!("Registered view: {} -> {}", view_name, path_str);
350
351        Ok(())
352    }
353
354    /// Build a REPLACE clause using a hybrid static + dynamic approach.
355    ///
356    /// Four layers:
357    /// 1. Static baseline: known non-plural list columns
358    /// 2. Dynamic heuristic: VARCHAR columns ending in 's' are likely lists
359    /// 3. Safety blocklist: prevents splitting text fields and JSON structs
360    /// 4. JSON casting: struct-like VARCHAR columns cast to DuckDB JSON type
361    ///
362    /// Only reads the parquet footer (DESCRIBE) -- no data scanning needed.
363    fn build_csv_replace(&self, path_str: &str, view_name: &str) -> Result<String> {
364        let mut stmt = self.conn.prepare(&format!(
365            "SELECT column_name, column_type FROM \
366             (DESCRIBE SELECT * FROM read_parquet('{}'))",
367            path_str
368        ))?;
369
370        let mut rows = stmt.query([])?;
371        let mut schema: Vec<(String, String)> = Vec::new();
372        let mut schema_map: HashMap<String, String> = HashMap::new();
373
374        while let Some(row) = rows.next()? {
375            let col_name: String = row.get(0)?;
376            let col_type: String = row.get(1)?;
377            schema_map.insert(col_name.clone(), col_type.clone());
378            schema.push((col_name, col_type));
379        }
380
381        let static_lists = static_list_columns();
382        let ignored = ignored_columns();
383        let json_cast = json_cast_columns();
384
385        // Build candidate set from both layers
386        let mut candidates: HashSet<String> = HashSet::new();
387
388        // Layer 1: Static baseline (the "knowns")
389        if let Some(static_cols) = static_lists.get(view_name) {
390            for col in static_cols {
391                candidates.insert(col.to_string());
392            }
393        }
394
395        // Layer 2: Dynamic heuristic (the "unknowns")
396        for (col, dtype) in &schema {
397            if dtype != "VARCHAR" {
398                continue;
399            }
400            if ignored.contains(col.as_str()) {
401                continue;
402            }
403            if col.ends_with('s') {
404                candidates.insert(col.clone());
405            }
406        }
407
408        // Filter to columns that actually exist as VARCHAR in this file
409        let mut final_cols: Vec<String> = candidates
410            .into_iter()
411            .filter(|col| schema_map.get(col).map(|t| t == "VARCHAR").unwrap_or(false))
412            .collect();
413        final_cols.sort();
414
415        let mut exprs: Vec<String> = Vec::new();
416
417        for col in &final_cols {
418            exprs.push(format!(
419                "CASE WHEN \"{}\" IS NULL OR TRIM(\"{}\") = '' \
420                 THEN []::VARCHAR[] \
421                 ELSE string_split(\"{}\", ', ') END AS \"{}\"",
422                col, col, col, col
423            ));
424        }
425
426        // Layer 4: JSON casting for struct-like VARCHAR columns
427        let mut json_cols: Vec<&&str> = json_cast.iter().collect();
428        json_cols.sort();
429        for col in json_cols {
430            if schema_map.get(*col).map(|t| t == "VARCHAR").unwrap_or(false) {
431                exprs.push(format!("TRY_CAST(\"{}\" AS JSON) AS \"{}\"", col, col));
432            }
433        }
434
435        if exprs.is_empty() {
436            Ok(String::new())
437        } else {
438            Ok(format!(" REPLACE ({})", exprs.join(", ")))
439        }
440    }
441
442    /// Register card_legalities by dynamically UNPIVOTing wide format.
443    ///
444    /// Introspects the parquet schema and UNPIVOTs all columns except 'uuid'
445    /// into (uuid, format, status) rows. Automatically picks up new formats
446    /// (e.g. 'timeless', 'oathbreaker') as they appear in the data.
447    fn register_legalities_view(&self, path_str: &str) -> Result<()> {
448        let mut stmt = self.conn.prepare(&format!(
449            "SELECT column_name FROM \
450             (DESCRIBE SELECT * FROM read_parquet('{}'))",
451            path_str
452        ))?;
453
454        let mut rows = stmt.query([])?;
455        let mut all_cols: Vec<String> = Vec::new();
456
457        while let Some(row) = rows.next()? {
458            let col_name: String = row.get(0)?;
459            all_cols.push(col_name);
460        }
461
462        // Everything except 'uuid' is a format column
463        let format_cols: Vec<&String> = all_cols.iter().filter(|c| c.as_str() != "uuid").collect();
464
465        if format_cols.is_empty() {
466            // Fallback: assume row format (test data or different schema)
467            self.conn.execute_batch(&format!(
468                "CREATE OR REPLACE VIEW card_legalities AS \
469                 SELECT * FROM read_parquet('{}')",
470                path_str
471            ))?;
472        } else {
473            let cols_sql: String = format_cols
474                .iter()
475                .map(|c| format!("\"{}\"", c))
476                .collect::<Vec<_>>()
477                .join(", ");
478
479            self.conn.execute_batch(&format!(
480                "CREATE OR REPLACE VIEW card_legalities AS \
481                 SELECT uuid, format, status FROM (\
482                   UNPIVOT (SELECT * FROM read_parquet('{}'))\
483                   ON {}\
484                   INTO NAME format VALUE status\
485                 ) WHERE status IS NOT NULL",
486                path_str, cols_sql
487            ))?;
488        }
489
490        self.registered_views.borrow_mut().insert("card_legalities".to_string());
491        eprintln!(
492            "Registered legalities view (UNPIVOT {} formats): {}",
493            format_cols.len(),
494            path_str
495        );
496
497        Ok(())
498    }
499}
500
/// Convert a borrowed DuckDB `ValueRef` to a `serde_json::Value`.
///
/// Primitive variants are converted directly; all remaining variants
/// (lists, structs, dates, timestamps, ...) are promoted to an owned
/// `duckdb::types::Value` and routed through `convert_owned_value`.
fn convert_value_ref(val: ValueRef<'_>) -> serde_json::Value {
    match val {
        ValueRef::Null => serde_json::Value::Null,
        ValueRef::Boolean(b) => serde_json::Value::Bool(b),
        ValueRef::TinyInt(n) => serde_json::Value::Number(n.into()),
        ValueRef::SmallInt(n) => serde_json::Value::Number(n.into()),
        ValueRef::Int(n) => serde_json::Value::Number(n.into()),
        ValueRef::BigInt(n) => serde_json::Value::Number(n.into()),
        ValueRef::HugeInt(n) => {
            // HugeInt may not fit in i64; try i64, fallback to string
            if let Ok(i) = i64::try_from(n) {
                serde_json::Value::Number(i.into())
            } else {
                serde_json::Value::String(n.to_string())
            }
        }
        // NaN/Infinity have no JSON representation; from_f64 yields None,
        // which is mapped to JSON null.
        ValueRef::Float(f) => serde_json::Number::from_f64(f as f64)
            .map(serde_json::Value::Number)
            .unwrap_or(serde_json::Value::Null),
        ValueRef::Double(f) => serde_json::Number::from_f64(f)
            .map(serde_json::Value::Number)
            .unwrap_or(serde_json::Value::Null),
        ValueRef::Text(bytes) => {
            let s = String::from_utf8_lossy(bytes).to_string();
            // Returned verbatim as a JSON string. JSON-typed columns are
            // cast at the SQL layer (see json_cast_columns), not parsed here.
            serde_json::Value::String(s)
        }
        ValueRef::Blob(bytes) => {
            // Hex-encode the blob with a "blob:" prefix.
            serde_json::Value::String(format!(
                "blob:{}",
                bytes.iter().map(|b| format!("{:02x}", b)).collect::<String>()
            ))
        }
        // For complex types (List, Enum, Struct, Date, Time, Timestamp, etc.),
        // convert through the owned Value type which handles all variants.
        other => convert_owned_value(other.to_owned())
    }
}
541
/// Convert an owned DuckDB `Value` to a `serde_json::Value`.
/// Used for complex types (List, Enum, Struct, Date, Timestamp, etc.)
/// that aren't directly handled by `convert_value_ref`.
fn convert_owned_value(val: duckdb::types::Value) -> serde_json::Value {
    use duckdb::types::Value as DV;
    match val {
        DV::Null => serde_json::Value::Null,
        DV::Boolean(b) => serde_json::Value::Bool(b),
        DV::TinyInt(n) => serde_json::Value::Number(n.into()),
        DV::SmallInt(n) => serde_json::Value::Number(n.into()),
        DV::Int(n) => serde_json::Value::Number(n.into()),
        DV::BigInt(n) => serde_json::Value::Number(n.into()),
        DV::HugeInt(n) => {
            // HugeInt may not fit in i64; try i64, fallback to string.
            if let Ok(i) = i64::try_from(n) {
                serde_json::Value::Number(i.into())
            } else {
                serde_json::Value::String(n.to_string())
            }
        }
        DV::UTinyInt(n) => serde_json::Value::Number(n.into()),
        DV::USmallInt(n) => serde_json::Value::Number(n.into()),
        DV::UInt(n) => serde_json::Value::Number(n.into()),
        DV::UBigInt(n) => serde_json::Value::Number(n.into()),
        // NaN/Infinity have no JSON representation; map to null.
        DV::Float(f) => serde_json::Number::from_f64(f as f64)
            .map(serde_json::Value::Number)
            .unwrap_or(serde_json::Value::Null),
        DV::Double(f) => serde_json::Number::from_f64(f)
            .map(serde_json::Value::Number)
            .unwrap_or(serde_json::Value::Null),
        DV::Decimal(d) => serde_json::Value::String(d.to_string()),
        DV::Text(s) => serde_json::Value::String(s),
        DV::Blob(b) => serde_json::Value::String(format!(
            "blob:{}",
            b.iter().map(|byte| format!("{:02x}", byte)).collect::<String>()
        )),
        DV::Enum(s) => serde_json::Value::String(s),
        DV::Timestamp(unit, raw) => {
            use duckdb::types::TimeUnit;
            // BUG FIX: the raw value's scale depends on the column's time
            // unit; the previous code assumed microseconds for every unit.
            // Normalize to microseconds before formatting "secs.micros".
            let micros = match unit {
                TimeUnit::Second => raw.saturating_mul(1_000_000),
                TimeUnit::Millisecond => raw.saturating_mul(1_000),
                TimeUnit::Microsecond => raw,
                TimeUnit::Nanosecond => raw / 1_000,
            };
            let secs = micros / 1_000_000;
            let remainder = (micros % 1_000_000).unsigned_abs();
            serde_json::Value::String(format!("{}.{:06}", secs, remainder))
        }
        // Days since the Unix epoch, passed through as a plain number.
        DV::Date32(days) => serde_json::Value::Number(days.into()),
        // NOTE(review): emitted in the column's native unit; the unit tag is
        // intentionally not normalized here -- confirm callers expect raw.
        DV::Time64(_, val) => serde_json::Value::Number(val.into()),
        DV::Interval { months, days, nanos } => {
            serde_json::Value::String(format!("{}m{}d{}ns", months, days, nanos))
        }
        DV::List(items) | DV::Array(items) => {
            serde_json::Value::Array(items.into_iter().map(convert_owned_value).collect())
        }
        DV::Struct(map) => {
            let obj: serde_json::Map<String, serde_json::Value> = map
                .iter()
                .map(|(k, v)| (k.clone(), convert_owned_value(v.clone())))
                .collect();
            serde_json::Value::Object(obj)
        }
        DV::Map(map) => {
            // Map keys are arbitrary Values; Debug-format them as JSON keys.
            let obj: serde_json::Map<String, serde_json::Value> = map
                .iter()
                .map(|(k, v)| (format!("{:?}", k), convert_owned_value(v.clone())))
                .collect();
            serde_json::Value::Object(obj)
        }
        DV::Union(inner) => convert_owned_value(*inner),
    }
608}