beachcomber 0.5.1

A centralized daemon that caches shell state (git, battery, hostname, etc.) so every consumer reads from one fast cache instead of independently forking shells
Documentation
use beachcomber::cache::Cache;
use beachcomber::protocol::Response;
use beachcomber::provider::registry::ProviderRegistry;
use beachcomber::provider::{ProviderResult, Value};
use beachcomber::server::Server;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;

fn setup() -> (
    TempDir,
    std::path::PathBuf,
    Arc<Cache>,
    Arc<ProviderRegistry>,
    Arc<beachcomber::watcher_registry::WatcherRegistry>,
) {
    let tmp = TempDir::new().unwrap();
    let sock = tmp.path().join("test.sock");
    let watchers = Arc::new(beachcomber::watcher_registry::WatcherRegistry::new());
    let cache = Arc::new(Cache::with_watchers(watchers.clone()));
    let registry = Arc::new(ProviderRegistry::with_defaults());
    (tmp, sock, cache, registry, watchers)
}

#[tokio::test]
async fn server_accepts_connection() {
    let (_tmp, sock, cache, registry, watchers) = setup();
    let server = Server::new(sock.clone(), cache, registry, None, watchers);

    let handle = tokio::spawn(async move { server.run().await });

    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let stream = UnixStream::connect(&sock).await;
    assert!(stream.is_ok(), "Should connect to server socket");

    handle.abort();
}

#[tokio::test]
async fn server_handles_get_global_provider() {
    let (_tmp, sock, cache, registry, watchers) = setup();

    let mut result = ProviderResult::new();
    result.insert("name", Value::String("testhost.local".to_string()));
    result.insert("short", Value::String("testhost".to_string()));
    cache.put("hostname", None, result);

    let server = Server::new(sock.clone(), cache, registry, None, watchers);
    let handle = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut stream = UnixStream::connect(&sock).await.unwrap();
    let request = r#"{"op": "get", "key": "hostname"}"#;
    stream
        .write_all(format!("{request}\n").as_bytes())
        .await
        .unwrap();

    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).await.unwrap();

    let response: Response = serde_json::from_str(&line).unwrap();
    assert!(response.ok, "Response should be ok");
    let data = response.data.unwrap();
    assert_eq!(data["name"], "testhost.local");
    assert_eq!(data["short"], "testhost");

    handle.abort();
}

#[tokio::test]
async fn server_handles_get_single_field() {
    let (_tmp, sock, cache, registry, watchers) = setup();

    let mut result = ProviderResult::new();
    result.insert("name", Value::String("testhost.local".to_string()));
    result.insert("short", Value::String("testhost".to_string()));
    cache.put("hostname", None, result);

    let server = Server::new(sock.clone(), cache, registry, None, watchers);
    let handle = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut stream = UnixStream::connect(&sock).await.unwrap();
    let request = r#"{"op": "get", "key": "hostname.short"}"#;
    stream
        .write_all(format!("{request}\n").as_bytes())
        .await
        .unwrap();

    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).await.unwrap();

    let response: Response = serde_json::from_str(&line).unwrap();
    assert!(response.ok);
    assert_eq!(response.data.unwrap(), serde_json::json!("testhost"));

    handle.abort();
}

#[tokio::test]
async fn server_handles_get_text_format() {
    let (_tmp, sock, cache, registry, watchers) = setup();

    let mut result = ProviderResult::new();
    result.insert("name", Value::String("testhost.local".to_string()));
    cache.put("hostname", None, result);

    let server = Server::new(sock.clone(), cache, registry, None, watchers);
    let handle = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut stream = UnixStream::connect(&sock).await.unwrap();
    let request = r#"{"op": "get", "key": "hostname.name", "format": "text"}"#;
    stream
        .write_all(format!("{request}\n").as_bytes())
        .await
        .unwrap();

    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).await.unwrap();

    // Text format for single field: raw value followed by newline
    assert_eq!(line.trim(), "testhost.local");

    handle.abort();
}

#[tokio::test]
async fn server_handles_cache_miss_with_sync_execution() {
    let (_tmp, sock, cache, registry, watchers) = setup();

    // No cache populated — the server should execute the provider inline
    let server = Server::new(sock.clone(), cache, registry, None, watchers);
    let handle = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut stream = UnixStream::connect(&sock).await.unwrap();
    // hostname is a global provider that always returns data
    let request = r#"{"op": "get", "key": "hostname"}"#;
    stream
        .write_all(format!("{request}\n").as_bytes())
        .await
        .unwrap();

    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).await.unwrap();

    let response: Response = serde_json::from_str(&line).unwrap();
    assert!(response.ok, "Response should be ok");
    assert!(
        response.data.is_some(),
        "Sync cache miss should return data from inline execution"
    );
    let data = response.data.unwrap();
    assert!(
        data.get("name").is_some(),
        "hostname provider should return a name field"
    );

    handle.abort();
}

#[tokio::test]
async fn server_handles_cache_miss_provider_returns_none() {
    let (_tmp, sock, cache, registry, watchers) = setup();

    let server = Server::new(sock.clone(), cache, registry, None, watchers);
    let handle = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut stream = UnixStream::connect(&sock).await.unwrap();
    // git provider with no path — execute() returns None
    let request = r#"{"op": "get", "key": "git"}"#;
    stream
        .write_all(format!("{request}\n").as_bytes())
        .await
        .unwrap();

    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).await.unwrap();

    let response: Response = serde_json::from_str(&line).unwrap();
    assert!(
        response.ok,
        "Response should be ok even when provider returns None"
    );
    assert!(
        response.data.is_none(),
        "Provider returning None should still produce a miss"
    );

    handle.abort();
}

#[tokio::test]
async fn server_handles_cache_miss_virtual_provider() {
    let (_tmp, sock, cache, registry, watchers) = setup();

    // Register a virtual provider name but don't populate cache
    registry.register_virtual("myvirtual");

    let server = Server::new(sock.clone(), cache, registry, None, watchers);
    let handle = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut stream = UnixStream::connect(&sock).await.unwrap();
    let request = r#"{"op": "get", "key": "myvirtual"}"#;
    stream
        .write_all(format!("{request}\n").as_bytes())
        .await
        .unwrap();

    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).await.unwrap();

    let response: Response = serde_json::from_str(&line).unwrap();
    assert!(response.ok);
    assert!(
        response.data.is_none(),
        "Virtual provider with no cache data should return miss (no execute to call)"
    );

    handle.abort();
}

#[tokio::test]
async fn server_handles_unknown_provider() {
    let (_tmp, sock, cache, registry, watchers) = setup();

    let server = Server::new(sock.clone(), cache, registry, None, watchers);
    let handle = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut stream = UnixStream::connect(&sock).await.unwrap();
    let request = r#"{"op": "get", "key": "nonexistent"}"#;
    stream
        .write_all(format!("{request}\n").as_bytes())
        .await
        .unwrap();

    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).await.unwrap();

    let response: Response = serde_json::from_str(&line).unwrap();
    assert!(!response.ok, "Unknown provider should return error");
    assert!(response.error.unwrap().contains("unknown provider"));

    handle.abort();
}

#[tokio::test]
async fn server_handles_poke() {
    let (_tmp, sock, cache, registry, watchers) = setup();

    let server = Server::new(sock.clone(), cache, registry, None, watchers);
    let handle = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut stream = UnixStream::connect(&sock).await.unwrap();
    let request = r#"{"op": "poke", "key": "hostname"}"#;
    stream
        .write_all(format!("{request}\n").as_bytes())
        .await
        .unwrap();

    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).await.unwrap();

    let response: Response = serde_json::from_str(&line).unwrap();
    assert!(response.ok, "Poke should return ok");

    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    handle.abort();
}

#[tokio::test]
async fn server_handles_get_sh_format() {
    let (_tmp, sock, cache, registry, watchers) = setup();

    let mut result = ProviderResult::new();
    result.insert("name", Value::String("testhost.local".to_string()));
    result.insert("short", Value::String("testhost".to_string()));
    cache.put("hostname", None, result);

    let server = Server::new(sock.clone(), cache, registry, None, watchers);
    let handle = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut stream = UnixStream::connect(&sock).await.unwrap();
    let request = r#"{"op": "get", "key": "hostname", "format": "sh"}"#;
    stream
        .write_all(format!("{request}\n").as_bytes())
        .await
        .unwrap();

    let mut reader = BufReader::new(stream);
    let mut lines = String::new();
    loop {
        let mut line = String::new();
        let n = reader.read_line(&mut line).await.unwrap();
        if n == 0 || line == "\n" {
            break;
        }
        lines.push_str(&line);
    }

    // Sh format for objects: sorted key=value pairs
    assert!(lines.contains("name=testhost.local"));
    assert!(lines.contains("short=testhost"));

    handle.abort();
}

#[tokio::test]
async fn server_text_format_object_returns_values_only() {
    let (_tmp, sock, cache, registry, watchers) = setup();

    let mut result = ProviderResult::new();
    result.insert("name", Value::String("testhost.local".to_string()));
    result.insert("short", Value::String("testhost".to_string()));
    cache.put("hostname", None, result);

    let server = Server::new(sock.clone(), cache, registry, None, watchers);
    let handle = tokio::spawn(async move { server.run().await });
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    let mut stream = UnixStream::connect(&sock).await.unwrap();
    let request = r#"{"op": "get", "key": "hostname", "format": "text"}"#;
    stream
        .write_all(format!("{request}\n").as_bytes())
        .await
        .unwrap();

    let mut reader = BufReader::new(stream);
    let mut lines = String::new();
    loop {
        let mut line = String::new();
        let n = reader.read_line(&mut line).await.unwrap();
        if n == 0 || line == "\n" {
            break;
        }
        lines.push_str(&line);
    }

    // Text format for objects: sorted values only, no key= prefix
    assert!(
        !lines.contains("name="),
        "Text format should not include key= prefix"
    );
    assert!(
        !lines.contains("short="),
        "Text format should not include key= prefix"
    );
    assert!(lines.contains("testhost.local"));
    assert!(lines.contains("testhost"));

    handle.abort();
}