use crate::cache::CacheManager;
use crate::error::Result;
use duckdb::{types::ValueRef, Connection as DuckDbConnection};
use serde::de::DeserializeOwned;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
/// Per-view columns that are always considered for list-splitting when a view
/// is registered, in addition to any column picked up by the plural-name
/// heuristic. Only the `cards` and `tokens` views have a static list.
fn static_list_columns() -> HashMap<&'static str, HashSet<&'static str>> {
    const CARD_LISTS: &[&str] = &[
        "artistIds",
        "attractionLights",
        "availability",
        "boosterTypes",
        "cardParts",
        "colorIdentity",
        "colorIndicator",
        "colors",
        "finishes",
        "frameEffects",
        "keywords",
        "originalPrintings",
        "otherFaceIds",
        "printings",
        "producedMana",
        "promoTypes",
        "rebalancedPrintings",
        "subsets",
        "subtypes",
        "supertypes",
        "types",
        "variations",
    ];
    const TOKEN_LISTS: &[&str] = &[
        "artistIds",
        "availability",
        "boosterTypes",
        "colorIdentity",
        "colorIndicator",
        "colors",
        "finishes",
        "frameEffects",
        "keywords",
        "otherFaceIds",
        "producedMana",
        "promoTypes",
        "reverseRelated",
        "subtypes",
        "supertypes",
        "types",
    ];

    [("cards", CARD_LISTS), ("tokens", TOKEN_LISTS)]
        .iter()
        .map(|&(table, columns)| (table, columns.iter().copied().collect::<HashSet<_>>()))
        .collect()
}
/// VARCHAR columns that the plural-name heuristic must never turn into lists,
/// even though many of them end in `s`.
fn ignored_columns() -> HashSet<&'static str> {
    const IGNORED: &[&str] = &[
        "text",
        "originalText",
        "flavorText",
        "printedText",
        "identifiers",
        "legalities",
        "leadershipSkills",
        "purchaseUrls",
        "relatedCards",
        "rulings",
        "sourceProducts",
        "foreignData",
        "translations",
        "toughness",
        "status",
        "format",
        "uris",
        "scryfallUri",
    ];
    IGNORED.iter().copied().collect()
}
/// Columns that, when stored as VARCHAR, are re-exposed as JSON via
/// `TRY_CAST(... AS JSON)` during view registration.
fn json_cast_columns() -> HashSet<&'static str> {
    const JSON_COLUMNS: &[&str] = &[
        "identifiers",
        "legalities",
        "leadershipSkills",
        "purchaseUrls",
        "relatedCards",
        "rulings",
        "sourceProducts",
        "foreignData",
        "translations",
    ];
    JSON_COLUMNS.iter().copied().collect()
}
/// A DuckDB connection plus the bookkeeping used to lazily expose cached
/// parquet files as SQL views.
pub struct Connection {
    /// Underlying DuckDB handle (opened in-memory by `Connection::new`).
    conn: DuckDbConnection,
    /// Cache used to locate parquet files for views. Wrapped in a `RefCell` so
    /// `&self` methods can borrow it mutably.
    pub cache: RefCell<CacheManager>,
    /// Names of views/tables already created on `conn` in this session; used to
    /// skip re-registration.
    registered_views: RefCell<HashSet<String>>,
}
impl Connection {
    /// Opens a new in-memory DuckDB database that resolves views through `cache`.
    ///
    /// # Errors
    /// Returns an error if DuckDB cannot open the in-memory database.
    pub fn new(cache: CacheManager) -> Result<Self> {
        let conn = DuckDbConnection::open_in_memory()?;
        Ok(Self {
            conn,
            cache: RefCell::new(cache),
            registered_views: RefCell::new(HashSet::new()),
        })
    }

    /// Ensures every name in `views` is registered as a view, creating missing ones.
    ///
    /// # Errors
    /// Returns the first error raised while registering a view. Views handled
    /// before the failure stay registered.
    pub fn ensure_views(&self, views: &[&str]) -> Result<()> {
        // `ensure_view` is itself a no-op for names that are already registered.
        views.iter().try_for_each(|name| self.ensure_view(name))
    }

    /// Runs `sql` with positional string `params` and returns every row as a map
    /// from column name to JSON value.
    ///
    /// If a result has duplicate column names, the right-most column wins.
    ///
    /// # Errors
    /// Returns an error if the statement fails to prepare or execute, or a cell
    /// cannot be read.
    pub fn execute(
        &self,
        sql: &str,
        params: &[String],
    ) -> Result<Vec<HashMap<String, serde_json::Value>>> {
        let mut stmt = self.conn.prepare(sql)?;
        let bound = Self::bind_params(params);
        let mut rows = stmt.query(bound.as_slice())?;
        // The result schema only exists once the statement has run, so column
        // names are read through the `Rows` handle rather than `stmt`.
        let column_names: Vec<String> = rows
            .as_ref()
            .expect("a freshly created Rows always references its statement")
            .column_names()
            .into_iter()
            .map(|s| s.to_string())
            .collect();

        let mut out: Vec<HashMap<String, serde_json::Value>> = Vec::new();
        while let Some(row) = rows.next()? {
            let mut record = HashMap::with_capacity(column_names.len());
            for (i, name) in column_names.iter().enumerate() {
                record.insert(name.clone(), convert_value_ref(row.get_ref(i)?));
            }
            out.push(record);
        }
        Ok(out)
    }

    /// Like [`Connection::execute`], but deserializes each row into `T` via
    /// `serde_json` (column names become object keys).
    ///
    /// # Errors
    /// Returns an error if the query fails or any row does not deserialize into `T`.
    pub fn execute_into<T: DeserializeOwned>(
        &self,
        sql: &str,
        params: &[String],
    ) -> Result<Vec<T>> {
        let rows = self.execute(sql, params)?;
        let mut results = Vec::with_capacity(rows.len());
        for row in rows {
            let object: serde_json::Map<String, serde_json::Value> = row.into_iter().collect();
            let item: T = serde_json::from_value(serde_json::Value::Object(object))?;
            results.push(item);
        }
        Ok(results)
    }

    /// Runs `sql` and returns the first column of the first row, or `None` when
    /// the query yields no rows.
    ///
    /// # Errors
    /// Returns an error if the statement fails, or the first row has no column 0.
    pub fn execute_scalar(
        &self,
        sql: &str,
        params: &[String],
    ) -> Result<Option<serde_json::Value>> {
        let mut stmt = self.conn.prepare(sql)?;
        let bound = Self::bind_params(params);
        let mut rows = stmt.query(bound.as_slice())?;
        let value = match rows.next()? {
            Some(row) => Some(convert_value_ref(row.get_ref(0)?)),
            None => None,
        };
        Ok(value)
    }

    /// (Re)creates table `table_name` from the newline-delimited JSON file at
    /// `ndjson_path` and marks it registered, so `ensure_view` will not replace it.
    ///
    /// `table_name` is interpolated into SQL as-is and must be a trusted identifier.
    /// The path is escaped as a SQL string literal.
    ///
    /// # Errors
    /// Returns an error if DuckDB fails to drop or create the table.
    pub fn register_table_from_ndjson(
        &self,
        table_name: &str,
        ndjson_path: &str,
    ) -> Result<()> {
        let path_literal = Self::sql_string_literal(&ndjson_path.replace('\\', "/"));
        self.conn.execute_batch(&format!(
            "DROP TABLE IF EXISTS {table}; \
             CREATE TABLE {table} AS SELECT * FROM read_json_auto({path}, format='newline_delimited')",
            table = table_name,
            path = path_literal
        ))?;
        self.registered_views.borrow_mut().insert(table_name.to_string());
        Ok(())
    }

    /// Returns `true` if `name` has been registered as a view or table.
    pub fn has_view(&self, name: &str) -> bool {
        self.registered_views.borrow().contains(name)
    }

    /// Returns the names of all registered views/tables, in unspecified order.
    pub fn views(&self) -> Vec<String> {
        self.registered_views.borrow().iter().cloned().collect()
    }

    /// Forgets which views are registered.
    ///
    /// The views themselves are not dropped. A later `ensure_view` recreates
    /// them with `CREATE OR REPLACE`.
    pub fn reset_views(&self) {
        self.registered_views.borrow_mut().clear();
    }

    /// Borrows the underlying DuckDB connection.
    pub fn raw(&self) -> &DuckDbConnection {
        &self.conn
    }

    /// Exports the whole database to the directory `path` with `EXPORT DATABASE`,
    /// returning `path` as an owned `PathBuf`.
    ///
    /// # Errors
    /// Returns an error if DuckDB fails to export.
    pub fn export_db(&self, path: &std::path::Path) -> crate::error::Result<std::path::PathBuf> {
        let target = path.to_string_lossy().replace('\\', "/");
        self.conn.execute_batch(&format!(
            "EXPORT DATABASE {}",
            Self::sql_string_literal(&target)
        ))?;
        Ok(path.to_path_buf())
    }

    /// Runs `sql` and collects the result into a single polars `DataFrame`,
    /// stacking the chunks DuckDB returns in order.
    ///
    /// # Errors
    /// Returns an error if the query fails or the chunks cannot be stacked.
    #[cfg(feature = "polars")]
    pub fn execute_df(
        &self,
        sql: &str,
        params: &[String],
    ) -> crate::error::Result<polars::frame::DataFrame> {
        use polars::prelude::*;
        let mut stmt = self.conn.prepare(sql)?;
        let bound = Self::bind_params(params);
        let mut frames = stmt.query_polars(bound.as_slice())?;
        let mut combined: DataFrame = match frames.next() {
            Some(first) => first,
            None => return Ok(DataFrame::empty()),
        };
        for frame in frames {
            combined = combined.vstack(&frame).map_err(|e| {
                crate::error::MtgjsonError::Other(format!("Polars vstack failed: {}", e))
            })?;
        }
        Ok(combined)
    }

    /// Registers `view_name` as a view over its cached parquet file, unless it is
    /// already registered.
    ///
    /// The parquet path comes from `CacheManager::ensure_parquet`. Its exact
    /// behaviour (download, cache hit, ...) is not visible here.
    /// `card_legalities` is special-cased into an UNPIVOT view. Every other view is
    /// `SELECT *` with list/JSON column rewrites from `build_csv_replace`.
    fn ensure_view(&self, view_name: &str) -> Result<()> {
        if self.registered_views.borrow().contains(view_name) {
            return Ok(());
        }
        let path = self.cache.borrow_mut().ensure_parquet(view_name)?;
        let path_str = path.to_string_lossy().replace('\\', "/");
        if view_name == "card_legalities" {
            return self.register_legalities_view(&path_str);
        }
        let replace_clause = self.build_csv_replace(&path_str, view_name)?;
        // `view_name` is interpolated unquoted; it is expected to be a plain identifier.
        let view_sql = format!(
            "CREATE OR REPLACE VIEW {} AS SELECT *{} FROM read_parquet({})",
            view_name,
            replace_clause,
            Self::sql_string_literal(&path_str)
        );
        self.conn.execute_batch(&view_sql)?;
        self.registered_views.borrow_mut().insert(view_name.to_string());
        eprintln!("Registered view: {} -> {}", view_name, path_str);
        Ok(())
    }

    /// Returns `(column_name, column_type)` pairs, in file order, for the parquet
    /// file at `path_str`, as reported by DuckDB's `DESCRIBE`.
    fn describe_parquet(&self, path_str: &str) -> Result<Vec<(String, String)>> {
        let mut stmt = self.conn.prepare(&format!(
            "SELECT column_name, column_type FROM (DESCRIBE SELECT * FROM read_parquet({}))",
            Self::sql_string_literal(path_str)
        ))?;
        let mut rows = stmt.query([])?;
        let mut columns: Vec<(String, String)> = Vec::new();
        while let Some(row) = rows.next()? {
            let name: String = row.get(0)?;
            let ty: String = row.get(1)?;
            columns.push((name, ty));
        }
        Ok(columns)
    }

    /// Builds the ` REPLACE (...)` clause for a view, or `""` if nothing needs
    /// rewriting.
    ///
    /// Two kinds of column are rewritten, and each is rewritten only when it is
    /// stored as VARCHAR:
    /// * **Lists.** Columns from the view's static list, plus any VARCHAR column
    ///   whose name ends in `s` and is not ignored. Each is split on `", "`. An
    ///   empty or NULL value becomes `[]`.
    /// * **JSON.** Columns from the JSON-cast set, wrapped in `TRY_CAST(... AS JSON)`.
    fn build_csv_replace(&self, path_str: &str, view_name: &str) -> Result<String> {
        let schema: HashMap<String, String> =
            self.describe_parquet(path_str)?.into_iter().collect();
        let is_varchar = |col: &str| schema.get(col).map_or(false, |ty| ty == "VARCHAR");
        let ignored = ignored_columns();

        // The static per-view list is deliberately not filtered by `ignored`.
        let mut candidates: HashSet<String> = static_list_columns()
            .get(view_name)
            .map(|cols| cols.iter().map(|c| c.to_string()).collect())
            .unwrap_or_default();
        candidates.extend(
            schema
                .iter()
                .filter(|(col, ty)| {
                    ty.as_str() == "VARCHAR"
                        && !ignored.contains(col.as_str())
                        && col.ends_with('s')
                })
                .map(|(col, _)| col.clone()),
        );

        let mut list_cols: Vec<String> = candidates
            .into_iter()
            .filter(|c| is_varchar(c.as_str()))
            .collect();
        list_cols.sort();

        let mut exprs: Vec<String> = list_cols
            .iter()
            .map(|col| {
                format!(
                    "CASE WHEN {q} IS NULL OR TRIM({q}) = '' \
                     THEN []::VARCHAR[] \
                     ELSE string_split({q}, ', ') END AS {q}",
                    q = Self::quote_ident(col)
                )
            })
            .collect();

        let mut json_cols: Vec<&str> = json_cast_columns().into_iter().collect();
        json_cols.sort();
        for col in json_cols {
            // REPLACE may name each column only once, so a column that is already
            // list-split is never also JSON-cast.
            if is_varchar(col) && !list_cols.iter().any(|c| c.as_str() == col) {
                let q = Self::quote_ident(col);
                exprs.push(format!("TRY_CAST({} AS JSON) AS {}", q, q));
            }
        }

        if exprs.is_empty() {
            Ok(String::new())
        } else {
            Ok(format!(" REPLACE ({})", exprs.join(", ")))
        }
    }

    /// Registers `card_legalities` as a long-format view `(uuid, format, status)`.
    ///
    /// Every parquet column except `uuid` is unpivoted, and rows with a NULL status
    /// are dropped. If the file has no columns besides `uuid`, the view is a plain
    /// `SELECT *`.
    fn register_legalities_view(&self, path_str: &str) -> Result<()> {
        let format_cols: Vec<String> = self
            .describe_parquet(path_str)?
            .into_iter()
            .map(|(name, _)| name)
            .filter(|name| name.as_str() != "uuid")
            .collect();
        let source = format!("read_parquet({})", Self::sql_string_literal(path_str));

        let view_sql = if format_cols.is_empty() {
            format!(
                "CREATE OR REPLACE VIEW card_legalities AS SELECT * FROM {}",
                source
            )
        } else {
            let on_cols = format_cols
                .iter()
                .map(|c| Self::quote_ident(c))
                .collect::<Vec<_>>()
                .join(", ");
            format!(
                "CREATE OR REPLACE VIEW card_legalities AS \
                 SELECT uuid, format, status FROM (\
                 UNPIVOT (SELECT * FROM {}) \
                 ON {} \
                 INTO NAME format VALUE status\
                 ) WHERE status IS NOT NULL",
                source, on_cols
            )
        };
        self.conn.execute_batch(&view_sql)?;

        self.registered_views.borrow_mut().insert("card_legalities".to_string());
        eprintln!(
            "Registered legalities view (UNPIVOT {} formats): {}",
            format_cols.len(),
            path_str
        );
        Ok(())
    }

    /// Borrows each parameter as a DuckDB bind value, preserving order.
    fn bind_params(params: &[String]) -> Vec<&dyn duckdb::ToSql> {
        params.iter().map(|p| p as &dyn duckdb::ToSql).collect()
    }

    /// Renders `value` as a single-quoted SQL string literal, doubling any
    /// embedded `'` so paths like `/home/O'Brien/...` stay valid.
    fn sql_string_literal(value: &str) -> String {
        format!("'{}'", value.replace('\'', "''"))
    }

    /// Renders `name` as a double-quoted SQL identifier, doubling any embedded `"`.
    fn quote_ident(name: &str) -> String {
        format!("\"{}\"", name.replace('"', "\"\""))
    }
}
fn convert_value_ref(val: ValueRef<'_>) -> serde_json::Value {
match val {
ValueRef::Null => serde_json::Value::Null,
ValueRef::Boolean(b) => serde_json::Value::Bool(b),
ValueRef::TinyInt(n) => serde_json::Value::Number(n.into()),
ValueRef::SmallInt(n) => serde_json::Value::Number(n.into()),
ValueRef::Int(n) => serde_json::Value::Number(n.into()),
ValueRef::BigInt(n) => serde_json::Value::Number(n.into()),
ValueRef::HugeInt(n) => {
if let Ok(i) = i64::try_from(n) {
serde_json::Value::Number(i.into())
} else {
serde_json::Value::String(n.to_string())
}
}
ValueRef::Float(f) => serde_json::Number::from_f64(f as f64)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
ValueRef::Double(f) => serde_json::Number::from_f64(f)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
ValueRef::Text(bytes) => {
let s = String::from_utf8_lossy(bytes).to_string();
serde_json::Value::String(s)
}
ValueRef::Blob(bytes) => {
serde_json::Value::String(format!(
"blob:{}",
bytes.iter().map(|b| format!("{:02x}", b)).collect::<String>()
))
}
other => convert_owned_value(other.to_owned())
}
}
fn convert_owned_value(val: duckdb::types::Value) -> serde_json::Value {
use duckdb::types::Value as DV;
match val {
DV::Null => serde_json::Value::Null,
DV::Boolean(b) => serde_json::Value::Bool(b),
DV::TinyInt(n) => serde_json::Value::Number(n.into()),
DV::SmallInt(n) => serde_json::Value::Number(n.into()),
DV::Int(n) => serde_json::Value::Number(n.into()),
DV::BigInt(n) => serde_json::Value::Number(n.into()),
DV::HugeInt(n) => {
if let Ok(i) = i64::try_from(n) {
serde_json::Value::Number(i.into())
} else {
serde_json::Value::String(n.to_string())
}
}
DV::UTinyInt(n) => serde_json::Value::Number(n.into()),
DV::USmallInt(n) => serde_json::Value::Number(n.into()),
DV::UInt(n) => serde_json::Value::Number(n.into()),
DV::UBigInt(n) => serde_json::Value::Number(n.into()),
DV::Float(f) => serde_json::Number::from_f64(f as f64)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
DV::Double(f) => serde_json::Number::from_f64(f)
.map(serde_json::Value::Number)
.unwrap_or(serde_json::Value::Null),
DV::Decimal(d) => serde_json::Value::String(d.to_string()),
DV::Text(s) => serde_json::Value::String(s),
DV::Blob(b) => serde_json::Value::String(format!(
"blob:{}",
b.iter().map(|byte| format!("{:02x}", byte)).collect::<String>()
)),
DV::Enum(s) => serde_json::Value::String(s),
DV::Timestamp(_, micros) => {
let secs = micros / 1_000_000;
let remainder = (micros % 1_000_000).unsigned_abs();
serde_json::Value::String(format!("{}.{:06}", secs, remainder))
}
DV::Date32(days) => serde_json::Value::Number(days.into()),
DV::Time64(_, val) => serde_json::Value::Number(val.into()),
DV::Interval { months, days, nanos } => {
serde_json::Value::String(format!("{}m{}d{}ns", months, days, nanos))
}
DV::List(items) | DV::Array(items) => {
serde_json::Value::Array(items.into_iter().map(convert_owned_value).collect())
}
DV::Struct(map) => {
let obj: serde_json::Map<String, serde_json::Value> = map
.iter()
.map(|(k, v)| (k.clone(), convert_owned_value(v.clone())))
.collect();
serde_json::Value::Object(obj)
}
DV::Map(map) => {
let obj: serde_json::Map<String, serde_json::Value> = map
.iter()
.map(|(k, v)| (format!("{:?}", k), convert_owned_value(v.clone())))
.collect();
serde_json::Value::Object(obj)
}
DV::Union(inner) => convert_owned_value(*inner),
}
}