use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use crate::IdbError;
/// Top-level envelope of an SDI JSON document.
///
/// Every field falls back to its `Default` value when it is absent from the JSON.
#[derive(Debug, Deserialize)]
pub struct SdiEnvelope {
/// Numeric server version; `extract_schema_from_sdi` renders it as `major.minor.patch`.
#[serde(default)]
pub mysqld_version_id: u64,
/// Kind of dictionary object in `dd_object` (the fixtures use "Table"; not validated here).
#[serde(default)]
pub dd_object_type: String,
/// The table definition carried by the envelope.
#[serde(default)]
pub dd_object: DdTable,
}
/// Table definition stored under `dd_object` in an SDI document.
///
/// Every field defaults when it is missing from the JSON.
#[derive(Debug, Default, Deserialize)]
pub struct DdTable {
/// Table name; becomes `TableSchema::table_name`.
#[serde(default)]
pub name: String,
/// Schema (database) name; an empty string is reported as "no schema".
#[serde(default)]
pub schema_ref: String,
#[serde(default)]
pub engine: String,
/// Numeric collation id, resolved via `collation_name` / `charset_from_collation`.
#[serde(default)]
pub collation_id: u64,
/// Numeric row-format code, resolved via `row_format_name`.
#[serde(default)]
pub row_format: u64,
#[serde(default)]
pub comment: String,
/// Columns in array order; index/foreign-key elements refer to them by array position.
#[serde(default)]
pub columns: Vec<DdColumn>,
#[serde(default)]
pub indexes: Vec<DdIndex>,
#[serde(default)]
pub foreign_keys: Vec<DdForeignKey>,
#[serde(default)]
pub mysql_version_id: u64,
/// `key=value;` pairs; see `parse_se_private_data` and `has_instant_columns`.
#[serde(default)]
pub se_private_data: Option<String>,
#[serde(default)]
pub se_private_id: u64,
}
/// One column entry of a table's `columns` array in SDI JSON.
///
/// Every field defaults when it is missing from the JSON.
#[derive(Debug, Default, Deserialize)]
pub struct DdColumn {
#[serde(default)]
pub name: String,
/// Numeric type code (JSON key `type`); translated by `dd_type_to_sql`.
#[serde(rename = "type", default)]
pub dd_type: u64,
/// Ready-made SQL type text; preferred over `dd_type` whenever it is non-empty.
#[serde(default)]
pub column_type_utf8: String,
/// Sort key used to order the user-visible columns.
#[serde(default)]
pub ordinal_position: u64,
/// Visibility code: 1 and 4 are kept as user columns, and 4 is rendered as INVISIBLE.
#[serde(default)]
pub hidden: u64,
#[serde(default)]
pub is_nullable: bool,
#[serde(default)]
pub is_unsigned: bool,
#[serde(default)]
pub is_auto_increment: bool,
#[serde(default)]
pub is_virtual: bool,
/// Treated as a byte length: the formatters divide it by the charset's max bytes per char.
#[serde(default)]
pub char_length: u64,
#[serde(default)]
pub numeric_precision: u64,
#[serde(default)]
pub numeric_scale: u64,
#[serde(default)]
pub datetime_precision: u64,
/// Collation id used to look up the max bytes per character.
#[serde(default)]
pub collation_id: u64,
/// Literal default value as text; only used when `default_option` is empty.
#[serde(default)]
pub default_value_utf8: String,
/// `true` when the default is NULL rather than the text in `default_value_utf8`.
#[serde(default)]
pub default_value_utf8_null: bool,
/// `true` when the column has no DEFAULT clause at all.
#[serde(default)]
pub has_no_default: bool,
/// Non-literal default text; when non-empty it is emitted verbatim after DEFAULT.
#[serde(default)]
pub default_option: String,
#[serde(default)]
pub update_option: String,
#[serde(default)]
pub generation_expression: String,
/// Expression of a generated column; an empty string means "not generated".
#[serde(default)]
pub generation_expression_utf8: String,
#[serde(default)]
pub elements: Vec<DdColumnElement>,
#[serde(default)]
pub comment: String,
#[serde(default)]
pub is_zerofill: bool,
/// `key=value;` pairs; `version_added` / `version_dropped` are read from here.
#[serde(default)]
pub se_private_data: Option<String>,
}
/// One entry of a column's `elements` array (presumably an ENUM/SET member — not used in this file).
#[derive(Debug, Default, Deserialize)]
pub struct DdColumnElement {
#[serde(default)]
pub name: String,
}
/// One index entry of a table's `indexes` array in SDI JSON.
#[derive(Debug, Default, Deserialize)]
pub struct DdIndex {
#[serde(default)]
pub name: String,
/// Numeric index kind (JSON key `type`): 1 PRIMARY, 2 UNIQUE, 4 FULLTEXT, 5 SPATIAL,
/// anything else is rendered as a plain KEY.
#[serde(rename = "type", default)]
pub index_type: u64,
#[serde(default)]
pub algorithm: u64,
/// Hidden indexes are left out of the extracted schema.
#[serde(default)]
pub hidden: bool,
#[serde(default)]
pub elements: Vec<DdIndexElement>,
#[serde(default)]
pub comment: String,
/// `false` makes the generated DDL mark the index as INVISIBLE.
#[serde(default)]
pub is_visible: bool,
}
/// One key part of an index.
#[derive(Debug, Default, Deserialize)]
pub struct DdIndexElement {
/// Zero-based position of the column inside the table's `columns` array.
#[serde(default)]
pub column_opx: u64,
/// Key-part length; values of 4294967295 or more are never treated as a prefix.
#[serde(default)]
pub length: u64,
/// Sort-order code; the value 1 is rendered as DESC, everything else has no explicit order.
#[serde(default)]
pub order: u64,
/// Hidden key parts are skipped when the index column list is built.
#[serde(default)]
pub hidden: bool,
}
/// One foreign-key entry of a table's `foreign_keys` array in SDI JSON.
#[derive(Debug, Default, Deserialize)]
pub struct DdForeignKey {
#[serde(default)]
pub name: String,
/// Schema of the referenced table; when empty the table is referenced unqualified.
#[serde(default)]
pub referenced_table_schema_name: String,
#[serde(default)]
pub referenced_table_name: String,
/// Numeric ON UPDATE rule, translated by `fk_rule_name`.
#[serde(default)]
pub update_rule: u64,
/// Numeric ON DELETE rule, translated by `fk_rule_name`.
#[serde(default)]
pub delete_rule: u64,
#[serde(default)]
pub elements: Vec<DdForeignKeyElement>,
}
/// One column pair of a foreign key.
#[derive(Debug, Default, Deserialize)]
pub struct DdForeignKeyElement {
/// Zero-based position of the referencing column inside the table's `columns` array.
#[serde(default)]
pub column_opx: u64,
/// Name of the column in the referenced table.
#[serde(default)]
pub referenced_column_name: String,
}
/// Serializable description of a table as reconstructed from SDI metadata.
///
/// `None` options and an empty `foreign_keys` list are omitted from the serialized output.
#[derive(Debug, Clone, Serialize)]
pub struct TableSchema {
#[serde(skip_serializing_if = "Option::is_none")]
pub schema_name: Option<String>,
pub table_name: String,
pub engine: String,
/// Row-format name; `generate_ddl` only prints it when it is not "DYNAMIC".
#[serde(skip_serializing_if = "Option::is_none")]
pub row_format: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub collation: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub charset: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
/// Dotted version string, or "unknown" when the version id was 0.
#[serde(skip_serializing_if = "Option::is_none")]
pub mysql_version: Option<String>,
/// Origin of the schema; `extract_schema_from_sdi` sets it to "sdi".
pub source: String,
pub columns: Vec<ColumnDef>,
pub indexes: Vec<IndexDef>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub foreign_keys: Vec<ForeignKeyDef>,
/// CREATE TABLE statement produced by `generate_ddl` from the other fields.
pub ddl: String,
}
/// Serializable description of one user column.
///
/// `None` options and `false` flags marked with `is_false` are omitted when serializing.
#[derive(Debug, Clone, Serialize)]
pub struct ColumnDef {
pub name: String,
/// SQL type text, e.g. "int unsigned" or "varchar(255)".
pub column_type: String,
pub is_nullable: bool,
/// Text emitted verbatim after DEFAULT (already quoted where needed).
#[serde(skip_serializing_if = "Option::is_none")]
pub default_value: Option<String>,
#[serde(skip_serializing_if = "is_false")]
pub is_auto_increment: bool,
/// Expression of a generated column; `None` for ordinary columns.
#[serde(skip_serializing_if = "Option::is_none")]
pub generation_expression: Option<String>,
/// Only set for generated columns: `Some(true)` = VIRTUAL, otherwise STORED.
#[serde(skip_serializing_if = "Option::is_none")]
pub is_virtual: Option<bool>,
#[serde(skip_serializing_if = "is_false")]
pub is_invisible: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
/// `version_added` value parsed from the column's `se_private_data`, if present.
#[serde(skip_serializing_if = "Option::is_none")]
pub version_added: Option<u32>,
/// `version_dropped` value parsed from the column's `se_private_data`, if present.
#[serde(skip_serializing_if = "Option::is_none")]
pub version_dropped: Option<u32>,
}
/// Serde `skip_serializing_if` predicate: `true` exactly when the flag is `false`.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
/// Serializable description of one index.
#[derive(Debug, Clone, Serialize)]
pub struct IndexDef {
pub name: String,
/// DDL keyword for the index kind, e.g. "PRIMARY KEY", "UNIQUE KEY" or "KEY".
pub index_type: String,
pub columns: Vec<IndexColumnDef>,
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
/// Omitted from serialized output when `true`, so only invisible indexes carry the field.
#[serde(skip_serializing_if = "is_true")]
pub is_visible: bool,
}
/// Serde `skip_serializing_if` predicate: `true` exactly when the flag is `true`.
fn is_true(v: &bool) -> bool {
    matches!(*v, true)
}
/// Serializable description of one key part of an index.
#[derive(Debug, Clone, Serialize)]
pub struct IndexColumnDef {
pub name: String,
/// Prefix length printed as `(n)` after the column name; `None` for a whole-column key part.
#[serde(skip_serializing_if = "Option::is_none")]
pub prefix_length: Option<u64>,
/// Explicit sort order (only "DESC" is ever produced in this file); `None` when unspecified.
#[serde(skip_serializing_if = "Option::is_none")]
pub order: Option<String>,
}
/// Serializable description of one foreign-key constraint.
#[derive(Debug, Clone, Serialize)]
pub struct ForeignKeyDef {
pub name: String,
/// Referencing column names, in key order.
pub columns: Vec<String>,
/// Referenced table, already backtick-quoted (and schema-qualified when a schema is known).
pub referenced_table: String,
/// Referenced column names, parallel to `columns`.
pub referenced_columns: Vec<String>,
/// Rule name such as "CASCADE"; "NO ACTION" is not printed in the DDL.
pub on_update: String,
/// Rule name such as "CASCADE"; "NO ACTION" is not printed in the DDL.
pub on_delete: String,
}
/// Coarse schema information gathered by `infer_schema_from_pages` from index pages alone.
#[derive(Debug, Clone, Serialize)]
pub struct InferredSchema {
/// Free-text description of where the information came from.
pub source: String,
/// "COMPACT" when every parsed index page reported the compact format, otherwise "REDUNDANT".
pub record_format: String,
/// One entry per distinct index id, ordered by index id.
pub indexes: Vec<InferredIndex>,
}
/// Per-index statistics gathered by `infer_schema_from_pages`.
#[derive(Debug, Clone, Serialize)]
pub struct InferredIndex {
pub index_id: u64,
/// Number of pages of this index that reported themselves as leaf pages.
pub leaf_pages: u64,
/// Highest page level seen for this index.
pub max_level: u16,
}
/// Looks up the collation name for a numeric collation id.
///
/// Only the handful of collations listed in the table below are known;
/// every other id yields `None`.
pub fn collation_name(id: u64) -> Option<&'static str> {
    const KNOWN_COLLATIONS: &[(u64, &str)] = &[
        (2, "latin2_czech_cs"),
        (8, "latin1_swedish_ci"),
        (11, "ascii_general_ci"),
        (33, "utf8mb3_general_ci"),
        (45, "utf8mb4_general_ci"),
        (46, "utf8mb4_bin"),
        (47, "latin1_bin"),
        (48, "latin1_general_ci"),
        (63, "binary"),
        (83, "utf8mb3_bin"),
        (224, "utf8mb4_unicode_ci"),
        (255, "utf8mb4_0900_ai_ci"),
    ];

    KNOWN_COLLATIONS
        .iter()
        .find(|entry| entry.0 == id)
        .map(|entry| entry.1)
}
/// Returns the character-set name that a numeric collation id belongs to.
///
/// Covers the same ids as `collation_name`; unknown ids yield `None`.
pub fn charset_from_collation(id: u64) -> Option<&'static str> {
    let charset = match id {
        45 | 46 | 224 | 255 => "utf8mb4",
        33 | 83 => "utf8mb3",
        8 | 47 | 48 => "latin1",
        2 => "latin2",
        11 => "ascii",
        63 => "binary",
        _ => return None,
    };
    Some(charset)
}
/// Translates a numeric row-format code (1 through 5) into its name.
///
/// Any other code is reported as "UNKNOWN".
pub fn row_format_name(id: u64) -> &'static str {
    // Position `i` in this list corresponds to code `i + 1`.
    const NAMES: &[&str] = &["FIXED", "DYNAMIC", "COMPRESSED", "REDUNDANT", "COMPACT"];

    (1u64..)
        .zip(NAMES.iter())
        .find(|pair| pair.0 == id)
        .map(|pair| *pair.1)
        .unwrap_or("UNKNOWN")
}
/// Translates a numeric foreign-key rule code into the SQL action name.
pub fn fk_rule_name(rule: u64) -> &'static str {
    match rule {
        1 => "RESTRICT",
        2 => "CASCADE",
        3 => "SET NULL",
        4 => "SET DEFAULT",
        // Code 0 and every unrecognised code share the same answer.
        _ => "NO ACTION",
    }
}
/// Derives an SQL type string from a column's numeric `dd_type` code.
///
/// Decimal, varchar, char and text types are delegated to dedicated
/// formatters because they need length or precision information; all other
/// known codes map to a fixed keyword. Codes without a mapping are rendered
/// as `unknown_type(<code>)`.
pub fn dd_type_to_sql(col: &DdColumn) -> String {
    let fixed_name = match col.dd_type {
        // Types whose text depends on other column attributes.
        6 => return format_decimal(col),
        16 => return format_varchar(col),
        17 => return format_char(col),
        27 => return format_text(col),
        // Types with a fixed keyword.
        1 => "tinyint",
        2 => "smallint",
        3 => "mediumint",
        4 => "int",
        5 => "bigint",
        7 => "float",
        8 => "double",
        // NOTE(review): the fixtures in this file use codes 9 and 10 for the
        // internal DB_ROLL_PTR / DB_TRX_ID columns — confirm the intent.
        9 | 10 | 29 => "binary",
        11 => "year",
        12 => "date",
        13 => "time",
        14 => "datetime",
        15 => "timestamp",
        18 => "bit",
        19 => "enum",
        20 => "set",
        23 => "tinyblob",
        24 => "mediumblob",
        25 => "longblob",
        26 => "blob",
        28 => "varbinary",
        30 => "geometry",
        31 => "json",
        unknown => return format!("unknown_type({})", unknown),
    };
    fixed_name.to_string()
}
/// Renders a decimal type, adding precision and scale only when they are non-zero.
fn format_decimal(col: &DdColumn) -> String {
    match (col.numeric_precision, col.numeric_scale) {
        // Without a precision the scale is ignored as well.
        (0, _) => "decimal".to_string(),
        (precision, 0) => format!("decimal({})", precision),
        (precision, scale) => format!("decimal({},{})", precision, scale),
    }
}
/// Renders `varchar(n)`, where `n` is the column's `char_length` divided by
/// the maximum bytes per character of its collation's charset.
fn format_varchar(col: &DdColumn) -> String {
    let chars = match charset_max_bytes(col.collation_id) {
        // Guard against dividing by zero.
        0 => col.char_length,
        bytes_per_char => col.char_length / bytes_per_char,
    };
    format!("varchar({})", chars)
}
/// Renders `char(n)`, where `n` is the column's `char_length` divided by the
/// maximum bytes per character of its collation's charset, but never less than 1.
fn format_char(col: &DdColumn) -> String {
    let chars = match charset_max_bytes(col.collation_id) {
        // Guard against dividing by zero.
        0 => col.char_length,
        bytes_per_char => col.char_length / bytes_per_char,
    };
    format!("char({})", std::cmp::max(chars, 1))
}
/// Picks the text type variant from the column's `char_length`:
/// up to 255 is tinytext, up to 65535 text, up to 16777215 mediumtext,
/// and anything larger longtext.
fn format_text(col: &DdColumn) -> String {
    let length = col.char_length;
    let keyword = if length <= 255 {
        "tinytext"
    } else if length <= 65_535 {
        "text"
    } else if length <= 16_777_215 {
        "mediumtext"
    } else {
        "longtext"
    };
    keyword.to_string()
}
/// Maximum number of bytes one character can occupy for the given collation id.
///
/// Single-byte charsets (latin1, latin2, ascii, binary) give 1, utf8mb3 gives 3,
/// and utf8mb4 as well as every unrecognised collation gives 4.
fn charset_max_bytes(collation_id: u64) -> u64 {
    match collation_id {
        2 | 8 | 11 | 47 | 48 | 63 => 1,
        33 | 83 => 3,
        // utf8mb4 collations (45, 46, 224, 255) and anything unknown.
        _ => 4,
    }
}
/// Turns a numeric version id such as `80040` into `"8.0.40"`.
///
/// The id is read as `major * 10000 + minor * 100 + patch`; an id of 0 is
/// reported as `"unknown"`.
fn format_mysql_version(version_id: u64) -> String {
    match version_id {
        0 => "unknown".to_string(),
        id => {
            let (major, remainder) = (id / 10_000, id % 10_000);
            format!("{}.{}.{}", major, remainder / 100, remainder % 100)
        }
    }
}
pub fn extract_schema_from_sdi(sdi_json: &str) -> Result<TableSchema, IdbError> {
let envelope: SdiEnvelope = serde_json::from_str(sdi_json)
.map_err(|e| IdbError::Parse(format!("Failed to parse SDI JSON: {}", e)))?;
let dd = &envelope.dd_object;
let all_columns: Vec<&DdColumn> = {
let mut cols: Vec<&DdColumn> = dd.columns.iter().collect();
cols.sort_by_key(|c| c.ordinal_position);
cols
};
let column_by_index: HashMap<u64, &DdColumn> = dd
.columns
.iter()
.enumerate()
.map(|(i, c)| (i as u64, c))
.collect();
let visible_columns: Vec<&DdColumn> = {
let mut cols: Vec<&DdColumn> = all_columns
.iter()
.copied()
.filter(|c| c.hidden == 1 || c.hidden == 4)
.collect();
cols.sort_by_key(|c| c.ordinal_position);
cols
};
let columns: Vec<ColumnDef> = visible_columns
.iter()
.map(|c| build_column_def(c))
.collect();
let indexes: Vec<IndexDef> = dd
.indexes
.iter()
.filter(|idx| !idx.hidden)
.map(|idx| build_index_def(idx, &column_by_index))
.collect();
let foreign_keys: Vec<ForeignKeyDef> = dd
.foreign_keys
.iter()
.map(|fk| build_fk_def(fk, &column_by_index))
.collect();
let row_fmt = row_format_name(dd.row_format);
let coll = collation_name(dd.collation_id);
let cs = charset_from_collation(dd.collation_id);
let mysql_ver = format_mysql_version(envelope.mysqld_version_id);
let schema_name = if dd.schema_ref.is_empty() {
None
} else {
Some(dd.schema_ref.clone())
};
let comment = if dd.comment.is_empty() {
None
} else {
Some(dd.comment.clone())
};
let mut schema = TableSchema {
schema_name,
table_name: dd.name.clone(),
engine: dd.engine.clone(),
row_format: Some(row_fmt.to_string()),
collation: coll.map(|s| s.to_string()),
charset: cs.map(|s| s.to_string()),
comment,
mysql_version: Some(mysql_ver),
source: "sdi".to_string(),
columns,
indexes,
foreign_keys,
ddl: String::new(),
};
schema.ddl = generate_ddl(&schema);
Ok(schema)
}
/// Converts one SDI column entry into its serializable [`ColumnDef`].
///
/// The SQL type comes from `column_type_utf8` when present and is derived
/// from the numeric type code otherwise. The default value is resolved in
/// this order: a non-empty `default_option`, then a literal default, then
/// `NULL` for nullable columns whose default is null.
fn build_column_def(col: &DdColumn) -> ColumnDef {
    /// Maps an empty string to `None`, anything else to an owned copy.
    fn non_empty(text: &str) -> Option<String> {
        if text.is_empty() {
            None
        } else {
            Some(text.to_string())
        }
    }

    let column_type = if col.column_type_utf8.is_empty() {
        dd_type_to_sql(col)
    } else {
        col.column_type_utf8.clone()
    };

    let default_value = if !col.default_option.is_empty() {
        Some(col.default_option.clone())
    } else if col.has_no_default {
        None
    } else if !col.default_value_utf8_null {
        // Type codes 1..=8 are numeric in this module's mapping and are
        // printed bare; every other literal is single-quoted.
        if matches!(col.dd_type, 1..=8) {
            Some(col.default_value_utf8.clone())
        } else {
            Some(format!("'{}'", col.default_value_utf8.replace('\'', "''")))
        }
    } else if col.is_nullable {
        Some("NULL".to_string())
    } else {
        None
    };

    let generation_expression = non_empty(&col.generation_expression_utf8);
    // The virtual/stored flag is only meaningful for generated columns.
    let is_virtual = generation_expression.as_ref().map(|_| col.is_virtual);

    let (version_added, version_dropped) = match col.se_private_data.as_deref() {
        Some(raw) => {
            let attrs = parse_se_private_data(raw);
            let number = |key: &str| attrs.get(key).and_then(|v| v.parse::<u32>().ok());
            (number("version_added"), number("version_dropped"))
        }
        None => (None, None),
    };

    ColumnDef {
        name: col.name.clone(),
        column_type,
        is_nullable: col.is_nullable,
        default_value,
        is_auto_increment: col.is_auto_increment,
        generation_expression,
        is_virtual,
        is_invisible: col.hidden == 4,
        comment: non_empty(&col.comment),
        version_added,
        version_dropped,
    }
}
/// Converts one SDI index entry into its serializable [`IndexDef`].
///
/// `columns` maps a column's position in the table's `columns` array to the
/// column itself. Hidden key parts are skipped; a key part whose column
/// cannot be resolved is named `col_<position>`.
fn build_index_def(idx: &DdIndex, columns: &HashMap<u64, &DdColumn>) -> IndexDef {
    // Key parts whose length is at or above this value are never prefixes.
    const LENGTH_SENTINEL: u64 = 4294967295;

    let index_type = match idx.index_type {
        1 => "PRIMARY KEY",
        2 => "UNIQUE KEY",
        4 => "FULLTEXT KEY",
        5 => "SPATIAL KEY",
        // Code 3 and every unrecognised code are plain keys.
        _ => "KEY",
    };

    let mut idx_columns: Vec<IndexColumnDef> = Vec::new();
    for element in idx.elements.iter().filter(|element| !element.hidden) {
        let column = columns.get(&element.column_opx);

        let name = match column {
            Some(c) => c.name.clone(),
            None => format!("col_{}", element.column_opx),
        };

        let prefix_length = if element.length >= LENGTH_SENTINEL {
            None
        } else {
            let (column_len, max_bytes) = match column {
                Some(c) => (c.char_length, charset_max_bytes(c.collation_id)),
                None => (0, 4),
            };
            let to_chars = |bytes: u64| if max_bytes > 0 { bytes / max_bytes } else { bytes };

            // NOTE(review): `element.length` is compared unconverted against
            // the column length *after* division by the charset width, while
            // the reported prefix divides `element.length` by the same width.
            // That looks like a bytes-vs-characters mismatch that would miss
            // long prefixes — confirm against real SDI data before changing.
            if element.length < to_chars(column_len) {
                match to_chars(element.length) {
                    0 => Some(element.length),
                    chars => Some(chars),
                }
            } else {
                None
            }
        };

        let order = match element.order {
            1 => Some("DESC".to_string()),
            _ => None,
        };

        idx_columns.push(IndexColumnDef {
            name,
            prefix_length,
            order,
        });
    }

    let comment = match idx.comment.as_str() {
        "" => None,
        text => Some(text.to_string()),
    };

    IndexDef {
        name: idx.name.clone(),
        index_type: index_type.to_string(),
        columns: idx_columns,
        comment,
        is_visible: idx.is_visible,
    }
}
/// Converts one SDI foreign-key entry into its serializable [`ForeignKeyDef`].
///
/// `columns` maps a column's position in the table's `columns` array to the
/// column itself; an unresolvable position is named `col_<position>`. The
/// referenced table is backtick-quoted and prefixed with its schema when one
/// is given.
fn build_fk_def(fk: &DdForeignKey, columns: &HashMap<u64, &DdColumn>) -> ForeignKeyDef {
    let mut local_columns: Vec<String> = Vec::with_capacity(fk.elements.len());
    let mut remote_columns: Vec<String> = Vec::with_capacity(fk.elements.len());
    for element in &fk.elements {
        let local = match columns.get(&element.column_opx) {
            Some(c) => c.name.clone(),
            None => format!("col_{}", element.column_opx),
        };
        local_columns.push(local);
        remote_columns.push(element.referenced_column_name.clone());
    }

    let referenced_table = match fk.referenced_table_schema_name.as_str() {
        "" => format!("`{}`", fk.referenced_table_name),
        schema => format!("`{}`.`{}`", schema, fk.referenced_table_name),
    };

    ForeignKeyDef {
        name: fk.name.clone(),
        columns: local_columns,
        referenced_table,
        referenced_columns: remote_columns,
        on_update: fk_rule_name(fk.update_rule).to_string(),
        on_delete: fk_rule_name(fk.delete_rule).to_string(),
    }
}
/// Parses a `key=value;key=value;` string into a map.
///
/// Segments are separated by `;`. A segment is split at its first `=`, so the
/// value may itself contain `=`. Segments without any `=` (including empty
/// ones) are ignored, and a repeated key keeps its last value.
pub fn parse_se_private_data(data: &str) -> HashMap<String, String> {
    let mut pairs = HashMap::new();
    for segment in data.split(';') {
        if let Some(eq) = segment.find('=') {
            pairs.insert(segment[..eq].to_string(), segment[eq + 1..].to_string());
        }
    }
    pairs
}
/// Reports whether the given `key=value;` string contains an `instant_col` key.
pub fn has_instant_columns(se_private_data: &str) -> bool {
    let attrs = parse_se_private_data(se_private_data);
    attrs.get("instant_col").is_some()
}
pub fn generate_ddl(schema: &TableSchema) -> String {
let mut ddl = format!("CREATE TABLE `{}` (\n", schema.table_name);
let mut parts: Vec<String> = Vec::new();
for col in &schema.columns {
parts.push(format_column_ddl(col));
}
for idx in &schema.indexes {
parts.push(format_index_ddl(idx));
}
for fk in &schema.foreign_keys {
parts.push(format_fk_ddl(fk));
}
ddl.push_str(&parts.join(",\n"));
ddl.push_str("\n)");
let mut options = Vec::new();
options.push(format!("ENGINE={}", schema.engine));
if let Some(ref cs) = schema.charset {
options.push(format!("DEFAULT CHARSET={}", cs));
}
if let Some(ref coll) = schema.collation {
options.push(format!("COLLATE={}", coll));
}
if let Some(ref fmt) = schema.row_format {
if fmt != "DYNAMIC" {
options.push(format!("ROW_FORMAT={}", fmt));
}
}
if let Some(ref comment) = schema.comment {
options.push(format!("COMMENT='{}'", comment.replace('\'', "''")));
}
if !options.is_empty() {
ddl.push(' ');
ddl.push_str(&options.join(" "));
}
ddl.push(';');
ddl
}
/// Renders one column definition line of a `CREATE TABLE` statement.
///
/// Ordinary columns are printed as
/// `name type [NOT NULL] [DEFAULT ...] [AUTO_INCREMENT] [INVISIBLE] [COMMENT ...]`.
///
/// Generated columns follow MySQL's separate grammar for them: the
/// `GENERATED ALWAYS AS (...) VIRTUAL|STORED` clause comes directly after the
/// type, `NOT NULL` follows it, and neither `DEFAULT` nor `AUTO_INCREMENT` is
/// printed because MySQL does not accept them on generated columns.
fn format_column_ddl(col: &ColumnDef) -> String {
    let mut parts = vec![format!(" `{}` {}", col.name, col.column_type)];

    if let Some(ref expr) = col.generation_expression {
        let stored_or_virtual = if col.is_virtual == Some(true) {
            "VIRTUAL"
        } else {
            "STORED"
        };
        parts.push(format!(
            "GENERATED ALWAYS AS ({}) {}",
            expr, stored_or_virtual
        ));
        // For generated columns the NULL-ness attribute must come after the
        // VIRTUAL/STORED keyword.
        if !col.is_nullable {
            parts.push("NOT NULL".to_string());
        }
    } else {
        if !col.is_nullable {
            parts.push("NOT NULL".to_string());
        }
        if let Some(ref default) = col.default_value {
            parts.push(format!("DEFAULT {}", default));
        }
        if col.is_auto_increment {
            parts.push("AUTO_INCREMENT".to_string());
        }
    }

    if col.is_invisible {
        // Versioned comment: only servers from 8.0.23 on act on it.
        parts.push("/*!80023 INVISIBLE */".to_string());
    }
    if let Some(ref comment) = col.comment {
        // Single quotes inside the comment are escaped by doubling them.
        parts.push(format!("COMMENT '{}'", comment.replace('\'', "''")));
    }
    parts.join(" ")
}
/// Renders one index definition line of a `CREATE TABLE` statement.
///
/// The primary key is printed without a name; every other index as
/// `<type> `name` (columns)`. An optional comment follows, and an invisible
/// index gets a trailing versioned `INVISIBLE` marker.
fn format_index_ddl(idx: &IndexDef) -> String {
    let cols = format_index_columns(&idx.columns);

    let mut line = if idx.index_type == "PRIMARY KEY" {
        format!(" PRIMARY KEY ({})", cols)
    } else {
        format!(" {} `{}` ({})", idx.index_type, idx.name, cols)
    };

    if let Some(ref text) = idx.comment {
        // Single quotes inside the comment are escaped by doubling them.
        line.push_str(" COMMENT '");
        line.push_str(&text.replace('\'', "''"));
        line.push('\'');
    }
    if !idx.is_visible {
        line.push_str(" /*!80000 INVISIBLE */");
    }
    line
}
fn format_index_columns(columns: &[IndexColumnDef]) -> String {
columns
.iter()
.map(|c| {
let mut s = format!("`{}`", c.name);
if let Some(len) = c.prefix_length {
s.push_str(&format!("({})", len));
}
if let Some(ref ord) = c.order {
s.push(' ');
s.push_str(ord);
}
s
})
.collect::<Vec<_>>()
.join(", ")
}
/// Renders one foreign-key constraint line of a `CREATE TABLE` statement.
///
/// `ON DELETE` / `ON UPDATE` clauses are only printed when the rule differs
/// from "NO ACTION"; `ON DELETE` comes first.
fn format_fk_ddl(fk: &ForeignKeyDef) -> String {
    /// Backtick-quotes every name and joins them with ", ".
    fn quoted_list(names: &[String]) -> String {
        let quoted: Vec<String> = names.iter().map(|name| format!("`{}`", name)).collect();
        quoted.join(", ")
    }

    let mut line = format!(
        " CONSTRAINT `{}` FOREIGN KEY ({}) REFERENCES {} ({})",
        fk.name,
        quoted_list(&fk.columns),
        fk.referenced_table,
        quoted_list(&fk.referenced_columns)
    );
    if fk.on_delete != "NO ACTION" {
        line += &format!(" ON DELETE {}", fk.on_delete);
    }
    if fk.on_update != "NO ACTION" {
        line += &format!(" ON UPDATE {}", fk.on_update);
    }
    line
}
/// Gathers coarse per-index statistics by walking every page of a tablespace.
///
/// Pages that cannot be read, whose file header or index header cannot be
/// parsed, or that are not index pages are silently skipped. For each index
/// id the number of leaf pages and the highest page level are recorded. The
/// record format is reported as "COMPACT" unless at least one parsed index
/// page says otherwise, in which case it is "REDUNDANT".
///
/// As written, this function always returns `Ok`.
pub fn infer_schema_from_pages(
    ts: &mut crate::innodb::tablespace::Tablespace,
) -> Result<InferredSchema, IdbError> {
    use crate::innodb::index::IndexHeader;
    use crate::innodb::page::FilHeader;
    use crate::innodb::page_types::PageType;
    use std::collections::BTreeMap;

    let page_count = ts.page_count();
    let mut is_compact = true;
    // index id -> (leaf page count, highest level); BTreeMap keeps ids sorted.
    let mut per_index: BTreeMap<u64, (u64, u16)> = BTreeMap::new();

    for page_num in 0..page_count {
        let page_data = if let Ok(bytes) = ts.read_page(page_num) {
            bytes
        } else {
            continue;
        };
        let header = if let Some(parsed) = FilHeader::parse(&page_data) {
            parsed
        } else {
            continue;
        };
        if header.page_type != PageType::Index {
            continue;
        }
        let idx = if let Some(parsed) = IndexHeader::parse(&page_data) {
            parsed
        } else {
            continue;
        };

        // A single non-compact page flips the verdict for the whole file.
        is_compact &= idx.is_compact();

        let stats = per_index.entry(idx.index_id).or_insert((0, 0));
        if idx.is_leaf() {
            stats.0 += 1;
        }
        stats.1 = stats.1.max(idx.level);
    }

    let mut indexes = Vec::with_capacity(per_index.len());
    for (index_id, (leaf_pages, max_level)) in per_index {
        indexes.push(InferredIndex {
            index_id,
            leaf_pages,
            max_level,
        });
    }

    let record_format = if is_compact { "COMPACT" } else { "REDUNDANT" };
    Ok(InferredSchema {
        source: "Inferred (no SDI metadata available)".to_string(),
        record_format: record_format.to_string(),
        indexes,
    })
}
// Unit tests for SDI parsing, the lookup tables and DDL generation.
// The JSON fixtures are hand-written minimal SDI documents.
#[cfg(test)]
mod tests {
use super::*;
// Two `key=value;` pairs with a trailing separator are both parsed.
#[test]
fn test_parse_se_private_data() {
let data = "version_added=1;physical_pos=5;";
let map = parse_se_private_data(data);
assert_eq!(map.get("version_added"), Some(&"1".to_string()));
assert_eq!(map.get("physical_pos"), Some(&"5".to_string()));
}
// An empty input yields an empty map.
#[test]
fn test_parse_se_private_data_empty() {
let map = parse_se_private_data("");
assert!(map.is_empty());
}
// Only the presence of the `instant_col` key matters.
#[test]
fn test_has_instant_columns() {
assert!(has_instant_columns("instant_col=3;table_id=1234;"));
assert!(!has_instant_columns("table_id=1234;"));
assert!(!has_instant_columns(""));
}
// `version_added` is read from a column's se_private_data; the table-level
// se_private_data is checked for `instant_col`.
#[test]
fn test_extract_schema_with_instant_columns() {
let json = r#"{
"mysqld_version_id": 80040,
"dd_object_type": "Table",
"dd_object": {
"name": "t",
"engine": "InnoDB",
"collation_id": 255,
"row_format": 2,
"se_private_data": "instant_col=1;",
"columns": [
{
"name": "id",
"type": 4,
"column_type_utf8": "int",
"ordinal_position": 1,
"hidden": 1,
"is_nullable": false,
"is_auto_increment": true,
"has_no_default": true
},
{
"name": "added_col",
"type": 4,
"column_type_utf8": "int",
"ordinal_position": 2,
"hidden": 1,
"is_nullable": true,
"se_private_data": "version_added=1;physical_pos=2;"
}
],
"indexes": [],
"foreign_keys": []
}
}"#;
let schema = extract_schema_from_sdi(json).unwrap();
assert_eq!(schema.columns.len(), 2);
assert_eq!(schema.columns[1].name, "added_col");
assert_eq!(schema.columns[1].version_added, Some(1));
assert_eq!(schema.columns[1].version_dropped, None);
let envelope: SdiEnvelope = serde_json::from_str(json).unwrap();
assert!(has_instant_columns(
envelope.dd_object.se_private_data.as_deref().unwrap_or("")
));
}
// Known collation ids resolve to names; id 0 is unknown.
#[test]
fn test_collation_name() {
assert_eq!(collation_name(255), Some("utf8mb4_0900_ai_ci"));
assert_eq!(collation_name(63), Some("binary"));
assert_eq!(collation_name(45), Some("utf8mb4_general_ci"));
assert_eq!(collation_name(46), Some("utf8mb4_bin"));
assert_eq!(collation_name(33), Some("utf8mb3_general_ci"));
assert_eq!(collation_name(0), None);
}
// Known collation ids resolve to their charset; id 0 is unknown.
#[test]
fn test_charset_from_collation() {
assert_eq!(charset_from_collation(255), Some("utf8mb4"));
assert_eq!(charset_from_collation(63), Some("binary"));
assert_eq!(charset_from_collation(8), Some("latin1"));
assert_eq!(charset_from_collation(33), Some("utf8mb3"));
assert_eq!(charset_from_collation(0), None);
}
// Row-format codes map to names; out-of-range codes are "UNKNOWN".
#[test]
fn test_row_format_name() {
assert_eq!(row_format_name(1), "FIXED");
assert_eq!(row_format_name(2), "DYNAMIC");
assert_eq!(row_format_name(3), "COMPRESSED");
assert_eq!(row_format_name(99), "UNKNOWN");
}
// Foreign-key rule codes 0..=4 map to their SQL action names.
#[test]
fn test_fk_rule_name() {
assert_eq!(fk_rule_name(0), "NO ACTION");
assert_eq!(fk_rule_name(1), "RESTRICT");
assert_eq!(fk_rule_name(2), "CASCADE");
assert_eq!(fk_rule_name(3), "SET NULL");
assert_eq!(fk_rule_name(4), "SET DEFAULT");
}
// Type code 4 is a plain "int"; the precision is ignored.
#[test]
fn test_dd_type_to_sql_int() {
let col = DdColumn {
dd_type: 4,
numeric_precision: 10,
..Default::default()
};
assert_eq!(dd_type_to_sql(&col), "int");
}
// 400 bytes under a 4-byte charset (collation 255) is varchar(100).
#[test]
fn test_dd_type_to_sql_varchar() {
let col = DdColumn {
dd_type: 16,
char_length: 400,
collation_id: 255, ..Default::default()
};
assert_eq!(dd_type_to_sql(&col), "varchar(100)");
}
// Precision and scale are both printed when non-zero.
#[test]
fn test_dd_type_to_sql_decimal() {
let col = DdColumn {
dd_type: 6,
numeric_precision: 10,
numeric_scale: 2,
..Default::default()
};
assert_eq!(dd_type_to_sql(&col), "decimal(10,2)");
}
// The text variant is chosen by char_length at the 65535 / 16777215 boundaries.
#[test]
fn test_dd_type_to_sql_text() {
let col = DdColumn {
dd_type: 27,
char_length: 65535,
..Default::default()
};
assert_eq!(dd_type_to_sql(&col), "text");
let col = DdColumn {
dd_type: 27,
char_length: 16777215,
..Default::default()
};
assert_eq!(dd_type_to_sql(&col), "mediumtext");
}
// Version ids decode as major*10000 + minor*100 + patch; 0 is "unknown".
#[test]
fn test_format_mysql_version() {
assert_eq!(format_mysql_version(90001), "9.0.1");
assert_eq!(format_mysql_version(80040), "8.0.40");
assert_eq!(format_mysql_version(0), "unknown");
}
// End-to-end: two user columns, two internal columns (hidden = 2) that must
// be dropped, a primary key and a unique key with hidden key parts.
#[test]
fn test_extract_schema_from_sdi_minimal() {
let json = r#"{
"mysqld_version_id": 90001,
"dd_object_type": "Table",
"dd_object": {
"name": "users",
"schema_ref": "myapp",
"engine": "InnoDB",
"collation_id": 255,
"row_format": 2,
"columns": [
{
"name": "id",
"type": 4,
"column_type_utf8": "int unsigned",
"ordinal_position": 1,
"hidden": 1,
"is_nullable": false,
"is_auto_increment": true,
"has_no_default": true
},
{
"name": "email",
"type": 16,
"column_type_utf8": "varchar(255)",
"ordinal_position": 2,
"hidden": 1,
"is_nullable": false,
"has_no_default": true
},
{
"name": "DB_TRX_ID",
"type": 10,
"ordinal_position": 3,
"hidden": 2
},
{
"name": "DB_ROLL_PTR",
"type": 9,
"ordinal_position": 4,
"hidden": 2
}
],
"indexes": [
{
"name": "PRIMARY",
"type": 1,
"hidden": false,
"is_visible": true,
"elements": [
{ "column_opx": 0, "hidden": false, "length": 4, "order": 2 },
{ "column_opx": 2, "hidden": true, "length": 4294967295, "order": 2 },
{ "column_opx": 3, "hidden": true, "length": 4294967295, "order": 2 }
]
},
{
"name": "idx_email",
"type": 2,
"hidden": false,
"is_visible": true,
"elements": [
{ "column_opx": 1, "hidden": false, "length": 4294967295, "order": 2 },
{ "column_opx": 0, "hidden": true, "length": 4294967295, "order": 2 }
]
}
],
"foreign_keys": []
}
}"#;
let schema = extract_schema_from_sdi(json).unwrap();
assert_eq!(schema.table_name, "users");
assert_eq!(schema.schema_name, Some("myapp".to_string()));
assert_eq!(schema.engine, "InnoDB");
assert_eq!(schema.source, "sdi");
assert_eq!(schema.columns.len(), 2);
assert_eq!(schema.columns[0].name, "id");
assert_eq!(schema.columns[0].column_type, "int unsigned");
assert!(schema.columns[0].is_auto_increment);
assert_eq!(schema.columns[1].name, "email");
assert_eq!(schema.columns[1].column_type, "varchar(255)");
assert_eq!(schema.indexes.len(), 2);
assert_eq!(schema.indexes[0].index_type, "PRIMARY KEY");
assert_eq!(schema.indexes[0].columns.len(), 1); assert_eq!(schema.indexes[1].index_type, "UNIQUE KEY");
assert_eq!(schema.indexes[1].name, "idx_email");
assert!(schema.ddl.contains("CREATE TABLE `users`"));
assert!(schema
.ddl
.contains("`id` int unsigned NOT NULL AUTO_INCREMENT"));
assert!(schema.ddl.contains("PRIMARY KEY (`id`)"));
assert!(schema.ddl.contains("UNIQUE KEY `idx_email` (`email`)"));
}
// A foreign key resolves its column by array position and only prints the
// non-default ON DELETE rule.
#[test]
fn test_extract_schema_with_fk() {
let json = r#"{
"mysqld_version_id": 80040,
"dd_object_type": "Table",
"dd_object": {
"name": "orders",
"schema_ref": "shop",
"engine": "InnoDB",
"collation_id": 255,
"row_format": 2,
"columns": [
{
"name": "id",
"type": 4,
"column_type_utf8": "int",
"ordinal_position": 1,
"hidden": 1,
"is_nullable": false,
"is_auto_increment": true
},
{
"name": "user_id",
"type": 4,
"column_type_utf8": "int",
"ordinal_position": 2,
"hidden": 1,
"is_nullable": false
}
],
"indexes": [
{
"name": "PRIMARY",
"type": 1,
"hidden": false,
"is_visible": true,
"elements": [
{ "column_opx": 0, "hidden": false, "length": 4, "order": 2 }
]
}
],
"foreign_keys": [
{
"name": "fk_orders_user",
"referenced_table_schema_name": "shop",
"referenced_table_name": "users",
"update_rule": 0,
"delete_rule": 2,
"elements": [
{ "column_opx": 1, "referenced_column_name": "id" }
]
}
]
}
}"#;
let schema = extract_schema_from_sdi(json).unwrap();
assert_eq!(schema.foreign_keys.len(), 1);
let fk = &schema.foreign_keys[0];
assert_eq!(fk.name, "fk_orders_user");
assert_eq!(fk.columns, vec!["user_id"]);
assert_eq!(fk.referenced_table, "`shop`.`users`");
assert_eq!(fk.referenced_columns, vec!["id"]);
assert_eq!(fk.on_delete, "CASCADE");
assert_eq!(fk.on_update, "NO ACTION");
assert!(schema.ddl.contains("CONSTRAINT `fk_orders_user` FOREIGN KEY (`user_id`) REFERENCES `shop`.`users` (`id`) ON DELETE CASCADE"));
}
// A virtual generated column carries its expression and VIRTUAL keyword into the DDL.
#[test]
fn test_extract_schema_with_generated_column() {
let json = r#"{
"mysqld_version_id": 80040,
"dd_object_type": "Table",
"dd_object": {
"name": "products",
"schema_ref": "shop",
"engine": "InnoDB",
"collation_id": 255,
"row_format": 2,
"columns": [
{
"name": "price",
"type": 6,
"column_type_utf8": "decimal(10,2)",
"ordinal_position": 1,
"hidden": 1,
"is_nullable": false
},
{
"name": "tax",
"type": 6,
"column_type_utf8": "decimal(10,2)",
"ordinal_position": 2,
"hidden": 1,
"is_nullable": true,
"is_virtual": true,
"generation_expression_utf8": "`price` * 0.1"
}
],
"indexes": [],
"foreign_keys": []
}
}"#;
let schema = extract_schema_from_sdi(json).unwrap();
assert_eq!(schema.columns.len(), 2);
let tax = &schema.columns[1];
assert_eq!(tax.generation_expression, Some("`price` * 0.1".to_string()));
assert_eq!(tax.is_virtual, Some(true));
assert!(schema
.ddl
.contains("GENERATED ALWAYS AS (`price` * 0.1) VIRTUAL"));
}
// Table options are rendered from a hand-built schema; a non-DYNAMIC row
// format must appear in the DDL.
#[test]
fn test_ddl_generation_table_options() {
let schema = TableSchema {
schema_name: Some("mydb".to_string()),
table_name: "test".to_string(),
engine: "InnoDB".to_string(),
row_format: Some("COMPRESSED".to_string()),
collation: Some("utf8mb4_0900_ai_ci".to_string()),
charset: Some("utf8mb4".to_string()),
comment: None,
mysql_version: Some("8.0.40".to_string()),
source: "sdi".to_string(),
columns: vec![ColumnDef {
name: "id".to_string(),
column_type: "int".to_string(),
is_nullable: false,
default_value: None,
is_auto_increment: true,
generation_expression: None,
is_invisible: false,
is_virtual: None,
comment: None,
version_added: None,
version_dropped: None,
}],
indexes: vec![IndexDef {
name: "PRIMARY".to_string(),
index_type: "PRIMARY KEY".to_string(),
columns: vec![IndexColumnDef {
name: "id".to_string(),
prefix_length: None,
order: None,
}],
comment: None,
is_visible: true,
}],
foreign_keys: vec![],
ddl: String::new(),
};
let ddl = generate_ddl(&schema);
assert!(ddl.contains("ENGINE=InnoDB"));
assert!(ddl.contains("DEFAULT CHARSET=utf8mb4"));
assert!(ddl.contains("COLLATE=utf8mb4_0900_ai_ci"));
assert!(ddl.contains("ROW_FORMAT=COMPRESSED"));
}
// Columns with hidden = 2 never reach the extracted schema.
#[test]
fn test_hidden_column_filtering() {
let json = r#"{
"mysqld_version_id": 90001,
"dd_object_type": "Table",
"dd_object": {
"name": "t",
"engine": "InnoDB",
"collation_id": 255,
"row_format": 2,
"columns": [
{ "name": "a", "type": 4, "column_type_utf8": "int", "ordinal_position": 1, "hidden": 1 },
{ "name": "b", "type": 4, "column_type_utf8": "int", "ordinal_position": 2, "hidden": 1 },
{ "name": "DB_TRX_ID", "type": 10, "ordinal_position": 3, "hidden": 2 },
{ "name": "DB_ROLL_PTR", "type": 9, "ordinal_position": 4, "hidden": 2 },
{ "name": "DB_ROW_ID", "type": 10, "ordinal_position": 5, "hidden": 2 }
],
"indexes": [],
"foreign_keys": []
}
}"#;
let schema = extract_schema_from_sdi(json).unwrap();
assert_eq!(schema.columns.len(), 2);
assert_eq!(schema.columns[0].name, "a");
assert_eq!(schema.columns[1].name, "b");
}
// A nullable column whose default is null is rendered as DEFAULT NULL.
#[test]
fn test_nullable_column_default_null() {
let json = r#"{
"mysqld_version_id": 90001,
"dd_object_type": "Table",
"dd_object": {
"name": "t",
"engine": "InnoDB",
"collation_id": 255,
"row_format": 2,
"columns": [
{
"name": "notes",
"type": 16,
"column_type_utf8": "varchar(255)",
"ordinal_position": 1,
"hidden": 1,
"is_nullable": true,
"has_no_default": false,
"default_value_utf8": "",
"default_value_utf8_null": true
}
],
"indexes": [],
"foreign_keys": []
}
}"#;
let schema = extract_schema_from_sdi(json).unwrap();
assert_eq!(schema.columns[0].default_value, Some("NULL".to_string()));
assert!(schema.ddl.contains("DEFAULT NULL"));
}
// An empty, non-null string default is rendered as DEFAULT ''.
#[test]
fn test_empty_string_default() {
let json = r#"{
"mysqld_version_id": 90001,
"dd_object_type": "Table",
"dd_object": {
"name": "t",
"engine": "InnoDB",
"collation_id": 255,
"row_format": 2,
"columns": [
{
"name": "tag",
"type": 16,
"column_type_utf8": "varchar(50)",
"ordinal_position": 1,
"hidden": 1,
"is_nullable": false,
"has_no_default": false,
"default_value_utf8": "",
"default_value_utf8_null": false
}
],
"indexes": [],
"foreign_keys": []
}
}"#;
let schema = extract_schema_from_sdi(json).unwrap();
assert_eq!(schema.columns[0].default_value, Some("''".to_string()));
assert!(schema.ddl.contains("DEFAULT ''"));
}
// hidden = 4 keeps the column but marks it INVISIBLE; hidden = 2 drops it.
#[test]
fn test_invisible_column() {
let json = r#"{
"mysqld_version_id": 80040,
"dd_object_type": "Table",
"dd_object": {
"name": "t",
"engine": "InnoDB",
"collation_id": 255,
"row_format": 2,
"columns": [
{
"name": "id",
"type": 4,
"column_type_utf8": "int",
"ordinal_position": 1,
"hidden": 1,
"is_nullable": false,
"is_auto_increment": true,
"has_no_default": true
},
{
"name": "secret",
"type": 16,
"column_type_utf8": "varchar(100)",
"ordinal_position": 2,
"hidden": 4,
"is_nullable": true,
"has_no_default": false,
"default_value_utf8_null": true
},
{
"name": "DB_TRX_ID",
"type": 10,
"column_type_utf8": "",
"ordinal_position": 3,
"hidden": 2
}
],
"indexes": [],
"foreign_keys": []
}
}"#;
let schema = extract_schema_from_sdi(json).unwrap();
assert_eq!(schema.columns.len(), 2);
assert_eq!(schema.columns[0].name, "id");
assert!(!schema.columns[0].is_invisible);
assert_eq!(schema.columns[1].name, "secret");
assert!(schema.columns[1].is_invisible);
assert!(schema.ddl.contains("/*!80023 INVISIBLE */"));
assert!(!schema.ddl.contains("DB_TRX_ID"));
}
// An index element with order = 1 is rendered with a DESC suffix.
#[test]
fn test_index_desc_order() {
let json = r#"{
"mysqld_version_id": 80040,
"dd_object_type": "Table",
"dd_object": {
"name": "t",
"engine": "InnoDB",
"collation_id": 255,
"row_format": 2,
"columns": [
{ "name": "a", "type": 4, "column_type_utf8": "int", "ordinal_position": 1, "hidden": 1 },
{ "name": "b", "type": 4, "column_type_utf8": "int", "ordinal_position": 2, "hidden": 1 }
],
"indexes": [
{
"name": "idx_b_desc",
"type": 3,
"hidden": false,
"is_visible": true,
"elements": [
{ "column_opx": 1, "hidden": false, "length": 4294967295, "order": 1 }
]
}
],
"foreign_keys": []
}
}"#;
let schema = extract_schema_from_sdi(json).unwrap();
assert_eq!(schema.indexes[0].columns[0].order, Some("DESC".to_string()));
assert!(schema.ddl.contains("`b` DESC"));
}
}