1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
//! Database Table Management
//!
//! Extracted from database_legacy.rs
//! Contains table schema management and helper methods
use crate::types::{TableSchema, RowId};
use crate::Result;
use std::sync::Arc;
use super::core::MoteDB;
impl MoteDB {
/// Creates a table from `schema` and wires up its supporting structures.
///
/// Steps, in order:
/// 1. registers the schema with the table registry — the only step whose
///    failure is returned to the caller;
/// 2. seeds a zeroed row counter for the table (COUNT(*) fast path);
/// 3. when the schema has a primary key that is not auto-increment, builds a
///    column index on it and installs a PK lookup cache sized by
///    `pk_lookup_capacity`;
/// 4. when the table type is `TimeSeries`, registers it with the columnar store.
///
/// Failures in steps 3 and 4 are written to stderr as `[WARN]` lines and do
/// not fail the call. A `TimeSeries` table whose id cannot be resolved is
/// skipped without a message.
///
/// # Examples
/// ```ignore
/// use motedb::types::{TableSchema, ColumnDef, ColumnType};
///
/// let schema = TableSchema::new("users")
///     .with_column(ColumnDef::new("id", ColumnType::Integer).primary_key())
///     .with_column(ColumnDef::new("name", ColumnType::Text));
///
/// db.create_table(schema)?;
/// ```
pub fn create_table(&self, schema: TableSchema) -> Result<()> {
    ensure_open!(self);

    // Catalog registration goes first; if it fails, nothing else is set up.
    // Per the original notes, the registry's metadata write lock is only held
    // for the duration of this call.
    self.table_registry.create_table(schema.clone())?;

    // Zeroed counter backing the COUNT(*) fast path.
    let row_counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
    self.table_row_count.insert(schema.name.clone(), row_counter);

    // An AUTO_INCREMENT primary key equals the row id, so it needs no index.
    // Any other primary key gets one so point lookups avoid a full scan.
    match schema.primary_key() {
        Some(pk_col) if !schema.is_primary_key_auto_increment() => {
            // Disk-backed column index: persistence and range queries.
            let index_result = self.create_column_index(&schema.name, pk_col);
            if let Err(e) = index_result {
                eprintln!("[WARN] Failed to auto-create PK index for {}.{}: {}",
                    schema.name, pk_col, e);
            }

            // In-memory PK -> row_id lookup, bounded by `pk_lookup_capacity`.
            // Installed even when the index creation above reported an error.
            self.pk_lookup.insert(
                schema.name.clone(),
                Arc::new(crate::database::pk_cache::PkLookupCache::new(self.pk_lookup_capacity)),
            );
        }
        _ => {}
    }

    // TimeSeries tables are additionally registered with the columnar store.
    let is_time_series = schema.table_type == crate::types::TableType::TimeSeries;
    if is_time_series {
        if let Ok(table_id) = self.table_registry.get_table_id(&schema.name) {
            let registration = self.columnar_store.register_table(table_id, &schema);
            if let Err(e) = registration {
                eprintln!("[WARN] Failed to register columnar table '{}': {}", schema.name, e);
            }
        }
    }

    Ok(())
}
/// Drops a table.
///
/// Tombstones the table's row range in the LSM engine, flushes so the
/// tombstones reach SSTables, drops the columnar data, releases the in-memory
/// index handles and caches, and finally removes the catalog entry.
///
/// The table id is resolved from the registry *before* anything destructive
/// happens. A name the registry does not know is rejected up front, instead
/// of falling back to table id 0 and tombstoning that key range.
///
/// # Errors
///
/// Returns the registry's error when `table_name` has no table id (nothing is
/// modified in that case) or when removing the catalog entry fails. Failures
/// of the LSM range delete and of the flush are reported on stderr as `[WARN]`
/// lines and do not abort the drop.
pub fn drop_table(&self, table_name: &str) -> Result<()> {
    ensure_open!(self);

    // 0. Resolve the id first. `compute_table_prefix` maps unknown tables to
    //    prefix 0, which would make the range delete below hit rows that do
    //    not belong to `table_name`.
    let table_prefix = self.table_registry.get_table_id(table_name)? as u64;

    // 1. Delete row data from LSM (tombstones for compaction to reclaim).
    //    Composite keys are [table_id:32][row_id:32], so the table owns the
    //    whole range sharing its upper 32 bits.
    let start_key = table_prefix << 32;
    let end_key = start_key | 0xFFFF_FFFF;
    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_micros() as u64;
    if let Err(e) = self.lsm_engine.delete_range(start_key, end_key, timestamp) {
        eprintln!("[WARN] Failed to delete row data for table '{}': {}", table_name, e);
    }

    // 2. Flush so tombstones reach SSTables (enables compaction cleanup).
    if let Err(e) = self.lsm_engine.flush() {
        eprintln!("[WARN] Failed to flush after dropping table '{}': {}", table_name, e);
    }

    // 3. Drop columnar store data. Only TimeSeries tables are registered
    //    there, so the result is intentionally ignored for other tables.
    let _ = self.columnar_store.drop_table(table_name);

    // 4. Drop in-memory index handles.
    self.vector_indexes.remove(table_name);
    self.ioctree_indexes.remove(table_name);
    self.text_indexes.remove(table_name);
    self.column_indexes.remove(table_name);

    // 5. Invalidate row cache for this table.
    self.row_cache.invalidate_table(table_name);

    // 6. Remove catalog metadata; an error here leaves the per-table state
    //    below in place, as before.
    self.table_registry.drop_table(table_name)?;

    // 7. Release the remaining per-table state, including the row counter
    //    that `create_table` installs.
    self.pk_lookup.remove(table_name);
    self.table_auto_increment.remove(table_name);
    self.table_row_count.remove(table_name);

    Ok(())
}
/// Looks up the schema registered under `table_name`.
///
/// The call is forwarded unchanged to the table registry's `get_table`.
///
/// # Errors
///
/// Propagates whatever error the registry returns (presumably when the table
/// is unknown — confirm against the registry implementation).
///
/// # Examples
/// ```ignore
/// let schema = db.get_table_schema("users")?;
/// println!("Table has {} columns", schema.column_count());
/// ```
pub fn get_table_schema(&self, table_name: &str) -> Result<Arc<TableSchema>> {
    let registry = &self.table_registry;
    registry.get_table(table_name)
}
/// Returns the names of all tables, as reported by the table registry.
///
/// # Errors
///
/// Propagates any error returned by the registry's `list_tables`.
///
/// # Examples
/// ```ignore
/// let tables = db.list_tables()?;
/// for table in tables {
///     println!("Table: {}", table);
/// }
/// ```
pub fn list_tables(&self) -> Result<Vec<String>> {
    let registry = &self.table_registry;
    registry.list_tables()
}
/// Reports whether the table registry knows a table named `table_name`.
///
/// # Examples
/// ```ignore
/// if db.table_exists("users") {
///     println!("Table exists");
/// }
/// ```
pub fn table_exists(&self, table_name: &str) -> bool {
    let registry = &self.table_registry;
    registry.table_exists(table_name)
}
// ==================== Internal Helper Methods ====================
/// Builds the 64-bit storage key for one row of `table_name`.
///
/// Layout: `[table_id: upper 32 bits][row_id: lower 32 bits]`.
///
/// The table id comes from the registry (stable, sequential ids that replaced
/// the old hash-based scheme and its collision risk). Two lossy cases are
/// worth knowing about:
/// - a table the registry cannot resolve is encoded with table id 0;
/// - only the low 32 bits of `row_id` are kept; higher bits are masked off.
pub(crate) fn make_composite_key(&self, table_name: &str, row_id: RowId) -> u64 {
    // Upper half: registry table id (0 when the table is not registered).
    let high_bits = self.compute_table_prefix(table_name) << 32;
    // Lower half: row id truncated to 32 bits.
    let low_bits = row_id & 0xFFFF_FFFF;
    high_bits | low_bits
}
/// Returns the table's key prefix: its registry table id widened to `u64`.
///
/// This is the value stored in the upper 32 bits of a composite key (callers
/// shift it into place themselves). A table the registry cannot resolve
/// yields 0 rather than an error.
pub(crate) fn compute_table_prefix(&self, table_name: &str) -> u64 {
    self.table_registry
        .get_table_id(table_name)
        .map_or(0, |table_id| table_id as u64)
}
/// Decodes a row from the bytes stored in the LSM engine.
///
/// Internal helper for query paths; the work is delegated to
/// `storage::row_format::decode_any`, and its result is returned as-is.
///
/// # Errors
///
/// Propagates any error `decode_any` returns for `data`.
pub(crate) fn decode_row_data(data: &[u8]) -> Result<crate::types::Row> {
    use crate::storage::row_format::decode_any;

    decode_any(data)
}
// Note: create_column_index() has been moved to indexes/column.rs
// Removed duplicate definition to avoid E0592
}