anyclaw-sdk-service 0.2.0

SDK for building anyclaw service extensions
Documentation
use crate::jsonrpc::{JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, NdJsonCodec};
use futures::SinkExt;
use tokio_util::codec::{FramedRead, FramedWrite};

use crate::error::ServiceSdkError;
use crate::trait_def::Service;
use crate::types::ServiceInitializeParams;

use futures::StreamExt;

/// JSON-RPC stdio harness that drives a [`Service`] implementation.
///
/// Handles line-delimited JSON framing and dispatches `initialize`, `health`,
/// and `shutdown` methods to the wrapped [`Service`]. The harness exits after
/// processing `shutdown` or when stdin reaches EOF.
pub struct ServiceHarness<S: Service> {
    service: S,
}

impl<S: Service> ServiceHarness<S> {
    /// Wrap a [`Service`] implementation for harness-driven execution.
    pub fn new(service: S) -> Self {
        Self { service }
    }

    /// Run the harness event loop over real stdio (stdin/stdout).
    pub async fn run_stdio(self) -> Result<(), ServiceSdkError> {
        self.run_with_io(tokio::io::stdin(), tokio::io::stdout())
            .await
    }

    /// Run the harness event loop over the given async reader and writer.
    ///
    /// This is the testable entry point — tests pass `tokio::io::duplex` halves.
    pub async fn run_with_io<R, W>(mut self, reader: R, writer: W) -> Result<(), ServiceSdkError>
    where
        R: tokio::io::AsyncRead + Unpin + Send,
        W: tokio::io::AsyncWrite + Unpin + Send,
    {
        let mut framed_read = FramedRead::new(reader, NdJsonCodec::new());
        let mut framed_write = FramedWrite::new(writer, NdJsonCodec::new());

        while let Some(frame) = framed_read.next().await {
            let msg = match frame {
                Ok(msg) => msg,
                Err(e) => {
                    tracing::warn!(error = %e, "failed to decode JSON-RPC frame, skipping");
                    continue;
                }
            };

            let req = match msg {
                JsonRpcMessage::Request(req) => req,
                JsonRpcMessage::Response(_) => continue,
            };

            let (response, should_exit) = self.dispatch(req).await?;

            if let Some(resp) = response {
                framed_write.send(resp).await.map_err(ServiceSdkError::Io)?;
            }

            if should_exit {
                break;
            }
        }

        Ok(())
    }

    /// Dispatch a single JSON-RPC request to the appropriate [`Service`] method.
    ///
    /// Returns `(Option<response>, should_exit)`.
    async fn dispatch(
        &mut self,
        req: JsonRpcRequest,
    ) -> Result<(Option<JsonRpcResponse>, bool), ServiceSdkError> {
        let id = req.id.clone();
        let params = req.params.unwrap_or(serde_json::Value::Null);

        match req.method.as_str() {
            "initialize" => {
                let init_params: ServiceInitializeParams = serde_json::from_value(params)?;
                let result = self.service.on_initialize(init_params).await?;
                let result_value = serde_json::to_value(&result)?;
                Ok((
                    id.map(|id| JsonRpcResponse::success(Some(id), result_value)),
                    false,
                ))
            }
            "health" => {
                let status = self.service.on_health().await;
                let result_value = serde_json::to_value(&status)?;
                Ok((
                    id.map(|id| JsonRpcResponse::success(Some(id), result_value)),
                    false,
                ))
            }
            "shutdown" => {
                self.service.on_shutdown().await?;
                let response =
                    id.map(|id| JsonRpcResponse::success(Some(id), serde_json::Value::Null));
                Ok((response, true))
            }
            method => {
                let response = id.map(|id| {
                    JsonRpcResponse::error(
                        Some(id),
                        JsonRpcError {
                            code: -32601,
                            message: format!("method not found: {method}"),
                            data: None,
                        },
                    )
                });
                Ok((response, false))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{HealthStatus, ServiceHealthStatus, ServiceInitializeResult};
    use rstest::rstest;
    use std::collections::HashMap;

    struct MockService {
        init_port: u16,
    }

    impl MockService {
        fn new(port: u16) -> Self {
            Self { init_port: port }
        }
    }

    impl Service for MockService {
        async fn on_initialize(
            &mut self,
            _params: ServiceInitializeParams,
        ) -> Result<ServiceInitializeResult, ServiceSdkError> {
            Ok(ServiceInitializeResult {
                port: self.init_port,
                installation: HashMap::new(),
            })
        }

        async fn on_health(&self) -> ServiceHealthStatus {
            ServiceHealthStatus {
                status: HealthStatus::Healthy,
                message: None,
            }
        }

        async fn on_shutdown(&mut self) -> Result<(), ServiceSdkError> {
            Ok(())
        }
    }

    fn make_request(id: u64, method: &str, params: serde_json::Value) -> String {
        let msg = serde_json::json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params,
        });
        format!("{}\n", serde_json::to_string(&msg).unwrap())
    }

    fn parse_responses(output: &[u8]) -> Vec<serde_json::Value> {
        let text = String::from_utf8_lossy(output);
        text.lines()
            .filter(|l| !l.trim().is_empty())
            .filter_map(|l| serde_json::from_str(l).ok())
            .collect()
    }

    #[rstest]
    #[tokio::test]
    async fn when_initialize_received_then_responds_with_port_and_installation() {
        let svc = MockService::new(8080);
        let input = make_request(
            1,
            "initialize",
            serde_json::json!({"namespace": "test", "options": {}, "port": 8080}),
        );
        let reader = std::io::Cursor::new(input.into_bytes());
        let mut output = Vec::new();

        let harness = ServiceHarness::new(svc);
        harness.run_with_io(reader, &mut output).await.unwrap();

        let responses = parse_responses(&output);
        assert_eq!(responses.len(), 1);
        assert_eq!(responses[0]["id"], 1);
        assert_eq!(responses[0]["result"]["port"], 8080);
        assert!(responses[0]["result"]["installation"].is_object());
    }

    #[rstest]
    #[tokio::test]
    async fn when_health_received_then_responds_with_status() {
        let svc = MockService::new(9090);
        let mut input = make_request(
            1,
            "initialize",
            serde_json::json!({"namespace": "test", "options": {}, "port": 9090}),
        );
        input.push_str(&make_request(2, "health", serde_json::json!({})));

        let reader = std::io::Cursor::new(input.into_bytes());
        let mut output = Vec::new();

        let harness = ServiceHarness::new(svc);
        harness.run_with_io(reader, &mut output).await.unwrap();

        let responses = parse_responses(&output);
        assert_eq!(responses.len(), 2);
        assert_eq!(responses[1]["id"], 2);
        assert_eq!(responses[1]["result"]["status"], "healthy");
    }

    #[rstest]
    #[tokio::test]
    async fn when_shutdown_received_then_harness_exits() {
        let svc = MockService::new(7070);
        let mut input = make_request(
            1,
            "initialize",
            serde_json::json!({"namespace": "test", "options": {}, "port": 7070}),
        );
        input.push_str(&make_request(2, "shutdown", serde_json::json!({})));
        // This request should never be processed — harness exits after shutdown.
        input.push_str(&make_request(3, "health", serde_json::json!({})));

        let reader = std::io::Cursor::new(input.into_bytes());
        let mut output = Vec::new();

        let harness = ServiceHarness::new(svc);
        harness.run_with_io(reader, &mut output).await.unwrap();

        let responses = parse_responses(&output);
        // Only initialize + shutdown responses; health after shutdown is not processed.
        assert_eq!(responses.len(), 2);
        assert_eq!(responses[0]["id"], 1);
        assert_eq!(responses[1]["id"], 2);
        assert!(responses[1]["result"].is_null());
    }

    #[rstest]
    #[tokio::test]
    async fn when_unknown_method_received_then_responds_with_error() {
        let svc = MockService::new(6060);
        let mut input = make_request(
            1,
            "initialize",
            serde_json::json!({"namespace": "test", "options": {}, "port": 6060}),
        );
        input.push_str(&make_request(2, "custom/unknown", serde_json::json!({})));

        let reader = std::io::Cursor::new(input.into_bytes());
        let mut output = Vec::new();

        let harness = ServiceHarness::new(svc);
        harness.run_with_io(reader, &mut output).await.unwrap();

        let responses = parse_responses(&output);
        assert_eq!(responses.len(), 2);
        assert_eq!(responses[1]["id"], 2);
        assert_eq!(responses[1]["error"]["code"], -32601);
        assert!(
            responses[1]["error"]["message"]
                .as_str()
                .unwrap()
                .contains("custom/unknown")
        );
    }

    #[rstest]
    #[tokio::test]
    async fn when_reader_reaches_eof_then_harness_exits_cleanly() {
        let svc = MockService::new(5050);
        let reader = std::io::Cursor::new(Vec::<u8>::new());
        let mut output = Vec::new();

        let harness = ServiceHarness::new(svc);
        let result = harness.run_with_io(reader, &mut output).await;
        assert!(result.is_ok());
    }
}