Skip to main content

cqlite_core/storage/
mod.rs

1//! Storage engine implementation for CQLite
2
3pub mod sstable;
4
5// Canonical partition-key (de)serialization, shared by the read (query) path
6// and the write engine so the two never drift (Issue #586). Always compiled —
7// the scan path needs it even without `write-support`.
8pub mod partition_key_codec;
9
10// M5: Write engine and serialization (Issue #359)
11#[cfg(feature = "write-support")]
12pub mod serialization;
13#[cfg(feature = "write-support")]
14pub mod write_engine;
15
16// REPL data access components (Issue #249: CLI-specific)
17#[cfg(feature = "cli-helpers")]
18pub mod repl_data_api;
19pub mod schema_discovery;
20pub mod sstable_data_manager;
21
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24#[cfg(feature = "state_machine")]
25use tokio::sync::RwLock;
26
27use crate::platform::Platform;
28use crate::{types::TableId, Config, Result, RowKey, Value};
29
30/// Main storage engine that coordinates all storage components
31///
32/// NOTE: Issue #176 removed write infrastructure (compaction, manifest).
33/// This is now a read-only storage layer focused on SSTable access.
34#[derive(Debug)]
35pub struct StorageEngine {
36    /// SSTable manager for persistent storage
37    sstables: Arc<sstable::SSTableManager>,
38
39    /// Platform abstraction
40    #[allow(dead_code)]
41    _platform: Arc<Platform>,
42
43    /// Storage configuration
44    #[allow(dead_code)]
45    config: Config,
46
47    /// Schema registry for schema-aware operations (feature-gated)
48    #[cfg(feature = "state_machine")]
49    schema_registry: Arc<RwLock<Option<Arc<RwLock<crate::schema::SchemaRegistry>>>>>,
50}
51
52impl StorageEngine {
53    /// Open a storage engine at the given path
54    ///
55    /// This method discovers SSTables by scanning the storage directory.
56    /// For pre-discovered SSTables, use `open_with_sstables` instead.
57    ///
58    /// NOTE: Issue #176 removed write infrastructure (compaction, manifest).
59    /// This is now a read-only storage layer focused on SSTable access.
60    pub async fn open(
61        path: &Path,
62        config: &Config,
63        platform: Arc<Platform>,
64        #[cfg(feature = "state_machine")] schema_registry: Option<
65            Arc<RwLock<crate::schema::SchemaRegistry>>,
66        >,
67    ) -> Result<Self> {
68        // Create storage directory if it doesn't exist
69        platform.fs().create_dir_all(path).await?;
70
71        // Initialize SSTable manager with schema registry
72        let sstables = Arc::new(
73            sstable::SSTableManager::new(
74                path,
75                config,
76                platform.clone(),
77                #[cfg(feature = "state_machine")]
78                schema_registry.clone(),
79            )
80            .await?,
81        );
82
83        Ok(Self {
84            sstables,
85            _platform: platform,
86            config: config.clone(),
87            #[cfg(feature = "state_machine")]
88            schema_registry: Arc::new(RwLock::new(schema_registry)),
89        })
90    }
91
92    /// Open a storage engine with pre-discovered SSTable table directories
93    ///
94    /// This method is used when SSTables have been discovered externally (e.g., by DiscoveryService)
95    /// and allows the storage engine to be initialized with specific table directories rather than
96    /// scanning the storage directory. Each table directory will be scanned for Data.db files.
97    ///
98    /// # Arguments
99    /// * `path` - Base storage path for manifest and SSTable operations
100    /// * `discovered_table_dirs` - Vector of table directory paths (each containing SSTable files)
101    /// * `config` - Storage configuration
102    /// * `platform` - Platform abstraction for I/O operations
103    ///
104    /// # Returns
105    /// A StorageEngine instance with all components initialized, including SSTable readers
106    /// for all Data.db files found in the discovered table directories.
107    ///
108    /// # Example
109    /// ```no_run
110    /// # use std::path::{Path, PathBuf};
111    /// # use std::sync::Arc;
112    /// # use cqlite_core::{Config, Platform, storage::StorageEngine};
113    /// # async fn example() -> cqlite_core::Result<()> {
114    /// let config = Config::default();
115    /// let platform = Arc::new(Platform::new(&config).await?);
116    /// let storage_path = Path::new("/var/lib/cqlite/storage");
117    /// let discovered_table_dirs = vec![
118    ///     PathBuf::from("/var/lib/cassandra/keyspace1/table1-abc123"),
119    ///     PathBuf::from("/var/lib/cassandra/keyspace1/table2-def456"),
120    /// ];
121    ///
122    /// let engine = StorageEngine::open_with_sstables(
123    ///     storage_path,
124    ///     discovered_table_dirs,
125    ///     &config,
126    ///     platform,
127    ///     #[cfg(feature = "state_machine")]
128    ///     None,
129    /// ).await?;
130    /// # Ok(())
131    /// # }
132    /// ```
133    pub async fn open_with_sstables(
134        path: &Path,
135        discovered_table_dirs: Vec<PathBuf>,
136        config: &Config,
137        platform: Arc<Platform>,
138        #[cfg(feature = "state_machine")] schema_registry: Option<
139            Arc<RwLock<crate::schema::SchemaRegistry>>,
140        >,
141    ) -> Result<Self> {
142        // Create storage directory if it doesn't exist
143        platform.fs().create_dir_all(path).await?;
144
145        // Initialize SSTable manager with pre-discovered paths and schema registry
146        let sstables = Arc::new(
147            sstable::SSTableManager::new_from_discovered_paths(
148                path,
149                discovered_table_dirs,
150                config,
151                platform.clone(),
152                #[cfg(feature = "state_machine")]
153                schema_registry.clone(),
154            )
155            .await?,
156        );
157
158        Ok(Self {
159            sstables,
160            _platform: platform,
161            config: config.clone(),
162            #[cfg(feature = "state_machine")]
163            schema_registry: Arc::new(RwLock::new(schema_registry)),
164        })
165    }
166
167    /// Insert a key-value pair
168    ///
169    /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
170    /// This method is feature-gated behind 'experimental' but currently unimplemented.
171    #[cfg(feature = "experimental")]
172    pub async fn put(&self, _table_id: &TableId, _key: RowKey, _value: Value) -> Result<()> {
173        Err(crate::error::Error::UnsupportedFormat(
174            "Write operations (put) removed in Issue #175 - WAL and MemTable infrastructure deleted".to_string()
175        ))
176    }
177
178    /// Get a value by key
179    pub async fn get(&self, table_id: &TableId, key: &RowKey) -> Result<Option<Value>> {
180        // Check SSTables
181        self.sstables.get(table_id, key).await
182    }
183
184    /// Delete a key
185    ///
186    /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
187    /// This method is feature-gated behind 'experimental' but currently unimplemented.
188    #[cfg(feature = "experimental")]
189    pub async fn delete(&self, _table_id: &TableId, _key: RowKey) -> Result<()> {
190        Err(crate::error::Error::UnsupportedFormat(
191            "Write operations (delete) removed in Issue #175 - WAL and MemTable infrastructure deleted".to_string()
192        ))
193    }
194
195    /// Scan a range of keys
196    ///
197    /// # Arguments
198    /// * `table_id` - The table to scan
199    /// * `start_key` - Optional start key for range scan
200    /// * `end_key` - Optional end key for range scan
201    /// * `limit` - Optional limit on number of results
202    /// * `schema` - Optional table schema for schema-aware parsing. When provided,
203    ///   enables accurate type detection and avoids heuristic-based parsing.
204    ///   Strongly recommended for Cassandra 5.0+ formats.
205    pub async fn scan(
206        &self,
207        table_id: &TableId,
208        start_key: Option<&RowKey>,
209        end_key: Option<&RowKey>,
210        limit: Option<usize>,
211        schema: Option<&crate::schema::TableSchema>,
212    ) -> Result<Vec<(RowKey, Value)>> {
213        // Scan SSTables directly
214        self.sstables
215            .scan(table_id, start_key, end_key, limit, schema)
216            .await
217    }
218
219    /// Flush MemTable to SSTable
220    ///
221    /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
222    /// This method is feature-gated behind 'experimental' but currently unimplemented.
223    #[allow(dead_code)]
224    #[cfg(feature = "experimental")]
225    async fn flush_memtable(&self) -> Result<()> {
226        Err(crate::error::Error::UnsupportedFormat(
227            "Write operations (flush_memtable) removed in Issue #175 - WAL and MemTable infrastructure deleted".to_string()
228        ))
229    }
230
231    /// Force flush all pending writes
232    ///
233    /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
234    /// This method is feature-gated behind 'experimental' but currently unimplemented.
235    #[cfg(feature = "experimental")]
236    pub async fn flush(&self) -> Result<()> {
237        Err(crate::error::Error::UnsupportedFormat(
238            "Write operations (flush) removed in Issue #175 - WAL and MemTable infrastructure deleted".to_string()
239        ))
240    }
241
242    /// Perform manual compaction
243    #[cfg(feature = "experimental")]
244    pub async fn compact(&self) -> Result<()> {
245        // TODO: Implement proper compaction logic
246        // This would need to identify candidates and call CompactionManager::run_compaction
247        Ok(())
248    }
249
250    /// Get storage statistics
251    ///
252    /// NOTE: Issue #176 removed compaction stats (compaction.rs deleted).
253    pub async fn stats(&self) -> Result<StorageStats> {
254        let sstable_stats = self.sstables.stats().await?;
255
256        Ok(StorageStats {
257            sstables: sstable_stats,
258        })
259    }
260
261    /// Batch write operations for better performance
262    ///
263    /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
264    /// This method is feature-gated behind 'experimental' but currently unimplemented.
265    #[cfg(feature = "experimental")]
266    pub async fn batch_write(&mut self, _operations: Vec<BatchOperation>) -> Result<()> {
267        Err(crate::error::Error::UnsupportedFormat(
268            "Write operations (batch_write) removed in Issue #175 - WAL and MemTable infrastructure deleted".to_string()
269        ))
270    }
271
272    /// Explicit batch flush
273    ///
274    /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
275    /// This method is feature-gated behind 'experimental' but currently unimplemented.
276    #[cfg(feature = "experimental")]
277    pub async fn flush_batch(&mut self) -> Result<()> {
278        Err(crate::error::Error::UnsupportedFormat(
279            "Write operations (flush_batch) removed in Issue #175 - WAL and MemTable infrastructure deleted".to_string()
280        ))
281    }
282
283    /// Get batch writer statistics
284    ///
285    /// NOTE: Write functionality removed in Issue #175 (WAL/MemTable infrastructure deleted).
286    /// This method is feature-gated behind 'experimental' but currently unimplemented.
287    #[cfg(feature = "experimental")]
288    pub fn batch_stats(&self) -> Option<()> {
289        None
290    }
291
292    /// Shutdown the storage engine
293    ///
294    /// NOTE: Issue #176 removed compaction shutdown (compaction.rs deleted).
295    /// Issue #175 removed flush operations (WAL/MemTable deleted).
296    pub async fn shutdown(&self) -> Result<()> {
297        // Nothing to shutdown - read-only storage layer
298        Ok(())
299    }
300
301    /// Set the schema registry for schema-aware operations
302    ///
303    /// This method propagates the schema registry to the SSTable manager,
304    /// which will apply it to all SSTable readers for schema-aware parsing.
305    #[cfg(feature = "state_machine")]
306    pub async fn set_schema_registry(
307        &self,
308        registry: Arc<RwLock<crate::schema::SchemaRegistry>>,
309    ) -> Result<()> {
310        // Store in our field
311        {
312            let mut schema_reg = self.schema_registry.write().await;
313            *schema_reg = Some(registry.clone());
314        }
315
316        // Propagate to SSTable manager
317        self.sstables.set_schema_registry(registry).await
318    }
319}
320
/// One entry of a batch passed to `StorageEngine::batch_write`.
///
/// Only compiled with the `experimental` feature.
#[cfg(feature = "experimental")]
#[derive(Debug, Clone)]
pub enum BatchOperation {
    /// Put operation: `value` for `key` in `table_id`.
    Put {
        table_id: TableId,
        key: RowKey,
        value: Value,
    },
    /// Delete operation: `key` in `table_id`.
    Delete {
        table_id: TableId,
        key: RowKey,
    },
    /// Merge operation: `value` for `key` in `table_id`.
    Merge {
        table_id: TableId,
        key: RowKey,
        value: Value,
    },
}
340
341/// Storage engine statistics
342///
343/// NOTE: Issue #176 removed compaction statistics (compaction.rs deleted).
344#[derive(Debug, Clone)]
345pub struct StorageStats {
346    /// SSTable statistics
347    pub sstables: sstable::SSTableStats,
348}
349
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Three-operation batch (two puts, one delete) shared by the batch tests.
    #[cfg(all(feature = "legacy-heuristics", feature = "experimental"))]
    fn sample_batch() -> Vec<BatchOperation> {
        vec![
            BatchOperation::Put {
                table_id: TableId::new("test_table"),
                key: RowKey::from("key1"),
                value: Value::Text("value1".to_string()),
            },
            BatchOperation::Put {
                table_id: TableId::new("test_table"),
                key: RowKey::from("key2"),
                value: Value::Text("value2".to_string()),
            },
            BatchOperation::Delete {
                table_id: TableId::new("test_table"),
                key: RowKey::from("key3"),
            },
        ]
    }

    /// Assert that a write entry point reported the write path as removed.
    ///
    /// `batch_write` unconditionally returns `Error::UnsupportedFormat`, so
    /// unwrapping its result (as these tests used to) can only panic.
    #[cfg(all(feature = "legacy-heuristics", feature = "experimental"))]
    fn assert_write_removed(outcome: Result<()>) {
        assert!(
            matches!(outcome, Err(crate::error::Error::UnsupportedFormat(_))),
            "batch_write should report the removed write path as UnsupportedFormat"
        );
    }

    /// Opening an empty directory yields an engine with no SSTables.
    #[tokio::test]
    async fn test_storage_engine_creation() {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::test_config();
        let platform = Arc::new(Platform::new(&config).await.unwrap());

        let storage = StorageEngine::open(
            temp_dir.path(),
            &config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();
        let stats = storage.stats().await.unwrap();

        assert_eq!(stats.sstables.sstable_count, 0);
        storage.shutdown().await.unwrap();
    }

    /// Opening with an empty list of discovered directories yields no SSTables.
    #[tokio::test]
    async fn test_storage_engine_with_discovered_sstables() {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::test_config();
        let platform = Arc::new(Platform::new(&config).await.unwrap());

        // Create an empty list of discovered SSTables for this test
        let discovered_paths = Vec::new();

        let storage = StorageEngine::open_with_sstables(
            temp_dir.path(),
            discovered_paths,
            &config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();

        let stats = storage.stats().await.unwrap();

        // Should have 0 SSTables since we provided an empty list
        assert_eq!(stats.sstables.sstable_count, 0);
        storage.shutdown().await.unwrap();
    }

    /// `batch_write` must fail with `UnsupportedFormat` on the default config.
    #[tokio::test]
    #[cfg(all(feature = "legacy-heuristics", feature = "experimental"))]
    async fn test_batch_operations() {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::default();
        let platform = Arc::new(Platform::new(&config).await.unwrap());

        let mut storage = StorageEngine::open(
            temp_dir.path(),
            &config,
            platform,
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();

        // The write path was removed in Issue #175, so the batch is rejected.
        assert_write_removed(storage.batch_write(sample_batch()).await);
        storage.shutdown().await.unwrap();
    }

    /// `batch_write` must fail the same way with a small memtable threshold.
    #[tokio::test]
    #[cfg(all(feature = "legacy-heuristics", feature = "experimental"))]
    async fn test_batch_operations_fallback() {
        // Add timeout to prevent hanging in parallel test execution
        tokio::time::timeout(std::time::Duration::from_secs(30), async {
            let temp_dir = TempDir::new().unwrap();
            let mut config = Config::default();
            // Historically this small threshold (1KB) selected a fallback
            // write path; with the write path removed the outcome is the same
            // UnsupportedFormat error as in the default configuration.
            config.storage.memtable_size_threshold = 1024;
            let platform = Arc::new(Platform::new(&config).await.unwrap());

            let mut storage = StorageEngine::open(
                temp_dir.path(),
                &config,
                platform,
                #[cfg(feature = "state_machine")]
                None,
            )
            .await
            .unwrap();

            assert_write_removed(storage.batch_write(sample_batch()).await);
            storage.shutdown().await.unwrap();
        })
        .await
        .expect("Test should complete within 30 seconds");
    }
}