1pub mod mcp;
13pub mod peer_registry;
14pub mod postgres;
15pub mod ws_handler;
16
17use std::{
19 collections::HashMap,
20 net::SocketAddr,
21 sync::{
22 Arc, RwLock,
23 atomic::{AtomicBool, Ordering},
24 },
25 time::Duration,
26};
27
28use futures_util::StreamExt;
29pub use myko::server::*;
30use myko::{
31 client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
32 search::SearchIndex, store::StoreRegistry, wire::MEvent,
33};
34use uuid::Uuid;
35
36use crate::postgres::{
37 CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
38 PostgresHistoryStore, PostgresProducerHandle,
39};
40
/// Configuration consumed by [`CellServer::new`].
///
/// All optional subsystems (Postgres, peer registry, persisters) are enabled
/// simply by providing their config here.
#[derive(Clone)]
pub struct CellServerConfig {
    /// Address the WebSocket listener binds to in `run`/`run_ws_loop`.
    pub bind_addr: SocketAddr,
    /// When set, a Postgres producer/consumer pair is started for durability.
    pub postgres: Option<PostgresConfig>,
    /// Identity of this host; a random UUID is generated when `None`.
    pub host_id: Option<Uuid>,
    /// When set, peer discovery is started automatically during `run`.
    pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Persister used for entity types with no explicit override. If `None`
    /// and Postgres is configured, the Postgres producer handle is used.
    pub default_persister: Option<Arc<dyn Persister>>,
    /// Per-entity-type persister overrides, keyed by entity type name.
    pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
}
57
/// Fluent builder for [`CellServer`]; obtain one via [`CellServer::builder`].
///
/// Fields mirror [`CellServerConfig`]; `None`/empty means "use the default".
#[derive(Default)]
pub struct CellServerBuilder {
    bind_addr: Option<SocketAddr>,
    host_id: Option<Uuid>,
    postgres: Option<PostgresConfig>,
    peer_registry: Option<peer_registry::PeerRegistryConfig>,
    default_persister: Option<Arc<dyn Persister>>,
    persister_overrides: HashMap<String, Arc<dyn Persister>>,
    // One-shot hook; `build` moves it into the server's `after_init` slot.
    after_init: Option<AfterInitCallback>,
}

/// One-shot callback invoked after server initialization, before the accept loop.
type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;
71
72impl CellServerBuilder {
73 pub fn new() -> Self {
75 Self::default()
76 }
77
78 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
80 self.bind_addr = Some(addr);
81 self
82 }
83
84 pub fn with_host_id(mut self, id: Uuid) -> Self {
86 self.host_id = Some(id);
87 self
88 }
89
90 pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
92 self.postgres = Some(config);
93 self
94 }
95
96 pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
98 self.peer_registry = Some(config);
99 self
100 }
101
102 pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
104 self.default_persister = Some(persister);
105 self
106 }
107
108 pub fn with_persister_override(
110 mut self,
111 entity_type: impl Into<String>,
112 persister: Arc<dyn Persister>,
113 ) -> Self {
114 self.persister_overrides
115 .insert(entity_type.into(), persister);
116 self
117 }
118
119 pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
123 self.after_init = Some(Box::new(f));
124 self
125 }
126
127 pub fn build(self) -> CellServer {
129 let bind_addr = self
130 .bind_addr
131 .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());
132
133 let mut server = CellServer::new(CellServerConfig {
134 bind_addr,
135 postgres: self.postgres,
136 host_id: self.host_id,
137 peer_registry: self.peer_registry,
138 default_persister: self.default_persister,
139 persister_overrides: self.persister_overrides,
140 });
141 server.after_init = std::sync::Mutex::new(self.after_init);
142 server
143 }
144}
145
/// The cell server: owns the in-memory stores plus the optional durable
/// (Postgres), search, peer, and saga subsystems, and serves WebSocket
/// clients via [`CellServer::run`].
pub struct CellServer {
    /// Shared registry of entity stores.
    pub registry: Arc<StoreRegistry>,
    /// Shared registry of handlers; its entity types drive the persister healthcheck.
    pub handler_registry: Arc<HandlerRegistry>,
    /// Manages entity relations; wired up by `establish_relations`.
    pub relationship_manager: Arc<RelationshipManager>,
    /// Cloneable handle to the Postgres producer, when Postgres is configured.
    pub postgres_producer: Option<PostgresProducerHandle>,
    /// Search index, built from the registry during `run`.
    pub search_index: Arc<SearchIndex>,
    /// Routes persistence to the default persister or a per-type override.
    pub persisters: Arc<PersisterRouter>,
    /// This host's identity (random unless pinned via config).
    pub host_id: Uuid,
    // Retained so `ctx`, `run`, and peer startup can read the original settings.
    config: CellServerConfig,
    // Owns the producer so it outlives every cloned handle; never read directly.
    _postgres_producer_owner: Option<CellPostgresProducer>,
    // Consumer replaying durable events; drives the readiness gate below.
    postgres_consumer: Option<CellPostgresConsumer>,
    // True once the durable backend (if any) has caught up; gates new connections.
    ready: Arc<AtomicBool>,
    // Set by `start_peer_registry`; `None` until peer discovery is started.
    peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
    // Connected peer clients, shared with every `CellServerCtx`.
    peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
    // One-shot hook installed by the builder; taken and run once in `run`.
    after_init: std::sync::Mutex<Option<AfterInitCallback>>,
    // Sender side of the saga event channel, cloned into contexts and sagas.
    saga_event_tx: flume::Sender<MEvent>,
    // Receiver side; `take()`n by `start_saga_runtime` so the runtime starts at most once.
    saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
    // Join handles for the saga dispatcher and per-saga executor tasks.
    saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
    // Inspector server handle, kept alive for the server's lifetime.
    #[cfg(feature = "inspector")]
    _inspector: hyphae::server::InspectorServer,
}
188
impl CellServer {
    /// Entry point for the fluent builder API.
    pub fn builder() -> CellServerBuilder {
        CellServerBuilder::new()
    }

    /// Constructs a server from `config` without starting any network I/O.
    ///
    /// Postgres startup is best-effort here: if the producer or consumer fails
    /// to start, the error is logged and the corresponding slot is left `None`
    /// (construction never fails). `run` later rejects that inconsistent state
    /// with an explicit error.
    pub fn new(config: CellServerConfig) -> Self {
        // Generate a host id when the caller did not pin one.
        let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
        let registry = Arc::new(StoreRegistry::new());
        let handler_registry = Arc::new(HandlerRegistry::new());
        let relationship_manager = Arc::new(RelationshipManager::new());

        // NOTE(review): presumably initializes process-global client state in
        // myko::server — confirm it is safe to call more than once.
        init_client_registry();

        // Channel carrying entity events into the saga runtime.
        let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
        // Producer must come up before the consumer; both are skipped when
        // Postgres is not configured.
        let (postgres_producer_owner, postgres_producer, postgres_consumer) =
            if let Some(ref postgres_config) = config.postgres {
                match CellPostgresProducer::new(postgres_config, host_id) {
                    Ok(producer) => {
                        let handle = producer.handle();
                        let consumer = match CellPostgresConsumer::start(
                            postgres_config,
                            host_id,
                            handler_registry.clone(),
                            registry.clone(),
                        ) {
                            Ok(c) => Some(c),
                            Err(e) => {
                                log::error!("Failed to start Postgres consumer: {}", e);
                                None
                            }
                        };
                        (Some(producer), Some(handle), consumer)
                    }
                    Err(e) => {
                        log::error!("Failed to create Postgres producer: {}", e);
                        (None, None, None)
                    }
                }
            } else {
                (None, None, None)
            };

        // Ready immediately when there is no durable backend to catch up on.
        let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));

        let search_index = Arc::new(SearchIndex::new());

        // Persister routing: an explicit default wins; otherwise fall back to
        // the Postgres producer handle when one exists.
        let mut persister_router = PersisterRouter::default();
        if let Some(default_persister) = config.default_persister.clone() {
            persister_router.set_default(Some(default_persister));
        } else if let Some(handle) = postgres_producer.clone() {
            persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
        }
        for (entity_type, persister) in &config.persister_overrides {
            persister_router.set_override(entity_type.clone(), persister.clone());
        }
        let persisters = Arc::new(persister_router);

        #[cfg(feature = "inspector")]
        let inspector = hyphae::server::start_server("myko");
        #[cfg(feature = "inspector")]
        log::info!("Hyphae inspector on port {}", inspector.port());

        Self {
            registry,
            handler_registry,
            relationship_manager,
            postgres_producer,
            search_index,
            persisters,
            host_id,
            config,
            _postgres_producer_owner: postgres_producer_owner,
            postgres_consumer,
            ready,
            peer_registry_instance: RwLock::new(None),
            peer_clients: Arc::new(dashmap::DashMap::new()),
            after_init: std::sync::Mutex::new(None),
            saga_event_tx,
            saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
            saga_tasks: std::sync::Mutex::new(Vec::new()),
            #[cfg(feature = "inspector")]
            _inspector: inspector,
        }
    }

    /// Starts peer discovery using `config`, falling back to the peer-registry
    /// config given at construction time. Does nothing when neither is set.
    pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
        let peer_config = config.or_else(|| self.config.peer_registry.clone());

        if let Some(peer_config) = peer_config {
            log::info!("Starting peer registry");
            let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
            *self.peer_registry_instance.write().unwrap() = Some(pr);
        }
    }

    /// Returns `true` once `start_peer_registry` has installed a registry.
    pub fn has_peer_registry(&self) -> bool {
        self.peer_registry_instance.read().unwrap().is_some()
    }

    /// Shared handle to the store registry.
    pub fn registry(&self) -> Arc<StoreRegistry> {
        self.registry.clone()
    }

    /// Shared handle to the handler registry.
    pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
        self.handler_registry.clone()
    }

    /// Builds a fresh request-serving context sharing this server's subsystems.
    ///
    /// A new `PostgresHistoryReplayProvider` is constructed on every call when
    /// Postgres is configured.
    pub fn ctx(&self) -> CellServerCtx {
        let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
            self.config.postgres.as_ref().map(|pg| {
                Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
                    as Arc<dyn myko::server::HistoryReplayProvider>
            });
        CellServerCtx::new(
            self.host_id,
            self.registry.clone(),
            self.handler_registry.clone(),
            self.relationship_manager.clone(),
            self.persisters.clone(),
            self.search_index.clone(),
            self.peer_clients.clone(),
            Some(self.saga_event_tx.clone()),
            history_replay,
        )
    }

    /// Spawns the saga runtime: one dispatcher task fanning events out plus
    /// one executor task per inventory-registered saga.
    ///
    /// Idempotent in effect: the flume receiver is `take()`n, so a second call
    /// finds `None` and returns without spawning anything. Also a no-op when
    /// no sagas are registered.
    fn start_saga_runtime(&self) {
        let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
        if registrations.is_empty() {
            return;
        }
        let Some(rx) = self
            .saga_event_rx
            .lock()
            .expect("saga_event_rx mutex poisoned")
            .take()
        else {
            return;
        };

        log::info!("Starting saga runtime with {} saga(s)", registrations.len());
        // Broadcast channel fans each event out to every saga; capacity 8192
        // bounds how far a slow saga may fall behind before it "lags".
        let (event_tx, _) = tokio::sync::broadcast::channel::<MEvent>(8192);
        let event_tx_dispatch = event_tx.clone();

        // Dispatcher: forwards events from the flume channel into the
        // broadcast channel; exits when the flume senders are all dropped.
        let dispatcher = tokio::spawn(async move {
            while let Ok(event) = rx.recv_async().await {
                let _ = event_tx_dispatch.send(event);
            }
        });
        self.saga_tasks
            .lock()
            .expect("saga_tasks mutex poisoned")
            .push(dispatcher);

        for registration in registrations {
            let saga = (registration.create)();
            let saga_name = saga.name().to_string();
            let saga_name_for_stream = saga_name.clone();
            let event_rx = event_tx.subscribe();
            // Adapt the broadcast receiver into the saga's event stream.
            // Lagged receivers log a warning and skip ahead; a closed channel
            // ends the stream (and thus the saga).
            let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
                event_rx,
                move |mut event_rx| {
                    let saga_name_for_stream = saga_name_for_stream.clone();
                    async move {
                        loop {
                            match event_rx.recv().await {
                                Ok(event) => return Some((event, event_rx)),
                                Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                                    log::warn!(
                                        "Saga {} lagged; skipped {} events",
                                        saga_name_for_stream,
                                        skipped
                                    );
                                    continue;
                                }
                                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                                    return None;
                                }
                            }
                        }
                    }
                },
            ));

            // Events emitted by the saga itself feed back into saga_event_tx,
            // so sagas can observe each other's output.
            let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
                self.host_id,
                self.registry.clone(),
                self.saga_event_tx.clone(),
            ));
            let mut command_stream = saga.build_boxed(events, saga_ctx);

            let host_id = self.host_id;
            let registry = self.registry.clone();
            let handler_registry = self.handler_registry.clone();
            let relationship_manager = self.relationship_manager.clone();
            let persisters = self.persisters.clone();
            let search_index = self.search_index.clone();
            let peer_clients = self.peer_clients.clone();
            let saga_event_tx = self.saga_event_tx.clone();

            // Executor: runs the saga's emitted commands sequentially under an
            // internal request context. Command failures are logged, not fatal.
            let handle = tokio::spawn(async move {
                while let Some(command) = command_stream.next().await {
                    let command_name = command.command_name();
                    log::debug!("Saga {} executing command {}", saga_name, command_name);
                    let req = Arc::new(RequestContext::internal(
                        Arc::from(Uuid::new_v4().to_string()),
                        host_id,
                        &format!("saga:{saga_name}"),
                    ));

                    // Per-command context; built without a history replay
                    // provider (`None`), unlike `ctx()`.
                    let cmd_ctx = CommandContext::new(
                        Arc::from(command_name),
                        req,
                        Arc::new(CellServerCtx::new(
                            host_id,
                            registry.clone(),
                            handler_registry.clone(),
                            relationship_manager.clone(),
                            persisters.clone(),
                            search_index.clone(),
                            peer_clients.clone(),
                            Some(saga_event_tx.clone()),
                            None,
                        )),
                    );

                    if let Err(err) = command.execute_boxed(cmd_ctx) {
                        log::error!(
                            "Saga {} command {} failed: {}",
                            saga_name,
                            command_name,
                            err.message
                        );
                    }
                }
            });

            self.saga_tasks
                .lock()
                .expect("saga_tasks mutex poisoned")
                .push(handle);
        }
    }

    /// Builds a history store from the Postgres config, if one was provided.
    /// Returns `Ok(None)` when Postgres is not configured.
    pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
        self.config
            .postgres
            .clone()
            .map(PostgresHistoryStore::new)
            .transpose()
    }

    /// Blocks until the Postgres consumer has caught up (or `timeout` elapses),
    /// then marks the server ready. Errors if Postgres was configured but the
    /// consumer failed to start. No-op without Postgres.
    pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
            return Err(
                "Postgres is configured but the Postgres consumer is not running".to_string(),
            );
        }

        if let Some(ref consumer) = self.postgres_consumer {
            consumer.wait_until_caught_up(timeout)?;
            self.ready.store(true, Ordering::SeqCst);
        }
        Ok(())
    }

    /// Wires up entity relations; failures are logged rather than propagated.
    pub fn establish_relations(&self) {
        if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
            log::error!("Failed to establish relations: {e}");
        }
    }

    /// Whether the server may accept connections. With a Postgres consumer
    /// this checks catch-up state and, as a side effect, latches the cached
    /// `ready` flag once caught up; without one the server is always ready.
    pub fn is_ready(&self) -> bool {
        if let Some(ref consumer) = self.postgres_consumer {
            if consumer.is_caught_up() {
                self.ready.store(true, Ordering::SeqCst);
                return true;
            }
            return false;
        }
        true
    }

    /// Full startup sequence and WebSocket accept loop; runs until an accept
    /// error occurs.
    ///
    /// Order: persister healthcheck → verify Postgres consumer → wait for
    /// durable catch-up (5 min timeout) → build search index → establish
    /// relations → bind listener → start peer registry → run `after_init`
    /// hook → start saga runtime → accept loop.
    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        use tokio::net::TcpListener;

        let entity_types: Vec<&str> = self
            .handler_registry
            .entity_types()
            .map(|t| t.as_ref())
            .collect();
        self.persisters
            .startup_healthcheck(&entity_types)
            .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;

        // `new` degrades silently when the consumer fails; fail loudly here.
        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
            return Err("Postgres is configured but the Postgres consumer failed to start".into());
        }

        if self.postgres_consumer.is_some() {
            log::info!("Waiting for Postgres event consumer to catch up...");
            let timeout = std::time::Duration::from_secs(300);
            self.init_postgres_and_wait(timeout)
                .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
            log::info!("Postgres caught up, ready to accept connections");
        }

        log::info!("Building search index...");
        self.search_index.build_from_registry(&self.registry);

        log::info!("Establishing relations...");
        self.establish_relations();

        let listener = TcpListener::bind(&self.config.bind_addr).await?;
        log::info!("CellServer listening on {}", self.config.bind_addr);
        log::info!(
            "WebSocket server listening on ws://{}/myko",
            self.config.bind_addr
        );

        if self.config.peer_registry.is_some() {
            self.start_peer_registry(None);
        }

        // Fire the builder-installed hook exactly once.
        if let Some(hook) = self
            .after_init
            .lock()
            .expect("after_init mutex poisoned")
            .take()
        {
            hook(self);
        }

        self.start_saga_runtime();

        log::info!("Server started");
        self.run_ws_accept_loop(listener).await
    }

    /// Minimal alternative to `run`: binds and serves WebSocket connections
    /// without the healthcheck/catch-up/index/peer/saga startup steps.
    pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        use tokio::net::TcpListener;

        let listener = TcpListener::bind(&self.config.bind_addr).await?;
        log::info!("CellServer listening on {}", self.config.bind_addr);
        log::info!(
            "WebSocket server listening on ws://{}/myko",
            self.config.bind_addr
        );
        self.run_ws_accept_loop(listener).await
    }

    /// Accept loop: rejects connections until the durable backend is caught
    /// up, then spawns one handler task per connection. Returns only when
    /// `accept` itself fails.
    async fn run_ws_accept_loop(
        &self,
        listener: tokio::net::TcpListener,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let ready = self.ready.clone();

        loop {
            let (stream, addr) = listener.accept().await?;

            // Fast path reads the cached flag; on a miss, re-check the
            // consumer directly (which latches the flag once caught up).
            if !ready.load(Ordering::SeqCst) {
                if self.is_ready() {
                    log::info!("Server is now ready to accept connections");
                } else {
                    log::warn!(
                        "Rejecting connection from {} - server not ready (durable backend catching up)",
                        addr
                    );
                    drop(stream);
                    continue;
                }
            }

            log::debug!("New connection from {}", addr);

            // Each connection gets its own context snapshot and task.
            let ctx = self.ctx();

            tokio::spawn(async move {
                if let Err(e) =
                    ws_handler::WsHandler::handle_connection(stream, addr, Arc::new(ctx)).await
                {
                    log::error!("Connection error from {}: {}", addr, e);
                }
            });
        }
    }
}
606
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal in-memory config: ephemeral port, no Postgres, no peers.
    fn base_config(host_id: Option<Uuid>) -> CellServerConfig {
        CellServerConfig {
            bind_addr: "127.0.0.1:0".parse().unwrap(),
            postgres: None,
            host_id,
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
        }
    }

    #[test]
    fn test_server_creation() {
        let server = CellServer::new(base_config(None));
        assert!(Arc::strong_count(&server.registry) >= 1);
    }

    #[test]
    fn test_server_with_host_id() {
        let id = Uuid::new_v4();
        let server = CellServer::new(base_config(Some(id)));
        assert_eq!(server.host_id, id);
    }
}