Skip to main content

myko_server/
lib.rs

1//! Myko server runtime — WebSocket, durable event backends, peer federation.
2//!
3//! This crate contains the tokio-dependent parts of the Myko server:
4//! - `CellServer` — server lifecycle (durable catch-up init, WS accept loop)
5//! - `postgres` — PostgreSQL producer/consumer (event-table + LISTEN/NOTIFY)
6//! - `ws_handler` — WebSocket connection handling
7//! - `peer_registry` — federation with other servers
8//! - `mcp` — Model Context Protocol server
9//!
10//! Tokio-free server types (CellServerCtx, HandlerRegistry, etc.) live in `myko::server`.
11
12pub mod mcp;
13pub mod peer_registry;
14pub mod postgres;
15pub mod server_ownership;
16pub mod ws_handler;
17
18pub use server_ownership::ServerOwnershipManager;
19
20// Re-export all tokio-free server types from myko
21use std::{
22    collections::HashMap,
23    net::SocketAddr,
24    sync::{
25        Arc, RwLock,
26        atomic::{AtomicBool, Ordering},
27    },
28    time::Duration,
29};
30
31use futures_util::StreamExt;
32pub use myko::server::*;
33use myko::{
34    client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
35    search::SearchIndex, store::StoreRegistry, wire::MEvent,
36};
37use uuid::Uuid;
38
39use crate::postgres::{
40    CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
41    PostgresHistoryStore, PostgresProducerHandle,
42};
43
44/// Cell-based Myko server configuration.
45#[derive(Clone)]
46pub struct CellServerConfig {
47    /// Address to bind the WebSocket server
48    pub bind_addr: SocketAddr,
49    /// Optional Postgres configuration for event persistence/distribution
50    pub postgres: Option<PostgresConfig>,
51    /// Server host ID (auto-generated if not provided)
52    pub host_id: Option<Uuid>,
53    /// Optional peer registry configuration for federation
54    pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
55    /// Default persister override
56    pub default_persister: Option<Arc<dyn Persister>>,
57    /// Per-entity persister overrides keyed by entity type name
58    pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
59}
60
61/// Builder for creating a CellServer.
62#[derive(Default)]
63pub struct CellServerBuilder {
64    bind_addr: Option<SocketAddr>,
65    host_id: Option<Uuid>,
66    postgres: Option<PostgresConfig>,
67    peer_registry: Option<peer_registry::PeerRegistryConfig>,
68    default_persister: Option<Arc<dyn Persister>>,
69    persister_overrides: HashMap<String, Arc<dyn Persister>>,
70    after_init: Option<AfterInitCallback>,
71}
72
73type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;
74
75impl CellServerBuilder {
76    /// Create a new server builder.
77    pub fn new() -> Self {
78        Self::default()
79    }
80
81    /// Set the WebSocket bind address.
82    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
83        self.bind_addr = Some(addr);
84        self
85    }
86
87    /// Set the server host ID (auto-generated if not set).
88    pub fn with_host_id(mut self, id: Uuid) -> Self {
89        self.host_id = Some(id);
90        self
91    }
92
93    /// Configure Postgres for event persistence/distribution.
94    pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
95        self.postgres = Some(config);
96        self
97    }
98
99    /// Configure peer registry for federation.
100    pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
101        self.peer_registry = Some(config);
102        self
103    }
104
105    /// Set the default persister used for all entity types without explicit overrides.
106    pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
107        self.default_persister = Some(persister);
108        self
109    }
110
111    /// Override persister for a specific entity type (e.g. "Pulse").
112    pub fn with_persister_override(
113        mut self,
114        entity_type: impl Into<String>,
115        persister: Arc<dyn Persister>,
116    ) -> Self {
117        self.persister_overrides
118            .insert(entity_type.into(), persister);
119        self
120    }
121
122    /// Register a callback to run after initialization and relation establishment,
123    /// but before the WebSocket accept loop starts. Use this for starting subsystems
124    /// that need entity data (e.g., scene engine).
125    pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
126        self.after_init = Some(Box::new(f));
127        self
128    }
129
130    /// Build the server.
131    pub fn build(self) -> CellServer {
132        let bind_addr = self
133            .bind_addr
134            .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());
135
136        let mut server = CellServer::new(CellServerConfig {
137            bind_addr,
138            postgres: self.postgres,
139            host_id: self.host_id,
140            peer_registry: self.peer_registry,
141            default_persister: self.default_persister,
142            persister_overrides: self.persister_overrides,
143        });
144        server.after_init = std::sync::Mutex::new(self.after_init);
145        server
146    }
147}
148
149/// Cell-based Myko server.
150///
151/// Uses hyphae cells for reactive queries and reports instead of actors.
152pub struct CellServer {
153    /// Central entity store registry
154    pub registry: Arc<StoreRegistry>,
155    /// Handler registry for items, queries, and reports
156    pub handler_registry: Arc<HandlerRegistry>,
157    /// Relationship manager for cascade operations
158    pub relationship_manager: Arc<RelationshipManager>,
159    /// Optional Postgres producer handle
160    pub postgres_producer: Option<PostgresProducerHandle>,
161    /// Full-text search index
162    pub search_index: Arc<SearchIndex>,
163    /// Persister routing (default + per-entity overrides)
164    pub persisters: Arc<PersisterRouter>,
165    /// Server host ID
166    pub host_id: Uuid,
167    /// Server configuration
168    config: CellServerConfig,
169    /// Postgres producer (kept alive)
170    _postgres_producer_owner: Option<CellPostgresProducer>,
171    /// Postgres consumer (kept alive)
172    postgres_consumer: Option<CellPostgresConsumer>,
173    /// Whether the server is ready to accept connections
174    ready: Arc<AtomicBool>,
175    /// Peer registry for federation (initialized after catch-up)
176    peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
177    /// Live peer clients shared with report context.
178    peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
179    /// Callback to run after init (catch-up + relations) but before WS loop
180    after_init: std::sync::Mutex<Option<AfterInitCallback>>,
181    /// Sender for local+replicated event fan-out to saga runtime.
182    saga_event_tx: flume::Sender<MEvent>,
183    /// Receiver consumed when saga runtime starts.
184    saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
185    /// Saga tasks kept alive for server lifetime.
186    saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
187    /// Server ownership death-watch guard (kept alive for server lifetime).
188    _server_ownership_guard: std::sync::Mutex<Option<hyphae::SubscriptionGuard>>,
189    /// Hyphae cell inspector server (kept alive for the lifetime of the server)
190    #[cfg(feature = "inspector")]
191    _inspector: hyphae::server::InspectorServer,
192}
193
194impl CellServer {
195    /// Create a new server builder.
196    pub fn builder() -> CellServerBuilder {
197        CellServerBuilder::new()
198    }
199
200    /// Create a new cell-based server.
201    pub fn new(config: CellServerConfig) -> Self {
202        let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
203        let registry = Arc::new(StoreRegistry::new());
204        let handler_registry = Arc::new(HandlerRegistry::new());
205        let relationship_manager = Arc::new(RelationshipManager::new());
206
207        // Initialize the client registry for WebSocket client message dispatch
208        init_client_registry();
209
210        let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
211        let (postgres_producer_owner, postgres_producer, postgres_consumer) =
212            if let Some(ref postgres_config) = config.postgres {
213                match CellPostgresProducer::new(postgres_config, host_id) {
214                    Ok(producer) => {
215                        let handle = producer.handle();
216                        let consumer = match CellPostgresConsumer::start(
217                            postgres_config,
218                            host_id,
219                            handler_registry.clone(),
220                            registry.clone(),
221                        ) {
222                            Ok(c) => Some(c),
223                            Err(e) => {
224                                log::error!("Failed to start Postgres consumer: {}", e);
225                                None
226                            }
227                        };
228                        (Some(producer), Some(handle), consumer)
229                    }
230                    Err(e) => {
231                        log::error!("Failed to create Postgres producer: {}", e);
232                        (None, None, None)
233                    }
234                }
235            } else {
236                (None, None, None)
237            };
238
239        // If no durable consumer, server is immediately ready
240        let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));
241
242        // Initialize full-text search index
243        let search_index = Arc::new(SearchIndex::new());
244
245        // Build persister routing:
246        // - explicit default from config if provided
247        // - otherwise Postgres producer handle when available
248        // - explicit per-entity overrides always win
249        let mut persister_router = PersisterRouter::default();
250        if let Some(default_persister) = config.default_persister.clone() {
251            persister_router.set_default(Some(default_persister));
252        } else if let Some(handle) = postgres_producer.clone() {
253            persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
254        }
255        for (entity_type, persister) in &config.persister_overrides {
256            persister_router.set_override(entity_type.clone(), persister.clone());
257        }
258        let persisters = Arc::new(persister_router);
259
260        // Start the hyphae cell inspector server
261        #[cfg(feature = "inspector")]
262        let inspector = hyphae::server::start_server("myko");
263        #[cfg(feature = "inspector")]
264        log::info!("Hyphae inspector on port {}", inspector.port());
265
266        Self {
267            registry,
268            handler_registry,
269            relationship_manager,
270            postgres_producer,
271            search_index,
272            persisters,
273            host_id,
274            config,
275            _postgres_producer_owner: postgres_producer_owner,
276            postgres_consumer,
277            ready,
278            peer_registry_instance: RwLock::new(None),
279            peer_clients: Arc::new(dashmap::DashMap::new()),
280            after_init: std::sync::Mutex::new(None),
281            saga_event_tx,
282            saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
283            saga_tasks: std::sync::Mutex::new(Vec::new()),
284            _server_ownership_guard: std::sync::Mutex::new(None),
285            #[cfg(feature = "inspector")]
286            _inspector: inspector,
287        }
288    }
289
290    /// Start the peer registry for federation.
291    pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
292        let peer_config = config.or_else(|| self.config.peer_registry.clone());
293
294        if let Some(peer_config) = peer_config {
295            log::info!("Starting peer registry");
296            let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
297            *self.peer_registry_instance.write().unwrap() = Some(pr);
298        }
299    }
300
301    /// Check if peer registry is running.
302    pub fn has_peer_registry(&self) -> bool {
303        self.peer_registry_instance.read().unwrap().is_some()
304    }
305
306    /// Get the store registry.
307    pub fn registry(&self) -> Arc<StoreRegistry> {
308        self.registry.clone()
309    }
310
311    /// Get the handler registry.
312    pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
313        self.handler_registry.clone()
314    }
315
316    /// Get a server context for module use.
317    pub fn ctx(&self) -> CellServerCtx {
318        let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
319            self.config.postgres.as_ref().map(|pg| {
320                Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
321                    as Arc<dyn myko::server::HistoryReplayProvider>
322            });
323        CellServerCtx::new(
324            self.host_id,
325            self.registry.clone(),
326            self.handler_registry.clone(),
327            self.relationship_manager.clone(),
328            self.persisters.clone(),
329            self.search_index.clone(),
330            self.peer_clients.clone(),
331            Some(self.saga_event_tx.clone()),
332            history_replay,
333        )
334    }
335
336    fn start_saga_runtime(&self) {
337        let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
338        if registrations.is_empty() {
339            return;
340        }
341        let Some(rx) = self
342            .saga_event_rx
343            .lock()
344            .expect("saga_event_rx mutex poisoned")
345            .take()
346        else {
347            return;
348        };
349
350        log::info!("Starting saga runtime with {} saga(s)", registrations.len());
351
352        // NOTE(ts): One unbounded flume channel per saga, with dispatch-side filtering
353        // so sagas only receive events matching their entity type and change type.
354        struct SagaChannel {
355            tx: flume::Sender<MEvent>,
356            entity_type: &'static str,
357            change_type: myko::event::MEventType,
358        }
359        let mut saga_channels: Vec<SagaChannel> = Vec::new();
360
361        for registration in registrations {
362            let saga = (registration.create)();
363            let saga_name = saga.name().to_string();
364            let (saga_tx, saga_rx) = flume::unbounded::<MEvent>();
365            saga_channels.push(SagaChannel {
366                tx: saga_tx,
367                entity_type: registration.event_entity_type,
368                change_type: registration.event_change_type,
369            });
370            let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
371                saga_rx,
372                move |saga_rx| async move {
373                    saga_rx
374                        .recv_async()
375                        .await
376                        .ok()
377                        .map(|event| (event, saga_rx))
378                },
379            ));
380
381            let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
382                self.host_id,
383                self.registry.clone(),
384                self.saga_event_tx.clone(),
385            ));
386            let mut command_stream = saga.build_boxed(events, saga_ctx);
387
388            let host_id = self.host_id;
389            let registry = self.registry.clone();
390            let handler_registry = self.handler_registry.clone();
391            let relationship_manager = self.relationship_manager.clone();
392            let persisters = self.persisters.clone();
393            let search_index = self.search_index.clone();
394            let peer_clients = self.peer_clients.clone();
395            let saga_event_tx = self.saga_event_tx.clone();
396
397            let handle = tokio::spawn(async move {
398                while let Some(command) = command_stream.next().await {
399                    let command_name = command.command_name();
400                    log::debug!("Saga {} executing command {}", saga_name, command_name);
401                    let req = Arc::new(RequestContext::internal(
402                        Arc::from(Uuid::new_v4().to_string()),
403                        host_id,
404                        &format!("saga:{saga_name}"),
405                    ));
406
407                    let cmd_ctx = CommandContext::new(
408                        Arc::from(command_name),
409                        req,
410                        Arc::new(CellServerCtx::new(
411                            host_id,
412                            registry.clone(),
413                            handler_registry.clone(),
414                            relationship_manager.clone(),
415                            persisters.clone(),
416                            search_index.clone(),
417                            peer_clients.clone(),
418                            Some(saga_event_tx.clone()),
419                            None,
420                        )),
421                    );
422
423                    if let Err(err) = command.execute_boxed(cmd_ctx) {
424                        log::error!(
425                            "Saga {} command {} failed: {}",
426                            saga_name,
427                            command_name,
428                            err.message
429                        );
430                    }
431                }
432            });
433
434            self.saga_tasks
435                .lock()
436                .expect("saga_tasks mutex poisoned")
437                .push(handle);
438        }
439
440        // NOTE(ts): Dispatcher fans out events to saga channels, filtering by
441        // entity type and change type so each saga only receives relevant events.
442        let dispatcher = tokio::spawn(async move {
443            while let Ok(event) = rx.recv_async().await {
444                for ch in &saga_channels {
445                    if event.item_type == ch.entity_type && event.change_type == ch.change_type {
446                        let _ = ch.tx.send(event.clone());
447                    }
448                }
449            }
450        });
451        self.saga_tasks
452            .lock()
453            .expect("saga_tasks mutex poisoned")
454            .push(dispatcher);
455    }
456
457    /// Create a Postgres-backed history store for replay/windback operations.
458    pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
459        self.config
460            .postgres
461            .clone()
462            .map(PostgresHistoryStore::new)
463            .transpose()
464    }
465
466    /// Initialize Postgres replay/listener and wait for catch-up.
467    pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
468        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
469            return Err(
470                "Postgres is configured but the Postgres consumer is not running".to_string(),
471            );
472        }
473
474        if let Some(ref consumer) = self.postgres_consumer {
475            consumer.wait_until_caught_up(timeout)?;
476            self.ready.store(true, Ordering::SeqCst);
477        }
478        Ok(())
479    }
480
481    /// Establish relationship invariants.
482    pub fn establish_relations(&self) {
483        if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
484            log::error!("Failed to establish relations: {e}");
485        }
486    }
487
488    /// Check if the server is ready to accept connections.
489    pub fn is_ready(&self) -> bool {
490        if let Some(ref consumer) = self.postgres_consumer {
491            if consumer.is_caught_up() {
492                self.ready.store(true, Ordering::SeqCst);
493                return true;
494            }
495            return false;
496        }
497        true
498    }
499
500    /// Run the server with full initialization.
501    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
502        use tokio::net::TcpListener;
503
504        // Persisters can veto startup via startup healthchecks.
505        let entity_types: Vec<&str> = self
506            .handler_registry
507            .entity_types()
508            .map(|t| t.as_ref())
509            .collect();
510        self.persisters
511            .startup_healthcheck(&entity_types)
512            .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;
513
514        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
515            return Err("Postgres is configured but the Postgres consumer failed to start".into());
516        }
517
518        // Wait for Postgres catch-up if configured
519        if self.postgres_consumer.is_some() {
520            log::info!("Waiting for Postgres event consumer to catch up...");
521            let timeout = std::time::Duration::from_secs(300);
522            self.init_postgres_and_wait(timeout)
523                .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
524            log::info!("Postgres caught up, ready to accept connections");
525        }
526
527        // Build search index from store data (after catch-up)
528        log::info!("Building search index...");
529        self.search_index.build_from_registry(&self.registry);
530
531        // Establish relations (cleanup orphans, ensure required entities)
532        log::info!("Establishing relations...");
533        self.establish_relations();
534
535        // Claim orphaned server-owned items and start death watch
536        log::info!("Checking server-owned item ownership...");
537        if let Err(e) = ServerOwnershipManager::claim_orphaned(&self.ctx()) {
538            log::error!("Failed to claim orphaned server-owned items: {}", e);
539        }
540        let ownership_guard = ServerOwnershipManager::watch_peer_deaths(&self.ctx());
541        *self
542            ._server_ownership_guard
543            .lock()
544            .expect("server_ownership_guard mutex poisoned") = Some(ownership_guard);
545
546        // Bind WebSocket listener first so peer publication only happens once
547        // the gateway is actually available.
548        let listener = TcpListener::bind(&self.config.bind_addr).await?;
549        log::info!("CellServer listening on {}", self.config.bind_addr);
550        log::info!(
551            "WebSocket server listening on ws://{}/myko",
552            self.config.bind_addr
553        );
554
555        // Start peer registry if configured
556        if self.config.peer_registry.is_some() {
557            self.start_peer_registry(None);
558        }
559
560        // Run after_init hook (e.g., scene engine startup)
561        if let Some(hook) = self
562            .after_init
563            .lock()
564            .expect("after_init mutex poisoned")
565            .take()
566        {
567            hook(self);
568        }
569
570        self.start_saga_runtime();
571
572        log::info!("Server started");
573        self.run_ws_accept_loop(listener).await
574    }
575
576    /// Run just the WebSocket accept loop.
577    pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
578        use tokio::net::TcpListener;
579
580        let listener = TcpListener::bind(&self.config.bind_addr).await?;
581        log::info!("CellServer listening on {}", self.config.bind_addr);
582        log::info!(
583            "WebSocket server listening on ws://{}/myko",
584            self.config.bind_addr
585        );
586        self.run_ws_accept_loop(listener).await
587    }
588
589    async fn run_ws_accept_loop(
590        &self,
591        listener: tokio::net::TcpListener,
592    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
593        let ready = self.ready.clone();
594
595        loop {
596            let (stream, addr) = listener.accept().await?;
597
598            // Check if server is ready (durable backend caught up)
599            if !ready.load(Ordering::SeqCst) {
600                if self.is_ready() {
601                    log::info!("Server is now ready to accept connections");
602                } else {
603                    log::warn!(
604                        "Rejecting connection from {} - server not ready (durable backend catching up)",
605                        addr
606                    );
607                    drop(stream);
608                    continue;
609                }
610            }
611
612            log::debug!("New connection from {}", addr);
613
614            let ctx = self.ctx();
615
616            tokio::spawn(async move {
617                if let Err(e) =
618                    ws_handler::WsHandler::handle_connection(stream, addr, Arc::new(ctx)).await
619                {
620                    log::error!("Connection error from {}: {}", addr, e);
621                }
622            });
623        }
624    }
625}
626
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory configuration: ephemeral port, no Postgres, no peers, no
    /// persister overrides.
    fn in_memory_config(host_id: Option<Uuid>) -> CellServerConfig {
        CellServerConfig {
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            postgres: None,
            host_id,
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
        }
    }

    #[test]
    fn test_server_creation() {
        let server = CellServer::new(in_memory_config(None));
        assert!(Arc::strong_count(&server.registry) >= 1);
        // No durable consumer means nothing to catch up on: ready immediately.
        assert!(server.is_ready());
        assert!(!server.has_peer_registry());
    }

    #[test]
    fn test_server_with_host_id() {
        let host_id = Uuid::new_v4();
        let server = CellServer::new(in_memory_config(Some(host_id)));
        assert_eq!(server.host_id, host_id);
    }

    #[test]
    fn test_generated_host_ids_are_unique() {
        let first = CellServer::new(in_memory_config(None));
        let second = CellServer::new(in_memory_config(None));
        assert_ne!(first.host_id, second.host_id);
    }

    #[test]
    fn test_without_postgres() {
        let server = CellServer::new(in_memory_config(None));
        assert!(server.postgres_producer.is_none());
        assert!(server.init_postgres_and_wait(Duration::from_millis(10)).is_ok());
        assert!(matches!(server.postgres_history_store(), Ok(None)));
    }

    #[test]
    fn test_builder_defaults() {
        let host_id = Uuid::new_v4();
        let server = CellServer::builder().with_host_id(host_id).build();
        assert_eq!(server.host_id, host_id);
        assert_eq!(
            server.config.bind_addr,
            "127.0.0.1:5155".parse::<SocketAddr>().unwrap()
        );
        assert!(server.config.postgres.is_none());
        assert!(server.config.peer_registry.is_none());
        assert!(server.config.persister_overrides.is_empty());
    }
}