1pub mod mcp;
13pub mod peer_persister;
14pub mod peer_registry;
15pub mod postgres;
16pub mod router;
17pub mod server_ownership;
18pub mod ws_handler;
19pub mod ws_timing;
20
21use std::{
23 collections::HashMap,
24 net::SocketAddr,
25 sync::{
26 Arc, RwLock,
27 atomic::{AtomicBool, Ordering},
28 },
29 time::Duration,
30};
31
32use futures_util::StreamExt;
33pub use myko::server::*;
34use myko::{
35 client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
36 search::SearchIndex, store::StoreRegistry, wire::MEvent,
37};
38pub use peer_persister::PeerPersister;
39pub use server_ownership::ServerOwnershipManager;
40use uuid::Uuid;
41
42use crate::postgres::{
43 CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
44 PostgresHistoryStore, PostgresProducerHandle,
45};
46
/// Configuration consumed by [`CellServer::new`].
#[derive(Clone)]
pub struct CellServerConfig {
    /// Socket address the server's TCP listener binds to.
    pub bind_addr: SocketAddr,
    /// Postgres settings. When present, `CellServer::new` tries to create a
    /// Postgres producer and start a Postgres consumer from them.
    pub postgres: Option<PostgresConfig>,
    /// Host identity of this server. When `None`, `CellServer::new` generates
    /// a random v4 UUID.
    pub host_id: Option<Uuid>,
    /// Peer registry settings. When present, `CellServer::run` starts the
    /// peer registry after binding the listener.
    pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Persister installed as the router default. When `None`, the Postgres
    /// producer handle is used as the default if a producer was created.
    pub default_persister: Option<Arc<dyn Persister>>,
    /// Persisters registered as router overrides, keyed by entity type.
    pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
    /// Externally supplied peer-client map. When `None`, `CellServer::new`
    /// creates a fresh empty map.
    pub peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
}
67
68#[derive(Default)]
70pub struct CellServerBuilder {
71 bind_addr: Option<SocketAddr>,
72 host_id: Option<Uuid>,
73 postgres: Option<PostgresConfig>,
74 peer_registry: Option<peer_registry::PeerRegistryConfig>,
75 default_persister: Option<Arc<dyn Persister>>,
76 persister_overrides: HashMap<String, Arc<dyn Persister>>,
77 peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
81 after_init: Option<AfterInitCallback>,
82}
83
/// Boxed one-shot hook that receives a reference to the server.
///
/// `CellServer::run` takes the stored hook out of its slot and calls it once,
/// after the listener is bound and the peer registry (if configured) has been
/// started, and before the saga runtime is started.
type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;
85
86impl CellServerBuilder {
87 pub fn new() -> Self {
89 Self::default()
90 }
91
92 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
94 self.bind_addr = Some(addr);
95 self
96 }
97
98 pub fn with_host_id(mut self, id: Uuid) -> Self {
100 self.host_id = Some(id);
101 self
102 }
103
104 pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
106 self.postgres = Some(config);
107 self
108 }
109
110 pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
112 self.peer_registry = Some(config);
113 self
114 }
115
116 pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
118 self.default_persister = Some(persister);
119 self
120 }
121
122 pub fn with_persister_override(
124 mut self,
125 entity_type: impl Into<String>,
126 persister: Arc<dyn Persister>,
127 ) -> Self {
128 self.persister_overrides
129 .insert(entity_type.into(), persister);
130 self
131 }
132
133 pub fn with_peer_clients(
139 mut self,
140 peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
141 ) -> Self {
142 self.peer_clients = Some(peer_clients);
143 self
144 }
145
146 pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
150 self.after_init = Some(Box::new(f));
151 self
152 }
153
154 pub fn build(self) -> CellServer {
156 let bind_addr = self
157 .bind_addr
158 .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());
159
160 let mut server = CellServer::new(CellServerConfig {
161 bind_addr,
162 postgres: self.postgres,
163 host_id: self.host_id,
164 peer_registry: self.peer_registry,
165 default_persister: self.default_persister,
166 persister_overrides: self.persister_overrides,
167 peer_clients: self.peer_clients,
168 });
169 server.after_init = std::sync::Mutex::new(self.after_init);
170 server
171 }
172}
173
/// The server: shared registries, persistence wiring, readiness state and
/// the background tasks started by `run`.
pub struct CellServer {
    /// Store registry shared with every context and with the Postgres consumer.
    pub registry: Arc<StoreRegistry>,
    /// Handler registry shared with every context and with the Postgres consumer.
    pub handler_registry: Arc<HandlerRegistry>,
    /// Relationship manager shared with every context.
    pub relationship_manager: Arc<RelationshipManager>,
    /// Handle obtained from the Postgres producer; `None` when Postgres is
    /// not configured or the producer could not be created.
    pub postgres_producer: Option<PostgresProducerHandle>,
    /// Search index; `run` builds it from the registry during startup.
    pub search_index: Arc<SearchIndex>,
    /// Persister router holding the default persister and per-entity overrides.
    pub persisters: Arc<PersisterRouter>,
    /// Identity of this server (from the config, or a generated v4 UUID).
    pub host_id: Uuid,
    /// The configuration the server was constructed with.
    config: CellServerConfig,
    /// The producer that `postgres_producer` was obtained from. Never read
    /// in this file; presumably held so the producer outlives its handles.
    _postgres_producer_owner: Option<CellPostgresProducer>,
    /// Postgres consumer; `None` when Postgres is not configured or the
    /// producer/consumer failed to start.
    postgres_consumer: Option<CellPostgresConsumer>,
    /// Readiness flag checked by the accept loop. Starts `true` when there is
    /// no consumer, and is set once the consumer reports it has caught up.
    ready: Arc<AtomicBool>,
    /// Peer registry instance, populated by `start_peer_registry`.
    peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
    /// Peer-client map handed to every context.
    peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
    /// One-shot hook taken and invoked by `run`.
    after_init: std::sync::Mutex<Option<AfterInitCallback>>,
    /// Sending side of the saga event channel; cloned into every context.
    saga_event_tx: flume::Sender<MEvent>,
    /// Receiving side of the saga event channel; taken once by the saga runtime.
    saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
    /// Join handles of the tasks spawned by the saga runtime.
    saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
    /// Guard returned by `ServerOwnershipManager::watch_peer_deaths`, stored
    /// by `run`. Presumably keeps that subscription alive while held.
    _server_ownership_guard: std::sync::Mutex<Option<hyphae::SubscriptionGuard>>,
    /// Inspector server started in `new`; only present with the `inspector` feature.
    #[cfg(feature = "inspector")]
    _inspector: hyphae::server::InspectorServer,
}
218
219impl CellServer {
220 pub fn builder() -> CellServerBuilder {
222 CellServerBuilder::new()
223 }
224
225 pub fn new(config: CellServerConfig) -> Self {
227 let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
228 let registry = Arc::new(StoreRegistry::new());
229 let handler_registry = Arc::new(HandlerRegistry::new());
230 let relationship_manager = Arc::new(RelationshipManager::new());
231
232 init_client_registry();
234
235 let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
236 let (postgres_producer_owner, postgres_producer, postgres_consumer) =
237 if let Some(ref postgres_config) = config.postgres {
238 match CellPostgresProducer::new(postgres_config, host_id) {
239 Ok(producer) => {
240 let handle = producer.handle();
241 let consumer = match CellPostgresConsumer::start(
242 postgres_config,
243 host_id,
244 handler_registry.clone(),
245 registry.clone(),
246 ) {
247 Ok(c) => Some(c),
248 Err(e) => {
249 log::error!("Failed to start Postgres consumer: {}", e);
250 None
251 }
252 };
253 (Some(producer), Some(handle), consumer)
254 }
255 Err(e) => {
256 log::error!("Failed to create Postgres producer: {}", e);
257 (None, None, None)
258 }
259 }
260 } else {
261 (None, None, None)
262 };
263
264 let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));
266
267 let search_index = Arc::new(SearchIndex::new());
269
270 let mut persister_router = PersisterRouter::default();
275 if let Some(default_persister) = config.default_persister.clone() {
276 persister_router.set_default(Some(default_persister));
277 } else if let Some(handle) = postgres_producer.clone() {
278 persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
279 }
280 for (entity_type, persister) in &config.persister_overrides {
281 persister_router.set_override(entity_type.clone(), persister.clone());
282 }
283 let persisters = Arc::new(persister_router);
284
285 #[cfg(feature = "inspector")]
287 let inspector = hyphae::server::start_server("myko");
288 #[cfg(feature = "inspector")]
289 log::info!("Hyphae inspector on port {}", inspector.port());
290
291 let peer_clients = config
292 .peer_clients
293 .clone()
294 .unwrap_or_else(|| Arc::new(dashmap::DashMap::new()));
295
296 Self {
297 registry,
298 handler_registry,
299 relationship_manager,
300 postgres_producer,
301 search_index,
302 persisters,
303 host_id,
304 config,
305 _postgres_producer_owner: postgres_producer_owner,
306 postgres_consumer,
307 ready,
308 peer_registry_instance: RwLock::new(None),
309 peer_clients,
310 after_init: std::sync::Mutex::new(None),
311 saga_event_tx,
312 saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
313 saga_tasks: std::sync::Mutex::new(Vec::new()),
314 _server_ownership_guard: std::sync::Mutex::new(None),
315 #[cfg(feature = "inspector")]
316 _inspector: inspector,
317 }
318 }
319
320 pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
322 let peer_config = config.or_else(|| self.config.peer_registry.clone());
323
324 if let Some(peer_config) = peer_config {
325 log::info!("Starting peer registry");
326 let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
327 *self.peer_registry_instance.write().unwrap() = Some(pr);
328 }
329 }
330
331 pub fn has_peer_registry(&self) -> bool {
333 self.peer_registry_instance.read().unwrap().is_some()
334 }
335
336 pub fn registry(&self) -> Arc<StoreRegistry> {
338 self.registry.clone()
339 }
340
341 pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
343 self.handler_registry.clone()
344 }
345
346 pub fn ctx(&self) -> CellServerCtx {
348 let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
349 self.config.postgres.as_ref().map(|pg| {
350 Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
351 as Arc<dyn myko::server::HistoryReplayProvider>
352 });
353 CellServerCtx::new(
354 self.host_id,
355 self.registry.clone(),
356 self.handler_registry.clone(),
357 self.relationship_manager.clone(),
358 self.persisters.clone(),
359 self.search_index.clone(),
360 self.peer_clients.clone(),
361 Some(self.saga_event_tx.clone()),
362 history_replay,
363 )
364 }
365
366 fn start_saga_runtime(&self) {
367 let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
368 if registrations.is_empty() {
369 return;
370 }
371 let Some(rx) = self
372 .saga_event_rx
373 .lock()
374 .expect("saga_event_rx mutex poisoned")
375 .take()
376 else {
377 return;
378 };
379
380 log::info!("Starting saga runtime with {} saga(s)", registrations.len());
381
382 struct SagaChannel {
385 tx: flume::Sender<MEvent>,
386 entity_type: &'static str,
387 change_type: myko::event::MEventType,
388 }
389 let mut saga_channels: Vec<SagaChannel> = Vec::new();
390
391 for registration in registrations {
392 let saga = (registration.create)();
393 let saga_name = saga.name().to_string();
394 let (saga_tx, saga_rx) = flume::unbounded::<MEvent>();
395 saga_channels.push(SagaChannel {
396 tx: saga_tx,
397 entity_type: registration.event_entity_type,
398 change_type: registration.event_change_type,
399 });
400 let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
401 saga_rx,
402 move |saga_rx| async move {
403 saga_rx
404 .recv_async()
405 .await
406 .ok()
407 .map(|event| (event, saga_rx))
408 },
409 ));
410
411 let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
412 self.host_id,
413 self.registry.clone(),
414 self.saga_event_tx.clone(),
415 ));
416 let mut command_stream = saga.build_boxed(events, saga_ctx);
417
418 let host_id = self.host_id;
419 let registry = self.registry.clone();
420 let handler_registry = self.handler_registry.clone();
421 let relationship_manager = self.relationship_manager.clone();
422 let persisters = self.persisters.clone();
423 let search_index = self.search_index.clone();
424 let peer_clients = self.peer_clients.clone();
425 let saga_event_tx = self.saga_event_tx.clone();
426
427 let handle = tokio::spawn(async move {
428 while let Some(command) = command_stream.next().await {
429 let command_name = command.command_name();
430 log::debug!("Saga {} executing command {}", saga_name, command_name);
431 let req = Arc::new(RequestContext::internal(
432 Arc::from(Uuid::new_v4().to_string()),
433 host_id,
434 &format!("saga:{saga_name}"),
435 ));
436
437 let cmd_ctx = CommandContext::new(
438 Arc::from(command_name),
439 req,
440 Arc::new(CellServerCtx::new(
441 host_id,
442 registry.clone(),
443 handler_registry.clone(),
444 relationship_manager.clone(),
445 persisters.clone(),
446 search_index.clone(),
447 peer_clients.clone(),
448 Some(saga_event_tx.clone()),
449 None,
450 )),
451 );
452
453 if let Err(err) = command.execute_boxed(cmd_ctx) {
454 log::error!(
455 "Saga {} command {} failed: {}",
456 saga_name,
457 command_name,
458 err.message
459 );
460 }
461 }
462 });
463
464 self.saga_tasks
465 .lock()
466 .expect("saga_tasks mutex poisoned")
467 .push(handle);
468 }
469
470 let dispatcher = tokio::spawn(async move {
473 while let Ok(event) = rx.recv_async().await {
474 for ch in &saga_channels {
475 if event.item_type == ch.entity_type && event.change_type == ch.change_type {
476 let _ = ch.tx.send(event.clone());
477 }
478 }
479 }
480 });
481 self.saga_tasks
482 .lock()
483 .expect("saga_tasks mutex poisoned")
484 .push(dispatcher);
485 }
486
487 pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
489 self.config
490 .postgres
491 .clone()
492 .map(PostgresHistoryStore::new)
493 .transpose()
494 }
495
496 pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
498 if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
499 return Err(
500 "Postgres is configured but the Postgres consumer is not running".to_string(),
501 );
502 }
503
504 if let Some(ref consumer) = self.postgres_consumer {
505 consumer.wait_until_caught_up(timeout)?;
506 self.ready.store(true, Ordering::SeqCst);
507 }
508 Ok(())
509 }
510
511 pub fn establish_relations(&self) {
513 if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
514 log::error!("Failed to establish relations: {e}");
515 }
516 }
517
518 pub fn is_ready(&self) -> bool {
520 if let Some(ref consumer) = self.postgres_consumer {
521 if consumer.is_caught_up() {
522 self.ready.store(true, Ordering::SeqCst);
523 return true;
524 }
525 return false;
526 }
527 true
528 }
529
530 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
532 use tokio::net::TcpListener;
533
534 let entity_types: Vec<&str> = self
536 .handler_registry
537 .entity_types()
538 .map(|t| t.as_ref())
539 .collect();
540 self.persisters
541 .startup_healthcheck(&entity_types)
542 .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;
543
544 if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
545 return Err("Postgres is configured but the Postgres consumer failed to start".into());
546 }
547
548 if self.postgres_consumer.is_some() {
550 log::info!("Waiting for Postgres event consumer to catch up...");
551 let timeout = std::time::Duration::from_secs(300);
552 self.init_postgres_and_wait(timeout)
553 .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
554 log::info!("Postgres caught up, ready to accept connections");
555 }
556
557 log::info!("Building search index...");
559 self.search_index.build_from_registry(&self.registry);
560
561 log::info!("Establishing relations...");
563 self.establish_relations();
564
565 log::info!("Checking server-owned item ownership...");
567 if let Err(e) = ServerOwnershipManager::claim_orphaned(&self.ctx()) {
568 log::error!("Failed to claim orphaned server-owned items: {}", e);
569 }
570 let ownership_guard = ServerOwnershipManager::watch_peer_deaths(&self.ctx());
571 *self
572 ._server_ownership_guard
573 .lock()
574 .expect("server_ownership_guard mutex poisoned") = Some(ownership_guard);
575
576 let listener = TcpListener::bind(&self.config.bind_addr).await?;
579 log::info!("CellServer listening on {}", self.config.bind_addr);
580 log::info!(
581 "Myko gateway: ws://{}/myko | MCP: /myko/mcp (POST + WS + SSE)",
582 self.config.bind_addr
583 );
584
585 if self.config.peer_registry.is_some() {
587 self.start_peer_registry(None);
588 }
589
590 if let Some(hook) = self
592 .after_init
593 .lock()
594 .expect("after_init mutex poisoned")
595 .take()
596 {
597 hook(self);
598 }
599
600 self.start_saga_runtime();
601
602 crate::ws_timing::start_periodic_logger();
606
607 myko::server::report_cache_stats::start_periodic_logger();
610
611 myko::search::search_stats::start_periodic_logger();
614
615 log::info!("Server started");
616 self.run_ws_accept_loop(listener).await
617 }
618
619 pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
621 use tokio::net::TcpListener;
622
623 let listener = TcpListener::bind(&self.config.bind_addr).await?;
624 log::info!("CellServer listening on {}", self.config.bind_addr);
625 log::info!(
626 "Myko gateway: ws://{}/myko | MCP: /myko/mcp (POST + WS + SSE)",
627 self.config.bind_addr
628 );
629 self.run_ws_accept_loop(listener).await
630 }
631
632 async fn run_ws_accept_loop(
633 &self,
634 listener: tokio::net::TcpListener,
635 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
636 let ready = self.ready.clone();
637
638 loop {
639 let (stream, addr) = listener.accept().await?;
640
641 if !ready.load(Ordering::SeqCst) {
643 if self.is_ready() {
644 log::info!("Server is now ready to accept connections");
645 } else {
646 log::warn!(
647 "Rejecting connection from {} - server not ready (durable backend catching up)",
648 addr
649 );
650 drop(stream);
651 continue;
652 }
653 }
654
655 log::debug!("New connection from {}", addr);
656
657 let ctx = Arc::new(self.ctx());
658
659 tokio::spawn(async move {
660 if let Err(e) = router::route_connection(stream, addr, ctx).await {
661 log::error!("Connection error from {}: {}", addr, e);
662 }
663 });
664 }
665 }
666}
667
#[cfg(test)]
mod tests {
    use super::*;

    /// Configuration with no Postgres, no peer registry and no persisters,
    /// bound to an OS-assigned port; only the host id varies between tests.
    fn minimal_config(host_id: Option<Uuid>) -> CellServerConfig {
        CellServerConfig {
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            postgres: None,
            host_id,
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
            peer_clients: None,
        }
    }

    /// A server built from the minimal configuration owns its registry.
    #[test]
    fn test_server_creation() {
        let server = CellServer::new(minimal_config(None));
        assert!(Arc::strong_count(&server.registry) >= 1);
    }

    /// An explicitly configured host id is kept by the server.
    #[test]
    fn test_server_with_host_id() {
        let expected = Uuid::new_v4();
        let server = CellServer::new(minimal_config(Some(expected)));
        assert_eq!(server.host_id, expected);
    }
}
702}