use futures::StreamExt;
use ryo_app::{
codec::{create_client_transport, create_server_transport},
service::{RyoService, RyoServiceClient},
Api, ConflictStrategy, DiscoverRequest, Goal, InMemoryStorage, Intent, RunRequest,
};
use ryo_server::RyoServer;
use std::time::Duration;
use tarpc::{
client, context,
server::{self, Channel},
};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::oneshot;
/// Installs a `tracing` subscriber that writes through the test writer,
/// filtered to debug level for the `ryo_server` and `integration` targets.
///
/// Every test calls this, so the result of `try_init` is deliberately
/// discarded rather than unwrapped.
fn init_tracing() {
    let builder = tracing_subscriber::fmt()
        .with_env_filter("ryo_server=debug,integration=debug")
        .with_test_writer();
    // Best effort: a failed install must not fail the test.
    let _ = builder.try_init();
}
/// Builds a minimal [`Goal`] carrying a single `AddDerive` intent that asks
/// for `Debug` to be derived on the type named `TestStruct`.
///
/// All remaining fields use empty or default values, with `confidence` set to `1.0`.
fn simple_goal() -> Goal {
    let add_debug_derive = Intent::AddDerive {
        symbol_id: None,
        symbol_path: None,
        target_type: Some(String::from("TestStruct")),
        derives: vec![String::from("Debug")],
    };

    Goal {
        query: String::new(),
        intents: vec![add_debug_derive],
        scope: Default::default(),
        constraints: Vec::new(),
        conflict_strategy: ConflictStrategy::default(),
        confidence: 1.0,
    }
}
/// Creates an [`Api`] backed by a fresh, boxed [`InMemoryStorage`].
fn create_test_api() -> Api {
    Api::new(Box::new(InMemoryStorage::new()))
}
async fn start_test_server(socket_path: &str) -> oneshot::Sender<()> {
let api = create_test_api();
let (server_internal_tx, _server_internal_rx) = oneshot::channel();
let (test_shutdown_tx, test_shutdown_rx) = oneshot::channel();
let server = RyoServer::new(api, server_internal_tx);
let _ = std::fs::remove_file(socket_path);
let listener = UnixListener::bind(socket_path).expect("Failed to bind socket");
let socket_path_owned = socket_path.to_string();
tokio::spawn(async move {
tokio::select! {
_ = async {
loop {
match listener.accept().await {
Ok((stream, _)) => {
let transport = create_server_transport(stream);
let channel = server::BaseChannel::with_defaults(transport);
let server_clone = server.clone();
tokio::spawn(async move {
channel
.execute(server_clone.serve())
.for_each(|response| async move {
tokio::spawn(response);
})
.await;
});
}
Err(e) => {
tracing::error!("accept error: {}", e);
break;
}
}
}
} => {}
_ = test_shutdown_rx => {
tracing::info!("Test server shutdown");
}
}
let _ = std::fs::remove_file(&socket_path_owned);
});
tokio::time::sleep(Duration::from_millis(100)).await;
test_shutdown_tx
}
/// Connects to the Unix socket at `socket_path` and returns a spawned
/// [`RyoServiceClient`] using the default tarpc client configuration.
///
/// # Panics
///
/// Panics if the connection to the socket fails.
async fn connect_test_client(socket_path: &str) -> RyoServiceClient {
    let transport = create_client_transport(
        UnixStream::connect(socket_path)
            .await
            .expect("Failed to connect to test server"),
    );
    let pending_client = RyoServiceClient::new(client::Config::default(), transport);
    pending_client.spawn()
}
/// Returns the current tarpc context with its deadline replaced by
/// "30 seconds from now".
fn test_context() -> context::Context {
    let deadline = std::time::Instant::now() + Duration::from_secs(30);
    let mut ctx = context::current();
    ctx.deadline = deadline;
    ctx
}
/// Sends a `ping` RPC to a freshly started test server and checks that the
/// call returns `Ok`.
#[tokio::test]
async fn test_ping_rpc() {
    const SOCKET: &str = "/tmp/ryo-test-ping.sock";

    init_tracing();
    // Keep the shutdown sender bound so it is not dropped before the test ends.
    let _shutdown = start_test_server(SOCKET).await;
    let rpc = connect_test_client(SOCKET).await;

    let outcome = rpc.ping(test_context()).await;
    assert!(outcome.is_ok(), "Ping failed: {:?}", outcome);

    // Best-effort cleanup of the socket file.
    let _ = std::fs::remove_file(SOCKET);
}
/// Sends a `status` RPC to a freshly started test server, requires it to
/// return `Ok`, and logs the reported symbol and file counts.
#[tokio::test]
async fn test_status_rpc() {
    const SOCKET: &str = "/tmp/ryo-test-status.sock";

    init_tracing();
    // Keep the shutdown sender bound so it is not dropped before the test ends.
    let _shutdown = start_test_server(SOCKET).await;
    let rpc = connect_test_client(SOCKET).await;

    // Anything other than `Ok` fails the test, printing the whole result.
    let status = match rpc.status(test_context()).await {
        Ok(status) => status,
        failure => panic!("Status failed: {:?}", failure),
    };
    tracing::info!("Status: symbols={}, files={}", status.symbols, status.files);

    // Best-effort cleanup of the socket file.
    let _ = std::fs::remove_file(SOCKET);
}
/// Sends a `discover` RPC (pattern `*`, limit 10) to a freshly started test
/// server and checks that the outer call result is `Ok`.
#[tokio::test]
async fn test_discover_rpc() {
    const SOCKET: &str = "/tmp/ryo-test-discover.sock";

    init_tracing();
    // Keep the shutdown sender bound so it is not dropped before the test ends.
    let _shutdown = start_test_server(SOCKET).await;
    let rpc = connect_test_client(SOCKET).await;

    let query = DiscoverRequest {
        pattern: String::from("*"),
        limit: Some(10),
        ..Default::default()
    };
    let outcome = rpc.discover(test_context(), query).await;
    tracing::info!("Discover RPC result: {:?}", outcome);

    assert!(
        outcome.is_ok(),
        "Discover RPC transport failed: {:?}",
        outcome
    );

    // Best-effort cleanup of the socket file.
    let _ = std::fs::remove_file(SOCKET);
}
/// Sends a dry-run `run` RPC built from [`simple_goal`] (syntax checking
/// disabled) and checks that the outer call result is `Ok`.
#[tokio::test]
async fn test_run_rpc_dry_run() {
    const SOCKET: &str = "/tmp/ryo-test-run.sock";

    init_tracing();
    // Keep the shutdown sender bound so it is not dropped before the test ends.
    let _shutdown = start_test_server(SOCKET).await;
    let rpc = connect_test_client(SOCKET).await;

    let dry_run_request = RunRequest {
        goal: simple_goal(),
        dry_run: true,
        check_syntax: false,
    };

    tracing::info!("Sending RunRequest...");
    let outcome = rpc.run(test_context(), dry_run_request).await;
    tracing::info!("Run RPC result: {:?}", outcome);
    assert!(outcome.is_ok(), "Run RPC transport failed: {:?}", outcome);

    // Best-effort cleanup of the socket file.
    let _ = std::fs::remove_file(SOCKET);
}
/// Issues a `discover` RPC followed by a dry-run `run` RPC on the same client
/// connection and checks that both outer call results are `Ok`.
#[tokio::test]
async fn test_run_vs_discover_comparison() {
    const SOCKET: &str = "/tmp/ryo-test-compare.sock";

    init_tracing();
    // Keep the shutdown sender bound so it is not dropped before the test ends.
    let _shutdown = start_test_server(SOCKET).await;
    let rpc = connect_test_client(SOCKET).await;

    // First call: discover.
    let discover_req = DiscoverRequest {
        pattern: String::from("*"),
        limit: Some(10),
        ..Default::default()
    };
    tracing::info!("Testing Discover...");
    let discover_outcome = rpc.discover(test_context(), discover_req).await;
    tracing::info!("Discover result: {:?}", discover_outcome);
    assert!(
        discover_outcome.is_ok(),
        "Discover failed: {:?}",
        discover_outcome
    );

    // Second call on the same connection: dry-run run.
    let run_req = RunRequest {
        goal: simple_goal(),
        dry_run: true,
        check_syntax: false,
    };
    tracing::info!("Testing Run...");
    let run_outcome = rpc.run(test_context(), run_req).await;
    tracing::info!("Run result: {:?}", run_outcome);
    assert!(run_outcome.is_ok(), "Run failed: {:?}", run_outcome);

    // Best-effort cleanup of the socket file.
    let _ = std::fs::remove_file(SOCKET);
}
/// Sends a dry-run `run` RPC with syntax checking enabled, asking for
/// `Default` to be derived on the type named `Goal`.
///
/// The three possible outcomes (success, application-level error, call-level
/// error) are logged separately; only a call-level error fails the test.
#[tokio::test]
async fn test_run_rpc_matches_cli_usage() {
    const SOCKET: &str = "/tmp/ryo-test-cli.sock";

    init_tracing();
    // Keep the shutdown sender bound so it is not dropped before the test ends.
    let _shutdown = start_test_server(SOCKET).await;
    let rpc = connect_test_client(SOCKET).await;

    let derive_default = Intent::AddDerive {
        symbol_id: None,
        symbol_path: None,
        target_type: Some(String::from("Goal")),
        derives: vec![String::from("Default")],
    };
    let request = RunRequest {
        goal: Goal {
            query: String::new(),
            intents: vec![derive_default],
            scope: Default::default(),
            constraints: Vec::new(),
            conflict_strategy: ConflictStrategy::default(),
            confidence: 1.0,
        },
        dry_run: true,
        check_syntax: true,
    };

    let intent_count = request.goal.intents.len();
    tracing::info!("RunRequest created with {} intents", intent_count);
    tracing::info!("Making run RPC call (matching CLI usage)...");
    let outcome = rpc.run(test_context(), request).await;

    match &outcome {
        Err(rpc_err) => {
            tracing::error!("RPC transport error: {:?}", rpc_err);
        }
        Ok(Err(ryo_err)) => {
            tracing::info!("Run returned RyoError: {:?}", ryo_err);
        }
        Ok(Ok(response)) => {
            tracing::info!("Run succeeded: {:?}", response);
        }
    }
    // An application-level error inside `Ok` is acceptable here.
    assert!(outcome.is_ok(), "Run RPC transport failed: {:?}", outcome);

    // Best-effort cleanup of the socket file.
    let _ = std::fs::remove_file(SOCKET);
}