// myko_server/lib.rs

//! Myko server runtime — WebSocket, durable event backends, peer federation.
//!
//! This crate contains the tokio-dependent parts of the Myko server:
//! - `CellServer` — server lifecycle (durable catch-up init, WS accept loop)
//! - `postgres` — PostgreSQL producer/consumer (event-table + LISTEN/NOTIFY)
//! - `ws_handler` — WebSocket connection handling
//! - `peer_registry` — federation with other servers
//! - `mcp` — Model Context Protocol server
//!
//! Tokio-free server types (`CellServerCtx`, `HandlerRegistry`, etc.) live in `myko::server`.
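//!
//! A minimal startup sketch (Postgres wiring omitted; `tokio` with the `macros`
//! feature is assumed to be available):
//!
//! ```ignore
//! use myko_server::CellServer;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
//!     let server = CellServer::builder()
//!         .with_bind_addr("127.0.0.1:5155".parse()?)
//!         .build();
//!     // Waits for durable catch-up (when configured), establishes relations,
//!     // then runs the WebSocket accept loop until the process exits.
//!     server.run().await
//! }
//! ```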

pub mod mcp;
pub mod peer_registry;
pub mod postgres;
pub mod ws_handler;

use std::{
    collections::HashMap,
    net::SocketAddr,
    sync::{
        Arc, RwLock,
        atomic::{AtomicBool, Ordering},
    },
    time::Duration,
};

use futures_util::StreamExt;
// Re-export all tokio-free server types from myko
pub use myko::server::*;
use myko::{
    client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
    search::SearchIndex, store::StoreRegistry, wire::MEvent,
};
use uuid::Uuid;

use crate::postgres::{
    CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
    PostgresHistoryStore, PostgresProducerHandle,
};

/// Cell-based Myko server configuration.
#[derive(Clone)]
pub struct CellServerConfig {
    /// Address to bind the WebSocket server
    pub bind_addr: SocketAddr,
    /// Optional Postgres configuration for event persistence/distribution
    pub postgres: Option<PostgresConfig>,
    /// Server host ID (auto-generated if not provided)
    pub host_id: Option<Uuid>,
    /// Optional peer registry configuration for federation
    pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Default persister override
    pub default_persister: Option<Arc<dyn Persister>>,
    /// Per-entity persister overrides keyed by entity type name
    pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
}

/// Builder for creating a CellServer.
#[derive(Default)]
pub struct CellServerBuilder {
    bind_addr: Option<SocketAddr>,
    host_id: Option<Uuid>,
    postgres: Option<PostgresConfig>,
    peer_registry: Option<peer_registry::PeerRegistryConfig>,
    default_persister: Option<Arc<dyn Persister>>,
    persister_overrides: HashMap<String, Arc<dyn Persister>>,
    after_init: Option<AfterInitCallback>,
}

type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;

impl CellServerBuilder {
    /// Create a new server builder.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the WebSocket bind address.
    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
        self.bind_addr = Some(addr);
        self
    }

    /// Set the server host ID (auto-generated if not set).
    pub fn with_host_id(mut self, id: Uuid) -> Self {
        self.host_id = Some(id);
        self
    }

    /// Configure Postgres for event persistence/distribution.
    pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
        self.postgres = Some(config);
        self
    }

    /// Configure peer registry for federation.
    pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
        self.peer_registry = Some(config);
        self
    }

    /// Set the default persister used for all entity types without explicit overrides.
    pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
        self.default_persister = Some(persister);
        self
    }

    /// Override the persister for a specific entity type (e.g. "Pulse").
    pub fn with_persister_override(
        mut self,
        entity_type: impl Into<String>,
        persister: Arc<dyn Persister>,
    ) -> Self {
        self.persister_overrides
            .insert(entity_type.into(), persister);
        self
    }

    /// Register a callback to run after initialization and relation establishment,
    /// but before the WebSocket accept loop starts. Use this for starting subsystems
    /// that need entity data (e.g., scene engine).
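    ///
    /// A sketch of typical use (`start_scene_engine` is purely illustrative):
    ///
    /// ```ignore
    /// let server = CellServer::builder()
    ///     .after_init(|server| {
    ///         // Runs once entity data has been replayed, before clients can connect.
    ///         start_scene_engine(server.registry());
    ///     })
    ///     .build();
    /// ```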
    pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
        self.after_init = Some(Box::new(f));
        self
    }

    /// Build the server.
    pub fn build(self) -> CellServer {
        let bind_addr = self
            .bind_addr
            .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());

        let mut server = CellServer::new(CellServerConfig {
            bind_addr,
            postgres: self.postgres,
            host_id: self.host_id,
            peer_registry: self.peer_registry,
            default_persister: self.default_persister,
            persister_overrides: self.persister_overrides,
        });
        server.after_init = std::sync::Mutex::new(self.after_init);
        server
    }
}

/// Cell-based Myko server.
///
/// Uses hyphae cells for reactive queries and reports instead of actors.
pub struct CellServer {
    /// Central entity store registry
    pub registry: Arc<StoreRegistry>,
    /// Handler registry for items, queries, and reports
    pub handler_registry: Arc<HandlerRegistry>,
    /// Relationship manager for cascade operations
    pub relationship_manager: Arc<RelationshipManager>,
    /// Optional Postgres producer handle
    pub postgres_producer: Option<PostgresProducerHandle>,
    /// Full-text search index
    pub search_index: Arc<SearchIndex>,
    /// Persister routing (default + per-entity overrides)
    pub persisters: Arc<PersisterRouter>,
    /// Server host ID
    pub host_id: Uuid,
    /// Server configuration
    config: CellServerConfig,
    /// Postgres producer (kept alive)
    _postgres_producer_owner: Option<CellPostgresProducer>,
    /// Postgres consumer (kept alive)
    postgres_consumer: Option<CellPostgresConsumer>,
    /// Whether the server is ready to accept connections
    ready: Arc<AtomicBool>,
    /// Peer registry for federation (initialized after catch-up)
    peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
    /// Live peer clients shared with report context.
    peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
    /// Callback to run after init (catch-up + relations) but before WS loop
    after_init: std::sync::Mutex<Option<AfterInitCallback>>,
    /// Sender for local+replicated event fan-out to saga runtime.
    saga_event_tx: flume::Sender<MEvent>,
    /// Receiver consumed when saga runtime starts.
    saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
    /// Saga tasks kept alive for server lifetime.
    saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
    /// Hyphae cell inspector server (kept alive for the lifetime of the server)
    #[cfg(feature = "inspector")]
    _inspector: hyphae::server::InspectorServer,
}

impl CellServer {
    /// Create a new server builder.
    pub fn builder() -> CellServerBuilder {
        CellServerBuilder::new()
    }

    /// Create a new cell-based server.
    pub fn new(config: CellServerConfig) -> Self {
        let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
        let registry = Arc::new(StoreRegistry::new());
        let handler_registry = Arc::new(HandlerRegistry::new());
        let relationship_manager = Arc::new(RelationshipManager::new());

        // Initialize the client registry for WebSocket client message dispatch
        init_client_registry();

        let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
        let (postgres_producer_owner, postgres_producer, postgres_consumer) =
            if let Some(ref postgres_config) = config.postgres {
                match CellPostgresProducer::new(postgres_config, host_id) {
                    Ok(producer) => {
                        let handle = producer.handle();
                        let consumer = match CellPostgresConsumer::start(
                            postgres_config,
                            host_id,
                            handler_registry.clone(),
                            registry.clone(),
                        ) {
                            Ok(c) => Some(c),
                            Err(e) => {
                                log::error!("Failed to start Postgres consumer: {}", e);
                                None
                            }
                        };
                        (Some(producer), Some(handle), consumer)
                    }
                    Err(e) => {
                        log::error!("Failed to create Postgres producer: {}", e);
                        (None, None, None)
                    }
                }
            } else {
                (None, None, None)
            };

        // If no durable consumer, server is immediately ready
        let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));

        // Initialize full-text search index
        let search_index = Arc::new(SearchIndex::new());

        // Build persister routing:
        // - explicit default from config if provided
        // - otherwise Postgres producer handle when available
        // - explicit per-entity overrides always win
        let mut persister_router = PersisterRouter::default();
        if let Some(default_persister) = config.default_persister.clone() {
            persister_router.set_default(Some(default_persister));
        } else if let Some(handle) = postgres_producer.clone() {
            persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
        }
        for (entity_type, persister) in &config.persister_overrides {
            persister_router.set_override(entity_type.clone(), persister.clone());
        }
        let persisters = Arc::new(persister_router);

        // Start the hyphae cell inspector server
        #[cfg(feature = "inspector")]
        let inspector = hyphae::server::start_server("myko");
        #[cfg(feature = "inspector")]
        log::info!("Hyphae inspector on port {}", inspector.port());

        Self {
            registry,
            handler_registry,
            relationship_manager,
            postgres_producer,
            search_index,
            persisters,
            host_id,
            config,
            _postgres_producer_owner: postgres_producer_owner,
            postgres_consumer,
            ready,
            peer_registry_instance: RwLock::new(None),
            peer_clients: Arc::new(dashmap::DashMap::new()),
            after_init: std::sync::Mutex::new(None),
            saga_event_tx,
            saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
            saga_tasks: std::sync::Mutex::new(Vec::new()),
            #[cfg(feature = "inspector")]
            _inspector: inspector,
        }
    }

    /// Start the peer registry for federation.
    pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
        let peer_config = config.or_else(|| self.config.peer_registry.clone());

        if let Some(peer_config) = peer_config {
            log::info!("Starting peer registry");
            let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
            *self.peer_registry_instance.write().unwrap() = Some(pr);
        }
    }

    /// Check if peer registry is running.
    pub fn has_peer_registry(&self) -> bool {
        self.peer_registry_instance.read().unwrap().is_some()
    }

    /// Get the store registry.
    pub fn registry(&self) -> Arc<StoreRegistry> {
        self.registry.clone()
    }

    /// Get the handler registry.
    pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
        self.handler_registry.clone()
    }

    /// Get a server context for module use.
    pub fn ctx(&self) -> CellServerCtx {
        let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
            self.config.postgres.as_ref().map(|pg| {
                Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
                    as Arc<dyn myko::server::HistoryReplayProvider>
            });
        CellServerCtx::new(
            self.host_id,
            self.registry.clone(),
            self.handler_registry.clone(),
            self.relationship_manager.clone(),
            self.persisters.clone(),
            self.search_index.clone(),
            self.peer_clients.clone(),
            Some(self.saga_event_tx.clone()),
            history_replay,
        )
    }

    fn start_saga_runtime(&self) {
        let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
        if registrations.is_empty() {
            return;
        }
        // The receiver can only be taken once; if the runtime is already running, bail out.
        let Some(rx) = self
            .saga_event_rx
            .lock()
            .expect("saga_event_rx mutex poisoned")
            .take()
        else {
            return;
        };

        log::info!("Starting saga runtime with {} saga(s)", registrations.len());
        // Fan events out from the single flume receiver to one broadcast subscriber per saga.
        let (event_tx, _) = tokio::sync::broadcast::channel::<MEvent>(8192);
        let event_tx_dispatch = event_tx.clone();

        let dispatcher = tokio::spawn(async move {
            while let Ok(event) = rx.recv_async().await {
                let _ = event_tx_dispatch.send(event);
            }
        });
        self.saga_tasks
            .lock()
            .expect("saga_tasks mutex poisoned")
            .push(dispatcher);

        // Spawn one task per registered saga: adapt the broadcast receiver into the saga's
        // event stream, then execute each command the saga emits.
        for registration in registrations {
            let saga = (registration.create)();
            let saga_name = saga.name().to_string();
            let saga_name_for_stream = saga_name.clone();
            let event_rx = event_tx.subscribe();
            let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
                event_rx,
                move |mut event_rx| {
                    let saga_name_for_stream = saga_name_for_stream.clone();
                    async move {
                        loop {
                            match event_rx.recv().await {
                                Ok(event) => return Some((event, event_rx)),
                                Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                                    // A slow saga skips events rather than stalling the dispatcher.
                                    log::warn!(
                                        "Saga {} lagged; skipped {} events",
                                        saga_name_for_stream,
                                        skipped
                                    );
                                    continue;
                                }
                                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                                    return None;
                                }
                            }
                        }
                    }
                },
            ));

            let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
                self.host_id,
                self.registry.clone(),
                self.saga_event_tx.clone(),
            ));
            let mut command_stream = saga.build_boxed(events, saga_ctx);

            // Clone the shared state needed to build a fresh server context per command.
            let host_id = self.host_id;
            let registry = self.registry.clone();
            let handler_registry = self.handler_registry.clone();
            let relationship_manager = self.relationship_manager.clone();
            let persisters = self.persisters.clone();
            let search_index = self.search_index.clone();
            let peer_clients = self.peer_clients.clone();
            let saga_event_tx = self.saga_event_tx.clone();

            let handle = tokio::spawn(async move {
                while let Some(command) = command_stream.next().await {
                    let command_name = command.command_name();
                    log::debug!("Saga {} executing command {}", saga_name, command_name);
                    let req = Arc::new(RequestContext::internal(
                        Arc::from(Uuid::new_v4().to_string()),
                        host_id,
                        &format!("saga:{saga_name}"),
                    ));

                    let cmd_ctx = CommandContext::new(
                        Arc::from(command_name),
                        req,
                        Arc::new(CellServerCtx::new(
                            host_id,
                            registry.clone(),
                            handler_registry.clone(),
                            relationship_manager.clone(),
                            persisters.clone(),
                            search_index.clone(),
                            peer_clients.clone(),
                            Some(saga_event_tx.clone()),
                            None,
                        )),
                    );

                    if let Err(err) = command.execute_boxed(cmd_ctx) {
                        log::error!(
                            "Saga {} command {} failed: {}",
                            saga_name,
                            command_name,
                            err.message
                        );
                    }
                }
            });

            self.saga_tasks
                .lock()
                .expect("saga_tasks mutex poisoned")
                .push(handle);
        }
    }

    /// Create a Postgres-backed history store for replay/windback operations.
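    ///
    /// Returns `Ok(None)` when Postgres is not configured. A usage sketch:
    ///
    /// ```ignore
    /// if let Some(history) = server.postgres_history_store()? {
    ///     // replay or wind back events through the history store
    /// }
    /// ```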
    pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
        self.config
            .postgres
            .clone()
            .map(PostgresHistoryStore::new)
            .transpose()
    }

    /// Initialize Postgres replay/listener and wait for catch-up.
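    ///
    /// A sketch of manual use (`run()` already performs this with a 300-second timeout):
    ///
    /// ```ignore
    /// server.init_postgres_and_wait(std::time::Duration::from_secs(300))?;
    /// assert!(server.is_ready());
    /// ```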
    pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
            return Err(
                "Postgres is configured but the Postgres consumer is not running".to_string(),
            );
        }

        if let Some(ref consumer) = self.postgres_consumer {
            consumer.wait_until_caught_up(timeout)?;
            self.ready.store(true, Ordering::SeqCst);
        }
        Ok(())
    }

    /// Establish relationship invariants.
    pub fn establish_relations(&self) {
        if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
            log::error!("Failed to establish relations: {e}");
        }
    }

    /// Check if the server is ready to accept connections.
    pub fn is_ready(&self) -> bool {
        if let Some(ref consumer) = self.postgres_consumer {
            if consumer.is_caught_up() {
                self.ready.store(true, Ordering::SeqCst);
                return true;
            }
            return false;
        }
        true
    }

    /// Run the server with full initialization.
    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        use tokio::net::TcpListener;

        // Give persisters a chance to veto startup via their startup healthcheck.
        let entity_types: Vec<&str> = self
            .handler_registry
            .entity_types()
            .map(|t| t.as_ref())
            .collect();
        self.persisters
            .startup_healthcheck(&entity_types)
            .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;

        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
            return Err("Postgres is configured but the Postgres consumer failed to start".into());
        }

        // Wait for Postgres catch-up if configured
        if self.postgres_consumer.is_some() {
            log::info!("Waiting for Postgres event consumer to catch up...");
            let timeout = std::time::Duration::from_secs(300);
            self.init_postgres_and_wait(timeout)
                .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
            log::info!("Postgres caught up, ready to accept connections");
        }

        // Build search index from store data (after catch-up)
        log::info!("Building search index...");
        self.search_index.build_from_registry(&self.registry);

        // Establish relations (cleanup orphans, ensure required entities)
        log::info!("Establishing relations...");
        self.establish_relations();

        // Bind WebSocket listener first so peer publication only happens once
        // the gateway is actually available.
        let listener = TcpListener::bind(&self.config.bind_addr).await?;
        log::info!("CellServer listening on {}", self.config.bind_addr);
        log::info!(
            "WebSocket server listening on ws://{}/myko",
            self.config.bind_addr
        );

        // Start peer registry if configured
        if self.config.peer_registry.is_some() {
            self.start_peer_registry(None);
        }

        // Run after_init hook (e.g., scene engine startup)
        if let Some(hook) = self
            .after_init
            .lock()
            .expect("after_init mutex poisoned")
            .take()
        {
            hook(self);
        }

        self.start_saga_runtime();

        log::info!("Server started");
        self.run_ws_accept_loop(listener).await
    }

    /// Run just the WebSocket accept loop.
    pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        use tokio::net::TcpListener;

        let listener = TcpListener::bind(&self.config.bind_addr).await?;
        log::info!("CellServer listening on {}", self.config.bind_addr);
        log::info!(
            "WebSocket server listening on ws://{}/myko",
            self.config.bind_addr
        );
        self.run_ws_accept_loop(listener).await
    }

    async fn run_ws_accept_loop(
        &self,
        listener: tokio::net::TcpListener,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let ready = self.ready.clone();

        loop {
            let (stream, addr) = listener.accept().await?;

            // Check if server is ready (durable backend caught up)
            if !ready.load(Ordering::SeqCst) {
                if self.is_ready() {
                    log::info!("Server is now ready to accept connections");
                } else {
                    log::warn!(
                        "Rejecting connection from {} - server not ready (durable backend catching up)",
                        addr
                    );
                    drop(stream);
                    continue;
                }
            }

            log::debug!("New connection from {}", addr);

            let ctx = self.ctx();

            tokio::spawn(async move {
                if let Err(e) =
                    ws_handler::WsHandler::handle_connection(stream, addr, Arc::new(ctx)).await
                {
                    log::error!("Connection error from {}: {}", addr, e);
                }
            });
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_server_creation() {
        let config = CellServerConfig {
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            postgres: None,
            host_id: None,
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
        };
        let server = CellServer::new(config);
        assert!(Arc::strong_count(&server.registry) >= 1);
    }

    #[test]
    fn test_server_with_host_id() {
        let host_id = Uuid::new_v4();
        let config = CellServerConfig {
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            postgres: None,
            host_id: Some(host_id),
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
        };
        let server = CellServer::new(config);
        assert_eq!(server.host_id, host_id);
    }
}