use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use ursula_runtime::{
AppendRequest, ColdStore, CreateStreamRequest, InMemoryGroupEngineFactory,
PlanColdFlushRequest, ReadStreamRequest, RuntimeConfig, ShardRuntime,
};
use ursula_shard::BucketStreamId;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn s3_cold_path_flushes_reads_and_cleans_up_object() {
if std::env::var("URSULA_COLD_S3_INTEGRATION").ok().as_deref() != Some("1") {
eprintln!(
"skipping S3 cold-path integration; set URSULA_COLD_S3_INTEGRATION=1 and URSULA_COLD_S3_BUCKET"
);
return;
}
let cold_store = Arc::new(ColdStore::s3_from_env().expect("S3 cold store from env"));
let runtime = ShardRuntime::spawn_with_engine_factory_and_cold_store(
RuntimeConfig::new(2, 8).with_cold_max_hot_bytes_per_group(Some(8 * 1024 * 1024)),
InMemoryGroupEngineFactory::with_cold_store(Some(cold_store.clone())),
Some(cold_store.clone()),
)
.expect("spawn runtime");
let suffix = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time after unix epoch")
.as_nanos();
let stream = BucketStreamId::new("benchcmp", format!("s3-cold-{suffix}"));
runtime
.create_stream(CreateStreamRequest::new(
stream.clone(),
"application/octet-stream",
))
.await
.expect("create stream");
runtime
.append(AppendRequest::from_bytes(
stream.clone(),
b"abcdef".to_vec(),
))
.await
.expect("append hot bytes");
runtime
.flush_cold_once(PlanColdFlushRequest {
stream_id: stream.clone(),
min_hot_bytes: 4,
max_flush_bytes: 4,
})
.await
.expect("flush to S3")
.expect("candidate flushed");
let read = runtime
.read_stream(ReadStreamRequest {
stream_id: stream.clone(),
offset: 0,
max_len: 6,
now_ms: 0,
})
.await
.expect("read cold and hot bytes");
assert_eq!(read.payload, b"abcdef");
assert_eq!(read.next_offset, 6);
let metrics = runtime.metrics().snapshot();
assert_eq!(metrics.cold_flush_uploads, 1);
assert_eq!(metrics.cold_flush_upload_bytes, 4);
assert_eq!(metrics.cold_flush_publishes, 1);
assert_eq!(metrics.cold_flush_publish_bytes, 4);
let snapshot = runtime
.snapshot_group(runtime.locate(&stream).raft_group_id)
.await
.expect("snapshot group");
let chunk_paths = snapshot
.stream_snapshot
.streams
.iter()
.find(|entry| entry.metadata.stream_id == stream)
.expect("stream snapshot entry")
.cold_chunks
.iter()
.map(|chunk| chunk.s3_path.clone())
.collect::<Vec<_>>();
assert_eq!(chunk_paths.len(), 1);
for path in chunk_paths {
cold_store
.delete_chunk(&path)
.await
.expect("cleanup S3 chunk");
}
}