Skip to main content

myko_server/
lib.rs

1//! Myko server runtime — WebSocket, durable event backends, peer federation.
2//!
3//! This crate contains the tokio-dependent parts of the Myko server:
4//! - `CellServer` — server lifecycle (durable catch-up init, WS accept loop)
5//! - `postgres` — PostgreSQL producer/consumer (event-table + LISTEN/NOTIFY)
6//! - `ws_handler` — WebSocket connection handling
7//! - `peer_registry` — federation with other servers
8//! - `mcp` — Model Context Protocol server
9//!
10//! Tokio-free server types (CellServerCtx, HandlerRegistry, etc.) live in `myko::server`.
11
12pub mod mcp;
13pub mod peer_persister;
14pub mod peer_registry;
15pub mod postgres;
16pub mod server_ownership;
17pub mod ws_handler;
18pub mod ws_timing;
19
20// Re-export all tokio-free server types from myko
21use std::{
22    collections::HashMap,
23    net::SocketAddr,
24    sync::{
25        Arc, RwLock,
26        atomic::{AtomicBool, Ordering},
27    },
28    time::Duration,
29};
30
31use futures_util::StreamExt;
32pub use myko::server::*;
33use myko::{
34    client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
35    search::SearchIndex, store::StoreRegistry, wire::MEvent,
36};
37pub use peer_persister::PeerPersister;
38pub use server_ownership::ServerOwnershipManager;
39use uuid::Uuid;
40
41use crate::postgres::{
42    CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
43    PostgresHistoryStore, PostgresProducerHandle,
44};
45
46/// Cell-based Myko server configuration.
47#[derive(Clone)]
48pub struct CellServerConfig {
49    /// Address to bind the WebSocket server
50    pub bind_addr: SocketAddr,
51    /// Optional Postgres configuration for event persistence/distribution
52    pub postgres: Option<PostgresConfig>,
53    /// Server host ID (auto-generated if not provided)
54    pub host_id: Option<Uuid>,
55    /// Optional peer registry configuration for federation
56    pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
57    /// Default persister override
58    pub default_persister: Option<Arc<dyn Persister>>,
59    /// Per-entity persister overrides keyed by entity type name
60    pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
61    /// Optional pre-constructed peer-client map. When provided, it will be
62    /// used as-is (so any `PeerPersister` built against the same `Arc`
63    /// shares the live map). If `None`, the server creates its own.
64    pub peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
65}
66
67/// Builder for creating a CellServer.
68#[derive(Default)]
69pub struct CellServerBuilder {
70    bind_addr: Option<SocketAddr>,
71    host_id: Option<Uuid>,
72    postgres: Option<PostgresConfig>,
73    peer_registry: Option<peer_registry::PeerRegistryConfig>,
74    default_persister: Option<Arc<dyn Persister>>,
75    persister_overrides: HashMap<String, Arc<dyn Persister>>,
76    /// Optional pre-constructed peer-client map — useful when a
77    /// `PeerPersister` must reference the same map the server will use.
78    /// Defaults to a fresh empty map if not provided.
79    peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
80    after_init: Option<AfterInitCallback>,
81}
82
83type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;
84
85impl CellServerBuilder {
86    /// Create a new server builder.
87    pub fn new() -> Self {
88        Self::default()
89    }
90
91    /// Set the WebSocket bind address.
92    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
93        self.bind_addr = Some(addr);
94        self
95    }
96
97    /// Set the server host ID (auto-generated if not set).
98    pub fn with_host_id(mut self, id: Uuid) -> Self {
99        self.host_id = Some(id);
100        self
101    }
102
103    /// Configure Postgres for event persistence/distribution.
104    pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
105        self.postgres = Some(config);
106        self
107    }
108
109    /// Configure peer registry for federation.
110    pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
111        self.peer_registry = Some(config);
112        self
113    }
114
115    /// Set the default persister used for all entity types without explicit overrides.
116    pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
117        self.default_persister = Some(persister);
118        self
119    }
120
121    /// Override persister for a specific entity type (e.g. "Pulse").
122    pub fn with_persister_override(
123        mut self,
124        entity_type: impl Into<String>,
125        persister: Arc<dyn Persister>,
126    ) -> Self {
127        self.persister_overrides
128            .insert(entity_type.into(), persister);
129        self
130    }
131
132    /// Provide a pre-constructed peer-client map. The server's peer
133    /// registry will populate it as peers connect. Pass the same `Arc`
134    /// into `PeerPersister::new(...)` when you register a
135    /// `with_persister_override(..., PeerPersister)` so the persister
136    /// shares the live map.
137    pub fn with_peer_clients(
138        mut self,
139        peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
140    ) -> Self {
141        self.peer_clients = Some(peer_clients);
142        self
143    }
144
145    /// Register a callback to run after initialization and relation establishment,
146    /// but before the WebSocket accept loop starts. Use this for starting subsystems
147    /// that need entity data (e.g., scene engine).
148    pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
149        self.after_init = Some(Box::new(f));
150        self
151    }
152
153    /// Build the server.
154    pub fn build(self) -> CellServer {
155        let bind_addr = self
156            .bind_addr
157            .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());
158
159        let mut server = CellServer::new(CellServerConfig {
160            bind_addr,
161            postgres: self.postgres,
162            host_id: self.host_id,
163            peer_registry: self.peer_registry,
164            default_persister: self.default_persister,
165            persister_overrides: self.persister_overrides,
166            peer_clients: self.peer_clients,
167        });
168        server.after_init = std::sync::Mutex::new(self.after_init);
169        server
170    }
171}
172
173/// Cell-based Myko server.
174///
175/// Uses hyphae cells for reactive queries and reports instead of actors.
176pub struct CellServer {
177    /// Central entity store registry
178    pub registry: Arc<StoreRegistry>,
179    /// Handler registry for items, queries, and reports
180    pub handler_registry: Arc<HandlerRegistry>,
181    /// Relationship manager for cascade operations
182    pub relationship_manager: Arc<RelationshipManager>,
183    /// Optional Postgres producer handle
184    pub postgres_producer: Option<PostgresProducerHandle>,
185    /// Full-text search index
186    pub search_index: Arc<SearchIndex>,
187    /// Persister routing (default + per-entity overrides)
188    pub persisters: Arc<PersisterRouter>,
189    /// Server host ID
190    pub host_id: Uuid,
191    /// Server configuration
192    config: CellServerConfig,
193    /// Postgres producer (kept alive)
194    _postgres_producer_owner: Option<CellPostgresProducer>,
195    /// Postgres consumer (kept alive)
196    postgres_consumer: Option<CellPostgresConsumer>,
197    /// Whether the server is ready to accept connections
198    ready: Arc<AtomicBool>,
199    /// Peer registry for federation (initialized after catch-up)
200    peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
201    /// Live peer clients shared with report context.
202    peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
203    /// Callback to run after init (catch-up + relations) but before WS loop
204    after_init: std::sync::Mutex<Option<AfterInitCallback>>,
205    /// Sender for local+replicated event fan-out to saga runtime.
206    saga_event_tx: flume::Sender<MEvent>,
207    /// Receiver consumed when saga runtime starts.
208    saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
209    /// Saga tasks kept alive for server lifetime.
210    saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
211    /// Server ownership death-watch guard (kept alive for server lifetime).
212    _server_ownership_guard: std::sync::Mutex<Option<hyphae::SubscriptionGuard>>,
213    /// Hyphae cell inspector server (kept alive for the lifetime of the server)
214    #[cfg(feature = "inspector")]
215    _inspector: hyphae::server::InspectorServer,
216}
217
218impl CellServer {
219    /// Create a new server builder.
220    pub fn builder() -> CellServerBuilder {
221        CellServerBuilder::new()
222    }
223
224    /// Create a new cell-based server.
225    pub fn new(config: CellServerConfig) -> Self {
226        let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
227        let registry = Arc::new(StoreRegistry::new());
228        let handler_registry = Arc::new(HandlerRegistry::new());
229        let relationship_manager = Arc::new(RelationshipManager::new());
230
231        // Initialize the client registry for WebSocket client message dispatch
232        init_client_registry();
233
234        let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
235        let (postgres_producer_owner, postgres_producer, postgres_consumer) =
236            if let Some(ref postgres_config) = config.postgres {
237                match CellPostgresProducer::new(postgres_config, host_id) {
238                    Ok(producer) => {
239                        let handle = producer.handle();
240                        let consumer = match CellPostgresConsumer::start(
241                            postgres_config,
242                            host_id,
243                            handler_registry.clone(),
244                            registry.clone(),
245                        ) {
246                            Ok(c) => Some(c),
247                            Err(e) => {
248                                log::error!("Failed to start Postgres consumer: {}", e);
249                                None
250                            }
251                        };
252                        (Some(producer), Some(handle), consumer)
253                    }
254                    Err(e) => {
255                        log::error!("Failed to create Postgres producer: {}", e);
256                        (None, None, None)
257                    }
258                }
259            } else {
260                (None, None, None)
261            };
262
263        // If no durable consumer, server is immediately ready
264        let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));
265
266        // Initialize full-text search index
267        let search_index = Arc::new(SearchIndex::new());
268
269        // Build persister routing:
270        // - explicit default from config if provided
271        // - otherwise Postgres producer handle when available
272        // - explicit per-entity overrides always win
273        let mut persister_router = PersisterRouter::default();
274        if let Some(default_persister) = config.default_persister.clone() {
275            persister_router.set_default(Some(default_persister));
276        } else if let Some(handle) = postgres_producer.clone() {
277            persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
278        }
279        for (entity_type, persister) in &config.persister_overrides {
280            persister_router.set_override(entity_type.clone(), persister.clone());
281        }
282        let persisters = Arc::new(persister_router);
283
284        // Start the hyphae cell inspector server
285        #[cfg(feature = "inspector")]
286        let inspector = hyphae::server::start_server("myko");
287        #[cfg(feature = "inspector")]
288        log::info!("Hyphae inspector on port {}", inspector.port());
289
290        let peer_clients = config
291            .peer_clients
292            .clone()
293            .unwrap_or_else(|| Arc::new(dashmap::DashMap::new()));
294
295        Self {
296            registry,
297            handler_registry,
298            relationship_manager,
299            postgres_producer,
300            search_index,
301            persisters,
302            host_id,
303            config,
304            _postgres_producer_owner: postgres_producer_owner,
305            postgres_consumer,
306            ready,
307            peer_registry_instance: RwLock::new(None),
308            peer_clients,
309            after_init: std::sync::Mutex::new(None),
310            saga_event_tx,
311            saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
312            saga_tasks: std::sync::Mutex::new(Vec::new()),
313            _server_ownership_guard: std::sync::Mutex::new(None),
314            #[cfg(feature = "inspector")]
315            _inspector: inspector,
316        }
317    }
318
319    /// Start the peer registry for federation.
320    pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
321        let peer_config = config.or_else(|| self.config.peer_registry.clone());
322
323        if let Some(peer_config) = peer_config {
324            log::info!("Starting peer registry");
325            let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
326            *self.peer_registry_instance.write().unwrap() = Some(pr);
327        }
328    }
329
330    /// Check if peer registry is running.
331    pub fn has_peer_registry(&self) -> bool {
332        self.peer_registry_instance.read().unwrap().is_some()
333    }
334
335    /// Get the store registry.
336    pub fn registry(&self) -> Arc<StoreRegistry> {
337        self.registry.clone()
338    }
339
340    /// Get the handler registry.
341    pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
342        self.handler_registry.clone()
343    }
344
345    /// Get a server context for module use.
346    pub fn ctx(&self) -> CellServerCtx {
347        let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
348            self.config.postgres.as_ref().map(|pg| {
349                Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
350                    as Arc<dyn myko::server::HistoryReplayProvider>
351            });
352        CellServerCtx::new(
353            self.host_id,
354            self.registry.clone(),
355            self.handler_registry.clone(),
356            self.relationship_manager.clone(),
357            self.persisters.clone(),
358            self.search_index.clone(),
359            self.peer_clients.clone(),
360            Some(self.saga_event_tx.clone()),
361            history_replay,
362        )
363    }
364
365    fn start_saga_runtime(&self) {
366        let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
367        if registrations.is_empty() {
368            return;
369        }
370        let Some(rx) = self
371            .saga_event_rx
372            .lock()
373            .expect("saga_event_rx mutex poisoned")
374            .take()
375        else {
376            return;
377        };
378
379        log::info!("Starting saga runtime with {} saga(s)", registrations.len());
380
381        // NOTE(ts): One unbounded flume channel per saga, with dispatch-side filtering
382        // so sagas only receive events matching their entity type and change type.
383        struct SagaChannel {
384            tx: flume::Sender<MEvent>,
385            entity_type: &'static str,
386            change_type: myko::event::MEventType,
387        }
388        let mut saga_channels: Vec<SagaChannel> = Vec::new();
389
390        for registration in registrations {
391            let saga = (registration.create)();
392            let saga_name = saga.name().to_string();
393            let (saga_tx, saga_rx) = flume::unbounded::<MEvent>();
394            saga_channels.push(SagaChannel {
395                tx: saga_tx,
396                entity_type: registration.event_entity_type,
397                change_type: registration.event_change_type,
398            });
399            let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
400                saga_rx,
401                move |saga_rx| async move {
402                    saga_rx
403                        .recv_async()
404                        .await
405                        .ok()
406                        .map(|event| (event, saga_rx))
407                },
408            ));
409
410            let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
411                self.host_id,
412                self.registry.clone(),
413                self.saga_event_tx.clone(),
414            ));
415            let mut command_stream = saga.build_boxed(events, saga_ctx);
416
417            let host_id = self.host_id;
418            let registry = self.registry.clone();
419            let handler_registry = self.handler_registry.clone();
420            let relationship_manager = self.relationship_manager.clone();
421            let persisters = self.persisters.clone();
422            let search_index = self.search_index.clone();
423            let peer_clients = self.peer_clients.clone();
424            let saga_event_tx = self.saga_event_tx.clone();
425
426            let handle = tokio::spawn(async move {
427                while let Some(command) = command_stream.next().await {
428                    let command_name = command.command_name();
429                    log::debug!("Saga {} executing command {}", saga_name, command_name);
430                    let req = Arc::new(RequestContext::internal(
431                        Arc::from(Uuid::new_v4().to_string()),
432                        host_id,
433                        &format!("saga:{saga_name}"),
434                    ));
435
436                    let cmd_ctx = CommandContext::new(
437                        Arc::from(command_name),
438                        req,
439                        Arc::new(CellServerCtx::new(
440                            host_id,
441                            registry.clone(),
442                            handler_registry.clone(),
443                            relationship_manager.clone(),
444                            persisters.clone(),
445                            search_index.clone(),
446                            peer_clients.clone(),
447                            Some(saga_event_tx.clone()),
448                            None,
449                        )),
450                    );
451
452                    if let Err(err) = command.execute_boxed(cmd_ctx) {
453                        log::error!(
454                            "Saga {} command {} failed: {}",
455                            saga_name,
456                            command_name,
457                            err.message
458                        );
459                    }
460                }
461            });
462
463            self.saga_tasks
464                .lock()
465                .expect("saga_tasks mutex poisoned")
466                .push(handle);
467        }
468
469        // NOTE(ts): Dispatcher fans out events to saga channels, filtering by
470        // entity type and change type so each saga only receives relevant events.
471        let dispatcher = tokio::spawn(async move {
472            while let Ok(event) = rx.recv_async().await {
473                for ch in &saga_channels {
474                    if event.item_type == ch.entity_type && event.change_type == ch.change_type {
475                        let _ = ch.tx.send(event.clone());
476                    }
477                }
478            }
479        });
480        self.saga_tasks
481            .lock()
482            .expect("saga_tasks mutex poisoned")
483            .push(dispatcher);
484    }
485
486    /// Create a Postgres-backed history store for replay/windback operations.
487    pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
488        self.config
489            .postgres
490            .clone()
491            .map(PostgresHistoryStore::new)
492            .transpose()
493    }
494
495    /// Initialize Postgres replay/listener and wait for catch-up.
496    pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
497        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
498            return Err(
499                "Postgres is configured but the Postgres consumer is not running".to_string(),
500            );
501        }
502
503        if let Some(ref consumer) = self.postgres_consumer {
504            consumer.wait_until_caught_up(timeout)?;
505            self.ready.store(true, Ordering::SeqCst);
506        }
507        Ok(())
508    }
509
510    /// Establish relationship invariants.
511    pub fn establish_relations(&self) {
512        if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
513            log::error!("Failed to establish relations: {e}");
514        }
515    }
516
517    /// Check if the server is ready to accept connections.
518    pub fn is_ready(&self) -> bool {
519        if let Some(ref consumer) = self.postgres_consumer {
520            if consumer.is_caught_up() {
521                self.ready.store(true, Ordering::SeqCst);
522                return true;
523            }
524            return false;
525        }
526        true
527    }
528
529    /// Run the server with full initialization.
530    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
531        use tokio::net::TcpListener;
532
533        // Persisters can veto startup via startup healthchecks.
534        let entity_types: Vec<&str> = self
535            .handler_registry
536            .entity_types()
537            .map(|t| t.as_ref())
538            .collect();
539        self.persisters
540            .startup_healthcheck(&entity_types)
541            .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;
542
543        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
544            return Err("Postgres is configured but the Postgres consumer failed to start".into());
545        }
546
547        // Wait for Postgres catch-up if configured
548        if self.postgres_consumer.is_some() {
549            log::info!("Waiting for Postgres event consumer to catch up...");
550            let timeout = std::time::Duration::from_secs(300);
551            self.init_postgres_and_wait(timeout)
552                .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
553            log::info!("Postgres caught up, ready to accept connections");
554        }
555
556        // Build search index from store data (after catch-up)
557        log::info!("Building search index...");
558        self.search_index.build_from_registry(&self.registry);
559
560        // Establish relations (cleanup orphans, ensure required entities)
561        log::info!("Establishing relations...");
562        self.establish_relations();
563
564        // Claim orphaned server-owned items and start death watch
565        log::info!("Checking server-owned item ownership...");
566        if let Err(e) = ServerOwnershipManager::claim_orphaned(&self.ctx()) {
567            log::error!("Failed to claim orphaned server-owned items: {}", e);
568        }
569        let ownership_guard = ServerOwnershipManager::watch_peer_deaths(&self.ctx());
570        *self
571            ._server_ownership_guard
572            .lock()
573            .expect("server_ownership_guard mutex poisoned") = Some(ownership_guard);
574
575        // Bind WebSocket listener first so peer publication only happens once
576        // the gateway is actually available.
577        let listener = TcpListener::bind(&self.config.bind_addr).await?;
578        log::info!("CellServer listening on {}", self.config.bind_addr);
579        log::info!(
580            "WebSocket server listening on ws://{}/myko",
581            self.config.bind_addr
582        );
583
584        // Start peer registry if configured
585        if self.config.peer_registry.is_some() {
586            self.start_peer_registry(None);
587        }
588
589        // Run after_init hook (e.g., scene engine startup)
590        if let Some(hook) = self
591            .after_init
592            .lock()
593            .expect("after_init mutex poisoned")
594            .take()
595        {
596            hook(self);
597        }
598
599        self.start_saga_runtime();
600
601        // WS message-throughput summary thread. Emits a single log line every
602        // 250ms with inbound/outbound counts per message kind. Used for
603        // diagnosing server-vs-client pacing during slow loads.
604        crate::ws_timing::start_periodic_logger();
605
606        // Report-cache hit/miss summary thread. Replaces the per-call debug
607        // log spam that was dominating I/O during loads.
608        myko::server::report_cache_stats::start_periodic_logger();
609
610        log::info!("Server started");
611        self.run_ws_accept_loop(listener).await
612    }
613
614    /// Run just the WebSocket accept loop.
615    pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
616        use tokio::net::TcpListener;
617
618        let listener = TcpListener::bind(&self.config.bind_addr).await?;
619        log::info!("CellServer listening on {}", self.config.bind_addr);
620        log::info!(
621            "WebSocket server listening on ws://{}/myko",
622            self.config.bind_addr
623        );
624        self.run_ws_accept_loop(listener).await
625    }
626
627    async fn run_ws_accept_loop(
628        &self,
629        listener: tokio::net::TcpListener,
630    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
631        let ready = self.ready.clone();
632
633        loop {
634            let (stream, addr) = listener.accept().await?;
635
636            // Check if server is ready (durable backend caught up)
637            if !ready.load(Ordering::SeqCst) {
638                if self.is_ready() {
639                    log::info!("Server is now ready to accept connections");
640                } else {
641                    log::warn!(
642                        "Rejecting connection from {} - server not ready (durable backend catching up)",
643                        addr
644                    );
645                    drop(stream);
646                    continue;
647                }
648            }
649
650            log::debug!("New connection from {}", addr);
651
652            let ctx = self.ctx();
653
654            tokio::spawn(async move {
655                if let Err(e) =
656                    ws_handler::WsHandler::handle_connection(stream, addr, Arc::new(ctx)).await
657                {
658                    log::error!("Connection error from {}: {}", addr, e);
659                }
660            });
661        }
662    }
663}
664
665#[cfg(test)]
666mod tests {
667    use super::*;
668
669    #[test]
670    fn test_server_creation() {
671        let config = CellServerConfig {
672            bind_addr: "127.0.0.1:0".parse().unwrap(),
673            postgres: None,
674            host_id: None,
675            peer_registry: None,
676            default_persister: None,
677            persister_overrides: HashMap::new(),
678            peer_clients: None,
679        };
680        let server = CellServer::new(config);
681        assert!(Arc::strong_count(&server.registry) >= 1);
682    }
683
684    #[test]
685    fn test_server_with_host_id() {
686        let host_id = Uuid::new_v4();
687        let config = CellServerConfig {
688            bind_addr: "127.0.0.1:0".parse().unwrap(),
689            postgres: None,
690            host_id: Some(host_id),
691            peer_registry: None,
692            default_persister: None,
693            persister_overrides: HashMap::new(),
694            peer_clients: None,
695        };
696        let server = CellServer::new(config);
697        assert_eq!(server.host_id, host_id);
698    }
699}