use crate::jsonrpc::{JsonRpcError, JsonRpcMessage, JsonRpcRequest, JsonRpcResponse, NdJsonCodec};
use futures::SinkExt;
use tokio_util::codec::{FramedRead, FramedWrite};
use crate::error::ServiceSdkError;
use crate::trait_def::Service;
use crate::types::ServiceInitializeParams;
use futures::StreamExt;
pub struct ServiceHarness<S: Service> {
service: S,
}
impl<S: Service> ServiceHarness<S> {
pub fn new(service: S) -> Self {
Self { service }
}
pub async fn run_stdio(self) -> Result<(), ServiceSdkError> {
self.run_with_io(tokio::io::stdin(), tokio::io::stdout())
.await
}
pub async fn run_with_io<R, W>(mut self, reader: R, writer: W) -> Result<(), ServiceSdkError>
where
R: tokio::io::AsyncRead + Unpin + Send,
W: tokio::io::AsyncWrite + Unpin + Send,
{
let mut framed_read = FramedRead::new(reader, NdJsonCodec::new());
let mut framed_write = FramedWrite::new(writer, NdJsonCodec::new());
while let Some(frame) = framed_read.next().await {
let msg = match frame {
Ok(msg) => msg,
Err(e) => {
tracing::warn!(error = %e, "failed to decode JSON-RPC frame, skipping");
continue;
}
};
let req = match msg {
JsonRpcMessage::Request(req) => req,
JsonRpcMessage::Response(_) => continue,
};
let (response, should_exit) = self.dispatch(req).await?;
if let Some(resp) = response {
framed_write.send(resp).await.map_err(ServiceSdkError::Io)?;
}
if should_exit {
break;
}
}
Ok(())
}
async fn dispatch(
&mut self,
req: JsonRpcRequest,
) -> Result<(Option<JsonRpcResponse>, bool), ServiceSdkError> {
let id = req.id.clone();
let params = req.params.unwrap_or(serde_json::Value::Null);
match req.method.as_str() {
"initialize" => {
let init_params: ServiceInitializeParams = serde_json::from_value(params)?;
let result = self.service.on_initialize(init_params).await?;
let result_value = serde_json::to_value(&result)?;
Ok((
id.map(|id| JsonRpcResponse::success(Some(id), result_value)),
false,
))
}
"health" => {
let status = self.service.on_health().await;
let result_value = serde_json::to_value(&status)?;
Ok((
id.map(|id| JsonRpcResponse::success(Some(id), result_value)),
false,
))
}
"shutdown" => {
self.service.on_shutdown().await?;
let response =
id.map(|id| JsonRpcResponse::success(Some(id), serde_json::Value::Null));
Ok((response, true))
}
method => {
if let Some(result) = self.service.handle_unknown(method, params) {
let response = id.map(|id| JsonRpcResponse::success(Some(id), result));
Ok((response, false))
} else {
let response = id.map(|id| {
JsonRpcResponse::error(
Some(id),
JsonRpcError {
code: -32601,
message: format!("method not found: {method}"),
data: None,
},
)
});
Ok((response, false))
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{HealthStatus, ServiceHealthStatus, ServiceInitializeResult};
use rstest::rstest;
use std::collections::HashMap;
struct MockService {
init_port: u16,
}
impl MockService {
fn new(port: u16) -> Self {
Self { init_port: port }
}
}
impl Service for MockService {
async fn on_initialize(
&mut self,
_params: ServiceInitializeParams,
) -> Result<ServiceInitializeResult, ServiceSdkError> {
Ok(ServiceInitializeResult {
port: self.init_port,
installation: HashMap::new(),
})
}
async fn on_health(&self) -> ServiceHealthStatus {
ServiceHealthStatus {
status: HealthStatus::Healthy,
message: None,
}
}
async fn on_shutdown(&mut self) -> Result<(), ServiceSdkError> {
Ok(())
}
}
fn make_request(id: u64, method: &str, params: serde_json::Value) -> String {
let msg = serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": params,
});
format!("{}\n", serde_json::to_string(&msg).unwrap())
}
fn parse_responses(output: &[u8]) -> Vec<serde_json::Value> {
let text = String::from_utf8_lossy(output);
text.lines()
.filter(|l| !l.trim().is_empty())
.filter_map(|l| serde_json::from_str(l).ok())
.collect()
}
#[rstest]
#[tokio::test]
async fn when_initialize_received_then_responds_with_port_and_installation() {
let svc = MockService::new(8080);
let input = make_request(
1,
"initialize",
serde_json::json!({"namespace": "test", "options": {}, "port": 8080}),
);
let reader = std::io::Cursor::new(input.into_bytes());
let mut output = Vec::new();
let harness = ServiceHarness::new(svc);
harness.run_with_io(reader, &mut output).await.unwrap();
let responses = parse_responses(&output);
assert_eq!(responses.len(), 1);
assert_eq!(responses[0]["id"], 1);
assert_eq!(responses[0]["result"]["port"], 8080);
assert!(responses[0]["result"]["installation"].is_object());
}
#[rstest]
#[tokio::test]
async fn when_health_received_then_responds_with_status() {
let svc = MockService::new(9090);
let mut input = make_request(
1,
"initialize",
serde_json::json!({"namespace": "test", "options": {}, "port": 9090}),
);
input.push_str(&make_request(2, "health", serde_json::json!({})));
let reader = std::io::Cursor::new(input.into_bytes());
let mut output = Vec::new();
let harness = ServiceHarness::new(svc);
harness.run_with_io(reader, &mut output).await.unwrap();
let responses = parse_responses(&output);
assert_eq!(responses.len(), 2);
assert_eq!(responses[1]["id"], 2);
assert_eq!(responses[1]["result"]["status"], "healthy");
}
#[rstest]
#[tokio::test]
async fn when_shutdown_received_then_harness_exits() {
let svc = MockService::new(7070);
let mut input = make_request(
1,
"initialize",
serde_json::json!({"namespace": "test", "options": {}, "port": 7070}),
);
input.push_str(&make_request(2, "shutdown", serde_json::json!({})));
input.push_str(&make_request(3, "health", serde_json::json!({})));
let reader = std::io::Cursor::new(input.into_bytes());
let mut output = Vec::new();
let harness = ServiceHarness::new(svc);
harness.run_with_io(reader, &mut output).await.unwrap();
let responses = parse_responses(&output);
assert_eq!(responses.len(), 2);
assert_eq!(responses[0]["id"], 1);
assert_eq!(responses[1]["id"], 2);
assert!(responses[1]["result"].is_null());
}
#[rstest]
#[tokio::test]
async fn when_unknown_method_received_then_responds_with_error() {
let svc = MockService::new(6060);
let mut input = make_request(
1,
"initialize",
serde_json::json!({"namespace": "test", "options": {}, "port": 6060}),
);
input.push_str(&make_request(2, "custom/unknown", serde_json::json!({})));
let reader = std::io::Cursor::new(input.into_bytes());
let mut output = Vec::new();
let harness = ServiceHarness::new(svc);
harness.run_with_io(reader, &mut output).await.unwrap();
let responses = parse_responses(&output);
assert_eq!(responses.len(), 2);
assert_eq!(responses[1]["id"], 2);
assert_eq!(responses[1]["error"]["code"], -32601);
assert!(
responses[1]["error"]["message"]
.as_str()
.unwrap()
.contains("custom/unknown")
);
}
#[rstest]
#[tokio::test]
async fn when_handle_unknown_returns_some_then_harness_responds_with_success() {
struct CustomService;
impl Service for CustomService {
async fn on_initialize(
&mut self,
_params: ServiceInitializeParams,
) -> Result<ServiceInitializeResult, ServiceSdkError> {
Ok(ServiceInitializeResult {
port: 1234,
installation: HashMap::new(),
})
}
async fn on_health(&self) -> ServiceHealthStatus {
ServiceHealthStatus {
status: HealthStatus::Healthy,
message: None,
}
}
async fn on_shutdown(&mut self) -> Result<(), ServiceSdkError> {
Ok(())
}
fn handle_unknown(
&mut self,
method: &str,
_params: serde_json::Value,
) -> Option<serde_json::Value> {
if method == "custom/ping" {
Some(serde_json::json!({"pong": true}))
} else {
None
}
}
}
let svc = CustomService;
let mut input = make_request(
1,
"initialize",
serde_json::json!({"namespace": "test", "options": {}, "port": 1234}),
);
input.push_str(&make_request(2, "custom/ping", serde_json::json!({})));
let reader = std::io::Cursor::new(input.into_bytes());
let mut output = Vec::new();
let harness = ServiceHarness::new(svc);
harness.run_with_io(reader, &mut output).await.unwrap();
let responses = parse_responses(&output);
assert_eq!(responses.len(), 2);
assert_eq!(responses[1]["id"], 2);
assert!(
responses[1].get("error").is_none(),
"expected success response, got error: {:?}",
responses[1]["error"]
);
assert_eq!(responses[1]["result"]["pong"], true);
}
#[rstest]
#[tokio::test]
async fn when_handle_unknown_returns_none_then_harness_responds_with_32601() {
struct DefaultService;
impl Service for DefaultService {
async fn on_initialize(
&mut self,
_params: ServiceInitializeParams,
) -> Result<ServiceInitializeResult, ServiceSdkError> {
Ok(ServiceInitializeResult {
port: 2345,
installation: HashMap::new(),
})
}
async fn on_health(&self) -> ServiceHealthStatus {
ServiceHealthStatus {
status: HealthStatus::Healthy,
message: None,
}
}
async fn on_shutdown(&mut self) -> Result<(), ServiceSdkError> {
Ok(())
}
}
let svc = DefaultService;
let mut input = make_request(
1,
"initialize",
serde_json::json!({"namespace": "test", "options": {}, "port": 2345}),
);
input.push_str(&make_request(2, "custom/nope", serde_json::json!({})));
let reader = std::io::Cursor::new(input.into_bytes());
let mut output = Vec::new();
let harness = ServiceHarness::new(svc);
harness.run_with_io(reader, &mut output).await.unwrap();
let responses = parse_responses(&output);
assert_eq!(responses.len(), 2);
assert_eq!(responses[1]["id"], 2);
assert_eq!(responses[1]["error"]["code"], -32601);
}
#[rstest]
#[tokio::test]
async fn when_reader_reaches_eof_then_harness_exits_cleanly() {
let svc = MockService::new(5050);
let reader = std::io::Cursor::new(Vec::<u8>::new());
let mut output = Vec::new();
let harness = ServiceHarness::new(svc);
let result = harness.run_with_io(reader, &mut output).await;
assert!(result.is_ok());
}
}