1pub mod mcp;
13pub mod peer_persister;
14pub mod peer_registry;
15pub mod postgres;
16pub mod router;
17pub mod server_ownership;
18pub mod ws_handler;
19pub mod ws_timing;
20
21use std::{
23 collections::HashMap,
24 net::SocketAddr,
25 sync::{
26 Arc, RwLock,
27 atomic::{AtomicBool, Ordering},
28 },
29 time::Duration,
30};
31
32use futures_util::StreamExt;
33pub use myko::server::*;
34use myko::{
35 client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
36 search::SearchIndex, store::StoreRegistry, wire::MEvent,
37};
38pub use peer_persister::PeerPersister;
39pub use server_ownership::ServerOwnershipManager;
40use uuid::Uuid;
41
42use crate::postgres::{
43 CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
44 PostgresHistoryStore, PostgresProducerHandle,
45};
46
/// Settings consumed by [`CellServer::new`].
///
/// Every optional field has a fallback that `CellServer::new` applies when it
/// is `None`; see the individual fields.
#[derive(Clone)]
pub struct CellServerConfig {
    /// Socket address the server's TCP listener binds to.
    pub bind_addr: SocketAddr,
    /// Postgres settings. When `Some`, `CellServer::new` attempts to create a
    /// Postgres producer and start a Postgres consumer from them.
    pub postgres: Option<PostgresConfig>,
    /// Identity of this server. A random v4 UUID is generated when `None`.
    pub host_id: Option<Uuid>,
    /// Peer-registry settings. `CellServer::run` starts the peer registry only
    /// when this is `Some`.
    pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Default persister for the persister router. When `None`, the Postgres
    /// producer handle (if one was created) becomes the default instead.
    pub default_persister: Option<Arc<dyn Persister>>,
    /// Persister overrides installed on the persister router, keyed by entity
    /// type.
    pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
    /// Shared map of peer clients. A fresh, empty map is created when `None`.
    // NOTE(review): the meaning of the `Arc<str>` key is not visible in this
    // file — presumably a peer identifier; confirm against the peer registry.
    pub peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
}
67
/// Fluent builder for [`CellServer`].
///
/// Every field starts unset (`None` / empty map). `build` forwards the
/// configuration fields to [`CellServer::new`] and then installs the
/// `after_init` callback and the server info on the resulting server.
#[derive(Default)]
pub struct CellServerBuilder {
    /// Listener address; `build` substitutes `127.0.0.1:5155` when unset.
    bind_addr: Option<SocketAddr>,
    /// Explicit server identity, forwarded as `CellServerConfig::host_id`.
    host_id: Option<Uuid>,
    /// Postgres settings, forwarded as `CellServerConfig::postgres`.
    postgres: Option<PostgresConfig>,
    /// Peer-registry settings, forwarded as `CellServerConfig::peer_registry`.
    peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Default persister, forwarded as `CellServerConfig::default_persister`.
    default_persister: Option<Arc<dyn Persister>>,
    /// Persister overrides keyed by entity type, forwarded as
    /// `CellServerConfig::persister_overrides`.
    persister_overrides: HashMap<String, Arc<dyn Persister>>,
    /// Externally owned peer-client map, forwarded as
    /// `CellServerConfig::peer_clients`.
    peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
    /// One-shot callback stored on the built server and invoked by
    /// `CellServer::run`.
    after_init: Option<AfterInitCallback>,
    /// Server info stored on the built server; `build` uses
    /// `ServerInfo::default()` when unset.
    server_info: Option<mcp::dispatch::ServerInfo>,
}
87
/// One-shot callback that receives a shared reference to the [`CellServer`].
///
/// It is boxed so it can be stored on the builder and the server, and it must
/// be `Send`.
type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;
89
90impl CellServerBuilder {
91 pub fn new() -> Self {
93 Self::default()
94 }
95
96 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
98 self.bind_addr = Some(addr);
99 self
100 }
101
102 pub fn with_host_id(mut self, id: Uuid) -> Self {
104 self.host_id = Some(id);
105 self
106 }
107
108 pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
110 self.postgres = Some(config);
111 self
112 }
113
114 pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
116 self.peer_registry = Some(config);
117 self
118 }
119
120 pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
122 self.default_persister = Some(persister);
123 self
124 }
125
126 pub fn with_persister_override(
128 mut self,
129 entity_type: impl Into<String>,
130 persister: Arc<dyn Persister>,
131 ) -> Self {
132 self.persister_overrides
133 .insert(entity_type.into(), persister);
134 self
135 }
136
137 pub fn with_peer_clients(
143 mut self,
144 peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
145 ) -> Self {
146 self.peer_clients = Some(peer_clients);
147 self
148 }
149
150 pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
154 self.after_init = Some(Box::new(f));
155 self
156 }
157
158 pub fn with_server_info(mut self, info: mcp::dispatch::ServerInfo) -> Self {
162 self.server_info = Some(info);
163 self
164 }
165
166 pub fn build(self) -> CellServer {
168 let bind_addr = self
169 .bind_addr
170 .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());
171
172 let server_info = Arc::new(self.server_info.unwrap_or_default());
173
174 let mut server = CellServer::new(CellServerConfig {
175 bind_addr,
176 postgres: self.postgres,
177 host_id: self.host_id,
178 peer_registry: self.peer_registry,
179 default_persister: self.default_persister,
180 persister_overrides: self.persister_overrides,
181 peer_clients: self.peer_clients,
182 });
183 server.after_init = std::sync::Mutex::new(self.after_init);
184 server.server_info = server_info;
185 server
186 }
187}
188
/// A running (or ready-to-run) cell server: shared registries, persistence
/// wiring, the saga event channel and the state needed by the accept loop.
pub struct CellServer {
    /// Store registry shared with every context created by `ctx()`.
    pub registry: Arc<StoreRegistry>,
    /// Handler registry shared with contexts and with the Postgres consumer.
    pub handler_registry: Arc<HandlerRegistry>,
    /// Relationship manager used by `establish_relations`.
    pub relationship_manager: Arc<RelationshipManager>,
    /// Handle to the Postgres producer. `None` when Postgres is not configured
    /// or the producer could not be created.
    pub postgres_producer: Option<PostgresProducerHandle>,
    /// Search index; `run` builds it from the store registry.
    pub search_index: Arc<SearchIndex>,
    /// Persister router holding the default persister and per-entity overrides.
    pub persisters: Arc<PersisterRouter>,
    /// Identity of this server (configured or randomly generated).
    pub host_id: Uuid,
    /// Configuration the server was constructed from.
    config: CellServerConfig,
    /// The producer that `postgres_producer` was obtained from.
    // NOTE(review): presumably held only to keep the producer alive for the
    // lifetime of the server — confirm against `CellPostgresProducer`.
    _postgres_producer_owner: Option<CellPostgresProducer>,
    /// Postgres consumer. `None` when Postgres is not configured or when
    /// starting the producer or consumer failed.
    postgres_consumer: Option<CellPostgresConsumer>,
    /// Readiness flag. Starts `true` when there is no Postgres consumer, is set
    /// once the consumer has caught up, and gates the accept loop.
    ready: Arc<AtomicBool>,
    /// Peer registry instance, populated by `start_peer_registry`.
    peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
    /// Peer-client map shared with every context created by `ctx()`.
    peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
    /// One-shot callback taken and invoked by `run`.
    after_init: std::sync::Mutex<Option<AfterInitCallback>>,
    /// Server info handed to each routed connection.
    server_info: Arc<mcp::dispatch::ServerInfo>,
    /// Sending half of the unbounded saga event channel; cloned into contexts.
    saga_event_tx: flume::Sender<MEvent>,
    /// Receiving half of the saga event channel; taken by the saga runtime
    /// when it starts.
    saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
    /// Join handles of the tasks spawned by the saga runtime.
    saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
    /// Guard returned by `ServerOwnershipManager::watch_peer_deaths`, stored
    /// by `run`.
    _server_ownership_guard: std::sync::Mutex<Option<hyphae::SubscriptionGuard>>,
    /// Hyphae inspector server, started in `new` when the `inspector` feature
    /// is enabled.
    #[cfg(feature = "inspector")]
    _inspector: hyphae::server::InspectorServer,
}
237
238impl CellServer {
239 pub fn builder() -> CellServerBuilder {
241 CellServerBuilder::new()
242 }
243
244 pub fn new(config: CellServerConfig) -> Self {
246 let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
247 let registry = Arc::new(StoreRegistry::new());
248 let handler_registry = Arc::new(HandlerRegistry::new());
249 let relationship_manager = Arc::new(RelationshipManager::new());
250
251 init_client_registry();
253
254 let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
255 let (postgres_producer_owner, postgres_producer, postgres_consumer) =
256 if let Some(ref postgres_config) = config.postgres {
257 match CellPostgresProducer::new(postgres_config, host_id) {
258 Ok(producer) => {
259 let handle = producer.handle();
260 let consumer = match CellPostgresConsumer::start(
261 postgres_config,
262 host_id,
263 handler_registry.clone(),
264 registry.clone(),
265 ) {
266 Ok(c) => Some(c),
267 Err(e) => {
268 log::error!("Failed to start Postgres consumer: {}", e);
269 None
270 }
271 };
272 (Some(producer), Some(handle), consumer)
273 }
274 Err(e) => {
275 log::error!("Failed to create Postgres producer: {}", e);
276 (None, None, None)
277 }
278 }
279 } else {
280 (None, None, None)
281 };
282
283 let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));
285
286 let search_index = Arc::new(SearchIndex::new());
288
289 let mut persister_router = PersisterRouter::default();
294 if let Some(default_persister) = config.default_persister.clone() {
295 persister_router.set_default(Some(default_persister));
296 } else if let Some(handle) = postgres_producer.clone() {
297 persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
298 }
299 for (entity_type, persister) in &config.persister_overrides {
300 persister_router.set_override(entity_type.clone(), persister.clone());
301 }
302 let persisters = Arc::new(persister_router);
303
304 #[cfg(feature = "inspector")]
306 let inspector = hyphae::server::start_server("myko");
307 #[cfg(feature = "inspector")]
308 log::info!("Hyphae inspector on port {}", inspector.port());
309
310 let peer_clients = config
311 .peer_clients
312 .clone()
313 .unwrap_or_else(|| Arc::new(dashmap::DashMap::new()));
314
315 Self {
316 registry,
317 handler_registry,
318 relationship_manager,
319 postgres_producer,
320 search_index,
321 persisters,
322 host_id,
323 config,
324 _postgres_producer_owner: postgres_producer_owner,
325 postgres_consumer,
326 ready,
327 peer_registry_instance: RwLock::new(None),
328 peer_clients,
329 after_init: std::sync::Mutex::new(None),
330 server_info: Arc::new(mcp::dispatch::ServerInfo::default()),
331 saga_event_tx,
332 saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
333 saga_tasks: std::sync::Mutex::new(Vec::new()),
334 _server_ownership_guard: std::sync::Mutex::new(None),
335 #[cfg(feature = "inspector")]
336 _inspector: inspector,
337 }
338 }
339
340 pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
342 let peer_config = config.or_else(|| self.config.peer_registry.clone());
343
344 if let Some(peer_config) = peer_config {
345 log::info!("Starting peer registry");
346 let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
347 *self.peer_registry_instance.write().unwrap() = Some(pr);
348 }
349 }
350
351 pub fn has_peer_registry(&self) -> bool {
353 self.peer_registry_instance.read().unwrap().is_some()
354 }
355
356 pub fn registry(&self) -> Arc<StoreRegistry> {
358 self.registry.clone()
359 }
360
361 pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
363 self.handler_registry.clone()
364 }
365
366 pub fn server_info(&self) -> Arc<mcp::dispatch::ServerInfo> {
369 self.server_info.clone()
370 }
371
372 pub fn ctx(&self) -> CellServerCtx {
374 let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
375 self.config.postgres.as_ref().map(|pg| {
376 Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
377 as Arc<dyn myko::server::HistoryReplayProvider>
378 });
379 CellServerCtx::new(
380 self.host_id,
381 self.registry.clone(),
382 self.handler_registry.clone(),
383 self.relationship_manager.clone(),
384 self.persisters.clone(),
385 self.search_index.clone(),
386 self.peer_clients.clone(),
387 Some(self.saga_event_tx.clone()),
388 history_replay,
389 )
390 }
391
392 fn start_saga_runtime(&self) {
393 let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
394 if registrations.is_empty() {
395 return;
396 }
397 let Some(rx) = self
398 .saga_event_rx
399 .lock()
400 .expect("saga_event_rx mutex poisoned")
401 .take()
402 else {
403 return;
404 };
405
406 log::info!("Starting saga runtime with {} saga(s)", registrations.len());
407
408 struct SagaChannel {
411 tx: flume::Sender<MEvent>,
412 entity_type: &'static str,
413 change_type: myko::event::MEventType,
414 }
415 let mut saga_channels: Vec<SagaChannel> = Vec::new();
416
417 for registration in registrations {
418 let saga = (registration.create)();
419 let saga_name = saga.name().to_string();
420 let (saga_tx, saga_rx) = flume::unbounded::<MEvent>();
421 saga_channels.push(SagaChannel {
422 tx: saga_tx,
423 entity_type: registration.event_entity_type,
424 change_type: registration.event_change_type,
425 });
426 let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
427 saga_rx,
428 move |saga_rx| async move {
429 saga_rx
430 .recv_async()
431 .await
432 .ok()
433 .map(|event| (event, saga_rx))
434 },
435 ));
436
437 let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
438 self.host_id,
439 self.registry.clone(),
440 self.saga_event_tx.clone(),
441 ));
442 let mut command_stream = saga.build_boxed(events, saga_ctx);
443
444 let host_id = self.host_id;
445 let registry = self.registry.clone();
446 let handler_registry = self.handler_registry.clone();
447 let relationship_manager = self.relationship_manager.clone();
448 let persisters = self.persisters.clone();
449 let search_index = self.search_index.clone();
450 let peer_clients = self.peer_clients.clone();
451 let saga_event_tx = self.saga_event_tx.clone();
452
453 let handle = tokio::spawn(async move {
454 while let Some(command) = command_stream.next().await {
455 let command_name = command.command_name();
456 log::debug!("Saga {} executing command {}", saga_name, command_name);
457 let req = Arc::new(RequestContext::internal(
458 Arc::from(Uuid::new_v4().to_string()),
459 host_id,
460 &format!("saga:{saga_name}"),
461 ));
462
463 let cmd_ctx = CommandContext::new(
464 Arc::from(command_name),
465 req,
466 Arc::new(CellServerCtx::new(
467 host_id,
468 registry.clone(),
469 handler_registry.clone(),
470 relationship_manager.clone(),
471 persisters.clone(),
472 search_index.clone(),
473 peer_clients.clone(),
474 Some(saga_event_tx.clone()),
475 None,
476 )),
477 );
478
479 if let Err(err) = command.execute_boxed(cmd_ctx) {
480 log::error!(
481 "Saga {} command {} failed: {}",
482 saga_name,
483 command_name,
484 err.message
485 );
486 }
487 }
488 });
489
490 self.saga_tasks
491 .lock()
492 .expect("saga_tasks mutex poisoned")
493 .push(handle);
494 }
495
496 let dispatcher = tokio::spawn(async move {
499 while let Ok(event) = rx.recv_async().await {
500 for ch in &saga_channels {
501 if event.item_type == ch.entity_type && event.change_type == ch.change_type {
502 let _ = ch.tx.send(event.clone());
503 }
504 }
505 }
506 });
507 self.saga_tasks
508 .lock()
509 .expect("saga_tasks mutex poisoned")
510 .push(dispatcher);
511 }
512
513 pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
515 self.config
516 .postgres
517 .clone()
518 .map(PostgresHistoryStore::new)
519 .transpose()
520 }
521
522 pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
524 if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
525 return Err(
526 "Postgres is configured but the Postgres consumer is not running".to_string(),
527 );
528 }
529
530 if let Some(ref consumer) = self.postgres_consumer {
531 consumer.wait_until_caught_up(timeout)?;
532 self.ready.store(true, Ordering::SeqCst);
533 }
534 Ok(())
535 }
536
537 pub fn establish_relations(&self) {
539 if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
540 log::error!("Failed to establish relations: {e}");
541 }
542 }
543
544 pub fn is_ready(&self) -> bool {
546 if let Some(ref consumer) = self.postgres_consumer {
547 if consumer.is_caught_up() {
548 self.ready.store(true, Ordering::SeqCst);
549 return true;
550 }
551 return false;
552 }
553 true
554 }
555
556 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
558 use tokio::net::TcpListener;
559
560 let entity_types: Vec<&str> = self
562 .handler_registry
563 .entity_types()
564 .map(|t| t.as_ref())
565 .collect();
566 self.persisters
567 .startup_healthcheck(&entity_types)
568 .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;
569
570 if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
571 return Err("Postgres is configured but the Postgres consumer failed to start".into());
572 }
573
574 if self.postgres_consumer.is_some() {
576 log::info!("Waiting for Postgres event consumer to catch up...");
577 let timeout = std::time::Duration::from_secs(300);
578 self.init_postgres_and_wait(timeout)
579 .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
580 log::info!("Postgres caught up, ready to accept connections");
581 }
582
583 log::info!("Building search index...");
585 self.search_index.build_from_registry(&self.registry);
586
587 log::info!("Establishing relations...");
589 self.establish_relations();
590
591 log::info!("Checking server-owned item ownership...");
593 if let Err(e) = ServerOwnershipManager::claim_orphaned(&self.ctx()) {
594 log::error!("Failed to claim orphaned server-owned items: {}", e);
595 }
596 let ownership_guard = ServerOwnershipManager::watch_peer_deaths(&self.ctx());
597 *self
598 ._server_ownership_guard
599 .lock()
600 .expect("server_ownership_guard mutex poisoned") = Some(ownership_guard);
601
602 let listener = TcpListener::bind(&self.config.bind_addr).await?;
605 log::info!("CellServer listening on {}", self.config.bind_addr);
606 log::info!(
607 "Myko gateway: ws://{}/myko | MCP: /myko/mcp (POST + WS + SSE)",
608 self.config.bind_addr
609 );
610
611 if self.config.peer_registry.is_some() {
613 self.start_peer_registry(None);
614 }
615
616 if let Some(hook) = self
618 .after_init
619 .lock()
620 .expect("after_init mutex poisoned")
621 .take()
622 {
623 hook(self);
624 }
625
626 self.start_saga_runtime();
627
628 crate::ws_timing::start_periodic_logger();
632
633 myko::server::report_cache_stats::start_periodic_logger();
636
637 myko::search::search_stats::start_periodic_logger();
640
641 log::info!("Server started");
642 self.run_ws_accept_loop(listener).await
643 }
644
645 pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
647 use tokio::net::TcpListener;
648
649 let listener = TcpListener::bind(&self.config.bind_addr).await?;
650 log::info!("CellServer listening on {}", self.config.bind_addr);
651 log::info!(
652 "Myko gateway: ws://{}/myko | MCP: /myko/mcp (POST + WS + SSE)",
653 self.config.bind_addr
654 );
655 self.run_ws_accept_loop(listener).await
656 }
657
658 async fn run_ws_accept_loop(
659 &self,
660 listener: tokio::net::TcpListener,
661 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
662 let ready = self.ready.clone();
663
664 loop {
665 let (stream, addr) = listener.accept().await?;
666
667 if !ready.load(Ordering::SeqCst) {
669 if self.is_ready() {
670 log::info!("Server is now ready to accept connections");
671 } else {
672 log::warn!(
673 "Rejecting connection from {} - server not ready (durable backend catching up)",
674 addr
675 );
676 drop(stream);
677 continue;
678 }
679 }
680
681 log::debug!("New connection from {}", addr);
682
683 let ctx = Arc::new(self.ctx());
684 let server_info = self.server_info.clone();
685
686 tokio::spawn(async move {
687 if let Err(e) = router::route_connection(stream, addr, ctx, server_info).await {
688 log::error!("Connection error from {}: {}", addr, e);
689 }
690 });
691 }
692 }
693}
694
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a configuration without Postgres or peers, bound to an
    /// OS-assigned local port, with the given host id.
    fn config_with_host_id(host_id: Option<Uuid>) -> CellServerConfig {
        CellServerConfig {
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            postgres: None,
            host_id,
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
            peer_clients: None,
        }
    }

    /// A server can be constructed from a minimal configuration.
    #[test]
    fn test_server_creation() {
        let server = CellServer::new(config_with_host_id(None));
        assert!(Arc::strong_count(&server.registry) >= 1);
    }

    /// An explicitly configured host id is used as-is.
    #[test]
    fn test_server_with_host_id() {
        let expected = Uuid::new_v4();
        let server = CellServer::new(config_with_host_id(Some(expected)));
        assert_eq!(server.host_id, expected);
    }
}