1use crate::cache::CacheManager;
8use crate::error::Result;
9use duckdb::{types::ValueRef, Connection as DuckDbConnection};
10use serde::de::DeserializeOwned;
11use std::cell::RefCell;
12use std::collections::{HashMap, HashSet};
13
/// Returns, per table name, the columns that are always considered for list conversion.
///
/// `build_csv_replace` looks up the view name in this map and treats every listed
/// column as a candidate for being split from a `", "`-separated `VARCHAR` into a
/// list. Only the `"cards"` and `"tokens"` tables have entries.
///
/// A fresh map is built on every call.
fn static_list_columns() -> HashMap<&'static str, HashSet<&'static str>> {
    const CARD_LIST_COLUMNS: [&str; 22] = [
        "artistIds",
        "attractionLights",
        "availability",
        "boosterTypes",
        "cardParts",
        "colorIdentity",
        "colorIndicator",
        "colors",
        "finishes",
        "frameEffects",
        "keywords",
        "originalPrintings",
        "otherFaceIds",
        "printings",
        "producedMana",
        "promoTypes",
        "rebalancedPrintings",
        "subsets",
        "subtypes",
        "supertypes",
        "types",
        "variations",
    ];
    const TOKEN_LIST_COLUMNS: [&str; 16] = [
        "artistIds",
        "availability",
        "boosterTypes",
        "colorIdentity",
        "colorIndicator",
        "colors",
        "finishes",
        "frameEffects",
        "keywords",
        "otherFaceIds",
        "producedMana",
        "promoTypes",
        "reverseRelated",
        "subtypes",
        "supertypes",
        "types",
    ];

    HashMap::from([
        ("cards", HashSet::from(CARD_LIST_COLUMNS)),
        ("tokens", HashSet::from(TOKEN_LIST_COLUMNS)),
    ])
}
69
/// Returns the column names that are exempt from the "name ends with `s`" list heuristic.
///
/// `build_csv_replace` skips these `VARCHAR` columns when it guesses which columns
/// hold `", "`-separated lists.
fn ignored_columns() -> HashSet<&'static str> {
    const IGNORED: [&str; 18] = [
        "text",
        "originalText",
        "flavorText",
        "printedText",
        "identifiers",
        "legalities",
        "leadershipSkills",
        "purchaseUrls",
        "relatedCards",
        "rulings",
        "sourceProducts",
        "foreignData",
        "translations",
        "toughness",
        "status",
        "format",
        "uris",
        "scryfallUri",
    ];

    IGNORED.iter().copied().collect()
}
95
/// Returns the column names that are exposed as JSON when they are stored as `VARCHAR`.
///
/// `build_csv_replace` emits a `TRY_CAST(... AS JSON)` expression for each of these
/// columns that is present in the parquet schema with type `VARCHAR`.
fn json_cast_columns() -> HashSet<&'static str> {
    const JSON_COLUMNS: [&str; 9] = [
        "identifiers",
        "legalities",
        "leadershipSkills",
        "purchaseUrls",
        "relatedCards",
        "rulings",
        "sourceProducts",
        "foreignData",
        "translations",
    ];

    JSON_COLUMNS.iter().copied().collect()
}
111
/// Wrapper around a DuckDB connection that remembers which views and tables
/// have already been registered on it.
///
/// Interior mutability (`RefCell`) lets the registration methods work through `&self`.
pub struct Connection {
    /// Underlying DuckDB connection (`new` opens it as an in-memory database).
    conn: DuckDbConnection,
    /// Cache manager; `ensure_view` asks it for the parquet file backing a view.
    pub cache: RefCell<CacheManager>,
    /// Names of the views and tables registered so far.
    registered_views: RefCell<HashSet<String>>,
}
123
124impl Connection {
125 pub fn new(cache: CacheManager) -> Result<Self> {
129 let conn = DuckDbConnection::open_in_memory()?;
130 Ok(Self {
131 conn,
132 cache: RefCell::new(cache),
133 registered_views: RefCell::new(HashSet::new()),
134 })
135 }
136
137 pub fn ensure_views(&self, views: &[&str]) -> Result<()> {
139 for name in views {
140 if !self.registered_views.borrow().contains(*name) {
141 self.ensure_view(name)?;
142 }
143 }
144 Ok(())
145 }
146
147 pub fn execute(
152 &self,
153 sql: &str,
154 params: &[String],
155 ) -> Result<Vec<HashMap<String, serde_json::Value>>> {
156 let mut stmt = self.conn.prepare(sql)?;
157
158 let param_values: Vec<&dyn duckdb::ToSql> = params
159 .iter()
160 .map(|p| p as &dyn duckdb::ToSql)
161 .collect();
162
163 let mut rows_result = stmt.query(param_values.as_slice())?;
164
165 let column_names: Vec<String> = rows_result
167 .as_ref()
168 .unwrap()
169 .column_names()
170 .into_iter()
171 .map(|s| s.to_string())
172 .collect();
173 let column_count = rows_result.as_ref().unwrap().column_count();
174
175 let mut out: Vec<HashMap<String, serde_json::Value>> = Vec::new();
176
177 while let Some(row) = rows_result.next()? {
178 let mut map = HashMap::new();
179 for i in 0..column_count {
180 let col_name = &column_names[i];
181 let value = convert_value_ref(row.get_ref(i)?);
182 map.insert(col_name.clone(), value);
183 }
184 out.push(map);
185 }
186
187 Ok(out)
188 }
189
190 pub fn execute_into<T: DeserializeOwned>(
195 &self,
196 sql: &str,
197 params: &[String],
198 ) -> Result<Vec<T>> {
199 let rows = self.execute(sql, params)?;
200 let mut results = Vec::with_capacity(rows.len());
201 for row in rows {
202 let value = serde_json::Value::Object(
203 row.into_iter().collect::<serde_json::Map<String, serde_json::Value>>(),
204 );
205 let item: T = serde_json::from_value(value)?;
206 results.push(item);
207 }
208 Ok(results)
209 }
210
211 pub fn execute_scalar(
215 &self,
216 sql: &str,
217 params: &[String],
218 ) -> Result<Option<serde_json::Value>> {
219 let mut stmt = self.conn.prepare(sql)?;
220 let param_values: Vec<&dyn duckdb::ToSql> = params
221 .iter()
222 .map(|p| p as &dyn duckdb::ToSql)
223 .collect();
224
225 let mut rows = stmt.query(param_values.as_slice())?;
226
227 if let Some(row) = rows.next()? {
228 let value = convert_value_ref(row.get_ref(0)?);
229 Ok(Some(value))
230 } else {
231 Ok(None)
232 }
233 }
234
235 pub fn register_table_from_ndjson(
240 &self,
241 table_name: &str,
242 ndjson_path: &str,
243 ) -> Result<()> {
244 let path_fwd = ndjson_path.replace('\\', "/");
245 self.conn.execute_batch(&format!(
246 "DROP TABLE IF EXISTS {}; \
247 CREATE TABLE {} AS SELECT * FROM read_json_auto('{}', format='newline_delimited')",
248 table_name, table_name, path_fwd
249 ))?;
250 self.registered_views.borrow_mut().insert(table_name.to_string());
251 Ok(())
252 }
253
254 pub fn has_view(&self, name: &str) -> bool {
256 self.registered_views.borrow().contains(name)
257 }
258
259 pub fn views(&self) -> Vec<String> {
261 self.registered_views.borrow().iter().cloned().collect()
262 }
263
264 pub fn reset_views(&self) {
266 self.registered_views.borrow_mut().clear();
267 }
268
    /// Borrows the underlying DuckDB connection for direct use.
    ///
    /// Objects created through this handle are not recorded in the registry
    /// that `has_view` and `views` consult.
    pub fn raw(&self) -> &DuckDbConnection {
        &self.conn
    }
273
274 pub fn export_db(&self, path: &std::path::Path) -> crate::error::Result<std::path::PathBuf> {
278 let path_str = path.to_string_lossy().replace('\\', "/");
279 self.conn
280 .execute_batch(&format!("EXPORT DATABASE '{}'", path_str))?;
281 Ok(path.to_path_buf())
282 }
283
284 #[cfg(feature = "polars")]
288 pub fn execute_df(
289 &self,
290 sql: &str,
291 params: &[String],
292 ) -> crate::error::Result<polars::frame::DataFrame> {
293 use polars::prelude::*;
294
295 let mut stmt = self.conn.prepare(sql)?;
296
297 let param_values: Vec<&dyn duckdb::ToSql> = params
298 .iter()
299 .map(|p| p as &dyn duckdb::ToSql)
300 .collect();
301
302 let polars_iter = stmt.query_polars(param_values.as_slice())?;
303 let frames: Vec<DataFrame> = polars_iter.collect();
304
305 if frames.is_empty() {
306 Ok(DataFrame::empty())
307 } else if frames.len() == 1 {
308 Ok(frames.into_iter().next().unwrap())
309 } else {
310 let mut result = frames[0].clone();
312 for frame in &frames[1..] {
313 result = result.vstack(frame).map_err(|e| {
314 crate::error::MtgjsonError::Other(format!("Polars vstack failed: {}", e))
315 })?;
316 }
317 Ok(result)
318 }
319 }
320
321 fn ensure_view(&self, view_name: &str) -> Result<()> {
327 if self.registered_views.borrow().contains(view_name) {
328 return Ok(());
329 }
330
331 let path = self.cache.borrow_mut().ensure_parquet(view_name)?;
332 let path_str = path.to_string_lossy().replace('\\', "/");
334
335 if view_name == "card_legalities" {
336 self.register_legalities_view(&path_str)?;
337 return Ok(());
338 }
339
340 let replace_clause = self.build_csv_replace(&path_str, view_name)?;
342
343 let view_sql = format!(
344 "CREATE OR REPLACE VIEW {} AS SELECT *{} FROM read_parquet('{}')",
345 view_name, replace_clause, path_str
346 );
347 self.conn.execute_batch(&view_sql)?;
348 self.registered_views.borrow_mut().insert(view_name.to_string());
349 eprintln!("Registered view: {} -> {}", view_name, path_str);
350
351 Ok(())
352 }
353
354 fn build_csv_replace(&self, path_str: &str, view_name: &str) -> Result<String> {
364 let mut stmt = self.conn.prepare(&format!(
365 "SELECT column_name, column_type FROM \
366 (DESCRIBE SELECT * FROM read_parquet('{}'))",
367 path_str
368 ))?;
369
370 let mut rows = stmt.query([])?;
371 let mut schema: Vec<(String, String)> = Vec::new();
372 let mut schema_map: HashMap<String, String> = HashMap::new();
373
374 while let Some(row) = rows.next()? {
375 let col_name: String = row.get(0)?;
376 let col_type: String = row.get(1)?;
377 schema_map.insert(col_name.clone(), col_type.clone());
378 schema.push((col_name, col_type));
379 }
380
381 let static_lists = static_list_columns();
382 let ignored = ignored_columns();
383 let json_cast = json_cast_columns();
384
385 let mut candidates: HashSet<String> = HashSet::new();
387
388 if let Some(static_cols) = static_lists.get(view_name) {
390 for col in static_cols {
391 candidates.insert(col.to_string());
392 }
393 }
394
395 for (col, dtype) in &schema {
397 if dtype != "VARCHAR" {
398 continue;
399 }
400 if ignored.contains(col.as_str()) {
401 continue;
402 }
403 if col.ends_with('s') {
404 candidates.insert(col.clone());
405 }
406 }
407
408 let mut final_cols: Vec<String> = candidates
410 .into_iter()
411 .filter(|col| schema_map.get(col).map(|t| t == "VARCHAR").unwrap_or(false))
412 .collect();
413 final_cols.sort();
414
415 let mut exprs: Vec<String> = Vec::new();
416
417 for col in &final_cols {
418 exprs.push(format!(
419 "CASE WHEN \"{}\" IS NULL OR TRIM(\"{}\") = '' \
420 THEN []::VARCHAR[] \
421 ELSE string_split(\"{}\", ', ') END AS \"{}\"",
422 col, col, col, col
423 ));
424 }
425
426 let mut json_cols: Vec<&&str> = json_cast.iter().collect();
428 json_cols.sort();
429 for col in json_cols {
430 if schema_map.get(*col).map(|t| t == "VARCHAR").unwrap_or(false) {
431 exprs.push(format!("TRY_CAST(\"{}\" AS JSON) AS \"{}\"", col, col));
432 }
433 }
434
435 if exprs.is_empty() {
436 Ok(String::new())
437 } else {
438 Ok(format!(" REPLACE ({})", exprs.join(", ")))
439 }
440 }
441
442 fn register_legalities_view(&self, path_str: &str) -> Result<()> {
448 let mut stmt = self.conn.prepare(&format!(
449 "SELECT column_name FROM \
450 (DESCRIBE SELECT * FROM read_parquet('{}'))",
451 path_str
452 ))?;
453
454 let mut rows = stmt.query([])?;
455 let mut all_cols: Vec<String> = Vec::new();
456
457 while let Some(row) = rows.next()? {
458 let col_name: String = row.get(0)?;
459 all_cols.push(col_name);
460 }
461
462 let format_cols: Vec<&String> = all_cols.iter().filter(|c| c.as_str() != "uuid").collect();
464
465 if format_cols.is_empty() {
466 self.conn.execute_batch(&format!(
468 "CREATE OR REPLACE VIEW card_legalities AS \
469 SELECT * FROM read_parquet('{}')",
470 path_str
471 ))?;
472 } else {
473 let cols_sql: String = format_cols
474 .iter()
475 .map(|c| format!("\"{}\"", c))
476 .collect::<Vec<_>>()
477 .join(", ");
478
479 self.conn.execute_batch(&format!(
480 "CREATE OR REPLACE VIEW card_legalities AS \
481 SELECT uuid, format, status FROM (\
482 UNPIVOT (SELECT * FROM read_parquet('{}'))\
483 ON {}\
484 INTO NAME format VALUE status\
485 ) WHERE status IS NOT NULL",
486 path_str, cols_sql
487 ))?;
488 }
489
490 self.registered_views.borrow_mut().insert("card_legalities".to_string());
491 eprintln!(
492 "Registered legalities view (UNPIVOT {} formats): {}",
493 format_cols.len(),
494 path_str
495 );
496
497 Ok(())
498 }
499}
500
501fn convert_value_ref(val: ValueRef<'_>) -> serde_json::Value {
503 match val {
504 ValueRef::Null => serde_json::Value::Null,
505 ValueRef::Boolean(b) => serde_json::Value::Bool(b),
506 ValueRef::TinyInt(n) => serde_json::Value::Number(n.into()),
507 ValueRef::SmallInt(n) => serde_json::Value::Number(n.into()),
508 ValueRef::Int(n) => serde_json::Value::Number(n.into()),
509 ValueRef::BigInt(n) => serde_json::Value::Number(n.into()),
510 ValueRef::HugeInt(n) => {
511 if let Ok(i) = i64::try_from(n) {
513 serde_json::Value::Number(i.into())
514 } else {
515 serde_json::Value::String(n.to_string())
516 }
517 }
518 ValueRef::Float(f) => serde_json::Number::from_f64(f as f64)
519 .map(serde_json::Value::Number)
520 .unwrap_or(serde_json::Value::Null),
521 ValueRef::Double(f) => serde_json::Number::from_f64(f)
522 .map(serde_json::Value::Number)
523 .unwrap_or(serde_json::Value::Null),
524 ValueRef::Text(bytes) => {
525 let s = String::from_utf8_lossy(bytes).to_string();
526 serde_json::Value::String(s)
528 }
529 ValueRef::Blob(bytes) => {
530 serde_json::Value::String(format!(
532 "blob:{}",
533 bytes.iter().map(|b| format!("{:02x}", b)).collect::<String>()
534 ))
535 }
536 other => convert_owned_value(other.to_owned())
539 }
540}
541
542fn convert_owned_value(val: duckdb::types::Value) -> serde_json::Value {
546 use duckdb::types::Value as DV;
547 match val {
548 DV::Null => serde_json::Value::Null,
549 DV::Boolean(b) => serde_json::Value::Bool(b),
550 DV::TinyInt(n) => serde_json::Value::Number(n.into()),
551 DV::SmallInt(n) => serde_json::Value::Number(n.into()),
552 DV::Int(n) => serde_json::Value::Number(n.into()),
553 DV::BigInt(n) => serde_json::Value::Number(n.into()),
554 DV::HugeInt(n) => {
555 if let Ok(i) = i64::try_from(n) {
556 serde_json::Value::Number(i.into())
557 } else {
558 serde_json::Value::String(n.to_string())
559 }
560 }
561 DV::UTinyInt(n) => serde_json::Value::Number(n.into()),
562 DV::USmallInt(n) => serde_json::Value::Number(n.into()),
563 DV::UInt(n) => serde_json::Value::Number(n.into()),
564 DV::UBigInt(n) => serde_json::Value::Number(n.into()),
565 DV::Float(f) => serde_json::Number::from_f64(f as f64)
566 .map(serde_json::Value::Number)
567 .unwrap_or(serde_json::Value::Null),
568 DV::Double(f) => serde_json::Number::from_f64(f)
569 .map(serde_json::Value::Number)
570 .unwrap_or(serde_json::Value::Null),
571 DV::Decimal(d) => serde_json::Value::String(d.to_string()),
572 DV::Text(s) => serde_json::Value::String(s),
573 DV::Blob(b) => serde_json::Value::String(format!(
574 "blob:{}",
575 b.iter().map(|byte| format!("{:02x}", byte)).collect::<String>()
576 )),
577 DV::Enum(s) => serde_json::Value::String(s),
578 DV::Timestamp(_, micros) => {
579 let secs = micros / 1_000_000;
581 let remainder = (micros % 1_000_000).unsigned_abs();
582 serde_json::Value::String(format!("{}.{:06}", secs, remainder))
583 }
584 DV::Date32(days) => serde_json::Value::Number(days.into()),
585 DV::Time64(_, val) => serde_json::Value::Number(val.into()),
586 DV::Interval { months, days, nanos } => {
587 serde_json::Value::String(format!("{}m{}d{}ns", months, days, nanos))
588 }
589 DV::List(items) | DV::Array(items) => {
590 serde_json::Value::Array(items.into_iter().map(convert_owned_value).collect())
591 }
592 DV::Struct(map) => {
593 let obj: serde_json::Map<String, serde_json::Value> = map
594 .iter()
595 .map(|(k, v)| (k.clone(), convert_owned_value(v.clone())))
596 .collect();
597 serde_json::Value::Object(obj)
598 }
599 DV::Map(map) => {
600 let obj: serde_json::Map<String, serde_json::Value> = map
601 .iter()
602 .map(|(k, v)| (format!("{:?}", k), convert_owned_value(v.clone())))
603 .collect();
604 serde_json::Value::Object(obj)
605 }
606 DV::Union(inner) => convert_owned_value(*inner),
607 }
608}