ursula-runtime 0.1.1

Shard-owned actor runtime prototype for Ursula vNext.
Documentation
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use ursula_runtime::{
    AppendRequest, ColdStore, CreateStreamRequest, InMemoryGroupEngineFactory,
    PlanColdFlushRequest, ReadStreamRequest, RuntimeConfig, ShardRuntime,
};
use ursula_shard::BucketStreamId;

/// Integration test for the S3-backed cold path.
///
/// The test is opt-in: unless `URSULA_COLD_S3_INTEGRATION` is exactly `1` it prints a
/// skip notice and returns without touching S3. The cold store itself is built from the
/// environment via `ColdStore::s3_from_env`.
///
/// Flow:
/// 1. Create a uniquely named stream and append six bytes (`abcdef`).
/// 2. Run one cold flush capped at four bytes.
/// 3. Read six bytes back from offset 0 and capture a metrics snapshot.
/// 4. Look up the stream's cold chunk paths in the group snapshot and delete them.
/// 5. Only then assert on the read result, the flush metrics, and the chunk count.
///
/// Assertions are deliberately deferred until after the cleanup step: a failed
/// assertion panics out of the test, and running it before the delete loop would leave
/// the uploaded object behind in the bucket.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn s3_cold_path_flushes_reads_and_cleans_up_object() {
    if std::env::var("URSULA_COLD_S3_INTEGRATION").ok().as_deref() != Some("1") {
        eprintln!(
            "skipping S3 cold-path integration; set URSULA_COLD_S3_INTEGRATION=1 and URSULA_COLD_S3_BUCKET"
        );
        return;
    }

    // The same store handle is shared by the engine factory, the runtime, and the
    // cleanup step at the end of the test.
    let cold_store = Arc::new(ColdStore::s3_from_env().expect("S3 cold store from env"));
    let runtime = ShardRuntime::spawn_with_engine_factory_and_cold_store(
        RuntimeConfig::new(2, 8).with_cold_max_hot_bytes_per_group(Some(8 * 1024 * 1024)),
        InMemoryGroupEngineFactory::with_cold_store(Some(cold_store.clone())),
        Some(cold_store.clone()),
    )
    .expect("spawn runtime");

    // A nanosecond timestamp keeps the stream name distinct between runs.
    let suffix = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time after unix epoch")
        .as_nanos();
    let stream = BucketStreamId::new("benchcmp", format!("s3-cold-{suffix}"));
    runtime
        .create_stream(CreateStreamRequest::new(
            stream.clone(),
            "application/octet-stream",
        ))
        .await
        .expect("create stream");
    runtime
        .append(AppendRequest::from_bytes(
            stream.clone(),
            b"abcdef".to_vec(),
        ))
        .await
        .expect("append hot bytes");

    // Flush at most four of the six appended bytes; the outer `expect` covers the flush
    // call failing, the inner one covers the flush reporting no candidate.
    runtime
        .flush_cold_once(PlanColdFlushRequest {
            stream_id: stream.clone(),
            min_hot_bytes: 4,
            max_flush_bytes: 4,
        })
        .await
        .expect("flush to S3")
        .expect("candidate flushed");

    // Gather everything the assertions need while the object still exists in S3.
    let read = runtime
        .read_stream(ReadStreamRequest {
            stream_id: stream.clone(),
            offset: 0,
            max_len: 6,
            now_ms: 0,
        })
        .await
        .expect("read cold and hot bytes");
    let metrics = runtime.metrics().snapshot();

    let snapshot = runtime
        .snapshot_group(runtime.locate(&stream).raft_group_id)
        .await
        .expect("snapshot group");
    let chunk_paths = snapshot
        .stream_snapshot
        .streams
        .iter()
        .find(|entry| entry.metadata.stream_id == stream)
        .expect("stream snapshot entry")
        .cold_chunks
        .iter()
        .map(|chunk| chunk.s3_path.clone())
        .collect::<Vec<_>>();

    // Remember the count, then delete every chunk we found *before* asserting, so an
    // unexpected result (including an unexpected number of chunks) cannot leak objects.
    let chunk_count = chunk_paths.len();
    for path in chunk_paths {
        cold_store
            .delete_chunk(&path)
            .await
            .expect("cleanup S3 chunk");
    }

    // The read must return the full six bytes that were appended.
    assert_eq!(read.payload, b"abcdef");
    assert_eq!(read.next_offset, 6);

    // Exactly one four-byte chunk should have been uploaded and published.
    assert_eq!(metrics.cold_flush_uploads, 1);
    assert_eq!(metrics.cold_flush_upload_bytes, 4);
    assert_eq!(metrics.cold_flush_publishes, 1);
    assert_eq!(metrics.cold_flush_publish_bytes, 4);

    assert_eq!(chunk_count, 1);
}