//! durable-streams-server 0.3.0
//!
//! Durable Streams protocol server in Rust, built with axum and tokio.
//! Documentation: unit tests for the `AcidStorage` sharded storage engine.
use super::*;
use crate::protocol::error::Error;
use crate::protocol::offset::Offset;
use crate::storage::{CreateStreamResult, ForkInfo, Storage, StreamConfig, StreamState};
use bytes::Bytes;
use chrono::Duration;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Builds a unique temp-directory path for one test.
///
/// Uniqueness combines a nanosecond timestamp, the process id, and a
/// process-wide counter, so parallel tests (and parallel test processes)
/// never collide on the same directory.
fn test_storage_dir() -> PathBuf {
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let stamp = Utc::now().timestamp_nanos_opt().unwrap_or_default();
    let pid = std::process::id();
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let dir_name = format!("ds-acid-storage-test-{stamp}-{pid}-{seq}");
    std::env::temp_dir().join(dir_name)
}

/// Convenience constructor for a file-backed test storage: fresh temp
/// root, 16 shards, 1 MiB global cap, 100 KiB per-stream cap.
fn test_storage() -> AcidStorage {
    let root = test_storage_dir();
    AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File)
        .expect("acid storage should initialize")
}

/// Finds a (source, fork) name pair whose hashes land on different shards.
///
/// Returns `(source_name, source_shard, fork_name, fork_shard)`. Panics if
/// none of `fork-1..fork-511` hashes off the source's shard, which would
/// indicate a degenerate hash policy.
fn names_on_different_shards(storage: &AcidStorage) -> (String, usize, String, usize) {
    let source = String::from("source-0");
    let source_idx = storage.shard_index(&source);

    (1..512)
        .map(|i| format!("fork-{i}"))
        .find_map(|fork| {
            let fork_idx = storage.shard_index(&fork);
            (fork_idx != source_idx).then(|| (source.clone(), source_idx, fork, fork_idx))
        })
        .expect("failed to find stream names on different shards")
}

// Restore-from-disk and producer-state durability tests live in
// tests/acid_crash_recovery.rs and the storage_backend_contract suite.

/// Shard routing must be deterministic: hashing the same stream name
/// twice yields the same shard index.
#[test]
fn test_shard_routing_same_stream_is_stable() {
    let storage = test_storage();
    let first = storage.shard_index("same-stream");
    let second = storage.shard_index("same-stream");
    assert_eq!(first, second);
}

/// The hash policy should spread distinct names across more than one
/// shard; routing everything to a single shard would defeat sharding.
#[test]
fn test_shard_distribution_uses_multiple_shards() {
    let storage = test_storage();
    // Route 256 distinct names and count the unique shards they land on.
    let seen: HashSet<_> = (0..256)
        .map(|i| storage.shard_index(&format!("stream-{i}")))
        .collect();
    assert!(seen.len() > 1, "all 256 names routed to a single shard");
}

/// A stream whose `expires_at` has passed must be purged when the storage
/// root is reopened: the stream no longer exists and reads fail.
#[test]
fn test_startup_purges_expired_streams() {
    let root = test_storage_dir();
    {
        let storage =
            AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
        // Expire shortly after creation so the sleep below outlives it.
        let expires = Utc::now() + Duration::milliseconds(200);
        let cfg = StreamConfig::new("text/plain".to_string()).with_expires_at(expires);
        storage.create_stream("expiring", cfg).unwrap();
        storage
            .append("expiring", Bytes::from("x"), "text/plain")
            .unwrap();
        // Storage handle dropped here; state stays on disk.
    }

    // Wait past the expiry deadline before reopening.
    // (chrono's Duration is imported at the top, so std's stays qualified.)
    thread::sleep(std::time::Duration::from_millis(250));

    let restored = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
    assert!(!restored.exists("expiring"));
    // Either error shape proves the stream is gone — NOTE(review): which
    // variant surfaces likely depends on purge timing; both are accepted.
    assert!(matches!(
        restored.read("expiring", &Offset::start()),
        Err(Error::NotFound(_) | Error::StreamExpired)
    ));
}

/// The global byte cap must hold even when appends race across shards:
/// after 8 concurrent 40-byte appends against a 120-byte cap, accounting
/// may never exceed the cap.
#[test]
fn test_global_cap_strict_under_concurrency() {
    let storage =
        Arc::new(AcidStorage::new(test_storage_dir(), 16, 120, 120, AcidBackend::File).unwrap());

    // The test only exercises cross-shard accounting if the 8 stream
    // names actually span more than one shard.
    let distinct_shards: HashSet<_> = (0..8)
        .map(|i| storage.shard_index(&format!("s-{i}")))
        .collect();
    assert!(
        distinct_shards.len() > 1,
        "test streams must span multiple shards to validate cross-shard cap behavior"
    );

    for i in 0..8 {
        storage
            .create_stream(
                &format!("s-{i}"),
                StreamConfig::new("text/plain".to_string()),
            )
            .unwrap();
    }

    // Race 8 appends of 40 bytes each against the 120-byte global cap.
    let handles: Vec<_> = (0..8)
        .map(|i| {
            let storage = Arc::clone(&storage);
            thread::spawn(move || {
                storage.append(&format!("s-{i}"), Bytes::from(vec![0_u8; 40]), "text/plain")
            })
        })
        .collect();

    for h in handles {
        // Individual appends may legitimately fail once the cap is hit;
        // only thread panics are propagated here.
        let _ = h.join().unwrap();
    }

    assert!(storage.total_bytes() <= 120);
}

/// Reopening a root with a shard count that disagrees with the recorded
/// layout manifest must fail fast instead of misrouting streams.
#[test]
fn test_layout_manifest_mismatch_fails_fast() {
    let root = test_storage_dir();

    // First open records a 16-shard layout. The handle (inside the
    // Result) stays alive for the rest of the test.
    let first = AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
    assert!(first.is_ok());

    // A second open asking for 8 shards contradicts the manifest.
    let mismatch = AcidStorage::new(root, 8, 1024 * 1024, 100 * 1024, AcidBackend::File);
    assert!(matches!(mismatch, Err(Error::Storage(_))));
}

/// Forking a stream whose name hashes to a different shard must place the
/// fork on the SOURCE's shard, keeping fork lineage shard-local.
#[test]
fn test_create_fork_routes_to_source_shard_when_names_hash_apart() {
    let storage = test_storage();
    let (source, source_idx, fork, fork_hash_idx) = names_on_different_shards(&storage);

    storage
        .create_stream(&source, StreamConfig::new("text/plain".to_string()))
        .unwrap();
    storage
        .append(&source, Bytes::from("root"), "text/plain")
        .unwrap();

    // Sanity: the source lives where its name hashes, and the fork's
    // name would naturally hash elsewhere.
    assert_eq!(
        storage.find_stream_shard_index(&source).unwrap(),
        Some(source_idx)
    );
    assert_ne!(
        source_idx, fork_hash_idx,
        "test inputs must naturally hash to different shards"
    );

    let outcome = storage
        .create_fork(
            &fork,
            &source,
            None,
            StreamConfig::new("text/plain".to_string()),
        )
        .unwrap();
    assert_eq!(outcome, CreateStreamResult::Created);

    // The fork is discoverable on the source's shard, not its hash shard.
    assert_eq!(
        storage.find_stream_shard_index(&fork).unwrap(),
        Some(source_idx)
    );

    // The fork sees the source's pre-fork message.
    let fork_read = storage.read(&fork, &Offset::start()).unwrap();
    assert_eq!(fork_read.messages, vec![Bytes::from("root")]);
}

/// Reopening storage must fail fast when it finds a fork whose metadata
/// names a source stream on a DIFFERENT shard — a legacy layout that the
/// current code refuses to serve without migration.
#[test]
fn test_reopen_rejects_legacy_cross_shard_fork_lineage() {
    let root = test_storage_dir();
    let storage =
        AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
    let (source, source_idx, fork, fork_idx) = names_on_different_shards(&storage);
    assert_ne!(source_idx, fork_idx);

    storage
        .create_stream(&source, StreamConfig::new("text/plain".to_string()))
        .unwrap();

    // Hand-craft fork metadata pointing at a source on another shard —
    // simulating what a pre-migration layout would have persisted.
    let fork_meta = StoredStreamMeta {
        config: StreamConfig::new("text/plain".to_string()),
        closed: false,
        next_read_seq: 0,
        next_byte_offset: 0,
        total_bytes: 0,
        created_at: Utc::now(),
        updated_at: None,
        last_seq: None,
        producers: HashMap::new(),
        fork_info: Some(ForkInfo {
            source_name: source.clone(),
            fork_offset: Offset::start(),
        }),
        ref_count: 0,
        state: StreamState::Active,
    };

    // Write the record straight into the fork's HASH shard, bypassing
    // create_fork (which would route it to the source's shard instead).
    let shard = &storage.shards[fork_idx];
    let txn = AcidStorage::begin_write_txn(&shard.db).unwrap();
    let mut streams = txn.open_table(STREAMS).unwrap();
    AcidStorage::write_stream_meta(&mut streams, &fork, &fork_meta).unwrap();
    // The table guard borrows the transaction; it must be dropped before
    // the transaction can commit.
    drop(streams);
    txn.commit().unwrap();
    drop(storage);

    // Reopen must detect the cross-shard lineage and refuse to start.
    let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
    assert!(matches!(
        reopened,
        Err(Error::Storage(message))
        if message.contains("legacy cross-shard fork lineage requires migration")
    ));
}

/// An unparseable layout manifest must abort startup with a storage error
/// rather than silently recreating the layout.
#[test]
fn test_layout_manifest_invalid_json_fails_fast() {
    let root = test_storage_dir();

    // Pre-seed the root with a manifest that is not valid JSON.
    let manifest_dir = root.join("acid");
    fs::create_dir_all(&manifest_dir).unwrap();
    fs::write(manifest_dir.join("layout.json"), b"{invalid-json").unwrap();

    let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
    assert!(matches!(reopened, Err(Error::Storage(_))));
}

/// Tampering with the manifest's recorded hash policy must be detected on
/// reopen: serving streams under a different policy would misroute them.
#[test]
fn test_layout_manifest_hash_policy_mismatch_fails_fast() {
    let root = test_storage_dir();
    // Create a valid layout, then release the handle immediately.
    drop(AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap());

    // Rewrite the manifest with a bogus hash policy string.
    let layout_path = root.join("acid").join("layout.json");
    let raw = fs::read(&layout_path).unwrap();
    let mut layout: serde_json::Value = serde_json::from_slice(&raw).unwrap();
    layout["hash_policy"] = serde_json::Value::String("tampered-hash-policy".to_string());
    fs::write(layout_path, serde_json::to_vec_pretty(&layout).unwrap()).unwrap();

    let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
    assert!(matches!(reopened, Err(Error::Storage(_))));
}

/// Corrupt stream metadata on disk must fail startup instead of serving a
/// stream whose state cannot be trusted.
#[test]
fn test_corrupted_stream_metadata_fails_fast_on_startup() {
    let root = test_storage_dir();
    let storage =
        AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
    storage
        .create_stream("s", StreamConfig::new("text/plain".to_string()))
        .unwrap();
    storage
        .append("s", Bytes::from("payload"), "text/plain")
        .unwrap();

    // Overwrite the stream's metadata record with undeserializable bytes,
    // writing straight into the owning shard's table.
    let idx = storage.shard_index("s");
    let txn = AcidStorage::begin_write_txn(&storage.shards[idx].db).unwrap();
    {
        // Scope the table guard so its borrow ends before commit.
        let mut streams = txn.open_table(STREAMS).unwrap();
        streams.insert("s", &b"{not-json"[..]).unwrap();
    }
    txn.commit().unwrap();
    drop(storage);

    let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
    assert!(matches!(reopened, Err(Error::Storage(_))));
}

/// Replacing a shard's database file with garbage must be caught at
/// startup rather than exposing a broken shard at runtime.
#[test]
fn test_tampered_shard_file_fails_fast_on_startup() {
    let root = test_storage_dir();
    let storage =
        AcidStorage::new(root.clone(), 16, 1024 * 1024, 100 * 1024, AcidBackend::File).unwrap();
    storage
        .create_stream("s", StreamConfig::new("text/plain".to_string()))
        .unwrap();
    storage
        .append("s", Bytes::from("payload"), "text/plain")
        .unwrap();
    let shard_idx = storage.shard_index("s");
    drop(storage);

    // Clobber the shard database file that owns stream "s".
    let shard_file = root
        .join("acid")
        .join(format!("shard_{shard_idx:02x}.redb"));
    fs::write(&shard_file, b"not-a-valid-redb-file").unwrap();

    let reopened = AcidStorage::new(root, 16, 1024 * 1024, 100 * 1024, AcidBackend::File);
    assert!(matches!(reopened, Err(Error::Storage(_))));
}

/// Smoke test for the in-memory backend: create a stream, append twice,
/// read both messages back, and check head metadata.
#[test]
fn test_in_memory_backend_create_append_read() {
    let storage = AcidStorage::new(
        test_storage_dir(),
        4,
        1024 * 1024,
        100 * 1024,
        AcidBackend::InMemory,
    )
    .expect("in-memory acid storage should initialize");

    storage
        .create_stream("s", StreamConfig::new("text/plain".to_string()))
        .unwrap();
    for payload in ["hello", "world"] {
        storage
            .append("s", Bytes::from(payload), "text/plain")
            .unwrap();
    }

    // Messages come back in append order.
    let read = storage.read("s", &Offset::start()).unwrap();
    assert_eq!(
        read.messages,
        vec![Bytes::from("hello"), Bytes::from("world")]
    );

    // "hello" + "world" = 2 messages, 10 bytes total.
    let meta = storage.head("s").unwrap();
    assert_eq!(meta.message_count, 2);
    assert_eq!(meta.total_bytes, 10);
}

/// The in-memory backend enforces the global cap too: an append that
/// would exceed it is rejected and the byte accounting stays unchanged.
#[test]
fn test_in_memory_backend_global_cap() {
    let storage = AcidStorage::new(test_storage_dir(), 4, 50, 50, AcidBackend::InMemory)
        .expect("in-memory acid storage should initialize");

    storage
        .create_stream("s", StreamConfig::new("text/plain".to_string()))
        .unwrap();

    // 40 bytes fits under the 50-byte cap...
    storage
        .append("s", Bytes::from(vec![0_u8; 40]), "text/plain")
        .unwrap();

    // ...but 20 more would push past it: the append fails and the failed
    // bytes are not counted.
    let result = storage.append("s", Bytes::from(vec![0_u8; 20]), "text/plain");
    assert!(result.is_err());
    assert_eq!(storage.total_bytes(), 40);
}