Skip to main content

myko_server/
lib.rs

//! Myko server runtime — WebSocket, durable event backends, peer federation.
//!
//! This crate contains the tokio-dependent parts of the Myko server:
//! - `CellServer` — server lifecycle (durable catch-up init, WS accept loop)
//! - `postgres` — PostgreSQL producer/consumer (event-table + LISTEN/NOTIFY)
//! - `ws_handler` — WebSocket connection handling
//! - `peer_registry` — federation with other servers
//! - `mcp` — Model Context Protocol server
//!
//! Tokio-free server types (CellServerCtx, HandlerRegistry, etc.) live in `myko::server`.
11
12pub mod mcp;
13pub mod peer_persister;
14pub mod peer_registry;
15pub mod postgres;
16pub mod router;
17pub mod server_ownership;
18pub mod ws_handler;
19pub mod ws_timing;
20
// Imports. All tokio-free server types from `myko::server` are re-exported
// below via `pub use myko::server::*`.
22use std::{
23    collections::HashMap,
24    net::SocketAddr,
25    sync::{
26        Arc, RwLock,
27        atomic::{AtomicBool, Ordering},
28    },
29    time::Duration,
30};
31
32use futures_util::StreamExt;
33pub use myko::server::*;
34use myko::{
35    client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
36    search::SearchIndex, store::StoreRegistry, wire::MEvent,
37};
38pub use peer_persister::PeerPersister;
39pub use server_ownership::ServerOwnershipManager;
40use uuid::Uuid;
41
42use crate::postgres::{
43    CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
44    PostgresHistoryStore, PostgresProducerHandle,
45};
46
/// Cell-based Myko server configuration.
///
/// Consumed by [`CellServer::new`]. [`CellServerBuilder::build`] assembles one
/// of these from the values set on the builder.
#[derive(Clone)]
pub struct CellServerConfig {
    /// Address to bind the WebSocket server
    pub bind_addr: SocketAddr,
    /// Optional Postgres configuration for event persistence/distribution.
    /// When set, `CellServer::new` tries to create a producer and start a
    /// consumer from it.
    pub postgres: Option<PostgresConfig>,
    /// Server host ID (a random v4 UUID is generated if not provided)
    pub host_id: Option<Uuid>,
    /// Optional peer registry configuration for federation. Used as the
    /// fallback by `CellServer::start_peer_registry` when it is called
    /// without an explicit configuration.
    pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Default persister override. When `None`, the Postgres producer handle
    /// becomes the default persister if one is available.
    pub default_persister: Option<Arc<dyn Persister>>,
    /// Per-entity persister overrides keyed by entity type name
    pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
    /// Optional pre-constructed peer-client map. When provided, it will be
    /// used as-is (so any `PeerPersister` built against the same `Arc`
    /// shares the live map). If `None`, the server creates its own.
    pub peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
}
67
/// Builder for creating a CellServer.
///
/// Every field is optional; `build()` substitutes defaults for anything left
/// unset (see the individual `with_*` methods).
#[derive(Default)]
pub struct CellServerBuilder {
    /// WebSocket bind address; `build()` falls back to `127.0.0.1:5155`.
    bind_addr: Option<SocketAddr>,
    /// Server host ID, forwarded to `CellServerConfig::host_id`.
    host_id: Option<Uuid>,
    /// Postgres configuration, forwarded to `CellServerConfig::postgres`.
    postgres: Option<PostgresConfig>,
    /// Peer registry configuration, forwarded to `CellServerConfig::peer_registry`.
    peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Default persister, forwarded to `CellServerConfig::default_persister`.
    default_persister: Option<Arc<dyn Persister>>,
    /// Per-entity persister overrides keyed by entity type name.
    persister_overrides: HashMap<String, Arc<dyn Persister>>,
    /// Optional pre-constructed peer-client map — useful when a
    /// `PeerPersister` must reference the same map the server will use.
    /// Defaults to a fresh empty map if not provided.
    peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
    /// One-shot hook installed on the built server; see
    /// [`CellServerBuilder::after_init`].
    after_init: Option<AfterInitCallback>,
    /// Optional MCP `ServerInfo`. Defaults to `ServerInfo::default()` if not
    /// set; binaries override this to advertise their own name / version /
    /// instructions on the `/myko/mcp` endpoint.
    server_info: Option<mcp::dispatch::ServerInfo>,
}
87
/// Boxed one-shot hook that receives the server by reference. `CellServer::run`
/// takes it out of its slot and invokes it once, before the saga runtime and
/// the accept loop are started.
type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;
89
90impl CellServerBuilder {
91    /// Create a new server builder.
92    pub fn new() -> Self {
93        Self::default()
94    }
95
96    /// Set the WebSocket bind address.
97    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
98        self.bind_addr = Some(addr);
99        self
100    }
101
102    /// Set the server host ID (auto-generated if not set).
103    pub fn with_host_id(mut self, id: Uuid) -> Self {
104        self.host_id = Some(id);
105        self
106    }
107
108    /// Configure Postgres for event persistence/distribution.
109    pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
110        self.postgres = Some(config);
111        self
112    }
113
114    /// Configure peer registry for federation.
115    pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
116        self.peer_registry = Some(config);
117        self
118    }
119
120    /// Set the default persister used for all entity types without explicit overrides.
121    pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
122        self.default_persister = Some(persister);
123        self
124    }
125
126    /// Override persister for a specific entity type (e.g. "Pulse").
127    pub fn with_persister_override(
128        mut self,
129        entity_type: impl Into<String>,
130        persister: Arc<dyn Persister>,
131    ) -> Self {
132        self.persister_overrides
133            .insert(entity_type.into(), persister);
134        self
135    }
136
137    /// Provide a pre-constructed peer-client map. The server's peer
138    /// registry will populate it as peers connect. Pass the same `Arc`
139    /// into `PeerPersister::new(...)` when you register a
140    /// `with_persister_override(..., PeerPersister)` so the persister
141    /// shares the live map.
142    pub fn with_peer_clients(
143        mut self,
144        peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
145    ) -> Self {
146        self.peer_clients = Some(peer_clients);
147        self
148    }
149
150    /// Register a callback to run after initialization and relation establishment,
151    /// but before the WebSocket accept loop starts. Use this for starting subsystems
152    /// that need entity data (e.g., scene engine).
153    pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
154        self.after_init = Some(Box::new(f));
155        self
156    }
157
158    /// Set the MCP `ServerInfo` advertised on the `/myko/mcp` `initialize`
159    /// response. Defaults to `ServerInfo::default()` (`myko-mcp` /
160    /// `CARGO_PKG_VERSION` / no instructions).
161    pub fn with_server_info(mut self, info: mcp::dispatch::ServerInfo) -> Self {
162        self.server_info = Some(info);
163        self
164    }
165
166    /// Build the server.
167    pub fn build(self) -> CellServer {
168        let bind_addr = self
169            .bind_addr
170            .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());
171
172        let server_info = Arc::new(self.server_info.unwrap_or_default());
173
174        let mut server = CellServer::new(CellServerConfig {
175            bind_addr,
176            postgres: self.postgres,
177            host_id: self.host_id,
178            peer_registry: self.peer_registry,
179            default_persister: self.default_persister,
180            persister_overrides: self.persister_overrides,
181            peer_clients: self.peer_clients,
182        });
183        server.after_init = std::sync::Mutex::new(self.after_init);
184        server.server_info = server_info;
185        server
186    }
187}
188
/// Cell-based Myko server.
///
/// Uses hyphae cells for reactive queries and reports instead of actors.
///
/// Construct it with [`CellServer::builder`] or [`CellServer::new`], then
/// drive it with [`CellServer::run`] (full startup) or
/// [`CellServer::run_ws_loop`] (accept loop only).
pub struct CellServer {
    /// Central entity store registry
    pub registry: Arc<StoreRegistry>,
    /// Handler registry for items, queries, and reports
    pub handler_registry: Arc<HandlerRegistry>,
    /// Relationship manager for cascade operations
    pub relationship_manager: Arc<RelationshipManager>,
    /// Optional Postgres producer handle; `None` when Postgres is not
    /// configured or the producer could not be created.
    pub postgres_producer: Option<PostgresProducerHandle>,
    /// Full-text search index
    pub search_index: Arc<SearchIndex>,
    /// Persister routing (default + per-entity overrides)
    pub persisters: Arc<PersisterRouter>,
    /// Server host ID
    pub host_id: Uuid,
    /// Server configuration
    config: CellServerConfig,
    /// Postgres producer (kept alive)
    _postgres_producer_owner: Option<CellPostgresProducer>,
    /// Postgres consumer (kept alive); also queried for catch-up state.
    postgres_consumer: Option<CellPostgresConsumer>,
    /// Whether the server is ready to accept connections. Starts `true` when
    /// there is no Postgres consumer to wait for.
    ready: Arc<AtomicBool>,
    /// Peer registry for federation (initialized after catch-up)
    peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
    /// Live peer clients shared with report context.
    peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
    /// Callback to run after init (catch-up + relations) but before WS loop
    after_init: std::sync::Mutex<Option<AfterInitCallback>>,
    /// MCP `ServerInfo` advertised on the `/myko/mcp` `initialize` response.
    /// Set via [`CellServerBuilder::with_server_info`]; defaults to
    /// `ServerInfo::default()`.
    server_info: Arc<mcp::dispatch::ServerInfo>,
    /// Sender for local+replicated event fan-out to saga runtime.
    saga_event_tx: flume::Sender<MEvent>,
    /// Receiver consumed when saga runtime starts.
    saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
    /// Saga tasks kept alive for server lifetime.
    saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
    /// Server ownership death-watch guard (kept alive for server lifetime).
    _server_ownership_guard: std::sync::Mutex<Option<hyphae::SubscriptionGuard>>,
    /// Hyphae cell inspector server (kept alive for the lifetime of the server)
    #[cfg(feature = "inspector")]
    _inspector: hyphae::server::InspectorServer,
}
237
238impl CellServer {
239    /// Create a new server builder.
240    pub fn builder() -> CellServerBuilder {
241        CellServerBuilder::new()
242    }
243
244    /// Create a new cell-based server.
245    pub fn new(config: CellServerConfig) -> Self {
246        let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
247        let registry = Arc::new(StoreRegistry::new());
248        let handler_registry = Arc::new(HandlerRegistry::new());
249        let relationship_manager = Arc::new(RelationshipManager::new());
250
251        // Initialize the client registry for WebSocket client message dispatch
252        init_client_registry();
253
254        let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
255        let (postgres_producer_owner, postgres_producer, postgres_consumer) =
256            if let Some(ref postgres_config) = config.postgres {
257                match CellPostgresProducer::new(postgres_config, host_id) {
258                    Ok(producer) => {
259                        let handle = producer.handle();
260                        let consumer = match CellPostgresConsumer::start(
261                            postgres_config,
262                            host_id,
263                            handler_registry.clone(),
264                            registry.clone(),
265                        ) {
266                            Ok(c) => Some(c),
267                            Err(e) => {
268                                log::error!("Failed to start Postgres consumer: {}", e);
269                                None
270                            }
271                        };
272                        (Some(producer), Some(handle), consumer)
273                    }
274                    Err(e) => {
275                        log::error!("Failed to create Postgres producer: {}", e);
276                        (None, None, None)
277                    }
278                }
279            } else {
280                (None, None, None)
281            };
282
283        // If no durable consumer, server is immediately ready
284        let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));
285
286        // Initialize full-text search index
287        let search_index = Arc::new(SearchIndex::new());
288
289        // Build persister routing:
290        // - explicit default from config if provided
291        // - otherwise Postgres producer handle when available
292        // - explicit per-entity overrides always win
293        let mut persister_router = PersisterRouter::default();
294        if let Some(default_persister) = config.default_persister.clone() {
295            persister_router.set_default(Some(default_persister));
296        } else if let Some(handle) = postgres_producer.clone() {
297            persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
298        }
299        for (entity_type, persister) in &config.persister_overrides {
300            persister_router.set_override(entity_type.clone(), persister.clone());
301        }
302        let persisters = Arc::new(persister_router);
303
304        // Start the hyphae cell inspector server
305        #[cfg(feature = "inspector")]
306        let inspector = hyphae::server::start_server("myko");
307        #[cfg(feature = "inspector")]
308        log::info!("Hyphae inspector on port {}", inspector.port());
309
310        let peer_clients = config
311            .peer_clients
312            .clone()
313            .unwrap_or_else(|| Arc::new(dashmap::DashMap::new()));
314
315        Self {
316            registry,
317            handler_registry,
318            relationship_manager,
319            postgres_producer,
320            search_index,
321            persisters,
322            host_id,
323            config,
324            _postgres_producer_owner: postgres_producer_owner,
325            postgres_consumer,
326            ready,
327            peer_registry_instance: RwLock::new(None),
328            peer_clients,
329            after_init: std::sync::Mutex::new(None),
330            server_info: Arc::new(mcp::dispatch::ServerInfo::default()),
331            saga_event_tx,
332            saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
333            saga_tasks: std::sync::Mutex::new(Vec::new()),
334            _server_ownership_guard: std::sync::Mutex::new(None),
335            #[cfg(feature = "inspector")]
336            _inspector: inspector,
337        }
338    }
339
340    /// Start the peer registry for federation.
341    pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
342        let peer_config = config.or_else(|| self.config.peer_registry.clone());
343
344        if let Some(peer_config) = peer_config {
345            log::info!("Starting peer registry");
346            let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
347            *self.peer_registry_instance.write().unwrap() = Some(pr);
348        }
349    }
350
351    /// Check if peer registry is running.
352    pub fn has_peer_registry(&self) -> bool {
353        self.peer_registry_instance.read().unwrap().is_some()
354    }
355
356    /// Get the store registry.
357    pub fn registry(&self) -> Arc<StoreRegistry> {
358        self.registry.clone()
359    }
360
361    /// Get the handler registry.
362    pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
363        self.handler_registry.clone()
364    }
365
366    /// Get the MCP `ServerInfo` advertised on the `/myko/mcp` `initialize`
367    /// response.
368    pub fn server_info(&self) -> Arc<mcp::dispatch::ServerInfo> {
369        self.server_info.clone()
370    }
371
372    /// Get a server context for module use.
373    pub fn ctx(&self) -> CellServerCtx {
374        let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
375            self.config.postgres.as_ref().map(|pg| {
376                Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
377                    as Arc<dyn myko::server::HistoryReplayProvider>
378            });
379        CellServerCtx::new(
380            self.host_id,
381            self.registry.clone(),
382            self.handler_registry.clone(),
383            self.relationship_manager.clone(),
384            self.persisters.clone(),
385            self.search_index.clone(),
386            self.peer_clients.clone(),
387            Some(self.saga_event_tx.clone()),
388            history_replay,
389        )
390    }
391
392    fn start_saga_runtime(&self) {
393        let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
394        if registrations.is_empty() {
395            return;
396        }
397        let Some(rx) = self
398            .saga_event_rx
399            .lock()
400            .expect("saga_event_rx mutex poisoned")
401            .take()
402        else {
403            return;
404        };
405
406        log::info!("Starting saga runtime with {} saga(s)", registrations.len());
407
408        // NOTE(ts): One unbounded flume channel per saga, with dispatch-side filtering
409        // so sagas only receive events matching their entity type and change type.
410        struct SagaChannel {
411            tx: flume::Sender<MEvent>,
412            entity_type: &'static str,
413            change_type: myko::event::MEventType,
414        }
415        let mut saga_channels: Vec<SagaChannel> = Vec::new();
416
417        for registration in registrations {
418            let saga = (registration.create)();
419            let saga_name = saga.name().to_string();
420            let (saga_tx, saga_rx) = flume::unbounded::<MEvent>();
421            saga_channels.push(SagaChannel {
422                tx: saga_tx,
423                entity_type: registration.event_entity_type,
424                change_type: registration.event_change_type,
425            });
426            let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
427                saga_rx,
428                move |saga_rx| async move {
429                    saga_rx
430                        .recv_async()
431                        .await
432                        .ok()
433                        .map(|event| (event, saga_rx))
434                },
435            ));
436
437            let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
438                self.host_id,
439                self.registry.clone(),
440                self.saga_event_tx.clone(),
441            ));
442            let mut command_stream = saga.build_boxed(events, saga_ctx);
443
444            let host_id = self.host_id;
445            let registry = self.registry.clone();
446            let handler_registry = self.handler_registry.clone();
447            let relationship_manager = self.relationship_manager.clone();
448            let persisters = self.persisters.clone();
449            let search_index = self.search_index.clone();
450            let peer_clients = self.peer_clients.clone();
451            let saga_event_tx = self.saga_event_tx.clone();
452
453            let handle = tokio::spawn(async move {
454                while let Some(command) = command_stream.next().await {
455                    let command_name = command.command_name();
456                    log::debug!("Saga {} executing command {}", saga_name, command_name);
457                    let req = Arc::new(RequestContext::internal(
458                        Arc::from(Uuid::new_v4().to_string()),
459                        host_id,
460                        &format!("saga:{saga_name}"),
461                    ));
462
463                    let cmd_ctx = CommandContext::new(
464                        Arc::from(command_name),
465                        req,
466                        Arc::new(CellServerCtx::new(
467                            host_id,
468                            registry.clone(),
469                            handler_registry.clone(),
470                            relationship_manager.clone(),
471                            persisters.clone(),
472                            search_index.clone(),
473                            peer_clients.clone(),
474                            Some(saga_event_tx.clone()),
475                            None,
476                        )),
477                    );
478
479                    if let Err(err) = command.execute_boxed(cmd_ctx) {
480                        log::error!(
481                            "Saga {} command {} failed: {}",
482                            saga_name,
483                            command_name,
484                            err.message
485                        );
486                    }
487                }
488            });
489
490            self.saga_tasks
491                .lock()
492                .expect("saga_tasks mutex poisoned")
493                .push(handle);
494        }
495
496        // NOTE(ts): Dispatcher fans out events to saga channels, filtering by
497        // entity type and change type so each saga only receives relevant events.
498        let dispatcher = tokio::spawn(async move {
499            while let Ok(event) = rx.recv_async().await {
500                for ch in &saga_channels {
501                    if event.item_type == ch.entity_type && event.change_type == ch.change_type {
502                        let _ = ch.tx.send(event.clone());
503                    }
504                }
505            }
506        });
507        self.saga_tasks
508            .lock()
509            .expect("saga_tasks mutex poisoned")
510            .push(dispatcher);
511    }
512
513    /// Create a Postgres-backed history store for replay/windback operations.
514    pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
515        self.config
516            .postgres
517            .clone()
518            .map(PostgresHistoryStore::new)
519            .transpose()
520    }
521
522    /// Initialize Postgres replay/listener and wait for catch-up.
523    pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
524        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
525            return Err(
526                "Postgres is configured but the Postgres consumer is not running".to_string(),
527            );
528        }
529
530        if let Some(ref consumer) = self.postgres_consumer {
531            consumer.wait_until_caught_up(timeout)?;
532            self.ready.store(true, Ordering::SeqCst);
533        }
534        Ok(())
535    }
536
537    /// Establish relationship invariants.
538    pub fn establish_relations(&self) {
539        if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
540            log::error!("Failed to establish relations: {e}");
541        }
542    }
543
544    /// Check if the server is ready to accept connections.
545    pub fn is_ready(&self) -> bool {
546        if let Some(ref consumer) = self.postgres_consumer {
547            if consumer.is_caught_up() {
548                self.ready.store(true, Ordering::SeqCst);
549                return true;
550            }
551            return false;
552        }
553        true
554    }
555
556    /// Run the server with full initialization.
557    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
558        use tokio::net::TcpListener;
559
560        // Persisters can veto startup via startup healthchecks.
561        let entity_types: Vec<&str> = self
562            .handler_registry
563            .entity_types()
564            .map(|t| t.as_ref())
565            .collect();
566        self.persisters
567            .startup_healthcheck(&entity_types)
568            .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;
569
570        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
571            return Err("Postgres is configured but the Postgres consumer failed to start".into());
572        }
573
574        // Wait for Postgres catch-up if configured
575        if self.postgres_consumer.is_some() {
576            log::info!("Waiting for Postgres event consumer to catch up...");
577            let timeout = std::time::Duration::from_secs(300);
578            self.init_postgres_and_wait(timeout)
579                .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
580            log::info!("Postgres caught up, ready to accept connections");
581        }
582
583        // Build search index from store data (after catch-up)
584        log::info!("Building search index...");
585        self.search_index.build_from_registry(&self.registry);
586
587        // Establish relations (cleanup orphans, ensure required entities)
588        log::info!("Establishing relations...");
589        self.establish_relations();
590
591        // Claim orphaned server-owned items and start death watch
592        log::info!("Checking server-owned item ownership...");
593        if let Err(e) = ServerOwnershipManager::claim_orphaned(&self.ctx()) {
594            log::error!("Failed to claim orphaned server-owned items: {}", e);
595        }
596        let ownership_guard = ServerOwnershipManager::watch_peer_deaths(&self.ctx());
597        *self
598            ._server_ownership_guard
599            .lock()
600            .expect("server_ownership_guard mutex poisoned") = Some(ownership_guard);
601
602        // Bind WebSocket listener first so peer publication only happens once
603        // the gateway is actually available.
604        let listener = TcpListener::bind(&self.config.bind_addr).await?;
605        log::info!("CellServer listening on {}", self.config.bind_addr);
606        log::info!(
607            "Myko gateway: ws://{}/myko | MCP: /myko/mcp (POST + WS + SSE)",
608            self.config.bind_addr
609        );
610
611        // Start peer registry if configured
612        if self.config.peer_registry.is_some() {
613            self.start_peer_registry(None);
614        }
615
616        // Run after_init hook (e.g., scene engine startup)
617        if let Some(hook) = self
618            .after_init
619            .lock()
620            .expect("after_init mutex poisoned")
621            .take()
622        {
623            hook(self);
624        }
625
626        self.start_saga_runtime();
627
628        // WS message-throughput summary thread. Emits a single log line every
629        // 250ms with inbound/outbound counts per message kind. Used for
630        // diagnosing server-vs-client pacing during slow loads.
631        crate::ws_timing::start_periodic_logger();
632
633        // Report-cache hit/miss summary thread. Replaces the per-call debug
634        // log spam that was dominating I/O during loads.
635        myko::server::report_cache_stats::start_periodic_logger();
636
637        // Per-search summary thread. One log line per window listing each
638        // search that completed (entity_type, result count, elapsed).
639        myko::search::search_stats::start_periodic_logger();
640
641        log::info!("Server started");
642        self.run_ws_accept_loop(listener).await
643    }
644
645    /// Run just the accept loop (no Postgres / relations / saga startup).
646    pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
647        use tokio::net::TcpListener;
648
649        let listener = TcpListener::bind(&self.config.bind_addr).await?;
650        log::info!("CellServer listening on {}", self.config.bind_addr);
651        log::info!(
652            "Myko gateway: ws://{}/myko | MCP: /myko/mcp (POST + WS + SSE)",
653            self.config.bind_addr
654        );
655        self.run_ws_accept_loop(listener).await
656    }
657
658    async fn run_ws_accept_loop(
659        &self,
660        listener: tokio::net::TcpListener,
661    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
662        let ready = self.ready.clone();
663
664        loop {
665            let (stream, addr) = listener.accept().await?;
666
667            // Check if server is ready (durable backend caught up)
668            if !ready.load(Ordering::SeqCst) {
669                if self.is_ready() {
670                    log::info!("Server is now ready to accept connections");
671                } else {
672                    log::warn!(
673                        "Rejecting connection from {} - server not ready (durable backend catching up)",
674                        addr
675                    );
676                    drop(stream);
677                    continue;
678                }
679            }
680
681            log::debug!("New connection from {}", addr);
682
683            let ctx = Arc::new(self.ctx());
684            let server_info = self.server_info.clone();
685
686            tokio::spawn(async move {
687                if let Err(e) = router::route_connection(stream, addr, ctx, server_info).await {
688                    log::error!("Connection error from {}: {}", addr, e);
689                }
690            });
691        }
692    }
693}
694
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal configuration: ephemeral loopback port, no Postgres, no peers,
    /// no persister overrides.
    fn minimal_config(host_id: Option<Uuid>) -> CellServerConfig {
        CellServerConfig {
            bind_addr: SocketAddr::from(([127, 0, 0, 1], 0)),
            postgres: None,
            host_id,
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
            peer_clients: None,
        }
    }

    /// A server can be constructed from a minimal configuration.
    #[test]
    fn test_server_creation() {
        let server = CellServer::new(minimal_config(None));
        assert!(Arc::strong_count(&server.registry) >= 1);
    }

    /// An explicitly configured host ID is used verbatim.
    #[test]
    fn test_server_with_host_id() {
        let host_id = Uuid::new_v4();
        let server = CellServer::new(minimal_config(Some(host_id)));
        assert_eq!(server.host_id, host_id);
    }
}