1pub mod mcp;
13pub mod peer_persister;
14pub mod peer_registry;
15pub mod postgres;
16pub mod server_ownership;
17pub mod ws_handler;
18pub mod ws_timing;
19
20use std::{
22 collections::HashMap,
23 net::SocketAddr,
24 sync::{
25 Arc, RwLock,
26 atomic::{AtomicBool, Ordering},
27 },
28 time::Duration,
29};
30
31use futures_util::StreamExt;
32pub use myko::server::*;
33use myko::{
34 client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
35 search::SearchIndex, store::StoreRegistry, wire::MEvent,
36};
37pub use peer_persister::PeerPersister;
38pub use server_ownership::ServerOwnershipManager;
39use uuid::Uuid;
40
41use crate::postgres::{
42 CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
43 PostgresHistoryStore, PostgresProducerHandle,
44};
45
46#[derive(Clone)]
48pub struct CellServerConfig {
49 pub bind_addr: SocketAddr,
51 pub postgres: Option<PostgresConfig>,
53 pub host_id: Option<Uuid>,
55 pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
57 pub default_persister: Option<Arc<dyn Persister>>,
59 pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
61 pub peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
65}
66
67#[derive(Default)]
69pub struct CellServerBuilder {
70 bind_addr: Option<SocketAddr>,
71 host_id: Option<Uuid>,
72 postgres: Option<PostgresConfig>,
73 peer_registry: Option<peer_registry::PeerRegistryConfig>,
74 default_persister: Option<Arc<dyn Persister>>,
75 persister_overrides: HashMap<String, Arc<dyn Persister>>,
76 peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
80 after_init: Option<AfterInitCallback>,
81}
82
83type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;
84
85impl CellServerBuilder {
86 pub fn new() -> Self {
88 Self::default()
89 }
90
91 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
93 self.bind_addr = Some(addr);
94 self
95 }
96
97 pub fn with_host_id(mut self, id: Uuid) -> Self {
99 self.host_id = Some(id);
100 self
101 }
102
103 pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
105 self.postgres = Some(config);
106 self
107 }
108
109 pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
111 self.peer_registry = Some(config);
112 self
113 }
114
115 pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
117 self.default_persister = Some(persister);
118 self
119 }
120
121 pub fn with_persister_override(
123 mut self,
124 entity_type: impl Into<String>,
125 persister: Arc<dyn Persister>,
126 ) -> Self {
127 self.persister_overrides
128 .insert(entity_type.into(), persister);
129 self
130 }
131
132 pub fn with_peer_clients(
138 mut self,
139 peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
140 ) -> Self {
141 self.peer_clients = Some(peer_clients);
142 self
143 }
144
145 pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
149 self.after_init = Some(Box::new(f));
150 self
151 }
152
153 pub fn build(self) -> CellServer {
155 let bind_addr = self
156 .bind_addr
157 .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());
158
159 let mut server = CellServer::new(CellServerConfig {
160 bind_addr,
161 postgres: self.postgres,
162 host_id: self.host_id,
163 peer_registry: self.peer_registry,
164 default_persister: self.default_persister,
165 persister_overrides: self.persister_overrides,
166 peer_clients: self.peer_clients,
167 });
168 server.after_init = std::sync::Mutex::new(self.after_init);
169 server
170 }
171}
172
173pub struct CellServer {
177 pub registry: Arc<StoreRegistry>,
179 pub handler_registry: Arc<HandlerRegistry>,
181 pub relationship_manager: Arc<RelationshipManager>,
183 pub postgres_producer: Option<PostgresProducerHandle>,
185 pub search_index: Arc<SearchIndex>,
187 pub persisters: Arc<PersisterRouter>,
189 pub host_id: Uuid,
191 config: CellServerConfig,
193 _postgres_producer_owner: Option<CellPostgresProducer>,
195 postgres_consumer: Option<CellPostgresConsumer>,
197 ready: Arc<AtomicBool>,
199 peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
201 peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
203 after_init: std::sync::Mutex<Option<AfterInitCallback>>,
205 saga_event_tx: flume::Sender<MEvent>,
207 saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
209 saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
211 _server_ownership_guard: std::sync::Mutex<Option<hyphae::SubscriptionGuard>>,
213 #[cfg(feature = "inspector")]
215 _inspector: hyphae::server::InspectorServer,
216}
217
218impl CellServer {
219 pub fn builder() -> CellServerBuilder {
221 CellServerBuilder::new()
222 }
223
224 pub fn new(config: CellServerConfig) -> Self {
226 let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
227 let registry = Arc::new(StoreRegistry::new());
228 let handler_registry = Arc::new(HandlerRegistry::new());
229 let relationship_manager = Arc::new(RelationshipManager::new());
230
231 init_client_registry();
233
234 let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
235 let (postgres_producer_owner, postgres_producer, postgres_consumer) =
236 if let Some(ref postgres_config) = config.postgres {
237 match CellPostgresProducer::new(postgres_config, host_id) {
238 Ok(producer) => {
239 let handle = producer.handle();
240 let consumer = match CellPostgresConsumer::start(
241 postgres_config,
242 host_id,
243 handler_registry.clone(),
244 registry.clone(),
245 ) {
246 Ok(c) => Some(c),
247 Err(e) => {
248 log::error!("Failed to start Postgres consumer: {}", e);
249 None
250 }
251 };
252 (Some(producer), Some(handle), consumer)
253 }
254 Err(e) => {
255 log::error!("Failed to create Postgres producer: {}", e);
256 (None, None, None)
257 }
258 }
259 } else {
260 (None, None, None)
261 };
262
263 let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));
265
266 let search_index = Arc::new(SearchIndex::new());
268
269 let mut persister_router = PersisterRouter::default();
274 if let Some(default_persister) = config.default_persister.clone() {
275 persister_router.set_default(Some(default_persister));
276 } else if let Some(handle) = postgres_producer.clone() {
277 persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
278 }
279 for (entity_type, persister) in &config.persister_overrides {
280 persister_router.set_override(entity_type.clone(), persister.clone());
281 }
282 let persisters = Arc::new(persister_router);
283
284 #[cfg(feature = "inspector")]
286 let inspector = hyphae::server::start_server("myko");
287 #[cfg(feature = "inspector")]
288 log::info!("Hyphae inspector on port {}", inspector.port());
289
290 let peer_clients = config
291 .peer_clients
292 .clone()
293 .unwrap_or_else(|| Arc::new(dashmap::DashMap::new()));
294
295 Self {
296 registry,
297 handler_registry,
298 relationship_manager,
299 postgres_producer,
300 search_index,
301 persisters,
302 host_id,
303 config,
304 _postgres_producer_owner: postgres_producer_owner,
305 postgres_consumer,
306 ready,
307 peer_registry_instance: RwLock::new(None),
308 peer_clients,
309 after_init: std::sync::Mutex::new(None),
310 saga_event_tx,
311 saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
312 saga_tasks: std::sync::Mutex::new(Vec::new()),
313 _server_ownership_guard: std::sync::Mutex::new(None),
314 #[cfg(feature = "inspector")]
315 _inspector: inspector,
316 }
317 }
318
319 pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
321 let peer_config = config.or_else(|| self.config.peer_registry.clone());
322
323 if let Some(peer_config) = peer_config {
324 log::info!("Starting peer registry");
325 let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
326 *self.peer_registry_instance.write().unwrap() = Some(pr);
327 }
328 }
329
330 pub fn has_peer_registry(&self) -> bool {
332 self.peer_registry_instance.read().unwrap().is_some()
333 }
334
335 pub fn registry(&self) -> Arc<StoreRegistry> {
337 self.registry.clone()
338 }
339
340 pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
342 self.handler_registry.clone()
343 }
344
345 pub fn ctx(&self) -> CellServerCtx {
347 let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
348 self.config.postgres.as_ref().map(|pg| {
349 Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
350 as Arc<dyn myko::server::HistoryReplayProvider>
351 });
352 CellServerCtx::new(
353 self.host_id,
354 self.registry.clone(),
355 self.handler_registry.clone(),
356 self.relationship_manager.clone(),
357 self.persisters.clone(),
358 self.search_index.clone(),
359 self.peer_clients.clone(),
360 Some(self.saga_event_tx.clone()),
361 history_replay,
362 )
363 }
364
365 fn start_saga_runtime(&self) {
366 let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
367 if registrations.is_empty() {
368 return;
369 }
370 let Some(rx) = self
371 .saga_event_rx
372 .lock()
373 .expect("saga_event_rx mutex poisoned")
374 .take()
375 else {
376 return;
377 };
378
379 log::info!("Starting saga runtime with {} saga(s)", registrations.len());
380
381 struct SagaChannel {
384 tx: flume::Sender<MEvent>,
385 entity_type: &'static str,
386 change_type: myko::event::MEventType,
387 }
388 let mut saga_channels: Vec<SagaChannel> = Vec::new();
389
390 for registration in registrations {
391 let saga = (registration.create)();
392 let saga_name = saga.name().to_string();
393 let (saga_tx, saga_rx) = flume::unbounded::<MEvent>();
394 saga_channels.push(SagaChannel {
395 tx: saga_tx,
396 entity_type: registration.event_entity_type,
397 change_type: registration.event_change_type,
398 });
399 let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
400 saga_rx,
401 move |saga_rx| async move {
402 saga_rx
403 .recv_async()
404 .await
405 .ok()
406 .map(|event| (event, saga_rx))
407 },
408 ));
409
410 let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
411 self.host_id,
412 self.registry.clone(),
413 self.saga_event_tx.clone(),
414 ));
415 let mut command_stream = saga.build_boxed(events, saga_ctx);
416
417 let host_id = self.host_id;
418 let registry = self.registry.clone();
419 let handler_registry = self.handler_registry.clone();
420 let relationship_manager = self.relationship_manager.clone();
421 let persisters = self.persisters.clone();
422 let search_index = self.search_index.clone();
423 let peer_clients = self.peer_clients.clone();
424 let saga_event_tx = self.saga_event_tx.clone();
425
426 let handle = tokio::spawn(async move {
427 while let Some(command) = command_stream.next().await {
428 let command_name = command.command_name();
429 log::debug!("Saga {} executing command {}", saga_name, command_name);
430 let req = Arc::new(RequestContext::internal(
431 Arc::from(Uuid::new_v4().to_string()),
432 host_id,
433 &format!("saga:{saga_name}"),
434 ));
435
436 let cmd_ctx = CommandContext::new(
437 Arc::from(command_name),
438 req,
439 Arc::new(CellServerCtx::new(
440 host_id,
441 registry.clone(),
442 handler_registry.clone(),
443 relationship_manager.clone(),
444 persisters.clone(),
445 search_index.clone(),
446 peer_clients.clone(),
447 Some(saga_event_tx.clone()),
448 None,
449 )),
450 );
451
452 if let Err(err) = command.execute_boxed(cmd_ctx) {
453 log::error!(
454 "Saga {} command {} failed: {}",
455 saga_name,
456 command_name,
457 err.message
458 );
459 }
460 }
461 });
462
463 self.saga_tasks
464 .lock()
465 .expect("saga_tasks mutex poisoned")
466 .push(handle);
467 }
468
469 let dispatcher = tokio::spawn(async move {
472 while let Ok(event) = rx.recv_async().await {
473 for ch in &saga_channels {
474 if event.item_type == ch.entity_type && event.change_type == ch.change_type {
475 let _ = ch.tx.send(event.clone());
476 }
477 }
478 }
479 });
480 self.saga_tasks
481 .lock()
482 .expect("saga_tasks mutex poisoned")
483 .push(dispatcher);
484 }
485
486 pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
488 self.config
489 .postgres
490 .clone()
491 .map(PostgresHistoryStore::new)
492 .transpose()
493 }
494
495 pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
497 if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
498 return Err(
499 "Postgres is configured but the Postgres consumer is not running".to_string(),
500 );
501 }
502
503 if let Some(ref consumer) = self.postgres_consumer {
504 consumer.wait_until_caught_up(timeout)?;
505 self.ready.store(true, Ordering::SeqCst);
506 }
507 Ok(())
508 }
509
510 pub fn establish_relations(&self) {
512 if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
513 log::error!("Failed to establish relations: {e}");
514 }
515 }
516
517 pub fn is_ready(&self) -> bool {
519 if let Some(ref consumer) = self.postgres_consumer {
520 if consumer.is_caught_up() {
521 self.ready.store(true, Ordering::SeqCst);
522 return true;
523 }
524 return false;
525 }
526 true
527 }
528
529 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
531 use tokio::net::TcpListener;
532
533 let entity_types: Vec<&str> = self
535 .handler_registry
536 .entity_types()
537 .map(|t| t.as_ref())
538 .collect();
539 self.persisters
540 .startup_healthcheck(&entity_types)
541 .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;
542
543 if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
544 return Err("Postgres is configured but the Postgres consumer failed to start".into());
545 }
546
547 if self.postgres_consumer.is_some() {
549 log::info!("Waiting for Postgres event consumer to catch up...");
550 let timeout = std::time::Duration::from_secs(300);
551 self.init_postgres_and_wait(timeout)
552 .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
553 log::info!("Postgres caught up, ready to accept connections");
554 }
555
556 log::info!("Building search index...");
558 self.search_index.build_from_registry(&self.registry);
559
560 log::info!("Establishing relations...");
562 self.establish_relations();
563
564 log::info!("Checking server-owned item ownership...");
566 if let Err(e) = ServerOwnershipManager::claim_orphaned(&self.ctx()) {
567 log::error!("Failed to claim orphaned server-owned items: {}", e);
568 }
569 let ownership_guard = ServerOwnershipManager::watch_peer_deaths(&self.ctx());
570 *self
571 ._server_ownership_guard
572 .lock()
573 .expect("server_ownership_guard mutex poisoned") = Some(ownership_guard);
574
575 let listener = TcpListener::bind(&self.config.bind_addr).await?;
578 log::info!("CellServer listening on {}", self.config.bind_addr);
579 log::info!(
580 "WebSocket server listening on ws://{}/myko",
581 self.config.bind_addr
582 );
583
584 if self.config.peer_registry.is_some() {
586 self.start_peer_registry(None);
587 }
588
589 if let Some(hook) = self
591 .after_init
592 .lock()
593 .expect("after_init mutex poisoned")
594 .take()
595 {
596 hook(self);
597 }
598
599 self.start_saga_runtime();
600
601 crate::ws_timing::start_periodic_logger();
605
606 myko::server::report_cache_stats::start_periodic_logger();
609
610 myko::search::search_stats::start_periodic_logger();
613
614 log::info!("Server started");
615 self.run_ws_accept_loop(listener).await
616 }
617
618 pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
620 use tokio::net::TcpListener;
621
622 let listener = TcpListener::bind(&self.config.bind_addr).await?;
623 log::info!("CellServer listening on {}", self.config.bind_addr);
624 log::info!(
625 "WebSocket server listening on ws://{}/myko",
626 self.config.bind_addr
627 );
628 self.run_ws_accept_loop(listener).await
629 }
630
631 async fn run_ws_accept_loop(
632 &self,
633 listener: tokio::net::TcpListener,
634 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
635 let ready = self.ready.clone();
636
637 loop {
638 let (stream, addr) = listener.accept().await?;
639
640 if !ready.load(Ordering::SeqCst) {
642 if self.is_ready() {
643 log::info!("Server is now ready to accept connections");
644 } else {
645 log::warn!(
646 "Rejecting connection from {} - server not ready (durable backend catching up)",
647 addr
648 );
649 drop(stream);
650 continue;
651 }
652 }
653
654 log::debug!("New connection from {}", addr);
655
656 let ctx = self.ctx();
657
658 tokio::spawn(async move {
659 if let Err(e) =
660 ws_handler::WsHandler::handle_connection(stream, addr, Arc::new(ctx)).await
661 {
662 log::error!("Connection error from {}: {}", addr, e);
663 }
664 });
665 }
666 }
667}
668
#[cfg(test)]
mod tests {
    use super::*;

    /// Config with no Postgres, no peer registry and no persisters, bound
    /// to loopback port 0.
    fn minimal_config(host_id: Option<Uuid>) -> CellServerConfig {
        CellServerConfig {
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            postgres: None,
            host_id,
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
            peer_clients: None,
        }
    }

    /// Without Postgres the server has no producer, is ready immediately and
    /// has no peer registry until one is started.
    #[test]
    fn test_server_creation() {
        let server = CellServer::new(minimal_config(None));
        assert!(server.postgres_producer.is_none());
        assert!(server.is_ready());
        assert!(!server.has_peer_registry());
    }

    /// An explicitly configured host id is used verbatim.
    #[test]
    fn test_server_with_host_id() {
        let host_id = Uuid::new_v4();
        let server = CellServer::new(minimal_config(Some(host_id)));
        assert_eq!(server.host_id, host_id);
    }

    /// Values set on the builder reach the constructed server.
    #[test]
    fn test_builder_applies_settings() {
        let host_id = Uuid::new_v4();
        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let server = CellServer::builder()
            .with_bind_addr(addr)
            .with_host_id(host_id)
            .build();
        assert_eq!(server.host_id, host_id);
        assert_eq!(server.config.bind_addr, addr);
    }
}