1pub mod mcp;
13pub mod peer_persister;
14pub mod peer_registry;
15pub mod postgres;
16pub mod server_ownership;
17pub mod ws_handler;
18
19use std::{
21 collections::HashMap,
22 net::SocketAddr,
23 sync::{
24 Arc, RwLock,
25 atomic::{AtomicBool, Ordering},
26 },
27 time::Duration,
28};
29
30use futures_util::StreamExt;
31pub use myko::server::*;
32use myko::{
33 client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
34 search::SearchIndex, store::StoreRegistry, wire::MEvent,
35};
36pub use peer_persister::PeerPersister;
37pub use server_ownership::ServerOwnershipManager;
38use uuid::Uuid;
39
40use crate::postgres::{
41 CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
42 PostgresHistoryStore, PostgresProducerHandle,
43};
44
/// Configuration consumed by [`CellServer::new`]; usually assembled via [`CellServerBuilder`].
#[derive(Clone)]
pub struct CellServerConfig {
    /// Address the WebSocket listener binds to.
    pub bind_addr: SocketAddr,
    /// Postgres settings. When set, `CellServer::new` tries to start a Postgres producer and
    /// consumer, and history replay / history store accessors become available.
    pub postgres: Option<PostgresConfig>,
    /// Host identifier; a random v4 UUID is generated when `None`.
    pub host_id: Option<Uuid>,
    /// Peer registry settings, used by `CellServer::start_peer_registry` when it is called
    /// without an explicit config.
    pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Default persister. Takes precedence over the Postgres producer handle when both exist.
    pub default_persister: Option<Arc<dyn Persister>>,
    /// Per-entity-type persister overrides, keyed by entity type name.
    pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
    /// Shared peer client map; `CellServer::new` creates an empty map when `None`.
    pub peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
}
65
/// Fluent builder for [`CellServer`]; every setting is optional.
#[derive(Default)]
pub struct CellServerBuilder {
    // Falls back to 127.0.0.1:5155 in `build` when unset.
    bind_addr: Option<SocketAddr>,
    host_id: Option<Uuid>,
    postgres: Option<PostgresConfig>,
    peer_registry: Option<peer_registry::PeerRegistryConfig>,
    default_persister: Option<Arc<dyn Persister>>,
    persister_overrides: HashMap<String, Arc<dyn Persister>>,
    peer_clients: Option<Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>>,
    // Moved into the built server; invoked once by `CellServer::run`.
    after_init: Option<AfterInitCallback>,
}
81
82type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;
83
84impl CellServerBuilder {
85 pub fn new() -> Self {
87 Self::default()
88 }
89
90 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
92 self.bind_addr = Some(addr);
93 self
94 }
95
96 pub fn with_host_id(mut self, id: Uuid) -> Self {
98 self.host_id = Some(id);
99 self
100 }
101
102 pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
104 self.postgres = Some(config);
105 self
106 }
107
108 pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
110 self.peer_registry = Some(config);
111 self
112 }
113
114 pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
116 self.default_persister = Some(persister);
117 self
118 }
119
120 pub fn with_persister_override(
122 mut self,
123 entity_type: impl Into<String>,
124 persister: Arc<dyn Persister>,
125 ) -> Self {
126 self.persister_overrides
127 .insert(entity_type.into(), persister);
128 self
129 }
130
131 pub fn with_peer_clients(
137 mut self,
138 peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
139 ) -> Self {
140 self.peer_clients = Some(peer_clients);
141 self
142 }
143
144 pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
148 self.after_init = Some(Box::new(f));
149 self
150 }
151
152 pub fn build(self) -> CellServer {
154 let bind_addr = self
155 .bind_addr
156 .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());
157
158 let mut server = CellServer::new(CellServerConfig {
159 bind_addr,
160 postgres: self.postgres,
161 host_id: self.host_id,
162 peer_registry: self.peer_registry,
163 default_persister: self.default_persister,
164 persister_overrides: self.persister_overrides,
165 peer_clients: self.peer_clients,
166 });
167 server.after_init = std::sync::Mutex::new(self.after_init);
168 server
169 }
170}
171
/// A cell server: shared registries, persisters, optional Postgres replication, sagas and a
/// WebSocket accept loop.
///
/// Field order is significant: Rust drops fields in declaration order, so keep the order
/// stable when editing.
pub struct CellServer {
    /// Store registry shared with every context built by `ctx()`.
    pub registry: Arc<StoreRegistry>,
    /// Handler registry shared with every context and with the Postgres consumer.
    pub handler_registry: Arc<HandlerRegistry>,
    /// Relationship manager; relations are established during `run`.
    pub relationship_manager: Arc<RelationshipManager>,
    /// Producer handle, present only when Postgres was configured and the producer started.
    pub postgres_producer: Option<PostgresProducerHandle>,
    /// Search index, built from the registry during `run`.
    pub search_index: Arc<SearchIndex>,
    /// Persister routing (default plus per-entity-type overrides).
    pub persisters: Arc<PersisterRouter>,
    /// This server's host id (configured or randomly generated).
    pub host_id: Uuid,
    // Original configuration, retained for lazy setup (bind address, peer registry, Postgres).
    config: CellServerConfig,
    // Owns the producer for the server's lifetime; only `postgres_producer` is used directly.
    _postgres_producer_owner: Option<CellPostgresProducer>,
    // Running consumer, if Postgres was configured and it started successfully.
    postgres_consumer: Option<CellPostgresConsumer>,
    // Set once the consumer has caught up; starts `true` when no consumer is running.
    ready: Arc<AtomicBool>,
    // Populated by `start_peer_registry`.
    peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
    peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
    // Taken and invoked once by `run`.
    after_init: std::sync::Mutex<Option<AfterInitCallback>>,
    // Sender cloned into every context and saga context.
    saga_event_tx: flume::Sender<MEvent>,
    // Receiver taken by `start_saga_runtime`; `None` once the runtime has started.
    saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
    // Spawned saga and dispatcher tasks.
    // NOTE(review): dropping a tokio JoinHandle detaches the task rather than aborting it —
    // confirm that outliving the server is intended.
    saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
    // Keeps the peer-death watch installed by `run` alive.
    _server_ownership_guard: std::sync::Mutex<Option<hyphae::SubscriptionGuard>>,
    // Hyphae inspector server, kept alive for the server's lifetime.
    #[cfg(feature = "inspector")]
    _inspector: hyphae::server::InspectorServer,
}
216
217impl CellServer {
218 pub fn builder() -> CellServerBuilder {
220 CellServerBuilder::new()
221 }
222
223 pub fn new(config: CellServerConfig) -> Self {
225 let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
226 let registry = Arc::new(StoreRegistry::new());
227 let handler_registry = Arc::new(HandlerRegistry::new());
228 let relationship_manager = Arc::new(RelationshipManager::new());
229
230 init_client_registry();
232
233 let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
234 let (postgres_producer_owner, postgres_producer, postgres_consumer) =
235 if let Some(ref postgres_config) = config.postgres {
236 match CellPostgresProducer::new(postgres_config, host_id) {
237 Ok(producer) => {
238 let handle = producer.handle();
239 let consumer = match CellPostgresConsumer::start(
240 postgres_config,
241 host_id,
242 handler_registry.clone(),
243 registry.clone(),
244 ) {
245 Ok(c) => Some(c),
246 Err(e) => {
247 log::error!("Failed to start Postgres consumer: {}", e);
248 None
249 }
250 };
251 (Some(producer), Some(handle), consumer)
252 }
253 Err(e) => {
254 log::error!("Failed to create Postgres producer: {}", e);
255 (None, None, None)
256 }
257 }
258 } else {
259 (None, None, None)
260 };
261
262 let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));
264
265 let search_index = Arc::new(SearchIndex::new());
267
268 let mut persister_router = PersisterRouter::default();
273 if let Some(default_persister) = config.default_persister.clone() {
274 persister_router.set_default(Some(default_persister));
275 } else if let Some(handle) = postgres_producer.clone() {
276 persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
277 }
278 for (entity_type, persister) in &config.persister_overrides {
279 persister_router.set_override(entity_type.clone(), persister.clone());
280 }
281 let persisters = Arc::new(persister_router);
282
283 #[cfg(feature = "inspector")]
285 let inspector = hyphae::server::start_server("myko");
286 #[cfg(feature = "inspector")]
287 log::info!("Hyphae inspector on port {}", inspector.port());
288
289 let peer_clients = config
290 .peer_clients
291 .clone()
292 .unwrap_or_else(|| Arc::new(dashmap::DashMap::new()));
293
294 Self {
295 registry,
296 handler_registry,
297 relationship_manager,
298 postgres_producer,
299 search_index,
300 persisters,
301 host_id,
302 config,
303 _postgres_producer_owner: postgres_producer_owner,
304 postgres_consumer,
305 ready,
306 peer_registry_instance: RwLock::new(None),
307 peer_clients,
308 after_init: std::sync::Mutex::new(None),
309 saga_event_tx,
310 saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
311 saga_tasks: std::sync::Mutex::new(Vec::new()),
312 _server_ownership_guard: std::sync::Mutex::new(None),
313 #[cfg(feature = "inspector")]
314 _inspector: inspector,
315 }
316 }
317
318 pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
320 let peer_config = config.or_else(|| self.config.peer_registry.clone());
321
322 if let Some(peer_config) = peer_config {
323 log::info!("Starting peer registry");
324 let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
325 *self.peer_registry_instance.write().unwrap() = Some(pr);
326 }
327 }
328
329 pub fn has_peer_registry(&self) -> bool {
331 self.peer_registry_instance.read().unwrap().is_some()
332 }
333
334 pub fn registry(&self) -> Arc<StoreRegistry> {
336 self.registry.clone()
337 }
338
339 pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
341 self.handler_registry.clone()
342 }
343
344 pub fn ctx(&self) -> CellServerCtx {
346 let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
347 self.config.postgres.as_ref().map(|pg| {
348 Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
349 as Arc<dyn myko::server::HistoryReplayProvider>
350 });
351 CellServerCtx::new(
352 self.host_id,
353 self.registry.clone(),
354 self.handler_registry.clone(),
355 self.relationship_manager.clone(),
356 self.persisters.clone(),
357 self.search_index.clone(),
358 self.peer_clients.clone(),
359 Some(self.saga_event_tx.clone()),
360 history_replay,
361 )
362 }
363
364 fn start_saga_runtime(&self) {
365 let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
366 if registrations.is_empty() {
367 return;
368 }
369 let Some(rx) = self
370 .saga_event_rx
371 .lock()
372 .expect("saga_event_rx mutex poisoned")
373 .take()
374 else {
375 return;
376 };
377
378 log::info!("Starting saga runtime with {} saga(s)", registrations.len());
379
380 struct SagaChannel {
383 tx: flume::Sender<MEvent>,
384 entity_type: &'static str,
385 change_type: myko::event::MEventType,
386 }
387 let mut saga_channels: Vec<SagaChannel> = Vec::new();
388
389 for registration in registrations {
390 let saga = (registration.create)();
391 let saga_name = saga.name().to_string();
392 let (saga_tx, saga_rx) = flume::unbounded::<MEvent>();
393 saga_channels.push(SagaChannel {
394 tx: saga_tx,
395 entity_type: registration.event_entity_type,
396 change_type: registration.event_change_type,
397 });
398 let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
399 saga_rx,
400 move |saga_rx| async move {
401 saga_rx
402 .recv_async()
403 .await
404 .ok()
405 .map(|event| (event, saga_rx))
406 },
407 ));
408
409 let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
410 self.host_id,
411 self.registry.clone(),
412 self.saga_event_tx.clone(),
413 ));
414 let mut command_stream = saga.build_boxed(events, saga_ctx);
415
416 let host_id = self.host_id;
417 let registry = self.registry.clone();
418 let handler_registry = self.handler_registry.clone();
419 let relationship_manager = self.relationship_manager.clone();
420 let persisters = self.persisters.clone();
421 let search_index = self.search_index.clone();
422 let peer_clients = self.peer_clients.clone();
423 let saga_event_tx = self.saga_event_tx.clone();
424
425 let handle = tokio::spawn(async move {
426 while let Some(command) = command_stream.next().await {
427 let command_name = command.command_name();
428 log::debug!("Saga {} executing command {}", saga_name, command_name);
429 let req = Arc::new(RequestContext::internal(
430 Arc::from(Uuid::new_v4().to_string()),
431 host_id,
432 &format!("saga:{saga_name}"),
433 ));
434
435 let cmd_ctx = CommandContext::new(
436 Arc::from(command_name),
437 req,
438 Arc::new(CellServerCtx::new(
439 host_id,
440 registry.clone(),
441 handler_registry.clone(),
442 relationship_manager.clone(),
443 persisters.clone(),
444 search_index.clone(),
445 peer_clients.clone(),
446 Some(saga_event_tx.clone()),
447 None,
448 )),
449 );
450
451 if let Err(err) = command.execute_boxed(cmd_ctx) {
452 log::error!(
453 "Saga {} command {} failed: {}",
454 saga_name,
455 command_name,
456 err.message
457 );
458 }
459 }
460 });
461
462 self.saga_tasks
463 .lock()
464 .expect("saga_tasks mutex poisoned")
465 .push(handle);
466 }
467
468 let dispatcher = tokio::spawn(async move {
471 while let Ok(event) = rx.recv_async().await {
472 for ch in &saga_channels {
473 if event.item_type == ch.entity_type && event.change_type == ch.change_type {
474 let _ = ch.tx.send(event.clone());
475 }
476 }
477 }
478 });
479 self.saga_tasks
480 .lock()
481 .expect("saga_tasks mutex poisoned")
482 .push(dispatcher);
483 }
484
485 pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
487 self.config
488 .postgres
489 .clone()
490 .map(PostgresHistoryStore::new)
491 .transpose()
492 }
493
494 pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
496 if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
497 return Err(
498 "Postgres is configured but the Postgres consumer is not running".to_string(),
499 );
500 }
501
502 if let Some(ref consumer) = self.postgres_consumer {
503 consumer.wait_until_caught_up(timeout)?;
504 self.ready.store(true, Ordering::SeqCst);
505 }
506 Ok(())
507 }
508
509 pub fn establish_relations(&self) {
511 if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
512 log::error!("Failed to establish relations: {e}");
513 }
514 }
515
516 pub fn is_ready(&self) -> bool {
518 if let Some(ref consumer) = self.postgres_consumer {
519 if consumer.is_caught_up() {
520 self.ready.store(true, Ordering::SeqCst);
521 return true;
522 }
523 return false;
524 }
525 true
526 }
527
528 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
530 use tokio::net::TcpListener;
531
532 let entity_types: Vec<&str> = self
534 .handler_registry
535 .entity_types()
536 .map(|t| t.as_ref())
537 .collect();
538 self.persisters
539 .startup_healthcheck(&entity_types)
540 .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;
541
542 if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
543 return Err("Postgres is configured but the Postgres consumer failed to start".into());
544 }
545
546 if self.postgres_consumer.is_some() {
548 log::info!("Waiting for Postgres event consumer to catch up...");
549 let timeout = std::time::Duration::from_secs(300);
550 self.init_postgres_and_wait(timeout)
551 .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
552 log::info!("Postgres caught up, ready to accept connections");
553 }
554
555 log::info!("Building search index...");
557 self.search_index.build_from_registry(&self.registry);
558
559 log::info!("Establishing relations...");
561 self.establish_relations();
562
563 log::info!("Checking server-owned item ownership...");
565 if let Err(e) = ServerOwnershipManager::claim_orphaned(&self.ctx()) {
566 log::error!("Failed to claim orphaned server-owned items: {}", e);
567 }
568 let ownership_guard = ServerOwnershipManager::watch_peer_deaths(&self.ctx());
569 *self
570 ._server_ownership_guard
571 .lock()
572 .expect("server_ownership_guard mutex poisoned") = Some(ownership_guard);
573
574 let listener = TcpListener::bind(&self.config.bind_addr).await?;
577 log::info!("CellServer listening on {}", self.config.bind_addr);
578 log::info!(
579 "WebSocket server listening on ws://{}/myko",
580 self.config.bind_addr
581 );
582
583 if self.config.peer_registry.is_some() {
585 self.start_peer_registry(None);
586 }
587
588 if let Some(hook) = self
590 .after_init
591 .lock()
592 .expect("after_init mutex poisoned")
593 .take()
594 {
595 hook(self);
596 }
597
598 self.start_saga_runtime();
599
600 log::info!("Server started");
601 self.run_ws_accept_loop(listener).await
602 }
603
604 pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
606 use tokio::net::TcpListener;
607
608 let listener = TcpListener::bind(&self.config.bind_addr).await?;
609 log::info!("CellServer listening on {}", self.config.bind_addr);
610 log::info!(
611 "WebSocket server listening on ws://{}/myko",
612 self.config.bind_addr
613 );
614 self.run_ws_accept_loop(listener).await
615 }
616
617 async fn run_ws_accept_loop(
618 &self,
619 listener: tokio::net::TcpListener,
620 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
621 let ready = self.ready.clone();
622
623 loop {
624 let (stream, addr) = listener.accept().await?;
625
626 if !ready.load(Ordering::SeqCst) {
628 if self.is_ready() {
629 log::info!("Server is now ready to accept connections");
630 } else {
631 log::warn!(
632 "Rejecting connection from {} - server not ready (durable backend catching up)",
633 addr
634 );
635 drop(stream);
636 continue;
637 }
638 }
639
640 log::debug!("New connection from {}", addr);
641
642 let ctx = self.ctx();
643
644 tokio::spawn(async move {
645 if let Err(e) =
646 ws_handler::WsHandler::handle_connection(stream, addr, Arc::new(ctx)).await
647 {
648 log::error!("Connection error from {}: {}", addr, e);
649 }
650 });
651 }
652 }
653}
654
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal configuration: ephemeral port, no Postgres, no peer registry, no custom persisters.
    fn minimal_config(host_id: Option<Uuid>) -> CellServerConfig {
        CellServerConfig {
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            postgres: None,
            host_id,
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
            peer_clients: None,
        }
    }

    #[test]
    fn test_server_creation() {
        let server = CellServer::new(minimal_config(None));
        // Without Postgres nothing needs to catch up, no producer exists and no history store
        // can be opened.
        assert!(server.postgres_producer.is_none());
        assert!(server.is_ready());
        assert!(!server.has_peer_registry());
        assert!(matches!(server.postgres_history_store(), Ok(None)));
    }

    #[test]
    fn test_server_with_host_id() {
        let host_id = Uuid::new_v4();
        let server = CellServer::new(minimal_config(Some(host_id)));
        assert_eq!(server.host_id, host_id);
    }

    #[test]
    fn test_builder_default_bind_addr() {
        let server = CellServer::builder().build();
        let expected: SocketAddr = "127.0.0.1:5155".parse().unwrap();
        assert_eq!(server.config.bind_addr, expected);
    }
}