use std::process::{Child, Command, Stdio};
use std::sync::Arc;
use std::time::Duration;
use slipstream::{
Connection, DiscardPolicy, KvError, KvStore, KvUpdate, KvWriter, NatsConnection,
NatsConnectionConfig, StoreConfig, VersionToken, WatchCursor,
};
use tokio::sync::mpsc;
use tokio::time::timeout;
/// A throwaway `nats-server` process owned by a single test.
///
/// Created by `TestNats::start`; the `Drop` impl kills and reaps the process.
struct TestNats {
/// Handle to the spawned `nats-server` child process.
child: Child,
/// Client URL of the server, of the form `nats://127.0.0.1:<port>`.
url: String,
/// Temporary directory passed to the server as `--store_dir`. Held here
/// (and otherwise unused) so it lives as long as the server does.
_store_dir: tempfile::TempDir,
}
impl TestNats {
    /// Spawns a JetStream-enabled `nats-server` on a free loopback port and
    /// waits until it accepts client connections.
    ///
    /// The binary is taken from the `NATS_SERVER_BIN` environment variable,
    /// falling back to `nats-server` on `PATH`.
    ///
    /// # Panics
    ///
    /// Panics if the temporary store directory cannot be created, the binary
    /// cannot be spawned, or the server never becomes ready.
    async fn start() -> TestNats {
        let bin = std::env::var("NATS_SERVER_BIN").unwrap_or_else(|_| "nats-server".to_string());
        let port = free_port();
        let store_dir = tempfile::tempdir().expect("create jetstream store dir");
        let child = Command::new(&bin)
            .args(["--jetstream", "--addr", "127.0.0.1", "--port"])
            .arg(port.to_string())
            .arg("--store_dir")
            // Passed as a path (OsStr) so a non-UTF-8 temp dir does not abort the test.
            .arg(store_dir.path())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .unwrap_or_else(|e| {
                panic!(
                    "failed to spawn `{bin}`: {e}. Is nats-server installed? \
                     Run `mise install` or set NATS_SERVER_BIN."
                )
            });
        // Wrap the child *before* waiting for readiness: `Child` does not kill
        // the process when dropped, so if the readiness wait panics only
        // `TestNats::drop` will stop and reap the server.
        let nats = TestNats {
            child,
            url: format!("nats://127.0.0.1:{port}"),
            _store_dir: store_dir,
        };
        wait_until_ready(&nats.url).await;
        nats
    }

    /// Builds a credential-less `NatsConnection` for this server and connects it.
    ///
    /// # Panics
    ///
    /// Panics if the connection attempt fails.
    async fn connect(&self) -> NatsConnection {
        let conn = NatsConnection::new(NatsConnectionConfig {
            url: self.url.clone(),
            creds: None,
            creds_file: None,
        });
        conn.connect().await.expect("connect to test nats");
        conn
    }

    /// Connects and opens the store named `bucket` with an 8 MiB `max_bytes`
    /// setting and defaults for everything else.
    ///
    /// The connection is returned alongside the store so the caller decides
    /// how long it stays alive.
    ///
    /// # Panics
    ///
    /// Panics if connecting or opening the store fails.
    async fn store(&self, bucket: &str) -> (NatsConnection, Arc<dyn KvStore>) {
        let conn = self.connect().await;
        let store = conn
            .store_with_config(StoreConfig {
                name: bucket.to_string(),
                max_bytes: Some(8 * 1024 * 1024),
                ..Default::default()
            })
            .await
            .expect("open store");
        (conn, store)
    }
}
impl Drop for TestNats {
    /// Best-effort teardown: kill the server, then wait on it so the process
    /// is reaped. Failures of either step are deliberately ignored.
    fn drop(&mut self) {
        let _kill_result = self.child.kill();
        let _exit_status = self.child.wait();
    }
}
/// Returns a loopback TCP port that was free at the time of the call.
///
/// Binds `127.0.0.1:0`, reads back the port that was assigned, and releases
/// the listener on return, so the port is not reserved for the caller.
///
/// # Panics
///
/// Panics if binding or reading the local address fails.
fn free_port() -> u16 {
    let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
    let addr = listener.local_addr().expect("read local addr");
    addr.port()
}
/// Polls `url` until a client connection succeeds.
///
/// Makes up to 100 attempts, sleeping 100 ms after each failure.
///
/// # Panics
///
/// Panics if no attempt succeeds.
async fn wait_until_ready(url: &str) {
    let mut attempts_left = 100u32;
    while attempts_left > 0 {
        match async_nats::connect(url).await {
            // The probe connection is only a liveness check; it is dropped here.
            Ok(_probe) => return,
            Err(_) => tokio::time::sleep(Duration::from_millis(100)).await,
        }
        attempts_left -= 1;
    }
    panic!("nats-server at {url} never became ready");
}
/// Blocks until the watcher feeding `rx` is demonstrably live.
///
/// Repeatedly writes `sentinel` and waits up to 200 ms for an update with
/// that key to arrive on `rx`. Updates for other keys are ignored. Once the
/// sentinel is seen, anything already buffered in `rx` is drained so the
/// caller starts from an empty channel.
///
/// # Panics
///
/// Panics if a sentinel write fails, if the watch channel closes, or if the
/// sentinel is not observed within 10 seconds overall. The overall deadline
/// turns a watcher that never delivers into a test failure instead of a hang.
async fn establish_watch(writer: &dyn KvWriter, rx: &mut mpsc::Receiver<KvUpdate>, sentinel: &str) {
    let handshake = async {
        loop {
            writer.put(sentinel, b"ready").await.expect("put sentinel");
            match timeout(Duration::from_millis(200), rx.recv()).await {
                Ok(Some(u)) if u.key() == sentinel => break,
                // Some other key, or nothing within the window: write again.
                Ok(Some(_)) | Err(_) => {}
                Ok(None) => panic!("watch channel closed during handshake"),
            }
        }
    };
    timeout(Duration::from_secs(10), handshake)
        .await
        .expect("timed out waiting for the watch to deliver the sentinel");
    // Discard whatever is already buffered (e.g. repeated sentinel deliveries).
    while rx.try_recv().is_ok() {}
}
/// Receives exactly `n` updates from `rx`, in arrival order.
///
/// # Panics
///
/// Panics if any single update takes longer than 5 seconds to arrive or the
/// channel closes before `n` updates were received.
async fn collect_updates(rx: &mut mpsc::Receiver<KvUpdate>, n: usize) -> Vec<KvUpdate> {
    let mut collected = Vec::with_capacity(n);
    while collected.len() < n {
        let next = timeout(Duration::from_secs(5), rx.recv()).await;
        let update = next
            .expect("timed out waiting for watch update")
            .expect("watch channel closed early");
        collected.push(update);
    }
    collected
}
/// `is_healthy` is false before `connect`, true after it, and false again
/// after `shutdown`.
#[tokio::test]
async fn health_follows_lifecycle() {
    let server = TestNats::start().await;
    let config = NatsConnectionConfig {
        url: server.url.clone(),
        creds: None,
        creds_file: None,
    };
    let connection = NatsConnection::new(config);

    assert!(!connection.is_healthy(), "fresh connection is not healthy");

    connection.connect().await.expect("connect");
    assert!(connection.is_healthy(), "healthy after connect");

    connection.shutdown().await.expect("shutdown");
    assert!(!connection.is_healthy(), "not healthy after shutdown");
}
/// Opening a store on a connection that was never connected must fail with
/// `KvError::NotConnected`.
#[tokio::test]
async fn store_before_connect_is_not_connected() {
    let server = TestNats::start().await;
    let unconnected = NatsConnection::new(NatsConnectionConfig {
        url: server.url.clone(),
        creds: None,
        creds_file: None,
    });

    let outcome = unconnected.store("anything").await;
    match outcome {
        Ok(_) => panic!("store before connect should fail"),
        Err(KvError::NotConnected) => {}
        Err(other) => panic!("expected NotConnected, got {other:?}"),
    }
}
/// Pins the capability flags reported by a connected NATS connection.
#[tokio::test]
async fn capabilities_report_nats_features() {
    let server = TestNats::start().await;
    let connection = server.connect().await;
    let features = connection.capabilities();

    // Features expected to be on.
    assert!(features.streaming_watch);
    assert!(features.prefix_watch);
    assert!(features.cas);

    // Features expected to be off.
    assert!(!features.ttl, "TTL capability must be false until KvTtl ships");
    assert!(!features.transactions);
    assert!(!features.global_ordering);
}
/// A connection built with `from_client` is healthy right away and can open
/// a store and write to it without calling `connect`.
#[tokio::test]
async fn from_client_reuses_existing_connection() {
    let server = TestNats::start().await;
    let raw = async_nats::connect(&server.url).await.expect("raw connect");
    let wrapped = NatsConnection::from_client(raw);
    assert!(wrapped.is_healthy(), "pre-connected client is healthy");

    let config = StoreConfig {
        name: "reused".into(),
        max_bytes: Some(1024 * 1024),
        ..Default::default()
    };
    let store = wrapped
        .store_with_config(config)
        .await
        .expect("open store on reused client");

    let writer = store.writer().expect("writer");
    writer.put("k", b"v").await.expect("put");
}
/// A `from_client` connection accepts `connect()` while live, but once shut
/// down it must refuse to reconnect with a `ConnectionFailed` error whose
/// message mentions `from_client`, and stay unhealthy.
#[tokio::test]
async fn from_client_connection_refuses_to_reconnect() {
    let server = TestNats::start().await;
    let raw = async_nats::connect(&server.url).await.expect("raw connect");
    let borrowed = NatsConnection::from_client(raw);
    assert!(borrowed.is_healthy(), "pre-connected client is healthy");

    borrowed
        .connect()
        .await
        .expect("connect() on a live borrowed client is a no-op");

    borrowed.shutdown().await.expect("shutdown");
    assert!(!borrowed.is_healthy(), "not healthy after shutdown");

    let refusal = borrowed
        .connect()
        .await
        .expect_err("from_client connection must not reconnect");
    let msg = match refusal {
        KvError::ConnectionFailed(msg) => msg,
        other => panic!("expected ConnectionFailed, got {other:?}"),
    };
    assert!(
        msg.contains("from_client"),
        "error must name the cause: {msg}"
    );

    assert!(
        !borrowed.is_healthy(),
        "still unhealthy after a refused reconnect"
    );
}
/// A value written with `put` is read back by `get` with the same key, value
/// and revision.
#[tokio::test]
async fn put_then_get_roundtrips() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");
    let reader = store.reader();

    let written = writer.put("node.a", b"hello").await.expect("put");
    assert!(written.as_u64().is_some(), "NATS version is a u64 revision");

    let fetched = reader.get("node.a").await.expect("get").expect("present");
    assert_eq!(fetched.key, "node.a");
    assert_eq!(fetched.value, b"hello");
    assert_eq!(fetched.version.as_u64(), written.as_u64());
}
/// `get` on a key that was never written yields `Ok(None)`.
#[tokio::test]
async fn get_missing_key_returns_none() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let reader = store.reader();

    let lookup = reader.get("absent").await.expect("get");
    assert!(lookup.is_none());
}
/// A second `create` on a key that already holds a value fails with
/// `KvError::AlreadyExists`.
#[tokio::test]
async fn create_conflicts_on_live_key() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");

    writer.create("lock.x", b"1").await.expect("first create");

    let second_attempt = writer.create("lock.x", b"2").await;
    let conflict = second_attempt.expect_err("second create must conflict");
    assert!(
        matches!(conflict, KvError::AlreadyExists),
        "got {conflict:?}"
    );
}
/// `update` succeeds with the current version and produces a new revision;
/// retrying with the now-stale version fails with `RevisionMismatch`.
#[tokio::test]
async fn update_cas_succeeds_then_detects_conflict() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");

    let first = writer.put("node.a", b"one").await.expect("put");
    let second = writer
        .update("node.a", b"two", &first)
        .await
        .expect("cas update with current version");
    assert_ne!(first.as_u64(), second.as_u64());

    // `first` is stale now that the key has moved on to `second`.
    let stale = writer
        .update("node.a", b"three", &first)
        .await
        .expect_err("stale CAS must fail");
    assert!(matches!(stale, KvError::RevisionMismatch), "got {stale:?}");
}
/// `delete` reports success and the key is no longer visible through `get`.
#[tokio::test]
async fn delete_removes_key() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");
    let reader = store.reader();

    writer.put("node.a", b"v").await.expect("put");

    let removed = writer.delete("node.a").await.expect("delete");
    assert!(removed);

    let after = reader.get("node.a").await.expect("get");
    assert!(after.is_none());
}
/// `delete_with_version` rejects a wrong version with `RevisionMismatch`,
/// succeeds with the current one, hides the key from `get`, and leaves an
/// empty-valued entry visible through `entry`.
#[tokio::test]
async fn delete_with_version_is_cas_gated() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");
    let reader = store.reader();

    let current = writer.put("node.a", b"v").await.expect("put");

    let bogus = VersionToken::from_u64(999_999);
    let stale = writer.delete_with_version("node.a", &bogus).await;
    assert!(
        matches!(stale, Err(KvError::RevisionMismatch)),
        "got {stale:?}"
    );

    let deleted = writer
        .delete_with_version("node.a", &current)
        .await
        .expect("cas delete");
    assert!(deleted);

    assert!(reader.get("node.a").await.expect("get").is_none());

    let tombstone = reader
        .entry("node.a")
        .await
        .expect("entry")
        .expect("tombstone present");
    assert!(tombstone.value.is_empty(), "tombstone has empty value");
}
/// `update` with `VersionToken::unknown()` fails with `OperationFailed`.
#[tokio::test]
async fn update_with_invalid_version_errors() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");

    writer.put("node.a", b"v").await.expect("put");

    let unknown = VersionToken::unknown();
    let failure = writer
        .update("node.a", b"v2", &unknown)
        .await
        .expect_err("invalid version must error");
    assert!(
        matches!(failure, KvError::OperationFailed(_)),
        "got {failure:?}"
    );
}
/// `scan` over a prefix returns one entry per live key with its latest
/// value, omitting deleted keys and keys outside the prefix.
#[tokio::test]
async fn scan_returns_last_value_per_key_and_skips_deletes() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");
    let reader = store.reader();

    writer.put("node.a", b"1").await.expect("put a");
    writer.put("node.a", b"2").await.expect("update a");
    writer.put("node.b", b"3").await.expect("put b");
    writer.put("other.c", b"4").await.expect("put c");
    writer.delete("node.b").await.expect("delete b");

    let mut found = reader.scan("node.").await.expect("scan");
    found.sort_by(|lhs, rhs| lhs.key.cmp(&rhs.key));

    assert_eq!(found.len(), 1, "deleted b excluded, c out of prefix");
    let only = &found[0];
    assert_eq!(only.key, "node.a");
    assert_eq!(only.value, b"2", "scan returns latest value");
}
/// The version `scan` reports for a key equals the revision `put` returned,
/// and can be used for a subsequent CAS `update`.
#[tokio::test]
async fn scan_version_matches_live_revision() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");
    let reader = store.reader();

    writer.put("node.a", b"1").await.expect("put a");
    writer.put("node.a", b"2").await.expect("update a");
    let live = writer.put("node.b", b"3").await.expect("put b");

    let fetched = reader.get("node.b").await.expect("get").expect("present");
    let via_get = fetched.version;
    assert_eq!(
        via_get.as_u64(),
        live.as_u64(),
        "sanity: get == put revision"
    );

    let scanned = reader.scan("node.").await.expect("scan");
    let node_b = scanned
        .iter()
        .find(|candidate| candidate.key == "node.b")
        .expect("node.b in scan");
    assert_eq!(
        node_b.version.as_u64(),
        live.as_u64(),
        "scan version must equal the live revision returned by put()"
    );

    writer
        .update("node.b", b"4", &node_b.version)
        .await
        .expect("CAS update using the scan-reported version must succeed");
}
/// `keys` over a prefix lists live keys only: deleted keys and keys outside
/// the prefix are left out.
#[tokio::test]
async fn keys_returns_names_under_prefix() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");
    let reader = store.reader();

    writer.put("node.a", b"1").await.expect("put a");
    writer.put("node.b", b"2").await.expect("put b");
    writer.put("other.c", b"3").await.expect("put c");
    writer.delete("node.b").await.expect("delete b");

    let mut listed = reader.keys("node.").await.expect("keys");
    listed.sort();

    let expected = vec!["node.a".to_string()];
    assert_eq!(listed, expected, "deleted/out-of-prefix excluded");
}
/// After a versioned delete, the key must be invisible through `get`, `scan`
/// and `keys` alike.
#[tokio::test]
async fn keys_excludes_cas_tombstones_like_get_and_scan() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");
    let reader = store.reader();

    writer.put("node.a", b"1").await.expect("put a");
    let b_version = writer.put("node.b", b"2").await.expect("put b");
    writer
        .delete_with_version("node.b", &b_version)
        .await
        .expect("cas delete b");

    // get() hides the deleted key.
    assert!(reader.get("node.b").await.expect("get").is_none());

    // scan() hides it too.
    let mut scanned: Vec<String> = Vec::new();
    for entry in reader.scan("node.").await.expect("scan") {
        scanned.push(entry.key);
    }
    assert!(
        !scanned.contains(&"node.b".to_string()),
        "scan hides tombstone"
    );

    // keys() must agree with both.
    let listed = reader.keys("node.").await.expect("keys");
    assert_eq!(
        listed,
        vec!["node.a".to_string()],
        "keys() must exclude CAS tombstones for consistency with get()/scan()"
    );
}
/// `watch_all` delivers puts and deletes made after the watch is live, in
/// the order they were written.
#[tokio::test]
async fn watch_all_streams_live_updates() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("watch").await;
    let writer = store.writer().expect("writer");
    let watcher = store.watcher().expect("watcher");

    let (tx, mut rx) = mpsc::channel(64);
    tokio::spawn(async move {
        let _ = watcher.watch_all(tx).await;
    });
    establish_watch(writer.as_ref(), &mut rx, "__ready__").await;

    writer.put("node.a", b"1").await.expect("put a");
    writer.put("node.b", b"2").await.expect("put b");
    writer.delete("node.a").await.expect("delete a");

    let seen = collect_updates(&mut rx, 3).await;
    let (first, second, third) = (&seen[0], &seen[1], &seen[2]);
    assert!(matches!(first, KvUpdate::Put(e) if e.key == "node.a" && e.value == b"1"));
    assert!(matches!(second, KvUpdate::Put(e) if e.key == "node.b" && e.value == b"2"));
    assert!(matches!(third, KvUpdate::Delete { key, .. } if key == "node.a"));
}
/// `watch_prefix` delivers only keys under the watched prefix: a write to
/// `other.x` made first must not appear before the `node.a` write.
#[tokio::test]
async fn watch_prefix_filters_by_subject() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("watch").await;
    let writer = store.writer().expect("writer");
    let watcher = store.watcher().expect("watcher");

    let (tx, mut rx) = mpsc::channel(64);
    tokio::spawn(async move {
        let _ = watcher.watch_prefix("node.", tx).await;
    });
    establish_watch(writer.as_ref(), &mut rx, "node.__ready__").await;

    // Out-of-prefix write first, then the one that should be delivered.
    writer.put("other.x", b"skip").await.expect("put other");
    writer.put("node.a", b"keep").await.expect("put node");

    let seen = collect_updates(&mut rx, 1).await;
    let first = &seen[0];
    assert!(matches!(first, KvUpdate::Put(e) if e.key == "node.a"));
}
/// `watch_prefixes` over two prefixes delivers writes under both, drops
/// writes under a third, and shows up as exactly one consumer on the
/// `KV_watchmany` stream.
#[tokio::test]
async fn watch_prefixes_unions_on_a_single_consumer() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("watchmany").await;
    let writer = store.writer().expect("writer");
    let watcher = store.watcher().expect("watcher");

    let (tx, mut rx) = mpsc::channel(64);
    tokio::spawn(async move {
        let _ = watcher.watch_prefixes(&["vpcA.", "vpcB."], tx).await;
    });
    establish_watch(writer.as_ref(), &mut rx, "vpcA.__ready__").await;

    // Inspect the stream through a separate raw client.
    let raw = async_nats::connect(&server.url).await.expect("raw connect");
    let js = async_nats::jetstream::new(raw);
    let mut stream = js.get_stream("KV_watchmany").await.expect("get kv stream");
    let info = stream.info().await.expect("stream info");
    assert_eq!(
        info.state.consumer_count, 1,
        "watch_prefixes must use ONE multi-filter consumer, found {}",
        info.state.consumer_count
    );

    // The unwatched prefix is written first so a leak would surface early.
    writer.put("vpcC.x", b"skip").await.expect("put vpcC");
    writer.put("vpcA.a", b"keep").await.expect("put vpcA");
    writer.put("vpcB.b", b"keep").await.expect("put vpcB");

    let seen = collect_updates(&mut rx, 2).await;
    let mut keys: std::collections::HashSet<String> = std::collections::HashSet::new();
    for update in &seen {
        keys.insert(update.key().to_string());
    }
    assert!(
        keys.contains("vpcA.a") && keys.contains("vpcB.b"),
        "both watched prefixes must be delivered, got {keys:?}"
    );

    // Nothing further may arrive within the grace window.
    let extra = timeout(Duration::from_millis(500), rx.recv()).await;
    assert!(
        extra.is_err(),
        "a non-watched prefix leaked through watch_prefixes"
    );
}
/// `watch_prefixes_from` started at a cursor delivers exactly the in-prefix
/// writes made after that cursor, in order, and nothing else.
#[tokio::test]
async fn watch_prefixes_from_replays_only_the_delta() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("watchmany-from").await;
    let writer = store.writer().expect("writer");
    let watcher = store.watcher().expect("watcher");

    // Baseline writes; the cursor sits on the second one.
    writer.put("vpcA.a", b"1").await.expect("put A baseline");
    let cursor_rev = writer.put("vpcB.b", b"2").await.expect("put B baseline");
    let cursor = WatchCursor::from_u64(cursor_rev.as_u64().expect("u64 rev"));

    // Post-cursor writes: one outside the union, two inside it.
    writer.put("vpcC.x", b"skip").await.expect("put C");
    writer.put("vpcA.a2", b"3").await.expect("put A delta");
    writer.put("vpcB.b2", b"4").await.expect("put B delta");

    let (tx, mut rx) = mpsc::channel(64);
    tokio::spawn(async move {
        let _ = watcher
            .watch_prefixes_from(&["vpcA.", "vpcB."], &cursor, tx)
            .await;
    });

    let delta = collect_updates(&mut rx, 2).await;
    let keys: Vec<&str> = delta.iter().map(|update| update.key()).collect();
    assert_eq!(
        keys,
        vec!["vpcA.a2", "vpcB.b2"],
        "delta must be exactly the post-cursor in-union writes"
    );

    for update in &delta {
        let rev = update.version().as_u64().expect("u64 revision");
        assert!(
            rev > cursor_rev.as_u64().unwrap(),
            "delivered revision {rev} must be past the cursor"
        );
    }

    let extra = timeout(Duration::from_millis(500), rx.recv()).await;
    assert!(
        extra.is_err(),
        "a non-watched prefix leaked through watch_prefixes_from"
    );
}
/// `watch_all_from` started at a cursor delivers the writes made after that
/// cursor and none of the writes at or before it.
///
/// The key check alone cannot tell the updated `node.a` from the baseline
/// `node.a`, so every delivered revision is also required to be past the
/// cursor.
#[tokio::test]
async fn watch_all_from_replays_only_the_delta() {
    let nats = TestNats::start().await;
    let (_conn, store) = nats.store("watch").await;
    let writer = store.writer().expect("writer");
    let watcher = store.watcher().expect("watcher");

    // Baseline writes; the cursor sits on the second one.
    writer.put("node.a", b"1").await.expect("put a");
    let cursor_rev = writer.put("node.b", b"2").await.expect("put b");
    let cursor_u64 = cursor_rev.as_u64().expect("u64 rev");
    let cursor = WatchCursor::from_u64(cursor_u64);

    // Post-cursor delta: a new key and an update to a baseline key.
    writer.put("node.c", b"3").await.expect("put c");
    writer.put("node.a", b"1b").await.expect("update a");

    let (tx, mut rx) = mpsc::channel(64);
    tokio::spawn(async move {
        let _ = watcher.watch_all_from(&cursor, tx).await;
    });

    let updates = collect_updates(&mut rx, 2).await;
    let keys: Vec<&str> = updates.iter().map(|u| u.key()).collect();
    assert!(keys.contains(&"node.c"), "delta includes node.c: {keys:?}");
    assert!(
        keys.contains(&"node.a"),
        "delta includes updated node.a: {keys:?}"
    );

    // "Only the delta": nothing at or before the cursor may be replayed.
    for u in &updates {
        let rev = u.version().as_u64().expect("u64 revision");
        assert!(
            rev > cursor_u64,
            "delivered revision {rev} must be past the cursor"
        );
    }
}
/// `watch_prefix_from` started at a cursor delivers the first in-prefix
/// write made after that cursor (`node.b`), not the pre-cursor `node.a`.
#[tokio::test]
async fn watch_prefix_from_replays_only_the_delta() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("watch").await;
    let writer = store.writer().expect("writer");
    let watcher = store.watcher().expect("watcher");

    // Baseline: one in-prefix key, then an out-of-prefix key holding the cursor.
    writer.put("node.a", b"1").await.expect("put a");
    let cursor_rev = writer.put("other.z", b"z").await.expect("put z");
    let cursor = WatchCursor::from_u64(cursor_rev.as_u64().expect("u64 rev"));

    // Post-cursor writes: one in-prefix, one not.
    writer.put("node.b", b"2").await.expect("put b");
    writer.put("other.y", b"y").await.expect("put y");

    let (tx, mut rx) = mpsc::channel(64);
    tokio::spawn(async move {
        let _ = watcher.watch_prefix_from("node.", &cursor, tx).await;
    });

    let delta = collect_updates(&mut rx, 1).await;
    let first = &delta[0];
    assert!(matches!(first, KvUpdate::Put(e) if e.key == "node.b"));
}
/// Covers the gap between taking a `scan` snapshot and starting a watch.
///
/// A write (`blackhole.2`) lands after the snapshot but before any watch
/// exists. The test then checks two ways of not missing it:
/// 1. a plain `watch_prefix`, which is expected to deliver the current
///    entries (`blackhole.1`, `blackhole.2`) followed by live updates;
/// 2. `watch_prefix_from` at the snapshot's highest revision, which is
///    expected to deliver `blackhole.2` first.
#[tokio::test]
async fn watch_prefix_relist_covers_seed_then_watch_gap() {
let nats = TestNats::start().await;
let (_conn, store) = nats.store("seed-race").await;
let writer = store.writer().expect("writer");
let watcher = store.watcher().expect("watcher");
// Seed one key and snapshot it.
writer.put("blackhole.1", b"spend").await.expect("put 1");
let snapshot = store.reader().scan("blackhole.").await.expect("scan");
assert_eq!(snapshot.len(), 1);
// Highest revision present in the snapshot (0 if none carries a u64 revision).
let baseline_rev = snapshot
.iter()
.filter_map(|e| e.version.as_u64())
.max()
.unwrap_or(0);
// The "gap" write: after the snapshot, before any watch is started.
writer.put("blackhole.2", b"fraud").await.expect("put 2");
// Phase 1: plain watch_prefix.
{
let (tx, mut rx) = mpsc::channel(64);
let w = watcher.clone();
tokio::spawn(async move {
let _ = w.watch_prefix("blackhole.", tx).await;
});
let got = collect_updates(&mut rx, 2).await;
let keys: Vec<&str> = got.iter().map(|u| u.key()).collect();
assert_eq!(
keys,
vec!["blackhole.1", "blackhole.2"],
"watch_prefix's re-list must deliver current state including the gap write"
);
// A write made while the watch is running must arrive next.
writer.put("blackhole.3", b"spend").await.expect("put 3");
let got = collect_updates(&mut rx, 1).await;
assert!(
matches!(&got[0], KvUpdate::Put(e) if e.key == "blackhole.3"),
"live updates must follow the re-list; got {:?}",
got[0].key()
);
}
// Phase 2: resume from the snapshot revision instead.
{
let (tx, mut rx) = mpsc::channel(64);
let cursor = WatchCursor::from_u64(baseline_rev);
let w = watcher.clone();
tokio::spawn(async move {
let _ = w.watch_prefix_from("blackhole.", &cursor, tx).await;
});
let got = collect_updates(&mut rx, 1).await;
assert!(
matches!(&got[0], KvUpdate::Put(e) if e.key == "blackhole.2"),
"watch_prefix_from(snapshot revision) must deliver the gap write; got {:?}",
got[0].key()
);
}
}
/// Watching from a cursor whose revision was overwritten in a
/// `max_history: 1` bucket must end in one of three accepted ways within
/// 2 seconds: a clean return, `CursorExpired`, or still running. Any other
/// error fails the test.
#[tokio::test]
async fn watch_from_compacted_cursor_does_not_spuriously_fail() {
    let server = TestNats::start().await;
    let connection = server.connect().await;
    let store = connection
        .store_with_config(StoreConfig {
            name: "compacted".into(),
            max_history: Some(1),
            max_bytes: Some(1024 * 1024),
            ..Default::default()
        })
        .await
        .expect("open store");
    let writer = store.writer().expect("writer");
    let watcher = store.watcher().expect("watcher");

    // Overwrite the same key six times.
    for byte in 0..6u8 {
        writer.put("k", &[byte]).await.expect("put k");
    }

    let cursor = WatchCursor::from_u64(1);
    // The receiver is held (unused) so the channel stays open during the watch.
    let (tx, _keep_open) = mpsc::channel(64);
    let outcome = timeout(Duration::from_secs(2), watcher.watch_all_from(&cursor, tx)).await;

    match outcome {
        // Still watching when the timeout fired.
        Err(_elapsed) => {}
        // Returned cleanly.
        Ok(Ok(())) => {}
        // Explicitly reported the cursor as expired.
        Ok(Err(KvError::CursorExpired)) => {}
        Ok(Err(other)) => panic!("unexpected watch error from compacted cursor: {other:?}"),
    }
}
/// A connection created from a URL can be shut down and connected again,
/// after which stores can be opened, written and read.
#[tokio::test]
async fn reconnect_after_shutdown() {
    let server = TestNats::start().await;
    let connection = server.connect().await;
    assert!(connection.is_healthy(), "healthy after first connect");

    connection.shutdown().await.expect("shutdown");
    assert!(!connection.is_healthy(), "not healthy after shutdown");

    connection.connect().await.expect("reconnect after shutdown");
    assert!(connection.is_healthy(), "healthy after reconnect");

    let config = StoreConfig {
        name: "reconnect".into(),
        max_bytes: Some(1024 * 1024),
        ..Default::default()
    };
    let store = connection
        .store_with_config(config)
        .await
        .expect("open store after reconnect");

    let writer = store.writer().expect("writer");
    writer.put("k", b"v").await.expect("put after reconnect");

    let reader = store.reader();
    let fetched = reader
        .get("k")
        .await
        .expect("get after reconnect")
        .expect("present");
    assert_eq!(fetched.value, b"v");
}
/// Sixteen tasks calling `connect()` on one shared connection must all
/// succeed and leave it healthy and usable.
#[tokio::test]
async fn concurrent_connect_is_safe() {
    let server = TestNats::start().await;
    let shared = Arc::new(NatsConnection::new(NatsConnectionConfig {
        url: server.url.clone(),
        creds: None,
        creds_file: None,
    }));

    // Spawn every task before awaiting any of them.
    let tasks: Vec<_> = (0..16)
        .map(|_| {
            let connection = Arc::clone(&shared);
            tokio::spawn(async move { connection.connect().await })
        })
        .collect();

    for task in tasks {
        let joined = task.await.expect("connect task panicked");
        joined.expect("concurrent connect failed");
    }
    assert!(shared.is_healthy(), "healthy after concurrent connect");

    let store = shared
        .store_with_config(StoreConfig {
            name: "concurrent".into(),
            max_bytes: Some(1024 * 1024),
            ..Default::default()
        })
        .await
        .expect("open store after concurrent connect");

    let writer = store.writer().expect("writer");
    writer
        .put("k", b"v")
        .await
        .expect("put after concurrent connect");
}
/// After an unversioned `delete`, `create` on the same key succeeds and the
/// new value is readable.
#[tokio::test]
async fn create_succeeds_after_plain_delete() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");

    writer.put("lock.x", b"1").await.expect("put");
    let removed = writer.delete("lock.x").await.expect("delete");
    assert!(removed);

    let recreated = writer
        .create("lock.x", b"2")
        .await
        .expect("create after plain delete should succeed");
    assert!(recreated.as_u64().is_some());

    let reader = store.reader();
    let fetched = reader.get("lock.x").await.expect("get").expect("present");
    assert_eq!(fetched.value, b"2");
}
/// After a versioned delete, `create` on the same key fails with
/// `AlreadyExists`, while `get` still reports the key as absent.
#[tokio::test]
async fn create_conflicts_after_delete_with_version() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");
    let reader = store.reader();

    let original = writer.put("lock.y", b"1").await.expect("put");
    let deleted = writer
        .delete_with_version("lock.y", &original)
        .await
        .expect("cas delete");
    assert!(deleted);

    let conflict = writer
        .create("lock.y", b"2")
        .await
        .expect_err("create after versioned delete must conflict");
    assert!(
        matches!(conflict, KvError::AlreadyExists),
        "got {conflict:?}"
    );

    let lookup = reader.get("lock.y").await.expect("get");
    assert!(lookup.is_none());
}
/// `scan("")` returns every live entry in the bucket and omits deleted keys.
#[tokio::test]
async fn scan_empty_prefix_returns_all_live_entries() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");
    let reader = store.reader();

    writer.put("node.a", b"1").await.expect("put a");
    writer.put("other.b", b"2").await.expect("put b");
    writer.put("third.c", b"3").await.expect("put c");
    writer.delete("other.b").await.expect("delete b");

    let mut everything = reader.scan("").await.expect("scan all");
    everything.sort_by(|lhs, rhs| lhs.key.cmp(&rhs.key));

    let mut keys: Vec<&str> = Vec::with_capacity(everything.len());
    for entry in &everything {
        keys.push(entry.key.as_str());
    }
    assert_eq!(keys, vec!["node.a", "third.c"], "deleted b excluded");
}
/// `keys("")` lists every live key in the bucket and omits deleted keys.
#[tokio::test]
async fn keys_empty_prefix_returns_all_live_keys() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");
    let reader = store.reader();

    writer.put("node.a", b"1").await.expect("put a");
    writer.put("other.b", b"2").await.expect("put b");
    writer.delete("other.b").await.expect("delete b");

    let mut listed = reader.keys("").await.expect("keys all");
    listed.sort();

    let expected = vec!["node.a".to_string()];
    assert_eq!(listed, expected, "deleted b excluded");
}
/// Once the receiving end of the watch channel is dropped, the `watch_all`
/// task must finish with `Ok(())` within 5 seconds while writes keep
/// arriving.
#[tokio::test]
async fn dropping_receiver_stops_watch_task() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("watch").await;
    let writer = store.writer().expect("writer");
    let watcher = store.watcher().expect("watcher");

    let (tx, mut rx) = mpsc::channel(64);
    let watch_task = tokio::spawn(async move { watcher.watch_all(tx).await });
    establish_watch(writer.as_ref(), &mut rx, "__ready__").await;

    drop(rx);

    // Keep writing so the watcher has something to forward, polling for exit
    // after every write.
    let poke_until_done = async {
        loop {
            writer.put("node.poke", b"x").await.expect("poke put");
            if watch_task.is_finished() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    };
    let exited = timeout(Duration::from_secs(5), poke_until_done).await;
    assert!(
        exited.is_ok(),
        "watch task did not exit after the receiver was dropped"
    );

    let result = watch_task.await.expect("watch task panicked");
    assert!(
        matches!(result, Ok(())),
        "watch task should exit cleanly on receiver drop, got {result:?}"
    );
}
/// With 1500 keys in a bucket, `scan` and `keys` must each return all of
/// them within 30 seconds rather than stalling or truncating.
#[tokio::test]
async fn scan_and_keys_cover_buckets_larger_than_max_ack_pending() {
    const N: usize = 1500;

    let server = TestNats::start().await;
    let connection = server.connect().await;
    let store = connection
        .store_with_config(StoreConfig {
            name: "big".into(),
            max_bytes: Some(64 * 1024 * 1024),
            ..Default::default()
        })
        .await
        .expect("open store");
    let writer = store.writer().expect("writer");
    let reader = store.reader();

    for index in 0..N {
        let key = format!("node.{index:05}");
        writer.put(&key, b"v").await.expect("put");
    }

    let scanned = timeout(Duration::from_secs(30), reader.scan("node."))
        .await
        .expect("scan must not hang past max_ack_pending")
        .expect("scan");
    let scanned_count = scanned.len();
    assert_eq!(
        scanned_count, N,
        "scan must return every key past max_ack_pending, got {}",
        scanned_count
    );

    let listed = timeout(Duration::from_secs(30), reader.keys("node."))
        .await
        .expect("keys must not hang past max_ack_pending")
        .expect("keys");
    let listed_count = listed.len();
    assert_eq!(
        listed_count, N,
        "keys must return every key past max_ack_pending, got {}",
        listed_count
    );
}
/// A key written with an empty value is reported as absent by `get` but is
/// still retrievable, with an empty value, through `entry`.
#[tokio::test]
async fn get_treats_empty_value_as_absent() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");
    let reader = store.reader();

    writer.put("flag.x", b"").await.expect("put empty value");

    let via_get = reader.get("flag.x").await.expect("get");
    assert!(via_get.is_none());

    let via_entry = reader.entry("flag.x").await.expect("entry");
    let raw = via_entry.expect("present via entry()");
    assert!(raw.value.is_empty());
}
/// `delete_with_version` on a key that never existed fails with
/// `RevisionMismatch`.
#[tokio::test]
async fn delete_with_version_on_missing_key_returns_revision_mismatch() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("rw").await;
    let writer = store.writer().expect("writer");

    let any_version = VersionToken::from_u64(1);
    let failure = writer
        .delete_with_version("never.existed", &any_version)
        .await
        .expect_err("delete_with_version on absent key must fail");

    assert!(
        matches!(failure, KvError::RevisionMismatch),
        "got {failure:?} — NATS does not distinguish 'wrong version' from 'key absent'"
    );
}
/// Calling `store_with_config` twice with the same config succeeds both
/// times, and the two handles see the same data.
#[tokio::test]
async fn store_with_config_is_idempotent() {
    let server = TestNats::start().await;
    let connection = server.connect().await;
    let config = StoreConfig {
        name: "idempotent".to_string(),
        max_bytes: Some(1024 * 1024),
        ..Default::default()
    };

    let first_handle = connection
        .store_with_config(config.clone())
        .await
        .expect("first store_with_config");
    let second_handle = connection
        .store_with_config(config)
        .await
        .expect("second store_with_config must not fail");

    // Write through one handle, read through the other.
    let writer = first_handle.writer().expect("writer");
    writer.put("k", b"v").await.expect("put via first handle");

    let reader = second_handle.reader();
    let fetched = reader
        .get("k")
        .await
        .expect("get via second handle")
        .expect("present");
    assert_eq!(fetched.value, b"v");
}
/// After the server process is killed (by dropping `TestNats`), `is_healthy`
/// must turn false within 15 seconds.
#[tokio::test]
async fn health_reflects_server_death() {
    let server = TestNats::start().await;
    let connection = server.connect().await;
    assert!(connection.is_healthy(), "healthy immediately after connect");

    // Dropping the fixture kills and reaps nats-server.
    drop(server);

    let wait_for_unhealthy = async {
        loop {
            if !connection.is_healthy() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    };
    let flipped = timeout(Duration::from_secs(15), wait_for_unhealthy).await;
    assert!(
        flipped.is_ok(),
        "is_healthy() must report false after the NATS server dies"
    );
}
use slipstream::snapshot::{self, AppendLogSnapshot};
use slipstream::{BatchConfig, WatchScope, watch_applied};
use tokio::sync::watch as tokio_watch;
/// Parse callback: yields the key of a `Put` update and rejects (returns
/// `None` for) every other kind of update.
fn put_key(u: &KvUpdate) -> Option<String> {
    if let KvUpdate::Put(entry) = u {
        Some(entry.key.clone())
    } else {
        None
    }
}
/// Parse callback: yields the key of a `Put` update whose key starts with
/// `node.`; every other update is rejected with `None`.
fn node_put_key(u: &KvUpdate) -> Option<String> {
    let KvUpdate::Put(entry) = u else {
        return None;
    };
    if entry.key.starts_with("node.") {
        Some(entry.key.clone())
    } else {
        None
    }
}
/// Spawns `watch_applied` over `WatchScope::All` on a background task and
/// returns the handles a test needs to observe and stop it.
///
/// Returns, in order:
/// - the task's `JoinHandle`, resolving to the `Result<WatchCursor, KvError>`
///   that `watch_applied` returns;
/// - a receiver yielding every key passed to the apply callback, one message
///   per key, in batch order;
/// - a receiver yielding each cursor passed to the cursor callback as a `u64`
///   (`0` when the cursor has no `u64` form);
/// - the sender side of the shutdown `watch` channel (initial value `false`).
///
/// `parse` maps each update to the `String` to apply, or `None` to reject it.
///
/// NOTE(review): the two positional `None` arguments and the exact role of
/// `baseline` / `snapshot` are defined by `watch_applied`, which is not
/// visible in this file — confirm against its signature.
// The 4-tuple return type trips clippy's type-complexity lint; it is kept
// inline rather than aliased because this helper is test-only.
#[allow(clippy::type_complexity)]
fn spawn_applied(
watcher: Arc<dyn slipstream::KvWatcher>,
baseline: Option<WatchCursor>,
snapshot: Option<AppendLogSnapshot>,
parse: fn(&KvUpdate) -> Option<String>,
) -> (
tokio::task::JoinHandle<Result<WatchCursor, KvError>>,
mpsc::UnboundedReceiver<String>,
mpsc::UnboundedReceiver<u64>,
tokio_watch::Sender<bool>,
) {
// Unbounded channels so the callbacks never block on the test's receivers.
let (applied_tx, applied_rx) = mpsc::unbounded_channel::<String>();
let (cursor_tx, cursor_rx) = mpsc::unbounded_channel::<u64>();
let (sd_tx, sd_rx) = tokio_watch::channel(false);
let task = tokio::spawn(watch_applied(
watcher,
WatchScope::All,
baseline,
None, snapshot,
None,
BatchConfig::default(),
parse,
// Apply callback: forward each key; send errors (receiver gone) are ignored.
move |batch: Vec<String>| {
for k in batch {
let _ = applied_tx.send(k);
}
},
// Cursor callback: forward the cursor as a u64; send errors are ignored.
move |c: WatchCursor| {
let _ = cursor_tx.send(c.as_u64().unwrap_or(0));
},
sd_rx,
));
(task, applied_rx, cursor_rx, sd_tx)
}
/// Receives exactly `n` applied keys from `rx`, in arrival order.
///
/// # Panics
///
/// Panics if any single key takes longer than 5 seconds to arrive or the
/// channel closes before `n` keys were received.
async fn collect_applied(rx: &mut mpsc::UnboundedReceiver<String>, n: usize) -> Vec<String> {
    let mut applied = Vec::with_capacity(n);
    while applied.len() < n {
        let next = timeout(Duration::from_secs(5), rx.recv()).await;
        let key = next
            .expect("timed out waiting for an applied update")
            .expect("applied channel closed early");
        applied.push(key);
    }
    applied
}
/// Starting from a baseline cursor, `watch_applied` applies the later writes
/// in revision order and, on shutdown, returns a cursor equal to the last
/// applied revision.
#[tokio::test]
async fn applied_streams_and_advances_cursor() {
    let server = TestNats::start().await;
    let (_conn, store) = server.store("applied").await;
    let writer = store.writer().expect("writer");
    let watcher = store.watcher().expect("watcher");

    let base = writer.put("baseline", b"x").await.expect("put baseline");
    let baseline = WatchCursor::from_u64(base.as_u64().expect("u64 rev"));

    writer.put("node.a", b"1").await.expect("put a");
    writer.put("node.b", b"2").await.expect("put b");
    let last_rev = writer
        .put("node.c", b"3")
        .await
        .expect("put c")
        .as_u64()
        .expect("u64 rev");

    let (task, mut applied_rx, _cursor_rx, shutdown) =
        spawn_applied(watcher, Some(baseline), None, put_key);

    let applied = collect_applied(&mut applied_rx, 3).await;
    assert_eq!(
        applied,
        vec!["node.a", "node.b", "node.c"],
        "updates applied in revision order"
    );

    shutdown.send(true).expect("signal shutdown");
    let final_cursor = task.await.expect("task join").expect("watch_applied ok");
    assert_eq!(
        final_cursor.as_u64(),
        Some(last_rev),
        "returned cursor is the highest applied revision"
    );
}
/// Two-run resume scenario for `watch_applied` with an on-disk snapshot.
///
/// Run 1 starts at a baseline cursor with an `AppendLogSnapshot` attached,
/// applies `node.a` and `node.b`, and is shut down. The snapshot file is then
/// loaded and must hold the same cursor and exactly those two keys.
///
/// Run 2 starts from the loaded cursor (no snapshot attached) after two more
/// writes and must apply exactly `node.c` then `node.d`.
#[tokio::test]
async fn applied_resumes_from_snapshot_cursor_without_skipping() {
let nats = TestNats::start().await;
let (_conn, store) = nats.store("applied-resume").await;
let writer = store.writer().expect("writer");
let watcher = store.watcher().expect("watcher");
// `snap_dir` must stay bound until the end so the snapshot path remains usable.
let snap_dir = tempfile::tempdir().expect("snap dir");
let snap_path = snap_dir.path().join("state.snap");
let base = writer.put("baseline", b"x").await.expect("put baseline");
let baseline = WatchCursor::from_u64(base.as_u64().expect("u64 rev"));
writer.put("node.a", b"1").await.expect("put a");
let b_rev = writer
.put("node.b", b"2")
.await
.expect("put b")
.as_u64()
.expect("u64 rev");
// --- Run 1: apply a and b with the snapshot store attached. ---
// NOTE(review): the meaning of the `u64::MAX` argument to `open` is not
// visible in this file — confirm against `AppendLogSnapshot::open`.
let (_resume1, store1) = AppendLogSnapshot::open(&snap_path, u64::MAX).expect("open snapshot");
let (task, mut applied_rx, _cur_rx, sd_tx) =
spawn_applied(watcher.clone(), Some(baseline), Some(store1), put_key);
let first = collect_applied(&mut applied_rx, 2).await;
assert_eq!(first, vec!["node.a", "node.b"]);
sd_tx.send(true).expect("shutdown run 1");
let cursor1 = task.await.expect("join 1").expect("run 1 ok");
assert_eq!(
cursor1.as_u64(),
Some(b_rev),
"run 1 applied through node.b"
);
// --- Verify what run 1 persisted. ---
let loaded = snapshot::load(&snap_path)
.expect("load snapshot")
.expect("snapshot present");
assert_eq!(
loaded.cursor.as_u64(),
Some(b_rev),
"snapshot cursor equals the applied cursor"
);
let mut snap_keys: Vec<&str> = loaded.entries.keys().map(String::as_str).collect();
snap_keys.sort_unstable();
assert_eq!(snap_keys, vec!["node.a", "node.b"]);
// --- Writes made while nothing is watching. ---
writer.put("node.c", b"3").await.expect("put c");
let d_rev = writer
.put("node.d", b"4")
.await
.expect("put d")
.as_u64()
.expect("u64 rev");
// --- Run 2: resume from the persisted cursor. ---
let (task2, mut applied_rx2, _cur_rx2, sd_tx2) =
spawn_applied(watcher, Some(loaded.cursor), None, put_key);
let delta = collect_applied(&mut applied_rx2, 2).await;
assert_eq!(
delta,
vec!["node.c", "node.d"],
"resume replays exactly the post-cursor delta, in order"
);
sd_tx2.send(true).expect("shutdown run 2");
let cursor2 = task2.await.expect("join 2").expect("run 2 ok");
assert_eq!(
cursor2.as_u64(),
Some(d_rev),
"run 2 applied through node.d"
);
}
/// Checks that the cursor keeps moving past an update that is never applied.
///
/// Two entries are written after the baseline: `node.a` and `other.x`. The watch
/// is spawned with `node_put_key`, and only `node.a` is expected on the applied
/// channel. The test then waits (up to five seconds) for the cursor channel to
/// report a revision at or beyond the `other.x` revision, and finally asserts that
/// the cursor returned at shutdown equals that revision.
#[tokio::test]
async fn applied_advances_cursor_over_rejected_updates() {
    let nats = TestNats::start().await;
    let (_conn, store) = nats.store("applied-reject").await;
    let writer = store.writer().expect("writer");
    let watcher = store.watcher().expect("watcher");

    let baseline_rev = writer
        .put("baseline", b"x")
        .await
        .expect("put baseline")
        .as_u64()
        .expect("u64 rev");
    let baseline = WatchCursor::from_u64(baseline_rev);
    writer.put("node.a", b"1").await.expect("put a");
    let rejected_token = writer.put("other.x", b"junk").await.expect("put other");
    let rejected_rev = rejected_token.as_u64().expect("u64 rev");

    let (task, mut applied_rx, mut cursor_rx, shutdown) =
        spawn_applied(watcher, Some(baseline), None, node_put_key);
    let got = collect_applied(&mut applied_rx, 1).await;
    assert_eq!(got, vec!["node.a"]);

    // Drain cursor notifications until one covers the rejected revision; a closed
    // channel ends the wait with `false`.
    let cursor_passed_rejected = async {
        loop {
            match cursor_rx.recv().await {
                Some(rev) if rev >= rejected_rev => break true,
                Some(_) => {}
                None => break false,
            }
        }
    };
    let reached = timeout(Duration::from_secs(5), cursor_passed_rejected)
        .await
        .expect("timed out waiting for cursor to pass the rejected update");
    assert!(reached, "cursor advanced past the rejected update");

    shutdown.send(true).expect("shutdown");
    let final_cursor = task.await.expect("join").expect("ok");
    assert_eq!(
        final_cursor.as_u64(),
        Some(rejected_rev),
        "final cursor covers the rejected (un-applied) update"
    );
}
/// Checks that the applied-watch helper still delivers updates when it is started
/// from a very old cursor.
///
/// The bucket is opened with `max_history: Some(1)` and the same key is written six
/// times, then the watch is started from revision 1 (presumably no longer retained
/// by the server at that point — the test name calls this a compacted cursor).
/// The test keeps writing a fresh value until one update is applied, then shuts the
/// watch down and asserts that the returned cursor carries a revision.
#[tokio::test]
async fn applied_survives_compacted_resume_cursor() {
    let nats = TestNats::start().await;
    let conn = nats.connect().await;
    let store = conn
        .store_with_config(StoreConfig {
            name: "applied-compacted".into(),
            max_history: Some(1),
            max_bytes: Some(1024 * 1024),
            ..Default::default()
        })
        .await
        .expect("open store");
    let writer = store.writer().expect("writer");
    let watcher = store.watcher().expect("watcher");
    for i in 0..6u8 {
        writer.put("node.k", &[i]).await.expect("put k");
    }
    let stale = WatchCursor::from_u64(1);
    let (task, mut applied_rx, _cur_rx, sd_tx) =
        spawn_applied(watcher, Some(stale), None, node_put_key);
    let applied = timeout(Duration::from_secs(10), async {
        loop {
            // Publish a live update on every pass so there is always something new
            // for the watch to pick up once it has recovered.
            writer.put("node.k", b"live").await.expect("put live");
            match timeout(Duration::from_millis(250), applied_rx.recv()).await {
                Ok(Some(key)) => return key,
                // `None` means the sending side is gone (assuming the usual tokio
                // mpsc receiver semantics), so no update can ever arrive. Fail now
                // instead of hammering the server with puts until the outer
                // timeout reports a misleading message.
                Ok(None) => panic!(
                    "applied channel closed before any update was applied after a compacted resume cursor"
                ),
                // Nothing applied within this poll window; write again and retry.
                Err(_elapsed) => {}
            }
        }
    })
    .await
    .expect("combinator never applied an update after a compacted resume cursor");
    assert_eq!(applied, "node.k");
    sd_tx.send(true).expect("shutdown");
    let cursor = task.await.expect("join").expect("watch_applied ok");
    assert!(
        cursor.as_u64().is_some(),
        "combinator returned a live cursor after recovering from compaction"
    );
}
/// Spawns eight tasks that each try to acquire the same export lease name under a
/// distinct holder id (`node-0` .. `node-7`) and asserts that exactly one of the
/// `try_acquire` calls returns a guard.
#[tokio::test(flavor = "multi_thread")]
async fn export_lease_one_winner_among_concurrent_acquirers() {
    let nats = TestNats::start().await;
    let (_conn, store) = nats.store("lease-race").await;

    // Build each lease on this task, then move it into its own spawned contender.
    let contenders: Vec<_> = (0..8)
        .map(|i| {
            let lease = slipstream::ExportLease::new(
                store.as_ref(),
                "export.edge.us-east",
                format!("node-{i}"),
            )
            .expect("lease");
            tokio::spawn(async move {
                lease
                    .try_acquire(Duration::from_secs(60))
                    .await
                    .expect("try_acquire")
                    .is_some()
            })
        })
        .collect();

    // Join in spawn order and count how many contenders got a guard.
    let mut winners = 0usize;
    for contender in contenders {
        winners += usize::from(contender.await.expect("join"));
    }
    assert_eq!(winners, 1, "exactly one node wins the round");
}
/// Two holders contend for the lease name `export.round`.
///
/// After `node-a` acquires it, `node-b` must get `None` from `try_acquire`, both
/// while `node-a`'s guard is alive and again after that guard has been dropped.
#[tokio::test(flavor = "multi_thread")]
async fn export_lease_held_blocks_next_round() {
    let nats = TestNats::start().await;
    let (_conn, store) = nats.store("lease-held").await;
    let a = slipstream::ExportLease::new(store.as_ref(), "export.round", "node-a")
        .expect("lease");
    let b = slipstream::ExportLease::new(store.as_ref(), "export.round", "node-b")
        .expect("lease");

    let guard = a
        .try_acquire(Duration::from_secs(60))
        .await
        .expect("acquire")
        .expect("a wins the open round");

    // Attempt while the winner's guard is still held.
    let blocked_while_held = b
        .try_acquire(Duration::from_secs(60))
        .await
        .expect("try_acquire")
        .is_none();
    assert!(blocked_while_held, "a live lease blocks the round");

    // Attempt again once the guard is gone; the outcome must not change.
    drop(guard);
    let blocked_after_drop = b
        .try_acquire(Duration::from_secs(60))
        .await
        .expect("try_acquire")
        .is_none();
    assert!(
        blocked_after_drop,
        "the round persists past the winner's guard"
    );
}
/// `node-a` acquires `export.round` with a one-second duration; after sleeping
/// 1100 ms, `node-b` must be able to acquire it. The test checks the holder id on
/// both the returned guard's record and the record read back via `current()`.
#[tokio::test(flavor = "multi_thread")]
async fn export_lease_expiry_allows_takeover() {
    let nats = TestNats::start().await;
    let (_conn, store) = nats.store("lease-expiry").await;
    let a = slipstream::ExportLease::new(store.as_ref(), "export.round", "node-a")
        .expect("lease");
    let b = slipstream::ExportLease::new(store.as_ref(), "export.round", "node-b")
        .expect("lease");

    // The guard is discarded right away; only the acquisition itself matters here.
    let _ = a
        .try_acquire(Duration::from_secs(1))
        .await
        .expect("acquire")
        .expect("a wins");

    // Wait slightly longer than the one-second duration passed above.
    tokio::time::sleep(Duration::from_millis(1100)).await;

    let takeover = b
        .try_acquire(Duration::from_secs(60))
        .await
        .expect("try_acquire")
        .expect("expired lease is taken over");
    assert_eq!(takeover.record().holder_id, "node-b");

    let stored = b.current().await.expect("read").expect("record exists");
    assert_eq!(
        stored.holder_id, "node-b",
        "takeover is visible fleet-wide"
    );
}
/// Acquires a lease, completes it with cursor 42, and reads the record back.
///
/// The stored record must expose the cursor as the hex string
/// `000000000000002a`, carry a completion timestamp, and still name `node-a` as
/// the holder.
#[tokio::test(flavor = "multi_thread")]
async fn export_lease_complete_publishes_outcome() {
    let nats = TestNats::start().await;
    let (_conn, store) = nats.store("lease-complete").await;
    let lease = slipstream::ExportLease::new(store.as_ref(), "export.round", "node-a")
        .expect("lease");

    let guard = lease
        .try_acquire(Duration::from_secs(60))
        .await
        .expect("acquire")
        .expect("wins");
    let exported = WatchCursor::from_u64(42);
    guard.complete(&exported).await.expect("complete");

    let record = lease.current().await.expect("read").expect("record");
    assert_eq!(
        record.completed_cursor_hex.as_deref(),
        Some("000000000000002a"),
        "exported cursor is published"
    );
    assert!(record.completed_at_unix.is_some());
    assert_eq!(record.holder_id, "node-a");
}
/// Opens one bucket with `DiscardPolicy::Old` and one with the default config,
/// then reads the backing streams (`KV_origins-old`, `KV_config-new`) through a raw
/// JetStream client and asserts on the `discard` field of each stream's config:
/// `Old` for the explicit bucket, `New` for the default one.
#[tokio::test]
async fn discard_policy_old_evicts_default_rejects() {
    use async_nats::jetstream::stream::DiscardPolicy as NatsDiscard;

    let nats = TestNats::start().await;
    let conn = nats.connect().await;

    // Bucket that explicitly asks for discard:old.
    conn.store_with_config(StoreConfig {
        name: "origins-old".into(),
        max_bytes: Some(1024 * 1024),
        max_history: Some(1),
        discard: DiscardPolicy::Old,
        ..Default::default()
    })
    .await
    .expect("open discard:old store");

    // Bucket that leaves the discard policy at its default.
    conn.store_with_config(StoreConfig {
        name: "config-new".into(),
        max_bytes: Some(1024 * 1024),
        ..Default::default()
    })
    .await
    .expect("open default store");

    // Inspect the stream configuration with a separate, raw connection.
    let raw_client = async_nats::connect(&nats.url).await.expect("raw connect");
    let js = async_nats::jetstream::new(raw_client);
    let old_policy = js
        .get_stream("KV_origins-old")
        .await
        .expect("get old stream")
        .info()
        .await
        .expect("old info")
        .config
        .discard;
    let default_policy = js
        .get_stream("KV_config-new")
        .await
        .expect("get new stream")
        .info()
        .await
        .expect("new info")
        .config
        .discard;

    assert_eq!(
        old_policy,
        NatsDiscard::Old,
        "DiscardPolicy::Old must create an evict-oldest bucket"
    );
    assert_eq!(
        default_policy,
        NatsDiscard::New,
        "the default must remain discard:new (back-compat)"
    );
}