cqlite_core/storage/mod.rs
1//! Storage engine implementation for CQLite
2
3pub mod sstable;
4
5// Canonical partition-key (de)serialization, shared by the read (query) path
6// and the write engine so the two never drift (Issue #586). Always compiled —
7// the scan path needs it even without `write-support`.
8pub mod partition_key_codec;
9
10// M5: Write engine and serialization (Issue #359)
11#[cfg(feature = "write-support")]
12pub mod serialization;
13#[cfg(feature = "write-support")]
14pub mod write_engine;
15
16// REPL data access components (Issue #249: CLI-specific)
17#[cfg(feature = "cli-helpers")]
18pub mod repl_data_api;
19pub mod schema_discovery;
20pub mod sstable_data_manager;
21
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24#[cfg(feature = "state_machine")]
25use tokio::sync::RwLock;
26
27use crate::platform::Platform;
28use crate::{types::TableId, Config, Result, RowKey, Value};
29
30/// Main storage engine that coordinates all storage components
31///
32/// NOTE: Issue #176 removed write infrastructure (compaction, manifest).
33/// This is now a read-only storage layer focused on SSTable access.
34#[derive(Debug)]
35pub struct StorageEngine {
36 /// SSTable manager for persistent storage
37 sstables: Arc<sstable::SSTableManager>,
38
39 /// Platform abstraction
40 #[allow(dead_code)]
41 _platform: Arc<Platform>,
42
43 /// Storage configuration
44 #[allow(dead_code)]
45 config: Config,
46
47 /// Schema registry for schema-aware operations (feature-gated)
48 #[cfg(feature = "state_machine")]
49 schema_registry: Arc<RwLock<Option<Arc<RwLock<crate::schema::SchemaRegistry>>>>>,
50}
51
52impl StorageEngine {
53 /// Open a storage engine at the given path
54 ///
55 /// This method discovers SSTables by scanning the storage directory.
56 /// For pre-discovered SSTables, use `open_with_sstables` instead.
57 ///
58 /// NOTE: Issue #176 removed write infrastructure (compaction, manifest).
59 /// This is now a read-only storage layer focused on SSTable access.
60 pub async fn open(
61 path: &Path,
62 config: &Config,
63 platform: Arc<Platform>,
64 #[cfg(feature = "state_machine")] schema_registry: Option<
65 Arc<RwLock<crate::schema::SchemaRegistry>>,
66 >,
67 ) -> Result<Self> {
68 // Create storage directory if it doesn't exist
69 platform.fs().create_dir_all(path).await?;
70
71 // Initialize SSTable manager with schema registry
72 let sstables = Arc::new(
73 sstable::SSTableManager::new(
74 path,
75 config,
76 platform.clone(),
77 #[cfg(feature = "state_machine")]
78 schema_registry.clone(),
79 )
80 .await?,
81 );
82
83 Ok(Self {
84 sstables,
85 _platform: platform,
86 config: config.clone(),
87 #[cfg(feature = "state_machine")]
88 schema_registry: Arc::new(RwLock::new(schema_registry)),
89 })
90 }
91
92 /// Open a storage engine with pre-discovered SSTable table directories
93 ///
94 /// This method is used when SSTables have been discovered externally (e.g., by DiscoveryService)
95 /// and allows the storage engine to be initialized with specific table directories rather than
96 /// scanning the storage directory. Each table directory will be scanned for Data.db files.
97 ///
98 /// # Arguments
99 /// * `path` - Base storage path for manifest and SSTable operations
100 /// * `discovered_table_dirs` - Vector of table directory paths (each containing SSTable files)
101 /// * `config` - Storage configuration
102 /// * `platform` - Platform abstraction for I/O operations
103 ///
104 /// # Returns
105 /// A StorageEngine instance with all components initialized, including SSTable readers
106 /// for all Data.db files found in the discovered table directories.
107 ///
108 /// # Example
109 /// ```no_run
110 /// # use std::path::{Path, PathBuf};
111 /// # use std::sync::Arc;
112 /// # use cqlite_core::{Config, Platform, storage::StorageEngine};
113 /// # async fn example() -> cqlite_core::Result<()> {
114 /// let config = Config::default();
115 /// let platform = Arc::new(Platform::new(&config).await?);
116 /// let storage_path = Path::new("/var/lib/cqlite/storage");
117 /// let discovered_table_dirs = vec![
118 /// PathBuf::from("/var/lib/cassandra/keyspace1/table1-abc123"),
119 /// PathBuf::from("/var/lib/cassandra/keyspace1/table2-def456"),
120 /// ];
121 ///
122 /// let engine = StorageEngine::open_with_sstables(
123 /// storage_path,
124 /// discovered_table_dirs,
125 /// &config,
126 /// platform,
127 /// #[cfg(feature = "state_machine")]
128 /// None,
129 /// ).await?;
130 /// # Ok(())
131 /// # }
132 /// ```
133 pub async fn open_with_sstables(
134 path: &Path,
135 discovered_table_dirs: Vec<PathBuf>,
136 config: &Config,
137 platform: Arc<Platform>,
138 #[cfg(feature = "state_machine")] schema_registry: Option<
139 Arc<RwLock<crate::schema::SchemaRegistry>>,
140 >,
141 ) -> Result<Self> {
142 // Create storage directory if it doesn't exist
143 platform.fs().create_dir_all(path).await?;
144
145 // Initialize SSTable manager with pre-discovered paths and schema registry
146 let sstables = Arc::new(
147 sstable::SSTableManager::new_from_discovered_paths(
148 path,
149 discovered_table_dirs,
150 config,
151 platform.clone(),
152 #[cfg(feature = "state_machine")]
153 schema_registry.clone(),
154 )
155 .await?,
156 );
157
158 Ok(Self {
159 sstables,
160 _platform: platform,
161 config: config.clone(),
162 #[cfg(feature = "state_machine")]
163 schema_registry: Arc::new(RwLock::new(schema_registry)),
164 })
165 }
166
167 /// Insert a key-value pair
168 ///
169 /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
170 /// This method is feature-gated behind 'experimental' but currently unimplemented.
171 #[cfg(feature = "experimental")]
172 pub async fn put(&self, _table_id: &TableId, _key: RowKey, _value: Value) -> Result<()> {
173 Err(crate::error::Error::UnsupportedFormat(
174 "Write operations (put) removed in Issue #175 - WAL and MemTable infrastructure deleted".to_string()
175 ))
176 }
177
178 /// Get a value by key
179 pub async fn get(&self, table_id: &TableId, key: &RowKey) -> Result<Option<Value>> {
180 // Check SSTables
181 self.sstables.get(table_id, key).await
182 }
183
184 /// Delete a key
185 ///
186 /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
187 /// This method is feature-gated behind 'experimental' but currently unimplemented.
188 #[cfg(feature = "experimental")]
189 pub async fn delete(&self, _table_id: &TableId, _key: RowKey) -> Result<()> {
190 Err(crate::error::Error::UnsupportedFormat(
191 "Write operations (delete) removed in Issue #175 - WAL and MemTable infrastructure deleted".to_string()
192 ))
193 }
194
195 /// Scan a range of keys
196 ///
197 /// # Arguments
198 /// * `table_id` - The table to scan
199 /// * `start_key` - Optional start key for range scan
200 /// * `end_key` - Optional end key for range scan
201 /// * `limit` - Optional limit on number of results
202 /// * `schema` - Optional table schema for schema-aware parsing. When provided,
203 /// enables accurate type detection and avoids heuristic-based parsing.
204 /// Strongly recommended for Cassandra 5.0+ formats.
205 pub async fn scan(
206 &self,
207 table_id: &TableId,
208 start_key: Option<&RowKey>,
209 end_key: Option<&RowKey>,
210 limit: Option<usize>,
211 schema: Option<&crate::schema::TableSchema>,
212 ) -> Result<Vec<(RowKey, Value)>> {
213 // Scan SSTables directly
214 self.sstables
215 .scan(table_id, start_key, end_key, limit, schema)
216 .await
217 }
218
219 /// Flush MemTable to SSTable
220 ///
221 /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
222 /// This method is feature-gated behind 'experimental' but currently unimplemented.
223 #[allow(dead_code)]
224 #[cfg(feature = "experimental")]
225 async fn flush_memtable(&self) -> Result<()> {
226 Err(crate::error::Error::UnsupportedFormat(
227 "Write operations (flush_memtable) removed in Issue #175 - WAL and MemTable infrastructure deleted".to_string()
228 ))
229 }
230
231 /// Force flush all pending writes
232 ///
233 /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
234 /// This method is feature-gated behind 'experimental' but currently unimplemented.
235 #[cfg(feature = "experimental")]
236 pub async fn flush(&self) -> Result<()> {
237 Err(crate::error::Error::UnsupportedFormat(
238 "Write operations (flush) removed in Issue #175 - WAL and MemTable infrastructure deleted".to_string()
239 ))
240 }
241
242 /// Perform manual compaction
243 #[cfg(feature = "experimental")]
244 pub async fn compact(&self) -> Result<()> {
245 // TODO: Implement proper compaction logic
246 // This would need to identify candidates and call CompactionManager::run_compaction
247 Ok(())
248 }
249
250 /// Get storage statistics
251 ///
252 /// NOTE: Issue #176 removed compaction stats (compaction.rs deleted).
253 pub async fn stats(&self) -> Result<StorageStats> {
254 let sstable_stats = self.sstables.stats().await?;
255
256 Ok(StorageStats {
257 sstables: sstable_stats,
258 })
259 }
260
261 /// Batch write operations for better performance
262 ///
263 /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
264 /// This method is feature-gated behind 'experimental' but currently unimplemented.
265 #[cfg(feature = "experimental")]
266 pub async fn batch_write(&mut self, _operations: Vec<BatchOperation>) -> Result<()> {
267 Err(crate::error::Error::UnsupportedFormat(
268 "Write operations (batch_write) removed in Issue #175 - WAL and MemTable infrastructure deleted".to_string()
269 ))
270 }
271
272 /// Explicit batch flush
273 ///
274 /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
275 /// This method is feature-gated behind 'experimental' but currently unimplemented.
276 #[cfg(feature = "experimental")]
277 pub async fn flush_batch(&mut self) -> Result<()> {
278 Err(crate::error::Error::UnsupportedFormat(
279 "Write operations (flush_batch) removed in Issue #175 - WAL and MemTable infrastructure deleted".to_string()
280 ))
281 }
282
283 /// Get batch writer statistics
284 ///
285 /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
286 /// This method is feature-gated behind 'experimental' but currently unimplemented.
287 #[cfg(feature = "experimental")]
288 pub fn batch_stats(&self) -> Option<()> {
289 None
290 }
291
292 /// Shutdown the storage engine
293 ///
294 /// NOTE: Issue #176 removed compaction shutdown (compaction.rs deleted).
295 /// Issue #175 removed flush operations (WAL/MemTable deleted).
296 pub async fn shutdown(&self) -> Result<()> {
297 // Nothing to shutdown - read-only storage layer
298 Ok(())
299 }
300
301 /// Set the schema registry for schema-aware operations
302 ///
303 /// This method propagates the schema registry to the SSTable manager,
304 /// which will apply it to all SSTable readers for schema-aware parsing.
305 #[cfg(feature = "state_machine")]
306 pub async fn set_schema_registry(
307 &self,
308 registry: Arc<RwLock<crate::schema::SchemaRegistry>>,
309 ) -> Result<()> {
310 // Store in our field
311 {
312 let mut schema_reg = self.schema_registry.write().await;
313 *schema_reg = Some(registry.clone());
314 }
315
316 // Propagate to SSTable manager
317 self.sstables.set_schema_registry(registry).await
318 }
319}
320
/// Kinds of operation that can be submitted together in one batch.
///
/// Only compiled with the `experimental` feature.
#[cfg(feature = "experimental")]
#[derive(Debug, Clone)]
pub enum BatchOperation {
    /// Put operation: `value` for `key` in `table_id`.
    Put {
        table_id: TableId,
        key: RowKey,
        value: Value,
    },
    /// Delete operation: `key` in `table_id`.
    Delete {
        table_id: TableId,
        key: RowKey,
    },
    /// Merge operation: `value` for `key` in `table_id`.
    Merge {
        table_id: TableId,
        key: RowKey,
        value: Value,
    },
}
340
341/// Storage engine statistics
342///
343/// NOTE: Issue #176 removed compaction statistics (compaction.rs deleted).
344#[derive(Debug, Clone)]
345pub struct StorageStats {
346 /// SSTable statistics
347 pub sstables: sstable::SSTableStats,
348}
349
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Batch shared by the write-stub tests: two puts and one delete against
    /// `test_table`.
    #[cfg(all(feature = "legacy-heuristics", feature = "experimental"))]
    fn sample_batch() -> Vec<BatchOperation> {
        vec![
            BatchOperation::Put {
                table_id: TableId::new("test_table"),
                key: RowKey::from("key1"),
                value: Value::Text("value1".to_string()),
            },
            BatchOperation::Put {
                table_id: TableId::new("test_table"),
                key: RowKey::from("key2"),
                value: Value::Text("value2".to_string()),
            },
            BatchOperation::Delete {
                table_id: TableId::new("test_table"),
                key: RowKey::from("key3"),
            },
        ]
    }

    /// A freshly opened engine over an empty directory reports no SSTables.
    #[tokio::test]
    async fn test_storage_engine_creation() {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::test_config();
        let platform = Arc::new(Platform::new(&config).await.unwrap());

        let storage = StorageEngine::open(
            temp_dir.path(),
            &config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();
        let stats = storage.stats().await.unwrap();

        assert_eq!(stats.sstables.sstable_count, 0);
        storage.shutdown().await.unwrap();
    }

    /// Opening with an empty list of discovered table directories yields an
    /// engine with no SSTables.
    #[tokio::test]
    async fn test_storage_engine_with_discovered_sstables() {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::test_config();
        let platform = Arc::new(Platform::new(&config).await.unwrap());

        // Create an empty list of discovered SSTables for this test
        let discovered_paths = Vec::new();

        let storage = StorageEngine::open_with_sstables(
            temp_dir.path(),
            discovered_paths,
            &config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();

        let stats = storage.stats().await.unwrap();

        // Should have 0 SSTables since we provided an empty list
        assert_eq!(stats.sstables.sstable_count, 0);
        storage.shutdown().await.unwrap();
    }

    /// `batch_write` is a stub since Issue #175, so it must reject the batch
    /// with `UnsupportedFormat` rather than succeed.
    #[tokio::test]
    #[cfg(all(feature = "legacy-heuristics", feature = "experimental"))]
    async fn test_batch_operations() {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::default();
        let platform = Arc::new(Platform::new(&config).await.unwrap());

        let mut storage = StorageEngine::open(
            temp_dir.path(),
            &config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();

        // Unwrapping here would always panic: the stub returns an error
        // unconditionally. Assert that contract instead.
        let outcome = storage.batch_write(sample_batch()).await;
        assert!(
            matches!(outcome, Err(crate::error::Error::UnsupportedFormat(_))),
            "batch_write must report UnsupportedFormat while write support is removed"
        );

        storage.shutdown().await.unwrap();
    }

    /// Same contract as `test_batch_operations`, with a small memtable
    /// threshold configured.
    #[tokio::test]
    #[cfg(all(feature = "legacy-heuristics", feature = "experimental"))]
    async fn test_batch_operations_fallback() {
        // Add timeout to prevent hanging in parallel test execution
        tokio::time::timeout(std::time::Duration::from_secs(30), async {
            let temp_dir = TempDir::new().unwrap();
            let mut config = Config::default();
            // NOTE(review): this threshold used to force a batch-writer
            // fallback path. With the write path stubbed out it presumably
            // no longer changes the outcome; confirm the field is still
            // meaningful before keeping this test separate.
            config.storage.memtable_size_threshold = 1024; // 1KB
            let platform = Arc::new(Platform::new(&config).await.unwrap());

            let mut storage = StorageEngine::open(
                temp_dir.path(),
                &config,
                platform,
                #[cfg(feature = "state_machine")]
                None,
            )
            .await
            .unwrap();

            // The stub rejects every batch regardless of configuration.
            let outcome = storage.batch_write(sample_batch()).await;
            assert!(
                matches!(outcome, Err(crate::error::Error::UnsupportedFormat(_))),
                "batch_write must report UnsupportedFormat while write support is removed"
            );

            storage.shutdown().await.unwrap();
        })
        .await
        .expect("Test should complete within 30 seconds");
    }
}
487}