use beachcomber::cache::Cache;
use beachcomber::protocol::Response;
use beachcomber::provider::registry::ProviderRegistry;
use beachcomber::server::Server;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
fn setup() -> (
TempDir,
std::path::PathBuf,
Arc<Cache>,
Arc<ProviderRegistry>,
Arc<beachcomber::watcher_registry::WatcherRegistry>,
) {
let tmp = TempDir::new().unwrap();
let sock = tmp.path().join("test.sock");
let watchers = Arc::new(beachcomber::watcher_registry::WatcherRegistry::new());
let cache = Arc::new(Cache::with_watchers(watchers.clone()));
let registry = Arc::new(ProviderRegistry::with_defaults());
(tmp, sock, cache, registry, watchers)
}
async fn send_recv(stream: &mut UnixStream, request: &str) -> Response {
stream
.write_all(format!("{request}\n").as_bytes())
.await
.unwrap();
let mut reader = BufReader::new(&mut *stream);
let mut line = String::new();
reader.read_line(&mut line).await.unwrap();
serde_json::from_str(&line).unwrap()
}
#[tokio::test]
async fn store_and_get_roundtrip() {
let (_tmp, sock, cache, registry, watchers) = setup();
let server = Server::new(sock.clone(), cache, registry, None, watchers);
let handle = tokio::spawn(async move { server.run().await });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut stream = UnixStream::connect(&sock).await.unwrap();
let store_req = r#"{"op":"store","key":"myapp","data":{"status":"healthy","version":"1.2.3"}}"#;
let store_resp = send_recv(&mut stream, store_req).await;
assert!(
store_resp.ok,
"store should succeed: {:?}",
store_resp.error
);
let get_req = r#"{"op":"get","key":"myapp"}"#;
let get_resp = send_recv(&mut stream, get_req).await;
assert!(get_resp.ok, "get should succeed: {:?}", get_resp.error);
let data = get_resp.data.unwrap();
assert_eq!(data["status"], "healthy");
assert_eq!(data["version"], "1.2.3");
let get_field_req = r#"{"op":"get","key":"myapp.status"}"#;
let get_field_resp = send_recv(&mut stream, get_field_req).await;
assert!(get_field_resp.ok);
assert_eq!(get_field_resp.data.unwrap(), serde_json::json!("healthy"));
handle.abort();
}
#[tokio::test]
async fn store_rejects_builtin_name() {
let (_tmp, sock, cache, registry, watchers) = setup();
let server = Server::new(sock.clone(), cache, registry, None, watchers);
let handle = tokio::spawn(async move { server.run().await });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut stream = UnixStream::connect(&sock).await.unwrap();
let req = r#"{"op":"store","key":"git","data":{"branch":"main"}}"#;
let resp = send_recv(&mut stream, req).await;
assert!(!resp.ok, "store under builtin name should fail");
let err = resp.error.unwrap();
assert!(
err.contains("builtin") || err.contains("script"),
"error should mention builtin or script, got: {err}"
);
handle.abort();
}
#[tokio::test]
async fn store_replaces_previous_data() {
let (_tmp, sock, cache, registry, watchers) = setup();
let server = Server::new(sock.clone(), cache, registry, None, watchers);
let handle = tokio::spawn(async move { server.run().await });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut stream = UnixStream::connect(&sock).await.unwrap();
let v1 = r#"{"op":"store","key":"myapp","data":{"status":"starting","old_field":"yes"}}"#;
let r = send_recv(&mut stream, v1).await;
assert!(r.ok);
let v2 = r#"{"op":"store","key":"myapp","data":{"status":"ready","version":"2.0"}}"#;
let r = send_recv(&mut stream, v2).await;
assert!(r.ok);
let get = r#"{"op":"get","key":"myapp"}"#;
let resp = send_recv(&mut stream, get).await;
assert!(resp.ok);
let data = resp.data.unwrap();
assert_eq!(data["status"], "ready");
assert_eq!(data["version"], "2.0");
assert!(data.get("old_field").is_none() || data["old_field"].is_null());
handle.abort();
}
#[tokio::test]
async fn store_with_ttl_shows_staleness() {
let (_tmp, sock, cache, registry, watchers) = setup();
let server = Server::new(sock.clone(), cache, registry, None, watchers);
let handle = tokio::spawn(async move { server.run().await });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut stream = UnixStream::connect(&sock).await.unwrap();
let req = r#"{"op":"store","key":"myapp","data":{"status":"ok"},"ttl":"1s"}"#;
let r = send_recv(&mut stream, req).await;
assert!(r.ok);
let get = r#"{"op":"get","key":"myapp"}"#;
let resp = send_recv(&mut stream, get).await;
assert!(resp.ok);
assert_eq!(
resp.stale,
Some(false),
"should not be stale immediately after store"
);
tokio::time::sleep(std::time::Duration::from_millis(2100)).await;
let resp2 = send_recv(&mut stream, get).await;
assert!(resp2.ok);
assert_eq!(resp2.stale, Some(true), "should be stale after TTL expires");
handle.abort();
}
#[tokio::test]
async fn store_appears_in_list() {
let (_tmp, sock, cache, registry, watchers) = setup();
let server = Server::new(sock.clone(), cache, registry, None, watchers);
let handle = tokio::spawn(async move { server.run().await });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut stream = UnixStream::connect(&sock).await.unwrap();
let req = r#"{"op":"store","key":"myapp","data":{"status":"ok"}}"#;
let r = send_recv(&mut stream, req).await;
assert!(r.ok);
let list_req = r#"{"op":"list"}"#;
let list_resp = send_recv(&mut stream, list_req).await;
assert!(list_resp.ok);
let providers = list_resp.data.unwrap();
let names: Vec<&str> = providers
.as_array()
.unwrap()
.iter()
.map(|p| p["name"].as_str().unwrap())
.collect();
assert!(
names.contains(&"myapp"),
"myapp should appear in list, got: {names:?}"
);
handle.abort();
}
#[tokio::test]
async fn poke_virtual_provider_is_noop() {
let (_tmp, sock, cache, registry, watchers) = setup();
let server = Server::new(sock.clone(), cache, registry, None, watchers);
let handle = tokio::spawn(async move { server.run().await });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut stream = UnixStream::connect(&sock).await.unwrap();
send_recv(
&mut stream,
r#"{"op":"store","key":"myapp","data":{"v":"1"}}"#,
)
.await;
let resp = send_recv(&mut stream, r#"{"op":"poke","key":"myapp"}"#).await;
assert!(resp.ok);
let resp = send_recv(&mut stream, r#"{"op":"get","key":"myapp.v"}"#).await;
assert_eq!(resp.data.unwrap(), "1");
handle.abort();
}
#[tokio::test]
async fn store_with_path_scope() {
let (_tmp, sock, cache, registry, watchers) = setup();
let server = Server::new(sock.clone(), cache, registry, None, watchers);
let handle = tokio::spawn(async move { server.run().await });
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut stream = UnixStream::connect(&sock).await.unwrap();
send_recv(
&mut stream,
r#"{"op":"store","key":"myapp","data":{"v":"proj-a"},"path":"/tmp/proj-a"}"#,
)
.await;
send_recv(
&mut stream,
r#"{"op":"store","key":"myapp","data":{"v":"proj-b"},"path":"/tmp/proj-b"}"#,
)
.await;
let resp = send_recv(
&mut stream,
r#"{"op":"get","key":"myapp.v","path":"/tmp/proj-a"}"#,
)
.await;
assert_eq!(resp.data.unwrap(), "proj-a");
let resp = send_recv(
&mut stream,
r#"{"op":"get","key":"myapp.v","path":"/tmp/proj-b"}"#,
)
.await;
assert_eq!(resp.data.unwrap(), "proj-b");
handle.abort();
}