Skip to main content

myko_server/
lib.rs

1//! Myko server runtime — WebSocket, durable event backends, peer federation.
2//!
3//! This crate contains the tokio-dependent parts of the Myko server:
4//! - `CellServer` — server lifecycle (durable catch-up init, WS accept loop)
5//! - `postgres` — PostgreSQL producer/consumer (event-table + LISTEN/NOTIFY)
6//! - `ws_handler` — WebSocket connection handling
7//! - `peer_registry` — federation with other servers
8//! - `mcp` — Model Context Protocol server
9//!
10//! Tokio-free server types (CellServerCtx, HandlerRegistry, etc.) live in `myko::server`.
11
12pub mod mcp;
13pub mod peer_persister;
14pub mod peer_registry;
15pub mod postgres;
16pub mod router;
17pub mod server_ownership;
18pub mod ws_handler;
19pub mod ws_timing;
20
// Tokio-free server types from `myko::server` are re-exported by the `pub use myko::server::*` line below.
22use std::{
23    collections::HashMap,
24    net::SocketAddr,
25    sync::{
26        Arc, RwLock,
27        atomic::{AtomicBool, Ordering},
28    },
29    time::Duration,
30};
31
32use futures_util::StreamExt;
33pub use myko::server::*;
34use myko::{
35    client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
36    search::SearchIndex, store::StoreRegistry, wire::MEvent,
37};
38pub use peer_persister::PeerPersister;
39pub use server_ownership::ServerOwnershipManager;
40use uuid::Uuid;
41
42use crate::postgres::{
43    CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
44    PostgresHistoryStore, PostgresProducerHandle,
45};
46
/// Cell-based Myko server configuration.
///
/// Consumed by `CellServer::new`. Optional fields left as `None` fall back to
/// the defaults chosen there (see the per-field notes).
#[derive(Clone)]
pub struct CellServerConfig {
    /// Address to bind the WebSocket server
    pub bind_addr: SocketAddr,
    /// Optional Postgres configuration for event persistence/distribution.
    /// When set, `CellServer::new` tries to start a Postgres producer and
    /// consumer from it.
    pub postgres: Option<PostgresConfig>,
    /// Server host ID (a random v4 UUID is generated if not provided)
    pub host_id: Option<Uuid>,
    /// Optional peer registry configuration for federation
    pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Default persister override. When `None`, the Postgres producer handle
    /// becomes the default persister if one was created.
    pub default_persister: Option<Arc<dyn Persister>>,
    /// Per-entity persister overrides keyed by entity type name
    pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
    /// Optional pre-constructed peer-client map. When provided, it will be
    /// used as-is (so any `PeerPersister` built against the same `Arc`
    /// shares the live map). If `None`, the server creates its own.
    pub peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
}
67
/// Builder for creating a CellServer.
///
/// Every field starts unset (`Default`); `build` turns the collected options
/// into a `CellServerConfig` and constructs the server from it.
#[derive(Default)]
pub struct CellServerBuilder {
    /// WebSocket bind address; `build` falls back to `127.0.0.1:5155`.
    bind_addr: Option<SocketAddr>,
    /// Server host ID; generated by the server when left unset.
    host_id: Option<Uuid>,
    /// Optional Postgres configuration for event persistence/distribution.
    postgres: Option<PostgresConfig>,
    /// Optional peer registry configuration for federation.
    peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Default persister for entity types without an explicit override.
    default_persister: Option<Arc<dyn Persister>>,
    /// Per-entity persister overrides keyed by entity type name.
    persister_overrides: HashMap<String, Arc<dyn Persister>>,
    /// Optional pre-constructed peer-client map — useful when a
    /// `PeerPersister` must reference the same map the server will use.
    /// Defaults to a fresh empty map if not provided.
    peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
    /// One-shot hook stored on the built server and invoked from `CellServer::run`.
    after_init: Option<AfterInitCallback>,
}
83
/// Boxed one-shot hook that receives the server by reference. `CellServer::run`
/// takes it out of its slot and calls it at most once, just before the saga
/// runtime and the WebSocket accept loop start.
type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;
85
86impl CellServerBuilder {
87    /// Create a new server builder.
88    pub fn new() -> Self {
89        Self::default()
90    }
91
92    /// Set the WebSocket bind address.
93    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
94        self.bind_addr = Some(addr);
95        self
96    }
97
98    /// Set the server host ID (auto-generated if not set).
99    pub fn with_host_id(mut self, id: Uuid) -> Self {
100        self.host_id = Some(id);
101        self
102    }
103
104    /// Configure Postgres for event persistence/distribution.
105    pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
106        self.postgres = Some(config);
107        self
108    }
109
110    /// Configure peer registry for federation.
111    pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
112        self.peer_registry = Some(config);
113        self
114    }
115
116    /// Set the default persister used for all entity types without explicit overrides.
117    pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
118        self.default_persister = Some(persister);
119        self
120    }
121
122    /// Override persister for a specific entity type (e.g. "Pulse").
123    pub fn with_persister_override(
124        mut self,
125        entity_type: impl Into<String>,
126        persister: Arc<dyn Persister>,
127    ) -> Self {
128        self.persister_overrides
129            .insert(entity_type.into(), persister);
130        self
131    }
132
133    /// Provide a pre-constructed peer-client map. The server's peer
134    /// registry will populate it as peers connect. Pass the same `Arc`
135    /// into `PeerPersister::new(...)` when you register a
136    /// `with_persister_override(..., PeerPersister)` so the persister
137    /// shares the live map.
138    pub fn with_peer_clients(
139        mut self,
140        peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
141    ) -> Self {
142        self.peer_clients = Some(peer_clients);
143        self
144    }
145
146    /// Register a callback to run after initialization and relation establishment,
147    /// but before the WebSocket accept loop starts. Use this for starting subsystems
148    /// that need entity data (e.g., scene engine).
149    pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
150        self.after_init = Some(Box::new(f));
151        self
152    }
153
154    /// Build the server.
155    pub fn build(self) -> CellServer {
156        let bind_addr = self
157            .bind_addr
158            .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());
159
160        let mut server = CellServer::new(CellServerConfig {
161            bind_addr,
162            postgres: self.postgres,
163            host_id: self.host_id,
164            peer_registry: self.peer_registry,
165            default_persister: self.default_persister,
166            persister_overrides: self.persister_overrides,
167            peer_clients: self.peer_clients,
168        });
169        server.after_init = std::sync::Mutex::new(self.after_init);
170        server
171    }
172}
173
/// Cell-based Myko server.
///
/// Uses hyphae cells for reactive queries and reports instead of actors.
///
/// Construct it with `CellServer::new` or through `CellServer::builder()`,
/// then drive it with `run` (full startup) or `run_ws_loop` (accept loop only).
pub struct CellServer {
    /// Central entity store registry
    pub registry: Arc<StoreRegistry>,
    /// Handler registry for items, queries, and reports
    pub handler_registry: Arc<HandlerRegistry>,
    /// Relationship manager for cascade operations
    pub relationship_manager: Arc<RelationshipManager>,
    /// Optional Postgres producer handle (`None` when Postgres is not
    /// configured or the producer could not be created)
    pub postgres_producer: Option<PostgresProducerHandle>,
    /// Full-text search index
    pub search_index: Arc<SearchIndex>,
    /// Persister routing (default + per-entity overrides)
    pub persisters: Arc<PersisterRouter>,
    /// Server host ID
    pub host_id: Uuid,
    /// Server configuration
    config: CellServerConfig,
    /// Postgres producer (kept alive)
    _postgres_producer_owner: Option<CellPostgresProducer>,
    /// Postgres consumer (kept alive)
    postgres_consumer: Option<CellPostgresConsumer>,
    /// Whether the server is ready to accept connections.
    /// Starts `true` when there is no Postgres consumer to wait for.
    ready: Arc<AtomicBool>,
    /// Peer registry for federation (initialized after catch-up)
    peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
    /// Live peer clients shared with report context.
    peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
    /// Callback to run after init (catch-up + relations) but before WS loop
    after_init: std::sync::Mutex<Option<AfterInitCallback>>,
    /// Sender for local+replicated event fan-out to saga runtime.
    /// A clone is handed to every `CellServerCtx` created by `ctx()`.
    saga_event_tx: flume::Sender<MEvent>,
    /// Receiver consumed when saga runtime starts.
    saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
    /// Saga tasks kept alive for server lifetime.
    saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
    /// Server ownership death-watch guard (kept alive for server lifetime).
    _server_ownership_guard: std::sync::Mutex<Option<hyphae::SubscriptionGuard>>,
    /// Hyphae cell inspector server (kept alive for the lifetime of the server)
    #[cfg(feature = "inspector")]
    _inspector: hyphae::server::InspectorServer,
}
218
219impl CellServer {
220    /// Create a new server builder.
221    pub fn builder() -> CellServerBuilder {
222        CellServerBuilder::new()
223    }
224
225    /// Create a new cell-based server.
226    pub fn new(config: CellServerConfig) -> Self {
227        let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
228        let registry = Arc::new(StoreRegistry::new());
229        let handler_registry = Arc::new(HandlerRegistry::new());
230        let relationship_manager = Arc::new(RelationshipManager::new());
231
232        // Initialize the client registry for WebSocket client message dispatch
233        init_client_registry();
234
235        let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
236        let (postgres_producer_owner, postgres_producer, postgres_consumer) =
237            if let Some(ref postgres_config) = config.postgres {
238                match CellPostgresProducer::new(postgres_config, host_id) {
239                    Ok(producer) => {
240                        let handle = producer.handle();
241                        let consumer = match CellPostgresConsumer::start(
242                            postgres_config,
243                            host_id,
244                            handler_registry.clone(),
245                            registry.clone(),
246                        ) {
247                            Ok(c) => Some(c),
248                            Err(e) => {
249                                log::error!("Failed to start Postgres consumer: {}", e);
250                                None
251                            }
252                        };
253                        (Some(producer), Some(handle), consumer)
254                    }
255                    Err(e) => {
256                        log::error!("Failed to create Postgres producer: {}", e);
257                        (None, None, None)
258                    }
259                }
260            } else {
261                (None, None, None)
262            };
263
264        // If no durable consumer, server is immediately ready
265        let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));
266
267        // Initialize full-text search index
268        let search_index = Arc::new(SearchIndex::new());
269
270        // Build persister routing:
271        // - explicit default from config if provided
272        // - otherwise Postgres producer handle when available
273        // - explicit per-entity overrides always win
274        let mut persister_router = PersisterRouter::default();
275        if let Some(default_persister) = config.default_persister.clone() {
276            persister_router.set_default(Some(default_persister));
277        } else if let Some(handle) = postgres_producer.clone() {
278            persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
279        }
280        for (entity_type, persister) in &config.persister_overrides {
281            persister_router.set_override(entity_type.clone(), persister.clone());
282        }
283        let persisters = Arc::new(persister_router);
284
285        // Start the hyphae cell inspector server
286        #[cfg(feature = "inspector")]
287        let inspector = hyphae::server::start_server("myko");
288        #[cfg(feature = "inspector")]
289        log::info!("Hyphae inspector on port {}", inspector.port());
290
291        let peer_clients = config
292            .peer_clients
293            .clone()
294            .unwrap_or_else(|| Arc::new(dashmap::DashMap::new()));
295
296        Self {
297            registry,
298            handler_registry,
299            relationship_manager,
300            postgres_producer,
301            search_index,
302            persisters,
303            host_id,
304            config,
305            _postgres_producer_owner: postgres_producer_owner,
306            postgres_consumer,
307            ready,
308            peer_registry_instance: RwLock::new(None),
309            peer_clients,
310            after_init: std::sync::Mutex::new(None),
311            saga_event_tx,
312            saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
313            saga_tasks: std::sync::Mutex::new(Vec::new()),
314            _server_ownership_guard: std::sync::Mutex::new(None),
315            #[cfg(feature = "inspector")]
316            _inspector: inspector,
317        }
318    }
319
320    /// Start the peer registry for federation.
321    pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
322        let peer_config = config.or_else(|| self.config.peer_registry.clone());
323
324        if let Some(peer_config) = peer_config {
325            log::info!("Starting peer registry");
326            let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
327            *self.peer_registry_instance.write().unwrap() = Some(pr);
328        }
329    }
330
331    /// Check if peer registry is running.
332    pub fn has_peer_registry(&self) -> bool {
333        self.peer_registry_instance.read().unwrap().is_some()
334    }
335
336    /// Get the store registry.
337    pub fn registry(&self) -> Arc<StoreRegistry> {
338        self.registry.clone()
339    }
340
341    /// Get the handler registry.
342    pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
343        self.handler_registry.clone()
344    }
345
346    /// Get a server context for module use.
347    pub fn ctx(&self) -> CellServerCtx {
348        let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
349            self.config.postgres.as_ref().map(|pg| {
350                Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
351                    as Arc<dyn myko::server::HistoryReplayProvider>
352            });
353        CellServerCtx::new(
354            self.host_id,
355            self.registry.clone(),
356            self.handler_registry.clone(),
357            self.relationship_manager.clone(),
358            self.persisters.clone(),
359            self.search_index.clone(),
360            self.peer_clients.clone(),
361            Some(self.saga_event_tx.clone()),
362            history_replay,
363        )
364    }
365
366    fn start_saga_runtime(&self) {
367        let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
368        if registrations.is_empty() {
369            return;
370        }
371        let Some(rx) = self
372            .saga_event_rx
373            .lock()
374            .expect("saga_event_rx mutex poisoned")
375            .take()
376        else {
377            return;
378        };
379
380        log::info!("Starting saga runtime with {} saga(s)", registrations.len());
381
382        // NOTE(ts): One unbounded flume channel per saga, with dispatch-side filtering
383        // so sagas only receive events matching their entity type and change type.
384        struct SagaChannel {
385            tx: flume::Sender<MEvent>,
386            entity_type: &'static str,
387            change_type: myko::event::MEventType,
388        }
389        let mut saga_channels: Vec<SagaChannel> = Vec::new();
390
391        for registration in registrations {
392            let saga = (registration.create)();
393            let saga_name = saga.name().to_string();
394            let (saga_tx, saga_rx) = flume::unbounded::<MEvent>();
395            saga_channels.push(SagaChannel {
396                tx: saga_tx,
397                entity_type: registration.event_entity_type,
398                change_type: registration.event_change_type,
399            });
400            let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
401                saga_rx,
402                move |saga_rx| async move {
403                    saga_rx
404                        .recv_async()
405                        .await
406                        .ok()
407                        .map(|event| (event, saga_rx))
408                },
409            ));
410
411            let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
412                self.host_id,
413                self.registry.clone(),
414                self.saga_event_tx.clone(),
415            ));
416            let mut command_stream = saga.build_boxed(events, saga_ctx);
417
418            let host_id = self.host_id;
419            let registry = self.registry.clone();
420            let handler_registry = self.handler_registry.clone();
421            let relationship_manager = self.relationship_manager.clone();
422            let persisters = self.persisters.clone();
423            let search_index = self.search_index.clone();
424            let peer_clients = self.peer_clients.clone();
425            let saga_event_tx = self.saga_event_tx.clone();
426
427            let handle = tokio::spawn(async move {
428                while let Some(command) = command_stream.next().await {
429                    let command_name = command.command_name();
430                    log::debug!("Saga {} executing command {}", saga_name, command_name);
431                    let req = Arc::new(RequestContext::internal(
432                        Arc::from(Uuid::new_v4().to_string()),
433                        host_id,
434                        &format!("saga:{saga_name}"),
435                    ));
436
437                    let cmd_ctx = CommandContext::new(
438                        Arc::from(command_name),
439                        req,
440                        Arc::new(CellServerCtx::new(
441                            host_id,
442                            registry.clone(),
443                            handler_registry.clone(),
444                            relationship_manager.clone(),
445                            persisters.clone(),
446                            search_index.clone(),
447                            peer_clients.clone(),
448                            Some(saga_event_tx.clone()),
449                            None,
450                        )),
451                    );
452
453                    if let Err(err) = command.execute_boxed(cmd_ctx) {
454                        log::error!(
455                            "Saga {} command {} failed: {}",
456                            saga_name,
457                            command_name,
458                            err.message
459                        );
460                    }
461                }
462            });
463
464            self.saga_tasks
465                .lock()
466                .expect("saga_tasks mutex poisoned")
467                .push(handle);
468        }
469
470        // NOTE(ts): Dispatcher fans out events to saga channels, filtering by
471        // entity type and change type so each saga only receives relevant events.
472        let dispatcher = tokio::spawn(async move {
473            while let Ok(event) = rx.recv_async().await {
474                for ch in &saga_channels {
475                    if event.item_type == ch.entity_type && event.change_type == ch.change_type {
476                        let _ = ch.tx.send(event.clone());
477                    }
478                }
479            }
480        });
481        self.saga_tasks
482            .lock()
483            .expect("saga_tasks mutex poisoned")
484            .push(dispatcher);
485    }
486
487    /// Create a Postgres-backed history store for replay/windback operations.
488    pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
489        self.config
490            .postgres
491            .clone()
492            .map(PostgresHistoryStore::new)
493            .transpose()
494    }
495
496    /// Initialize Postgres replay/listener and wait for catch-up.
497    pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
498        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
499            return Err(
500                "Postgres is configured but the Postgres consumer is not running".to_string(),
501            );
502        }
503
504        if let Some(ref consumer) = self.postgres_consumer {
505            consumer.wait_until_caught_up(timeout)?;
506            self.ready.store(true, Ordering::SeqCst);
507        }
508        Ok(())
509    }
510
511    /// Establish relationship invariants.
512    pub fn establish_relations(&self) {
513        if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
514            log::error!("Failed to establish relations: {e}");
515        }
516    }
517
518    /// Check if the server is ready to accept connections.
519    pub fn is_ready(&self) -> bool {
520        if let Some(ref consumer) = self.postgres_consumer {
521            if consumer.is_caught_up() {
522                self.ready.store(true, Ordering::SeqCst);
523                return true;
524            }
525            return false;
526        }
527        true
528    }
529
530    /// Run the server with full initialization.
531    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
532        use tokio::net::TcpListener;
533
534        // Persisters can veto startup via startup healthchecks.
535        let entity_types: Vec<&str> = self
536            .handler_registry
537            .entity_types()
538            .map(|t| t.as_ref())
539            .collect();
540        self.persisters
541            .startup_healthcheck(&entity_types)
542            .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;
543
544        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
545            return Err("Postgres is configured but the Postgres consumer failed to start".into());
546        }
547
548        // Wait for Postgres catch-up if configured
549        if self.postgres_consumer.is_some() {
550            log::info!("Waiting for Postgres event consumer to catch up...");
551            let timeout = std::time::Duration::from_secs(300);
552            self.init_postgres_and_wait(timeout)
553                .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
554            log::info!("Postgres caught up, ready to accept connections");
555        }
556
557        // Build search index from store data (after catch-up)
558        log::info!("Building search index...");
559        self.search_index.build_from_registry(&self.registry);
560
561        // Establish relations (cleanup orphans, ensure required entities)
562        log::info!("Establishing relations...");
563        self.establish_relations();
564
565        // Claim orphaned server-owned items and start death watch
566        log::info!("Checking server-owned item ownership...");
567        if let Err(e) = ServerOwnershipManager::claim_orphaned(&self.ctx()) {
568            log::error!("Failed to claim orphaned server-owned items: {}", e);
569        }
570        let ownership_guard = ServerOwnershipManager::watch_peer_deaths(&self.ctx());
571        *self
572            ._server_ownership_guard
573            .lock()
574            .expect("server_ownership_guard mutex poisoned") = Some(ownership_guard);
575
576        // Bind WebSocket listener first so peer publication only happens once
577        // the gateway is actually available.
578        let listener = TcpListener::bind(&self.config.bind_addr).await?;
579        log::info!("CellServer listening on {}", self.config.bind_addr);
580        log::info!(
581            "Myko gateway: ws://{}/myko | MCP: /myko/mcp (POST + WS + SSE)",
582            self.config.bind_addr
583        );
584
585        // Start peer registry if configured
586        if self.config.peer_registry.is_some() {
587            self.start_peer_registry(None);
588        }
589
590        // Run after_init hook (e.g., scene engine startup)
591        if let Some(hook) = self
592            .after_init
593            .lock()
594            .expect("after_init mutex poisoned")
595            .take()
596        {
597            hook(self);
598        }
599
600        self.start_saga_runtime();
601
602        // WS message-throughput summary thread. Emits a single log line every
603        // 250ms with inbound/outbound counts per message kind. Used for
604        // diagnosing server-vs-client pacing during slow loads.
605        crate::ws_timing::start_periodic_logger();
606
607        // Report-cache hit/miss summary thread. Replaces the per-call debug
608        // log spam that was dominating I/O during loads.
609        myko::server::report_cache_stats::start_periodic_logger();
610
611        // Per-search summary thread. One log line per window listing each
612        // search that completed (entity_type, result count, elapsed).
613        myko::search::search_stats::start_periodic_logger();
614
615        log::info!("Server started");
616        self.run_ws_accept_loop(listener).await
617    }
618
619    /// Run just the accept loop (no Postgres / relations / saga startup).
620    pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
621        use tokio::net::TcpListener;
622
623        let listener = TcpListener::bind(&self.config.bind_addr).await?;
624        log::info!("CellServer listening on {}", self.config.bind_addr);
625        log::info!(
626            "Myko gateway: ws://{}/myko | MCP: /myko/mcp (POST + WS + SSE)",
627            self.config.bind_addr
628        );
629        self.run_ws_accept_loop(listener).await
630    }
631
632    async fn run_ws_accept_loop(
633        &self,
634        listener: tokio::net::TcpListener,
635    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
636        let ready = self.ready.clone();
637
638        loop {
639            let (stream, addr) = listener.accept().await?;
640
641            // Check if server is ready (durable backend caught up)
642            if !ready.load(Ordering::SeqCst) {
643                if self.is_ready() {
644                    log::info!("Server is now ready to accept connections");
645                } else {
646                    log::warn!(
647                        "Rejecting connection from {} - server not ready (durable backend catching up)",
648                        addr
649                    );
650                    drop(stream);
651                    continue;
652                }
653            }
654
655            log::debug!("New connection from {}", addr);
656
657            let ctx = Arc::new(self.ctx());
658
659            tokio::spawn(async move {
660                if let Err(e) = router::route_connection(stream, addr, ctx).await {
661                    log::error!("Connection error from {}: {}", addr, e);
662                }
663            });
664        }
665    }
666}
667
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal configuration with no Postgres, no federation and no persister
    /// customisation; only the host ID varies between tests.
    fn config_with_host_id(host_id: Option<Uuid>) -> CellServerConfig {
        CellServerConfig {
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            postgres: None,
            host_id,
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
            peer_clients: None,
        }
    }

    /// A server can be constructed from a bare configuration.
    #[test]
    fn test_server_creation() {
        let server = CellServer::new(config_with_host_id(None));
        assert!(Arc::strong_count(&server.registry) >= 1);
    }

    /// An explicitly configured host ID is used as-is.
    #[test]
    fn test_server_with_host_id() {
        let expected = Uuid::new_v4();
        let server = CellServer::new(config_with_host_id(Some(expected)));
        assert_eq!(server.host_id, expected);
    }
}
702}