use beachcomber::cache::Cache;
use beachcomber::protocol::Response;
use beachcomber::provider::registry::ProviderRegistry;
use beachcomber::provider::{ProviderResult, Value};
use beachcomber::server::Server;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
/// Builds the fixtures every test in this file starts from.
///
/// Returns, in order: the temporary directory handle (callers bind it so it stays alive
/// for the duration of the test), the path `test.sock` inside that directory, a cache
/// built with the watcher registry, a provider registry created via `with_defaults`,
/// and the watcher registry itself.
fn setup() -> (
    TempDir,
    std::path::PathBuf,
    Arc<Cache>,
    Arc<ProviderRegistry>,
    Arc<beachcomber::watcher_registry::WatcherRegistry>,
) {
    let scratch_dir = TempDir::new().unwrap();
    let socket_path = scratch_dir.path().join("test.sock");

    // The cache receives its own handle to the same watcher registry that is returned.
    let watcher_registry = Arc::new(beachcomber::watcher_registry::WatcherRegistry::new());
    let shared_cache = Arc::new(Cache::with_watchers(Arc::clone(&watcher_registry)));
    let provider_registry = Arc::new(ProviderRegistry::with_defaults());

    (
        scratch_dir,
        socket_path,
        shared_cache,
        provider_registry,
        watcher_registry,
    )
}
/// Starts the server on a fresh socket path and checks that a client can connect to it.
#[tokio::test]
async fn server_accepts_connection() {
    let (_scratch, socket_path, cache, registry, watchers) = setup();
    let server = Server::new(socket_path.clone(), cache, registry, None, watchers);
    let server_task = tokio::spawn(async move { server.run().await });

    // Fixed pause before the first connection attempt.
    // NOTE(review): presumably this gives the server time to bind; a fixed delay may be
    // flaky on a loaded machine — confirm whether a readiness signal is available.
    let startup_grace = std::time::Duration::from_millis(50);
    tokio::time::sleep(startup_grace).await;

    let connection = UnixStream::connect(&socket_path).await;
    assert!(connection.is_ok(), "Should connect to server socket");

    server_task.abort();
}
/// Seeds the cache with a `hostname` entry, then checks that a `get` for the whole
/// provider key returns both seeded fields in the JSON response.
#[tokio::test]
async fn server_handles_get_global_provider() {
    let (_scratch, socket_path, cache, registry, watchers) = setup();

    // Pre-populate the cache with the values asserted on below.
    let mut seeded = ProviderResult::new();
    seeded.insert("name", Value::String(String::from("testhost.local")));
    seeded.insert("short", Value::String(String::from("testhost")));
    cache.put("hostname", None, seeded);

    let server = Server::new(socket_path.clone(), cache, registry, None, watchers);
    let server_task = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    // The request is sent as one newline-terminated JSON line.
    let mut client = UnixStream::connect(&socket_path).await.unwrap();
    let mut payload = String::from(r#"{"op": "get", "key": "hostname"}"#);
    payload.push('\n');
    client.write_all(payload.as_bytes()).await.unwrap();

    // The reply is read back as a single line and parsed as a `Response`.
    let mut reply_reader = BufReader::new(client);
    let mut reply_line = String::new();
    reply_reader.read_line(&mut reply_line).await.unwrap();

    let reply: Response = serde_json::from_str(&reply_line).unwrap();
    assert!(reply.ok, "Response should be ok");
    let fields = reply.data.unwrap();
    assert_eq!(fields["name"], "testhost.local");
    assert_eq!(fields["short"], "testhost");

    server_task.abort();
}
/// Seeds the cache with a `hostname` entry and checks that requesting the dotted key
/// `hostname.short` returns just that field's value as the response data.
#[tokio::test]
async fn server_handles_get_single_field() {
    let (_scratch, socket_path, cache, registry, watchers) = setup();

    let mut seeded = ProviderResult::new();
    seeded.insert("name", Value::String(String::from("testhost.local")));
    seeded.insert("short", Value::String(String::from("testhost")));
    cache.put("hostname", None, seeded);

    let server = Server::new(socket_path.clone(), cache, registry, None, watchers);
    let server_task = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut client = UnixStream::connect(&socket_path).await.unwrap();
    let mut payload = String::from(r#"{"op": "get", "key": "hostname.short"}"#);
    payload.push('\n');
    client.write_all(payload.as_bytes()).await.unwrap();

    let mut reply_reader = BufReader::new(client);
    let mut reply_line = String::new();
    reply_reader.read_line(&mut reply_line).await.unwrap();

    let reply: Response = serde_json::from_str(&reply_line).unwrap();
    assert!(reply.ok);
    // The data is expected to be the bare string value, not an object.
    let field_value = reply.data.unwrap();
    assert_eq!(field_value, serde_json::json!("testhost"));

    server_task.abort();
}
/// Seeds a `hostname` entry and checks that a single-field `get` with `"format": "text"`
/// replies with the raw value on its own line rather than a JSON document.
#[tokio::test]
async fn server_handles_get_text_format() {
    let (_scratch, socket_path, cache, registry, watchers) = setup();

    let mut seeded = ProviderResult::new();
    seeded.insert("name", Value::String(String::from("testhost.local")));
    cache.put("hostname", None, seeded);

    let server = Server::new(socket_path.clone(), cache, registry, None, watchers);
    let server_task = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut client = UnixStream::connect(&socket_path).await.unwrap();
    let mut payload =
        String::from(r#"{"op": "get", "key": "hostname.name", "format": "text"}"#);
    payload.push('\n');
    client.write_all(payload.as_bytes()).await.unwrap();

    let mut reply_reader = BufReader::new(client);
    let mut reply_line = String::new();
    reply_reader.read_line(&mut reply_line).await.unwrap();

    // Only surrounding whitespace (including the line terminator) is ignored.
    assert_eq!(reply_line.trim(), "testhost.local");

    server_task.abort();
}
/// Requests `hostname` without seeding the cache and expects the response to still
/// carry data that includes a `name` field.
#[tokio::test]
async fn server_handles_cache_miss_with_sync_execution() {
    // Nothing is put into the cache here, unlike the seeded tests in this file.
    let (_scratch, socket_path, cache, registry, watchers) = setup();
    let server = Server::new(socket_path.clone(), cache, registry, None, watchers);
    let server_task = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut client = UnixStream::connect(&socket_path).await.unwrap();
    let mut payload = String::from(r#"{"op": "get", "key": "hostname"}"#);
    payload.push('\n');
    client.write_all(payload.as_bytes()).await.unwrap();

    let mut reply_reader = BufReader::new(client);
    let mut reply_line = String::new();
    reply_reader.read_line(&mut reply_line).await.unwrap();

    let reply: Response = serde_json::from_str(&reply_line).unwrap();
    assert!(reply.ok, "Response should be ok");

    // Fails with the same message the separate presence check used.
    let fields = reply
        .data
        .expect("Sync cache miss should return data from inline execution");
    assert!(
        fields.get("name").is_some(),
        "hostname provider should return a name field"
    );

    server_task.abort();
}
/// Requests the `git` key with an empty cache and expects an ok response that carries
/// no data.
#[tokio::test]
async fn server_handles_cache_miss_provider_returns_none() {
    let (_scratch, socket_path, cache, registry, watchers) = setup();
    let server = Server::new(socket_path.clone(), cache, registry, None, watchers);
    let server_task = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    // NOTE(review): the request carries only `op` and `key`; presumably that is why the
    // provider yields nothing here — confirm against the provider implementation.
    let mut client = UnixStream::connect(&socket_path).await.unwrap();
    let mut payload = String::from(r#"{"op": "get", "key": "git"}"#);
    payload.push('\n');
    client.write_all(payload.as_bytes()).await.unwrap();

    let mut reply_reader = BufReader::new(client);
    let mut reply_line = String::new();
    reply_reader.read_line(&mut reply_line).await.unwrap();

    let reply: Response = serde_json::from_str(&reply_line).unwrap();
    assert!(
        reply.ok,
        "Response should be ok even when provider returns None"
    );
    assert!(
        reply.data.is_none(),
        "Provider returning None should still produce a miss"
    );

    server_task.abort();
}
/// Registers a virtual provider named `myvirtual`, leaves the cache empty, and expects
/// a `get` for it to succeed without returning any data.
#[tokio::test]
async fn server_handles_cache_miss_virtual_provider() {
    let (_scratch, socket_path, cache, registry, watchers) = setup();

    // Registration happens before the registry is handed to the server.
    registry.register_virtual("myvirtual");

    let server = Server::new(socket_path.clone(), cache, registry, None, watchers);
    let server_task = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut client = UnixStream::connect(&socket_path).await.unwrap();
    let mut payload = String::from(r#"{"op": "get", "key": "myvirtual"}"#);
    payload.push('\n');
    client.write_all(payload.as_bytes()).await.unwrap();

    let mut reply_reader = BufReader::new(client);
    let mut reply_line = String::new();
    reply_reader.read_line(&mut reply_line).await.unwrap();

    let reply: Response = serde_json::from_str(&reply_line).unwrap();
    assert!(reply.ok);
    assert!(
        reply.data.is_none(),
        "Virtual provider with no cache data should return miss (no execute to call)"
    );

    server_task.abort();
}
/// Requests a key that the test never registers or seeds and expects an error response
/// whose message mentions "unknown provider".
#[tokio::test]
async fn server_handles_unknown_provider() {
    let (_scratch, socket_path, cache, registry, watchers) = setup();
    let server = Server::new(socket_path.clone(), cache, registry, None, watchers);
    let server_task = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut client = UnixStream::connect(&socket_path).await.unwrap();
    let mut payload = String::from(r#"{"op": "get", "key": "nonexistent"}"#);
    payload.push('\n');
    client.write_all(payload.as_bytes()).await.unwrap();

    let mut reply_reader = BufReader::new(client);
    let mut reply_line = String::new();
    reply_reader.read_line(&mut reply_line).await.unwrap();

    let reply: Response = serde_json::from_str(&reply_line).unwrap();
    assert!(!reply.ok, "Unknown provider should return error");

    // Only a substring of the error text is pinned, not the full message.
    let error_text = reply.error.unwrap();
    assert!(error_text.contains("unknown provider"));

    server_task.abort();
}
/// Sends a `poke` for `hostname` and expects an ok response.
#[tokio::test]
async fn server_handles_poke() {
    let pause = std::time::Duration::from_millis(50);

    let (_scratch, socket_path, cache, registry, watchers) = setup();
    let server = Server::new(socket_path.clone(), cache, registry, None, watchers);
    let server_task = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(pause).await;

    let mut client = UnixStream::connect(&socket_path).await.unwrap();
    let mut payload = String::from(r#"{"op": "poke", "key": "hostname"}"#);
    payload.push('\n');
    client.write_all(payload.as_bytes()).await.unwrap();

    let mut reply_reader = BufReader::new(client);
    let mut reply_line = String::new();
    reply_reader.read_line(&mut reply_line).await.unwrap();

    let reply: Response = serde_json::from_str(&reply_line).unwrap();
    assert!(reply.ok, "Poke should return ok");

    // The server task is left running for a further pause before it is aborted.
    // NOTE(review): presumably this lets work triggered by the poke proceed; nothing
    // is asserted about it here — confirm the intent.
    tokio::time::sleep(pause).await;
    server_task.abort();
}
/// Seeds a `hostname` entry and checks that a whole-provider `get` with
/// `"format": "sh"` emits `key=value` text for each seeded field.
#[tokio::test]
async fn server_handles_get_sh_format() {
    let (_scratch, socket_path, cache, registry, watchers) = setup();

    let mut seeded = ProviderResult::new();
    seeded.insert("name", Value::String(String::from("testhost.local")));
    seeded.insert("short", Value::String(String::from("testhost")));
    cache.put("hostname", None, seeded);

    let server = Server::new(socket_path.clone(), cache, registry, None, watchers);
    let server_task = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut client = UnixStream::connect(&socket_path).await.unwrap();
    let mut payload = String::from(r#"{"op": "get", "key": "hostname", "format": "sh"}"#);
    payload.push('\n');
    client.write_all(payload.as_bytes()).await.unwrap();

    // Gather reply lines until end-of-stream or a bare newline line; the terminating
    // line itself is not kept.
    let mut reply_reader = BufReader::new(client);
    let mut collected = String::new();
    let mut current = String::new();
    while reply_reader.read_line(&mut current).await.unwrap() != 0 && current != "\n" {
        collected.push_str(&current);
        // `read_line` appends, so the buffer is emptied before the next read.
        current.clear();
    }

    assert!(collected.contains("name=testhost.local"));
    assert!(collected.contains("short=testhost"));

    server_task.abort();
}
/// Seeds a `hostname` entry and checks that a whole-provider `get` with
/// `"format": "text"` emits the field values without any `key=` prefix.
#[tokio::test]
async fn server_text_format_object_returns_values_only() {
    let (_tmp, sock, cache, registry, watchers) = setup();

    let mut result = ProviderResult::new();
    result.insert("name", Value::String("testhost.local".to_string()));
    result.insert("short", Value::String("testhost".to_string()));
    cache.put("hostname", None, result);

    let server = Server::new(sock.clone(), cache, registry, None, watchers);
    let handle = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut stream = UnixStream::connect(&sock).await.unwrap();
    let request = r#"{"op": "get", "key": "hostname", "format": "text"}"#;
    stream
        .write_all(format!("{request}\n").as_bytes())
        .await
        .unwrap();

    // Gather reply lines until end-of-stream or a bare newline line.
    let mut reader = BufReader::new(stream);
    let mut lines = String::new();
    loop {
        let mut line = String::new();
        let n = reader.read_line(&mut line).await.unwrap();
        if n == 0 || line == "\n" {
            break;
        }
        lines.push_str(&line);
    }

    assert!(
        !lines.contains("name="),
        "Text format should not include key= prefix"
    );
    assert!(
        !lines.contains("short="),
        "Text format should not include key= prefix"
    );
    assert!(lines.contains("testhost.local"));

    // "testhost" is a prefix of "testhost.local", so a plain `contains("testhost")` is
    // already implied by the assertion above and would not check the `short` value.
    // Requiring a second occurrence does, whatever separator the server puts between
    // the values.
    assert!(
        lines.matches("testhost").count() >= 2,
        "Text format should include the short value in addition to the name value"
    );

    handle.abort();
}