//! Tests for `lilo_rm_client::EventWatcher` (lilo-rm-client 0.7.1).
//!
//! lilo-rm-client is the Runtime Matters client shell for the public rtmd Unix
//! socket contract.
use std::fs::{OpenOptions, create_dir_all};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

use lilo_rm_client::{ClientError, EventWatcher, RuntimeClient};
use lilo_rm_core::{
    EventBatch, EventsRequest, ProtocolError, RUNTIME_PROTOCOL_VERSION, RuntimeResponse,
    RuntimeRpc, VersionInfo, VersionPayload, read_json_line, write_json_line,
};
use rtm_daemon::{DaemonConfig, ReconcileConfig, run_daemon};
use rtm_store::StoreConfig;
use serde_json::json;
use tokio::io::BufReader;
use tokio::net::{UnixListener, UnixStream};
use tokio::task::JoinHandle;
use uuid::Uuid;

/// A real `rtmd` daemon running on a background tokio task.
///
/// The daemon's socket, event log, log root and SQLite store all live inside
/// `tempdir`. Fields are dropped in declaration order, so when a test never
/// calls `stop`, the temporary directory is the last thing released.
struct TestDaemon {
    client: RuntimeClient,
    task: JoinHandle<()>,
    tempdir: tempfile::TempDir,
}

impl TestDaemon {
    /// Seeds `events.jsonl` with `records`, launches `run_daemon` against a
    /// fresh temporary directory, and returns once the socket accepts
    /// connections.
    ///
    /// The current test executable is passed as the shim path.
    async fn start_with_log(records: &[serde_json::Value]) -> Self {
        let tempdir = tempfile::tempdir().expect("tempdir");
        let root = tempdir.path();
        let socket_path = root.join("rtmd.sock");
        write_event_log(root, records);

        let endpoint = rtm_paths::RuntimeEndpoint::unix_socket(socket_path.clone());
        let shim_path = std::env::current_exe().expect("current test executable");
        let config = DaemonConfig {
            endpoint,
            shim_path,
            log_root: root.join("logs"),
            store: StoreConfig {
                db_path: root.join("rtm.sqlite"),
            },
            reconcile: ReconcileConfig::default(),
            docker_preflight: Default::default(),
        };

        // The daemon runs until it receives `RuntimeRpc::Stop` (see `stop`).
        let task = tokio::spawn(async move {
            run_daemon(config).await.expect("daemon run");
        });
        wait_for_socket(&socket_path).await;

        Self {
            client: RuntimeClient::new(socket_path),
            task,
            tempdir,
        }
    }

    /// Returns an independent handle to the daemon's client.
    fn client(&self) -> RuntimeClient {
        RuntimeClient::clone(&self.client)
    }

    /// Asks the daemon to stop, waits for its task to finish, and only then
    /// removes the temporary directory.
    async fn stop(self) {
        let stop_reply = self.client.request(RuntimeRpc::Stop).await;
        assert_eq!(stop_reply.expect("stop daemon"), RuntimeResponse::Stopping);
        let joined = self.task.await;
        joined.expect("daemon task");
        drop(self.tempdir);
    }
}

/// Polls `socket_path` every 25 ms until a connection succeeds.
///
/// # Panics
///
/// Panics if no connection succeeds within 5 seconds. The panic message
/// includes the most recent connection error.
async fn wait_for_socket(socket_path: &Path) {
    let give_up_at = Instant::now() + Duration::from_secs(5);
    let mut latest_failure = None;
    while Instant::now() < give_up_at {
        let Err(failure) = UnixStream::connect(socket_path).await else {
            // Connected: the probe stream is dropped immediately.
            return;
        };
        latest_failure = Some(failure);
        tokio::time::sleep(Duration::from_millis(25)).await;
    }
    panic!(
        "daemon socket never accepted connections at {}; last error={:?}",
        socket_path.display(),
        latest_failure
    );
}

/// `connect` must reject a daemon that advertises a different protocol
/// version, before it polls for any events.
#[tokio::test]
async fn connect_rejects_protocol_mismatch() {
    let advertised = "0.3";
    let (client, server) = mock_version_client(advertised).await;

    let outcome = EventWatcher::builder().connect(client).await;
    let error = outcome.expect_err("protocol mismatch should fail before polling");

    let (expected, got) = match error {
        ClientError::Protocol {
            source: ProtocolError::UnsupportedVersion { expected, got },
        } => (expected, got),
        other => panic!("unexpected client error: {other:?}"),
    };
    assert_eq!(expected, RUNTIME_PROTOCOL_VERSION);
    assert_eq!(got, advertised);

    server.await.expect("server task");
}

/// `connect` succeeds when the protocol versions match, and the watcher keeps
/// the cursor that was supplied through `since`.
#[tokio::test]
async fn connect_accepts_matching_protocol() {
    let (client, server) = mock_version_client(RUNTIME_PROTOCOL_VERSION).await;

    let builder = EventWatcher::builder().since(7);
    let watcher = builder
        .connect(client)
        .await
        .expect("matching protocol should connect");

    assert_eq!(watcher.current_cursor(), Some(&7));
    server.await.expect("server task");
}

/// With no configuration, the first `Events` request has no cursor and uses
/// a 30 000 ms wait.
#[tokio::test]
async fn next_uses_default_wait_ms() {
    let observed = next_request(EventWatcher::builder()).await;

    let expected = EventsRequest {
        since: None,
        wait_ms: Some(30_000),
    };
    assert_eq!(observed, expected);
}

/// Builder overrides for the starting cursor (`since`) and for `wait_ms` are
/// passed through unchanged in the first `Events` request.
#[tokio::test]
async fn next_uses_configured_wait_ms_and_seek_cursor() {
    let builder = EventWatcher::builder().since(3).wait_ms(25);
    let observed = next_request(builder).await;

    let expected = EventsRequest {
        since: Some(3),
        wait_ms: Some(25),
    };
    assert_eq!(observed, expected);
}

/// A cursor taken from one watcher can seed a brand-new watcher. The new
/// watcher then receives no events that the first watcher already saw.
#[tokio::test]
async fn cursor_durability_survives_watcher_rebuild() {
    let daemon = TestDaemon::start_with_log(&[event_record(1), event_record(2)]).await;

    let mut first_watcher = EventWatcher::builder()
        .wait_ms(0)
        .connect(daemon.client())
        .await
        .expect("connect watcher");
    let drained = first_watcher.next().await.expect("first batch");
    assert_event_count(&drained, 2);
    let saved_cursor = *first_watcher.current_cursor().expect("persisted cursor");
    // Simulate a consumer restart: only the saved cursor outlives the watcher.
    drop(first_watcher);

    let mut second_watcher = EventWatcher::builder()
        .since(saved_cursor)
        .wait_ms(0)
        .connect(daemon.client())
        .await
        .expect("reconnect watcher");
    let resumed = second_watcher.next().await.expect("resumed batch");

    assert_event_count(&resumed, 0);
    assert_eq!(second_watcher.current_cursor(), Some(&saved_cursor));
    daemon.stop().await;
}

/// Requesting from a cursor older than the log yields `CursorExpired`. The
/// watcher then moves its cursor to `oldest`, so the next poll returns the
/// retained events.
#[tokio::test]
async fn cursor_expired_advances_cursor_and_can_resume_from_oldest() {
    // The log holds only seq 3; cursor 0 is expected to be reported as
    // expired with `oldest: 2`.
    let daemon = TestDaemon::start_with_log(&[event_record(3)]).await;
    let mut watcher = EventWatcher::builder()
        .since(0)
        .wait_ms(0)
        .connect(daemon.client())
        .await
        .expect("connect watcher");

    let first_poll = watcher.next().await.expect("expired cursor batch");
    assert_eq!(first_poll, EventBatch::CursorExpired { oldest: 2 });
    assert_eq!(watcher.current_cursor(), Some(&2));

    let second_poll = watcher.next().await.expect("resumed batch");
    assert_event_count(&second_poll, 1);
    assert_eq!(watcher.current_cursor(), Some(&3));

    daemon.stop().await;
}

/// `seek` overrides the cursor from `since`, so the next poll replays events
/// after the new position.
#[tokio::test]
async fn seek_repositions_next_request() {
    let daemon = TestDaemon::start_with_log(&[event_record(1), event_record(2)]).await;
    let mut watcher = EventWatcher::builder()
        .since(2)
        .wait_ms(0)
        .connect(daemon.client())
        .await
        .expect("connect watcher");

    // Rewind from 2 to 1: seq 2 should be delivered again.
    watcher.seek(1);
    let replayed = watcher.next().await.expect("seek batch");

    assert_event_count(&replayed, 1);
    assert_eq!(watcher.current_cursor(), Some(&2));
    daemon.stop().await;
}

async fn mock_version_client(protocol_version: &str) -> (RuntimeClient, JoinHandle<()>) {
    let tempdir = tempfile::tempdir().expect("tempdir");
    let socket_path = tempdir.path().join("rtmd.sock");
    let listener = UnixListener::bind(&socket_path).expect("bind test socket");
    let client = RuntimeClient::new(socket_path);
    let mut version = VersionInfo::new("0.6.0", "test-sha");
    version.protocol_version = protocol_version.to_owned();
    let server = tokio::spawn(async move {
        let _tempdir = tempdir;
        let (stream, _) = listener.accept().await.expect("accept client");
        let (read_half, mut write_half) = stream.into_split();
        let mut reader = BufReader::new(read_half);
        let rpc: RuntimeRpc = read_json_line(&mut reader).await.expect("read rpc");
        assert_eq!(rpc, RuntimeRpc::Version);
        write_json_line(
            &mut write_half,
            &RuntimeResponse::Version(VersionPayload { version }),
        )
        .await
        .expect("write response");
    });
    (client, server)
}

/// Builds a watcher from `builder`, polls it once against a mock daemon, and
/// returns the `EventsRequest` the mock server received.
///
/// The mock answers with an empty event list whose cursor echoes the
/// request's `since`, or the default cursor value when `since` is absent.
///
/// # Panics
///
/// Panics if the first RPC is not `RuntimeRpc::Events`; the message includes
/// the RPC that actually arrived. Also panics if the watcher's poll fails.
async fn next_request(builder: lilo_rm_client::EventWatcherBuilder) -> EventsRequest {
    let (tempdir, socket_path) = temp_socket_path();
    let listener = UnixListener::bind(&socket_path).expect("bind test socket");
    let client = RuntimeClient::new(socket_path);
    let server = tokio::spawn(async move {
        // Keep the tempdir (and so the socket file) alive for the whole exchange.
        let _tempdir = tempdir;
        let (stream, _) = listener.accept().await.expect("accept client");
        let (read_half, mut write_half) = stream.into_split();
        let mut reader = BufReader::new(read_half);
        let rpc: RuntimeRpc = read_json_line(&mut reader).await.expect("read rpc");
        let request = match rpc {
            RuntimeRpc::Events { request } => request,
            other => panic!("expected events rpc, got {other:?}"),
        };
        write_json_line(
            &mut write_half,
            &RuntimeResponse::Events(lilo_rm_core::EventsPayload {
                events: Vec::new(),
                cursor: request.since.unwrap_or_default(),
            }),
        )
        .await
        .expect("write response");
        request
    });
    // `build` skips the version handshake, so the first RPC is the events poll.
    let mut watcher = builder.build(client);
    watcher.next().await.expect("watcher next");
    server.await.expect("server task")
}

/// Creates a fresh temporary directory and returns it with the path
/// `<dir>/rtmd.sock`.
///
/// The socket file is not created here. The caller must keep the returned
/// `TempDir` alive for as long as the path is in use.
fn temp_socket_path() -> (tempfile::TempDir, PathBuf) {
    let dir = tempfile::tempdir().expect("tempdir");
    let mut path = dir.path().to_path_buf();
    path.push("rtmd.sock");
    (dir, path)
}

/// Asserts that `batch` is `EventBatch::Events` and holds exactly `expected`
/// events.
///
/// # Panics
///
/// Panics if `batch` is any other variant, or if the event count differs.
fn assert_event_count(batch: &EventBatch, expected: usize) {
    let EventBatch::Events { events, .. } = batch else {
        panic!("expected events batch, got {batch:?}");
    };
    assert_eq!(events.len(), expected);
}

/// Writes `records` to `<root>/events.jsonl`, one JSON value per line.
///
/// Any existing file is truncated, and the parent directory is created if it
/// is missing.
fn write_event_log(root: &Path, records: &[serde_json::Value]) {
    let log_path = root.join("events.jsonl");
    let log_dir = log_path.parent().expect("event log parent");
    create_dir_all(log_dir).expect("event log dir");
    let mut log = OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(&log_path)
        .expect("event log");
    records
        .iter()
        .for_each(|record| writeln!(log, "{record}").expect("record"));
}

/// Builds one `running` event-log record with sequence number `seq`.
///
/// The timestamp, runtime PID and start time are fixed. Each call generates a
/// fresh v7 session UUID.
fn event_record(seq: u64) -> serde_json::Value {
    let payload = json!({
        "session_id": Uuid::now_v7(),
        "runtime_pid": 4242,
        "start_time": "2023-11-14T22:13:20Z"
    });
    json!({
        "seq": seq,
        "ts_ms": 1_700_000_000_000_u64,
        "kind": "running",
        "payload": payload
    })
}