1pub mod mcp;
13pub mod peer_registry;
14pub mod postgres;
15pub mod server_ownership;
16pub mod ws_handler;
17
18pub use server_ownership::ServerOwnershipManager;
19
20use std::{
22 collections::HashMap,
23 net::SocketAddr,
24 sync::{
25 Arc, RwLock,
26 atomic::{AtomicBool, Ordering},
27 },
28 time::Duration,
29};
30
31use futures_util::StreamExt;
32pub use myko::server::*;
33use myko::{
34 client::MykoClient, command::CommandContext, request::RequestContext, saga::SagaRegistration,
35 search::SearchIndex, store::StoreRegistry, wire::MEvent,
36};
37use uuid::Uuid;
38
39use crate::postgres::{
40 CellPostgresConsumer, CellPostgresProducer, PostgresConfig, PostgresHistoryReplayProvider,
41 PostgresHistoryStore, PostgresProducerHandle,
42};
43
/// Static configuration consumed by [`CellServer::new`].
#[derive(Clone)]
pub struct CellServerConfig {
    /// Socket address the TCP/WebSocket listener binds to.
    pub bind_addr: SocketAddr,
    /// Postgres settings. When present, `CellServer::new` attempts to create a
    /// Postgres producer and consumer from them.
    pub postgres: Option<PostgresConfig>,
    /// Identity of this host. A random v4 UUID is generated when `None`.
    pub host_id: Option<Uuid>,
    /// Peer registry settings. `CellServer::run` starts the peer registry only
    /// when this is set.
    pub peer_registry: Option<peer_registry::PeerRegistryConfig>,
    /// Persister installed as the router default. When `None`, the Postgres
    /// producer handle (if one was created) is used as the default instead.
    pub default_persister: Option<Arc<dyn Persister>>,
    /// Per-entity-type persisters registered as overrides on the router,
    /// keyed by entity type name.
    pub persister_overrides: HashMap<String, Arc<dyn Persister>>,
}
60
/// Fluent builder for [`CellServer`].
///
/// Every field starts out unset/empty (via `Default`); `build` fills in a
/// default bind address when none was supplied.
#[derive(Default)]
pub struct CellServerBuilder {
    // Listener address; `build` falls back to 127.0.0.1:5155 when `None`.
    bind_addr: Option<SocketAddr>,
    // Host identity forwarded to `CellServerConfig::host_id`.
    host_id: Option<Uuid>,
    // Postgres settings forwarded to `CellServerConfig::postgres`.
    postgres: Option<PostgresConfig>,
    // Peer registry settings forwarded to `CellServerConfig::peer_registry`.
    peer_registry: Option<peer_registry::PeerRegistryConfig>,
    // Default persister forwarded to `CellServerConfig::default_persister`.
    default_persister: Option<Arc<dyn Persister>>,
    // Per-entity-type persisters forwarded to `CellServerConfig::persister_overrides`.
    persister_overrides: HashMap<String, Arc<dyn Persister>>,
    // One-shot hook installed on the built server and invoked from `CellServer::run`.
    after_init: Option<AfterInitCallback>,
}
72
/// Boxed one-shot hook that receives a reference to the server.
///
/// `CellServer::run` takes it out of its slot and calls it at most once.
type AfterInitCallback = Box<dyn FnOnce(&CellServer) + Send>;
74
75impl CellServerBuilder {
76 pub fn new() -> Self {
78 Self::default()
79 }
80
81 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
83 self.bind_addr = Some(addr);
84 self
85 }
86
87 pub fn with_host_id(mut self, id: Uuid) -> Self {
89 self.host_id = Some(id);
90 self
91 }
92
93 pub fn with_postgres(mut self, config: PostgresConfig) -> Self {
95 self.postgres = Some(config);
96 self
97 }
98
99 pub fn with_peer_registry(mut self, config: peer_registry::PeerRegistryConfig) -> Self {
101 self.peer_registry = Some(config);
102 self
103 }
104
105 pub fn with_default_persister(mut self, persister: Arc<dyn Persister>) -> Self {
107 self.default_persister = Some(persister);
108 self
109 }
110
111 pub fn with_persister_override(
113 mut self,
114 entity_type: impl Into<String>,
115 persister: Arc<dyn Persister>,
116 ) -> Self {
117 self.persister_overrides
118 .insert(entity_type.into(), persister);
119 self
120 }
121
122 pub fn after_init(mut self, f: impl FnOnce(&CellServer) + Send + 'static) -> Self {
126 self.after_init = Some(Box::new(f));
127 self
128 }
129
130 pub fn build(self) -> CellServer {
132 let bind_addr = self
133 .bind_addr
134 .unwrap_or_else(|| "127.0.0.1:5155".parse().unwrap());
135
136 let mut server = CellServer::new(CellServerConfig {
137 bind_addr,
138 postgres: self.postgres,
139 host_id: self.host_id,
140 peer_registry: self.peer_registry,
141 default_persister: self.default_persister,
142 persister_overrides: self.persister_overrides,
143 });
144 server.after_init = std::sync::Mutex::new(self.after_init);
145 server
146 }
147}
148
/// A cell server instance: shared registries, optional Postgres plumbing,
/// the saga event channel, and the state needed by the WebSocket accept loop.
pub struct CellServer {
    /// Shared store registry, created in `new`.
    pub registry: Arc<StoreRegistry>,
    /// Shared handler registry, created in `new`.
    pub handler_registry: Arc<HandlerRegistry>,
    /// Shared relationship manager, created in `new`.
    pub relationship_manager: Arc<RelationshipManager>,
    /// Handle to the Postgres producer; `Some` only when Postgres is configured
    /// and the producer was created successfully.
    pub postgres_producer: Option<PostgresProducerHandle>,
    /// Search index; `run` populates it from `registry` during startup.
    pub search_index: Arc<SearchIndex>,
    /// Persister router holding the default persister and per-entity overrides.
    pub persisters: Arc<PersisterRouter>,
    /// Identity of this host (configured, or a random v4 UUID).
    pub host_id: Uuid,
    // Configuration this server was constructed with.
    config: CellServerConfig,
    // The producer that `postgres_producer` was obtained from.
    // NOTE(review): presumably held only to keep the producer alive — confirm.
    _postgres_producer_owner: Option<CellPostgresProducer>,
    // Postgres consumer; `None` when Postgres is not configured or start-up failed.
    postgres_consumer: Option<CellPostgresConsumer>,
    // Readiness flag: starts `true` iff there is no Postgres consumer, and is
    // set to `true` once the consumer reports it has caught up.
    ready: Arc<AtomicBool>,
    // Peer registry, populated by `start_peer_registry`.
    peer_registry_instance: RwLock<Option<peer_registry::PeerRegistry>>,
    // Client map shared with every `CellServerCtx` this server creates.
    // NOTE(review): keys are presumably peer identifiers — confirm.
    peer_clients: Arc<dashmap::DashMap<Arc<str>, Arc<MykoClient>>>,
    // One-shot hook taken and invoked by `run`.
    after_init: std::sync::Mutex<Option<AfterInitCallback>>,
    // Sending side of the saga event channel; cloned into every context.
    saga_event_tx: flume::Sender<MEvent>,
    // Receiving side of the saga event channel; taken once by `start_saga_runtime`.
    saga_event_rx: std::sync::Mutex<Option<flume::Receiver<MEvent>>>,
    // Join handles of the spawned saga tasks and the event dispatcher task.
    saga_tasks: std::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
    // Guard returned by `ServerOwnershipManager::watch_peer_deaths`, stored by `run`.
    _server_ownership_guard: std::sync::Mutex<Option<hyphae::SubscriptionGuard>>,
    // Inspector server started in `new` when the `inspector` feature is enabled.
    #[cfg(feature = "inspector")]
    _inspector: hyphae::server::InspectorServer,
}
193
194impl CellServer {
195 pub fn builder() -> CellServerBuilder {
197 CellServerBuilder::new()
198 }
199
    /// Constructs a server from `config`. No socket is bound here; that
    /// happens in `run` / `run_ws_loop`.
    ///
    /// Postgres producer/consumer failures are logged and leave the
    /// corresponding fields as `None` instead of failing construction; `run`
    /// later refuses to start when Postgres is configured but no consumer exists.
    pub fn new(config: CellServerConfig) -> Self {
        // Fall back to a freshly generated id when none was configured.
        let host_id = config.host_id.unwrap_or_else(Uuid::new_v4);
        let registry = Arc::new(StoreRegistry::new());
        let handler_registry = Arc::new(HandlerRegistry::new());
        let relationship_manager = Arc::new(RelationshipManager::new());

        init_client_registry();

        // Unbounded channel carrying events to the saga dispatcher.
        let (saga_event_tx, saga_event_rx) = flume::unbounded::<MEvent>();
        // Yields (producer owner, producer handle, consumer); all `None` when
        // Postgres is not configured or the producer could not be created.
        let (postgres_producer_owner, postgres_producer, postgres_consumer) =
            if let Some(ref postgres_config) = config.postgres {
                match CellPostgresProducer::new(postgres_config, host_id) {
                    Ok(producer) => {
                        let handle = producer.handle();
                        // A consumer failure still keeps the producer.
                        let consumer = match CellPostgresConsumer::start(
                            postgres_config,
                            host_id,
                            handler_registry.clone(),
                            registry.clone(),
                        ) {
                            Ok(c) => Some(c),
                            Err(e) => {
                                log::error!("Failed to start Postgres consumer: {}", e);
                                None
                            }
                        };
                        (Some(producer), Some(handle), consumer)
                    }
                    Err(e) => {
                        log::error!("Failed to create Postgres producer: {}", e);
                        (None, None, None)
                    }
                }
            } else {
                (None, None, None)
            };

        // Without a consumer there is nothing to catch up on, so start ready.
        let ready = Arc::new(AtomicBool::new(postgres_consumer.is_none()));

        let search_index = Arc::new(SearchIndex::new());

        let mut persister_router = PersisterRouter::default();
        // Default persister precedence: explicit config first, then the
        // Postgres producer handle; otherwise the router default is left as is.
        if let Some(default_persister) = config.default_persister.clone() {
            persister_router.set_default(Some(default_persister));
        } else if let Some(handle) = postgres_producer.clone() {
            persister_router.set_default(Some(Arc::new(handle) as Arc<dyn Persister>));
        }
        for (entity_type, persister) in &config.persister_overrides {
            persister_router.set_override(entity_type.clone(), persister.clone());
        }
        let persisters = Arc::new(persister_router);

        #[cfg(feature = "inspector")]
        let inspector = hyphae::server::start_server("myko");
        #[cfg(feature = "inspector")]
        log::info!("Hyphae inspector on port {}", inspector.port());

        Self {
            registry,
            handler_registry,
            relationship_manager,
            postgres_producer,
            search_index,
            persisters,
            host_id,
            config,
            _postgres_producer_owner: postgres_producer_owner,
            postgres_consumer,
            ready,
            peer_registry_instance: RwLock::new(None),
            peer_clients: Arc::new(dashmap::DashMap::new()),
            // The builder installs the hook after construction.
            after_init: std::sync::Mutex::new(None),
            saga_event_tx,
            saga_event_rx: std::sync::Mutex::new(Some(saga_event_rx)),
            saga_tasks: std::sync::Mutex::new(Vec::new()),
            _server_ownership_guard: std::sync::Mutex::new(None),
            #[cfg(feature = "inspector")]
            _inspector: inspector,
        }
    }
289
290 pub fn start_peer_registry(&self, config: Option<peer_registry::PeerRegistryConfig>) {
292 let peer_config = config.or_else(|| self.config.peer_registry.clone());
293
294 if let Some(peer_config) = peer_config {
295 log::info!("Starting peer registry");
296 let pr = peer_registry::PeerRegistry::new(self.ctx(), peer_config);
297 *self.peer_registry_instance.write().unwrap() = Some(pr);
298 }
299 }
300
301 pub fn has_peer_registry(&self) -> bool {
303 self.peer_registry_instance.read().unwrap().is_some()
304 }
305
306 pub fn registry(&self) -> Arc<StoreRegistry> {
308 self.registry.clone()
309 }
310
311 pub fn handler_registry(&self) -> Arc<HandlerRegistry> {
313 self.handler_registry.clone()
314 }
315
316 pub fn ctx(&self) -> CellServerCtx {
318 let history_replay: Option<Arc<dyn myko::server::HistoryReplayProvider>> =
319 self.config.postgres.as_ref().map(|pg| {
320 Arc::new(PostgresHistoryReplayProvider::new(pg.clone()))
321 as Arc<dyn myko::server::HistoryReplayProvider>
322 });
323 CellServerCtx::new(
324 self.host_id,
325 self.registry.clone(),
326 self.handler_registry.clone(),
327 self.relationship_manager.clone(),
328 self.persisters.clone(),
329 self.search_index.clone(),
330 self.peer_clients.clone(),
331 Some(self.saga_event_tx.clone()),
332 history_replay,
333 )
334 }
335
    /// Spawns one task per registered saga plus a dispatcher task that fans
    /// incoming events out to the sagas subscribed to them.
    ///
    /// Returns without doing anything when no sagas are registered or when the
    /// event receiver has already been taken (i.e. the runtime was started before).
    ///
    /// NOTE(review): when no sagas are registered the receiver stays in its
    /// slot, so events sent on the unbounded `saga_event_tx` are never
    /// consumed — confirm this queue cannot grow without bound in practice.
    ///
    /// # Panics
    /// Panics if the `saga_event_rx` or `saga_tasks` mutex is poisoned.
    fn start_saga_runtime(&self) {
        let registrations: Vec<_> = inventory::iter::<SagaRegistration>().collect();
        if registrations.is_empty() {
            return;
        }
        // The receiver can be taken only once; a second call is a no-op.
        let Some(rx) = self
            .saga_event_rx
            .lock()
            .expect("saga_event_rx mutex poisoned")
            .take()
        else {
            return;
        };

        log::info!("Starting saga runtime with {} saga(s)", registrations.len());

        // Per-saga inbox together with the (entity type, change type) pair the
        // dispatcher matches events against.
        struct SagaChannel {
            tx: flume::Sender<MEvent>,
            entity_type: &'static str,
            change_type: myko::event::MEventType,
        }
        let mut saga_channels: Vec<SagaChannel> = Vec::new();

        for registration in registrations {
            let saga = (registration.create)();
            let saga_name = saga.name().to_string();
            let (saga_tx, saga_rx) = flume::unbounded::<MEvent>();
            saga_channels.push(SagaChannel {
                tx: saga_tx,
                entity_type: registration.event_entity_type,
                change_type: registration.event_change_type,
            });
            // Adapt the inbox receiver into a stream that ends once every
            // sender for this inbox has been dropped.
            let events: myko::saga::EventStream = Box::pin(futures_util::stream::unfold(
                saga_rx,
                move |saga_rx| async move {
                    saga_rx
                        .recv_async()
                        .await
                        .ok()
                        .map(|event| (event, saga_rx))
                },
            ));

            let saga_ctx = Arc::new(myko::saga::SagaContext::with_event_sink(
                self.host_id,
                self.registry.clone(),
                self.saga_event_tx.clone(),
            ));
            let mut command_stream = saga.build_boxed(events, saga_ctx);

            // Owned copies moved into the spawned task below.
            let host_id = self.host_id;
            let registry = self.registry.clone();
            let handler_registry = self.handler_registry.clone();
            let relationship_manager = self.relationship_manager.clone();
            let persisters = self.persisters.clone();
            let search_index = self.search_index.clone();
            let peer_clients = self.peer_clients.clone();
            let saga_event_tx = self.saga_event_tx.clone();

            let handle = tokio::spawn(async move {
                // Execute every command the saga emits, each under a fresh
                // internal request context.
                while let Some(command) = command_stream.next().await {
                    let command_name = command.command_name();
                    log::debug!("Saga {} executing command {}", saga_name, command_name);
                    let req = Arc::new(RequestContext::internal(
                        Arc::from(Uuid::new_v4().to_string()),
                        host_id,
                        &format!("saga:{saga_name}"),
                    ));

                    let cmd_ctx = CommandContext::new(
                        Arc::from(command_name),
                        req,
                        // Unlike `ctx()`, this context is built without a
                        // history replay provider (last argument is `None`).
                        Arc::new(CellServerCtx::new(
                            host_id,
                            registry.clone(),
                            handler_registry.clone(),
                            relationship_manager.clone(),
                            persisters.clone(),
                            search_index.clone(),
                            peer_clients.clone(),
                            Some(saga_event_tx.clone()),
                            None,
                        )),
                    );

                    // A failing command is logged; the saga keeps running.
                    if let Err(err) = command.execute_boxed(cmd_ctx) {
                        log::error!(
                            "Saga {} command {} failed: {}",
                            saga_name,
                            command_name,
                            err.message
                        );
                    }
                }
            });

            self.saga_tasks
                .lock()
                .expect("saga_tasks mutex poisoned")
                .push(handle);
        }

        let dispatcher = tokio::spawn(async move {
            // Forward each event to every saga whose entity type and change
            // type both match.
            while let Ok(event) = rx.recv_async().await {
                for ch in &saga_channels {
                    if event.item_type == ch.entity_type && event.change_type == ch.change_type {
                        // Send errors are deliberately ignored.
                        let _ = ch.tx.send(event.clone());
                    }
                }
            }
        });
        self.saga_tasks
            .lock()
            .expect("saga_tasks mutex poisoned")
            .push(dispatcher);
    }
456
457 pub fn postgres_history_store(&self) -> Result<Option<PostgresHistoryStore>, String> {
459 self.config
460 .postgres
461 .clone()
462 .map(PostgresHistoryStore::new)
463 .transpose()
464 }
465
466 pub fn init_postgres_and_wait(&self, timeout: Duration) -> Result<(), String> {
468 if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
469 return Err(
470 "Postgres is configured but the Postgres consumer is not running".to_string(),
471 );
472 }
473
474 if let Some(ref consumer) = self.postgres_consumer {
475 consumer.wait_until_caught_up(timeout)?;
476 self.ready.store(true, Ordering::SeqCst);
477 }
478 Ok(())
479 }
480
481 pub fn establish_relations(&self) {
483 if let Err(e) = self.relationship_manager.establish_relations(&self.ctx()) {
484 log::error!("Failed to establish relations: {e}");
485 }
486 }
487
488 pub fn is_ready(&self) -> bool {
490 if let Some(ref consumer) = self.postgres_consumer {
491 if consumer.is_caught_up() {
492 self.ready.store(true, Ordering::SeqCst);
493 return true;
494 }
495 return false;
496 }
497 true
498 }
499
    /// Runs the full startup sequence and then serves WebSocket connections
    /// until the accept loop fails.
    ///
    /// Order of operations: persister health check, Postgres catch-up (if a
    /// consumer exists), search index build, relation establishment,
    /// server-ownership claim/watch, listener bind, peer registry start (if
    /// configured), `after_init` hook, saga runtime start, accept loop.
    ///
    /// # Errors
    /// Fails when the persister health check fails, when Postgres is
    /// configured but its consumer is missing or does not catch up, when the
    /// listener cannot be bound, or when the accept loop returns an error.
    ///
    /// # Panics
    /// Panics if the `_server_ownership_guard` or `after_init` mutex is poisoned.
    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        use tokio::net::TcpListener;

        let entity_types: Vec<&str> = self
            .handler_registry
            .entity_types()
            .map(|t| t.as_ref())
            .collect();
        self.persisters
            .startup_healthcheck(&entity_types)
            .map_err(|reason| format!("Persister startup healthcheck failed: {reason}"))?;

        // `new` only logs a consumer start failure; surface it as a hard error here.
        if self.config.postgres.is_some() && self.postgres_consumer.is_none() {
            return Err("Postgres is configured but the Postgres consumer failed to start".into());
        }

        if self.postgres_consumer.is_some() {
            log::info!("Waiting for Postgres event consumer to catch up...");
            // NOTE(review): `init_postgres_and_wait` is a synchronous call with
            // a timeout of up to 300 s made from an async fn — presumably it
            // blocks the executor thread while waiting; confirm.
            let timeout = std::time::Duration::from_secs(300);
            self.init_postgres_and_wait(timeout)
                .map_err(|reason| format!("Postgres startup catch-up failed: {reason}"))?;
            log::info!("Postgres caught up, ready to accept connections");
        }

        log::info!("Building search index...");
        self.search_index.build_from_registry(&self.registry);

        log::info!("Establishing relations...");
        self.establish_relations();

        log::info!("Checking server-owned item ownership...");
        // A failed claim is logged and startup continues.
        if let Err(e) = ServerOwnershipManager::claim_orphaned(&self.ctx()) {
            log::error!("Failed to claim orphaned server-owned items: {}", e);
        }
        // Store the returned guard on the server so it lives as long as the server does.
        let ownership_guard = ServerOwnershipManager::watch_peer_deaths(&self.ctx());
        *self
            ._server_ownership_guard
            .lock()
            .expect("server_ownership_guard mutex poisoned") = Some(ownership_guard);

        let listener = TcpListener::bind(&self.config.bind_addr).await?;
        log::info!("CellServer listening on {}", self.config.bind_addr);
        log::info!(
            "WebSocket server listening on ws://{}/myko",
            self.config.bind_addr
        );

        if self.config.peer_registry.is_some() {
            self.start_peer_registry(None);
        }

        // The hook is taken out of its slot, so it runs at most once.
        if let Some(hook) = self
            .after_init
            .lock()
            .expect("after_init mutex poisoned")
            .take()
        {
            hook(self);
        }

        self.start_saga_runtime();

        log::info!("Server started");
        self.run_ws_accept_loop(listener).await
    }
575
576 pub async fn run_ws_loop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
578 use tokio::net::TcpListener;
579
580 let listener = TcpListener::bind(&self.config.bind_addr).await?;
581 log::info!("CellServer listening on {}", self.config.bind_addr);
582 log::info!(
583 "WebSocket server listening on ws://{}/myko",
584 self.config.bind_addr
585 );
586 self.run_ws_accept_loop(listener).await
587 }
588
589 async fn run_ws_accept_loop(
590 &self,
591 listener: tokio::net::TcpListener,
592 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
593 let ready = self.ready.clone();
594
595 loop {
596 let (stream, addr) = listener.accept().await?;
597
598 if !ready.load(Ordering::SeqCst) {
600 if self.is_ready() {
601 log::info!("Server is now ready to accept connections");
602 } else {
603 log::warn!(
604 "Rejecting connection from {} - server not ready (durable backend catching up)",
605 addr
606 );
607 drop(stream);
608 continue;
609 }
610 }
611
612 log::debug!("New connection from {}", addr);
613
614 let ctx = self.ctx();
615
616 tokio::spawn(async move {
617 if let Err(e) =
618 ws_handler::WsHandler::handle_connection(stream, addr, Arc::new(ctx)).await
619 {
620 log::error!("Connection error from {}: {}", addr, e);
621 }
622 });
623 }
624 }
625}
626
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal configuration: ephemeral loopback port, no Postgres, no peer
    /// registry, no persisters.
    fn bare_config(host_id: Option<Uuid>) -> CellServerConfig {
        CellServerConfig {
            bind_addr: SocketAddr::from(([127, 0, 0, 1], 0)),
            postgres: None,
            host_id,
            peer_registry: None,
            default_persister: None,
            persister_overrides: HashMap::new(),
        }
    }

    /// A server can be constructed from a minimal configuration.
    #[test]
    fn test_server_creation() {
        let server = CellServer::new(bare_config(None));
        assert!(Arc::strong_count(&server.registry) >= 1);
    }

    /// An explicitly configured host id is used as-is.
    #[test]
    fn test_server_with_host_id() {
        let expected = Uuid::new_v4();
        let server = CellServer::new(bare_config(Some(expected)));
        assert_eq!(server.host_id, expected);
    }
}