Skip to main content

dbx_core/engine/
ddl_api.rs

1//! DDL API implementation - Schema management convenience methods
2
3use crate::engine::Database;
4use crate::error::DbxResult;
5use arrow::datatypes::{DataType, Schema};
6use std::sync::Arc;
7
8impl Database {
9    /// Create a new table with the given Arrow schema
10    ///
11    /// This is a convenience wrapper around `execute_sql("CREATE TABLE ...")`.
12    /// It automatically converts the Arrow schema to SQL DDL.
13    ///
14    /// # Example
15    ///
16    /// ```rust
17    /// use dbx_core::Database;
18    /// use arrow::datatypes::{DataType, Field, Schema};
19    ///
20    /// # fn main() -> dbx_core::DbxResult<()> {
21    /// let db = Database::open_in_memory()?;
22    ///
23    /// let schema = Schema::new(vec![
24    ///     Field::new("id", DataType::Int64, false),
25    ///     Field::new("name", DataType::Utf8, true),
26    ///     Field::new("age", DataType::Int32, true),
27    /// ]);
28    ///
29    /// db.create_table("users", schema)?;
30    /// assert!(db.table_exists("users"));
31    /// # Ok(())
32    /// # }
33    /// ```
34    pub fn create_table(&self, name: &str, schema: Schema) -> DbxResult<()> {
35        let schema_arc = Arc::new(schema);
36
37        // Generate CREATE TABLE SQL from Arrow Schema
38        let sql = self.generate_create_table_sql(name, &schema_arc);
39
40        // Execute SQL FIRST (this will check if table exists)
41        self.execute_sql(&sql)?;
42
43        // THEN store schema (after SQL succeeds)
44        self.table_schemas
45            .write()
46            .unwrap()
47            .insert(name.to_string(), Arc::clone(&schema_arc));
48
49        // Initialize empty table data
50        self.tables
51            .write()
52            .unwrap()
53            .insert(name.to_string(), vec![]);
54
55        // Initialize row counter
56        self.row_counters
57            .insert(name.to_string(), std::sync::atomic::AtomicUsize::new(0));
58
59        Ok(())
60    }
61
62    /// Drop a table
63    ///
64    /// # Example
65    ///
66    /// ```rust
67    /// use dbx_core::Database;
68    /// use arrow::datatypes::{DataType, Field, Schema};
69    ///
70    /// # fn main() -> dbx_core::DbxResult<()> {
71    /// let db = Database::open_in_memory()?;
72    ///
73    /// let schema = Schema::new(vec![
74    ///     Field::new("id", DataType::Int64, false),
75    /// ]);
76    ///
77    /// db.create_table("temp", schema)?;
78    /// db.drop_table("temp")?;
79    /// assert!(!db.table_exists("temp"));
80    /// # Ok(())
81    /// # }
82    /// ```
83    pub fn drop_table(&self, name: &str) -> DbxResult<()> {
84        self.execute_sql(&format!("DROP TABLE {}", name))?;
85        self.table_schemas.write().unwrap().remove(name);
86        Ok(())
87    }
88
89    /// Check if a table exists
90    ///
91    /// # Example
92    ///
93    /// ```rust
94    /// use dbx_core::Database;
95    /// use arrow::datatypes::{DataType, Field, Schema};
96    ///
97    /// # fn main() -> dbx_core::DbxResult<()> {
98    /// let db = Database::open_in_memory()?;
99    ///
100    /// assert!(!db.table_exists("users"));
101    ///
102    /// let schema = Schema::new(vec![
103    ///     Field::new("id", DataType::Int64, false),
104    /// ]);
105    ///
106    /// db.create_table("users", schema)?;
107    /// assert!(db.table_exists("users"));
108    /// # Ok(())
109    /// # }
110    /// ```
111    pub fn table_exists(&self, name: &str) -> bool {
112        self.table_schemas.read().unwrap().contains_key(name)
113    }
114
115    /// Get the schema of a table
116    ///
117    /// # Example
118    ///
119    /// ```rust
120    /// use dbx_core::Database;
121    /// use arrow::datatypes::{DataType, Field, Schema};
122    ///
123    /// # fn main() -> dbx_core::DbxResult<()> {
124    /// let db = Database::open_in_memory()?;
125    ///
126    /// let schema = Schema::new(vec![
127    ///     Field::new("id", DataType::Int64, false),
128    ///     Field::new("name", DataType::Utf8, true),
129    /// ]);
130    ///
131    /// db.create_table("users", schema.clone())?;
132    /// let retrieved_schema = db.get_table_schema("users")?;
133    /// assert_eq!(retrieved_schema.fields().len(), 2);
134    /// # Ok(())
135    /// # }
136    /// ```
137    pub fn get_table_schema(&self, name: &str) -> DbxResult<Schema> {
138        self.table_schemas
139            .read()
140            .unwrap()
141            .get(name)
142            .map(|s| (**s).clone())
143            .ok_or_else(|| crate::DbxError::Schema(format!("Table '{}' not found", name)))
144    }
145
146    /// List all tables
147    pub fn list_tables(&self) -> Vec<String> {
148        self.table_schemas.read().unwrap().keys().cloned().collect()
149    }
150
151    /// Helper: Generate CREATE TABLE SQL from Arrow Schema
152    fn generate_create_table_sql(&self, name: &str, schema: &Schema) -> String {
153        let columns: Vec<String> = schema
154            .fields()
155            .iter()
156            .map(|field| {
157                let sql_type = match field.data_type() {
158                    DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => "INT",
159                    DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
160                        "INT"
161                    }
162                    DataType::Float32 | DataType::Float64 => "FLOAT",
163                    DataType::Utf8 | DataType::LargeUtf8 => "TEXT",
164                    DataType::Boolean => "BOOLEAN",
165                    DataType::Binary | DataType::LargeBinary => "BLOB",
166                    DataType::Date32 | DataType::Date64 => "DATE",
167                    DataType::Timestamp(_, _) => "TIMESTAMP",
168                    _ => "TEXT", // Default to TEXT for unsupported types
169                };
170                format!("{} {}", field.name(), sql_type)
171            })
172            .collect();
173
174        format!("CREATE TABLE {} ({})", name, columns.join(", "))
175    }
176
177    /// Create a SQL index on table columns
178    ///
179    /// This is a convenience wrapper around `execute_sql("CREATE INDEX ...")`.
180    /// For Hash Index (O(1) lookup), use `create_index(table, column)` instead.
181    ///
182    /// # Example
183    ///
184    /// ```rust
185    /// use dbx_core::Database;
186    /// use arrow::datatypes::{DataType, Field, Schema};
187    ///
188    /// # fn main() -> dbx_core::DbxResult<()> {
189    /// let db = Database::open_in_memory()?;
190    ///
191    /// let schema = Schema::new(vec![
192    ///     Field::new("id", DataType::Int64, false),
193    ///     Field::new("email", DataType::Utf8, true),
194    /// ]);
195    ///
196    /// db.create_table("users", schema)?;
197    /// db.create_sql_index("users", "idx_email", vec!["email".to_string()])?;
198    /// assert!(db.sql_index_exists("idx_email"));
199    /// # Ok(())
200    /// # }
201    /// ```
202    pub fn create_sql_index(
203        &self,
204        table: &str,
205        index_name: &str,
206        columns: Vec<String>,
207    ) -> DbxResult<()> {
208        // Generate CREATE INDEX SQL
209        let columns_str = columns.join(", ");
210        let sql = format!("CREATE INDEX {} ON {} ({})", index_name, table, columns_str);
211
212        // Execute SQL
213        self.execute_sql(&sql)?;
214        Ok(())
215    }
216
217    /// Drop a SQL index
218    ///
219    /// This is a convenience wrapper around `execute_sql("DROP INDEX ...")`.
220    /// For Hash Index, use `drop_index(table, column)` instead.
221    ///
222    /// Note: The index must have been created with `create_sql_index` to be tracked properly.
223    ///
224    /// # Example
225    ///
226    /// ```rust
227    /// use dbx_core::Database;
228    /// use arrow::datatypes::{DataType, Field, Schema};
229    ///
230    /// # fn main() -> dbx_core::DbxResult<()> {
231    /// let db = Database::open_in_memory()?;
232    ///
233    /// let schema = Schema::new(vec![
234    ///     Field::new("id", DataType::Int64, false),
235    ///     Field::new("email", DataType::Utf8, true),
236    /// ]);
237    ///
238    /// db.create_table("users", schema)?;
239    /// db.create_sql_index("users", "idx_email", vec!["email".to_string()])?;
240    /// db.drop_sql_index("users", "idx_email")?;
241    /// assert!(!db.sql_index_exists("idx_email"));
242    /// # Ok(())
243    /// # }
244    /// ```
245    pub fn drop_sql_index(&self, table: &str, index_name: &str) -> DbxResult<()> {
246        // Use table.index_name format for DROP INDEX
247        let sql = format!("DROP INDEX {}.{}", table, index_name);
248        self.execute_sql(&sql)?;
249        Ok(())
250    }
251
252    /// Check if a SQL index exists
253    ///
254    /// For Hash Index, use `has_index(table, column)` instead.
255    ///
256    /// # Example
257    ///
258    /// ```rust
259    /// use dbx_core::Database;
260    /// use arrow::datatypes::{DataType, Field, Schema};
261    ///
262    /// # fn main() -> dbx_core::DbxResult<()> {
263    /// let db = Database::open_in_memory()?;
264    ///
265    /// let schema = Schema::new(vec![
266    ///     Field::new("id", DataType::Int64, false),
267    ///     Field::new("email", DataType::Utf8, true),
268    /// ]);
269    ///
270    /// db.create_table("users", schema)?;
271    /// assert!(!db.sql_index_exists("idx_email"));
272    ///
273    /// db.create_sql_index("users", "idx_email", vec!["email".to_string()])?;
274    /// assert!(db.sql_index_exists("idx_email"));
275    /// # Ok(())
276    /// # }
277    /// ```
278    pub fn sql_index_exists(&self, index_name: &str) -> bool {
279        self.index_registry.read().unwrap().contains_key(index_name)
280    }
281
282    /// List all SQL indexes for a table
283    ///
284    /// # Example
285    ///
286    /// ```rust
287    /// use dbx_core::Database;
288    /// use arrow::datatypes::{DataType, Field, Schema};
289    ///
290    /// # fn main() -> dbx_core::DbxResult<()> {
291    /// let db = Database::open_in_memory()?;
292    ///
293    /// let schema = Schema::new(vec![
294    ///     Field::new("id", DataType::Int64, false),
295    ///     Field::new("email", DataType::Utf8, true),
296    ///     Field::new("name", DataType::Utf8, true),
297    /// ]);
298    ///
299    /// db.create_table("users", schema)?;
300    /// db.create_sql_index("users", "idx_email", vec!["email".to_string()])?;
301    /// db.create_sql_index("users", "idx_name", vec!["name".to_string()])?;
302    ///
303    /// let indexes = db.list_sql_indexes("users");
304    /// assert!(indexes.contains(&"idx_email".to_string()));
305    /// assert!(indexes.contains(&"idx_name".to_string()));
306    /// # Ok(())
307    /// # }
308    /// ```
309    pub fn list_sql_indexes(&self, table: &str) -> Vec<String> {
310        self.index_registry
311            .read()
312            .unwrap()
313            .iter()
314            .filter_map(|(index_name, (tbl, _col))| {
315                if tbl == table {
316                    Some(index_name.clone())
317                } else {
318                    None
319                }
320            })
321            .collect()
322    }
323
324    /// Add a column to an existing table
325    ///
326    /// This is a convenience wrapper around `execute_sql("ALTER TABLE ... ADD COLUMN ...")`.
327    ///
328    /// # Example
329    ///
330    /// ```rust
331    /// use dbx_core::Database;
332    /// use arrow::datatypes::{DataType, Field, Schema};
333    ///
334    /// # fn main() -> dbx_core::DbxResult<()> {
335    /// let db = Database::open_in_memory()?;
336    ///
337    /// let schema = Schema::new(vec![
338    ///     Field::new("id", DataType::Int64, false),
339    ///     Field::new("name", DataType::Utf8, true),
340    /// ]);
341    ///
342    /// db.create_table("users", schema)?;
343    /// db.add_column("users", "email", "TEXT")?;
344    ///
345    /// let updated_schema = db.get_table_schema("users")?;
346    /// assert_eq!(updated_schema.fields().len(), 3);
347    /// # Ok(())
348    /// # }
349    /// ```
350    pub fn add_column(&self, table: &str, column_name: &str, data_type: &str) -> DbxResult<()> {
351        let sql = format!(
352            "ALTER TABLE {} ADD COLUMN {} {}",
353            table, column_name, data_type
354        );
355        self.execute_sql(&sql)?;
356        Ok(())
357    }
358
359    /// Drop a column from an existing table
360    ///
361    /// This is a convenience wrapper around `execute_sql("ALTER TABLE ... DROP COLUMN ...")`.
362    ///
363    /// # Example
364    ///
365    /// ```rust
366    /// use dbx_core::Database;
367    /// use arrow::datatypes::{DataType, Field, Schema};
368    ///
369    /// # fn main() -> dbx_core::DbxResult<()> {
370    /// let db = Database::open_in_memory()?;
371    ///
372    /// let schema = Schema::new(vec![
373    ///     Field::new("id", DataType::Int64, false),
374    ///     Field::new("name", DataType::Utf8, true),
375    ///     Field::new("email", DataType::Utf8, true),
376    /// ]);
377    ///
378    /// db.create_table("users", schema)?;
379    /// db.drop_column("users", "email")?;
380    ///
381    /// let updated_schema = db.get_table_schema("users")?;
382    /// assert_eq!(updated_schema.fields().len(), 2);
383    /// # Ok(())
384    /// # }
385    /// ```
386    pub fn drop_column(&self, table: &str, column_name: &str) -> DbxResult<()> {
387        let sql = format!("ALTER TABLE {} DROP COLUMN {}", table, column_name);
388        self.execute_sql(&sql)?;
389        Ok(())
390    }
391
392    /// Rename a column in an existing table
393    ///
394    /// This is a convenience wrapper around `execute_sql("ALTER TABLE ... RENAME COLUMN ...")`.
395    ///
396    /// # Example
397    ///
398    /// ```rust
399    /// use dbx_core::Database;
400    /// use arrow::datatypes::{DataType, Field, Schema};
401    ///
402    /// # fn main() -> dbx_core::DbxResult<()> {
403    /// let db = Database::open_in_memory()?;
404    ///
405    /// let schema = Schema::new(vec![
406    ///     Field::new("id", DataType::Int64, false),
407    ///     Field::new("user_name", DataType::Utf8, true),
408    /// ]);
409    ///
410    /// db.create_table("users", schema)?;
411    /// db.rename_column("users", "user_name", "name")?;
412    ///
413    /// let updated_schema = db.get_table_schema("users")?;
414    /// assert_eq!(updated_schema.field(1).name(), "name");
415    /// # Ok(())
416    /// # }
417    /// ```
418    pub fn rename_column(&self, table: &str, old_name: &str, new_name: &str) -> DbxResult<()> {
419        let sql = format!(
420            "ALTER TABLE {} RENAME COLUMN {} TO {}",
421            table, old_name, new_name
422        );
423        self.execute_sql(&sql)?;
424        Ok(())
425    }
426}