graphstream 0.3.0

# graphstream

> **Experimental.** graphstream is under active development and not yet stable. APIs will change without notice.

Journal replication engine for graph databases. Logical WAL shipping via `.hadbj` segments to S3-compatible storage.

graphstream handles journal writing, segment rotation, upload/download, compaction, and segment replay for Kuzu/LadybugDB HA. Used by [hakuzu](https://github.com/russellromney/hakuzu) for graph database high availability.

Part of the [hadb](https://github.com/russellromney/hadb) ecosystem. Shared infrastructure (S3 via ObjectStore trait, retry, circuit breaker) provided by [hadb-io](https://github.com/russellromney/hadb/tree/main/hadb-io).

## What it does

1. **Journal writing**: Cypher queries + params written to `.hadbj` segment files via hadb-changeset binary format with SHA-256 chain hashing.
2. **Segment rotation**: Auto-rotate at configurable size (default 4MB). `SealForUpload` seals the current segment for upload.
3. **S3 upload**: Sealed segments uploaded via `hadb_io::ObjectStore` trait. Up to 4 concurrent uploads via `JoinSet`. Supports `UploadWithAck` for synchronous upload confirmation (used by hakuzu's `sync()` for graceful shutdown).
4. **Segment download**: Followers poll S3 for new segments via `ObjectStore` and download incrementally.
5. **Journal reader**: Streaming reads from segments, 1MB `BufReader`, streaming zstd decompression for compressed segments (encrypted segments use full-body decode).
6. **Compression + encryption**: Optional zstd compression and ChaCha20-Poly1305 encryption per segment.
7. **Disk cache**: Manifest-based upload tracking with age/size-based cleanup. Crash-safe (write `.tmp` + rename).
8. **Compaction**: Streaming compaction merges segments without loading all into memory. Background compaction triggers after N uploaded segments.
9. **O(1) recovery**: 32-byte chain hash trailer on sealed segments. Recovery reads one header + trailer instead of scanning all entries.
10. **Retry + circuit breaker**: Via hadb-io. Transient errors retried with exponential backoff + jitter. Circuit breaker prevents hammering degraded endpoints.
11. **Prometheus metrics**: Lock-free `AtomicU64` counters for entries, segments, uploads, downloads, errors, bytes. `snapshot().to_prometheus()` text format.
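The metrics design above (lock-free `AtomicU64` counters rendered to Prometheus text exposition format) can be sketched in plain `std` Rust. This is an illustrative subset, not the crate's actual struct: the field names and metric names here are assumptions, and only `to_prometheus()` comes from the README.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative subset of the counters the README lists; the real crate
/// tracks entries, segments, uploads, downloads, errors, and bytes.
#[derive(Default)]
pub struct Metrics {
    entries_written: AtomicU64,
    upload_errors: AtomicU64,
    bytes_uploaded: AtomicU64,
}

impl Metrics {
    // Relaxed ordering is enough: each counter is independent and there
    // are no cross-field invariants to preserve.
    pub fn incr_entries(&self) {
        self.entries_written.fetch_add(1, Ordering::Relaxed);
    }
    pub fn add_bytes(&self, n: u64) {
        self.bytes_uploaded.fetch_add(n, Ordering::Relaxed);
    }

    /// Render a point-in-time snapshot in Prometheus text format.
    pub fn to_prometheus(&self) -> String {
        format!(
            "graphstream_entries_written_total {}\n\
             graphstream_upload_errors_total {}\n\
             graphstream_bytes_uploaded_total {}\n",
            self.entries_written.load(Ordering::Relaxed),
            self.upload_errors.load(Ordering::Relaxed),
            self.bytes_uploaded.load(Ordering::Relaxed),
        )
    }
}

fn main() {
    let m = Metrics::default();
    m.incr_entries();
    m.add_bytes(4096);
    print!("{}", m.to_prometheus());
}
```

Because the counters are plain atomics, writers never block on a metrics lock; a scrape just loads each counter once.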

## Architecture

```
Writer -> PendingEntry -> JournalWriter -> .hadbj segments -> Uploader -> ObjectStore (S3)
                                              |                              |
                                     SegmentCache (manifest)     Background Compaction
                                                                             |
Follower <- replay_entries <- JournalReader <- .hadbj segments <- ObjectStore Download
```
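The left side of the diagram (Writer → PendingEntry → JournalWriter, with size-based rotation) can be sketched with a plain `std::sync::mpsc` channel. This is a toy model, not the crate's implementation: the real writer uses an async channel and 4 MB segments, while here `PendingEntry` is pared down to one field and the threshold is shrunk so the example actually rotates.

```rust
use std::sync::mpsc;
use std::thread;

/// Pared-down stand-in for the crate's PendingEntry.
struct PendingEntry {
    query: String,
}

/// Writer loop: append each entry to the current segment and rotate
/// (seal + start a new segment) once the size threshold is crossed.
fn run_writer(rx: mpsc::Receiver<PendingEntry>, rotate_at: usize) -> Vec<Vec<String>> {
    let mut segments: Vec<Vec<String>> = vec![Vec::new()];
    let mut current_size = 0usize;
    for entry in rx {
        current_size += entry.query.len();
        segments.last_mut().unwrap().push(entry.query);
        if current_size >= rotate_at {
            segments.push(Vec::new()); // seal current, open next
            current_size = 0;
        }
    }
    segments
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // 64-byte threshold in place of the 4 MB default, to force rotation.
    let writer = thread::spawn(move || run_writer(rx, 64));
    for i in 0..10 {
        tx.send(PendingEntry {
            query: format!("CREATE (n:Node {{id: {i}}})"),
        })
        .unwrap();
    }
    drop(tx); // closing the channel ends the writer loop
    let segments = writer.join().unwrap();
    println!("{} segments", segments.len());
}
```

The same shape applies at full scale: producers stay decoupled from disk I/O, and rotation is just a bookkeeping step inside the single consumer.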

### Entry format (protobuf)

```protobuf
message JournalEntry {
  string query = 1;                    // Cypher query
  repeated ParamEntry params = 2;      // Named parameters
  int64 timestamp_ms = 3;             // Deterministic timestamp
  optional string uuid_override = 4;   // Deterministic UUID
}
```
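Under prost's standard codegen conventions, that message maps to a Rust struct roughly like the one below. These are hand-written mirrors for illustration: the real generated types carry `#[derive(prost::Message)]` and field attributes, and `ParamEntry` is not shown in the README, so a name/value pair is assumed here.

```rust
/// Hand-written mirror of what prost generates for JournalEntry.
#[derive(Debug, Clone, PartialEq)]
pub struct JournalEntry {
    pub query: String,                 // string query = 1
    pub params: Vec<ParamEntry>,       // repeated ParamEntry params = 2
    pub timestamp_ms: i64,             // int64 timestamp_ms = 3
    pub uuid_override: Option<String>, // optional string uuid_override = 4
}

/// ParamEntry's definition isn't shown above; a name/value pair is assumed.
#[derive(Debug, Clone, PartialEq)]
pub struct ParamEntry {
    pub name: String,
    pub value: String,
}

fn main() {
    let entry = JournalEntry {
        query: "CREATE (p:Person {name: $name})".into(),
        params: vec![ParamEntry {
            name: "name".into(),
            value: "Alice".into(),
        }],
        timestamp_ms: 1234567890,
        uuid_override: None,
    };
    println!("{} param(s)", entry.params.len());
}
```

Note how proto3's `optional string` becomes `Option<String>`, while the always-present `int64` becomes a plain `i64`; this is what lets the replica distinguish "no UUID override" from an empty one.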

### Segment format

Uses hadb-changeset's `.hadbj` binary format:
- Header (128 bytes): HADBJ magic + flags (sealed, compressed, chain hash) + sequence range + body length + checksums
- Body: binary entries (sequence + prev_hash + payload), zstd-compressed when sealed
- Trailer (sealed only): 32-byte SHA-256 chain hash for O(1) recovery and cross-segment integrity
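The O(1) recovery path can be sketched with plain `std::io`: instead of scanning every entry, seek to the fixed-size header and then to the final 32 bytes. Only the 128-byte header and 32-byte trailer sizes come from the layout above; the fake segment bytes and function name here are illustrative, and real header/trailer parsing is hadb-changeset's job.

```rust
use std::io::{Cursor, Read, Seek, SeekFrom};

const HEADER_LEN: usize = 128; // fixed-size header per the layout above
const TRAILER_LEN: i64 = 32; // SHA-256 chain hash on sealed segments

/// Read the header and chain-hash trailer without touching the body:
/// two seeks + two reads, regardless of how many entries the segment holds.
fn read_recovery_points<R: Read + Seek>(
    r: &mut R,
) -> std::io::Result<([u8; HEADER_LEN], [u8; 32])> {
    let mut header = [0u8; HEADER_LEN];
    r.seek(SeekFrom::Start(0))?;
    r.read_exact(&mut header)?;

    let mut trailer = [0u8; 32];
    // Seek relative to the end of the segment: the trailer is the last 32 bytes.
    r.seek(SeekFrom::End(-TRAILER_LEN))?;
    r.read_exact(&mut trailer)?;
    Ok((header, trailer))
}

fn main() -> std::io::Result<()> {
    // Fake sealed segment: zeroed header, arbitrary body, 0xAB trailer.
    let mut segment = vec![0u8; HEADER_LEN];
    segment.extend_from_slice(b"...compressed body entries...");
    segment.extend_from_slice(&[0xAB; 32]);

    let (_header, trailer) = read_recovery_points(&mut Cursor::new(segment))?;
    println!("recovered {}-byte chain hash", trailer.len());
    Ok(())
}
```

The chain hash read this way can then be compared against the next segment's recorded `prev_hash`, giving cross-segment integrity without replaying either body.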

## Usage

```rust
use std::sync::Arc;
use std::time::Duration;

use graphstream::journal::{
    spawn_journal_writer, JournalCommand, JournalState, ParamValue, PendingEntry,
};
use graphstream::uploader::spawn_journal_uploader;
// Spawn journal writer
let state = Arc::new(JournalState::new());
let tx = spawn_journal_writer(journal_dir, 4 * 1024 * 1024, 100, state.clone());

// Write an entry
tx.send(JournalCommand::Write(PendingEntry {
    query: "CREATE (p:Person {id: $id, name: $name})".into(),
    params: vec![
        ("id".into(), ParamValue::Int(1)),
        ("name".into(), ParamValue::String("Alice".into())),
    ],
    timestamp_ms: 1234567890,
    uuid_override: None,
}))?;

// Spawn S3 uploader (streams sealed segments via ObjectStore)
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
let (upload_tx, uploader_handle) = spawn_journal_uploader(
    tx.clone(), journal_dir, object_store, "prefix/".into(),
    "mydb".into(), Duration::from_secs(10), shutdown_rx,
);
```

## Dependencies

- `hadb-changeset` -- `.hadbj` binary format (encode, decode, seal, chain hashing, S3 key layout)
- `hadb-io` -- S3 via ObjectStore trait, retry, circuit breaker
- `prost` -- Protobuf serialization (journal entry payload)
- `zstd` -- Compression (sealed segments)

## Tests

The test suite covers journal writing, chain-hash integrity, streaming decompression, compaction, the uploader (including ack), the cache, and metrics. The toolchain flags below point the build at Homebrew LLVM on macOS:

```bash
CC=/opt/homebrew/opt/llvm/bin/clang CXX=/opt/homebrew/opt/llvm/bin/clang++ \
  RUSTFLAGS="-L /opt/homebrew/opt/llvm/lib/c++" cargo test
```

## License

Apache-2.0