Skip to main content

ryo_server/
lib.rs

1//! RYO Server - tarpc-based RPC server
2//!
3//! This crate provides the server implementation for RYO RPC.
4//! It wraps the Api and exposes it via tarpc over Unix Domain Socket.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────────────────┐
10//! │  RyoServer                                                          │
11//! │  ├── api: Arc<Mutex<Api>>  (single mutex for all operations)       │
12//! │  ├── shutdown_tx: Shutdown signal sender                           │
13//! │  └── last_activity: Atomic timestamp for idle detection            │
14//! └─────────────────────────────────────────────────────────────────────┘
15//!                              │
16//!                              │ UDS + tarpc (serde_transport)
17//!                              ▼
18//! ┌─────────────────────────────────────────────────────────────────────┐
19//! │  Clients (ryo-cli)                                                  │
20//! │  └── RyoServiceClient                                               │
21//! └─────────────────────────────────────────────────────────────────────┘
22//! ```
23//!
24//! # Idle Timeout
25//!
26//! Server automatically shuts down after configurable idle time (default 1 hour).
27//! Configure via `~/.ryo/config.toml`:
28//! ```toml
29//! [server]
30//! idle_timeout = 3600  # 1 hour (0 = never timeout)
31//! ```
32//!
33//! # Parallel Initialization
34//!
35//! By default, server uses parallel initialization for faster startup.
36//! Configure via `~/.ryo/config.toml`:
37//! ```toml
38//! [server]
39//! parallel_init = true
40//! ```
41//!
42//! # Locking Strategy
43//!
44//! All operations acquire a single Mutex. This is sufficient because:
45//! - Write operations (run) are high-frequency but short-duration (~100μs)
46//! - Read operations (discover, suggest) are low-frequency
47//! - Lock contention is ~1% (100 writes/sec × 100μs = 10ms/sec)
48//!
49//! RWLock overhead is unnecessary for this workload.
50
51pub mod watcher;
52
53use ryo_analysis::AnalysisContext;
54use ryo_app::api::{
55    Api, BorrowAnalysisRequest, BorrowAnalysisResponse, CascadeRequest, CascadeResponse,
56    ChainAnalysisRequest, ChainAnalysisResponse, DiscoverRequest, DiscoverResponse,
57    FlowAnalysisRequest, FlowAnalysisResponse, GraphSummaryRequest, GraphSummaryResponse,
58    LiteralSearchRequest, LiteralSearchResponse, LockAnalysisRequest, LockAnalysisResponse,
59    OverviewRequest, OverviewResponse, PingResponse, QueryResponse, RunRequest, RunResponse,
60    RyoqlRequest, SpecRequest, SpecResponse, StatusResponse, SuggestApplyRequest,
61    SuggestApplyResponse, SuggestChoicesRequest, SuggestChoicesResponse, SuggestCompareRequest,
62    SuggestCompareResponse, SuggestGenerateRequest, SuggestGenerateResponse, SuggestRequest,
63    SuggestResponse, SuggestVerifyRequest, SuggestVerifyResponse, TypeAnalysisRequest,
64    TypeAnalysisResponse,
65};
66use ryo_app::service::{RyoError, RyoService};
67use ryo_app::{InMemoryStorage, Project};
68use ryo_storage::GlobalConfig;
69use ryo_symbol::write_with_parents;
70use std::path::PathBuf;
71use std::sync::atomic::{AtomicU64, Ordering};
72use std::sync::Arc;
73use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
74use tokio::sync::{oneshot, Mutex};
75
76/// Default idle timeout: 1 hour (from config default)
77pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60 * 60);
78
79/// Get current timestamp in seconds since UNIX_EPOCH
80fn now_secs() -> u64 {
81    SystemTime::now()
82        .duration_since(UNIX_EPOCH)
83        .unwrap_or_default()
84        .as_secs()
85}
86
87/// Server startup options
88#[derive(Debug, Clone)]
89pub struct ServerOptions {
90    /// Use parallel initialization
91    pub parallel_init: bool,
92    /// Idle timeout (None = never timeout)
93    pub idle_timeout: Option<Duration>,
94    /// Enable file watching for automatic reload
95    pub watch: bool,
96    /// Debounce duration for file watching (ms)
97    pub watch_debounce_ms: u64,
98}
99
100impl Default for ServerOptions {
101    fn default() -> Self {
102        Self {
103            parallel_init: true,
104            idle_timeout: Some(DEFAULT_IDLE_TIMEOUT),
105            watch: false,
106            watch_debounce_ms: 500,
107        }
108    }
109}
110
111impl ServerOptions {
112    /// Load options from global config (~/.ryo/config.toml)
113    pub fn from_config() -> Self {
114        let config = GlobalConfig::load_global().unwrap_or_default();
115        Self {
116            parallel_init: config.server.parallel_init,
117            idle_timeout: config.server.idle_timeout_duration(),
118            watch: config.server.watch, // Respect config setting
119            watch_debounce_ms: config.server.watch_debounce_ms,
120        }
121    }
122
123    /// Enable file watching
124    pub fn with_watch(mut self, enabled: bool) -> Self {
125        self.watch = enabled;
126        self
127    }
128}
129
130/// Server wraps Api and implements RyoService
131///
132/// ## Locking Strategy: Single Mutex
133/// - All operations (read/write) acquire the same Mutex
134/// - Rationale: Write is high-frequency but short-duration (~100μs)
135/// - Lock contention is ~1% (100 writes/sec × 100μs = 10ms/sec)
136/// - RWLock overhead is unnecessary for this workload
137#[derive(Clone)]
138pub struct RyoServer {
139    /// Single Mutex for all operations
140    api: Arc<Mutex<Api>>,
141    /// Shutdown signal sender (shared via Arc for clone)
142    shutdown_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
143    /// Last activity timestamp (seconds since UNIX_EPOCH)
144    last_activity: Arc<AtomicU64>,
145}
146
147impl RyoServer {
148    /// Create a new RyoServer with the given Api and shutdown channel
149    pub fn new(api: Api, shutdown_tx: oneshot::Sender<()>) -> Self {
150        Self {
151            api: Arc::new(Mutex::new(api)),
152            shutdown_tx: Arc::new(Mutex::new(Some(shutdown_tx))),
153            last_activity: Arc::new(AtomicU64::new(now_secs())),
154        }
155    }
156
157    /// Update last activity timestamp (called on each RPC)
158    fn touch(&self) {
159        self.last_activity.store(now_secs(), Ordering::Relaxed);
160    }
161
162    /// Get seconds since last activity
163    pub fn idle_secs(&self) -> u64 {
164        now_secs().saturating_sub(self.last_activity.load(Ordering::Relaxed))
165    }
166
167    /// Reload the project (rebuild AnalysisContext)
168    ///
169    /// Called when files change to update the internal state.
170    /// Preserves the suggestion store across reloads.
171    pub async fn reload(&self, project_path: &std::path::Path) -> anyhow::Result<()> {
172        let start = Instant::now();
173        tracing::info!("Reloading project due to file changes...");
174
175        // Take suggestion store from old api before replacing
176        let mut api = self.api.lock().await;
177        let old_store = api.take_suggest_store();
178        let store_count = old_store.len();
179
180        // Build new context
181        let project = Project::load(project_path)?;
182        let context = AnalysisContext::from_workspace_root_parallel(project.workspace_root())
183            .map_err(|e| anyhow::anyhow!("Context rebuild failed: {}", e))?;
184        let new_api = Api::with_context(context, project, Box::new(InMemoryStorage::new()));
185
186        // Restore suggestion store to new api
187        new_api.restore_suggest_store(old_store);
188
189        // Replace the api
190        *api = new_api;
191
192        let status = api.status();
193        tracing::info!(
194            "Reloaded: {} symbols, {} files, {} suggestions preserved in {:.2}s",
195            status.symbols,
196            status.files,
197            store_count,
198            start.elapsed().as_secs_f64()
199        );
200
201        Ok(())
202    }
203}
204
205impl RyoService for RyoServer {
206    async fn ping(self, _: tarpc::context::Context) -> PingResponse {
207        self.touch();
208        PingResponse {
209            version: format!("{}-{}", env!("CARGO_PKG_VERSION"), env!("RYO_COMMIT_HASH")),
210        }
211    }
212
213    async fn status(self, _: tarpc::context::Context) -> StatusResponse {
214        self.touch();
215        let api = self.api.lock().await;
216        api.status()
217    }
218
219    async fn shutdown(self, _: tarpc::context::Context) {
220        // Save UUID mappings before shutdown for persistence
221        {
222            let api = self.api.lock().await;
223            if let Err(e) = api.save_uuid_mappings() {
224                eprintln!("Warning: Failed to save UUID mappings: {}", e);
225            }
226        }
227
228        // Graceful shutdown: signal the server loop to stop
229        if let Some(tx) = self.shutdown_tx.lock().await.take() {
230            let _ = tx.send(());
231        }
232    }
233
234    async fn discover(
235        self,
236        _: tarpc::context::Context,
237        req: DiscoverRequest,
238    ) -> Result<DiscoverResponse, RyoError> {
239        self.touch();
240        let mut api = self.api.lock().await;
241        api.discover(req).map_err(Into::into)
242    }
243
244    async fn overview(
245        self,
246        _: tarpc::context::Context,
247        req: OverviewRequest,
248    ) -> Result<OverviewResponse, RyoError> {
249        self.touch();
250        let api = self.api.lock().await;
251        api.overview(req).map_err(Into::into)
252    }
253
254    async fn run(
255        self,
256        _: tarpc::context::Context,
257        req: RunRequest,
258    ) -> Result<RunResponse, RyoError> {
259        tracing::info!(
260            "RPC run: intents={}, dry_run={}",
261            req.goal.intents.len(),
262            req.dry_run
263        );
264        self.touch();
265        let is_dry_run = req.dry_run;
266        let mut api = self.api.lock().await;
267        tracing::debug!("Acquired API lock, executing run...");
268        let response = api.run(req).map_err(|e| {
269            tracing::error!("Run failed: {:?}", e);
270            RyoError::from(e)
271        })?;
272        tracing::info!(
273            "Run completed: success={}, files_modified={}",
274            response.success,
275            response.files_modified
276        );
277
278        // Write modified files to disk (server's responsibility in server mode)
279        // INVARIANT: Only write when actual mutations occurred.
280        // total_changes == 0 means no AST was modified → no file should be touched.
281        if response.success
282            && !is_dry_run
283            && response.total_changes > 0
284            && !response.modified_files.is_empty()
285        {
286            for path in &response.modified_files {
287                if let Some(file) = api.project().get_file(path) {
288                    let source = match file.to_source() {
289                        Ok(s) => s,
290                        Err(e) => {
291                            tracing::error!("Failed to generate source for {:?}: {}", path, e);
292                            return Ok(RunResponse {
293                                success: false,
294                                error: Some(format!(
295                                    "Failed to generate source for {:?}: {}",
296                                    path, e
297                                )),
298                                ..response
299                            });
300                        }
301                    };
302                    // Use write_with_parents to create intermediate directories if needed
303                    if let Err(e) = write_with_parents(path, &source) {
304                        tracing::error!("Failed to write file {:?}: {}", path, e);
305                        return Ok(RunResponse {
306                            success: false,
307                            error: Some(format!("Failed to write file {:?}: {}", path, e)),
308                            ..response
309                        });
310                    }
311                    tracing::debug!("Wrote {} bytes to {:?}", source.len(), path);
312                }
313            }
314            tracing::info!("Wrote {} files to disk", response.modified_files.len());
315        }
316
317        Ok(response)
318    }
319
320    async fn cascade(
321        self,
322        _: tarpc::context::Context,
323        req: CascadeRequest,
324    ) -> Result<CascadeResponse, RyoError> {
325        self.touch();
326        let api = self.api.lock().await;
327        api.graph_cascade(req).map_err(Into::into)
328    }
329
330    async fn graph_summary(
331        self,
332        _: tarpc::context::Context,
333        req: GraphSummaryRequest,
334    ) -> Result<GraphSummaryResponse, RyoError> {
335        self.touch();
336        let api = self.api.lock().await;
337        api.graph_summary(req).map_err(Into::into)
338    }
339
340    async fn graph_type(
341        self,
342        _: tarpc::context::Context,
343        req: TypeAnalysisRequest,
344    ) -> Result<TypeAnalysisResponse, RyoError> {
345        self.touch();
346        let api = self.api.lock().await;
347        api.graph_type(req).map_err(Into::into)
348    }
349
350    async fn graph_flow(
351        self,
352        _: tarpc::context::Context,
353        req: FlowAnalysisRequest,
354    ) -> Result<FlowAnalysisResponse, RyoError> {
355        self.touch();
356        let api = self.api.lock().await;
357        api.graph_flow(req).map_err(Into::into)
358    }
359
360    async fn graph_borrow(
361        self,
362        _: tarpc::context::Context,
363        req: BorrowAnalysisRequest,
364    ) -> Result<BorrowAnalysisResponse, RyoError> {
365        self.touch();
366        let api = self.api.lock().await;
367        api.graph_borrow(req).map_err(Into::into)
368    }
369
370    async fn graph_lock(
371        self,
372        _: tarpc::context::Context,
373        req: LockAnalysisRequest,
374    ) -> Result<LockAnalysisResponse, RyoError> {
375        self.touch();
376        let api = self.api.lock().await;
377        api.graph_lock(req).map_err(Into::into)
378    }
379
380    async fn graph_chain(
381        self,
382        _: tarpc::context::Context,
383        req: ChainAnalysisRequest,
384    ) -> Result<ChainAnalysisResponse, RyoError> {
385        self.touch();
386        let api = self.api.lock().await;
387        api.graph_chain(req).map_err(Into::into)
388    }
389
390    async fn suggest(
391        self,
392        _: tarpc::context::Context,
393        req: SuggestRequest,
394    ) -> Result<SuggestResponse, RyoError> {
395        self.touch();
396        let api = self.api.lock().await;
397        api.suggest(req).map_err(Into::into)
398    }
399
400    async fn suggest_apply(
401        self,
402        _: tarpc::context::Context,
403        req: SuggestApplyRequest,
404    ) -> Result<SuggestApplyResponse, RyoError> {
405        self.touch();
406        let mut api = self.api.lock().await;
407        api.suggest_apply(req).map_err(Into::into)
408    }
409
410    async fn suggest_choices(
411        self,
412        _: tarpc::context::Context,
413        req: SuggestChoicesRequest,
414    ) -> Result<SuggestChoicesResponse, RyoError> {
415        self.touch();
416        let api = self.api.lock().await;
417        api.suggest_choices(req).map_err(Into::into)
418    }
419
420    async fn suggest_verify(
421        self,
422        _: tarpc::context::Context,
423        req: SuggestVerifyRequest,
424    ) -> Result<SuggestVerifyResponse, RyoError> {
425        self.touch();
426        let api = self.api.lock().await;
427        api.suggest_verify(req).map_err(Into::into)
428    }
429
430    async fn suggest_compare(
431        self,
432        _: tarpc::context::Context,
433        req: SuggestCompareRequest,
434    ) -> Result<SuggestCompareResponse, RyoError> {
435        self.touch();
436        let api = self.api.lock().await;
437        api.suggest_compare(req).map_err(Into::into)
438    }
439
440    async fn suggest_generate(
441        self,
442        _: tarpc::context::Context,
443        req: SuggestGenerateRequest,
444    ) -> Result<SuggestGenerateResponse, RyoError> {
445        self.touch();
446        let api = self.api.lock().await;
447        api.suggest_generate(req).map_err(Into::into)
448    }
449
450    async fn spec(
451        self,
452        _: tarpc::context::Context,
453        req: SpecRequest,
454    ) -> Result<SpecResponse, RyoError> {
455        self.touch();
456        let mut api = self.api.lock().await;
457        api.spec(req).map_err(Into::into)
458    }
459
460    async fn query_ryoql(
461        self,
462        _: tarpc::context::Context,
463        req: RyoqlRequest,
464    ) -> Result<QueryResponse, RyoError> {
465        self.touch();
466        let api = self.api.lock().await;
467        api.query_ryoql(req).map_err(Into::into)
468    }
469
470    async fn search_literal(
471        self,
472        _: tarpc::context::Context,
473        req: LiteralSearchRequest,
474    ) -> Result<LiteralSearchResponse, RyoError> {
475        self.touch();
476        let api = self.api.lock().await;
477        api.search_literal(req).map_err(Into::into)
478    }
479}
480
481/// Run the server on Unix Domain Socket with default options from config
482///
483/// # Arguments
484/// * `socket_path` - Path to the Unix socket file
485/// * `project_path` - Path to the project root
486///
487/// # Example
488/// ```ignore
489/// run_server("/tmp/ryo.sock".into(), "/path/to/project".into()).await?;
490/// ```
491pub async fn run_server(socket_path: PathBuf, project_path: PathBuf) -> anyhow::Result<()> {
492    let opts = ServerOptions::from_config();
493    run_server_with_options(socket_path, project_path, opts).await
494}
495
496/// Run the server on Unix Domain Socket with configurable idle timeout (legacy)
497///
498/// # Arguments
499/// * `socket_path` - Path to the Unix socket file
500/// * `project_path` - Path to the project root
501/// * `idle_timeout` - Optional idle timeout (None = no timeout)
502pub async fn run_server_with_timeout(
503    socket_path: PathBuf,
504    project_path: PathBuf,
505    idle_timeout: Option<Duration>,
506) -> anyhow::Result<()> {
507    let config = GlobalConfig::load_global().unwrap_or_default();
508    let opts = ServerOptions {
509        parallel_init: config.server.parallel_init,
510        idle_timeout,
511        watch: false,
512        watch_debounce_ms: 500,
513    };
514    run_server_with_options(socket_path, project_path, opts).await
515}
516
517/// Run the server on Unix Domain Socket with full options
518///
519/// # Arguments
520/// * `socket_path` - Path to the Unix socket file
521/// * `project_path` - Path to the project root
522/// * `options` - Server options including parallel_init and idle_timeout
523pub async fn run_server_with_options(
524    socket_path: PathBuf,
525    project_path: PathBuf,
526    options: ServerOptions,
527) -> anyhow::Result<()> {
528    use futures::StreamExt;
529    use tarpc::server::{self, Channel};
530    use tokio::net::UnixListener;
531
532    let start = Instant::now();
533
534    tracing::info!("━━━ RYO Server Starting ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
535    tracing::info!("   Project: {:?}", project_path);
536    tracing::info!("   Socket:  {:?}", socket_path);
537    tracing::info!("   Parallel init: {}", options.parallel_init);
538    tracing::info!(
539        "   Watch mode:    {}",
540        if options.watch { "enabled" } else { "disabled" }
541    );
542    if let Some(timeout) = options.idle_timeout {
543        tracing::info!("   Idle timeout:  {}s", timeout.as_secs());
544    } else {
545        tracing::info!("   Idle timeout:  disabled (daemon mode)");
546    }
547
548    // Initialize Api with parallel or sequential loading
549    tracing::info!("   Loading project...");
550
551    let api = if options.parallel_init {
552        // Use parallel initialization for faster startup
553        let project = Project::load(&project_path)?;
554        let context = AnalysisContext::from_workspace_root_parallel(project.workspace_root())
555            .map_err(|e| anyhow::anyhow!("Context build failed: {}", e))?;
556        Api::with_context(context, project, Box::new(InMemoryStorage::new()))
557    } else {
558        // Sequential initialization (legacy)
559        Api::from_path(&project_path)?
560    };
561
562    let status = api.status();
563    let load_time = start.elapsed();
564    tracing::info!(
565        "   Loaded: {} symbols, {} files in {:.2}s",
566        status.symbols,
567        status.files,
568        load_time.as_secs_f64()
569    );
570
571    // Setup graceful shutdown
572    let (shutdown_tx, shutdown_rx) = oneshot::channel();
573    let server = RyoServer::new(api, shutdown_tx);
574
575    // Cleanup stale socket file if exists
576    let _ = std::fs::remove_file(&socket_path);
577    let listener = UnixListener::bind(&socket_path)?;
578
579    tracing::info!("━━━ Server Ready ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
580    tracing::info!("   Listening on {:?}", socket_path);
581
582    // Setup file watcher if enabled
583    let mut file_watcher = if options.watch {
584        let config = watcher::WatcherConfig {
585            debounce: Duration::from_millis(options.watch_debounce_ms),
586            ..Default::default()
587        };
588        match watcher::FileWatcher::new(&project_path, config) {
589            Ok(w) => {
590                tracing::info!("   File watcher started");
591                Some(w)
592            }
593            Err(e) => {
594                tracing::warn!("   Failed to start file watcher: {}", e);
595                None
596            }
597        }
598    } else {
599        None
600    };
601
602    // Idle timeout checker (runs every 60 seconds)
603    let server_for_idle = server.clone();
604    let idle_timeout = options.idle_timeout;
605    let idle_check = async move {
606        if let Some(timeout) = idle_timeout {
607            let check_interval = Duration::from_secs(60);
608            loop {
609                tokio::time::sleep(check_interval).await;
610                let idle = server_for_idle.idle_secs();
611                tracing::debug!(
612                    "Idle check: {} secs (timeout: {} secs)",
613                    idle,
614                    timeout.as_secs()
615                );
616                if idle >= timeout.as_secs() {
617                    tracing::info!("Idle timeout reached ({} secs), shutting down", idle);
618                    // Save UUID mappings before shutdown
619                    {
620                        let api = server_for_idle.api.lock().await;
621                        if let Err(e) = api.save_uuid_mappings() {
622                            tracing::warn!("Failed to save UUID mappings on idle timeout: {}", e);
623                        }
624                    }
625                    // Trigger shutdown
626                    if let Some(tx) = server_for_idle.shutdown_tx.lock().await.take() {
627                        let _ = tx.send(());
628                    }
629                    break;
630                }
631            }
632        } else {
633            // No timeout - wait forever (daemon mode)
634            tracing::info!("Daemon mode: no idle timeout");
635            std::future::pending::<()>().await;
636        }
637    };
638
639    // File watcher event handler
640    let server_for_watch = server.clone();
641    let project_path_for_watch = project_path.clone();
642    let watch_handler = async move {
643        if let Some(ref mut watcher) = file_watcher {
644            loop {
645                match watcher.recv().await {
646                    Some(watcher::WatchEvent::FilesChanged(paths)) => {
647                        tracing::info!("Files changed: {:?}", paths.len());
648                        if let Err(e) = server_for_watch.reload(&project_path_for_watch).await {
649                            tracing::error!("Reload failed: {}", e);
650                        }
651                    }
652                    Some(watcher::WatchEvent::Error(e)) => {
653                        tracing::warn!("Watch error: {}", e);
654                    }
655                    None => {
656                        tracing::debug!("Watcher channel closed");
657                        break;
658                    }
659                }
660            }
661        } else {
662            // No watcher, wait forever
663            std::future::pending::<()>().await;
664        }
665    };
666
667    // Accept connections until shutdown signal or idle timeout
668    tokio::select! {
669        _ = async {
670            loop {
671                match listener.accept().await {
672                    Ok((stream, _)) => {
673                        tracing::debug!("Client connected");
674                        // Use create_server_transport for proper MessagePack serialization
675                        // (compatible with skip_serializing_if attributes)
676                        let transport = ryo_app::codec::create_server_transport(stream);
677
678                        let channel = server::BaseChannel::with_defaults(transport);
679                        let server_clone = server.clone();
680                        tokio::spawn(async move {
681                            channel
682                                .execute(server_clone.serve())
683                                .for_each(|response| async move {
684                                    tokio::spawn(response);
685                                })
686                                .await;
687                        });
688                    }
689                    Err(e) => {
690                        tracing::error!("accept error: {}", e);
691                    }
692                }
693            }
694        } => {}
695        _ = idle_check => {}
696        _ = watch_handler => {}
697        _ = shutdown_rx => {
698            tracing::info!("Shutdown signal received");
699        }
700    }
701
702    // Cleanup
703    let _ = std::fs::remove_file(&socket_path);
704    tracing::info!("Server stopped");
705    Ok(())
706}
707
708// ============================================================================
709// Test Harness Module
710// ============================================================================
711
712/// Test harness for API integration testing.
713///
714/// Provides utilities for testing Request/Response serialization and
715/// in-process tarpc server/client communication without UDS.
716#[cfg(test)]
717pub mod test_harness {
718    use super::*;
719    use ryo_app::api::{
720        CascadeRequest, CascadeResponse, DiscoverRequest, DiscoverResponse, RunRequest,
721        RunResponse, StatusResponse, SuggestRequest,
722    };
723    use ryo_app::service::RyoError;
724    use ryo_app::{ConflictStrategy, Goal, IdentKind, Intent};
725
726    /// Test helper: Create a minimal Api instance for testing
727    pub fn create_test_api() -> Api {
728        use ryo_app::InMemoryStorage;
729        let storage = Box::new(InMemoryStorage::new());
730        Api::new(storage)
731    }
732
733    /// Test helper: Create a RyoServer with test Api
734    pub fn create_test_server() -> (RyoServer, oneshot::Receiver<()>) {
735        let api = create_test_api();
736        let (tx, rx) = oneshot::channel();
737        (RyoServer::new(api, tx), rx)
738    }
739
740    // ========================================================================
741    // MessagePack Serialization Tests (used by tarpc transport)
742    // ========================================================================
743
744    /// Roundtrip test helper for MessagePack serialization
745    fn msgpack_roundtrip<T: serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug>(
746        value: &T,
747        type_name: &str,
748    ) {
749        // Use to_vec_named for map-based serialization (compatible with skip_serializing_if)
750        let encoded = rmp_serde::to_vec_named(value)
751            .unwrap_or_else(|e| panic!("Failed to serialize {}: {}", type_name, e));
752        let _decoded: T = rmp_serde::from_slice(&encoded)
753            .unwrap_or_else(|e| panic!("Failed to deserialize {}: {}", type_name, e));
754    }
755
756    #[test]
757    fn test_msgpack_discover_request() {
758        let req = DiscoverRequest {
759            pattern: "*Config*".to_string(),
760            kind: None,
761            sort: None,
762            limit: Some(10),
763            view: None,
764            is_async: None,
765            is_unsafe: None,
766            scope_path: None,
767            ignore_case: false,
768            ignore_word_separate: false,
769            attr: None,
770            is_id: false,
771        };
772        msgpack_roundtrip(&req, "DiscoverRequest");
773    }
774
775    #[test]
776    fn test_msgpack_discover_response() {
777        let resp = DiscoverResponse {
778            status: "found".to_string(),
779            symbols: vec![],
780            total: 0,
781            elapsed_ms: 42,
782            hint: None,
783        };
784        msgpack_roundtrip(&resp, "DiscoverResponse");
785    }
786
787    #[test]
788    fn test_msgpack_status_response() {
789        let resp = StatusResponse {
790            project: std::path::PathBuf::from("/test/project"),
791            symbols: 100,
792            files: 10,
793        };
794        msgpack_roundtrip(&resp, "StatusResponse");
795    }
796
797    #[test]
798    fn test_msgpack_suggest_request() {
799        let req = SuggestRequest {
800            pattern_filter: Some("*Handler*".to_string()),
801            high_impact: true,
802            quick: false,
803            scan: false,
804            precheck: false,
805            exclude_rules: vec![],
806            enhanced: false,
807            scope_filter: vec![],
808        };
809        msgpack_roundtrip(&req, "SuggestRequest");
810    }
811
812    #[test]
813    fn test_msgpack_cascade_request() {
814        let req = CascadeRequest {
815            id: "1v1".to_string(),
816            uuid: None,
817            depth: Some(3),
818        };
819        msgpack_roundtrip(&req, "CascadeRequest");
820    }
821
822    #[test]
823    fn test_msgpack_cascade_response() {
824        let resp = CascadeResponse {
825            display_name: "test_symbol".to_string(),
826            callers: vec!["foo".to_string(), "bar".to_string()],
827            users: vec!["baz".to_string()],
828            match_functions: vec![],
829            containing_types: vec![],
830        };
831        msgpack_roundtrip(&resp, "CascadeResponse");
832    }
833
834    #[test]
835    fn test_msgpack_ryo_error() {
836        let err = RyoError::NotFound {
837            name: "test".to_string(),
838        };
839        msgpack_roundtrip(&err, "RyoError::NotFound");
840
841        let err = RyoError::ParseError {
842            message: "syntax error".to_string(),
843        };
844        msgpack_roundtrip(&err, "RyoError::ParseError");
845
846        let err = RyoError::InvalidRequest {
847            message: "bad input".to_string(),
848        };
849        msgpack_roundtrip(&err, "RyoError::InvalidRequest");
850
851        let err = RyoError::Internal {
852            message: "crash".to_string(),
853        };
854        msgpack_roundtrip(&err, "RyoError::Internal");
855    }
856
857    #[test]
858    fn test_msgpack_goal() {
859        let goal = Goal::new(
860            "rename foo to bar",
861            Intent::RenameIdent {
862                symbol_id: None,
863                symbol_path: None,
864                target_ident: Some("foo".to_string()),
865                to: "bar".to_string(),
866                kind: IdentKind::Any,
867            },
868        );
869        msgpack_roundtrip(&goal, "Goal");
870    }
871
872    #[test]
873    fn test_msgpack_run_request() {
874        let goal = Goal::new(
875            "rename foo to bar",
876            Intent::RenameIdent {
877                symbol_id: None,
878                symbol_path: None,
879                target_ident: Some("foo".to_string()),
880                to: "bar".to_string(),
881                kind: IdentKind::Any,
882            },
883        );
884        let req = RunRequest {
885            goal,
886            dry_run: true,
887            check_syntax: false,
888        };
889        msgpack_roundtrip(&req, "RunRequest");
890    }
891
892    #[test]
893    fn test_msgpack_run_response() {
894        let resp = RunResponse {
895            success: true,
896            files_modified: 2,
897            total_changes: 5,
898            modified_files: vec![
899                std::path::PathBuf::from("/test/file1.rs"),
900                std::path::PathBuf::from("/test/file2.rs"),
901            ],
902            conflicts: vec![],
903            syntax_errors: vec![],
904            error: None,
905        };
906        msgpack_roundtrip(&resp, "RunResponse");
907    }
908
909    #[test]
910    fn test_msgpack_intent_variants() {
911        // Test various Intent variants with 3-field specification
912        let intents = [
913            Intent::RenameIdent {
914                symbol_id: None,
915                symbol_path: None,
916                target_ident: Some("old".to_string()),
917                to: "new".to_string(),
918                kind: IdentKind::Fn,
919            },
920            Intent::RenameIdent {
921                symbol_id: None,
922                symbol_path: None,
923                target_ident: Some("old_struct".to_string()),
924                to: "new_struct".to_string(),
925                kind: IdentKind::Type,
926            },
927            Intent::RenameIdent {
928                symbol_id: None,
929                symbol_path: Some("crate::config".to_string()),
930                target_ident: None,
931                to: "ConfigNew".to_string(),
932                kind: IdentKind::Any,
933            },
934        ];
935
936        for (i, intent) in intents.iter().enumerate() {
937            msgpack_roundtrip(intent, &format!("Intent variant {}", i));
938        }
939    }
940
941    #[test]
942    fn test_msgpack_conflict_strategy() {
943        let strategies = [
944            ConflictStrategy::Fail,
945            ConflictStrategy::IntentOrder,
946            ConflictStrategy::ParallelOnly,
947        ];
948
949        for (i, strategy) in strategies.iter().enumerate() {
950            msgpack_roundtrip(strategy, &format!("ConflictStrategy variant {}", i));
951        }
952    }
953
954    // ========================================================================
955    // In-Process tarpc Tests (no UDS)
956    // ========================================================================
957
958    #[tokio::test]
959    async fn test_harness_status_via_trait() {
960        use ryo_app::service::RyoService;
961
962        let (server, _rx) = create_test_server();
963        let ctx = tarpc::context::current();
964        let status = server.status(ctx).await;
965
966        let _ = status.symbols; // verify status fields are accessible
967        let _ = status.files;
968    }
969
970    #[tokio::test]
971    async fn test_harness_discover_via_trait() {
972        use ryo_app::service::RyoService;
973
974        let (server, _rx) = create_test_server();
975        let ctx = tarpc::context::current();
976        let req = DiscoverRequest {
977            pattern: "*".to_string(),
978            ..Default::default()
979        };
980        let result = server.discover(ctx, req).await;
981
982        assert!(result.is_ok());
983        let resp = result.unwrap();
984        let _ = resp.elapsed_ms; // verify elapsed_ms field is accessible
985    }
986
987    #[tokio::test]
988    async fn test_harness_cascade_via_trait() {
989        use ryo_app::service::RyoService;
990
991        let (server, _rx) = create_test_server();
992        let ctx = tarpc::context::current();
993        // Use a very large index that definitely doesn't exist
994        let req = CascadeRequest {
995            id: "9999999v1".to_string(),
996            uuid: None,
997            depth: Some(2),
998        };
999        let result = server.cascade(ctx, req).await;
1000
1001        // Should return NotFound for nonexistent symbol
1002        assert!(result.is_err());
1003        match result.unwrap_err() {
1004            RyoError::NotFound { name } => assert_eq!(name, "'9999999v1'"),
1005            other => panic!("Expected NotFound, got {:?}", other),
1006        }
1007    }
1008
1009    #[tokio::test]
1010    async fn test_harness_suggest_via_trait() {
1011        use ryo_app::service::RyoService;
1012
1013        let (server, _rx) = create_test_server();
1014        let ctx = tarpc::context::current();
1015        let req = SuggestRequest::default();
1016        let result = server.suggest(ctx, req).await;
1017
1018        assert!(result.is_ok());
1019    }
1020
1021    #[tokio::test]
1022    async fn test_harness_ping_via_trait() {
1023        use ryo_app::service::RyoService;
1024
1025        let (server, _rx) = create_test_server();
1026        let ctx = tarpc::context::current();
1027        // ping returns () so just verify no panic
1028        server.ping(ctx).await;
1029    }
1030
1031    #[tokio::test]
1032    async fn test_harness_shutdown_via_trait() {
1033        use ryo_app::service::RyoService;
1034
1035        let (server, rx) = create_test_server();
1036        let ctx = tarpc::context::current();
1037
1038        // Shutdown should send signal through the channel
1039        server.shutdown(ctx).await;
1040
1041        // The receiver should be notified
1042        // Use a timeout to avoid hanging
1043        let result = tokio::time::timeout(std::time::Duration::from_millis(100), rx).await;
1044
1045        assert!(result.is_ok(), "Shutdown signal should be received");
1046    }
1047}
1048
1049#[cfg(test)]
1050mod tests {
1051    use super::*;
1052
1053    #[tokio::test]
1054    async fn test_server_creation() {
1055        use ryo_app::InMemoryStorage;
1056        use tokio::sync::oneshot;
1057
1058        let storage = Box::new(InMemoryStorage::new());
1059        let api = Api::new(storage);
1060        let (tx, _rx) = oneshot::channel();
1061        let _server = RyoServer::new(api, tx);
1062        // Just verify no panic
1063    }
1064
1065    #[test]
1066    fn test_server_options_default() {
1067        let opts = ServerOptions::default();
1068        assert!(opts.parallel_init);
1069        assert!(opts.idle_timeout.is_some());
1070    }
1071
1072    #[tokio::test]
1073    async fn test_status_returns_non_empty_project_path() {
1074        use ryo_app::service::RyoService;
1075        use ryo_app::InMemoryStorage;
1076        use tokio::sync::oneshot;
1077
1078        // Api::new uses current directory as project root
1079        let storage = Box::new(InMemoryStorage::new());
1080        let api = Api::new(storage);
1081        let (tx, _rx) = oneshot::channel();
1082        let server = RyoServer::new(api, tx);
1083
1084        // Call status and verify project path is set
1085        let ctx = tarpc::context::current();
1086        let status = server.status(ctx).await;
1087
1088        // Project path should not be empty
1089        assert!(
1090            !status.project.as_os_str().is_empty(),
1091            "Project path should not be empty"
1092        );
1093        // Project path should exist (current directory)
1094        assert!(
1095            status.project.exists(),
1096            "Project path should exist: {:?}",
1097            status.project
1098        );
1099    }
1100
1101    #[tokio::test]
1102    async fn test_ping_updates_last_activity() {
1103        use ryo_app::service::RyoService;
1104        use ryo_app::InMemoryStorage;
1105        use tokio::sync::oneshot;
1106
1107        let storage = Box::new(InMemoryStorage::new());
1108        let api = Api::new(storage);
1109        let (tx, _rx) = oneshot::channel();
1110        let server = RyoServer::new(api, tx);
1111
1112        // Record initial idle time
1113        let _initial_idle = server.idle_secs();
1114
1115        // Wait a bit
1116        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1117
1118        // Ping should update last_activity
1119        let ctx = tarpc::context::current();
1120        server.clone().ping(ctx).await;
1121
1122        // After ping, idle time should be reset (close to 0)
1123        let after_ping_idle = server.idle_secs();
1124        assert!(
1125            after_ping_idle <= 1,
1126            "Idle time after ping should be ~0, got {}",
1127            after_ping_idle
1128        );
1129    }
1130
1131    #[tokio::test]
1132    async fn test_status_updates_last_activity() {
1133        use ryo_app::service::RyoService;
1134        use ryo_app::InMemoryStorage;
1135        use tokio::sync::oneshot;
1136
1137        let storage = Box::new(InMemoryStorage::new());
1138        let api = Api::new(storage);
1139        let (tx, _rx) = oneshot::channel();
1140        let server = RyoServer::new(api, tx);
1141
1142        // Wait a bit
1143        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1144
1145        // Status should update last_activity
1146        let ctx = tarpc::context::current();
1147        let _ = server.clone().status(ctx).await;
1148
1149        // After status, idle time should be reset (close to 0)
1150        let after_status_idle = server.idle_secs();
1151        assert!(
1152            after_status_idle <= 1,
1153            "Idle time after status should be ~0, got {}",
1154            after_status_idle
1155        );
1156    }
1157}