graphstream 0.3.0

Experimental. graphstream is under active development and not yet stable. APIs will change without notice.

Journal replication engine for graph databases. Logical WAL shipping via .hadbj segments to S3-compatible storage.

graphstream handles journal writing, segment rotation, upload/download, compaction, and segment replay for Kuzu/LadybugDB HA. Used by hakuzu for graph database high availability.

Part of the hadb ecosystem. Shared infrastructure (S3 via ObjectStore trait, retry, circuit breaker) provided by hadb-io.

What it does

  1. Journal writing: Cypher queries + params written to .hadbj segment files via hadb-changeset binary format with SHA-256 chain hashing.
  2. Segment rotation: Auto-rotate at a configurable size (default 4 MB). A SealForUpload command seals the current segment for upload.
  3. S3 upload: Sealed segments uploaded via hadb_io::ObjectStore trait. Up to 4 concurrent uploads via JoinSet. Supports UploadWithAck for synchronous upload confirmation (used by hakuzu's sync() for graceful shutdown).
  4. Segment download: Followers poll S3 for new segments via ObjectStore and download incrementally.
  5. Journal reader: Streaming reads from segments via a 1 MB BufReader; streaming zstd decompression for compressed segments (encrypted segments use full-body decode).
  6. Compression + encryption: Optional zstd compression and ChaCha20-Poly1305 encryption per segment.
  7. Disk cache: Manifest-based upload tracking with age/size-based cleanup. Crash-safe (write .tmp + rename).
  8. Compaction: Streaming compaction merges segments without loading all into memory. Background compaction triggers after N uploaded segments.
  9. O(1) recovery: 32-byte chain hash trailer on sealed segments. Recovery reads one header + trailer instead of scanning all entries.
  10. Retry + circuit breaker: Via hadb-io. Transient errors retried with exponential backoff + jitter. Circuit breaker prevents hammering degraded endpoints.
  11. Prometheus metrics: Lock-free AtomicU64 counters for entries, segments, uploads, downloads, errors, bytes. snapshot().to_prometheus() text format.
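The lock-free metrics design in item 11 can be sketched with std atomics alone. The struct and field names below are illustrative, not graphstream's actual API:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative counter set; the real struct tracks more fields
// (downloads, errors, bytes, ...).
#[derive(Default)]
struct Metrics {
    entries_written: AtomicU64,
    segments_sealed: AtomicU64,
    upload_errors: AtomicU64,
}

impl Metrics {
    // Relaxed ordering suffices for monotonic counters: readers only
    // need an eventually-consistent snapshot, not synchronization.
    fn inc_entries(&self) {
        self.entries_written.fetch_add(1, Ordering::Relaxed);
    }

    // Render the counters in Prometheus text exposition format.
    fn to_prometheus(&self) -> String {
        format!(
            "graphstream_entries_written {}\n\
             graphstream_segments_sealed {}\n\
             graphstream_upload_errors {}\n",
            self.entries_written.load(Ordering::Relaxed),
            self.segments_sealed.load(Ordering::Relaxed),
            self.upload_errors.load(Ordering::Relaxed),
        )
    }
}

fn main() {
    let m = Metrics::default();
    m.inc_entries();
    m.inc_entries();
    println!("{}", m.to_prometheus());
}
```

Because every counter is an AtomicU64, hot-path writers never take a lock; the scrape endpoint just loads each counter and formats the text.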

Architecture

Writer -> PendingEntry -> JournalWriter -> .hadbj segments -> Uploader -> ObjectStore (S3)
                                              |                              |
                                     SegmentCache (manifest)     Background Compaction
                                                                             |
Follower <- replay_entries <- JournalReader <- .hadbj segments <- ObjectStore Download
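The writer half of this pipeline is a command loop: entries accumulate in the active segment until the size threshold or an explicit seal. A std-only sketch, with a simplified command enum (the real PendingEntry carries a query, params, and timestamps, and the real writer runs on tokio):

```rust
use std::sync::mpsc;

// Hypothetical simplification of the writer's command channel.
enum JournalCommand {
    Write(Vec<u8>), // serialized entry payload
    SealForUpload,  // seal the active segment early
}

const ROTATE_AT: usize = 4 * 1024 * 1024; // default 4 MB threshold

fn run_writer(rx: mpsc::Receiver<JournalCommand>) -> Vec<Vec<u8>> {
    let mut sealed: Vec<Vec<u8>> = Vec::new();
    let mut active: Vec<u8> = Vec::new();
    for cmd in rx {
        match cmd {
            JournalCommand::Write(payload) => {
                active.extend_from_slice(&payload);
                // Auto-rotate once the active segment passes the threshold.
                if active.len() >= ROTATE_AT {
                    sealed.push(std::mem::take(&mut active));
                }
            }
            // SealForUpload rotates regardless of size so the uploader
            // can ship everything written so far.
            JournalCommand::SealForUpload => {
                if !active.is_empty() {
                    sealed.push(std::mem::take(&mut active));
                }
            }
        }
    }
    sealed
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(JournalCommand::Write(b"CREATE (n:Person)".to_vec())).unwrap();
    tx.send(JournalCommand::SealForUpload).unwrap();
    drop(tx); // close the channel so the loop ends
    println!("sealed segments: {}", run_writer(rx).len()); // prints "sealed segments: 1"
}
```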

Entry format (protobuf)

message JournalEntry {
  string query = 1;                    // Cypher query
  repeated ParamEntry params = 2;      // Named parameters
  int64 timestamp_ms = 3;              // Deterministic timestamp
  optional string uuid_override = 4;   // Deterministic UUID
}
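prost generates the encoder for this message. For intuition, here is the wire encoding of field 1 by hand: `string query = 1` is field number 1 with wire type 2 (length-delimited), so the key byte is 0x0A, followed by a varint length and the UTF-8 bytes. This is an illustration only; real code uses prost-generated encoders.

```rust
// Hand-rolled protobuf wire encoding for the `query` field only.
fn encode_varint(mut v: u64, out: &mut Vec<u8>) {
    loop {
        let byte = (v & 0x7f) as u8;
        v >>= 7;
        if v == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80); // continuation bit: more varint bytes follow
    }
}

fn encode_query_field(query: &str) -> Vec<u8> {
    let mut out = Vec::new();
    out.push(0x0a); // key = (field 1 << 3) | wire type 2
    encode_varint(query.len() as u64, &mut out);
    out.extend_from_slice(query.as_bytes());
    out
}

fn main() {
    let bytes = encode_query_field("MATCH (n) RETURN n");
    assert_eq!(bytes[0], 0x0a); // key byte
    assert_eq!(bytes[1], 18);   // varint length of the query
    println!("{} bytes on the wire", bytes.len()); // prints "20 bytes on the wire"
}
```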

Segment format

Uses hadb-changeset's .hadbj binary format:

  • Header (128 bytes): HADBJ magic + flags (sealed, compressed, chain hash) + sequence range + body length + checksums
  • Body: binary entries (sequence + prev_hash + payload), zstd-compressed when sealed
  • Trailer (sealed only): 32-byte SHA-256 chain hash for O(1) recovery and cross-segment integrity
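The O(1) recovery claim follows from the trailer placement: instead of scanning every entry, recovery seeks to the end of the file and reads the fixed-size chain hash. A minimal sketch of that read, with a simplified layout (body plus raw 32-byte trailer; real segments also carry the 128-byte HADBJ header):

```rust
use std::io::{Read, Seek, SeekFrom, Write};

// One seek + one 32-byte read, independent of segment size.
fn read_trailer(file: &mut std::fs::File) -> std::io::Result<[u8; 32]> {
    let mut trailer = [0u8; 32];
    file.seek(SeekFrom::End(-32))?; // jump straight to the trailer
    file.read_exact(&mut trailer)?;
    Ok(trailer)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("graphstream_trailer_demo.hadbj");
    let mut f = std::fs::File::create(&path)?;
    f.write_all(b"...segment body...")?;
    f.write_all(&[0xAB; 32])?; // stand-in for the SHA-256 chain hash
    drop(f);

    let mut f = std::fs::File::open(&path)?;
    let trailer = read_trailer(&mut f)?;
    assert_eq!(trailer, [0xAB; 32]);
    std::fs::remove_file(&path)?;
    println!("recovered chain hash in O(1)");
    Ok(())
}
```

Cross-segment integrity works the same way: the trailer of segment N is compared against the prev_hash carried into segment N+1.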

Usage

use std::sync::Arc;
use std::time::Duration;

use graphstream::journal::{
    spawn_journal_writer, JournalCommand, JournalState, ParamValue, PendingEntry,
};
use graphstream::uploader::spawn_journal_uploader;

// Spawn journal writer
let state = Arc::new(JournalState::new());
let tx = spawn_journal_writer(journal_dir, 4 * 1024 * 1024, 100, state.clone());

// Write an entry
tx.send(JournalCommand::Write(PendingEntry {
    query: "CREATE (p:Person {id: $id, name: $name})".into(),
    params: vec![
        ("id".into(), ParamValue::Int(1)),
        ("name".into(), ParamValue::String("Alice".into())),
    ],
    timestamp_ms: 1234567890,
    uuid_override: None,
}))?;

// Spawn S3 uploader (streams sealed segments via ObjectStore)
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
let (upload_tx, uploader_handle) = spawn_journal_uploader(
    tx.clone(), journal_dir, object_store, "prefix/".into(),
    "mydb".into(), Duration::from_secs(10), shutdown_rx,
);
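The UploadWithAck path used by hakuzu's sync() boils down to sending an acknowledgement channel along with the upload request, so the caller can block until the segment is confirmed in object storage. A std-only sketch of that protocol (the command and function names here are hypothetical; the real async code would use a tokio oneshot for the ack):

```rust
use std::sync::mpsc;
use std::thread;

enum UploadCommand {
    Upload(String),                          // fire-and-forget
    UploadWithAck(String, mpsc::Sender<()>), // caller waits for confirmation
}

fn spawn_uploader(rx: mpsc::Receiver<UploadCommand>) -> thread::JoinHandle<usize> {
    thread::spawn(move || {
        let mut uploaded = 0;
        for cmd in rx {
            match cmd {
                UploadCommand::Upload(_key) => uploaded += 1,
                UploadCommand::UploadWithAck(_key, ack) => {
                    uploaded += 1;
                    let _ = ack.send(()); // confirm completion to the caller
                }
            }
        }
        uploaded
    })
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = spawn_uploader(rx);

    // Fire-and-forget upload.
    tx.send(UploadCommand::Upload("seg-000001.hadbj".into())).unwrap();

    // Synchronous upload: block until the uploader acks (graceful shutdown).
    let (ack_tx, ack_rx) = mpsc::channel();
    tx.send(UploadCommand::UploadWithAck("seg-000002.hadbj".into(), ack_tx)).unwrap();
    ack_rx.recv().expect("uploader acked");

    drop(tx); // close the channel so the uploader thread exits
    println!("uploaded {} segments", handle.join().unwrap()); // prints "uploaded 2 segments"
}
```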

Dependencies

  • hadb-changeset -- .hadbj binary format (encode, decode, seal, chain hashing, S3 key layout)
  • hadb-io -- S3 via ObjectStore trait, retry, circuit breaker
  • prost -- Protobuf serialization (journal entry payload)
  • zstd -- Compression (sealed segments)
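The retry behavior described above (exponential backoff with jitter, provided by hadb-io) can be sketched as "full jitter": the delay grows exponentially to a cap, and the actual sleep is drawn uniformly from [0, ceiling] to spread out retry storms. The constants and PRNG here are illustrative, not hadb-io's actual parameters:

```rust
fn backoff_ms(attempt: u32, seed: &mut u64) -> u64 {
    const BASE_MS: u64 = 100;
    const CAP_MS: u64 = 10_000;
    // Exponential growth, capped so retries never wait unboundedly long.
    let ceiling = CAP_MS.min(BASE_MS.saturating_mul(1u64 << attempt.min(20)));
    // Tiny xorshift PRNG for jitter (std has no RNG; real code would use rand).
    *seed ^= *seed << 13;
    *seed ^= *seed >> 7;
    *seed ^= *seed << 17;
    // Full jitter: uniform in [0, ceiling].
    *seed % (ceiling + 1)
}

fn main() {
    let mut seed: u64 = 0x9E3779B97F4A7C15;
    for attempt in 0..5 {
        println!("attempt {attempt}: sleep {} ms", backoff_ms(attempt, &mut seed));
    }
}
```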

Tests

Covers journal writing, chain hash integrity, streaming decompression, compaction, uploader (including ack), cache, and metrics.

CC=/opt/homebrew/opt/llvm/bin/clang CXX=/opt/homebrew/opt/llvm/bin/clang++ \
  RUSTFLAGS="-L /opt/homebrew/opt/llvm/lib/c++" cargo test

License

Apache-2.0