Struct WriteEngine 

pub struct WriteEngine { /* private fields */ }

Write engine coordinator

Orchestrates WAL, memtable, and SSTable flushing for write operations. This is the primary public API for all write operations in CQLite.

§Thread Safety

WriteEngine follows a single-writer model. It is NOT thread-safe and should be used from a single thread or protected by external locking. The closed flag uses atomic operations for safe concurrent access checking.
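
One way to apply the "external locking" advice is to wrap the engine in a Mutex shared through an Arc. The sketch below uses a hypothetical stand-in type, SingleWriter, rather than the real WriteEngine, and only illustrates the locking pattern under that assumption.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a single-writer engine; NOT the real WriteEngine.
#[derive(Debug, Default)]
pub struct SingleWriter {
    rows: Vec<String>,
}

impl SingleWriter {
    /// Takes `&mut self`, so two writers can never run at the same time.
    pub fn write(&mut self, row: String) {
        self.rows.push(row);
    }

    pub fn row_count(&self) -> usize {
        self.rows.len()
    }
}

/// Spawns `threads` workers that each write `per_thread` rows through a shared Mutex
/// and returns the total number of rows written.
pub fn write_from_threads(threads: usize, per_thread: usize) -> usize {
    let shared = Arc::new(Mutex::new(SingleWriter::default()));
    let handles: Vec<_> = (0..threads)
        .map(|t| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                for i in 0..per_thread {
                    // The lock serializes writers: one thread holds `&mut` at a time.
                    let mut engine = shared.lock().expect("mutex poisoned");
                    engine.write(format!("thread {t} row {i}"));
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    let count = shared.lock().expect("mutex poisoned").row_count();
    count
}
```

Calling write_from_threads(4, 25) writes 100 rows in total. Holding the lock per write keeps the critical section short; a real caller might instead hold it across a batch.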

§Example

use cqlite_core::storage::write_engine::{WriteEngine, WriteEngineConfig, Mutation};
use std::path::PathBuf;

// Create configuration (`schema` and the `mutation` used below are assumed to be defined elsewhere)
let config = WriteEngineConfig::new(
    PathBuf::from("data"),
    PathBuf::from("wal"),
    schema
);

// Create engine
let mut engine = WriteEngine::new(config)?;

// Write a mutation
engine.write(mutation)?;

// Execute CQL statement
engine.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")?;

// Flush to SSTable (flush() is an async fn)
engine.flush().await?;

// Close cleanly (close() is an async fn)
engine.close().await?;

Implementations§

impl WriteEngine

Export implementation methods (added to WriteEngine)

pub async fn export_sstable(&mut self, output_dir: &Path, options: ExportOptions) -> Result<ExportReport>

Export an SSTable suitable for distribution

This method performs the following steps:

  1. Flushes the memtable if not empty
  2. Performs full compaction (if enabled) to merge all L0 files
  3. Copies the resulting SSTable to the output directory with Cassandra naming
  4. Validates the exported SSTable (if enabled)
§Arguments
  • output_dir - Directory where exported files will be written
  • options - Export configuration
§Returns

An ExportReport containing metadata about the export operation.

§Errors

Returns an error if:

  • Engine has been closed
  • Flush fails
  • Compaction fails
  • File copy fails
  • Validation fails
§Example
let options = ExportOptions::new("test_ks", "users", 1);
let report = engine.export_sstable(Path::new("/export"), options).await?;
println!("Exported {} partitions ({} bytes)", report.partition_count, report.total_size());

impl WriteEngine

pub fn new(config: WriteEngineConfig) -> Result<Self>

Create a new write engine

This initializes the WAL and memtable. If a WAL exists in the wal_dir, it will be replayed to recover in-flight writes.
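
WAL replay can be pictured as re-applying logged mutations, in order, to an empty memtable. The following is a minimal sketch under assumed types: LogEntry and replay are hypothetical names, and the real WAL format is not described on this page.

```rust
use std::collections::BTreeMap;

/// Hypothetical WAL record; the real on-disk format is not shown in this page.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogEntry {
    Put { key: String, value: String },
    Delete { key: String },
}

/// Rebuilds a memtable by applying log entries in order (last write wins).
/// A delete is kept as a tombstone (`None`) so it can shadow older on-disk data.
pub fn replay(log: &[LogEntry]) -> BTreeMap<String, Option<String>> {
    let mut memtable = BTreeMap::new();
    for entry in log {
        match entry {
            LogEntry::Put { key, value } => {
                memtable.insert(key.clone(), Some(value.clone()));
            }
            LogEntry::Delete { key } => {
                memtable.insert(key.clone(), None);
            }
        }
    }
    memtable
}
```

Because entries are applied in log order, replaying the same log twice yields the same memtable, which is what makes recovery after a crash repeatable in this model.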

§Arguments
  • config - Write engine configuration
§Returns

A new WriteEngine ready to accept writes.

§Errors

Returns an error if:

  • WAL directory doesn’t exist
  • Data directory doesn’t exist
  • WAL replay fails

pub fn write(&mut self, mutation: Mutation) -> Result<()>

Write a mutation to the write engine

This appends the mutation to the WAL for durability, then inserts it into the memtable. If the memtable exceeds the flush threshold, an automatic flush is triggered.

Note: Automatic flush is disabled when called from an async context. Use write_async() for async contexts with automatic flush support.
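
The ordering described above (WAL append, then memtable insert, then a threshold-triggered flush) can be modeled in memory as follows. ToyEngine and all of its fields are hypothetical; this is a sketch of the sequence, not the crate's implementation, and its byte accounting is deliberately simplistic.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory model of the write path; names are illustrative only.
#[derive(Debug, Default)]
pub struct ToyEngine {
    wal: Vec<(String, String)>,         // append-only log, written first
    memtable: BTreeMap<String, String>, // sorted in-memory table
    memtable_bytes: usize,              // rough size estimate (overwrites are double-counted)
    flush_threshold: usize,
    sstables: Vec<BTreeMap<String, String>>,
}

impl ToyEngine {
    pub fn new(flush_threshold: usize) -> Self {
        Self { flush_threshold, ..Self::default() }
    }

    pub fn write(&mut self, key: &str, value: &str) {
        // 1. Log the mutation first so it survives a crash.
        self.wal.push((key.to_owned(), value.to_owned()));
        // 2. Apply it to the memtable.
        self.memtable.insert(key.to_owned(), value.to_owned());
        self.memtable_bytes += key.len() + value.len();
        // 3. Flush automatically once the size estimate exceeds the threshold.
        if self.memtable_bytes > self.flush_threshold {
            self.flush();
        }
    }

    /// Moves the memtable into a new "SSTable" and truncates the WAL.
    /// Returns the number of rows flushed, or None if there was nothing to flush.
    pub fn flush(&mut self) -> Option<usize> {
        if self.memtable.is_empty() {
            return None;
        }
        let table = std::mem::take(&mut self.memtable);
        let rows = table.len();
        self.sstables.push(table);
        self.wal.clear();
        self.memtable_bytes = 0;
        Some(rows)
    }

    pub fn sstable_count(&self) -> usize {
        self.sstables.len()
    }

    pub fn memtable_rows(&self) -> usize {
        self.memtable.len()
    }

    pub fn wal_len(&self) -> usize {
        self.wal.len()
    }
}
```

In this model the WAL is truncated only after the table has been handed off, so a failure before that point would leave the log available for replay.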

§Arguments
  • mutation - The mutation to write
§Returns

Ok(()) on success, or an error if the write fails.

§Errors

Returns an error if:

  • Engine has been closed
  • WAL append fails
  • Memtable insert fails
  • Automatic flush fails (sync context only)

pub async fn write_async(&mut self, mutation: Mutation) -> Result<()>

Write a mutation with async automatic flush support

This is the async version of write() that supports automatic flushing in async contexts. Use this method when calling from async code.

§Arguments
  • mutation - The mutation to write
§Returns

Ok(()) on success, or an error if the write fails.

§Errors

Returns an error if:

  • Engine has been closed
  • WAL append fails
  • Memtable insert fails
  • Automatic flush fails

pub fn execute(&mut self, statement: &str) -> Result<()>

Execute a CQL statement (INSERT, UPDATE, DELETE)

This parses the CQL statement and converts it to a mutation, then writes it using the write() method.
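
To illustrate the "statement is not a mutation" check, here is a deliberately simplified sketch that classifies a statement by its leading keyword. MutationKind and classify are hypothetical; the actual method performs full CQL parsing, which this does not attempt.

```rust
/// Hypothetical mutation kinds; the crate's real types are not shown in this page.
#[derive(Debug, PartialEq, Eq)]
pub enum MutationKind {
    Insert,
    Update,
    Delete,
}

/// Classifies a statement by its first keyword (case-insensitive).
/// Anything other than INSERT, UPDATE, or DELETE is rejected.
pub fn classify(statement: &str) -> Result<MutationKind, String> {
    let keyword = statement
        .split_whitespace()
        .next()
        .ok_or_else(|| "empty statement".to_string())?;
    match keyword.to_ascii_uppercase().as_str() {
        "INSERT" => Ok(MutationKind::Insert),
        "UPDATE" => Ok(MutationKind::Update),
        "DELETE" => Ok(MutationKind::Delete),
        other => Err(format!("not a mutation statement: {other}")),
    }
}
```

A SELECT statement, for example, is rejected before any write is attempted, mirroring the second error case listed under Errors.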

§Arguments
  • statement - CQL statement string
§Returns

Ok(()) on success, or an error if parsing or writing fails.

§Errors

Returns an error if:

  • CQL parsing fails
  • Statement is not a mutation (INSERT/UPDATE/DELETE)
  • Mutation conversion fails
  • Write fails
§Example
engine.execute("INSERT INTO users (id, name) VALUES (1, 'Alice')")?;
engine.execute("UPDATE users SET name = 'Bob' WHERE id = 1")?;
engine.execute("DELETE FROM users WHERE id = 1")?;

pub async fn flush(&mut self) -> Result<Option<SSTableInfo>>

Force a flush of the memtable to SSTable

This writes all data in the memtable to a new SSTable generation, then truncates the WAL. The memtable is cleared after a successful flush.

§Returns

Returns Some(SSTableInfo) if data was flushed, or None if the memtable was empty.

§Errors

Returns an error if:

  • Engine has been closed
  • SSTable write fails
  • WAL truncate fails

pub async fn close(&mut self) -> Result<()>

Close the write engine

This flushes any remaining data in the memtable to SSTable, syncs the WAL, then marks the engine as closed. After calling close(), the engine cannot be used for further writes.

This method is idempotent: calling it more than once is safe, and later calls have no further effect.
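
An idempotent close guarded by an atomic flag might look like the sketch below. Closable is a hypothetical stand-in, and the "flush" is just a move between two vectors; only the flag handling is the point.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in showing an atomic `closed` flag; NOT the real WriteEngine.
#[derive(Debug, Default)]
pub struct Closable {
    closed: AtomicBool,
    pending: Vec<u32>,
    flushed: Vec<u32>,
}

impl Closable {
    /// Readable through a shared reference because the flag is atomic.
    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Acquire)
    }

    pub fn write(&mut self, value: u32) -> Result<(), &'static str> {
        if self.is_closed() {
            return Err("engine is closed");
        }
        self.pending.push(value);
        Ok(())
    }

    /// Flushes pending data once, then marks the engine closed.
    /// A second call sees the flag and returns immediately.
    pub fn close(&mut self) -> Result<(), &'static str> {
        if self.is_closed() {
            return Ok(());
        }
        self.flushed.append(&mut self.pending);
        self.closed.store(true, Ordering::Release);
        Ok(())
    }

    pub fn flushed_len(&self) -> usize {
        self.flushed.len()
    }
}
```

The flag is set only after the final flush succeeds in this sketch, so a failed close could be retried.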

§Returns

Ok(()) on success.

§Errors

Returns an error if the final flush fails. If the WAL truncate fails after a successful SSTable write, a warning is logged but no error is returned (the data is already persisted).

pub fn memtable_size(&self) -> usize

Get the current memtable size in bytes

pub fn memtable_row_count(&self) -> usize

Get the current memtable row count

pub fn wal_size(&self) -> u64

Get the current WAL size in bytes

pub fn generation(&self) -> u64

Get the current generation number

pub fn set_merge_policy(&mut self, policy: Box<dyn MergePolicy>) -> Result<()>

Set the merge policy for background compaction (M5.2, Issue #383)

§Arguments
  • policy - Merge policy implementation (e.g., STCS, LCS, TWCS)

pub fn maintenance_stats(&self) -> CompactionStats

Return cumulative compaction statistics (M5.2, Issue #474)

Returns a snapshot of the lifetime totals accumulated across all compaction cycles that have completed since the WriteEngine was created. The snapshot is cheap to clone and, once returned, can be inspected from any thread. No lock is required to take it, because WriteEngine itself is not Sync and so cannot be accessed concurrently.

§Example
let stats = engine.maintenance_stats();
println!(
    "Completed {} compactions, merged {} rows, wrote {} bytes",
    stats.compactions_completed,
    stats.rows_merged,
    stats.bytes_written,
);

pub fn maintenance_step(&mut self, budget: Duration) -> Result<MaintenanceReport>

Perform incremental maintenance work (M5.2, Issue #384)

This method performs background compaction work within a time budget. It can be called repeatedly from a background thread or task scheduler to make incremental progress on compaction.

§Runtime contexts

This is a synchronous method, but its internal async-to-sync bridge is runtime-aware (see merge::block_on_async), so it is safe to call either from a plain synchronous context or from within an active Tokio runtime, including #[tokio::main]/#[tokio::test] worker threads and async fn callers. Before Issue #587, calling it from inside a runtime panicked with “Cannot start a runtime from within a runtime” once a merge had input SSTables to read. The sync signature is preserved so that the CLI and Python bindings can keep calling it directly. (The Node binding wraps it in spawn_blocking, which remains correct.)

§Behavior
  1. If no active merge exists, consult the merge policy for work
  2. If merge work is available, start a new merge
  3. Process the active merge until budget is exhausted
  4. Return progress report
§Invariants
  • Budget is honored within approximately 10% tolerance
  • At least one partition is processed per call (minimum progress guarantee)
  • Merge state is preserved across calls for resumption
§Budget Enforcement

The budget is honored within approximately 10% tolerance. This tolerance exists to avoid interrupting partition processing mid-stream, which would require complex state management to resume. The tolerance ensures forward progress on each call while remaining responsive to time constraints.
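
The combination of a time budget and a minimum progress guarantee can be sketched by checking the clock only after each partition completes. budgeted_step and StepReport below are hypothetical names, and the queue of u64 values stands in for partitions awaiting merge.

```rust
use std::time::{Duration, Instant};

/// Hypothetical progress report for one step.
#[derive(Debug, PartialEq, Eq)]
pub struct StepReport {
    pub partitions_processed: usize,
    pub pending: bool,
}

/// Processes partitions from `queue` until `budget` is spent.
/// The budget is checked only AFTER a partition finishes, so:
///   - at least one partition is processed per call if any are queued, and
///   - the overshoot is bounded by the cost of a single partition.
pub fn budgeted_step<F>(queue: &mut Vec<u64>, budget: Duration, mut process: F) -> StepReport
where
    F: FnMut(u64),
{
    let start = Instant::now();
    let mut processed = 0;
    while let Some(partition) = queue.pop() {
        process(partition);
        processed += 1;
        if start.elapsed() >= budget {
            break;
        }
    }
    StepReport {
        partitions_processed: processed,
        // Remaining work stays in `queue`, so the next call resumes where this one stopped.
        pending: !queue.is_empty(),
    }
}
```

With a zero budget the loop still handles exactly one partition, which is the minimum progress guarantee in miniature. The actual tolerance figure depends on how long a single partition takes, which this sketch does not model.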

§Arguments
  • budget - Maximum time to spend in this call
§Returns

A report containing progress metrics and whether more work is pending.

§Errors

Returns an error if:

  • Engine has been closed
  • Merge policy returns an error
  • SSTable reading or writing fails
§Example
use std::time::Duration;

// Background compaction loop
loop {
    let report = engine.maintenance_step(Duration::from_millis(100))?;

    if !report.pending_compaction {
        // No more work, sleep or exit
        break;
    }

    // Log progress
    println!("Merged {} rows in {:?}", report.rows_merged, report.time_spent);
}

Trait Implementations§

impl Debug for WriteEngine

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.
