// myko_server/lib.rs

//! Myko server runtime — WebSocket, durable event backends, peer federation.
//!
//! This crate contains the tokio-dependent parts of the Myko server:
//! - `CellServer` — server lifecycle (durable catch-up init, WS accept loop)
//! - `postgres` — PostgreSQL producer/consumer (event-table + LISTEN/NOTIFY)
//! - `ws_handler` — WebSocket connection handling
//! - `peer_registry` — federation with other servers
//! - `peer_persister` — `PeerPersister`, a persister that can share the server's live peer-client map
//! - `server_ownership` — `ServerOwnershipManager` (claims orphaned server-owned items, watches peer deaths)
//! - `mcp` — Model Context Protocol server
//!
//! Tokio-free server types (`CellServerCtx`, `HandlerRegistry`, etc.) live in `myko::server`
//! and are re-exported from this crate.
11
12pub mod mcp;
13pub mod peer_persister;
14pub mod peer_registry;
15pub mod postgres;
16pub mod server_ownership;
17pub mod ws_handler;
18
// Imports. All tokio-free server types from `myko::server` are re-exported below via `pub use`.
20use std::{
21    collections::HashMap,
22    net::SocketAddr,
23    sync::{
24        Arc, RwLock,
25        atomic::{AtomicBool, Ordering},
26    },
27    time::Duration,
28};
29
30use futures_util::StreamExt;
31pub use myko::server::*;
32use myko::{
33    client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
34    search::SearchIndex, store::StoreRegistry, wire::MEvent,
35};
36pub use peer_persister::PeerPersister;
37pub use server_ownership::ServerOwnershipManager;
38use uuid::Uuid;
39
40use crate::postgres::{
41    CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
42    PostgresHistoryStore, PostgresProducerHandle,
43};
44
/// Cell-based Myko server configuration, consumed by [`CellServer::new`].
///
/// Usually assembled through [`CellServerBuilder`] rather than by hand.
// Field order is kept as-is on purpose: Rust drops struct fields in declaration order.
#[derive(Clone)]
pub struct CellServerConfig {
    /// Socket address the WebSocket listener binds to.
    pub bind_addr: SocketAddr,
    /// Postgres settings for durable event persistence/distribution; `None` disables Postgres.
    pub postgres: Option<PostgresConfig>,
    /// Identity of this server instance; a random v4 UUID is generated when `None`.
    pub host_id: Option<Uuid>,
    /// Peer-registry settings for federation; `None` leaves federation off.
    pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Persister used for every entity type that has no explicit override.
    ///
    /// When `None` and the Postgres producer was created successfully, the producer
    /// handle is used as the default instead.
    pub default_persister: Option<Arc<dyn Persister>>,
    /// Per-entity persister overrides keyed by entity type name; these take precedence
    /// over the default persister.
    pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
    /// Optional pre-constructed peer-client map. When provided, it will be
    /// used as-is (so any `PeerPersister` built against the same `Arc`
    /// shares the live map). If `None`, the server creates its own empty map.
    pub peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
}
65
/// Builder for creating a [`CellServer`].
///
/// Every setting is optional; see `CellServerBuilder::build` for the defaults applied.
// Field order is kept as-is on purpose: Rust drops struct fields in declaration order.
#[derive(Default)]
pub struct CellServerBuilder {
    /// WebSocket bind address; `build` falls back to `127.0.0.1:5155` when unset.
    bind_addr: Option<SocketAddr>,
    /// Server host ID; `CellServer::new` generates a random one when unset.
    host_id: Option<Uuid>,
    /// Postgres configuration for event persistence/distribution.
    postgres: Option<PostgresConfig>,
    /// Peer registry configuration for federation.
    peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Default persister for entity types without an explicit override.
    default_persister: Option<Arc<dyn Persister>>,
    /// Per-entity persister overrides keyed by entity type name.
    persister_overrides: HashMap<String, Arc<dyn Persister>>,
    /// Optional pre-constructed peer-client map — useful when a
    /// `PeerPersister` must reference the same map the server will use.
    /// Defaults to a fresh empty map if not provided.
    peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
    /// Hook taken and invoked once by `CellServer::run` before the accept loop starts.
    after_init: Option<AfterInitCallback>,
}
81
82type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;
83
84impl CellServerBuilder {
85    /// Create a new server builder.
86    pub fn new() -> Self {
87        Self::default()
88    }
89
90    /// Set the WebSocket bind address.
91    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
92        self.bind_addr = Some(addr);
93        self
94    }
95
96    /// Set the server host ID (auto-generated if not set).
97    pub fn with_host_id(mut self, id: Uuid) -> Self {
98        self.host_id = Some(id);
99        self
100    }
101
102    /// Configure Postgres for event persistence/distribution.
103    pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
104        self.postgres = Some(config);
105        self
106    }
107
108    /// Configure peer registry for federation.
109    pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
110        self.peer_registry = Some(config);
111        self
112    }
113
114    /// Set the default persister used for all entity types without explicit overrides.
115    pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
116        self.default_persister = Some(persister);
117        self
118    }
119
120    /// Override persister for a specific entity type (e.g. "Pulse").
121    pub fn with_persister_override(
122        mut self,
123        entity_type: impl Into<String>,
124        persister: Arc<dyn Persister>,
125    ) -> Self {
126        self.persister_overrides
127            .insert(entity_type.into(), persister);
128        self
129    }
130
131    /// Provide a pre-constructed peer-client map. The server's peer
132    /// registry will populate it as peers connect. Pass the same `Arc`
133    /// into `PeerPersister::new(...)` when you register a
134    /// `with_persister_override(..., PeerPersister)` so the persister
135    /// shares the live map.
136    pub fn with_peer_clients(
137        mut self,
138        peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
139    ) -> Self {
140        self.peer_clients = Some(peer_clients);
141        self
142    }
143
144    /// Register a callback to run after initialization and relation establishment,
145    /// but before the WebSocket accept loop starts. Use this for starting subsystems
146    /// that need entity data (e.g., scene engine).
147    pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
148        self.after_init = Some(Box::new(f));
149        self
150    }
151
152    /// Build the server.
153    pub fn build(self) -> CellServer {
154        let bind_addr = self
155            .bind_addr
156            .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());
157
158        let mut server = CellServer::new(CellServerConfig {
159            bind_addr,
160            postgres: self.postgres,
161            host_id: self.host_id,
162            peer_registry: self.peer_registry,
163            default_persister: self.default_persister,
164            persister_overrides: self.persister_overrides,
165            peer_clients: self.peer_clients,
166        });
167        server.after_init = std::sync::Mutex::new(self.after_init);
168        server
169    }
170}
171
/// Cell-based Myko server.
///
/// Uses hyphae cells for reactive queries and reports instead of actors.
///
/// Construct with [`CellServer::new`] or [`CellServer::builder`], then call
/// [`CellServer::run`] to perform startup and serve WebSocket connections.
// Field order is significant: Rust drops struct fields in declaration order, so
// reordering changes the order in which the resources held here are dropped.
pub struct CellServer {
    /// Central entity store registry
    pub registry: Arc<StoreRegistry>,
    /// Handler registry for items, queries, and reports
    pub handler_registry: Arc<HandlerRegistry>,
    /// Relationship manager for cascade operations
    pub relationship_manager: Arc<RelationshipManager>,
    /// Handle to the Postgres producer; `None` when Postgres is not configured or the
    /// producer could not be created.
    pub postgres_producer: Option<PostgresProducerHandle>,
    /// Full-text search index (built from the store registry during `run`)
    pub search_index: Arc<SearchIndex>,
    /// Persister routing (default + per-entity overrides)
    pub persisters: Arc<PersisterRouter>,
    /// Server host ID
    pub host_id: Uuid,
    /// Server configuration
    config: CellServerConfig,
    /// Postgres producer, owned here so it lives as long as the server.
    _postgres_producer_owner: Option<CellPostgresProducer>,
    /// Postgres consumer, owned here so it lives as long as the server; `None` when
    /// Postgres is not configured or the consumer failed to start.
    postgres_consumer: Option<CellPostgresConsumer>,
    /// Whether the server accepts connections. Starts `true` when there is no Postgres
    /// consumer; otherwise set once the consumer reports it has caught up.
    ready: Arc<AtomicBool>,
    /// Peer registry for federation; `None` until `start_peer_registry` runs with a config
    /// (`run` does this after catch-up).
    peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
    /// Live peer clients shared with report context.
    peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
    /// Callback to run after init (catch-up + relations) but before WS loop; taken by `run`.
    after_init: std::sync::Mutex<Option<AfterInitCallback>>,
    /// Sender for local+replicated event fan-out to saga runtime; cloned into every ctx.
    saga_event_tx: flume::Sender<MEvent>,
    /// Receiver consumed when the saga runtime starts. Until then, anything sent on the
    /// unbounded `saga_event_tx` stays queued in the channel.
    saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
    /// Join handles for the saga and dispatcher tasks.
    /// NOTE: dropping a tokio `JoinHandle` detaches its task rather than cancelling it, so
    /// these handles do not by themselves stop the tasks when the server is dropped.
    saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
    /// Server ownership death-watch guard; `None` until `run` installs the guard returned
    /// by `ServerOwnershipManager::watch_peer_deaths`, then held for the server's lifetime.
    _server_ownership_guard: std::sync::Mutex<Option<hyphae::SubscriptionGuard>>,
    /// Hyphae cell inspector server (kept alive for the lifetime of the server)
    #[cfg(feature = "inspector")]
    _inspector: hyphae::server::InspectorServer,
}
216
217impl CellServer {
218    /// Create a new server builder.
219    pub fn builder() -> CellServerBuilder {
220        CellServerBuilder::new()
221    }
222
223    /// Create a new cell-based server.
224    pub fn new(config: CellServerConfig) -> Self {
225        let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
226        let registry = Arc::new(StoreRegistry::new());
227        let handler_registry = Arc::new(HandlerRegistry::new());
228        let relationship_manager = Arc::new(RelationshipManager::new());
229
230        // Initialize the client registry for WebSocket client message dispatch
231        init_client_registry();
232
233        let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
234        let (postgres_producer_owner, postgres_producer, postgres_consumer) =
235            if let Some(ref postgres_config) = config.postgres {
236                match CellPostgresProducer::new(postgres_config, host_id) {
237                    Ok(producer) => {
238                        let handle = producer.handle();
239                        let consumer = match CellPostgresConsumer::start(
240                            postgres_config,
241                            host_id,
242                            handler_registry.clone(),
243                            registry.clone(),
244                        ) {
245                            Ok(c) => Some(c),
246                            Err(e) => {
247                                log::error!("Failed to start Postgres consumer: {}", e);
248                                None
249                            }
250                        };
251                        (Some(producer), Some(handle), consumer)
252                    }
253                    Err(e) => {
254                        log::error!("Failed to create Postgres producer: {}", e);
255                        (None, None, None)
256                    }
257                }
258            } else {
259                (None, None, None)
260            };
261
262        // If no durable consumer, server is immediately ready
263        let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));
264
265        // Initialize full-text search index
266        let search_index = Arc::new(SearchIndex::new());
267
268        // Build persister routing:
269        // - explicit default from config if provided
270        // - otherwise Postgres producer handle when available
271        // - explicit per-entity overrides always win
272        let mut persister_router = PersisterRouter::default();
273        if let Some(default_persister) = config.default_persister.clone() {
274            persister_router.set_default(Some(default_persister));
275        } else if let Some(handle) = postgres_producer.clone() {
276            persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
277        }
278        for (entity_type, persister) in &config.persister_overrides {
279            persister_router.set_override(entity_type.clone(), persister.clone());
280        }
281        let persisters = Arc::new(persister_router);
282
283        // Start the hyphae cell inspector server
284        #[cfg(feature = "inspector")]
285        let inspector = hyphae::server::start_server("myko");
286        #[cfg(feature = "inspector")]
287        log::info!("Hyphae inspector on port {}", inspector.port());
288
289        let peer_clients = config
290            .peer_clients
291            .clone()
292            .unwrap_or_else(|| Arc::new(dashmap::DashMap::new()));
293
294        Self {
295            registry,
296            handler_registry,
297            relationship_manager,
298            postgres_producer,
299            search_index,
300            persisters,
301            host_id,
302            config,
303            _postgres_producer_owner: postgres_producer_owner,
304            postgres_consumer,
305            ready,
306            peer_registry_instance: RwLock::new(None),
307            peer_clients,
308            after_init: std::sync::Mutex::new(None),
309            saga_event_tx,
310            saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
311            saga_tasks: std::sync::Mutex::new(Vec::new()),
312            _server_ownership_guard: std::sync::Mutex::new(None),
313            #[cfg(feature = "inspector")]
314            _inspector: inspector,
315        }
316    }
317
318    /// Start the peer registry for federation.
319    pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
320        let peer_config = config.or_else(|| self.config.peer_registry.clone());
321
322        if let Some(peer_config) = peer_config {
323            log::info!("Starting peer registry");
324            let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
325            *self.peer_registry_instance.write().unwrap() = Some(pr);
326        }
327    }
328
329    /// Check if peer registry is running.
330    pub fn has_peer_registry(&self) -> bool {
331        self.peer_registry_instance.read().unwrap().is_some()
332    }
333
334    /// Get the store registry.
335    pub fn registry(&self) -> Arc<StoreRegistry> {
336        self.registry.clone()
337    }
338
339    /// Get the handler registry.
340    pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
341        self.handler_registry.clone()
342    }
343
344    /// Get a server context for module use.
345    pub fn ctx(&self) -> CellServerCtx {
346        let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
347            self.config.postgres.as_ref().map(|pg| {
348                Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
349                    as Arc<dyn myko::server::HistoryReplayProvider>
350            });
351        CellServerCtx::new(
352            self.host_id,
353            self.registry.clone(),
354            self.handler_registry.clone(),
355            self.relationship_manager.clone(),
356            self.persisters.clone(),
357            self.search_index.clone(),
358            self.peer_clients.clone(),
359            Some(self.saga_event_tx.clone()),
360            history_replay,
361        )
362    }
363
364    fn start_saga_runtime(&self) {
365        let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
366        if registrations.is_empty() {
367            return;
368        }
369        let Some(rx) = self
370            .saga_event_rx
371            .lock()
372            .expect("saga_event_rx mutex poisoned")
373            .take()
374        else {
375            return;
376        };
377
378        log::info!("Starting saga runtime with {} saga(s)", registrations.len());
379
380        // NOTE(ts): One unbounded flume channel per saga, with dispatch-side filtering
381        // so sagas only receive events matching their entity type and change type.
382        struct SagaChannel {
383            tx: flume::Sender<MEvent>,
384            entity_type: &'static str,
385            change_type: myko::event::MEventType,
386        }
387        let mut saga_channels: Vec<SagaChannel> = Vec::new();
388
389        for registration in registrations {
390            let saga = (registration.create)();
391            let saga_name = saga.name().to_string();
392            let (saga_tx, saga_rx) = flume::unbounded::<MEvent>();
393            saga_channels.push(SagaChannel {
394                tx: saga_tx,
395                entity_type: registration.event_entity_type,
396                change_type: registration.event_change_type,
397            });
398            let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
399                saga_rx,
400                move |saga_rx| async move {
401                    saga_rx
402                        .recv_async()
403                        .await
404                        .ok()
405                        .map(|event| (event, saga_rx))
406                },
407            ));
408
409            let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
410                self.host_id,
411                self.registry.clone(),
412                self.saga_event_tx.clone(),
413            ));
414            let mut command_stream = saga.build_boxed(events, saga_ctx);
415
416            let host_id = self.host_id;
417            let registry = self.registry.clone();
418            let handler_registry = self.handler_registry.clone();
419            let relationship_manager = self.relationship_manager.clone();
420            let persisters = self.persisters.clone();
421            let search_index = self.search_index.clone();
422            let peer_clients = self.peer_clients.clone();
423            let saga_event_tx = self.saga_event_tx.clone();
424
425            let handle = tokio::spawn(async move {
426                while let Some(command) = command_stream.next().await {
427                    let command_name = command.command_name();
428                    log::debug!("Saga {} executing command {}", saga_name, command_name);
429                    let req = Arc::new(RequestContext::internal(
430                        Arc::from(Uuid::new_v4().to_string()),
431                        host_id,
432                        &format!("saga:{saga_name}"),
433                    ));
434
435                    let cmd_ctx = CommandContext::new(
436                        Arc::from(command_name),
437                        req,
438                        Arc::new(CellServerCtx::new(
439                            host_id,
440                            registry.clone(),
441                            handler_registry.clone(),
442                            relationship_manager.clone(),
443                            persisters.clone(),
444                            search_index.clone(),
445                            peer_clients.clone(),
446                            Some(saga_event_tx.clone()),
447                            None,
448                        )),
449                    );
450
451                    if let Err(err) = command.execute_boxed(cmd_ctx) {
452                        log::error!(
453                            "Saga {} command {} failed: {}",
454                            saga_name,
455                            command_name,
456                            err.message
457                        );
458                    }
459                }
460            });
461
462            self.saga_tasks
463                .lock()
464                .expect("saga_tasks mutex poisoned")
465                .push(handle);
466        }
467
468        // NOTE(ts): Dispatcher fans out events to saga channels, filtering by
469        // entity type and change type so each saga only receives relevant events.
470        let dispatcher = tokio::spawn(async move {
471            while let Ok(event) = rx.recv_async().await {
472                for ch in &saga_channels {
473                    if event.item_type == ch.entity_type && event.change_type == ch.change_type {
474                        let _ = ch.tx.send(event.clone());
475                    }
476                }
477            }
478        });
479        self.saga_tasks
480            .lock()
481            .expect("saga_tasks mutex poisoned")
482            .push(dispatcher);
483    }
484
485    /// Create a Postgres-backed history store for replay/windback operations.
486    pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
487        self.config
488            .postgres
489            .clone()
490            .map(PostgresHistoryStore::new)
491            .transpose()
492    }
493
494    /// Initialize Postgres replay/listener and wait for catch-up.
495    pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
496        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
497            return Err(
498                "Postgres is configured but the Postgres consumer is not running".to_string(),
499            );
500        }
501
502        if let Some(ref consumer) = self.postgres_consumer {
503            consumer.wait_until_caught_up(timeout)?;
504            self.ready.store(true, Ordering::SeqCst);
505        }
506        Ok(())
507    }
508
509    /// Establish relationship invariants.
510    pub fn establish_relations(&self) {
511        if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
512            log::error!("Failed to establish relations: {e}");
513        }
514    }
515
516    /// Check if the server is ready to accept connections.
517    pub fn is_ready(&self) -> bool {
518        if let Some(ref consumer) = self.postgres_consumer {
519            if consumer.is_caught_up() {
520                self.ready.store(true, Ordering::SeqCst);
521                return true;
522            }
523            return false;
524        }
525        true
526    }
527
528    /// Run the server with full initialization.
529    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
530        use tokio::net::TcpListener;
531
532        // Persisters can veto startup via startup healthchecks.
533        let entity_types: Vec<&str> = self
534            .handler_registry
535            .entity_types()
536            .map(|t| t.as_ref())
537            .collect();
538        self.persisters
539            .startup_healthcheck(&entity_types)
540            .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;
541
542        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
543            return Err("Postgres is configured but the Postgres consumer failed to start".into());
544        }
545
546        // Wait for Postgres catch-up if configured
547        if self.postgres_consumer.is_some() {
548            log::info!("Waiting for Postgres event consumer to catch up...");
549            let timeout = std::time::Duration::from_secs(300);
550            self.init_postgres_and_wait(timeout)
551                .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
552            log::info!("Postgres caught up, ready to accept connections");
553        }
554
555        // Build search index from store data (after catch-up)
556        log::info!("Building search index...");
557        self.search_index.build_from_registry(&self.registry);
558
559        // Establish relations (cleanup orphans, ensure required entities)
560        log::info!("Establishing relations...");
561        self.establish_relations();
562
563        // Claim orphaned server-owned items and start death watch
564        log::info!("Checking server-owned item ownership...");
565        if let Err(e) = ServerOwnershipManager::claim_orphaned(&self.ctx()) {
566            log::error!("Failed to claim orphaned server-owned items: {}", e);
567        }
568        let ownership_guard = ServerOwnershipManager::watch_peer_deaths(&self.ctx());
569        *self
570            ._server_ownership_guard
571            .lock()
572            .expect("server_ownership_guard mutex poisoned") = Some(ownership_guard);
573
574        // Bind WebSocket listener first so peer publication only happens once
575        // the gateway is actually available.
576        let listener = TcpListener::bind(&self.config.bind_addr).await?;
577        log::info!("CellServer listening on {}", self.config.bind_addr);
578        log::info!(
579            "WebSocket server listening on ws://{}/myko",
580            self.config.bind_addr
581        );
582
583        // Start peer registry if configured
584        if self.config.peer_registry.is_some() {
585            self.start_peer_registry(None);
586        }
587
588        // Run after_init hook (e.g., scene engine startup)
589        if let Some(hook) = self
590            .after_init
591            .lock()
592            .expect("after_init mutex poisoned")
593            .take()
594        {
595            hook(self);
596        }
597
598        self.start_saga_runtime();
599
600        log::info!("Server started");
601        self.run_ws_accept_loop(listener).await
602    }
603
604    /// Run just the WebSocket accept loop.
605    pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
606        use tokio::net::TcpListener;
607
608        let listener = TcpListener::bind(&self.config.bind_addr).await?;
609        log::info!("CellServer listening on {}", self.config.bind_addr);
610        log::info!(
611            "WebSocket server listening on ws://{}/myko",
612            self.config.bind_addr
613        );
614        self.run_ws_accept_loop(listener).await
615    }
616
617    async fn run_ws_accept_loop(
618        &self,
619        listener: tokio::net::TcpListener,
620    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
621        let ready = self.ready.clone();
622
623        loop {
624            let (stream, addr) = listener.accept().await?;
625
626            // Check if server is ready (durable backend caught up)
627            if !ready.load(Ordering::SeqCst) {
628                if self.is_ready() {
629                    log::info!("Server is now ready to accept connections");
630                } else {
631                    log::warn!(
632                        "Rejecting connection from {} - server not ready (durable backend catching up)",
633                        addr
634                    );
635                    drop(stream);
636                    continue;
637                }
638            }
639
640            log::debug!("New connection from {}", addr);
641
642            let ctx = self.ctx();
643
644            tokio::spawn(async move {
645                if let Err(e) =
646                    ws_handler::WsHandler::handle_connection(stream, addr, Arc::new(ctx)).await
647                {
648                    log::error!("Connection error from {}: {}", addr, e);
649                }
650            });
651        }
652    }
653}
654
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal configuration: no durable backend, no federation, no persister overrides.
    fn test_config(host_id: Option<Uuid>) -> CellServerConfig {
        CellServerConfig {
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            postgres: None,
            host_id,
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
            peer_clients: None,
        }
    }

    #[test]
    fn test_server_creation() {
        let server = CellServer::new(test_config(None));
        // The accessor hands out the same shared registry.
        assert!(Arc::ptr_eq(&server.registry(), &server.registry));
        // Without a Postgres consumer the server is ready immediately.
        assert!(server.is_ready());
        // The peer registry is only started by `run` / `start_peer_registry`.
        assert!(!server.has_peer_registry());
        // No Postgres configuration means no history store.
        assert!(matches!(server.postgres_history_store(), Ok(None)));
    }

    #[test]
    fn test_server_with_host_id() {
        let host_id = Uuid::new_v4();
        let server = CellServer::new(test_config(Some(host_id)));
        assert_eq!(server.host_id, host_id);
    }

    #[test]
    fn test_builder_defaults_and_host_id() {
        let host_id = Uuid::new_v4();
        let server = CellServer::builder().with_host_id(host_id).build();
        assert_eq!(server.host_id, host_id);
        assert_eq!(
            server.config.bind_addr,
            "127.0.0.1:5155".parse::<SocketAddr>().unwrap()
        );
    }
}