pub struct WriteEngine { /* private fields */ }
Write engine coordinator
Orchestrates WAL, memtable, and SSTable flushing for write operations. This is the primary public API for all write operations in CQLite.
§Thread Safety
WriteEngine follows a single-writer model. It is NOT thread-safe and
should be used from a single thread or protected by external locking.
The closed flag uses atomic operations for safe concurrent access checking.
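As an illustration of the "external locking" option, the following is a minimal sketch that shares a single-writer value between threads behind a mutex. `SingleWriter` and `write_from_threads` are hypothetical stand-ins invented for this example; they are not CQLite types.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a single-writer engine; not the CQLite type.
struct SingleWriter {
    rows: Vec<u64>,
}

impl SingleWriter {
    fn write(&mut self, value: u64) {
        self.rows.push(value);
    }
}

/// Spawns `threads` workers that each write `per_thread` values through a
/// shared mutex, and returns the total number of rows written.
fn write_from_threads(threads: u64, per_thread: u64) -> usize {
    let engine = Arc::new(Mutex::new(SingleWriter { rows: Vec::new() }));
    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let engine = Arc::clone(&engine);
            thread::spawn(move || {
                for i in 0..per_thread {
                    // The lock serializes writers, so only one thread
                    // mutates the engine at any time.
                    engine.lock().unwrap().write(t * per_thread + i);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let count = engine.lock().unwrap().rows.len();
    count
}
```

A std mutex guard is not Send, so code that awaits the engine's async methods while holding the lock would likely need an async-aware mutex instead.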
§Example
use cqlite_core::storage::write_engine::{WriteEngine, WriteEngineConfig, Mutation};
use std::path::PathBuf;
// Create configuration
let config = WriteEngineConfig::new(
    PathBuf::from("data"),
    PathBuf::from("wal"),
    schema,
);
// Create engine
let mut engine = WriteEngine::new(config)?;
// Write a mutation
engine.write(mutation)?;
// Execute CQL statement
engine.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")?;
// Flush to SSTable (flush() is an async fn)
engine.flush().await?;
// Close cleanly (close() is an async fn)
engine.close().await?;
Implementations
impl WriteEngine
Export implementation methods (added to WriteEngine)
pub async fn export_sstable(
    &mut self,
    output_dir: &Path,
    options: ExportOptions,
) -> Result<ExportReport>
Export an SSTable suitable for distribution
This method performs the following steps:
- Flushes the memtable if not empty
- Performs full compaction (if enabled) to merge all L0 files
- Copies the resulting SSTable to the output directory with Cassandra naming
- Validates the exported SSTable (if enabled)
§Arguments
- output_dir - Directory where exported files will be written
- options - Export configuration
§Returns
An ExportReport containing metadata about the export operation.
§Errors
Returns an error if:
- Engine has been closed
- Flush fails
- Compaction fails
- File copy fails
- Validation fails
§Example
let options = ExportOptions::new("test_ks", "users", 1);
let report = engine.export_sstable(Path::new("/export"), options).await?;
println!("Exported {} partitions ({} bytes)", report.partition_count, report.total_size());
impl WriteEngine
pub fn new(config: WriteEngineConfig) -> Result<Self>
Create a new write engine
This initializes the WAL and memtable. If a WAL exists in the wal_dir, it will be replayed to recover in-flight writes.
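To make the replay step concrete, here is a minimal sketch of rebuilding an in-memory table from a log. `WalRecord` and `replay` are hypothetical names for this example only. The real WAL format and memtable are not described on this page, and a real engine would likely keep tombstones rather than removing keys.

```rust
use std::collections::BTreeMap;

/// Hypothetical WAL record: a key/value upsert or a delete.
#[derive(Clone, Debug, PartialEq)]
enum WalRecord {
    Put { key: String, value: String },
    Delete { key: String },
}

/// Rebuilds an in-memory table by applying WAL records in log order,
/// so later records override earlier ones for the same key.
fn replay(wal: &[WalRecord]) -> BTreeMap<String, String> {
    let mut memtable = BTreeMap::new();
    for record in wal {
        match record {
            WalRecord::Put { key, value } => {
                memtable.insert(key.clone(), value.clone());
            }
            WalRecord::Delete { key } => {
                // Simplification: drop the key instead of storing a tombstone.
                memtable.remove(key);
            }
        }
    }
    memtable
}
```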
§Arguments
- config - Write engine configuration
§Returns
A new WriteEngine ready to accept writes.
§Errors
Returns an error if:
- WAL directory doesn’t exist
- Data directory doesn’t exist
- WAL replay fails
pub fn write(&mut self, mutation: Mutation) -> Result<()>
Write a mutation to the write engine
This appends the mutation to the WAL for durability, then inserts it into the memtable. If the memtable exceeds the flush threshold, an automatic flush is triggered.
Note: Automatic flush is disabled when called from an async context.
Use write_async() for async contexts with automatic flush support.
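The ordering described above (log first, then memtable, then a threshold-triggered flush) can be sketched with a toy model. `ToyEngine` and all of its fields are hypothetical and exist only for illustration; the sketch ignores the sync/async distinction and does not reflect how CQLite stores data.

```rust
/// Hypothetical toy engine: `wal` stands in for the write-ahead log,
/// `memtable` for the in-memory table, and `flushed_generations` records
/// the row count of each simulated SSTable flush.
struct ToyEngine {
    wal: Vec<Vec<u8>>,
    memtable: Vec<Vec<u8>>,
    memtable_bytes: usize,
    flush_threshold: usize,
    flushed_generations: Vec<usize>,
}

impl ToyEngine {
    fn new(flush_threshold: usize) -> Self {
        ToyEngine {
            wal: Vec::new(),
            memtable: Vec::new(),
            memtable_bytes: 0,
            flush_threshold,
            flushed_generations: Vec::new(),
        }
    }

    fn write(&mut self, payload: &[u8]) {
        // 1. Append to the log first so the write can be recovered.
        self.wal.push(payload.to_vec());
        // 2. Insert into the in-memory table.
        self.memtable_bytes += payload.len();
        self.memtable.push(payload.to_vec());
        // 3. Flush automatically once the threshold is exceeded.
        if self.memtable_bytes > self.flush_threshold {
            self.flush();
        }
    }

    /// Returns the number of rows flushed, or None if there was nothing to flush.
    fn flush(&mut self) -> Option<usize> {
        if self.memtable.is_empty() {
            return None;
        }
        let rows = self.memtable.len();
        self.flushed_generations.push(rows);
        self.memtable.clear();
        self.memtable_bytes = 0;
        // The log can be truncated once the data is persisted.
        self.wal.clear();
        Some(rows)
    }
}
```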
§Arguments
- mutation - The mutation to write
§Returns
Ok(()) on success, or an error if the write fails.
§Errors
Returns an error if:
- Engine has been closed
- WAL append fails
- Memtable insert fails
- Automatic flush fails (sync context only)
pub async fn write_async(&mut self, mutation: Mutation) -> Result<()>
Write a mutation with async automatic flush support
This is the async version of write() that supports automatic flushing
in async contexts. Use this method when calling from async code.
§Arguments
- mutation - The mutation to write
§Returns
Ok(()) on success, or an error if the write fails.
§Errors
Returns an error if:
- Engine has been closed
- WAL append fails
- Memtable insert fails
- Automatic flush fails
pub fn execute(&mut self, statement: &str) -> Result<()>
Execute a CQL statement (INSERT, UPDATE, DELETE)
This parses the CQL statement and converts it to a mutation,
then writes it using the write() method.
§Arguments
- statement - CQL statement string
§Returns
Ok(()) on success, or an error if parsing or writing fails.
§Errors
Returns an error if:
- CQL parsing fails
- Statement is not a mutation (INSERT/UPDATE/DELETE)
- Mutation conversion fails
- Write fails
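The "statement is not a mutation" error can be pictured with a small keyword check. This is only a sketch under the assumption that classification looks at the leading keyword. `StatementKind` and `classify` are hypothetical, and the actual CQL parser is certainly more involved.

```rust
/// Hypothetical classification of the mutation statements named above.
#[derive(Debug, PartialEq)]
enum StatementKind {
    Insert,
    Update,
    Delete,
}

/// Classifies a statement by its leading keyword (case-insensitive) and
/// rejects anything that is not INSERT, UPDATE, or DELETE.
fn classify(statement: &str) -> Result<StatementKind, String> {
    let keyword = statement
        .split_whitespace()
        .next()
        .unwrap_or("")
        .to_ascii_uppercase();
    match keyword.as_str() {
        "INSERT" => Ok(StatementKind::Insert),
        "UPDATE" => Ok(StatementKind::Update),
        "DELETE" => Ok(StatementKind::Delete),
        other => Err(format!("not a mutation statement: {:?}", other)),
    }
}
```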
§Example
engine.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")?;
engine.execute("UPDATE users SET name = 'Bob' WHERE id = 1")?;
engine.execute("DELETE FROM users WHERE id = 1")?;
pub async fn flush(&mut self) -> Result<Option<SSTableInfo>>
Force a flush of the memtable to SSTable
This writes all data in the memtable to a new SSTable generation, then truncates the WAL. The memtable is cleared after a successful flush.
§Returns
Returns Some(SSTableInfo) if data was flushed, or None if the
memtable was empty.
§Errors
Returns an error if:
- Engine has been closed
- SSTable write fails
- WAL truncate fails
pub async fn close(&mut self) -> Result<()>
Close the write engine
This flushes any remaining data in the memtable to SSTable, syncs the WAL, then marks the engine as closed. After calling close(), the engine cannot be used for further writes.
This method is idempotent - calling it multiple times is safe.
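One way to picture the idempotent close is a flag that is checked before any work is done. The sketch below is an assumption-laden illustration: `Closable` and its `final_flushes` counter are hypothetical, and the final flush is simulated by incrementing a counter.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical engine-like type with an atomic closed flag.
struct Closable {
    closed: AtomicBool,
    final_flushes: u32,
}

impl Closable {
    fn new() -> Self {
        Closable {
            closed: AtomicBool::new(false),
            final_flushes: 0,
        }
    }

    /// Performs the final flush once; later calls return Ok(()) without work.
    fn close(&mut self) -> Result<(), String> {
        if self.closed.load(Ordering::Acquire) {
            return Ok(());
        }
        // Simulated final flush; it runs before the engine is marked closed.
        self.final_flushes += 1;
        self.closed.store(true, Ordering::Release);
        Ok(())
    }

    /// Rejects writes once the engine has been closed.
    fn write(&mut self) -> Result<(), String> {
        if self.closed.load(Ordering::Acquire) {
            return Err("engine is closed".to_string());
        }
        Ok(())
    }
}
```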
§Returns
Ok(()) on success.
§Errors
Returns an error if the final flush fails. If the WAL truncate fails after a successful SSTable write, a warning is logged but no error is returned (the data is already persisted).
pub fn memtable_size(&self) -> usize
Get the current memtable size in bytes
pub fn memtable_row_count(&self) -> usize
Get the current memtable row count
pub fn generation(&self) -> u64
Get the current generation number
pub fn set_merge_policy(&mut self, policy: Box<dyn MergePolicy>) -> Result<()>
Set the merge policy for background compaction (M5.2, Issue #383)
§Arguments
- policy - Merge policy implementation (e.g., STCS, LCS, TWCS)
pub fn maintenance_stats(&self) -> CompactionStats
Return cumulative compaction statistics (M5.2, Issue #474)
Returns a snapshot of the lifetime totals accumulated across all compaction
cycles that have completed since the WriteEngine was created. The snapshot
is cheaply cloneable and safe to inspect from any thread (no lock required,
because WriteEngine itself is not Sync).
§Example
let stats = engine.maintenance_stats();
println!(
    "Completed {} compactions, merged {} rows, wrote {} bytes",
    stats.compactions_completed,
    stats.rows_merged,
    stats.bytes_written,
);
pub fn maintenance_step(&mut self, budget: Duration) -> Result<MaintenanceReport>
Perform incremental maintenance work (M5.2, Issue #384)
This method performs background compaction work within a time budget. It can be called repeatedly from a background thread or task scheduler to make incremental progress on compaction.
§Runtime contexts
This is a synchronous method, but its internal async-to-sync bridge is
runtime-aware (see merge::block_on_async), so it is safe to call from
either a plain synchronous context or from within an active Tokio
runtime, including #[tokio::main]/#[tokio::test] worker threads and
async fn callers. Prior to Issue #587, calling it from inside a runtime
panicked with “Cannot start a runtime from within a runtime” once a merge
had input SSTables to read. The sync signature is preserved so that the CLI and
Python bindings can keep calling it directly. (The Node binding wraps it in
spawn_blocking, which remains correct.)
§Behavior
- If no active merge exists, consult the merge policy for work
- If merge work is available, start a new merge
- Process the active merge until budget is exhausted
- Return progress report
§Invariants
- Budget is honored within approximately 10% tolerance
- At least one partition is processed per call (minimum progress guarantee)
- Merge state is preserved across calls for resumption
§Budget Enforcement
The budget is honored within approximately 10% tolerance. This tolerance exists to avoid interrupting partition processing mid-stream, which would require complex state management to resume. The tolerance ensures forward progress on each call while remaining responsive to time constraints.
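The combination of a time budget and a minimum-progress guarantee can be sketched as a loop that checks the clock only between work units. `run_with_budget` is a hypothetical helper, and plain `u32` values stand in for partitions. The real merge state machine is not shown on this page.

```rust
use std::time::{Duration, Instant};

/// Processes pending units until the budget is spent. The clock is checked
/// only after a unit completes, so a unit is never interrupted mid-way and
/// at least one unit is processed whenever work is pending.
fn run_with_budget<F>(pending: &mut Vec<u32>, budget: Duration, mut process: F) -> usize
where
    F: FnMut(u32),
{
    let start = Instant::now();
    let mut processed = 0;
    while let Some(unit) = pending.pop() {
        process(unit);
        processed += 1;
        if start.elapsed() >= budget {
            // Remaining units stay in `pending` for the next call.
            break;
        }
    }
    processed
}
```

Because the check happens after each unit, a call can overrun the budget by up to the cost of one unit. This is the kind of overrun the tolerance described above allows for.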
§Arguments
- budget - Maximum time to spend in this call
§Returns
A report containing progress metrics and whether more work is pending.
§Errors
Returns an error if:
- Engine has been closed
- Merge policy returns an error
- SSTable reading or writing fails
§Example
use std::time::Duration;
// Background compaction loop
loop {
    let report = engine.maintenance_step(Duration::from_millis(100))?;
    if !report.pending_compaction {
        // No more work, sleep or exit
        break;
    }
    // Log progress
    println!("Merged {} rows in {:?}", report.rows_merged, report.time_spent);
}