Skip to main content

myko/server/
context.rs

1//! Server context for the cell-based server.
2//!
3//! Provides modules (like PeerRegistry) with the ability to:
4//! - Run reactive queries (like GetPeerServers)
5//! - Publish entities (Reduce → Relationships → Persist)
6//! - Access server identity (host_id)
7
8use std::{
9    any::Any,
10    collections::HashMap,
11    sync::{Arc, Mutex},
12    thread,
13    time::Duration,
14};
15
16use dashmap::DashMap;
17use hyphae::{Cell, CellImmutable, CellMap, CellMutable, Gettable, IdFor, Mutable, WeakCellMap};
18use serde::de::DeserializeOwned;
19use uuid::Uuid;
20
21use super::{
22    HandlerRegistry, RelationshipManager,
23    persister::{PersistError, PersistHealth, PersisterRouter},
24};
25use crate::{
26    cache::CacheKey,
27    client::{ConnectionStatus, MykoClient},
28    common::{
29        to_value::ToValue,
30        with_id::{WithId, WithTypedId},
31    },
32    core::item::{
33        AnyItem, Eventable, IngestBufferPolicy, downcast_any_item_arc, typed_map_arc_from_any_item,
34    },
35    query::{
36        FilteredCellMap, QueryContext, QueryFactory, QueryHandler, QueryParams, QueryRequest,
37        QueryTestCtx,
38    },
39    report::{ReportContext, ReportHandler, ReportId},
40    request::RequestContext,
41    search::SearchIndex,
42    store::StoreRegistry,
43    view::{FilteredViewCellMap, TypedViewCellMap, ViewFactory},
44    wire::{EventOptions, MEvent, MEventType},
45};
46
47type AnyItemArc = Arc<dyn AnyItem>;
48type AnyItemBatchEntries = Vec<(Arc<str>, AnyItemArc)>;
49type AnyItemEntriesByType = HashMap<Arc<str>, AnyItemBatchEntries>;
50
/// Weak-ref report cache entry. The cell stays alive as long as someone is
/// subscribed to it. When all subscribers drop, the weak ref fails to upgrade
/// and the next request recomputes.
///
/// This trait is the type-erased face of `ReportCacheEntry<T>`, which lets
/// entries with different `T` sit behind a single `dyn` pointer.
trait ReportCacheEntryDyn: Any + Send + Sync {
    /// View the entry as `&dyn Any`.
    // NOTE(review): presumably used to downcast back to `ReportCacheEntry<T>`;
    // the call site is outside this view.
    fn as_any(&self) -> &dyn Any;
    /// Whether the entry is still usable. The implementation in this file
    /// answers by checking that the weak handle still upgrades.
    fn is_alive(&self) -> bool;
}
58
/// Typed report cache entry: a weak handle to an immutable report cell.
///
/// The entry never stores a strong handle; callers obtain one through `get`,
/// which attempts an upgrade.
struct ReportCacheEntry<T> {
    /// Weak handle created from the cell via `downgrade()` in `new`.
    weak: hyphae::cell::WeakCell<T, CellImmutable>,
}
62
63impl<T> ReportCacheEntry<T>
64where
65    T: Clone + Send + Sync + 'static,
66{
67    fn new(cell: &Cell<T, CellImmutable>) -> Self {
68        Self {
69            weak: cell.downgrade(),
70        }
71    }
72
73    fn get(&self) -> Option<Cell<T, CellImmutable>> {
74        self.weak.upgrade()
75    }
76}
77
78impl<T> ReportCacheEntryDyn for ReportCacheEntry<T>
79where
80    T: Clone + Send + Sync + 'static,
81{
82    fn as_any(&self) -> &dyn Any {
83        self
84    }
85
86    fn is_alive(&self) -> bool {
87        self.weak.upgrade().is_some()
88    }
89}
90
/// Cache entry for a reactive map (used by the query and view caches).
///
/// Holds only a weak handle; `get` upgrades it on demand.
struct MapCacheEntry {
    /// Weak handle to a map keyed by `Arc<str>` with type-erased items as values.
    weak: WeakCellMap<Arc<str>, AnyItemArc>,
}
94
/// Mutable state of one ingest buffer. `Default` yields an empty event list
/// and `flush_scheduled == false`.
#[derive(Default)]
struct BufferedIngestState {
    /// Events held in the buffer.
    events: Vec<MEvent>,
    /// NOTE(review): presumably set while a flush of `events` is pending; the
    /// code that reads and writes this flag is outside this view — confirm.
    flush_scheduled: bool,
}
100
/// Ingest buffer for one entity type (see the `ingest_buffers` map, which is
/// keyed by entity type).
struct BufferedIngestType {
    /// Buffer state behind a standard-library mutex.
    state: Mutex<BufferedIngestState>,
}
104
105impl BufferedIngestType {
106    fn new() -> Self {
107        Self {
108            state: Mutex::new(BufferedIngestState::default()),
109        }
110    }
111}
112
113impl MapCacheEntry {
114    fn new(map: &FilteredCellMap) -> Self {
115        Self {
116            weak: map.downgrade(),
117        }
118    }
119
120    fn get(&self) -> Option<FilteredCellMap> {
121        self.weak.upgrade().map(|map| map.lock())
122    }
123}
124
/// Context providing capabilities to server modules.
///
/// This is the cell-based equivalent of `MykoServerCtx`, providing:
/// - Entity store access (read-only, via queries)
/// - Event publishing (Reduce → Relationships → Persist)
/// - Server identity
///
/// The type is `Clone`; the `Arc`-wrapped fields are shared between clones.
#[derive(Clone)]
pub struct CellServerCtx {
    /// Unique identifier for this server instance
    pub host_id: Uuid,
    /// Store registry for entity access
    pub registry: Arc<StoreRegistry>,
    /// Handler registry for item parsers
    pub handler_registry: Arc<HandlerRegistry>,
    /// Relationship manager - handles cascades
    relationship_manager: Arc<RelationshipManager>,
    /// Persister routing (default + per-entity overrides)
    persisters: Arc<PersisterRouter>,
    /// Full-text search index
    search_index: Arc<SearchIndex>,
    /// Live peer clients by peer server id (populated by peer registry).
    peer_clients: Arc<DashMap<Arc<str>, Arc<MykoClient>>>,
    /// Tick bumped (saturating) on peer client register/unregister.
    peer_clients_tick: Cell<u64, CellMutable>,
    /// Optional event sink used to fan out applied events to saga runtimes.
    event_sink: Option<flume::Sender<MEvent>>,
    /// Top-level cache for reactive query maps (weak handles).
    query_cache: Arc<DashMap<String, MapCacheEntry>>,
    /// Top-level cache for reactive view maps (weak handles).
    view_cache: Arc<DashMap<String, MapCacheEntry>>,
    /// Top-level cache for reactive report cells. Entries are type-erased
    /// `ReportCacheEntryDyn` values holding weak handles, so a dead entry
    /// stays in the map until `sweep_dead_cache_entries` removes it.
    report_cache: Arc<DashMap<String, Arc<dyn ReportCacheEntryDyn>>>,
    /// Optional ingest buffers keyed by entity type for opt-in burst smoothing.
    ingest_buffers: Arc<DashMap<Arc<str>, Arc<BufferedIngestType>>>,
    /// Optional history replay provider for point-in-time snapshots.
    history_replay: Option<Arc<dyn crate::server::HistoryReplayProvider>>,
}
162
163impl CellServerCtx {
164    /// Create a new server context.
165    #[allow(clippy::too_many_arguments)]
166    pub fn new(
167        host_id: Uuid,
168        registry: Arc<StoreRegistry>,
169        handler_registry: Arc<HandlerRegistry>,
170        relationship_manager: Arc<RelationshipManager>,
171        persisters: Arc<PersisterRouter>,
172        search_index: Arc<SearchIndex>,
173        peer_clients: Arc<DashMap<Arc<str>, Arc<MykoClient>>>,
174        event_sink: Option<flume::Sender<MEvent>>,
175        history_replay: Option<Arc<dyn crate::server::HistoryReplayProvider>>,
176    ) -> Self {
177        Self {
178            host_id,
179            registry,
180            handler_registry,
181            relationship_manager,
182            persisters,
183            search_index,
184            peer_clients,
185            peer_clients_tick: Cell::new(0).with_name("peer_clients_tick"),
186            event_sink,
187            query_cache: Arc::new(DashMap::new()),
188            view_cache: Arc::new(DashMap::new()),
189            report_cache: Arc::new(DashMap::new()),
190            ingest_buffers: Arc::new(DashMap::new()),
191            history_replay,
192        }
193    }
194
195    fn cache_key<T: CacheKey>(
196        &self,
197        kind: &str,
198        id: &str,
199        params: &T,
200        request: &RequestContext,
201    ) -> String {
202        let payload_hash = params.cache_key_hash();
203        format!("{}:{kind}:{id}:{payload_hash:016x}", request.host_id)
204    }
205
206    /// Get the search index.
207    pub fn search_index(&self) -> &Arc<SearchIndex> {
208        &self.search_index
209    }
210
211    /// Get the history replay provider, if configured.
212    pub fn history_replay(&self) -> Option<&Arc<dyn crate::server::HistoryReplayProvider>> {
213        self.history_replay.as_ref()
214    }
215
216    /// Register or replace a live peer client for a server id.
217    pub fn register_peer_client<S: AsRef<str>>(&self, peer_id: S, client: Arc<MykoClient>) {
218        self.peer_clients
219            .insert(Arc::<str>::from(peer_id.as_ref()), client);
220        let next = self.peer_clients_tick.get().saturating_add(1);
221        self.peer_clients_tick.set(next);
222    }
223
224    /// Remove a live peer client for a server id.
225    pub fn unregister_peer_client(&self, peer_id: &str) {
226        if self.peer_clients.remove(peer_id).is_some() {
227            let next = self.peer_clients_tick.get().saturating_add(1);
228            self.peer_clients_tick.set(next);
229        }
230    }
231
232    /// Get a live peer client by server id, if present.
233    pub fn peer_client(&self, peer_id: &str) -> Option<Arc<MykoClient>> {
234        self.peer_clients
235            .get(peer_id)
236            .map(|entry| entry.value().clone())
237    }
238
239    /// Get a peer's current connection status if the client is present.
240    pub fn peer_connection_status(&self, peer_id: &str) -> Option<ConnectionStatus> {
241        self.peer_client(peer_id)
242            .map(|client| client.get_connection_status_sync())
243    }
244
245    /// Reactive tick that updates whenever peer client membership changes.
246    pub fn peer_clients_tick(&self) -> Cell<u64, CellImmutable> {
247        self.peer_clients_tick.clone().lock()
248    }
249
250    /// Number of currently tracked peer clients.
251    pub fn peer_client_count(&self) -> usize {
252        self.peer_clients.len()
253    }
254
255    /// Get the live persist health counters from the default persister.
256    pub fn persist_health(&self) -> Arc<PersistHealth> {
257        self.persisters.default_health()
258    }
259
260    /// Number of entries in the query cache (includes dead weak refs).
261    pub fn query_cache_len(&self) -> usize {
262        self.query_cache.len()
263    }
264
265    /// Number of entries in the view cache (includes dead weak refs).
266    pub fn view_cache_len(&self) -> usize {
267        self.view_cache.len()
268    }
269
270    /// Number of entries in the report cache (includes dead weak refs).
271    pub fn report_cache_len(&self) -> usize {
272        self.report_cache.len()
273    }
274
275    /// Count live (upgradeable) entries in the report cache.
276    pub fn report_cache_live_count(&self) -> usize {
277        self.report_cache
278            .iter()
279            .filter(|entry| entry.value().is_alive())
280            .count()
281    }
282
283    /// Count live (upgradeable) entries in the query cache.
284    pub fn query_cache_live_count(&self) -> usize {
285        self.query_cache
286            .iter()
287            .filter(|entry| entry.value().weak.upgrade().is_some())
288            .count()
289    }
290
291    /// Count live (upgradeable) entries in the view cache.
292    pub fn view_cache_live_count(&self) -> usize {
293        self.view_cache
294            .iter()
295            .filter(|entry| entry.value().weak.upgrade().is_some())
296            .count()
297    }
298
299    /// Remove dead weak-ref entries from all caches.
300    /// Returns (query_removed, view_removed, report_removed).
301    pub fn sweep_dead_cache_entries(&self) -> (usize, usize, usize) {
302        let q_before = self.query_cache.len();
303        self.query_cache
304            .retain(|_, entry| entry.weak.upgrade().is_some());
305        let q_removed = q_before - self.query_cache.len();
306
307        let v_before = self.view_cache.len();
308        self.view_cache
309            .retain(|_, entry| entry.weak.upgrade().is_some());
310        let v_removed = v_before - self.view_cache.len();
311
312        let r_before = self.report_cache.len();
313        self.report_cache.retain(|_, entry| entry.is_alive());
314        let r_removed = r_before - self.report_cache.len();
315
316        (q_removed, v_removed, r_removed)
317    }
318
319    /// Parse JSON to a typed entity using the registered item parser.
320    ///
321    /// Returns None if the entity type is not registered or parsing fails.
322    pub fn parse_item(
323        &self,
324        entity_type: &str,
325        json: &serde_json::Value,
326    ) -> Option<Arc<dyn AnyItem>> {
327        let parse = self.handler_registry.get_item_parser(entity_type)?;
328        parse(json.clone()).ok()
329    }
330
331    // ─────────────────────────────────────────────────────────────────────────
332    // Typed entity publishing (for server modules)
333    // ─────────────────────────────────────────────────────────────────────────
334
335    /// Publish an entity (SET) with default options.
336    ///
337    /// Default behavior: Reduce + Relationships + Persist
338    pub fn set<T>(&self, entity: &T) -> Result<(), PersistError>
339    where
340        T: Eventable + 'static,
341    {
342        self.set_with_options(entity, None)
343    }
344
345    /// Publish an entity (SET) with options.
346    ///
347    /// Options control:
348    /// - `prevent_relationship_updates`: skip cascade processing
349    /// - `prevent_persist`: skip durable backend
350    pub fn set_with_options<T>(
351        &self,
352        entity: &T,
353        options: Option<EventOptions>,
354    ) -> Result<(), PersistError>
355    where
356        T: Eventable + 'static,
357    {
358        let options = options.unwrap_or_default();
359        let id = entity.id();
360        let entity_type = entity.entity_type();
361        let item: Arc<dyn AnyItem> = Arc::new(entity.clone());
362
363        // Reduce: update store
364        self.registry
365            .get_or_create(entity_type)
366            .insert(id.clone(), item.clone());
367
368        log::debug!("[entity] SET {} id={}", entity_type, id);
369
370        // Search: index searchable fields
371        self.search_index.index_item(&item);
372
373        // Relationships: process cascades (unless prevented)
374        if !options.prevent_relationship_updates {
375            self.relationship_manager.forward_set(item, self)?;
376        }
377
378        // Persist: produce to durable backend (unless prevented)
379        if !options.prevent_persist {
380            self.produce_set(entity)?;
381        }
382
383        Ok(())
384    }
385
386    /// Delete an entity (DEL) with default options.
387    ///
388    /// Default behavior: Reduce + Relationships + Persist
389    pub fn del<T>(&self, entity: &T) -> Result<(), PersistError>
390    where
391        T: Eventable + Clone + 'static,
392    {
393        self.del_with_options(entity, None)
394    }
395
396    /// Delete an entity (DEL) with options.
397    pub fn del_with_options<T>(
398        &self,
399        entity: &T,
400        options: Option<EventOptions>,
401    ) -> Result<(), PersistError>
402    where
403        T: Eventable + Clone + 'static,
404    {
405        let options = options.unwrap_or_default();
406        let entity_type = entity.entity_type();
407        let id = entity.id();
408        let item: Arc<dyn AnyItem> = Arc::new(entity.clone());
409
410        log::debug!("[entity] DEL {} id={}", entity_type, id);
411
412        // Reduce: remove from store
413        self.registry.get_or_create(entity_type).remove(&id);
414
415        // Search: remove from index
416        self.search_index.remove_entity(&id);
417
418        // Relationships: process cascades (unless prevented)
419        if !options.prevent_relationship_updates {
420            self.relationship_manager.forward_del(item.clone(), self)?;
421        }
422
423        // Persist: produce to durable backend (unless prevented)
424        if !options.prevent_persist {
425            self.produce_del(entity)?;
426        }
427
428        log::trace!("Published DEL {}:{}", entity_type, id);
429        Ok(())
430    }
431
432    /// Publish a batch of entities (SET) with default options.
433    ///
434    /// Default behavior: Reduce + Relationships + Persist
435    pub fn batch_set<T>(&self, entities: &[T]) -> Result<(), PersistError>
436    where
437        T: Eventable + Clone + 'static,
438    {
439        self.batch_set_with_options(entities, None)
440    }
441
442    /// Publish a batch of entities (SET) with shared options.
443    ///
444    /// This avoids manual `MEvent` construction and performs a grouped store insert.
445    pub fn batch_set_with_options<T>(
446        &self,
447        entities: &[T],
448        options: Option<EventOptions>,
449    ) -> Result<(), PersistError>
450    where
451        T: Eventable + Clone + 'static,
452    {
453        if entities.is_empty() {
454            return Ok(());
455        }
456
457        let options = options.unwrap_or_default();
458        let entity_type = T::entity_name_static();
459        let store = self.registry.get_or_create(entity_type);
460
461        let mut entries: Vec<(Arc<str>, Arc<dyn AnyItem>)> = Vec::with_capacity(entities.len());
462        let mut items: Vec<Arc<dyn AnyItem>> = Vec::with_capacity(entities.len());
463
464        for entity in entities {
465            let item: Arc<dyn AnyItem> = Arc::new(entity.clone());
466            log::debug!("[entity] SET {} id={}", entity_type, entity.id());
467            self.search_index.index_item(&item);
468            entries.push((entity.id(), item.clone()));
469            items.push(item);
470        }
471
472        // Reduce: one diff emission for the whole batch.
473        store.insert_many(entries);
474
475        // Relationships: process cascades (unless prevented)
476        if !options.prevent_relationship_updates {
477            for item in &items {
478                self.relationship_manager.forward_set(item.clone(), self)?;
479            }
480        }
481
482        // Persist: produce to durable backend (unless prevented)
483        if !options.prevent_persist {
484            for item in &items {
485                self.produce_set_dyn(item)?;
486            }
487        }
488
489        log::trace!("Published batch SET {} count={}", entity_type, items.len());
490        Ok(())
491    }
492
493    /// Delete a batch of entities (DEL) with default options.
494    ///
495    /// Default behavior: Reduce + Relationships + Persist
496    pub fn batch_del<T>(&self, entities: &[T]) -> Result<(), PersistError>
497    where
498        T: Eventable + Clone + 'static,
499    {
500        self.batch_del_with_options(entities, None)
501    }
502
503    /// Delete a batch of entities (DEL) with shared options.
504    ///
505    /// This avoids manual `MEvent` construction and performs a grouped store remove.
506    pub fn batch_del_with_options<T>(
507        &self,
508        entities: &[T],
509        options: Option<EventOptions>,
510    ) -> Result<(), PersistError>
511    where
512        T: Eventable + Clone + 'static,
513    {
514        if entities.is_empty() {
515            return Ok(());
516        }
517
518        let options = options.unwrap_or_default();
519        let entity_type = T::entity_name_static();
520        let store = self.registry.get_or_create(entity_type);
521
522        let mut ids: Vec<Arc<str>> = Vec::with_capacity(entities.len());
523        let mut items: Vec<Arc<dyn AnyItem>> = Vec::with_capacity(entities.len());
524
525        for entity in entities {
526            let item: Arc<dyn AnyItem> = Arc::new(entity.clone());
527            let id = item.id();
528            log::debug!("[entity] DEL {} id={}", entity_type, id);
529            self.search_index.remove_entity(&id);
530            ids.push(id);
531            items.push(item);
532        }
533
534        // Reduce: one diff emission for the whole batch.
535        store.remove_many(ids);
536
537        // Relationships: process cascades (unless prevented)
538        if !options.prevent_relationship_updates {
539            self.relationship_manager.forward_del_batch(&items, self)?;
540        }
541
542        // Persist: produce to durable backend (unless prevented)
543        if !options.prevent_persist {
544            for item in &items {
545                self.produce_del_dyn(item)?;
546            }
547        }
548
549        log::trace!("Published batch DEL {} count={}", entity_type, items.len());
550        Ok(())
551    }
552
553    // ─────────────────────────────────────────────────────────────────────────
554    // Dynamic item publishing (for parsed JSON)
555    // ─────────────────────────────────────────────────────────────────────────
556
557    /// Publish a dynamic item (SET) with default options.
558    ///
559    /// Default behavior: Reduce + Relationships + Persist
560    pub fn set_dyn(&self, item: Arc<dyn AnyItem>) -> Result<(), PersistError> {
561        self.set_dyn_with_options(item, None)
562    }
563
564    /// Publish a dynamic item (SET) with options.
565    pub fn set_dyn_with_options(
566        &self,
567        item: Arc<dyn AnyItem>,
568        options: Option<EventOptions>,
569    ) -> Result<(), PersistError> {
570        let options = options.unwrap_or_default();
571        let entity_type = item.entity_type();
572        let id = item.id();
573
574        log::debug!("[entity] SET {} id={}", entity_type, id);
575
576        // Reduce: update store
577        self.registry
578            .get_or_create(entity_type)
579            .insert(id.clone(), item.clone());
580
581        // Search: index searchable fields
582        self.search_index.index_item(&item);
583
584        // Relationships: process cascades (unless prevented)
585        if !options.prevent_relationship_updates {
586            self.relationship_manager.forward_set(item.clone(), self)?;
587        }
588
589        // Persist: produce to durable backend (unless prevented)
590        if !options.prevent_persist {
591            self.produce_set_dyn(&item)?;
592        }
593
594        log::trace!("Published SET {}:{}", entity_type, id);
595        Ok(())
596    }
597
598    /// Publish a batch of dynamic items (SET) with shared options.
599    pub fn batch_set_dyn_with_options(
600        &self,
601        items: &[Arc<dyn AnyItem>],
602        options: Option<EventOptions>,
603    ) -> Result<(), PersistError> {
604        if items.is_empty() {
605            return Ok(());
606        }
607
608        let options = options.unwrap_or_default();
609        let mut items_by_type: std::collections::BTreeMap<&'static str, Vec<Arc<dyn AnyItem>>> =
610            std::collections::BTreeMap::new();
611
612        for item in items {
613            items_by_type
614                .entry(item.entity_type())
615                .or_default()
616                .push(item.clone());
617        }
618
619        for (entity_type, typed_items) in items_by_type {
620            let store = self.registry.get_or_create(entity_type);
621            let mut entries: Vec<(Arc<str>, Arc<dyn AnyItem>)> =
622                Vec::with_capacity(typed_items.len());
623
624            for item in &typed_items {
625                log::debug!("[entity] SET {} id={}", entity_type, item.id());
626                self.search_index.index_item(item);
627                entries.push((item.id(), item.clone()));
628            }
629
630            store.insert_many(entries);
631
632            if !options.prevent_relationship_updates {
633                for item in &typed_items {
634                    self.relationship_manager.forward_set(item.clone(), self)?;
635                }
636            }
637
638            if !options.prevent_persist {
639                for item in &typed_items {
640                    self.produce_set_dyn(item)?;
641                }
642            }
643
644            log::trace!(
645                "Published batch SET {} count={}",
646                entity_type,
647                typed_items.len()
648            );
649        }
650        Ok(())
651    }
652
653    /// Delete a dynamic item (DEL) with default options.
654    ///
655    /// Default behavior: Reduce + Relationships + Persist
656    pub fn del_dyn(&self, item: Arc<dyn AnyItem>) -> Result<(), PersistError> {
657        self.del_dyn_with_options(item, None)
658    }
659
660    /// Delete a dynamic item (DEL) with options.
661    pub fn del_dyn_with_options(
662        &self,
663        item: Arc<dyn AnyItem>,
664        options: Option<EventOptions>,
665    ) -> Result<(), PersistError> {
666        let options = options.unwrap_or_default();
667        let entity_type = item.entity_type();
668        let id = item.id();
669
670        log::debug!("[entity] DEL {} id={}", entity_type, id);
671
672        // Reduce: remove from store
673        self.registry.get_or_create(entity_type).remove(&id);
674
675        // Search: remove from index
676        self.search_index.remove_entity(&id);
677
678        // Relationships: process cascades (unless prevented)
679        if !options.prevent_relationship_updates {
680            self.relationship_manager.forward_del(item.clone(), self)?;
681        }
682
683        // Persist: produce to durable backend (unless prevented)
684        if !options.prevent_persist {
685            self.produce_del_dyn(&item)?;
686        }
687
688        log::trace!("Published DEL {}:{}", entity_type, id);
689        Ok(())
690    }
691
692    /// Publish a batch of dynamic items (DEL) with shared options.
693    pub fn batch_del_dyn_with_options(
694        &self,
695        items: &[Arc<dyn AnyItem>],
696        options: Option<EventOptions>,
697    ) -> Result<(), PersistError> {
698        if items.is_empty() {
699            return Ok(());
700        }
701
702        let options = options.unwrap_or_default();
703        let mut items_by_type: std::collections::BTreeMap<&'static str, Vec<Arc<dyn AnyItem>>> =
704            std::collections::BTreeMap::new();
705
706        for item in items {
707            items_by_type
708                .entry(item.entity_type())
709                .or_default()
710                .push(item.clone());
711        }
712
713        for (entity_type, typed_items) in items_by_type {
714            let store = self.registry.get_or_create(entity_type);
715            let mut ids: Vec<Arc<str>> = Vec::with_capacity(typed_items.len());
716
717            for item in &typed_items {
718                let id = item.id();
719                log::debug!("[entity] DEL {} id={}", entity_type, id);
720                self.search_index.remove_entity(&id);
721                ids.push(id);
722            }
723
724            store.remove_many(ids);
725
726            if !options.prevent_relationship_updates {
727                self.relationship_manager
728                    .forward_del_batch(&typed_items, self)?;
729            }
730
731            if !options.prevent_persist {
732                for item in &typed_items {
733                    self.produce_del_dyn(item)?;
734                }
735            }
736        }
737        Ok(())
738    }
739
740    /// Delete an entity by type/id and publish DEL even if the item is not present locally.
741    ///
742    /// This is useful for explicit tombstoning of entities (e.g. disconnected peers)
743    /// where we must ensure a DEL event is produced to durable backend.
744    ///
745    /// Note: relationship cascades require the full item and are therefore skipped here.
746    pub fn del_by_id_with_options(
747        &self,
748        entity_type: &str,
749        id: &str,
750        options: Option<EventOptions>,
751    ) -> Result<(), PersistError> {
752        let options = options.unwrap_or_default();
753        let id_arc: Arc<str> = id.into();
754
755        let existing = self
756            .registry
757            .get(entity_type)
758            .and_then(|store| store.get(&id_arc).get());
759
760        // Reduce: remove from store
761        self.registry.get_or_create(entity_type).remove(&id_arc);
762
763        // Search: remove from index
764        self.search_index.remove_entity(id);
765
766        // Persist: produce to durable backend (unless prevented)
767        if !options.prevent_persist {
768            if let Some(item) = existing {
769                self.produce_del_dyn(&item)?;
770            } else {
771                log::warn!(
772                    "del_by_id could not persist DEL without full entity: {}:{}",
773                    entity_type,
774                    id
775                );
776            }
777        }
778
779        log::trace!("Published DEL {}:{}", entity_type, id);
780        Ok(())
781    }
782
783    /// Delete an entity by type/id with default options.
784    pub fn del_by_id(&self, entity_type: &str, id: &str) -> Result<(), PersistError> {
785        self.del_by_id_with_options(entity_type, id, None)
786    }
787
788    /// Apply a single wire event (parse -> reduce -> relationships -> persist).
789    ///
790    /// Returns `true` when the event was parsed and applied, `false` otherwise.
791    pub fn apply_event(&self, event: MEvent) -> Result<bool, PersistError> {
792        Ok(self.apply_event_batch(vec![event])? == 1)
793    }
794
795    /// Apply a batch of wire events with a single parse pass and grouped store updates.
796    ///
797    /// This reduces overhead versus calling `set_dyn`/`del_dyn` for each event individually.
798    /// Returns the number of successfully parsed/applied events.
799    pub fn apply_event_batch(&self, events: Vec<MEvent>) -> Result<usize, PersistError> {
800        if events.is_empty() {
801            return Ok(0);
802        }
803
804        let mut accepted = 0usize;
805        let mut immediate_events = Vec::new();
806        let mut buffered_by_type: HashMap<Arc<str>, (u64, Vec<MEvent>)> = HashMap::new();
807
808        for event in events {
809            match self
810                .handler_registry
811                .get_item_buffer_policy(&event.item_type)
812            {
813                IngestBufferPolicy::None => immediate_events.push(event),
814                IngestBufferPolicy::TimeWindow { window_ms } => {
815                    let entity_type: Arc<str> = event.item_type.clone().into();
816                    buffered_by_type
817                        .entry(entity_type)
818                        .or_insert_with(|| (window_ms, Vec::new()))
819                        .1
820                        .push(event);
821                }
822            }
823        }
824
825        if !immediate_events.is_empty() {
826            accepted += self.apply_event_batch_immediate(immediate_events)?;
827        }
828
829        for (entity_type, (window_ms, buffered_events)) in buffered_by_type {
830            accepted += buffered_events.len();
831            self.enqueue_buffered_events(entity_type, window_ms, buffered_events);
832        }
833
834        Ok(accepted)
835    }
836
837    fn apply_event_batch_immediate(&self, events: Vec<MEvent>) -> Result<usize, PersistError> {
838        if events.is_empty() {
839            return Ok(0);
840        }
841        let input_len = events.len();
842
843        #[derive(Clone)]
844        struct SetOp {
845            item: Arc<dyn AnyItem>,
846            options: EventOptions,
847        }
848
849        #[derive(Clone)]
850        struct DelOp {
851            item: Arc<dyn AnyItem>,
852            options: EventOptions,
853        }
854
855        let mut sets: Vec<SetOp> = Vec::new();
856        let mut dels: Vec<DelOp> = Vec::new();
857
858        for event in events {
859            let options = event.options.clone().unwrap_or_default();
860            match event.change_type {
861                MEventType::SET => {
862                    if let Some(item) = self.parse_item(&event.item_type, &event.item) {
863                        sets.push(SetOp { item, options });
864                    } else {
865                        log::warn!(
866                            "Unknown entity type or parse error for SET: {}",
867                            event.item_type
868                        );
869                    }
870                }
871                MEventType::DEL => {
872                    if let Some(item) = self.parse_item(&event.item_type, &event.item) {
873                        dels.push(DelOp { item, options });
874                    } else {
875                        log::warn!(
876                            "Unknown entity type or parse error for DEL: {}",
877                            event.item_type
878                        );
879                    }
880                }
881            }
882        }
883
884        if sets.is_empty() && dels.is_empty() {
885            return Ok(0);
886        }
887
888        log::trace!(
889            target: "myko::server::context",
890            "apply_event_batch parsed: input_events={} sets={} dels={}",
891            input_len,
892            sets.len(),
893            dels.len()
894        );
895
896        let mut inserts_by_type: AnyItemEntriesByType = HashMap::new();
897        let mut removes_by_type: HashMap<Arc<str>, Vec<Arc<str>>> = HashMap::new();
898
899        for op in &sets {
900            let entity_type: Arc<str> = op.item.entity_type().into();
901            let id = op.item.id();
902            log::debug!("[entity] SET {} id={}", entity_type, id);
903            inserts_by_type
904                .entry(entity_type)
905                .or_default()
906                .push((id, op.item.clone()));
907            self.search_index.index_item(&op.item);
908        }
909        for op in &dels {
910            let entity_type: Arc<str> = op.item.entity_type().into();
911            let id = op.item.id();
912            log::debug!("[entity] DEL {} id={}", entity_type, id);
913            removes_by_type
914                .entry(entity_type)
915                .or_default()
916                .push(id.clone());
917            self.search_index.remove_entity(&id);
918        }
919
920        // Reduce: one diff emission per entity type per operation kind.
921        for (entity_type, entries) in inserts_by_type {
922            log::trace!(
923                target: "myko::server::context",
924                "apply_event_batch reduce inserts: entity_type={} count={}",
925                entity_type,
926                entries.len()
927            );
928            let store = self.registry.get_or_create(entity_type.as_ref());
929            store.insert_many(entries);
930        }
931        for (entity_type, keys) in removes_by_type {
932            log::trace!(
933                target: "myko::server::context",
934                "apply_event_batch reduce removes: entity_type={} count={}",
935                entity_type,
936                keys.len()
937            );
938            let store = self.registry.get_or_create(entity_type.as_ref());
939            store.remove_many(keys);
940        }
941
942        // Relationships
943        for op in &sets {
944            if !op.options.prevent_relationship_updates {
945                self.relationship_manager
946                    .forward_set(op.item.clone(), self)?;
947            }
948        }
949        let mut dels_with_relationships: HashMap<Arc<str>, Vec<Arc<dyn AnyItem>>> = HashMap::new();
950        for op in &dels {
951            if !op.options.prevent_relationship_updates {
952                dels_with_relationships
953                    .entry(op.item.entity_type().into())
954                    .or_default()
955                    .push(op.item.clone());
956            }
957        }
958        for (_, items) in dels_with_relationships {
959            self.relationship_manager.forward_del_batch(&items, self)?;
960        }
961
962        // Persist
963        for op in &sets {
964            if !op.options.prevent_persist {
965                self.produce_set_dyn(&op.item)?;
966            }
967        }
968        for op in &dels {
969            if !op.options.prevent_persist {
970                self.produce_del_dyn(&op.item)?;
971            }
972        }
973
974        Ok(sets.len() + dels.len())
975    }
976
977    fn ingest_buffer_for(&self, entity_type: Arc<str>) -> Arc<BufferedIngestType> {
978        self.ingest_buffers
979            .entry(entity_type)
980            .or_insert_with(|| Arc::new(BufferedIngestType::new()))
981            .clone()
982    }
983
984    fn enqueue_buffered_events(&self, entity_type: Arc<str>, window_ms: u64, events: Vec<MEvent>) {
985        let buffer = self.ingest_buffer_for(entity_type.clone());
986        let should_schedule = {
987            let Ok(mut state) = buffer.state.lock() else {
988                log::error!(
989                    "Could not acquire ingest buffer lock for entity_type={}",
990                    entity_type
991                );
992                if let Err(e) = self.apply_event_batch_immediate(events) {
993                    log::error!("Failed to apply buffered events for {}: {}", entity_type, e);
994                }
995                return;
996            };
997
998            state.events.extend(events);
999            if state.flush_scheduled {
1000                false
1001            } else {
1002                state.flush_scheduled = true;
1003                true
1004            }
1005        };
1006
1007        if !should_schedule {
1008            return;
1009        }
1010
1011        let ctx = self.clone();
1012        thread::spawn(move || {
1013            thread::sleep(Duration::from_millis(window_ms));
1014            ctx.flush_buffered_events_for_type(&entity_type);
1015        });
1016    }
1017
1018    fn flush_buffered_events_for_type(&self, entity_type: &Arc<str>) -> usize {
1019        let Some(buffer) = self
1020            .ingest_buffers
1021            .get(entity_type.as_ref())
1022            .map(|entry| entry.clone())
1023        else {
1024            return 0;
1025        };
1026
1027        let events = {
1028            let Ok(mut state) = buffer.state.lock() else {
1029                log::error!(
1030                    "Could not acquire ingest buffer lock for flush entity_type={}",
1031                    entity_type
1032                );
1033                return 0;
1034            };
1035
1036            state.flush_scheduled = false;
1037            if state.events.is_empty() {
1038                return 0;
1039            }
1040
1041            std::mem::take(&mut state.events)
1042        };
1043
1044        log::trace!(
1045            target: "myko::server::context",
1046            "flush_buffered_events entity_type={} count={}",
1047            entity_type,
1048            events.len()
1049        );
1050
1051        match self.apply_event_batch_immediate(events) {
1052            Ok(count) => count,
1053            Err(e) => {
1054                log::error!("Failed to flush buffered events for {}: {}", entity_type, e);
1055                0
1056            }
1057        }
1058    }
1059
1060    #[cfg(test)]
1061    fn flush_all_buffered_events(&self) -> usize {
1062        let entity_types: Vec<Arc<str>> = self
1063            .ingest_buffers
1064            .iter()
1065            .map(|entry| entry.key().clone())
1066            .collect();
1067
1068        entity_types
1069            .into_iter()
1070            .map(|entity_type| self.flush_buffered_events_for_type(&entity_type))
1071            .sum()
1072    }
1073
1074    // ─────────────────────────────────────────────────────────────────────────
1075    // durable backend production (private)
1076    // ─────────────────────────────────────────────────────────────────────────
1077
1078    fn produce_set<T: Eventable>(&self, entity: &T) -> Result<(), PersistError> {
1079        if let Some(persister) = self.persisters.resolve(T::entity_name_static()) {
1080            let event = MEvent::from_item(entity, MEventType::SET, &self.host_id.to_string());
1081            persister.persist(event)?;
1082        }
1083        if let Some(sink) = &self.event_sink {
1084            let event = MEvent::from_item(entity, MEventType::SET, &self.host_id.to_string());
1085            let _ = sink.send(event);
1086        }
1087        Ok(())
1088    }
1089
1090    fn produce_del<T: Eventable>(&self, entity: &T) -> Result<(), PersistError> {
1091        if let Some(persister) = self.persisters.resolve(T::entity_name_static()) {
1092            let event = MEvent::del(entity, &self.host_id.to_string());
1093            persister.persist(event)?;
1094        }
1095        if let Some(sink) = &self.event_sink {
1096            let event = MEvent::del(entity, &self.host_id.to_string());
1097            let _ = sink.send(event);
1098        }
1099        Ok(())
1100    }
1101
1102    fn produce_del_dyn(&self, item: &Arc<dyn AnyItem>) -> Result<(), PersistError> {
1103        if let Some(persister) = self.persisters.resolve(item.entity_type()) {
1104            let event = MEvent::del_from_any(item, &self.host_id.to_string());
1105            persister.persist(event)?;
1106        }
1107        if let Some(sink) = &self.event_sink {
1108            let event = MEvent::del_from_any(item, &self.host_id.to_string());
1109            let _ = sink.send(event);
1110        }
1111        Ok(())
1112    }
1113
1114    fn produce_set_dyn(&self, item: &Arc<dyn AnyItem>) -> Result<(), PersistError> {
1115        if let Some(persister) = self.persisters.resolve(item.entity_type()) {
1116            let event = MEvent::set_from_value(
1117                item.entity_type(),
1118                item.to_value(),
1119                &self.host_id.to_string(),
1120            );
1121            persister.persist(event)?;
1122        }
1123        if let Some(sink) = &self.event_sink {
1124            let event = MEvent::set_from_value(
1125                item.entity_type(),
1126                item.to_value(),
1127                &self.host_id.to_string(),
1128            );
1129            let _ = sink.send(event);
1130        }
1131        Ok(())
1132    }
1133
1134    // ─────────────────────────────────────────────────────────────────────────
1135    // Query methods
1136    // ─────────────────────────────────────────────────────────────────────────
1137
1138    /// Run a reactive query and return a typed map keyed by canonical string ids.
1139    ///
1140    /// Use `query_map_untyped()` when framework internals need erased `AnyItem`.
1141    pub fn query_map<Q>(
1142        &self,
1143        query: Q,
1144        request: Arc<RequestContext>,
1145    ) -> CellMap<Arc<str>, Arc<Q::Item>, CellImmutable>
1146    where
1147        Q: QueryFactory + QueryHandler + QueryParams + Clone + Send + Sync + 'static,
1148        Q::Item:
1149            Eventable + WithId + DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
1150    {
1151        typed_map_arc_from_any_item(
1152            self.query_map_untyped(query, request),
1153            "CellServerCtx::query_map",
1154        )
1155    }
1156
1157    /// Run a reactive query.
1158    ///
1159    /// Returns a type-erased map that updates whenever the query results change.
1160    /// The query's `test_entity` is applied with proper server context.
1161    ///
1162    /// # Example
1163    ///
1164    /// ```rust,no_run
1165    /// use std::sync::Arc;
1166    /// use myko::entities::server::GetPeerServers;
1167    /// use myko::request::RequestContext;
1168    /// use myko::server::CellServerCtx;
1169    ///
1170    /// fn demo(ctx: &CellServerCtx, req: Arc<RequestContext>) {
1171    ///     let _peer_servers = ctx.query_map_untyped(GetPeerServers {}, req);
1172    ///     // _peer_servers is CellMap<Arc<str>, Arc<dyn AnyItem>, CellImmutable>
1173    /// }
1174    /// ```
1175    pub fn query_map_untyped<Q>(&self, query: Q, request: Arc<RequestContext>) -> FilteredCellMap
1176    where
1177        Q: QueryFactory + QueryHandler + QueryParams + Clone + Send + Sync + 'static,
1178        Q::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
1179    {
1180        let key = self.cache_key("query", Q::query_id_static().as_ref(), &query, &request);
1181        if let Some(existing) = self.query_cache.get(&key) {
1182            if let Some(shared) = existing.value().get() {
1183                return shared;
1184            }
1185            drop(existing);
1186            self.query_cache.remove(&key);
1187        }
1188
1189        let query_req = QueryRequest::with_tx(query, request.tx.clone());
1190        let any_query: Arc<dyn crate::query::AnyQuery> = Arc::new(query_req);
1191
1192        let built = Q::cell_factory(
1193            any_query,
1194            self.registry.clone(),
1195            request,
1196            Some(Arc::new(self.clone())),
1197        )
1198        .expect("query cell factory should not fail for typed query");
1199        self.query_cache.insert(key, MapCacheEntry::new(&built));
1200        built
1201    }
1202
1203    /// Build a reactive view cell map (type-erased for framework internals).
1204    pub fn view_map_untyped<V>(&self, view: V, request: Arc<RequestContext>) -> FilteredViewCellMap
1205    where
1206        V: ViewFactory + Clone + Send + Sync + 'static,
1207        V::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
1208    {
1209        let key = self.cache_key("view", V::view_id_static().as_ref(), &view, &request);
1210        if let Some(existing) = self.view_cache.get(&key) {
1211            if let Some(shared) = existing.value().get() {
1212                return shared;
1213            }
1214            drop(existing);
1215            self.view_cache.remove(&key);
1216        }
1217
1218        let view_req = crate::view::ViewRequest::with_tx(view, request.tx.clone());
1219        let any_view: Arc<dyn crate::view::AnyView> = Arc::new(view_req);
1220
1221        let built = V::cell_factory(
1222            any_view,
1223            self.registry.clone(),
1224            request,
1225            Arc::new(self.clone()),
1226        )
1227        .expect("view cell factory should not fail for typed view");
1228        self.view_cache.insert(key, MapCacheEntry::new(&built));
1229        built
1230    }
1231
1232    /// Back-compat alias for type-erased view map.
1233    pub fn view_map<V>(&self, view: V, request: Arc<RequestContext>) -> FilteredViewCellMap
1234    where
1235        V: ViewFactory + Clone + Send + Sync + 'static,
1236        V::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
1237    {
1238        self.view_map_untyped(view, request)
1239    }
1240
1241    /// Build a typed reactive view cell map.
1242    pub fn view<V>(&self, view: V, request: Arc<RequestContext>) -> TypedViewCellMap<V::Item>
1243    where
1244        V: ViewFactory + Clone + Send + Sync + 'static,
1245        V::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
1246    {
1247        typed_map_arc_from_any_item(self.view_map_untyped(view, request), "CellServerCtx::view")
1248    }
1249
1250    /// Get a one-shot typed entity snapshot by id.
1251    pub fn entity_snapshot<T>(&self, id: &<T as WithTypedId>::Id) -> Option<Arc<T>>
1252    where
1253        T: Eventable + WithTypedId + Send + Sync + 'static,
1254        <T as WithTypedId>::Id: hyphae::IdFor<T, MapKey = Arc<str>>,
1255    {
1256        let store = self.registry.get_or_create(T::entity_name_static());
1257        let map_key = id.map_key();
1258        let item = store.get_value(&map_key)?;
1259        Some(downcast_any_item_arc::<T>(
1260            &item,
1261            "CellServerCtx::entity_snapshot",
1262        ))
1263    }
1264
1265    /// Get one-shot typed entity snapshots for an item type.
1266    pub fn entity_snapshots<T>(&self) -> Vec<Arc<T>>
1267    where
1268        T: Eventable + WithTypedId + Send + Sync + 'static,
1269        <T as WithTypedId>::Id: hyphae::IdFor<T, MapKey = Arc<str>>,
1270    {
1271        let store = self.registry.get_or_create(T::entity_name_static());
1272        store
1273            .snapshot()
1274            .into_iter()
1275            .map(|(_, item)| downcast_any_item_arc::<T>(&item, "CellServerCtx::entity_snapshots"))
1276            .collect()
1277    }
1278
1279    /// Get one-shot typed entity snapshots for the provided ids.
1280    pub fn entity_snapshots_by_id<T>(
1281        &self,
1282        ids: impl IntoIterator<Item = <T as WithTypedId>::Id>,
1283    ) -> Vec<Arc<T>>
1284    where
1285        T: Eventable + WithTypedId + Send + Sync + 'static,
1286        <T as WithTypedId>::Id: hyphae::IdFor<T, MapKey = Arc<str>>,
1287    {
1288        ids.into_iter()
1289            .filter_map(|id| self.entity_snapshot::<T>(&id))
1290            .collect()
1291    }
1292
1293    /// Run a one-shot (non-reactive) query.
1294    ///
1295    /// Iterates the store directly and returns matching entities without creating
1296    /// any reactive cells or subscriptions. Use this for command handlers and other
1297    /// contexts where you need a point-in-time snapshot, not a live query.
1298    pub fn query_snapshot<Q>(&self, query: Q, request: Arc<RequestContext>) -> Vec<Arc<Q::Item>>
1299    where
1300        Q: QueryHandler + QueryParams + Clone + Send + Sync + 'static,
1301        Q::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
1302    {
1303        let query_item_type = Q::query_item_type_static();
1304        let store = self.registry.get_or_create(&query_item_type);
1305
1306        let query_context = Arc::new(QueryContext {
1307            req: request.clone(),
1308        });
1309        let query = Arc::new(query);
1310
1311        store
1312            .snapshot()
1313            .into_iter()
1314            .filter_map(|(_, item)| {
1315                let typed_item =
1316                    downcast_any_item_arc::<Q::Item>(&item, "CellServerCtx::query_snapshot");
1317                let ctx = QueryTestCtx {
1318                    item: typed_item.clone(),
1319                    query: query.clone(),
1320                    query_context: query_context.clone(),
1321                };
1322                if Q::test_entity(ctx) {
1323                    Some(typed_item)
1324                } else {
1325                    None
1326                }
1327            })
1328            .collect()
1329    }
1330
1331    pub fn report<R>(
1332        &self,
1333        report: R,
1334        request: Arc<RequestContext>,
1335    ) -> Cell<Arc<R::Output>, CellImmutable>
1336    where
1337        R: ReportHandler + ReportId + CacheKey + Clone + serde::Serialize + 'static,
1338    {
1339        let key = self.cache_key("report", report.report_id().as_ref(), &report, &request);
1340
1341        // Cache hit: if the cell is still alive (has subscribers), reuse it.
1342        if let Some(existing) = self.report_cache.get(&key) {
1343            if let Some(entry) = existing
1344                .value()
1345                .as_any()
1346                .downcast_ref::<ReportCacheEntry<Arc<R::Output>>>()
1347                && let Some(shared) = entry.get()
1348            {
1349                return shared;
1350            }
1351            // Dead entry — drop the ref before removing to avoid DashMap deadlock
1352            drop(existing);
1353            self.report_cache.remove(&key);
1354        }
1355
1356        let nested_ctx = ReportContext::new(request, Arc::new(self.clone()));
1357        let built = report.compute(nested_ctx);
1358        self.report_cache
1359            .insert(key.clone(), Arc::new(ReportCacheEntry::new(&built)));
1360
1361        built
1362    }
1363
1364    pub fn new_server_transaction(&self) -> Arc<RequestContext> {
1365        Arc::new(RequestContext {
1366            tx: Arc::<str>::from(Uuid::new_v4().to_string()),
1367            client_id: None,
1368            lineage: vec![],
1369            host_id: self.host_id,
1370            created_at: chrono::Utc::now().to_string(),
1371            windback: None,
1372        })
1373    }
1374}
1375
1376impl std::fmt::Debug for CellServerCtx {
1377    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1378        f.debug_struct("CellServerCtx").finish()
1379    }
1380}
1381
#[cfg(test)]
mod tests {
    //! Checks that `apply_event_batch` applies ordinary entity types right
    //! away and holds back entity types registered with an ingest buffer
    //! policy until their buffer is flushed.

    use std::sync::Arc;

    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use uuid::Uuid;

    use super::CellServerCtx;
    use crate::{
        common::with_id::WithId,
        core::item::{
            AnyItem, Eventable, IngestBufferPolicy, IngestBufferRegistration, ItemRegistration,
        },
        hyphae::Gettable,
        search::SearchIndex,
        server::{HandlerRegistry, RelationshipManager, persister::PersisterRouter},
        store::StoreRegistry,
        wire::{MEvent, MEventType},
    };

    /// Entity type registered below with a time-window ingest buffer policy.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct BufferedTestItem {
        id: Arc<str>,
        value: i32,
    }

    impl WithId for BufferedTestItem {
        fn id(&self) -> Arc<str> {
            Arc::clone(&self.id)
        }
    }

    impl AnyItem for BufferedTestItem {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn entity_type(&self) -> &'static str {
            <Self as Eventable>::ENTITY_NAME_STATIC
        }

        fn equals(&self, other: &dyn AnyItem) -> bool {
            other
                .as_any()
                .downcast_ref::<Self>()
                .is_some_and(|typed| self == typed)
        }
    }

    impl Eventable for BufferedTestItem {
        const ENTITY_NAME_STATIC: &'static str = "BufferedTestItem";
    }

    inventory::submit! {
        ItemRegistration {
            entity_type: "BufferedTestItem",
            crate_name: env!("CARGO_PKG_NAME"),
            parse: BufferedTestItem::parse,
        }
    }

    // The window is long enough that the scheduled flush cannot fire during
    // the test; the test flushes explicitly instead.
    inventory::submit! {
        IngestBufferRegistration {
            entity_type: "BufferedTestItem",
            policy: IngestBufferPolicy::TimeWindow { window_ms: 60_000 },
        }
    }

    /// Entity type with no ingest buffer registration.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct ImmediateTestItem {
        id: Arc<str>,
        value: i32,
    }

    impl WithId for ImmediateTestItem {
        fn id(&self) -> Arc<str> {
            Arc::clone(&self.id)
        }
    }

    impl AnyItem for ImmediateTestItem {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn entity_type(&self) -> &'static str {
            <Self as Eventable>::ENTITY_NAME_STATIC
        }

        fn equals(&self, other: &dyn AnyItem) -> bool {
            other
                .as_any()
                .downcast_ref::<Self>()
                .is_some_and(|typed| self == typed)
        }
    }

    impl Eventable for ImmediateTestItem {
        const ENTITY_NAME_STATIC: &'static str = "ImmediateTestItem";
    }

    inventory::submit! {
        ItemRegistration {
            entity_type: "ImmediateTestItem",
            crate_name: env!("CARGO_PKG_NAME"),
            parse: ImmediateTestItem::parse,
        }
    }

    /// Build a server context backed by fresh, empty collaborators.
    fn make_ctx() -> CellServerCtx {
        let host_id = Uuid::new_v4();
        CellServerCtx::new(
            host_id,
            Arc::new(StoreRegistry::new()),
            Arc::new(HandlerRegistry::new()),
            Arc::new(RelationshipManager::new()),
            Arc::new(PersisterRouter::default()),
            Arc::new(SearchIndex::new()),
            Arc::new(dashmap::DashMap::new()),
            None,
            None,
        )
    }

    /// Build a SET event carrying `{ "id": id, "value": value }` for `item_type`.
    fn set_event(item_type: &str, id: &str, value: i32, tx: &str) -> MEvent {
        MEvent {
            item: json!({
                "id": id,
                "value": value,
            }),
            change_type: MEventType::SET,
            item_type: item_type.to_string(),
            created_at: "2026-03-12T00:00:00Z".to_string(),
            tx: tx.to_string(),
            source_id: Some("test".to_string()),
            options: None,
        }
    }

    #[test]
    fn apply_event_batch_keeps_default_entities_immediate() {
        let ctx = make_ctx();
        let batch = vec![set_event(
            "ImmediateTestItem",
            "immediate-1",
            7,
            "tx-immediate",
        )];

        let applied = ctx
            .apply_event_batch(batch)
            .expect("apply_event_batch should succeed");
        assert_eq!(applied, 1);

        // The item is visible in the store without any flush.
        let key: Arc<str> = Arc::from("immediate-1");
        let store = ctx.registry.get_or_create("ImmediateTestItem");
        assert!(store.get(&key).get().is_some());
    }

    #[test]
    fn apply_event_batch_buffers_opted_in_entities() {
        let ctx = make_ctx();
        let batch = vec![set_event("BufferedTestItem", "buffered-1", 42, "tx-buffered")];

        let applied = ctx
            .apply_event_batch(batch)
            .expect("apply_event_batch should succeed");
        assert_eq!(applied, 1);

        // Accepted, but not yet in the store.
        let key: Arc<str> = Arc::from("buffered-1");
        let store = ctx.registry.get_or_create("BufferedTestItem");
        assert!(store.get(&key).get().is_none());

        // An explicit flush makes it visible.
        let flushed = ctx.flush_all_buffered_events();
        assert_eq!(flushed, 1);
        assert!(store.get(&key).get().is_some());
    }
}