//! Myko server runtime — WebSocket, durable event backends, peer federation.
//!
//! This crate contains the tokio-dependent parts of the Myko server:
//! - `CellServer` — server lifecycle (durable catch-up init, WS accept loop)
//! - `postgres` — PostgreSQL producer/consumer (event-table + LISTEN/NOTIFY)
//! - `ws_handler` — WebSocket connection handling
//! - `peer_registry` — federation with other servers
//! - `mcp` — Model Context Protocol server
//!
//! Tokio-free server types (CellServerCtx, HandlerRegistry, etc.) live in `myko::server`.

pub mod mcp;
pub mod peer_registry;
pub mod postgres;
pub mod ws_handler;

use std::{
    collections::HashMap,
    net::SocketAddr,
    sync::{
        Arc, RwLock,
        atomic::{AtomicBool, Ordering},
    },
    time::Duration,
};

use futures_util::StreamExt;
// Re-export all tokio-free server types from myko.
pub use myko::server::*;
use myko::{
    client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
    search::SearchIndex, store::StoreRegistry, wire::MEvent,
};
use uuid::Uuid;

use crate::postgres::{
    CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
    PostgresHistoryStore, PostgresProducerHandle,
};

/// Cell-based Myko server configuration.
#[derive(Clone)]
pub struct CellServerConfig {
    /// Address to bind the WebSocket server
    pub bind_addr: SocketAddr,
    /// Optional Postgres configuration for event persistence/distribution
    pub postgres: Option<PostgresConfig>,
    /// Server host ID (auto-generated if not provided)
    pub host_id: Option<Uuid>,
    /// Optional peer registry configuration for federation
    pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Optional default persister for all entity types without explicit overrides
    pub default_persister: Option<Arc<dyn Persister>>,
    /// Per-entity persister overrides keyed by entity type name
    pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
}

/// Builder for creating a CellServer.
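///
/// A minimal usage sketch (the bind address is illustrative; it is also the
/// default when none is set):
///
/// ```ignore
/// let server = CellServer::builder()
///     .with_bind_addr("127.0.0.1:5155".parse().unwrap())
///     .build();
/// ```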
#[derive(Default)]
pub struct CellServerBuilder {
    bind_addr: Option<SocketAddr>,
    host_id: Option<Uuid>,
    postgres: Option<PostgresConfig>,
    peer_registry: Option<peer_registry::PeerRegistryConfig>,
    default_persister: Option<Arc<dyn Persister>>,
    persister_overrides: HashMap<String, Arc<dyn Persister>>,
    after_init: Option<AfterInitCallback>,
}

type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;

impl CellServerBuilder {
    /// Create a new server builder.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the WebSocket bind address.
    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
        self.bind_addr = Some(addr);
        self
    }

    /// Set the server host ID (auto-generated if not set).
    pub fn with_host_id(mut self, id: Uuid) -> Self {
        self.host_id = Some(id);
        self
    }

    /// Configure Postgres for event persistence/distribution.
    pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
        self.postgres = Some(config);
        self
    }

    /// Configure peer registry for federation.
    pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
        self.peer_registry = Some(config);
        self
    }

    /// Set the default persister used for all entity types without explicit overrides.
    pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
        self.default_persister = Some(persister);
        self
    }

    /// Override persister for a specific entity type (e.g. "Pulse").
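    ///
    /// A sketch of combining a default with a per-entity override
    /// (`default_persister` and `pulse_persister` are hypothetical
    /// `Arc<dyn Persister>` values):
    ///
    /// ```ignore
    /// let builder = CellServer::builder()
    ///     .with_default_persister(default_persister)
    ///     .with_persister_override("Pulse", pulse_persister);
    /// ```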
    pub fn with_persister_override(
        mut self,
        entity_type: impl Into<String>,
        persister: Arc<dyn Persister>,
    ) -> Self {
        self.persister_overrides
            .insert(entity_type.into(), persister);
        self
    }

    /// Register a callback to run after initialization and relation establishment,
    /// but before the WebSocket accept loop starts. Use this for starting subsystems
    /// that need entity data (e.g., scene engine).
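    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let server = CellServer::builder()
    ///     .after_init(|server| {
    ///         // entity stores are caught up by this point
    ///         let _registry = server.registry();
    ///     })
    ///     .build();
    /// ```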
    pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
        self.after_init = Some(Box::new(f));
        self
    }

    /// Build the server.
    pub fn build(self) -> CellServer {
        let bind_addr = self
            .bind_addr
            .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());

        let mut server = CellServer::new(CellServerConfig {
            bind_addr,
            postgres: self.postgres,
            host_id: self.host_id,
            peer_registry: self.peer_registry,
            default_persister: self.default_persister,
            persister_overrides: self.persister_overrides,
        });
        server.after_init = std::sync::Mutex::new(self.after_init);
        server
    }
}

/// Cell-based Myko server.
///
/// Uses hyphae cells for reactive queries and reports instead of actors.
pub struct CellServer {
    /// Central entity store registry
    pub registry: Arc<StoreRegistry>,
    /// Handler registry for items, queries, and reports
    pub handler_registry: Arc<HandlerRegistry>,
    /// Relationship manager for cascade operations
    pub relationship_manager: Arc<RelationshipManager>,
    /// Optional Postgres producer handle
    pub postgres_producer: Option<PostgresProducerHandle>,
    /// Full-text search index
    pub search_index: Arc<SearchIndex>,
    /// Persister routing (default + per-entity overrides)
    pub persisters: Arc<PersisterRouter>,
    /// Server host ID
    pub host_id: Uuid,
    /// Server configuration
    config: CellServerConfig,
    /// Postgres producer (kept alive)
    _postgres_producer_owner: Option<CellPostgresProducer>,
    /// Postgres consumer (kept alive)
    postgres_consumer: Option<CellPostgresConsumer>,
    /// Whether the server is ready to accept connections
    ready: Arc<AtomicBool>,
    /// Peer registry for federation (initialized after catch-up)
    peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
    /// Live peer clients shared with report context.
    peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
    /// Callback to run after init (catch-up + relations) but before WS loop
    after_init: std::sync::Mutex<Option<AfterInitCallback>>,
    /// Sender for local+replicated event fan-out to saga runtime.
    saga_event_tx: flume::Sender<MEvent>,
    /// Receiver consumed when saga runtime starts.
    saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
    /// Saga tasks kept alive for server lifetime.
    saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
    /// Hyphae cell inspector server (kept alive for the lifetime of the server)
    #[cfg(feature = "inspector")]
    _inspector: hyphae::server::InspectorServer,
}

impl CellServer {
    /// Create a new server builder.
    pub fn builder() -> CellServerBuilder {
        CellServerBuilder::new()
    }

    /// Create a new cell-based server.
    pub fn new(config: CellServerConfig) -> Self {
        let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
        let registry = Arc::new(StoreRegistry::new());
        let handler_registry = Arc::new(HandlerRegistry::new());
        let relationship_manager = Arc::new(RelationshipManager::new());

        // Initialize the client registry for WebSocket client message dispatch
        init_client_registry();

        let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
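
        // Wire up durable persistence when Postgres is configured: the producer
        // persists local events, the consumer replays and applies replicated ones.
        // Failures here are only logged; `run()` later refuses to start if Postgres
        // was configured but the consumer is absent.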
        let (postgres_producer_owner, postgres_producer, postgres_consumer) =
            if let Some(ref postgres_config) = config.postgres {
                match CellPostgresProducer::new(postgres_config, host_id) {
                    Ok(producer) => {
                        let handle = producer.handle();
                        let consumer = match CellPostgresConsumer::start(
                            postgres_config,
                            host_id,
                            handler_registry.clone(),
                            registry.clone(),
                        ) {
                            Ok(c) => Some(c),
                            Err(e) => {
                                log::error!("Failed to start Postgres consumer: {}", e);
                                None
                            }
                        };
                        (Some(producer), Some(handle), consumer)
                    }
                    Err(e) => {
                        log::error!("Failed to create Postgres producer: {}", e);
                        (None, None, None)
                    }
                }
            } else {
                (None, None, None)
            };

        // If no durable consumer, server is immediately ready
        let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));

        // Initialize full-text search index
        let search_index = Arc::new(SearchIndex::new());

        // Build persister routing:
        // - explicit default from config if provided
        // - otherwise Postgres producer handle when available
        // - explicit per-entity overrides always win
        let mut persister_router = PersisterRouter::default();
        if let Some(default_persister) = config.default_persister.clone() {
            persister_router.set_default(Some(default_persister));
        } else if let Some(handle) = postgres_producer.clone() {
            persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
        }
        for (entity_type, persister) in &config.persister_overrides {
            persister_router.set_override(entity_type.clone(), persister.clone());
        }
        let persisters = Arc::new(persister_router);

        // Start the hyphae cell inspector server
        #[cfg(feature = "inspector")]
        let inspector = hyphae::server::start_server("myko");
        #[cfg(feature = "inspector")]
        log::info!("Hyphae inspector on port {}", inspector.port());

        Self {
            registry,
            handler_registry,
            relationship_manager,
            postgres_producer,
            search_index,
            persisters,
            host_id,
            config,
            _postgres_producer_owner: postgres_producer_owner,
            postgres_consumer,
            ready,
            peer_registry_instance: RwLock::new(None),
            peer_clients: Arc::new(dashmap::DashMap::new()),
            after_init: std::sync::Mutex::new(None),
            saga_event_tx,
            saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
            saga_tasks: std::sync::Mutex::new(Vec::new()),
            #[cfg(feature = "inspector")]
            _inspector: inspector,
        }
    }

    /// Start the peer registry for federation.
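    ///
    /// Pass `None` to fall back to the configuration supplied at build time:
    ///
    /// ```ignore
    /// server.start_peer_registry(None);
    /// ```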
    pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
        let peer_config = config.or_else(|| self.config.peer_registry.clone());

        if let Some(peer_config) = peer_config {
            log::info!("Starting peer registry");
            let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
            *self.peer_registry_instance.write().unwrap() = Some(pr);
        }
    }

    /// Check if the peer registry is running.
    pub fn has_peer_registry(&self) -> bool {
        self.peer_registry_instance.read().unwrap().is_some()
    }

    /// Get the store registry.
    pub fn registry(&self) -> Arc<StoreRegistry> {
        self.registry.clone()
    }

    /// Get the handler registry.
    pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
        self.handler_registry.clone()
    }

    /// Get a server context for module use.
    pub fn ctx(&self) -> CellServerCtx {
        let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
            self.config.postgres.as_ref().map(|pg| {
                Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
                    as Arc<dyn myko::server::HistoryReplayProvider>
            });
        CellServerCtx::new(
            self.host_id,
            self.registry.clone(),
            self.handler_registry.clone(),
            self.relationship_manager.clone(),
            self.persisters.clone(),
            self.search_index.clone(),
            self.peer_clients.clone(),
            Some(self.saga_event_tx.clone()),
            history_replay,
        )
    }

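    /// Spawn one task per registered saga plus a dispatcher task that fans
    /// events out to them. A no-op if no sagas are registered or if the
    /// runtime has already started (the event receiver can only be taken once).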
    fn start_saga_runtime(&self) {
        let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
        if registrations.is_empty() {
            return;
        }
        let Some(rx) = self
            .saga_event_rx
            .lock()
            .expect("saga_event_rx mutex poisoned")
            .take()
        else {
            return;
        };

        log::info!("Starting saga runtime with {} saga(s)", registrations.len());

        // NOTE(ts): One unbounded flume channel per saga, with dispatch-side filtering
        // so sagas only receive events matching their entity type and change type.
        struct SagaChannel {
            tx: flume::Sender<MEvent>,
            entity_type: &'static str,
            change_type: myko::event::MEventType,
        }
        let mut saga_channels: Vec<SagaChannel> = Vec::new();

        for registration in registrations {
            let saga = (registration.create)();
            let saga_name = saga.name().to_string();
            let (saga_tx, saga_rx) = flume::unbounded::<MEvent>();
            saga_channels.push(SagaChannel {
                tx: saga_tx,
                entity_type: registration.event_entity_type,
                change_type: registration.event_change_type,
            });
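            // Adapt the flume receiver into the saga's input `Stream`:
            // `unfold` awaits each event and threads the receiver through,
            // ending the stream when the dispatcher side disconnects.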
            let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
                saga_rx,
                move |saga_rx| async move {
                    saga_rx
                        .recv_async()
                        .await
                        .ok()
                        .map(|event| (event, saga_rx))
                },
            ));

            let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
                self.host_id,
                self.registry.clone(),
                self.saga_event_tx.clone(),
            ));
            let mut command_stream = saga.build_boxed(events, saga_ctx);

            let host_id = self.host_id;
            let registry = self.registry.clone();
            let handler_registry = self.handler_registry.clone();
            let relationship_manager = self.relationship_manager.clone();
            let persisters = self.persisters.clone();
            let search_index = self.search_index.clone();
            let peer_clients = self.peer_clients.clone();
            let saga_event_tx = self.saga_event_tx.clone();

            let handle = tokio::spawn(async move {
                while let Some(command) = command_stream.next().await {
                    let command_name = command.command_name();
                    log::debug!("Saga {} executing command {}", saga_name, command_name);
                    let req = Arc::new(RequestContext::internal(
                        Arc::from(Uuid::new_v4().to_string()),
                        host_id,
                        &format!("saga:{saga_name}"),
                    ));

                    let cmd_ctx = CommandContext::new(
                        Arc::from(command_name),
                        req,
                        Arc::new(CellServerCtx::new(
                            host_id,
                            registry.clone(),
                            handler_registry.clone(),
                            relationship_manager.clone(),
                            persisters.clone(),
                            search_index.clone(),
                            peer_clients.clone(),
                            Some(saga_event_tx.clone()),
                            None,
                        )),
                    );

                    if let Err(err) = command.execute_boxed(cmd_ctx) {
                        log::error!(
                            "Saga {} command {} failed: {}",
                            saga_name,
                            command_name,
                            err.message
                        );
                    }
                }
            });

            self.saga_tasks
                .lock()
                .expect("saga_tasks mutex poisoned")
                .push(handle);
        }

        // NOTE(ts): Dispatcher fans out events to saga channels, filtering by
        // entity type and change type so each saga only receives relevant events.
        let dispatcher = tokio::spawn(async move {
            while let Ok(event) = rx.recv_async().await {
                for ch in &saga_channels {
                    if event.item_type == ch.entity_type && event.change_type == ch.change_type {
                        let _ = ch.tx.send(event.clone());
                    }
                }
            }
        });
        self.saga_tasks
            .lock()
            .expect("saga_tasks mutex poisoned")
            .push(dispatcher);
    }

    /// Create a Postgres-backed history store for replay/windback operations.
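    ///
    /// Returns `Ok(None)` when Postgres is not configured. A usage sketch:
    ///
    /// ```ignore
    /// if let Some(history) = server.postgres_history_store()? {
    ///     // replay or wind back against the durable event table
    /// }
    /// ```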
    pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
        self.config
            .postgres
            .clone()
            .map(PostgresHistoryStore::new)
            .transpose()
    }

    /// Initialize Postgres replay/listener and wait for catch-up.
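    ///
    /// A sketch for manual startup orchestration (the timeout is illustrative;
    /// `run()` uses 300 seconds):
    ///
    /// ```ignore
    /// server.init_postgres_and_wait(Duration::from_secs(300))?;
    /// ```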
    pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
            return Err(
                "Postgres is configured but the Postgres consumer is not running".to_string(),
            );
        }

        if let Some(ref consumer) = self.postgres_consumer {
            consumer.wait_until_caught_up(timeout)?;
            self.ready.store(true, Ordering::SeqCst);
        }
        Ok(())
    }

    /// Establish relationship invariants.
    pub fn establish_relations(&self) {
        if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
            log::error!("Failed to establish relations: {e}");
        }
    }

    /// Check if the server is ready to accept connections.
    pub fn is_ready(&self) -> bool {
        if let Some(ref consumer) = self.postgres_consumer {
            if consumer.is_caught_up() {
                self.ready.store(true, Ordering::SeqCst);
                return true;
            }
            return false;
        }
        true
    }

    /// Run the server with full initialization.
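    ///
    /// A minimal sketch of driving the server from a tokio runtime:
    ///
    /// ```ignore
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    ///     let server = CellServer::builder().build();
    ///     server.run().await
    /// }
    /// ```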
    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        use tokio::net::TcpListener;

        // Persisters can veto startup via startup healthchecks.
        let entity_types: Vec<&str> = self
            .handler_registry
            .entity_types()
            .map(|t| t.as_ref())
            .collect();
        self.persisters
            .startup_healthcheck(&entity_types)
            .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;

        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
            return Err("Postgres is configured but the Postgres consumer failed to start".into());
        }

        // Wait for Postgres catch-up if configured
        if self.postgres_consumer.is_some() {
            log::info!("Waiting for Postgres event consumer to catch up...");
            let timeout = std::time::Duration::from_secs(300);
            self.init_postgres_and_wait(timeout)
                .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
            log::info!("Postgres caught up, ready to accept connections");
        }

        // Build search index from store data (after catch-up)
        log::info!("Building search index...");
        self.search_index.build_from_registry(&self.registry);

        // Establish relations (cleanup orphans, ensure required entities)
        log::info!("Establishing relations...");
        self.establish_relations();

        // Bind WebSocket listener first so peer publication only happens once
        // the gateway is actually available.
        let listener = TcpListener::bind(&self.config.bind_addr).await?;
        log::info!("CellServer listening on {}", self.config.bind_addr);
        log::info!(
            "WebSocket server listening on ws://{}/myko",
            self.config.bind_addr
        );

        // Start peer registry if configured
        if self.config.peer_registry.is_some() {
            self.start_peer_registry(None);
        }

        // Run after_init hook (e.g., scene engine startup)
        if let Some(hook) = self
            .after_init
            .lock()
            .expect("after_init mutex poisoned")
            .take()
        {
            hook(self);
        }

        self.start_saga_runtime();

        log::info!("Server started");
        self.run_ws_accept_loop(listener).await
    }

    /// Run just the WebSocket accept loop.
    pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        use tokio::net::TcpListener;

        let listener = TcpListener::bind(&self.config.bind_addr).await?;
        log::info!("CellServer listening on {}", self.config.bind_addr);
        log::info!(
            "WebSocket server listening on ws://{}/myko",
            self.config.bind_addr
        );
        self.run_ws_accept_loop(listener).await
    }

    async fn run_ws_accept_loop(
        &self,
        listener: tokio::net::TcpListener,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let ready = self.ready.clone();

        loop {
            let (stream, addr) = listener.accept().await?;

            // Check if server is ready (durable backend caught up)
            if !ready.load(Ordering::SeqCst) {
                if self.is_ready() {
                    log::info!("Server is now ready to accept connections");
                } else {
                    log::warn!(
                        "Rejecting connection from {} - server not ready (durable backend catching up)",
                        addr
                    );
                    drop(stream);
                    continue;
                }
            }

            log::debug!("New connection from {}", addr);

            let ctx = self.ctx();

            tokio::spawn(async move {
                if let Err(e) =
                    ws_handler::WsHandler::handle_connection(stream, addr, Arc::new(ctx)).await
                {
                    log::error!("Connection error from {}: {}", addr, e);
                }
            });
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_server_creation() {
        let config = CellServerConfig {
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            postgres: None,
            host_id: None,
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
        };
        let server = CellServer::new(config);
        // Construction succeeds and a host ID is auto-generated.
        assert!(!server.host_id.is_nil());
    }

    #[test]
    fn test_server_with_host_id() {
        let host_id = Uuid::new_v4();
        let config = CellServerConfig {
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            postgres: None,
            host_id: Some(host_id),
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
        };
        let server = CellServer::new(config);
        assert_eq!(server.host_id, host_id);
    }
}