Skip to main content

reovim_server/
server.rs

1//! Server - the main entry point for running reovim server.
2
3use std::sync::Arc;
4
5use reovim_kernel::api::v1::ServiceRegistry;
6
7use crate::{
8    ServerConfig, TransportMode,
9    session::{Session, SessionId, SessionRegistry, SessionState, TokenRegistry},
10};
11
12#[cfg(feature = "grpc")]
13use {
14    crate::grpc::{
15        AuthInterceptor, BufferServiceImpl, CommandServiceImpl, DebugServiceImpl,
16        EditorServiceImpl, ExtensionServiceImpl, InputServiceImpl, ModuleServiceImpl,
17        NotificationServiceImpl, PresenceServiceImpl, ServerServiceImpl, StateServiceImpl,
18        SyntaxServiceImpl,
19    },
20    reovim_driver_session::bridges::BridgeRegistry,
21    reovim_protocol::v2::{
22        buffer_service_server::BufferServiceServer, command_service_server::CommandServiceServer,
23        debug_service_server::DebugServiceServer, editor_service_server::EditorServiceServer,
24        extension_service_server::ExtensionServiceServer, input_service_server::InputServiceServer,
25        module_service_server::ModuleServiceServer,
26        notification_service_server::NotificationServiceServer,
27        presence_service_server::PresenceServiceServer, server_service_server::ServerServiceServer,
28        state_service_server::StateServiceServer, syntax_service_server::SyntaxServiceServer,
29    },
30};
31
/// Session factory function type.
///
/// A boxed closure that takes no arguments and returns a fresh `SessionState`
/// on every call. The `Send + Sync` bounds allow the factory to be stored in
/// [`Server`] and shared across threads.
///
/// The server invokes the factory when it builds the state for a new session,
/// which lets the runner inject module-initialized registries into sessions.
pub type SessionFactory = Box<dyn Fn() -> SessionState + Send + Sync>;
37
/// The reovim server.
///
/// Owns the session and token registries and serves client connections over
/// the transport selected in [`ServerConfig`].
pub struct Server {
    /// Server configuration (transport mode, default session name, ...).
    config: ServerConfig,

    /// Registry of active sessions.
    ///
    /// Shared (via `Arc`) with every gRPC service and the tick scheduler.
    sessions: Arc<SessionRegistry>,

    /// Token registry for session-based authentication (#483).
    ///
    /// Maps session tokens to client IDs. Shared with the gRPC auth
    /// interceptor, the notification service and the presence service.
    tokens: Arc<TokenRegistry>,

    /// Optional service registry populated by modules.
    ///
    /// Intended to provide resolvers, command handlers, keybindings, etc. to
    /// sessions created by the server.
    ///
    /// Note: the registry is currently only stored; nothing in this file reads
    /// it yet. It will be consumed once service-based session creation lands,
    /// hence the `dead_code` allowance below.
    #[allow(dead_code)]
    services: Option<Arc<ServiceRegistry>>,

    /// Optional session factory for creating sessions with custom state.
    ///
    /// When present, it is called to produce the `SessionState` of each new
    /// session; otherwise `SessionState::default()` is used. This enables the
    /// runner to inject module-initialized registries.
    session_factory: Option<SessionFactory>,

    /// Extension bridge registry for gRPC notification emission (#468).
    ///
    /// Bridges are collected from `BridgeProvider` in bootstrap and installed
    /// with `Server::with_bridges`. Defaults to an empty registry (no
    /// extension notifications).
    #[cfg(feature = "grpc")]
    bridge_registry: Arc<BridgeRegistry>,
}
76
77impl Server {
78    /// Create a new server with the given configuration.
79    ///
80    /// This creates a server with empty registries. For full vim functionality,
81    /// use [`Server::with_services`] or [`Server::with_session_factory`] to
82    /// inject module-initialized registries.
83    #[must_use]
84    pub fn new(config: ServerConfig) -> Self {
85        Self {
86            config,
87            sessions: Arc::new(SessionRegistry::new()),
88            tokens: Arc::new(TokenRegistry::new()),
89            services: None,
90            session_factory: None,
91            #[cfg(feature = "grpc")]
92            bridge_registry: Arc::new(BridgeRegistry::default()),
93        }
94    }
95
96    /// Create a server with a service registry populated by modules.
97    ///
98    /// The service registry should contain:
99    /// - `ResolverRegistry` - mode key resolvers
100    /// - `KeybindingStore` - keybindings
101    /// - `CommandHandlerStore` - command handlers
102    /// - `ModeInfoStore` - mode metadata
103    ///
104    /// # Example
105    ///
106    /// ```ignore
107    /// use reovim_server::{Server, ServerConfig};
108    /// use reovim_kernel::api::v1::ServiceRegistry;
109    ///
110    /// // Bootstrap: load modules and populate services
111    /// let services = Arc::new(ServiceRegistry::new());
112    /// bootstrap_modules(&services);
113    ///
114    /// let server = Server::with_services(ServerConfig::default(), services);
115    /// server.run().await?;
116    /// ```
117    #[must_use]
118    pub fn with_services(config: ServerConfig, services: Arc<ServiceRegistry>) -> Self {
119        Self {
120            config,
121            sessions: Arc::new(SessionRegistry::new()),
122            tokens: Arc::new(TokenRegistry::new()),
123            services: Some(services),
124            session_factory: None,
125            #[cfg(feature = "grpc")]
126            bridge_registry: Arc::new(BridgeRegistry::default()),
127        }
128    }
129
130    /// Create a server with a custom session factory.
131    ///
132    /// The factory function is called each time a new session is created,
133    /// allowing the runner to inject fully-configured `SessionState` instances
134    /// with module-initialized registries.
135    ///
136    /// This is the most flexible option for module integration.
137    ///
138    /// # Example
139    ///
140    /// ```ignore
141    /// use reovim_server::{Server, ServerConfig, SessionState};
142    ///
143    /// let server = Server::with_session_factory(
144    ///     ServerConfig::default(),
145    ///     Box::new(|| {
146    ///         // Create session state with populated registries
147    ///         create_session_state_with_modules()
148    ///     }),
149    /// );
150    /// server.run().await?;
151    /// ```
152    #[must_use]
153    pub fn with_session_factory(config: ServerConfig, factory: SessionFactory) -> Self {
154        Self {
155            config,
156            sessions: Arc::new(SessionRegistry::new()),
157            tokens: Arc::new(TokenRegistry::new()),
158            services: None,
159            session_factory: Some(factory),
160            #[cfg(feature = "grpc")]
161            bridge_registry: Arc::new(BridgeRegistry::default()),
162        }
163    }
164
165    /// Set the extension bridge registry (#468).
166    ///
167    /// Bridges are collected from `BridgeProvider` in bootstrap.
168    /// Must be called before `run()`.
169    #[cfg(feature = "grpc")]
170    #[must_use]
171    pub fn with_bridges(mut self, registry: BridgeRegistry) -> Self {
172        self.bridge_registry = Arc::new(registry);
173        self
174    }
175
176    /// Create a session state using the configured factory or default.
177    #[allow(clippy::option_if_let_else)] // More readable with if-let
178    fn create_session_state(&self) -> SessionState {
179        if let Some(factory) = &self.session_factory {
180            factory()
181        } else {
182            SessionState::default()
183        }
184    }
185
186    /// Run the server.
187    ///
188    /// This method blocks until the server is shut down.
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if the transport fails to start (e.g., port in use).
193    #[cfg_attr(coverage_nightly, coverage(off))]
194    pub async fn run(&self) -> std::io::Result<()> {
195        // Create the default session with module-initialized state
196        let session_state = self.create_session_state();
197        let default_session = Arc::new(Session::from_state(
198            SessionId::new(&*self.config.default_session_name),
199            session_state,
200        ));
201        self.sessions.insert(&default_session);
202
203        tracing::info!(
204            session = %self.config.default_session_name,
205            "Created default session"
206        );
207
208        // Start the appropriate transport
209        match &self.config.transport {
210            TransportMode::TcpWithFallback => self.run_tcp_fallback().await,
211            TransportMode::Tcp { port } => self.run_tcp(*port).await,
212            #[cfg(unix)]
213            TransportMode::UnixSocket { path } => self.run_unix(path).await,
214            TransportMode::Grpc { port } => self.run_grpc(*port, None, None).await,
215        }
216    }
217
218    /// Run with TCP transport, trying ports 12540-12549.
219    #[cfg_attr(coverage_nightly, coverage(off))]
220    async fn run_tcp_fallback(&self) -> std::io::Result<()> {
221        for port in 12540..=12549 {
222            match self.run_tcp(port).await {
223                Ok(()) => return Ok(()),
224                Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
225                    tracing::debug!(port, "Port in use, trying next");
226                }
227                Err(e) => return Err(e),
228            }
229        }
230        Err(std::io::Error::new(
231            std::io::ErrorKind::AddrInUse,
232            "All ports 12540-12549 are in use",
233        ))
234    }
235
236    /// Run with TCP transport on a specific port.
237    #[cfg_attr(coverage_nightly, coverage(off))]
238    async fn run_tcp(&self, port: u16) -> std::io::Result<()> {
239        tracing::info!(port, "Starting TCP server (JSON-RPC not implemented yet)");
240        // TODO: Implement JSON-RPC server
241        // For now, just wait forever
242        std::future::pending::<()>().await;
243        Ok(())
244    }
245
246    /// Run with Unix socket transport.
247    #[cfg(unix)]
248    #[cfg_attr(coverage_nightly, coverage(off))]
249    async fn run_unix(&self, path: &std::path::Path) -> std::io::Result<()> {
250        tracing::info!(path = %path.display(), "Starting Unix socket server");
251        // TODO: Implement Unix socket server
252        std::future::pending::<()>().await;
253        Ok(())
254    }
255
256    /// Run with gRPC transport.
257    ///
258    /// When `shutdown` is `Some`, the server will stop when the future resolves.
259    /// When `port_tx` is `Some`, the bound port is sent before serving starts.
260    #[allow(clippy::too_many_lines)] // Service wiring is inherently verbose
261    #[cfg_attr(coverage_nightly, coverage(off))]
262    async fn run_grpc(
263        &self,
264        port: u16,
265        shutdown: Option<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>>,
266        port_tx: Option<tokio::sync::oneshot::Sender<u16>>,
267    ) -> std::io::Result<()> {
268        // TODO: Make bind address configurable (currently 0.0.0.0 for dev testing)
269        let addr: std::net::SocketAddr = format!("0.0.0.0:{port}")
270            .parse()
271            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
272
273        // Bind first to get actual port (handles port 0 for testing)
274        let listener = tokio::net::TcpListener::bind(addr).await?;
275        let local_addr = listener.local_addr()?;
276
277        tracing::info!(address = %local_addr, "Starting gRPC server");
278        // Output for test harness (expects exact format)
279        eprintln!("Listening on 127.0.0.1:{}", local_addr.port());
280
281        // Report port to caller if requested (for integrated mode with OS-assigned port)
282        if let Some(tx) = port_tx {
283            let _ = tx.send(local_addr.port());
284        }
285
286        let default_session_id = SessionId::new(&*self.config.default_session_name);
287
288        // Auth interceptor: resolves x-reovim-token → ClientId (#483)
289        let interceptor = AuthInterceptor::new(Arc::clone(&self.tokens));
290
291        // Extension bridge registry (#514/#468) — shared between InputService and ExtensionService.
292        // Bridges are now collected from BridgeProvider by bootstrap, not hardcoded here.
293        let bridges = Arc::clone(&self.bridge_registry);
294
295        // Tick scheduler for server-driven state advancement (#546).
296        // Modules call TickSchedulerHandle.start() to begin periodic ticking.
297        {
298            use reovim_driver_session::TickSchedulerHandle;
299
300            let tick_scheduler = Arc::new(crate::tick::TokioTickScheduler::new(
301                Arc::clone(&self.sessions),
302                default_session_id.clone(),
303                Arc::clone(&bridges),
304            ));
305
306            if let Some(session) = self.sessions.get(&default_session_id) {
307                session.with_state_mut_sync(|state| {
308                    let handle = state.app.services.get_or_create::<TickSchedulerHandle>();
309                    handle
310                        .set(tick_scheduler as Arc<dyn reovim_driver_session::tick::TickScheduler>);
311                });
312            }
313        }
314
315        // Create all gRPC services
316        let buffer_service =
317            BufferServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
318        let editor_service =
319            EditorServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
320        let input_service = InputServiceImpl::new(
321            Arc::clone(&self.sessions),
322            default_session_id.clone(),
323            Arc::clone(&bridges),
324        );
325        let state_service =
326            StateServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
327        let server_service =
328            ServerServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
329        let notification_service = NotificationServiceImpl::new(
330            Arc::clone(&self.sessions),
331            default_session_id.clone(),
332            Arc::clone(&self.tokens),
333        );
334
335        // ModuleService is a stub - full implementation is in runner
336        let module_service = ModuleServiceImpl::new();
337
338        // SyntaxService provides token data for syntax highlighting
339        let syntax_service =
340            SyntaxServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
341
342        // PresenceService for multi-client awareness (Phase 14)
343        let presence_service = PresenceServiceImpl::new(
344            Arc::clone(&self.sessions),
345            default_session_id.clone(),
346            Arc::clone(&self.tokens),
347        );
348
349        // CommandService for command completion (#453)
350        let command_service =
351            CommandServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
352
353        // ExtensionService for querying extension state (#514)
354        let extension_service = ExtensionServiceImpl::new(
355            Arc::clone(&self.sessions),
356            default_session_id.clone(),
357            bridges,
358        );
359
360        // DebugService for CLI client-targeting operations (#468)
361        let debug_service = DebugServiceImpl::with_sessions(
362            Arc::clone(&self.sessions),
363            default_session_id,
364            Arc::clone(&self.bridge_registry),
365        );
366
367        // Build gRPC server with optional gRPC-Web support
368        #[cfg(feature = "grpc-web")]
369        {
370            use tower_http::cors::{Any, CorsLayer};
371
372            tracing::info!("gRPC-Web support enabled (HTTP/1.1 + CORS)");
373
374            // CORS layer for browser access (permissive for development)
375            // TODO (Phase 9+): Production CORS with configurable allowed origins
376            let cors = CorsLayer::new()
377                .allow_origin(Any)
378                .allow_headers(Any)
379                .allow_methods(Any)
380                .expose_headers(Any);
381
382            let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
383            let i = &interceptor;
384            let router = tonic::transport::Server::builder()
385                .accept_http1(true) // Required for gRPC-Web
386                .layer(cors)
387                .layer(tonic_web::GrpcWebLayer::new())
388                .add_service(BufferServiceServer::with_interceptor(buffer_service, i.clone()))
389                .add_service(EditorServiceServer::with_interceptor(editor_service, i.clone()))
390                .add_service(InputServiceServer::with_interceptor(input_service, i.clone()))
391                .add_service(ModuleServiceServer::with_interceptor(module_service, i.clone()))
392                .add_service(StateServiceServer::with_interceptor(state_service, i.clone()))
393                .add_service(ServerServiceServer::with_interceptor(server_service, i.clone()))
394                .add_service(NotificationServiceServer::with_interceptor(
395                    notification_service,
396                    i.clone(),
397                ))
398                .add_service(SyntaxServiceServer::with_interceptor(syntax_service, i.clone()))
399                .add_service(PresenceServiceServer::with_interceptor(presence_service, i.clone()))
400                .add_service(ExtensionServiceServer::with_interceptor(extension_service, i.clone()))
401                .add_service(CommandServiceServer::with_interceptor(command_service, i.clone()))
402                .add_service(DebugServiceServer::with_interceptor(debug_service, i.clone()));
403
404            if let Some(signal) = shutdown {
405                router
406                    .serve_with_incoming_shutdown(incoming, signal)
407                    .await
408                    .map_err(std::io::Error::other)
409            } else {
410                router
411                    .serve_with_incoming(incoming)
412                    .await
413                    .map_err(std::io::Error::other)
414            }
415        }
416
417        #[cfg(not(feature = "grpc-web"))]
418        {
419            let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
420            let i = &interceptor;
421            let router = tonic::transport::Server::builder()
422                .add_service(BufferServiceServer::with_interceptor(buffer_service, i.clone()))
423                .add_service(EditorServiceServer::with_interceptor(editor_service, i.clone()))
424                .add_service(InputServiceServer::with_interceptor(input_service, i.clone()))
425                .add_service(ModuleServiceServer::with_interceptor(module_service, i.clone()))
426                .add_service(StateServiceServer::with_interceptor(state_service, i.clone()))
427                .add_service(ServerServiceServer::with_interceptor(server_service, i.clone()))
428                .add_service(NotificationServiceServer::with_interceptor(
429                    notification_service,
430                    i.clone(),
431                ))
432                .add_service(SyntaxServiceServer::with_interceptor(syntax_service, i.clone()))
433                .add_service(PresenceServiceServer::with_interceptor(presence_service, i.clone()))
434                .add_service(ExtensionServiceServer::with_interceptor(extension_service, i.clone()))
435                .add_service(CommandServiceServer::with_interceptor(command_service, i.clone()))
436                .add_service(DebugServiceServer::with_interceptor(debug_service, i.clone()));
437
438            if let Some(signal) = shutdown {
439                router
440                    .serve_with_incoming_shutdown(incoming, signal)
441                    .await
442                    .map_err(std::io::Error::other)
443            } else {
444                router
445                    .serve_with_incoming(incoming)
446                    .await
447                    .map_err(std::io::Error::other)
448            }
449        }
450    }
451
452    /// Run the server until a shutdown signal is received.
453    ///
454    /// Similar to [`run()`](Self::run) but accepts a shutdown future and an optional
455    /// port sender. When the shutdown future resolves, the server performs a graceful
456    /// shutdown. The port sender reports the actual bound port (useful when binding
457    /// to port 0 for OS-assigned ports).
458    ///
459    /// Only gRPC transport is supported.
460    ///
461    /// # Errors
462    ///
463    /// Returns an error if the transport fails to start or if the configured
464    /// transport is not gRPC.
465    pub async fn run_until(
466        &self,
467        shutdown: impl std::future::Future<Output = ()> + Send + 'static,
468        port_tx: Option<tokio::sync::oneshot::Sender<u16>>,
469    ) -> std::io::Result<()> {
470        // Create the default session with module-initialized state
471        let session_state = self.create_session_state();
472        let default_session = Arc::new(Session::from_state(
473            SessionId::new(&*self.config.default_session_name),
474            session_state,
475        ));
476        self.sessions.insert(&default_session);
477
478        tracing::info!(
479            session = %self.config.default_session_name,
480            "Created default session"
481        );
482
483        let port = match &self.config.transport {
484            TransportMode::Grpc { port } => *port,
485            _ => {
486                return Err(std::io::Error::new(
487                    std::io::ErrorKind::Unsupported,
488                    "run_until() only supports gRPC transport",
489                ));
490            }
491        };
492
493        self.run_grpc(port, Some(Box::pin(shutdown)), port_tx).await
494    }
495
496    /// Get a reference to the session registry.
497    #[must_use]
498    pub const fn sessions(&self) -> &Arc<SessionRegistry> {
499        &self.sessions
500    }
501}
502
503#[cfg(test)]
504#[path = "server_tests.rs"]
505mod tests;