1use std::{
9 any::Any,
10 collections::HashMap,
11 sync::{Arc, Mutex},
12 thread,
13 time::Duration,
14};
15
16use dashmap::DashMap;
17use hyphae::{Cell, CellImmutable, CellMap, CellMutable, Gettable, IdFor, Mutable, WeakCellMap};
18use serde::de::DeserializeOwned;
19use uuid::Uuid;
20
21use super::{
22 HandlerRegistry, RelationshipManager,
23 persister::{PersistError, PersistHealth, PersisterRouter},
24};
25use crate::{
26 cache::CacheKey,
27 client::{ConnectionStatus, MykoClient},
28 common::{
29 to_value::ToValue,
30 with_id::{WithId, WithTypedId},
31 },
32 core::item::{
33 AnyItem, Eventable, IngestBufferPolicy, downcast_any_item_arc, typed_map_arc_from_any_item,
34 },
35 query::{
36 FilteredCellMap, QueryContext, QueryFactory, QueryHandler, QueryParams, QueryRequest,
37 QueryTestCtx,
38 },
39 report::{ReportContext, ReportHandler, ReportId},
40 request::RequestContext,
41 search::SearchIndex,
42 store::StoreRegistry,
43 view::{FilteredViewCellMap, TypedViewCellMap, ViewFactory},
44 wire::{EventOptions, MEvent, MEventType},
45};
46
/// Shared, type-erased handle to a stored item.
type AnyItemArc = Arc<dyn AnyItem>;
/// A batch of `(id, item)` pairs destined for a single store.
type AnyItemBatchEntries = Vec<(Arc<str>, AnyItemArc)>;
/// Batches of `(id, item)` pairs grouped under an `Arc<str>` key
/// (the entity type name where this alias is used in this file).
type AnyItemEntriesByType = HashMap<Arc<str>, AnyItemBatchEntries>;
50
/// Type-erased interface over a cached report entry so entries with different
/// output types can live in one map.
trait ReportCacheEntryDyn: Any + Send + Sync {
    /// Exposes the entry as `&dyn Any` so callers can downcast to the concrete entry type.
    fn as_any(&self) -> &dyn Any;
    /// Returns `true` while the entry's weak handle can still be upgraded.
    fn is_alive(&self) -> bool;
}
58
/// Cache slot for a report cell; holds only a weak handle to the cell.
struct ReportCacheEntry<T> {
    /// Weak handle to the cached report cell.
    weak: hyphae::cell::WeakCell<T, CellImmutable>,
}
62
63impl<T> ReportCacheEntry<T>
64where
65 T: Clone + Send + Sync + 'static,
66{
67 fn new(cell: &Cell<T, CellImmutable>) -> Self {
68 Self {
69 weak: cell.downgrade(),
70 }
71 }
72
73 fn get(&self) -> Option<Cell<T, CellImmutable>> {
74 self.weak.upgrade()
75 }
76}
77
78impl<T> ReportCacheEntryDyn for ReportCacheEntry<T>
79where
80 T: Clone + Send + Sync + 'static,
81{
82 fn as_any(&self) -> &dyn Any {
83 self
84 }
85
86 fn is_alive(&self) -> bool {
87 self.weak.upgrade().is_some()
88 }
89}
90
/// Cache slot for a query/view map; holds only a weak handle to the map.
struct MapCacheEntry {
    /// Weak handle to the cached `id -> item` cell map.
    weak: WeakCellMap<Arc<str>, AnyItemArc>,
}
94
/// Mutable state of one ingest buffer: pending events plus a scheduling flag.
#[derive(Default)]
struct BufferedIngestState {
    /// Events accumulated since the last flush.
    events: Vec<MEvent>,
    /// Set when a flush has been scheduled and cleared when the flush runs.
    flush_scheduled: bool,
}
100
/// Ingest buffer for a single entity type, guarded by a mutex.
struct BufferedIngestType {
    /// Lock-protected buffer state.
    state: Mutex<BufferedIngestState>,
}
104
105impl BufferedIngestType {
106 fn new() -> Self {
107 Self {
108 state: Mutex::new(BufferedIngestState::default()),
109 }
110 }
111}
112
113impl MapCacheEntry {
114 fn new(map: &FilteredCellMap) -> Self {
115 Self {
116 weak: map.downgrade(),
117 }
118 }
119
120 fn get(&self) -> Option<FilteredCellMap> {
121 self.weak.upgrade().map(|map| map.lock())
122 }
123}
124
/// Server-side context bundling the entity stores, handlers, persistence,
/// search index, peer clients and the query/view/report caches.
///
/// Cloning is derived; most fields are `Arc`-wrapped, so clones share them.
#[derive(Clone)]
pub struct CellServerCtx {
    /// Identifier of this host; stamped on produced events and on
    /// server-originated request contexts.
    pub host_id: Uuid,
    /// Registry of entity stores, looked up by entity type name.
    pub registry: Arc<StoreRegistry>,
    /// Registry providing item parsers and ingest buffer policies.
    pub handler_registry: Arc<HandlerRegistry>,
    /// Receives forwarded SET/DEL notifications for relationship maintenance.
    relationship_manager: Arc<RelationshipManager>,
    /// Resolves the persister (if any) for an entity type.
    persisters: Arc<PersisterRouter>,
    /// Search index kept in step with SET/DEL operations.
    search_index: Arc<SearchIndex>,
    /// Registered peer clients keyed by peer id.
    peer_clients: Arc<DashMap<Arc<str>, Arc<MykoClient>>>,
    /// Counter cell bumped whenever a peer client is registered or removed.
    peer_clients_tick: Cell<u64, CellMutable>,
    /// Optional channel that receives a copy of every produced event.
    event_sink: Option<flume::Sender<MEvent>>,
    /// Weak handles to built query maps, keyed by cache key.
    query_cache: Arc<DashMap<String, MapCacheEntry>>,
    /// Weak handles to built view maps, keyed by cache key.
    view_cache: Arc<DashMap<String, MapCacheEntry>>,
    /// Type-erased weak handles to computed report cells, keyed by cache key.
    report_cache: Arc<DashMap<String, Arc<dyn ReportCacheEntryDyn>>>,
    /// Per-entity-type ingest buffers for events with a time-window policy.
    ingest_buffers: Arc<DashMap<Arc<str>, Arc<BufferedIngestType>>>,
    /// Optional history replay provider (not used directly in this file's
    /// visible code beyond the accessor).
    history_replay: Option<Arc<dyn crate::server::HistoryReplayProvider>>,
}
162
163impl CellServerCtx {
164 #[allow(clippy::too_many_arguments)]
166 pub fn new(
167 host_id: Uuid,
168 registry: Arc<StoreRegistry>,
169 handler_registry: Arc<HandlerRegistry>,
170 relationship_manager: Arc<RelationshipManager>,
171 persisters: Arc<PersisterRouter>,
172 search_index: Arc<SearchIndex>,
173 peer_clients: Arc<DashMap<Arc<str>, Arc<MykoClient>>>,
174 event_sink: Option<flume::Sender<MEvent>>,
175 history_replay: Option<Arc<dyn crate::server::HistoryReplayProvider>>,
176 ) -> Self {
177 Self {
178 host_id,
179 registry,
180 handler_registry,
181 relationship_manager,
182 persisters,
183 search_index,
184 peer_clients,
185 peer_clients_tick: Cell::new(0).with_name("peer_clients_tick"),
186 event_sink,
187 query_cache: Arc::new(DashMap::new()),
188 view_cache: Arc::new(DashMap::new()),
189 report_cache: Arc::new(DashMap::new()),
190 ingest_buffers: Arc::new(DashMap::new()),
191 history_replay,
192 }
193 }
194
195 fn cache_key<T: CacheKey>(
196 &self,
197 kind: &str,
198 id: &str,
199 params: &T,
200 request: &RequestContext,
201 ) -> String {
202 let payload_hash = params.cache_key_hash();
203 format!("{}:{kind}:{id}:{payload_hash:016x}", request.host_id)
204 }
205
206 pub fn search_index(&self) -> &Arc<SearchIndex> {
208 &self.search_index
209 }
210
211 pub fn history_replay(&self) -> Option<&Arc<dyn crate::server::HistoryReplayProvider>> {
213 self.history_replay.as_ref()
214 }
215
216 pub fn register_peer_client<S: AsRef<str>>(&self, peer_id: S, client: Arc<MykoClient>) {
218 self.peer_clients
219 .insert(Arc::<str>::from(peer_id.as_ref()), client);
220 let next = self.peer_clients_tick.get().saturating_add(1);
221 self.peer_clients_tick.set(next);
222 }
223
224 pub fn unregister_peer_client(&self, peer_id: &str) {
226 if self.peer_clients.remove(peer_id).is_some() {
227 let next = self.peer_clients_tick.get().saturating_add(1);
228 self.peer_clients_tick.set(next);
229 }
230 }
231
232 pub fn peer_client(&self, peer_id: &str) -> Option<Arc<MykoClient>> {
234 self.peer_clients
235 .get(peer_id)
236 .map(|entry| entry.value().clone())
237 }
238
239 pub fn peer_connection_status(&self, peer_id: &str) -> Option<ConnectionStatus> {
241 self.peer_client(peer_id)
242 .map(|client| client.get_connection_status_sync())
243 }
244
245 pub fn peer_clients_tick(&self) -> Cell<u64, CellImmutable> {
247 self.peer_clients_tick.clone().lock()
248 }
249
250 pub fn peer_client_count(&self) -> usize {
252 self.peer_clients.len()
253 }
254
255 pub fn persist_health(&self) -> Arc<PersistHealth> {
257 self.persisters.default_health()
258 }
259
260 pub fn query_cache_len(&self) -> usize {
262 self.query_cache.len()
263 }
264
265 pub fn view_cache_len(&self) -> usize {
267 self.view_cache.len()
268 }
269
270 pub fn report_cache_len(&self) -> usize {
272 self.report_cache.len()
273 }
274
275 pub fn report_cache_live_count(&self) -> usize {
277 self.report_cache
278 .iter()
279 .filter(|entry| entry.value().is_alive())
280 .count()
281 }
282
283 pub fn query_cache_live_count(&self) -> usize {
285 self.query_cache
286 .iter()
287 .filter(|entry| entry.value().weak.upgrade().is_some())
288 .count()
289 }
290
291 pub fn view_cache_live_count(&self) -> usize {
293 self.view_cache
294 .iter()
295 .filter(|entry| entry.value().weak.upgrade().is_some())
296 .count()
297 }
298
299 pub fn sweep_dead_cache_entries(&self) -> (usize, usize, usize) {
302 let q_before = self.query_cache.len();
303 self.query_cache
304 .retain(|_, entry| entry.weak.upgrade().is_some());
305 let q_removed = q_before - self.query_cache.len();
306
307 let v_before = self.view_cache.len();
308 self.view_cache
309 .retain(|_, entry| entry.weak.upgrade().is_some());
310 let v_removed = v_before - self.view_cache.len();
311
312 let r_before = self.report_cache.len();
313 self.report_cache.retain(|_, entry| entry.is_alive());
314 let r_removed = r_before - self.report_cache.len();
315
316 (q_removed, v_removed, r_removed)
317 }
318
319 pub fn parse_item(
323 &self,
324 entity_type: &str,
325 json: &serde_json::Value,
326 ) -> Option<Arc<dyn AnyItem>> {
327 let parse = self.handler_registry.get_item_parser(entity_type)?;
328 parse(json.clone()).ok()
329 }
330
331 pub fn set<T>(&self, entity: &T) -> Result<(), PersistError>
339 where
340 T: Eventable + 'static,
341 {
342 self.set_with_options(entity, None)
343 }
344
345 pub fn set_with_options<T>(
351 &self,
352 entity: &T,
353 options: Option<EventOptions>,
354 ) -> Result<(), PersistError>
355 where
356 T: Eventable + 'static,
357 {
358 let options = options.unwrap_or_default();
359 let id = entity.id();
360 let entity_type = entity.entity_type();
361 let item: Arc<dyn AnyItem> = Arc::new(entity.clone());
362
363 self.registry
365 .get_or_create(entity_type)
366 .insert(id.clone(), item.clone());
367
368 log::debug!("[entity] SET {} id={}", entity_type, id);
369
370 self.search_index.index_item(&item);
372
373 if !options.prevent_relationship_updates {
375 self.relationship_manager.forward_set(item, self)?;
376 }
377
378 if !options.prevent_persist {
380 self.produce_set(entity)?;
381 }
382
383 Ok(())
384 }
385
386 pub fn del<T>(&self, entity: &T) -> Result<(), PersistError>
390 where
391 T: Eventable + Clone + 'static,
392 {
393 self.del_with_options(entity, None)
394 }
395
396 pub fn del_with_options<T>(
398 &self,
399 entity: &T,
400 options: Option<EventOptions>,
401 ) -> Result<(), PersistError>
402 where
403 T: Eventable + Clone + 'static,
404 {
405 let options = options.unwrap_or_default();
406 let entity_type = entity.entity_type();
407 let id = entity.id();
408 let item: Arc<dyn AnyItem> = Arc::new(entity.clone());
409
410 log::debug!("[entity] DEL {} id={}", entity_type, id);
411
412 self.registry.get_or_create(entity_type).remove(&id);
414
415 self.search_index.remove_entity(&id);
417
418 if !options.prevent_relationship_updates {
420 self.relationship_manager.forward_del(item.clone(), self)?;
421 }
422
423 if !options.prevent_persist {
425 self.produce_del(entity)?;
426 }
427
428 log::trace!("Published DEL {}:{}", entity_type, id);
429 Ok(())
430 }
431
432 pub fn batch_set<T>(&self, entities: &[T]) -> Result<(), PersistError>
436 where
437 T: Eventable + Clone + 'static,
438 {
439 self.batch_set_with_options(entities, None)
440 }
441
442 pub fn batch_set_with_options<T>(
446 &self,
447 entities: &[T],
448 options: Option<EventOptions>,
449 ) -> Result<(), PersistError>
450 where
451 T: Eventable + Clone + 'static,
452 {
453 if entities.is_empty() {
454 return Ok(());
455 }
456
457 let options = options.unwrap_or_default();
458 let entity_type = T::entity_name_static();
459 let store = self.registry.get_or_create(entity_type);
460
461 let mut entries: Vec<(Arc<str>, Arc<dyn AnyItem>)> = Vec::with_capacity(entities.len());
462 let mut items: Vec<Arc<dyn AnyItem>> = Vec::with_capacity(entities.len());
463
464 for entity in entities {
465 let item: Arc<dyn AnyItem> = Arc::new(entity.clone());
466 log::debug!("[entity] SET {} id={}", entity_type, entity.id());
467 self.search_index.index_item(&item);
468 entries.push((entity.id(), item.clone()));
469 items.push(item);
470 }
471
472 store.insert_many(entries);
474
475 if !options.prevent_relationship_updates {
477 for item in &items {
478 self.relationship_manager.forward_set(item.clone(), self)?;
479 }
480 }
481
482 if !options.prevent_persist {
484 for item in &items {
485 self.produce_set_dyn(item)?;
486 }
487 }
488
489 log::trace!("Published batch SET {} count={}", entity_type, items.len());
490 Ok(())
491 }
492
493 pub fn batch_del<T>(&self, entities: &[T]) -> Result<(), PersistError>
497 where
498 T: Eventable + Clone + 'static,
499 {
500 self.batch_del_with_options(entities, None)
501 }
502
503 pub fn batch_del_with_options<T>(
507 &self,
508 entities: &[T],
509 options: Option<EventOptions>,
510 ) -> Result<(), PersistError>
511 where
512 T: Eventable + Clone + 'static,
513 {
514 if entities.is_empty() {
515 return Ok(());
516 }
517
518 let options = options.unwrap_or_default();
519 let entity_type = T::entity_name_static();
520 let store = self.registry.get_or_create(entity_type);
521
522 let mut ids: Vec<Arc<str>> = Vec::with_capacity(entities.len());
523 let mut items: Vec<Arc<dyn AnyItem>> = Vec::with_capacity(entities.len());
524
525 for entity in entities {
526 let item: Arc<dyn AnyItem> = Arc::new(entity.clone());
527 let id = item.id();
528 log::debug!("[entity] DEL {} id={}", entity_type, id);
529 self.search_index.remove_entity(&id);
530 ids.push(id);
531 items.push(item);
532 }
533
534 store.remove_many(ids);
536
537 if !options.prevent_relationship_updates {
539 self.relationship_manager.forward_del_batch(&items, self)?;
540 }
541
542 if !options.prevent_persist {
544 for item in &items {
545 self.produce_del_dyn(item)?;
546 }
547 }
548
549 log::trace!("Published batch DEL {} count={}", entity_type, items.len());
550 Ok(())
551 }
552
553 pub fn set_dyn(&self, item: Arc<dyn AnyItem>) -> Result<(), PersistError> {
561 self.set_dyn_with_options(item, None)
562 }
563
564 pub fn set_dyn_with_options(
566 &self,
567 item: Arc<dyn AnyItem>,
568 options: Option<EventOptions>,
569 ) -> Result<(), PersistError> {
570 let options = options.unwrap_or_default();
571 let entity_type = item.entity_type();
572 let id = item.id();
573
574 log::debug!("[entity] SET {} id={}", entity_type, id);
575
576 self.registry
578 .get_or_create(entity_type)
579 .insert(id.clone(), item.clone());
580
581 self.search_index.index_item(&item);
583
584 if !options.prevent_relationship_updates {
586 self.relationship_manager.forward_set(item.clone(), self)?;
587 }
588
589 if !options.prevent_persist {
591 self.produce_set_dyn(&item)?;
592 }
593
594 log::trace!("Published SET {}:{}", entity_type, id);
595 Ok(())
596 }
597
598 pub fn batch_set_dyn_with_options(
600 &self,
601 items: &[Arc<dyn AnyItem>],
602 options: Option<EventOptions>,
603 ) -> Result<(), PersistError> {
604 if items.is_empty() {
605 return Ok(());
606 }
607
608 let options = options.unwrap_or_default();
609 let mut items_by_type: std::collections::BTreeMap<&'static str, Vec<Arc<dyn AnyItem>>> =
610 std::collections::BTreeMap::new();
611
612 for item in items {
613 items_by_type
614 .entry(item.entity_type())
615 .or_default()
616 .push(item.clone());
617 }
618
619 for (entity_type, typed_items) in items_by_type {
620 let store = self.registry.get_or_create(entity_type);
621 let mut entries: Vec<(Arc<str>, Arc<dyn AnyItem>)> =
622 Vec::with_capacity(typed_items.len());
623
624 for item in &typed_items {
625 log::debug!("[entity] SET {} id={}", entity_type, item.id());
626 self.search_index.index_item(item);
627 entries.push((item.id(), item.clone()));
628 }
629
630 store.insert_many(entries);
631
632 if !options.prevent_relationship_updates {
633 for item in &typed_items {
634 self.relationship_manager.forward_set(item.clone(), self)?;
635 }
636 }
637
638 if !options.prevent_persist {
639 for item in &typed_items {
640 self.produce_set_dyn(item)?;
641 }
642 }
643
644 log::trace!(
645 "Published batch SET {} count={}",
646 entity_type,
647 typed_items.len()
648 );
649 }
650 Ok(())
651 }
652
653 pub fn del_dyn(&self, item: Arc<dyn AnyItem>) -> Result<(), PersistError> {
657 self.del_dyn_with_options(item, None)
658 }
659
660 pub fn del_dyn_with_options(
662 &self,
663 item: Arc<dyn AnyItem>,
664 options: Option<EventOptions>,
665 ) -> Result<(), PersistError> {
666 let options = options.unwrap_or_default();
667 let entity_type = item.entity_type();
668 let id = item.id();
669
670 log::debug!("[entity] DEL {} id={}", entity_type, id);
671
672 self.registry.get_or_create(entity_type).remove(&id);
674
675 self.search_index.remove_entity(&id);
677
678 if !options.prevent_relationship_updates {
680 self.relationship_manager.forward_del(item.clone(), self)?;
681 }
682
683 if !options.prevent_persist {
685 self.produce_del_dyn(&item)?;
686 }
687
688 log::trace!("Published DEL {}:{}", entity_type, id);
689 Ok(())
690 }
691
692 pub fn batch_del_dyn_with_options(
694 &self,
695 items: &[Arc<dyn AnyItem>],
696 options: Option<EventOptions>,
697 ) -> Result<(), PersistError> {
698 if items.is_empty() {
699 return Ok(());
700 }
701
702 let options = options.unwrap_or_default();
703 let mut items_by_type: std::collections::BTreeMap<&'static str, Vec<Arc<dyn AnyItem>>> =
704 std::collections::BTreeMap::new();
705
706 for item in items {
707 items_by_type
708 .entry(item.entity_type())
709 .or_default()
710 .push(item.clone());
711 }
712
713 for (entity_type, typed_items) in items_by_type {
714 let store = self.registry.get_or_create(entity_type);
715 let mut ids: Vec<Arc<str>> = Vec::with_capacity(typed_items.len());
716
717 for item in &typed_items {
718 let id = item.id();
719 log::debug!("[entity] DEL {} id={}", entity_type, id);
720 self.search_index.remove_entity(&id);
721 ids.push(id);
722 }
723
724 store.remove_many(ids);
725
726 if !options.prevent_relationship_updates {
727 self.relationship_manager
728 .forward_del_batch(&typed_items, self)?;
729 }
730
731 if !options.prevent_persist {
732 for item in &typed_items {
733 self.produce_del_dyn(item)?;
734 }
735 }
736 }
737 Ok(())
738 }
739
740 pub fn del_by_id_with_options(
747 &self,
748 entity_type: &str,
749 id: &str,
750 options: Option<EventOptions>,
751 ) -> Result<(), PersistError> {
752 let options = options.unwrap_or_default();
753 let id_arc: Arc<str> = id.into();
754
755 let existing = self
756 .registry
757 .get(entity_type)
758 .and_then(|store| store.get(&id_arc).get());
759
760 self.registry.get_or_create(entity_type).remove(&id_arc);
762
763 self.search_index.remove_entity(id);
765
766 if !options.prevent_persist {
768 if let Some(item) = existing {
769 self.produce_del_dyn(&item)?;
770 } else {
771 log::warn!(
772 "del_by_id could not persist DEL without full entity: {}:{}",
773 entity_type,
774 id
775 );
776 }
777 }
778
779 log::trace!("Published DEL {}:{}", entity_type, id);
780 Ok(())
781 }
782
783 pub fn del_by_id(&self, entity_type: &str, id: &str) -> Result<(), PersistError> {
785 self.del_by_id_with_options(entity_type, id, None)
786 }
787
788 pub fn apply_event(&self, event: MEvent) -> Result<bool, PersistError> {
792 Ok(self.apply_event_batch(vec![event])? == 1)
793 }
794
795 pub fn apply_event_batch(&self, events: Vec<MEvent>) -> Result<usize, PersistError> {
800 if events.is_empty() {
801 return Ok(0);
802 }
803
804 let mut accepted = 0usize;
805 let mut immediate_events = Vec::new();
806 let mut buffered_by_type: HashMap<Arc<str>, (u64, Vec<MEvent>)> = HashMap::new();
807
808 for event in events {
809 match self
810 .handler_registry
811 .get_item_buffer_policy(&event.item_type)
812 {
813 IngestBufferPolicy::None => immediate_events.push(event),
814 IngestBufferPolicy::TimeWindow { window_ms } => {
815 let entity_type: Arc<str> = event.item_type.clone().into();
816 buffered_by_type
817 .entry(entity_type)
818 .or_insert_with(|| (window_ms, Vec::new()))
819 .1
820 .push(event);
821 }
822 }
823 }
824
825 if !immediate_events.is_empty() {
826 accepted += self.apply_event_batch_immediate(immediate_events)?;
827 }
828
829 for (entity_type, (window_ms, buffered_events)) in buffered_by_type {
830 accepted += buffered_events.len();
831 self.enqueue_buffered_events(entity_type, window_ms, buffered_events);
832 }
833
834 Ok(accepted)
835 }
836
    /// Applies a batch of events right away, bypassing ingest buffering.
    ///
    /// Phases, in order:
    /// 1. Parse every event into a SET or DEL operation; unparseable events
    ///    are logged and skipped.
    /// 2. Group inserts/removes by entity type and update the search index.
    /// 3. Bulk-apply inserts, then removes, to the stores.
    /// 4. Forward relationship updates (per item for SETs, per type batch for
    ///    DELs) for operations that do not prevent them.
    /// 5. Produce SET then DEL events for operations that do not prevent
    ///    persistence.
    ///
    /// Returns the number of successfully parsed operations.
    ///
    /// NOTE(review): all SETs are applied before all DELs regardless of their
    /// order in `events`, so a DEL followed by a SET of the same id within one
    /// batch ends up deleted — confirm this is intended.
    ///
    /// # Errors
    /// Returns the first error from relationship forwarding or event
    /// production; store and search-index changes are already applied by then.
    fn apply_event_batch_immediate(&self, events: Vec<MEvent>) -> Result<usize, PersistError> {
        if events.is_empty() {
            return Ok(0);
        }
        let input_len = events.len();

        // A parsed SET together with the options carried by its event.
        #[derive(Clone)]
        struct SetOp {
            item: Arc<dyn AnyItem>,
            options: EventOptions,
        }

        // A parsed DEL together with the options carried by its event.
        #[derive(Clone)]
        struct DelOp {
            item: Arc<dyn AnyItem>,
            options: EventOptions,
        }

        let mut sets: Vec<SetOp> = Vec::new();
        let mut dels: Vec<DelOp> = Vec::new();

        // Phase 1: parse events into operations, skipping ones that fail to parse.
        for event in events {
            let options = event.options.clone().unwrap_or_default();
            match event.change_type {
                MEventType::SET => {
                    if let Some(item) = self.parse_item(&event.item_type, &event.item) {
                        sets.push(SetOp { item, options });
                    } else {
                        log::warn!(
                            "Unknown entity type or parse error for SET: {}",
                            event.item_type
                        );
                    }
                }
                MEventType::DEL => {
                    if let Some(item) = self.parse_item(&event.item_type, &event.item) {
                        dels.push(DelOp { item, options });
                    } else {
                        log::warn!(
                            "Unknown entity type or parse error for DEL: {}",
                            event.item_type
                        );
                    }
                }
            }
        }

        // Nothing parsed: report zero accepted events.
        if sets.is_empty() && dels.is_empty() {
            return Ok(0);
        }

        log::trace!(
            target: "myko::server::context",
            "apply_event_batch parsed: input_events={} sets={} dels={}",
            input_len,
            sets.len(),
            dels.len()
        );

        // Phase 2: group store mutations by entity type and update the search index.
        let mut inserts_by_type: AnyItemEntriesByType = HashMap::new();
        let mut removes_by_type: HashMap<Arc<str>, Vec<Arc<str>>> = HashMap::new();

        for op in &sets {
            let entity_type: Arc<str> = op.item.entity_type().into();
            let id = op.item.id();
            log::debug!("[entity] SET {} id={}", entity_type, id);
            inserts_by_type
                .entry(entity_type)
                .or_default()
                .push((id, op.item.clone()));
            self.search_index.index_item(&op.item);
        }
        for op in &dels {
            let entity_type: Arc<str> = op.item.entity_type().into();
            let id = op.item.id();
            log::debug!("[entity] DEL {} id={}", entity_type, id);
            removes_by_type
                .entry(entity_type)
                .or_default()
                .push(id.clone());
            self.search_index.remove_entity(&id);
        }

        // Phase 3: one bulk store call per entity type; inserts first, then removes.
        for (entity_type, entries) in inserts_by_type {
            log::trace!(
                target: "myko::server::context",
                "apply_event_batch reduce inserts: entity_type={} count={}",
                entity_type,
                entries.len()
            );
            let store = self.registry.get_or_create(entity_type.as_ref());
            store.insert_many(entries);
        }
        for (entity_type, keys) in removes_by_type {
            log::trace!(
                target: "myko::server::context",
                "apply_event_batch reduce removes: entity_type={} count={}",
                entity_type,
                keys.len()
            );
            let store = self.registry.get_or_create(entity_type.as_ref());
            store.remove_many(keys);
        }

        // Phase 4: relationship forwarding, honouring each event's own options.
        for op in &sets {
            if !op.options.prevent_relationship_updates {
                self.relationship_manager
                    .forward_set(op.item.clone(), self)?;
            }
        }
        // DELs are forwarded as one batch per entity type.
        let mut dels_with_relationships: HashMap<Arc<str>, Vec<Arc<dyn AnyItem>>> = HashMap::new();
        for op in &dels {
            if !op.options.prevent_relationship_updates {
                dels_with_relationships
                    .entry(op.item.entity_type().into())
                    .or_default()
                    .push(op.item.clone());
            }
        }
        for (_, items) in dels_with_relationships {
            self.relationship_manager.forward_del_batch(&items, self)?;
        }

        // Phase 5: produce events, honouring each event's own options.
        for op in &sets {
            if !op.options.prevent_persist {
                self.produce_set_dyn(&op.item)?;
            }
        }
        for op in &dels {
            if !op.options.prevent_persist {
                self.produce_del_dyn(&op.item)?;
            }
        }

        Ok(sets.len() + dels.len())
    }
976
977 fn ingest_buffer_for(&self, entity_type: Arc<str>) -> Arc<BufferedIngestType> {
978 self.ingest_buffers
979 .entry(entity_type)
980 .or_insert_with(|| Arc::new(BufferedIngestType::new()))
981 .clone()
982 }
983
984 fn enqueue_buffered_events(&self, entity_type: Arc<str>, window_ms: u64, events: Vec<MEvent>) {
985 let buffer = self.ingest_buffer_for(entity_type.clone());
986 let should_schedule = {
987 let Ok(mut state) = buffer.state.lock() else {
988 log::error!(
989 "Could not acquire ingest buffer lock for entity_type={}",
990 entity_type
991 );
992 if let Err(e) = self.apply_event_batch_immediate(events) {
993 log::error!("Failed to apply buffered events for {}: {}", entity_type, e);
994 }
995 return;
996 };
997
998 state.events.extend(events);
999 if state.flush_scheduled {
1000 false
1001 } else {
1002 state.flush_scheduled = true;
1003 true
1004 }
1005 };
1006
1007 if !should_schedule {
1008 return;
1009 }
1010
1011 let ctx = self.clone();
1012 thread::spawn(move || {
1013 thread::sleep(Duration::from_millis(window_ms));
1014 ctx.flush_buffered_events_for_type(&entity_type);
1015 });
1016 }
1017
1018 fn flush_buffered_events_for_type(&self, entity_type: &Arc<str>) -> usize {
1019 let Some(buffer) = self
1020 .ingest_buffers
1021 .get(entity_type.as_ref())
1022 .map(|entry| entry.clone())
1023 else {
1024 return 0;
1025 };
1026
1027 let events = {
1028 let Ok(mut state) = buffer.state.lock() else {
1029 log::error!(
1030 "Could not acquire ingest buffer lock for flush entity_type={}",
1031 entity_type
1032 );
1033 return 0;
1034 };
1035
1036 state.flush_scheduled = false;
1037 if state.events.is_empty() {
1038 return 0;
1039 }
1040
1041 std::mem::take(&mut state.events)
1042 };
1043
1044 log::trace!(
1045 target: "myko::server::context",
1046 "flush_buffered_events entity_type={} count={}",
1047 entity_type,
1048 events.len()
1049 );
1050
1051 match self.apply_event_batch_immediate(events) {
1052 Ok(count) => count,
1053 Err(e) => {
1054 log::error!("Failed to flush buffered events for {}: {}", entity_type, e);
1055 0
1056 }
1057 }
1058 }
1059
1060 #[cfg(test)]
1061 fn flush_all_buffered_events(&self) -> usize {
1062 let entity_types: Vec<Arc<str>> = self
1063 .ingest_buffers
1064 .iter()
1065 .map(|entry| entry.key().clone())
1066 .collect();
1067
1068 entity_types
1069 .into_iter()
1070 .map(|entity_type| self.flush_buffered_events_for_type(&entity_type))
1071 .sum()
1072 }
1073
1074 fn produce_set<T: Eventable>(&self, entity: &T) -> Result<(), PersistError> {
1079 if let Some(persister) = self.persisters.resolve(T::entity_name_static()) {
1080 let event = MEvent::from_item(entity, MEventType::SET, &self.host_id.to_string());
1081 persister.persist(event)?;
1082 }
1083 if let Some(sink) = &self.event_sink {
1084 let event = MEvent::from_item(entity, MEventType::SET, &self.host_id.to_string());
1085 let _ = sink.send(event);
1086 }
1087 Ok(())
1088 }
1089
1090 fn produce_del<T: Eventable>(&self, entity: &T) -> Result<(), PersistError> {
1091 if let Some(persister) = self.persisters.resolve(T::entity_name_static()) {
1092 let event = MEvent::del(entity, &self.host_id.to_string());
1093 persister.persist(event)?;
1094 }
1095 if let Some(sink) = &self.event_sink {
1096 let event = MEvent::del(entity, &self.host_id.to_string());
1097 let _ = sink.send(event);
1098 }
1099 Ok(())
1100 }
1101
1102 fn produce_del_dyn(&self, item: &Arc<dyn AnyItem>) -> Result<(), PersistError> {
1103 if let Some(persister) = self.persisters.resolve(item.entity_type()) {
1104 let event = MEvent::del_from_any(item, &self.host_id.to_string());
1105 persister.persist(event)?;
1106 }
1107 if let Some(sink) = &self.event_sink {
1108 let event = MEvent::del_from_any(item, &self.host_id.to_string());
1109 let _ = sink.send(event);
1110 }
1111 Ok(())
1112 }
1113
1114 fn produce_set_dyn(&self, item: &Arc<dyn AnyItem>) -> Result<(), PersistError> {
1115 if let Some(persister) = self.persisters.resolve(item.entity_type()) {
1116 let event = MEvent::set_from_value(
1117 item.entity_type(),
1118 item.to_value(),
1119 &self.host_id.to_string(),
1120 );
1121 persister.persist(event)?;
1122 }
1123 if let Some(sink) = &self.event_sink {
1124 let event = MEvent::set_from_value(
1125 item.entity_type(),
1126 item.to_value(),
1127 &self.host_id.to_string(),
1128 );
1129 let _ = sink.send(event);
1130 }
1131 Ok(())
1132 }
1133
1134 pub fn query_map<Q>(
1142 &self,
1143 query: Q,
1144 request: Arc<RequestContext>,
1145 ) -> CellMap<Arc<str>, Arc<Q::Item>, CellImmutable>
1146 where
1147 Q: QueryFactory + QueryHandler + QueryParams + Clone + Send + Sync + 'static,
1148 Q::Item:
1149 Eventable + WithId + DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
1150 {
1151 typed_map_arc_from_any_item(
1152 self.query_map_untyped(query, request),
1153 "CellServerCtx::query_map",
1154 )
1155 }
1156
1157 pub fn query_map_untyped<Q>(&self, query: Q, request: Arc<RequestContext>) -> FilteredCellMap
1176 where
1177 Q: QueryFactory + QueryHandler + QueryParams + Clone + Send + Sync + 'static,
1178 Q::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
1179 {
1180 let key = self.cache_key("query", Q::query_id_static().as_ref(), &query, &request);
1181 if let Some(existing) = self.query_cache.get(&key) {
1182 if let Some(shared) = existing.value().get() {
1183 return shared;
1184 }
1185 drop(existing);
1186 self.query_cache.remove(&key);
1187 }
1188
1189 let query_req = QueryRequest::with_tx(query, request.tx.clone());
1190 let any_query: Arc<dyn crate::query::AnyQuery> = Arc::new(query_req);
1191
1192 let built = Q::cell_factory(
1193 any_query,
1194 self.registry.clone(),
1195 request,
1196 Some(Arc::new(self.clone())),
1197 )
1198 .expect("query cell factory should not fail for typed query");
1199 self.query_cache.insert(key, MapCacheEntry::new(&built));
1200 built
1201 }
1202
1203 pub fn view_map_untyped<V>(&self, view: V, request: Arc<RequestContext>) -> FilteredViewCellMap
1205 where
1206 V: ViewFactory + Clone + Send + Sync + 'static,
1207 V::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
1208 {
1209 let key = self.cache_key("view", V::view_id_static().as_ref(), &view, &request);
1210 if let Some(existing) = self.view_cache.get(&key) {
1211 if let Some(shared) = existing.value().get() {
1212 return shared;
1213 }
1214 drop(existing);
1215 self.view_cache.remove(&key);
1216 }
1217
1218 let view_req = crate::view::ViewRequest::with_tx(view, request.tx.clone());
1219 let any_view: Arc<dyn crate::view::AnyView> = Arc::new(view_req);
1220
1221 let built = V::cell_factory(
1222 any_view,
1223 self.registry.clone(),
1224 request,
1225 Arc::new(self.clone()),
1226 )
1227 .expect("view cell factory should not fail for typed view");
1228 self.view_cache.insert(key, MapCacheEntry::new(&built));
1229 built
1230 }
1231
1232 pub fn view_map<V>(&self, view: V, request: Arc<RequestContext>) -> FilteredViewCellMap
1234 where
1235 V: ViewFactory + Clone + Send + Sync + 'static,
1236 V::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
1237 {
1238 self.view_map_untyped(view, request)
1239 }
1240
1241 pub fn view<V>(&self, view: V, request: Arc<RequestContext>) -> TypedViewCellMap<V::Item>
1243 where
1244 V: ViewFactory + Clone + Send + Sync + 'static,
1245 V::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
1246 {
1247 typed_map_arc_from_any_item(self.view_map_untyped(view, request), "CellServerCtx::view")
1248 }
1249
1250 pub fn entity_snapshot<T>(&self, id: &<T as WithTypedId>::Id) -> Option<Arc<T>>
1252 where
1253 T: Eventable + WithTypedId + Send + Sync + 'static,
1254 <T as WithTypedId>::Id: hyphae::IdFor<T, MapKey = Arc<str>>,
1255 {
1256 let store = self.registry.get_or_create(T::entity_name_static());
1257 let map_key = id.map_key();
1258 let item = store.get_value(&map_key)?;
1259 Some(downcast_any_item_arc::<T>(
1260 &item,
1261 "CellServerCtx::entity_snapshot",
1262 ))
1263 }
1264
1265 pub fn entity_snapshots<T>(&self) -> Vec<Arc<T>>
1267 where
1268 T: Eventable + WithTypedId + Send + Sync + 'static,
1269 <T as WithTypedId>::Id: hyphae::IdFor<T, MapKey = Arc<str>>,
1270 {
1271 let store = self.registry.get_or_create(T::entity_name_static());
1272 store
1273 .snapshot()
1274 .into_iter()
1275 .map(|(_, item)| downcast_any_item_arc::<T>(&item, "CellServerCtx::entity_snapshots"))
1276 .collect()
1277 }
1278
1279 pub fn entity_snapshots_by_id<T>(
1281 &self,
1282 ids: impl IntoIterator<Item = <T as WithTypedId>::Id>,
1283 ) -> Vec<Arc<T>>
1284 where
1285 T: Eventable + WithTypedId + Send + Sync + 'static,
1286 <T as WithTypedId>::Id: hyphae::IdFor<T, MapKey = Arc<str>>,
1287 {
1288 ids.into_iter()
1289 .filter_map(|id| self.entity_snapshot::<T>(&id))
1290 .collect()
1291 }
1292
1293 pub fn query_snapshot<Q>(&self, query: Q, request: Arc<RequestContext>) -> Vec<Arc<Q::Item>>
1299 where
1300 Q: QueryHandler + QueryParams + Clone + Send + Sync + 'static,
1301 Q::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
1302 {
1303 let query_item_type = Q::query_item_type_static();
1304 let store = self.registry.get_or_create(&query_item_type);
1305
1306 let query_context = Arc::new(QueryContext {
1307 req: request.clone(),
1308 });
1309 let query = Arc::new(query);
1310
1311 store
1312 .snapshot()
1313 .into_iter()
1314 .filter_map(|(_, item)| {
1315 let typed_item =
1316 downcast_any_item_arc::<Q::Item>(&item, "CellServerCtx::query_snapshot");
1317 let ctx = QueryTestCtx {
1318 item: typed_item.clone(),
1319 query: query.clone(),
1320 query_context: query_context.clone(),
1321 };
1322 if Q::test_entity(ctx) {
1323 Some(typed_item)
1324 } else {
1325 None
1326 }
1327 })
1328 .collect()
1329 }
1330
1331 pub fn report<R>(
1332 &self,
1333 report: R,
1334 request: Arc<RequestContext>,
1335 ) -> Cell<Arc<R::Output>, CellImmutable>
1336 where
1337 R: ReportHandler + ReportId + CacheKey + Clone + serde::Serialize + 'static,
1338 {
1339 let key = self.cache_key("report", report.report_id().as_ref(), &report, &request);
1340
1341 if let Some(existing) = self.report_cache.get(&key) {
1343 if let Some(entry) = existing
1344 .value()
1345 .as_any()
1346 .downcast_ref::<ReportCacheEntry<Arc<R::Output>>>()
1347 && let Some(shared) = entry.get()
1348 {
1349 return shared;
1350 }
1351 drop(existing);
1353 self.report_cache.remove(&key);
1354 }
1355
1356 let nested_ctx = ReportContext::new(request, Arc::new(self.clone()));
1357 let built = report.compute(nested_ctx);
1358 self.report_cache
1359 .insert(key.clone(), Arc::new(ReportCacheEntry::new(&built)));
1360
1361 built
1362 }
1363
1364 pub fn new_server_transaction(&self) -> Arc<RequestContext> {
1365 Arc::new(RequestContext {
1366 tx: Arc::<str>::from(Uuid::new_v4().to_string()),
1367 client_id: None,
1368 lineage: vec![],
1369 host_id: self.host_id,
1370 created_at: chrono::Utc::now().to_string(),
1371 windback: None,
1372 })
1373 }
1374}
1375
1376impl std::fmt::Debug for CellServerCtx {
1377 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1378 f.debug_struct("CellServerCtx").finish()
1379 }
1380}
1381
1382#[cfg(test)]
1383mod tests {
1384 use std::sync::Arc;
1385
1386 use serde::{Deserialize, Serialize};
1387 use serde_json::json;
1388 use uuid::Uuid;
1389
1390 use super::CellServerCtx;
1391 use crate::{
1392 common::with_id::WithId,
1393 core::item::{
1394 AnyItem, Eventable, IngestBufferPolicy, IngestBufferRegistration, ItemRegistration,
1395 },
1396 hyphae::Gettable,
1397 search::SearchIndex,
1398 server::{HandlerRegistry, RelationshipManager, persister::PersisterRouter},
1399 store::StoreRegistry,
1400 wire::{MEvent, MEventType},
1401 };
1402
1403 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1404 struct BufferedTestItem {
1405 id: Arc<str>,
1406 value: i32,
1407 }
1408
1409 impl WithId for BufferedTestItem {
1410 fn id(&self) -> Arc<str> {
1411 self.id.clone()
1412 }
1413 }
1414
1415 impl AnyItem for BufferedTestItem {
1416 fn as_any(&self) -> &dyn std::any::Any {
1417 self
1418 }
1419
1420 fn entity_type(&self) -> &'static str {
1421 "BufferedTestItem"
1422 }
1423
1424 fn equals(&self, other: &dyn AnyItem) -> bool {
1425 other
1426 .as_any()
1427 .downcast_ref::<Self>()
1428 .map(|typed| self == typed)
1429 .unwrap_or(false)
1430 }
1431 }
1432
1433 impl Eventable for BufferedTestItem {
1434 const ENTITY_NAME_STATIC: &'static str = "BufferedTestItem";
1435 }
1436
1437 inventory::submit! {
1438 ItemRegistration {
1439 entity_type: "BufferedTestItem",
1440 crate_name: env!("CARGO_PKG_NAME"),
1441 parse: BufferedTestItem::parse,
1442 }
1443 }
1444
1445 inventory::submit! {
1446 IngestBufferRegistration {
1447 entity_type: "BufferedTestItem",
1448 policy: IngestBufferPolicy::TimeWindow { window_ms: 60_000 },
1449 }
1450 }
1451
1452 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1453 struct ImmediateTestItem {
1454 id: Arc<str>,
1455 value: i32,
1456 }
1457
1458 impl WithId for ImmediateTestItem {
1459 fn id(&self) -> Arc<str> {
1460 self.id.clone()
1461 }
1462 }
1463
1464 impl AnyItem for ImmediateTestItem {
1465 fn as_any(&self) -> &dyn std::any::Any {
1466 self
1467 }
1468
1469 fn entity_type(&self) -> &'static str {
1470 "ImmediateTestItem"
1471 }
1472
1473 fn equals(&self, other: &dyn AnyItem) -> bool {
1474 other
1475 .as_any()
1476 .downcast_ref::<Self>()
1477 .map(|typed| self == typed)
1478 .unwrap_or(false)
1479 }
1480 }
1481
1482 impl Eventable for ImmediateTestItem {
1483 const ENTITY_NAME_STATIC: &'static str = "ImmediateTestItem";
1484 }
1485
1486 inventory::submit! {
1487 ItemRegistration {
1488 entity_type: "ImmediateTestItem",
1489 crate_name: env!("CARGO_PKG_NAME"),
1490 parse: ImmediateTestItem::parse,
1491 }
1492 }
1493
1494 fn make_ctx() -> CellServerCtx {
1495 CellServerCtx::new(
1496 Uuid::new_v4(),
1497 Arc::new(StoreRegistry::new()),
1498 Arc::new(HandlerRegistry::new()),
1499 Arc::new(RelationshipManager::new()),
1500 Arc::new(PersisterRouter::default()),
1501 Arc::new(SearchIndex::new()),
1502 Arc::new(dashmap::DashMap::new()),
1503 None,
1504 None,
1505 )
1506 }
1507
/// A SET event for an entity type without a buffer registration must be
/// visible in its store as soon as `apply_event_batch` returns.
#[test]
fn apply_event_batch_keeps_default_entities_immediate() {
    let ctx = make_ctx();

    let event = MEvent {
        item: json!({
            "id": "immediate-1",
            "value": 7,
        }),
        change_type: MEventType::SET,
        item_type: String::from("ImmediateTestItem"),
        created_at: String::from("2026-03-12T00:00:00Z"),
        tx: String::from("tx-immediate"),
        source_id: Some(String::from("test")),
        options: None,
    };

    let applied = ctx
        .apply_event_batch(vec![event])
        .expect("apply_event_batch should succeed");
    assert_eq!(applied, 1);

    let store = ctx.registry.get_or_create("ImmediateTestItem");
    let key: Arc<str> = Arc::from("immediate-1");
    assert!(store.get(&key).get().is_some());
}
1530
/// A SET event for an entity type registered with a buffer policy is counted
/// as applied but only reaches the store after `flush_all_buffered_events`.
#[test]
fn apply_event_batch_buffers_opted_in_entities() {
    let ctx = make_ctx();

    let event = MEvent {
        item: json!({
            "id": "buffered-1",
            "value": 42,
        }),
        change_type: MEventType::SET,
        item_type: String::from("BufferedTestItem"),
        created_at: String::from("2026-03-12T00:00:00Z"),
        tx: String::from("tx-buffered"),
        source_id: Some(String::from("test")),
        options: None,
    };

    let applied = ctx
        .apply_event_batch(vec![event])
        .expect("apply_event_batch should succeed");
    assert_eq!(applied, 1);

    let store = ctx.registry.get_or_create("BufferedTestItem");
    let key: Arc<str> = Arc::from("buffered-1");

    // Before the flush the item must not be visible in the store.
    assert!(store.get(&key).get().is_none());

    let flushed = ctx.flush_all_buffered_events();
    assert_eq!(flushed, 1);

    // After the flush the buffered item is present.
    assert!(store.get(&key).get().is_some());
}
1557}