1use std::sync::Arc;
4
5use reovim_kernel::api::v1::ServiceRegistry;
6
7use crate::{
8 ServerConfig, TransportMode,
9 session::{Session, SessionId, SessionRegistry, SessionState, TokenRegistry},
10};
11
12#[cfg(feature = "grpc")]
13use {
14 crate::grpc::{
15 AuthInterceptor, BufferServiceImpl, CommandServiceImpl, DebugServiceImpl,
16 EditorServiceImpl, ExtensionServiceImpl, InputServiceImpl, ModuleServiceImpl,
17 NotificationServiceImpl, PresenceServiceImpl, ServerServiceImpl, StateServiceImpl,
18 SyntaxServiceImpl,
19 },
20 reovim_driver_session::bridges::BridgeRegistry,
21 reovim_protocol::v2::{
22 buffer_service_server::BufferServiceServer, command_service_server::CommandServiceServer,
23 debug_service_server::DebugServiceServer, editor_service_server::EditorServiceServer,
24 extension_service_server::ExtensionServiceServer, input_service_server::InputServiceServer,
25 module_service_server::ModuleServiceServer,
26 notification_service_server::NotificationServiceServer,
27 presence_service_server::PresenceServiceServer, server_service_server::ServerServiceServer,
28 state_service_server::StateServiceServer, syntax_service_server::SyntaxServiceServer,
29 },
30};
31
32pub type SessionFactory = Box<dyn Fn() -> SessionState + Send + Sync>;
37
38pub struct Server {
42 config: ServerConfig,
44
45 sessions: Arc<SessionRegistry>,
47
48 tokens: Arc<TokenRegistry>,
53
54 #[allow(dead_code)]
61 services: Option<Arc<ServiceRegistry>>,
62
63 session_factory: Option<SessionFactory>,
68
69 #[cfg(feature = "grpc")]
74 bridge_registry: Arc<BridgeRegistry>,
75}
76
77impl Server {
78 #[must_use]
84 pub fn new(config: ServerConfig) -> Self {
85 Self {
86 config,
87 sessions: Arc::new(SessionRegistry::new()),
88 tokens: Arc::new(TokenRegistry::new()),
89 services: None,
90 session_factory: None,
91 #[cfg(feature = "grpc")]
92 bridge_registry: Arc::new(BridgeRegistry::default()),
93 }
94 }
95
96 #[must_use]
118 pub fn with_services(config: ServerConfig, services: Arc<ServiceRegistry>) -> Self {
119 Self {
120 config,
121 sessions: Arc::new(SessionRegistry::new()),
122 tokens: Arc::new(TokenRegistry::new()),
123 services: Some(services),
124 session_factory: None,
125 #[cfg(feature = "grpc")]
126 bridge_registry: Arc::new(BridgeRegistry::default()),
127 }
128 }
129
130 #[must_use]
153 pub fn with_session_factory(config: ServerConfig, factory: SessionFactory) -> Self {
154 Self {
155 config,
156 sessions: Arc::new(SessionRegistry::new()),
157 tokens: Arc::new(TokenRegistry::new()),
158 services: None,
159 session_factory: Some(factory),
160 #[cfg(feature = "grpc")]
161 bridge_registry: Arc::new(BridgeRegistry::default()),
162 }
163 }
164
165 #[cfg(feature = "grpc")]
170 #[must_use]
171 pub fn with_bridges(mut self, registry: BridgeRegistry) -> Self {
172 self.bridge_registry = Arc::new(registry);
173 self
174 }
175
176 #[allow(clippy::option_if_let_else)] fn create_session_state(&self) -> SessionState {
179 if let Some(factory) = &self.session_factory {
180 factory()
181 } else {
182 SessionState::default()
183 }
184 }
185
186 #[cfg_attr(coverage_nightly, coverage(off))]
194 pub async fn run(&self) -> std::io::Result<()> {
195 let session_state = self.create_session_state();
197 let default_session = Arc::new(Session::from_state(
198 SessionId::new(&*self.config.default_session_name),
199 session_state,
200 ));
201 self.sessions.insert(&default_session);
202
203 tracing::info!(
204 session = %self.config.default_session_name,
205 "Created default session"
206 );
207
208 match &self.config.transport {
210 TransportMode::TcpWithFallback => self.run_tcp_fallback().await,
211 TransportMode::Tcp { port } => self.run_tcp(*port).await,
212 #[cfg(unix)]
213 TransportMode::UnixSocket { path } => self.run_unix(path).await,
214 TransportMode::Grpc { port } => self.run_grpc(*port, None, None).await,
215 }
216 }
217
218 #[cfg_attr(coverage_nightly, coverage(off))]
220 async fn run_tcp_fallback(&self) -> std::io::Result<()> {
221 for port in 12540..=12549 {
222 match self.run_tcp(port).await {
223 Ok(()) => return Ok(()),
224 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
225 tracing::debug!(port, "Port in use, trying next");
226 }
227 Err(e) => return Err(e),
228 }
229 }
230 Err(std::io::Error::new(
231 std::io::ErrorKind::AddrInUse,
232 "All ports 12540-12549 are in use",
233 ))
234 }
235
236 #[cfg_attr(coverage_nightly, coverage(off))]
238 async fn run_tcp(&self, port: u16) -> std::io::Result<()> {
239 tracing::info!(port, "Starting TCP server (JSON-RPC not implemented yet)");
240 std::future::pending::<()>().await;
243 Ok(())
244 }
245
246 #[cfg(unix)]
248 #[cfg_attr(coverage_nightly, coverage(off))]
249 async fn run_unix(&self, path: &std::path::Path) -> std::io::Result<()> {
250 tracing::info!(path = %path.display(), "Starting Unix socket server");
251 std::future::pending::<()>().await;
253 Ok(())
254 }
255
256 #[allow(clippy::too_many_lines)] #[cfg_attr(coverage_nightly, coverage(off))]
262 async fn run_grpc(
263 &self,
264 port: u16,
265 shutdown: Option<std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>>,
266 port_tx: Option<tokio::sync::oneshot::Sender<u16>>,
267 ) -> std::io::Result<()> {
268 let addr: std::net::SocketAddr = format!("0.0.0.0:{port}")
270 .parse()
271 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
272
273 let listener = tokio::net::TcpListener::bind(addr).await?;
275 let local_addr = listener.local_addr()?;
276
277 tracing::info!(address = %local_addr, "Starting gRPC server");
278 eprintln!("Listening on 127.0.0.1:{}", local_addr.port());
280
281 if let Some(tx) = port_tx {
283 let _ = tx.send(local_addr.port());
284 }
285
286 let default_session_id = SessionId::new(&*self.config.default_session_name);
287
288 let interceptor = AuthInterceptor::new(Arc::clone(&self.tokens));
290
291 let bridges = Arc::clone(&self.bridge_registry);
294
295 {
298 use reovim_driver_session::TickSchedulerHandle;
299
300 let tick_scheduler = Arc::new(crate::tick::TokioTickScheduler::new(
301 Arc::clone(&self.sessions),
302 default_session_id.clone(),
303 Arc::clone(&bridges),
304 ));
305
306 if let Some(session) = self.sessions.get(&default_session_id) {
307 session.with_state_mut_sync(|state| {
308 let handle = state.app.services.get_or_create::<TickSchedulerHandle>();
309 handle
310 .set(tick_scheduler as Arc<dyn reovim_driver_session::tick::TickScheduler>);
311 });
312 }
313 }
314
315 let buffer_service =
317 BufferServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
318 let editor_service =
319 EditorServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
320 let input_service = InputServiceImpl::new(
321 Arc::clone(&self.sessions),
322 default_session_id.clone(),
323 Arc::clone(&bridges),
324 );
325 let state_service =
326 StateServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
327 let server_service =
328 ServerServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
329 let notification_service = NotificationServiceImpl::new(
330 Arc::clone(&self.sessions),
331 default_session_id.clone(),
332 Arc::clone(&self.tokens),
333 );
334
335 let module_service = ModuleServiceImpl::new();
337
338 let syntax_service =
340 SyntaxServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
341
342 let presence_service = PresenceServiceImpl::new(
344 Arc::clone(&self.sessions),
345 default_session_id.clone(),
346 Arc::clone(&self.tokens),
347 );
348
349 let command_service =
351 CommandServiceImpl::new(Arc::clone(&self.sessions), default_session_id.clone());
352
353 let extension_service = ExtensionServiceImpl::new(
355 Arc::clone(&self.sessions),
356 default_session_id.clone(),
357 bridges,
358 );
359
360 let debug_service = DebugServiceImpl::with_sessions(
362 Arc::clone(&self.sessions),
363 default_session_id,
364 Arc::clone(&self.bridge_registry),
365 );
366
367 #[cfg(feature = "grpc-web")]
369 {
370 use tower_http::cors::{Any, CorsLayer};
371
372 tracing::info!("gRPC-Web support enabled (HTTP/1.1 + CORS)");
373
374 let cors = CorsLayer::new()
377 .allow_origin(Any)
378 .allow_headers(Any)
379 .allow_methods(Any)
380 .expose_headers(Any);
381
382 let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
383 let i = &interceptor;
384 let router = tonic::transport::Server::builder()
385 .accept_http1(true) .layer(cors)
387 .layer(tonic_web::GrpcWebLayer::new())
388 .add_service(BufferServiceServer::with_interceptor(buffer_service, i.clone()))
389 .add_service(EditorServiceServer::with_interceptor(editor_service, i.clone()))
390 .add_service(InputServiceServer::with_interceptor(input_service, i.clone()))
391 .add_service(ModuleServiceServer::with_interceptor(module_service, i.clone()))
392 .add_service(StateServiceServer::with_interceptor(state_service, i.clone()))
393 .add_service(ServerServiceServer::with_interceptor(server_service, i.clone()))
394 .add_service(NotificationServiceServer::with_interceptor(
395 notification_service,
396 i.clone(),
397 ))
398 .add_service(SyntaxServiceServer::with_interceptor(syntax_service, i.clone()))
399 .add_service(PresenceServiceServer::with_interceptor(presence_service, i.clone()))
400 .add_service(ExtensionServiceServer::with_interceptor(extension_service, i.clone()))
401 .add_service(CommandServiceServer::with_interceptor(command_service, i.clone()))
402 .add_service(DebugServiceServer::with_interceptor(debug_service, i.clone()));
403
404 if let Some(signal) = shutdown {
405 router
406 .serve_with_incoming_shutdown(incoming, signal)
407 .await
408 .map_err(std::io::Error::other)
409 } else {
410 router
411 .serve_with_incoming(incoming)
412 .await
413 .map_err(std::io::Error::other)
414 }
415 }
416
417 #[cfg(not(feature = "grpc-web"))]
418 {
419 let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
420 let i = &interceptor;
421 let router = tonic::transport::Server::builder()
422 .add_service(BufferServiceServer::with_interceptor(buffer_service, i.clone()))
423 .add_service(EditorServiceServer::with_interceptor(editor_service, i.clone()))
424 .add_service(InputServiceServer::with_interceptor(input_service, i.clone()))
425 .add_service(ModuleServiceServer::with_interceptor(module_service, i.clone()))
426 .add_service(StateServiceServer::with_interceptor(state_service, i.clone()))
427 .add_service(ServerServiceServer::with_interceptor(server_service, i.clone()))
428 .add_service(NotificationServiceServer::with_interceptor(
429 notification_service,
430 i.clone(),
431 ))
432 .add_service(SyntaxServiceServer::with_interceptor(syntax_service, i.clone()))
433 .add_service(PresenceServiceServer::with_interceptor(presence_service, i.clone()))
434 .add_service(ExtensionServiceServer::with_interceptor(extension_service, i.clone()))
435 .add_service(CommandServiceServer::with_interceptor(command_service, i.clone()))
436 .add_service(DebugServiceServer::with_interceptor(debug_service, i.clone()));
437
438 if let Some(signal) = shutdown {
439 router
440 .serve_with_incoming_shutdown(incoming, signal)
441 .await
442 .map_err(std::io::Error::other)
443 } else {
444 router
445 .serve_with_incoming(incoming)
446 .await
447 .map_err(std::io::Error::other)
448 }
449 }
450 }
451
452 pub async fn run_until(
466 &self,
467 shutdown: impl std::future::Future<Output = ()> + Send + 'static,
468 port_tx: Option<tokio::sync::oneshot::Sender<u16>>,
469 ) -> std::io::Result<()> {
470 let session_state = self.create_session_state();
472 let default_session = Arc::new(Session::from_state(
473 SessionId::new(&*self.config.default_session_name),
474 session_state,
475 ));
476 self.sessions.insert(&default_session);
477
478 tracing::info!(
479 session = %self.config.default_session_name,
480 "Created default session"
481 );
482
483 let port = match &self.config.transport {
484 TransportMode::Grpc { port } => *port,
485 _ => {
486 return Err(std::io::Error::new(
487 std::io::ErrorKind::Unsupported,
488 "run_until() only supports gRPC transport",
489 ));
490 }
491 };
492
493 self.run_grpc(port, Some(Box::pin(shutdown)), port_tx).await
494 }
495
496 #[must_use]
498 pub const fn sessions(&self) -> &Arc<SessionRegistry> {
499 &self.sessions
500 }
501}
502
503#[cfg(test)]
504#[path = "server_tests.rs"]
505mod tests;