vibesql_storage/persistence/
json.rs

// ============================================================================
// JSON Format Support (Save/Load Operations)
// ============================================================================
//
// Provides JSON serialization/deserialization for database persistence:
// - Human-readable, tool-friendly format
// - Standard JSON for easy integration with other systems
// - Supports all SQL types with proper representation
9
10use std::{
11    collections::HashMap,
12    fs::File,
13    io::{BufReader, BufWriter, Write},
14    path::Path,
15};
16
17use serde::{Deserialize, Serialize};
18use vibesql_ast::OrderDirection;
19use vibesql_catalog::{ColumnSchema, TableSchema};
20use vibesql_types::{DataType, SqlValue};
21
22use crate::{Database, Row, StorageError, Table};
23
// ============================================================================
// JSON Schema Structures
// ============================================================================
27
28/// Root JSON document structure
29#[derive(Debug, Serialize, Deserialize)]
30pub struct JsonDatabase {
31    /// Metadata about the format
32    pub vibesql: JsonMetadata,
33    /// Database schemas (excluding default schema which is implicit)
34    #[serde(default, skip_serializing_if = "Vec::is_empty")]
35    pub schemas: Vec<JsonSchema>,
36    /// Database roles
37    #[serde(default, skip_serializing_if = "Vec::is_empty")]
38    pub roles: Vec<JsonRole>,
39    /// Tables with data
40    pub tables: Vec<JsonTable>,
41    /// User-defined indexes
42    #[serde(default, skip_serializing_if = "Vec::is_empty")]
43    pub indexes: Vec<JsonIndex>,
44    /// Views
45    #[serde(default, skip_serializing_if = "Vec::is_empty")]
46    pub views: Vec<JsonView>,
47}
48
49/// Format metadata
50#[derive(Debug, Serialize, Deserialize)]
51pub struct JsonMetadata {
52    /// Format version (for future compatibility)
53    pub version: String,
54    /// Format type
55    pub format: String,
56    /// Timestamp when exported
57    #[serde(with = "chrono::serde::ts_seconds")]
58    pub timestamp: chrono::DateTime<chrono::Utc>,
59}
60
61/// Schema representation
62#[derive(Debug, Serialize, Deserialize)]
63pub struct JsonSchema {
64    pub name: String,
65}
66
67/// Role representation
68#[derive(Debug, Serialize, Deserialize)]
69pub struct JsonRole {
70    pub name: String,
71}
72
73/// Table with schema and data
74#[derive(Debug, Serialize, Deserialize)]
75pub struct JsonTable {
76    pub name: String,
77    #[serde(default = "default_schema_name")]
78    pub schema: String,
79    pub columns: Vec<JsonColumn>,
80    #[serde(default)]
81    pub rows: Vec<JsonRow>,
82}
83
84fn default_schema_name() -> String {
85    vibesql_catalog::DEFAULT_SCHEMA.to_string()
86}
87
88/// Column schema
89#[derive(Debug, Serialize, Deserialize)]
90pub struct JsonColumn {
91    pub name: String,
92    #[serde(rename = "type")]
93    pub data_type: String,
94    #[serde(default)]
95    pub nullable: bool,
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub max_length: Option<usize>,
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub precision: Option<u8>,
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub scale: Option<u8>,
102    /// Column-level collation (e.g., "nocase", "binary", "rtrim")
103    #[serde(skip_serializing_if = "Option::is_none")]
104    pub collation: Option<String>,
105}
106
107/// Row data (key-value pairs for readability)
108pub type JsonRow = HashMap<String, serde_json::Value>;
109
110/// Index representation
111#[derive(Debug, Serialize, Deserialize)]
112pub struct JsonIndex {
113    pub name: String,
114    pub table: String,
115    pub columns: Vec<JsonIndexColumn>,
116    #[serde(default)]
117    pub unique: bool,
118}
119
120/// Index column with sort direction - supports both column references and expressions
121#[derive(Debug, Serialize, Deserialize)]
122#[serde(untagged)]
123pub enum JsonIndexColumn {
124    /// Simple column reference
125    Column {
126        name: String,
127        #[serde(default = "default_asc")]
128        direction: String,
129        #[serde(default, skip_serializing_if = "Option::is_none")]
130        prefix_length: Option<u64>,
131    },
132    /// Expression index (functional index)
133    Expression {
134        expression: String,
135        #[serde(default = "default_asc")]
136        direction: String,
137    },
138}
139
/// Serde default for an index column's `direction`: ascending.
fn default_asc() -> String {
    String::from("ASC")
}
143
144/// View representation
145#[derive(Debug, Serialize, Deserialize)]
146pub struct JsonView {
147    pub name: String,
148    pub definition: String,
149}
150
// ============================================================================
// Options for JSON serialization
// ============================================================================
154
/// Knobs controlling how a database is exported to JSON.
#[derive(Clone, Debug)]
pub struct JsonOptions {
    /// `true` for indented, human-friendly output; `false` for minified output.
    pub pretty: bool,

    /// `true` stamps the export with the current time. With `false` the
    /// metadata block is still written, but its timestamp is pinned to the
    /// Unix epoch.
    pub include_metadata: bool,
}
163
164impl Default for JsonOptions {
165    fn default() -> Self {
166        JsonOptions { pretty: true, include_metadata: true }
167    }
168}
169
// ============================================================================
// Database save_json implementation
// ============================================================================
173
174impl Database {
175    /// Save database in JSON format with default options
176    ///
177    /// # Example
178    /// ```no_run
179    /// # use vibesql_storage::Database;
180    /// let db = Database::new();
181    /// db.save_json("database.json").unwrap();
182    /// ```
183    pub fn save_json<P: AsRef<Path>>(&self, path: P) -> Result<(), StorageError> {
184        self.save_json_with_options(path, JsonOptions::default())
185    }
186
187    /// Save database in JSON format with custom options
188    ///
189    /// # Example
190    /// ```no_run
191    /// # use vibesql_storage::Database;
192    /// # use vibesql_storage::persistence::json::JsonOptions;
193    /// let db = Database::new();
194    /// let options = JsonOptions { pretty: true, include_metadata: true };
195    /// db.save_json_with_options("database.json", options).unwrap();
196    /// ```
197    pub fn save_json_with_options<P: AsRef<Path>>(
198        &self,
199        path: P,
200        options: JsonOptions,
201    ) -> Result<(), StorageError> {
202        let file = File::create(path)
203            .map_err(|e| StorageError::NotImplemented(format!("Failed to create file: {}", e)))?;
204
205        let mut writer = BufWriter::new(file);
206
207        // Build JSON structure
208        let json_db = self.to_json_database(options.include_metadata)?;
209
210        // Serialize to JSON
211        let json_str = if options.pretty {
212            serde_json::to_string_pretty(&json_db)
213        } else {
214            serde_json::to_string(&json_db)
215        }
216        .map_err(|e| StorageError::NotImplemented(format!("JSON serialization failed: {}", e)))?;
217
218        // Write to file
219        writer
220            .write_all(json_str.as_bytes())
221            .map_err(|e| StorageError::NotImplemented(format!("Write error: {}", e)))?;
222
223        writer.flush().map_err(|e| StorageError::NotImplemented(format!("Flush error: {}", e)))?;
224
225        Ok(())
226    }
227
228    /// Convert Database to JSON representation
229    fn to_json_database(&self, include_metadata: bool) -> Result<JsonDatabase, StorageError> {
230        // Metadata
231        let vibesql = if include_metadata {
232            JsonMetadata {
233                version: "1.0".to_string(),
234                format: "json".to_string(),
235                timestamp: chrono::Utc::now(),
236            }
237        } else {
238            JsonMetadata {
239                version: "1.0".to_string(),
240                format: "json".to_string(),
241                timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
242            }
243        };
244
245        // Schemas (excluding default and temp schemas which are auto-created)
246        // Skip all temp schemas (temp_1, temp_2, etc.) - they are session-scoped
247        let schemas = self
248            .catalog
249            .list_schemas()
250            .into_iter()
251            .filter(|name| {
252                name != vibesql_catalog::DEFAULT_SCHEMA
253                    && !vibesql_catalog::Catalog::is_temp_schema(name)
254            })
255            .map(|name| JsonSchema { name })
256            .collect();
257
258        // Roles
259        let roles = self.catalog.list_roles().into_iter().map(|name| JsonRole { name }).collect();
260
261        // Tables with data
262        let mut tables = Vec::new();
263        for table_name in self.catalog.list_tables() {
264            if let Some(table) = self.get_table(&table_name) {
265                tables.push(table_to_json(&table_name, table)?);
266            }
267        }
268
269        // Indexes (skip auto-generated indexes which are recreated by constraints)
270        let indexes = self
271            .list_indexes()
272            .into_iter()
273            .filter(|index_name| {
274                // Skip auto-generated indexes - these are automatically created by constraints:
275                // - "pk_<table_name>" indexes are created by PRIMARY KEY constraints
276                // - "sqlite_autoindex_<table>_<n>" indexes are created by PRIMARY KEY/UNIQUE
277                //   constraints (follows SQLite naming convention for implicit indexes)
278                let lower_name = index_name.to_lowercase();
279                !lower_name.starts_with("pk_") && !lower_name.starts_with("sqlite_autoindex_")
280            })
281            .filter_map(|index_name| {
282                self.get_index(&index_name).map(|metadata| JsonIndex {
283                    name: index_name,
284                    table: metadata.table_name.clone(),
285                    columns: metadata
286                        .columns
287                        .iter()
288                        .map(|col| {
289                            let direction = match col.direction() {
290                                OrderDirection::Desc => "DESC".to_string(),
291                                OrderDirection::Asc => "ASC".to_string(),
292                            };
293                            if let Some(name) = col.column_name() {
294                                JsonIndexColumn::Column {
295                                    name: name.to_string(),
296                                    direction,
297                                    prefix_length: col.prefix_length(),
298                                }
299                            } else if let Some(expr) = col.get_expression() {
300                                JsonIndexColumn::Expression {
301                                    expression: format!("{:?}", expr),
302                                    direction,
303                                }
304                            } else {
305                                // Fallback - shouldn't happen
306                                JsonIndexColumn::Column {
307                                    name: "unknown".to_string(),
308                                    direction,
309                                    prefix_length: None,
310                                }
311                            }
312                        })
313                        .collect(),
314                    unique: metadata.unique,
315                })
316            })
317            .collect();
318
319        // Views - serialize view definitions
320        // Use view_def.name to preserve original case (list_views returns normalized keys)
321        let views = self
322            .catalog
323            .list_views()
324            .into_iter()
325            .filter_map(|view_name| {
326                self.catalog.get_view(&view_name).map(|view_def| {
327                    let definition = view_def
328                        .sql_definition
329                        .clone()
330                        .unwrap_or_else(|| format!("{:#?}", view_def.query));
331                    JsonView { name: view_def.name.clone(), definition }
332                })
333            })
334            .collect();
335
336        Ok(JsonDatabase { vibesql, schemas, roles, tables, indexes, views })
337    }
338
339    /// Load database from JSON format
340    ///
341    /// # Example
342    /// ```no_run
343    /// # use vibesql_storage::Database;
344    /// let db = Database::load_json("database.json").unwrap();
345    /// ```
346    pub fn load_json<P: AsRef<Path>>(path: P) -> Result<Self, StorageError> {
347        let file = File::open(path)
348            .map_err(|e| StorageError::NotImplemented(format!("Failed to open file: {}", e)))?;
349
350        let reader = BufReader::new(file);
351
352        // Deserialize from JSON
353        let json_db: JsonDatabase = serde_json::from_reader(reader).map_err(|e| {
354            StorageError::NotImplemented(format!("JSON deserialization failed: {}", e))
355        })?;
356
357        // Convert to Database
358        json_database_to_db(json_db)
359    }
360}
361
// ============================================================================
// Conversion functions
// ============================================================================
365
366/// Convert Table to JSON representation
367fn table_to_json(table_name: &str, table: &Table) -> Result<JsonTable, StorageError> {
368    let schema_name = if let Some(idx) = table_name.find('.') {
369        table_name[..idx].to_string()
370    } else {
371        vibesql_catalog::DEFAULT_SCHEMA.to_string()
372    };
373
374    let unqualified_name = if let Some(idx) = table_name.find('.') {
375        table_name[idx + 1..].to_string()
376    } else {
377        table_name.to_string()
378    };
379
380    let columns = table.schema.columns.iter().map(column_to_json).collect();
381
382    // Only persist live (non-deleted) rows.
383    // Using scan_live() to skip rows marked as deleted in the deletion bitmap.
384    let rows = table
385        .scan_live()
386        .map(|(_idx, row)| {
387            let mut row_map = HashMap::new();
388            for (i, value) in row.values.iter().enumerate() {
389                let col_name = &table.schema.columns[i].name;
390                row_map.insert(col_name.clone(), sql_value_to_json(value));
391            }
392            row_map
393        })
394        .collect();
395
396    Ok(JsonTable { name: unqualified_name, schema: schema_name, columns, rows })
397}
398
399/// Convert ColumnSchema to JSON representation
400fn column_to_json(col: &ColumnSchema) -> JsonColumn {
401    let (type_name, max_length, precision, scale) = match &col.data_type {
402        DataType::Integer => ("INTEGER".to_string(), None, None, None),
403        DataType::Smallint => ("SMALLINT".to_string(), None, None, None),
404        DataType::Bigint => ("BIGINT".to_string(), None, None, None),
405        DataType::Unsigned => ("UNSIGNED".to_string(), None, None, None),
406        DataType::Numeric { precision: p, scale: s } => {
407            ("NUMERIC".to_string(), None, Some(*p), Some(*s))
408        }
409        DataType::Decimal { precision: p, scale: s } => {
410            ("DECIMAL".to_string(), None, Some(*p), Some(*s))
411        }
412        DataType::Float { precision: p } => ("FLOAT".to_string(), None, Some(*p), None),
413        DataType::Real => ("REAL".to_string(), None, None, None),
414        DataType::DoublePrecision => ("DOUBLE PRECISION".to_string(), None, None, None),
415        DataType::Character { length } => ("CHAR".to_string(), Some(*length), None, None),
416        DataType::Varchar { max_length: ml } => ("VARCHAR".to_string(), *ml, None, None),
417        DataType::CharacterLargeObject => ("CLOB".to_string(), None, None, None),
418        DataType::Name => ("NAME".to_string(), None, None, None),
419        DataType::Boolean => ("BOOLEAN".to_string(), None, None, None),
420        DataType::Date => ("DATE".to_string(), None, None, None),
421        DataType::Time { with_timezone } => {
422            if *with_timezone {
423                ("TIME WITH TIME ZONE".to_string(), None, None, None)
424            } else {
425                ("TIME".to_string(), None, None, None)
426            }
427        }
428        DataType::Timestamp { with_timezone } => {
429            // NOTE: DATETIME is serialized as TIMESTAMP since they are
430            // internally represented as the same type (DataType::Timestamp).
431            // This is intentional - see issue #1626 for rationale.
432            if *with_timezone {
433                ("TIMESTAMP WITH TIME ZONE".to_string(), None, None, None)
434            } else {
435                ("TIMESTAMP".to_string(), None, None, None)
436            }
437        }
438        DataType::Interval { .. } => ("INTERVAL".to_string(), None, None, None),
439        DataType::BinaryLargeObject => ("BLOB".to_string(), None, None, None),
440        DataType::Bit { length } => ("BIT".to_string(), *length, None, None),
441        DataType::Vector { dimensions } => (format!("VECTOR({})", dimensions), None, None, None),
442        DataType::UserDefined { type_name } => (type_name.clone(), None, None, None),
443        DataType::Null => ("NULL".to_string(), None, None, None),
444    };
445
446    JsonColumn {
447        name: col.name.clone(),
448        data_type: type_name,
449        nullable: col.nullable,
450        max_length,
451        precision,
452        scale,
453        collation: col.collation.clone(),
454    }
455}
456
457/// Convert SqlValue to JSON representation
458fn sql_value_to_json(value: &SqlValue) -> serde_json::Value {
459    match value {
460        SqlValue::Integer(v) => serde_json::Value::Number((*v).into()),
461        SqlValue::Smallint(v) => serde_json::Value::Number((*v).into()),
462        SqlValue::Bigint(v) => serde_json::Value::Number((*v).into()),
463        SqlValue::Unsigned(v) => serde_json::Value::Number((*v).into()),
464        SqlValue::Numeric(v) => serde_json::json!(v),
465        SqlValue::Float(v) => serde_json::json!(v),
466        SqlValue::Real(v) => serde_json::json!(v),
467        SqlValue::Double(v) => serde_json::json!(v),
468        SqlValue::Character(s) | SqlValue::Varchar(s) => serde_json::Value::String(s.to_string()),
469        SqlValue::Boolean(b) => serde_json::Value::Bool(*b),
470        SqlValue::Date(d) => serde_json::Value::String(d.to_string()),
471        SqlValue::Time(t) => serde_json::Value::String(t.to_string()),
472        SqlValue::Timestamp(ts) => serde_json::Value::String(ts.to_string()),
473        SqlValue::Interval(i) => serde_json::Value::String(i.to_string()),
474        SqlValue::Vector(v) => {
475            // Serialize vector as JSON array of floats
476            serde_json::Value::Array(v.iter().map(|f| serde_json::json!(f)).collect())
477        }
478        SqlValue::Blob(b) => {
479            // Serialize blob as hex string with x'' prefix
480            let hex: String = b.iter().map(|byte| format!("{:02X}", byte)).collect();
481            serde_json::Value::String(format!("x'{}'", hex))
482        }
483        SqlValue::Null => serde_json::Value::Null,
484    }
485}
486
487/// Convert JsonDatabase to Database
488fn json_database_to_db(json_db: JsonDatabase) -> Result<Database, StorageError> {
489    let mut db = Database::new();
490
491    // Create schemas (skip auto-created ones like public/temp schemas)
492    for schema in json_db.schemas {
493        // Skip schemas that already exist (e.g., default and temp schemas are auto-created)
494        // Also skip any temp schemas (temp_1, temp_2, etc.) from old data files
495        if schema.name == vibesql_catalog::DEFAULT_SCHEMA
496            || vibesql_catalog::Catalog::is_temp_schema(&schema.name)
497        {
498            continue;
499        }
500        db.catalog.create_schema(schema.name).map_err(|e: vibesql_catalog::CatalogError| {
501            StorageError::CatalogError(e.to_string())
502        })?;
503    }
504
505    // Create roles
506    for role in json_db.roles {
507        db.catalog.create_role(role.name).map_err(|e: vibesql_catalog::CatalogError| {
508            StorageError::CatalogError(e.to_string())
509        })?;
510    }
511
512    // Create tables and insert data
513    for json_table in json_db.tables {
514        // Build table schema
515        let columns: Vec<ColumnSchema> =
516            json_table.columns.iter().map(json_column_to_schema).collect::<Result<Vec<_>, _>>()?;
517
518        let table_schema = TableSchema::new(json_table.name.clone(), columns);
519
520        // Create table - Database::create_table() automatically qualifies with default schema
521        db.create_table(table_schema.clone())?;
522
523        // Insert rows using the default schema (since that's where create_table puts the table)
524        if !json_table.rows.is_empty() {
525            let default_schema = db.catalog.get_current_schema();
526            let qualified_name = format!("{}.{}", default_schema, json_table.name);
527            if let Some(table) = db.get_table_mut(&qualified_name) {
528                for json_row in json_table.rows {
529                    let row = json_row_to_row(&json_row, &table_schema)?;
530                    table.insert(row)?;
531                }
532            }
533        }
534    }
535
536    // Views are preserved in JSON export but not automatically recreated during import.
537    // This is intentional - recreating views would require SQL parsing and execution
538    // during deserialization, which could fail or have side effects.
539    // Views should be recreated manually or via SQL execution after loading.
540    if !json_db.views.is_empty() {
541        log::warn!(
542            "{} view(s) found in JSON export but not automatically recreated. \
543             Please recreate views manually using CREATE VIEW statements.",
544            json_db.views.len()
545        );
546    }
547
548    // Create indexes
549    for json_index in json_db.indexes {
550        let columns: Vec<vibesql_ast::IndexColumn> = json_index
551            .columns
552            .iter()
553            .filter_map(|c| match c {
554                JsonIndexColumn::Column { name, direction, prefix_length } => {
555                    Some(vibesql_ast::IndexColumn::Column {
556                        column_name: name.clone(),
557                        direction: if direction == "DESC" {
558                            OrderDirection::Desc
559                        } else {
560                            OrderDirection::Asc
561                        },
562                        prefix_length: *prefix_length,
563                    })
564                }
565                JsonIndexColumn::Expression { expression: _, direction: _ } => {
566                    // Expression indexes cannot be recreated without SQL parsing
567                    log::warn!("Skipping expression index column during JSON import - expression indexes require SQL parsing");
568                    None
569                }
570            })
571            .collect();
572
573        // Only create index if it has at least one column
574        if !columns.is_empty() {
575            db.create_index(json_index.name, json_index.table, json_index.unique, columns)?;
576        }
577    }
578
579    Ok(db)
580}
581
582/// Convert JsonColumn to ColumnSchema
583fn json_column_to_schema(col: &JsonColumn) -> Result<ColumnSchema, StorageError> {
584    let data_type = parse_data_type(&col.data_type, col.max_length, col.precision, col.scale)?;
585    Ok(ColumnSchema {
586        name: col.name.clone(),
587        data_type,
588        nullable: col.nullable,
589        default_value: None,
590        generated_expr: None,
591        collation: col.collation.clone(),
592        // Default to false for backward compatibility with existing databases
593        // New tables will have this set correctly at creation time
594        is_exact_integer_type: false,
595    })
596}
597
598/// Parse data type from string
599fn parse_data_type(
600    type_str: &str,
601    max_length: Option<usize>,
602    precision: Option<u8>,
603    scale: Option<u8>,
604) -> Result<DataType, StorageError> {
605    match type_str.to_uppercase().as_str() {
606        "INTEGER" | "INT" => Ok(DataType::Integer),
607        "SMALLINT" => Ok(DataType::Smallint),
608        "BIGINT" => Ok(DataType::Bigint),
609        "UNSIGNED" => Ok(DataType::Unsigned),
610        "NUMERIC" => {
611            Ok(DataType::Numeric { precision: precision.unwrap_or(38), scale: scale.unwrap_or(0) })
612        }
613        "DECIMAL" => {
614            Ok(DataType::Decimal { precision: precision.unwrap_or(38), scale: scale.unwrap_or(0) })
615        }
616        "FLOAT" => Ok(DataType::Float { precision: precision.unwrap_or(53) }),
617        "REAL" => Ok(DataType::Real),
618        "DOUBLE PRECISION" | "DOUBLE" => Ok(DataType::DoublePrecision),
619        "CHAR" | "CHARACTER" => Ok(DataType::Character { length: max_length.unwrap_or(1) }),
620        "VARCHAR" => Ok(DataType::Varchar { max_length }),
621        "CLOB" => Ok(DataType::CharacterLargeObject),
622        "NAME" => Ok(DataType::Name),
623        "BOOLEAN" | "BOOL" => Ok(DataType::Boolean),
624        "DATE" => Ok(DataType::Date),
625        "TIME" => Ok(DataType::Time { with_timezone: false }),
626        "TIME WITH TIME ZONE" => Ok(DataType::Time { with_timezone: true }),
627        "TIMESTAMP" | "DATETIME" => Ok(DataType::Timestamp { with_timezone: false }),
628        "TIMESTAMP WITH TIME ZONE" | "DATETIME WITH TIME ZONE" => {
629            Ok(DataType::Timestamp { with_timezone: true })
630        }
631        "INTERVAL" => Ok(DataType::Interval {
632            start_field: vibesql_types::IntervalField::Day,
633            end_field: None,
634        }),
635        "BLOB" => Ok(DataType::BinaryLargeObject),
636        "NULL" => Ok(DataType::Null),
637        other => Ok(DataType::UserDefined { type_name: other.to_string() }),
638    }
639}
640
641/// Convert JsonRow to Row
642fn json_row_to_row(json_row: &JsonRow, schema: &TableSchema) -> Result<Row, StorageError> {
643    let mut values = Vec::new();
644    for col in &schema.columns {
645        let json_value = json_row.get(&col.name).ok_or_else(|| {
646            StorageError::NotImplemented(format!("Missing column '{}' in JSON row", col.name))
647        })?;
648        values.push(json_value_to_sql(json_value, &col.data_type)?);
649    }
650    Ok(Row::new(values))
651}
652
653/// Convert JSON value to SqlValue
654fn json_value_to_sql(
655    json_value: &serde_json::Value,
656    data_type: &DataType,
657) -> Result<SqlValue, StorageError> {
658    match (json_value, data_type) {
659        (serde_json::Value::Null, _) => Ok(SqlValue::Null),
660        (serde_json::Value::Number(n), DataType::Integer) => n
661            .as_i64()
662            .map(SqlValue::Integer)
663            .ok_or_else(|| StorageError::NotImplemented(format!("Invalid integer value: {}", n))),
664        (serde_json::Value::Number(n), DataType::Smallint) => {
665            n.as_i64().and_then(|v| i16::try_from(v).ok()).map(SqlValue::Smallint).ok_or_else(
666                || StorageError::NotImplemented(format!("Invalid smallint value: {}", n)),
667            )
668        }
669        (serde_json::Value::Number(n), DataType::Bigint) => n
670            .as_i64()
671            .map(SqlValue::Bigint)
672            .ok_or_else(|| StorageError::NotImplemented(format!("Invalid bigint value: {}", n))),
673        (serde_json::Value::Number(n), DataType::Unsigned) => n
674            .as_u64()
675            .map(SqlValue::Unsigned)
676            .ok_or_else(|| StorageError::NotImplemented(format!("Invalid unsigned value: {}", n))),
677        (serde_json::Value::Number(n), DataType::Numeric { .. })
678        | (serde_json::Value::Number(n), DataType::Decimal { .. }) => n
679            .as_f64()
680            .map(SqlValue::Numeric)
681            .ok_or_else(|| StorageError::NotImplemented(format!("Invalid numeric value: {}", n))),
682        (serde_json::Value::Number(n), DataType::Float { .. }) => n
683            .as_f64()
684            .map(|v| SqlValue::Float(v as f32))
685            .ok_or_else(|| StorageError::NotImplemented(format!("Invalid float value: {}", n))),
686        (serde_json::Value::Number(n), DataType::Real) => n
687            .as_f64()
688            .map(SqlValue::Real)  // Real is now f64
689            .ok_or_else(|| StorageError::NotImplemented(format!("Invalid real value: {}", n))),
690        (serde_json::Value::Number(n), DataType::DoublePrecision) => n
691            .as_f64()
692            .map(SqlValue::Double)
693            .ok_or_else(|| StorageError::NotImplemented(format!("Invalid double value: {}", n))),
694        (serde_json::Value::String(s), DataType::Character { .. }) => {
695            Ok(SqlValue::Character(arcstr::ArcStr::from(s.as_str())))
696        }
697        (serde_json::Value::String(s), DataType::Varchar { .. })
698        | (serde_json::Value::String(s), DataType::Name) => {
699            Ok(SqlValue::Varchar(arcstr::ArcStr::from(s.as_str())))
700        }
701        (serde_json::Value::Bool(b), DataType::Boolean) => Ok(SqlValue::Boolean(*b)),
702        (serde_json::Value::String(s), DataType::Date) => s
703            .parse()
704            .map(SqlValue::Date)
705            .map_err(|e| StorageError::NotImplemented(format!("Invalid date: {}", e))),
706        (serde_json::Value::String(s), DataType::Time { .. }) => s
707            .parse()
708            .map(SqlValue::Time)
709            .map_err(|e| StorageError::NotImplemented(format!("Invalid time: {}", e))),
710        (serde_json::Value::String(s), DataType::Timestamp { .. }) => s
711            .parse()
712            .map(SqlValue::Timestamp)
713            .map_err(|e| StorageError::NotImplemented(format!("Invalid timestamp: {}", e))),
714        (serde_json::Value::String(s), DataType::Interval { .. }) => s
715            .parse()
716            .map(SqlValue::Interval)
717            .map_err(|e| StorageError::NotImplemented(format!("Invalid interval: {}", e))),
718        _ => Err(StorageError::NotImplemented(format!(
719            "Unsupported JSON value {:?} for type {:?}",
720            json_value, data_type
721        ))),
722    }
723}