ryo-server 0.1.0

[preview] RYO Server - tarpc-based RPC server for ryo operations
Documentation
//! Server integration tests
//!
//! These tests verify end-to-end RPC communication:
//! 1. Start server
//! 2. Connect client
//! 3. Make RPC calls
//! 4. Verify responses

use futures::StreamExt;
use ryo_app::{
    codec::{create_client_transport, create_server_transport},
    service::{RyoService, RyoServiceClient},
    Api, ConflictStrategy, DiscoverRequest, Goal, InMemoryStorage, Intent, RunRequest,
};
use ryo_server::RyoServer;
use std::time::Duration;
use tarpc::{
    client, context,
    server::{self, Channel},
};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::oneshot;

/// Initialize tracing for tests
fn init_tracing() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter("ryo_server=debug,integration=debug")
        .with_test_writer()
        .try_init();
}

/// Helper to create a simple Goal for testing
fn simple_goal() -> Goal {
    Goal {
        query: String::new(),
        intents: vec![Intent::AddDerive {
            symbol_id: None,
            symbol_path: None,
            target_type: Some("TestStruct".to_string()),
            derives: vec!["Debug".to_string()],
        }],
        scope: Default::default(),
        constraints: vec![],
        conflict_strategy: ConflictStrategy::default(),
        confidence: 1.0,
    }
}

/// Create a test server with in-memory storage
fn create_test_api() -> Api {
    let storage = Box::new(InMemoryStorage::new());
    Api::new(storage)
}

/// Start a test server on a Unix socket and return shutdown sender
async fn start_test_server(socket_path: &str) -> oneshot::Sender<()> {
    let api = create_test_api();
    // Channel for RyoServer internal use (not used in tests)
    let (server_internal_tx, _server_internal_rx) = oneshot::channel();
    // Channel for test to signal shutdown
    let (test_shutdown_tx, test_shutdown_rx) = oneshot::channel();
    let server = RyoServer::new(api, server_internal_tx);

    // Remove stale socket
    let _ = std::fs::remove_file(socket_path);
    let listener = UnixListener::bind(socket_path).expect("Failed to bind socket");

    // Spawn server task
    let socket_path_owned = socket_path.to_string();
    tokio::spawn(async move {
        tokio::select! {
            _ = async {
                loop {
                    match listener.accept().await {
                        Ok((stream, _)) => {
                            // Use shared codec from ryo_app::codec
                            let transport = create_server_transport(stream);
                            let channel = server::BaseChannel::with_defaults(transport);
                            let server_clone = server.clone();
                            tokio::spawn(async move {
                                channel
                                    .execute(server_clone.serve())
                                    .for_each(|response| async move {
                                        tokio::spawn(response);
                                    })
                                    .await;
                            });
                        }
                        Err(e) => {
                            tracing::error!("accept error: {}", e);
                            break;
                        }
                    }
                }
            } => {}
            _ = test_shutdown_rx => {
                tracing::info!("Test server shutdown");
            }
        }
        let _ = std::fs::remove_file(&socket_path_owned);
    });

    // Wait for server to be ready
    tokio::time::sleep(Duration::from_millis(100)).await;

    test_shutdown_tx
}

/// Connect to the test server
async fn connect_test_client(socket_path: &str) -> RyoServiceClient {
    let stream = UnixStream::connect(socket_path)
        .await
        .expect("Failed to connect to test server");

    // Use shared codec from ryo_app::codec
    let transport = create_client_transport(stream);

    RyoServiceClient::new(client::Config::default(), transport).spawn()
}

/// Create a test context with timeout
fn test_context() -> context::Context {
    let mut ctx = context::current();
    ctx.deadline = std::time::Instant::now() + Duration::from_secs(30);
    ctx
}

// ============================================================================
// Tests
// ============================================================================

#[tokio::test]
async fn test_ping_rpc() {
    init_tracing();
    let socket_path = "/tmp/ryo-test-ping.sock";
    let _shutdown = start_test_server(socket_path).await;

    let client = connect_test_client(socket_path).await;

    // Ping should succeed
    let result = client.ping(test_context()).await;
    assert!(result.is_ok(), "Ping failed: {:?}", result);

    // Cleanup
    let _ = std::fs::remove_file(socket_path);
}

#[tokio::test]
async fn test_status_rpc() {
    init_tracing();
    let socket_path = "/tmp/ryo-test-status.sock";
    let _shutdown = start_test_server(socket_path).await;

    let client = connect_test_client(socket_path).await;

    // Status should return valid response
    let status = client.status(test_context()).await;
    assert!(status.is_ok(), "Status failed: {:?}", status);

    let status = status.unwrap();
    tracing::info!("Status: symbols={}, files={}", status.symbols, status.files);

    // Cleanup
    let _ = std::fs::remove_file(socket_path);
}

#[tokio::test]
async fn test_discover_rpc() {
    init_tracing();
    let socket_path = "/tmp/ryo-test-discover.sock";
    let _shutdown = start_test_server(socket_path).await;

    let client = connect_test_client(socket_path).await;

    // Discover with empty pattern (should work even with no files)
    let request = DiscoverRequest {
        pattern: "*".to_string(),
        limit: Some(10),
        ..Default::default()
    };

    let result = client.discover(test_context(), request).await;
    tracing::info!("Discover RPC result: {:?}", result);

    // Should succeed (even if no matches)
    assert!(
        result.is_ok(),
        "Discover RPC transport failed: {:?}",
        result
    );

    // Cleanup
    let _ = std::fs::remove_file(socket_path);
}

#[tokio::test]
async fn test_run_rpc_dry_run() {
    init_tracing();
    let socket_path = "/tmp/ryo-test-run.sock";
    let _shutdown = start_test_server(socket_path).await;

    let client = connect_test_client(socket_path).await;

    // Create a simple run request
    let request = RunRequest {
        goal: simple_goal(),
        dry_run: true,
        check_syntax: false,
    };

    tracing::info!("Sending RunRequest...");
    let result = client.run(test_context(), request).await;
    tracing::info!("Run RPC result: {:?}", result);

    // Should succeed at transport level (even if mutation fails)
    assert!(result.is_ok(), "Run RPC transport failed: {:?}", result);

    // Cleanup
    let _ = std::fs::remove_file(socket_path);
}

#[tokio::test]
async fn test_run_vs_discover_comparison() {
    init_tracing();
    let socket_path = "/tmp/ryo-test-compare.sock";
    let _shutdown = start_test_server(socket_path).await;

    let client = connect_test_client(socket_path).await;

    // First: Discover (known working)
    let discover_req = DiscoverRequest {
        pattern: "*".to_string(),
        limit: Some(10),
        ..Default::default()
    };

    tracing::info!("Testing Discover...");
    let discover_result = client.discover(test_context(), discover_req).await;
    tracing::info!("Discover result: {:?}", discover_result);
    assert!(
        discover_result.is_ok(),
        "Discover failed: {:?}",
        discover_result
    );

    // Second: Run (potentially broken)
    let run_req = RunRequest {
        goal: simple_goal(),
        dry_run: true,
        check_syntax: false,
    };

    tracing::info!("Testing Run...");
    let run_result = client.run(test_context(), run_req).await;
    tracing::info!("Run result: {:?}", run_result);
    assert!(run_result.is_ok(), "Run failed: {:?}", run_result);

    // Cleanup
    let _ = std::fs::remove_file(socket_path);
}

/// Test that verifies the exact scenario from CLI
#[tokio::test]
async fn test_run_rpc_matches_cli_usage() {
    init_tracing();
    let socket_path = "/tmp/ryo-test-cli.sock";
    let _shutdown = start_test_server(socket_path).await;

    let client = connect_test_client(socket_path).await;

    // Replicate CLI usage: --dsl '{"intent":{"type":"AddDerive","target_name":"Goal","derives":["Default"]}}'
    let goal = Goal {
        query: String::new(),
        intents: vec![Intent::AddDerive {
            symbol_id: None,
            symbol_path: None,
            target_type: Some("Goal".to_string()),
            derives: vec!["Default".to_string()],
        }],
        scope: Default::default(),
        constraints: vec![],
        conflict_strategy: ConflictStrategy::default(),
        confidence: 1.0,
    };

    let request = RunRequest {
        goal,
        dry_run: true,
        check_syntax: true,
    };

    // Log request for debugging
    tracing::info!(
        "RunRequest created with {} intents",
        request.goal.intents.len()
    );

    // Make the call
    tracing::info!("Making run RPC call (matching CLI usage)...");
    let result = client.run(test_context(), request).await;

    match &result {
        Ok(Ok(response)) => {
            tracing::info!("Run succeeded: {:?}", response);
        }
        Ok(Err(ryo_err)) => {
            tracing::info!("Run returned RyoError: {:?}", ryo_err);
        }
        Err(rpc_err) => {
            tracing::error!("RPC transport error: {:?}", rpc_err);
        }
    }

    assert!(result.is_ok(), "Run RPC transport failed: {:?}", result);

    // Cleanup
    let _ = std::fs::remove_file(socket_path);
}