1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
//! Server: the main entry point for running the reovim server.
use std::sync::Arc;
use reovim_kernel::api::v1::ServiceRegistry;
use crate::{
ServerConfig, TransportMode,
session::{Session, SessionId, SessionRegistry, SessionState, TokenRegistry},
};
#[cfg(feature = "grpc")]
use {
crate::grpc::{
AuthInterceptor, BufferServiceImpl, CommandServiceImpl, DebugServiceImpl,
EditorServiceImpl, ExtensionServiceImpl, InputServiceImpl, ModuleServiceImpl,
NotificationServiceImpl, PresenceServiceImpl, ServerServiceImpl, StateServiceImpl,
SyntaxServiceImpl,
},
reovim_driver_session::bridges::BridgeRegistry,
reovim_protocol::v2::{
buffer_service_server::BufferServiceServer, command_service_server::CommandServiceServer,
debug_service_server::DebugServiceServer, editor_service_server::EditorServiceServer,
extension_service_server::ExtensionServiceServer, input_service_server::InputServiceServer,
module_service_server::ModuleServiceServer,
notification_service_server::NotificationServiceServer,
presence_service_server::PresenceServiceServer, server_service_server::ServerServiceServer,
state_service_server::StateServiceServer, syntax_service_server::SyntaxServiceServer,
},
};
/// Session factory function type.
///
/// A boxed closure that returns a `SessionState` each time it is called. This
/// allows the runner to inject module-initialized registries into sessions.
///
/// The `Send + Sync` bounds keep the boxed closure usable from any thread.
pub type SessionFactory = Box<dyn Fn() -> SessionState + Send + Sync>;
/// The reovim server.
///
/// Owns the session and token registries and handles client connections via
/// the transport selected in its [`ServerConfig`].
pub struct Server {
/// Server configuration (transport mode and default session name are read
/// from it when the server runs).
config: ServerConfig,
/// Registry of active sessions.
sessions: Arc<SessionRegistry>,
/// Token registry for session-based authentication (#483).
///
/// Maps session tokens to client IDs. Shared with the gRPC auth interceptor,
/// the notification service, and the presence service.
tokens: Arc<TokenRegistry>,
/// Optional service registry populated by modules.
///
/// When set, sessions created by the server are intended to use services
/// from this registry (resolvers, command handlers, keybindings, etc.).
///
/// Note: Currently unused - will be used when we add service-based session creation.
#[allow(dead_code)]
services: Option<Arc<ServiceRegistry>>,
/// Optional session factory for creating sessions with custom state.
///
/// If provided, this factory is used to create `SessionState` for new sessions;
/// otherwise `SessionState::default()` is used.
/// This enables the runner to inject module-initialized registries.
session_factory: Option<SessionFactory>,
/// Extension bridge registry for gRPC notification emission (#468).
///
/// Bridges are collected from `BridgeProvider` in bootstrap.
/// Defaults to empty registry (no extension notifications).
/// Only present when the `grpc` feature is enabled.
#[cfg(feature = "grpc")]
bridge_registry: Arc<BridgeRegistry>,
}
impl Server {
/// Build a server from `config` with no services and no session factory.
///
/// Every registry starts out empty. For full vim functionality, use
/// [`Server::with_services`] or [`Server::with_session_factory`] so that
/// module-initialized registries are injected.
#[must_use]
pub fn new(config: ServerConfig) -> Self {
    let sessions = Arc::new(SessionRegistry::new());
    let tokens = Arc::new(TokenRegistry::new());
    #[cfg(feature = "grpc")]
    let bridge_registry = Arc::new(BridgeRegistry::default());

    Self {
        config,
        sessions,
        tokens,
        services: None,
        session_factory: None,
        #[cfg(feature = "grpc")]
        bridge_registry,
    }
}
/// Create a server with a service registry populated by modules.
///
/// The service registry should contain:
/// - `ResolverRegistry` - mode key resolvers
/// - `KeybindingStore` - keybindings
/// - `CommandHandlerStore` - command handlers
/// - `ModeInfoStore` - mode metadata
///
/// # Example
///
/// ```ignore
/// use reovim_server::{Server, ServerConfig};
/// use reovim_kernel::api::v1::ServiceRegistry;
///
/// // Bootstrap: load modules and populate services
/// let services = Arc::new(ServiceRegistry::new());
/// bootstrap_modules(&services);
///
/// let server = Server::with_services(ServerConfig::default(), services);
/// server.run().await?;
/// ```
#[must_use]
pub fn with_services(config: ServerConfig, services: Arc<ServiceRegistry>) -> Self {
Self {
config,
sessions: Arc::new(SessionRegistry::new()),
tokens: Arc::new(TokenRegistry::new()),
services: Some(services),
session_factory: None,
#[cfg(feature = "grpc")]
bridge_registry: Arc::new(BridgeRegistry::default()),
}
}
/// Create a server with a custom session factory.
///
/// The factory function is called each time a new session is created,
/// allowing the runner to inject fully-configured `SessionState` instances
/// with module-initialized registries.
///
/// This is the most flexible option for module integration.
///
/// # Example
///
/// ```ignore
/// use reovim_server::{Server, ServerConfig, SessionState};
///
/// let server = Server::with_session_factory(
/// ServerConfig::default(),
/// Box::new(|| {
/// // Create session state with populated registries
/// create_session_state_with_modules()
/// }),
/// );
/// server.run().await?;
/// ```
#[must_use]
pub fn with_session_factory(config: ServerConfig, factory: SessionFactory) -> Self {
Self {
config,
sessions: Arc::new(SessionRegistry::new()),
tokens: Arc::new(TokenRegistry::new()),
services: None,
session_factory: Some(factory),
#[cfg(feature = "grpc")]
bridge_registry: Arc::new(BridgeRegistry::default()),
}
}
/// Replace the extension bridge registry (#468), builder-style.
///
/// Bridges are collected from `BridgeProvider` in bootstrap.
/// Must be called before `run()`.
#[cfg(feature = "grpc")]
#[must_use]
pub fn with_bridges(self, registry: BridgeRegistry) -> Self {
    Self {
        bridge_registry: Arc::new(registry),
        ..self
    }
}
/// Produce a `SessionState`: from the configured factory when there is one,
/// otherwise the type's default.
#[allow(clippy::option_if_let_else)] // Explicit arms read better than map_or_else here
fn create_session_state(&self) -> SessionState {
    match &self.session_factory {
        Some(make_state) => make_state(),
        None => SessionState::default(),
    }
}
/// Run the server.
///
/// Registers the default session, then hands control to the transport chosen
/// in the configuration. This method blocks until the server is shut down.
///
/// # Errors
///
/// Returns an error if the transport fails to start (e.g., port in use).
#[cfg_attr(coverage_nightly, coverage(off))]
pub async fn run(&self) -> std::io::Result<()> {
    // Build the default session around module-initialized state and register it.
    let state = self.create_session_state();
    let id = SessionId::new(&*self.config.default_session_name);
    let session = Arc::new(Session::from_state(id, state));
    self.sessions.insert(&session);
    tracing::info!(
        session = %self.config.default_session_name,
        "Created default session"
    );

    // Dispatch to the configured transport.
    match self.config.transport {
        TransportMode::TcpWithFallback => self.run_tcp_fallback().await,
        TransportMode::Tcp { port } => self.run_tcp(port).await,
        #[cfg(unix)]
        TransportMode::UnixSocket { ref path } => self.run_unix(path).await,
        TransportMode::Grpc { port } => self.run_grpc(port, None, None).await,
    }
}
/// Run with TCP transport, trying ports 12540-12549 in ascending order.
///
/// A port that reports `AddrInUse` is skipped; any other error is returned
/// immediately. If every port is in use, an `AddrInUse` error is returned.
#[cfg_attr(coverage_nightly, coverage(off))]
async fn run_tcp_fallback(&self) -> std::io::Result<()> {
    /// Candidate ports, tried first to last.
    const FALLBACK_PORTS: std::ops::RangeInclusive<u16> = 12540..=12549;

    for port in FALLBACK_PORTS {
        let Err(err) = self.run_tcp(port).await else {
            return Ok(());
        };
        if err.kind() != std::io::ErrorKind::AddrInUse {
            return Err(err);
        }
        tracing::debug!(port, "Port in use, trying next");
    }

    let exhausted = std::io::Error::new(
        std::io::ErrorKind::AddrInUse,
        "All ports 12540-12549 are in use",
    );
    Err(exhausted)
}
/// Run with TCP transport on a specific port.
///
/// Placeholder: logs the requested port and then never completes, because
/// the JSON-RPC server does not exist yet.
#[cfg_attr(coverage_nightly, coverage(off))]
async fn run_tcp(&self, port: u16) -> std::io::Result<()> {
    tracing::info!(port, "Starting TCP server (JSON-RPC not implemented yet)");
    // TODO: Implement JSON-RPC server
    // Until then, park on a future that never resolves; no value is ever produced.
    std::future::pending::<std::io::Result<()>>().await
}
/// Run with Unix socket transport.
///
/// Placeholder: logs the socket path and then never completes, because the
/// Unix socket server does not exist yet.
#[cfg(unix)]
#[cfg_attr(coverage_nightly, coverage(off))]
async fn run_unix(&self, path: &std::path::Path) -> std::io::Result<()> {
    tracing::info!(path = %path.display(), "Starting Unix socket server");
    // TODO: Implement Unix socket server
    // Park on a never-resolving future; its output type is inferred from the
    // return type, and no value is ever produced.
    std::future::pending().await
}
/// Run with gRPC transport.
///
/// Binds a TCP listener on all interfaces at `port`, installs a tick
/// scheduler into the default session (if that session is registered), wires
/// every gRPC service behind the auth interceptor, and then serves.
///
/// When `shutdown` is `Some`, the server will stop when the future resolves.
/// When `port_tx` is `Some`, the bound port is sent before serving starts.
///
/// # Errors
///
/// Returns an error if the listener cannot be bound, if its local address
/// cannot be read, or if the gRPC transport fails while serving (wrapped
/// with [`std::io::Error::other`]).
#[allow(clippy::too_many_lines)] // Service wiring is inherently verbose
#[cfg_attr(coverage_nightly, coverage(off))]
async fn run_grpc(
    &self,
    port: u16,
    shutdown: Option<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>>,
    port_tx: Option<tokio::sync::oneshot::Sender<u16>>,
) -> std::io::Result<()> {
    // NOTE(review): this method relies on items imported under
    // `#[cfg(feature = "grpc")]` but is not itself feature-gated — confirm the
    // crate is always built with `grpc` enabled.

    // TODO: Make bind address configurable (currently 0.0.0.0 for dev testing)
    // Built from typed parts: no string formatting/parsing, so no error path.
    let addr = std::net::SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, port));

    // Bind first to get actual port (handles port 0 for testing)
    let listener = tokio::net::TcpListener::bind(addr).await?;
    let local_addr = listener.local_addr()?;
    tracing::info!(address = %local_addr, "Starting gRPC server");

    // Output for test harness (expects exact format)
    eprintln!("Listening on 127.0.0.1:{}", local_addr.port());

    // Report port to caller if requested (for integrated mode with OS-assigned port)
    if let Some(tx) = port_tx {
        // A dropped receiver just means nobody is waiting for the port.
        let _ = tx.send(local_addr.port());
    }

    let default_session_id = SessionId::new(&*self.config.default_session_name);

    // Auth interceptor: resolves x-reovim-token → ClientId (#483)
    let auth = AuthInterceptor::new(Arc::clone(&self.tokens));

    // Extension bridge registry (#514/#468) — shared between InputService and ExtensionService.
    // Bridges are now collected from BridgeProvider by bootstrap, not hardcoded here.
    let bridges = Arc::clone(&self.bridge_registry);

    // Tick scheduler for server-driven state advancement (#546).
    // Modules call TickSchedulerHandle.start() to begin periodic ticking.
    {
        use reovim_driver_session::TickSchedulerHandle;
        let tick_scheduler = Arc::new(crate::tick::TokioTickScheduler::new(
            Arc::clone(&self.sessions),
            default_session_id.clone(),
            Arc::clone(&bridges),
        ));
        // Nothing is installed when the default session has not been registered.
        if let Some(session) = self.sessions.get(&default_session_id) {
            session.with_state_mut_sync(|state| {
                let handle = state.app.services.get_or_create::<TickSchedulerHandle>();
                handle.set(tick_scheduler as Arc<dyn reovim_driver_session::tick::TickScheduler>);
            });
        }
    }

    // Create all gRPC services
    let buffer_service =
        BufferServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
    let editor_service =
        EditorServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
    let input_service = InputServiceImpl::new(
        Arc::clone(&self.sessions),
        default_session_id.clone(),
        Arc::clone(&bridges),
    );
    let state_service =
        StateServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
    let server_service =
        ServerServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
    let notification_service = NotificationServiceImpl::new(
        Arc::clone(&self.sessions),
        default_session_id.clone(),
        Arc::clone(&self.tokens),
    );
    // ModuleService is a stub - full implementation is in runner
    let module_service = ModuleServiceImpl::new();
    // SyntaxService provides token data for syntax highlighting
    let syntax_service =
        SyntaxServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
    // PresenceService for multi-client awareness (Phase 14)
    let presence_service = PresenceServiceImpl::new(
        Arc::clone(&self.sessions),
        default_session_id.clone(),
        Arc::clone(&self.tokens),
    );
    // CommandService for command completion (#453)
    let command_service =
        CommandServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
    // ExtensionService for querying extension state (#514)
    let extension_service = ExtensionServiceImpl::new(
        Arc::clone(&self.sessions),
        default_session_id.clone(),
        bridges,
    );
    // DebugService for CLI client-targeting operations (#468)
    let debug_service = DebugServiceImpl::with_sessions(
        Arc::clone(&self.sessions),
        default_session_id,
        Arc::clone(&self.bridge_registry),
    );

    // Only the transport builder differs between the two feature
    // configurations; the service wiring and serve logic below are shared so
    // they cannot drift apart.
    #[cfg(feature = "grpc-web")]
    let mut builder = {
        use tower_http::cors::{Any, CorsLayer};
        tracing::info!("gRPC-Web support enabled (HTTP/1.1 + CORS)");
        // CORS layer for browser access (permissive for development)
        // TODO (Phase 9+): Production CORS with configurable allowed origins
        let cors = CorsLayer::new()
            .allow_origin(Any)
            .allow_headers(Any)
            .allow_methods(Any)
            .expose_headers(Any);
        tonic::transport::Server::builder()
            .accept_http1(true) // Required for gRPC-Web
            .layer(cors)
            .layer(tonic_web::GrpcWebLayer::new())
    };
    #[cfg(not(feature = "grpc-web"))]
    let mut builder = tonic::transport::Server::builder();

    // Every service sits behind its own clone of the auth interceptor.
    let router = builder
        .add_service(BufferServiceServer::with_interceptor(buffer_service, auth.clone()))
        .add_service(EditorServiceServer::with_interceptor(editor_service, auth.clone()))
        .add_service(InputServiceServer::with_interceptor(input_service, auth.clone()))
        .add_service(ModuleServiceServer::with_interceptor(module_service, auth.clone()))
        .add_service(StateServiceServer::with_interceptor(state_service, auth.clone()))
        .add_service(ServerServiceServer::with_interceptor(server_service, auth.clone()))
        .add_service(NotificationServiceServer::with_interceptor(
            notification_service,
            auth.clone(),
        ))
        .add_service(SyntaxServiceServer::with_interceptor(syntax_service, auth.clone()))
        .add_service(PresenceServiceServer::with_interceptor(presence_service, auth.clone()))
        .add_service(ExtensionServiceServer::with_interceptor(extension_service, auth.clone()))
        .add_service(CommandServiceServer::with_interceptor(command_service, auth.clone()))
        .add_service(DebugServiceServer::with_interceptor(debug_service, auth.clone()));

    // Serve on the already-bound listener, with graceful shutdown when a
    // shutdown future was supplied.
    let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
    let served = match shutdown {
        Some(signal) => router.serve_with_incoming_shutdown(incoming, signal).await,
        None => router.serve_with_incoming(incoming).await,
    };
    served.map_err(std::io::Error::other)
}
/// Run the server until a shutdown signal is received.
///
/// Similar to [`run()`](Self::run) but accepts a shutdown future and an optional
/// port sender. When the shutdown future resolves, the server performs a graceful
/// shutdown. The port sender reports the actual bound port (useful when binding
/// to port 0 for OS-assigned ports).
///
/// Only gRPC transport is supported. The transport is validated before
/// anything else happens, so an unsupported configuration returns an error
/// without registering the default session.
///
/// # Errors
///
/// Returns an error if the transport fails to start or if the configured
/// transport is not gRPC.
pub async fn run_until(
    &self,
    shutdown: impl std::future::Future<Output = ()> + Send + 'static,
    port_tx: Option<tokio::sync::oneshot::Sender<u16>>,
) -> std::io::Result<()> {
    // Fail fast: reject unsupported transports before any side effects.
    let TransportMode::Grpc { port } = &self.config.transport else {
        return Err(std::io::Error::new(
            std::io::ErrorKind::Unsupported,
            "run_until() only supports gRPC transport",
        ));
    };
    let port = *port;

    // Create the default session with module-initialized state
    let session_state = self.create_session_state();
    let default_session = Arc::new(Session::from_state(
        SessionId::new(&*self.config.default_session_name),
        session_state,
    ));
    self.sessions.insert(&default_session);
    tracing::info!(
        session = %self.config.default_session_name,
        "Created default session"
    );

    self.run_grpc(port, Some(Box::pin(shutdown)), port_tx).await
}
/// Get a reference to the session registry.
#[must_use]
pub const fn sessions(&self) -> &Arc<SessionRegistry> {
&self.sessions
}
}
#[cfg(test)]
#[path = "server_tests.rs"]
mod tests;