use std::{
any::Any,
collections::HashMap,
sync::{Arc, Mutex},
thread,
time::Duration,
};
use dashmap::DashMap;
use hyphae::{Cell, CellImmutable, CellMap, CellMutable, Gettable, IdFor, Mutable, WeakCellMap};
use serde::de::DeserializeOwned;
use uuid::Uuid;
use super::{
HandlerRegistry, RelationshipManager,
persister::{PersistError, PersistHealth, PersisterRouter},
};
use crate::{
cache::CacheKey,
client::{ConnectionStatus, MykoClient},
common::{
to_value::ToValue,
with_id::{WithId, WithTypedId},
},
core::item::{
AnyItem, Eventable, IngestBufferPolicy, downcast_any_item_arc, typed_map_arc_from_any_item,
},
query::{
FilteredCellMap, QueryContext, QueryFactory, QueryHandler, QueryParams, QueryRequest,
QueryTestCtx,
},
report::{ReportContext, ReportHandler, ReportId},
request::RequestContext,
search::SearchIndex,
store::StoreRegistry,
view::{FilteredViewCellMap, TypedViewCellMap, ViewFactory},
wire::{EventOptions, MEvent, MEventType},
};
/// Shared, type-erased handle to a stored entity.
type AnyItemArc = Arc<dyn AnyItem>;
/// `(id, item)` pairs queued for a bulk insert into a single store.
type AnyItemBatchEntries = Vec<(Arc<str>, AnyItemArc)>;
/// Bulk-insert entries grouped by entity type name.
type AnyItemEntriesByType = HashMap<Arc<str>, AnyItemBatchEntries>;
/// Type-erased interface over report cache entries, so entries holding
/// different output types can live in the same map.
trait ReportCacheEntryDyn: Any + Send + Sync {
/// Exposes the entry as `&dyn Any` so it can be downcast to its concrete type.
fn as_any(&self) -> &dyn Any;
/// Returns `true` while the entry is still usable; the implementation in
/// this file checks that its weak cell can still be upgraded.
fn is_alive(&self) -> bool;
}
/// Report cache entry that only weakly references the computed report cell,
/// so the cache itself does not keep the cell alive.
struct ReportCacheEntry<T> {
// Weak handle to the cached report cell.
weak: hyphae::cell::WeakCell<T, CellImmutable>,
}
impl<T> ReportCacheEntry<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Creates an entry that weakly tracks `cell`.
    fn new(cell: &Cell<T, CellImmutable>) -> Self {
        let weak = cell.downgrade();
        Self { weak }
    }

    /// Returns a strong handle to the tracked cell, or `None` once the weak
    /// handle can no longer be upgraded.
    fn get(&self) -> Option<Cell<T, CellImmutable>> {
        let strong = self.weak.upgrade()?;
        Some(strong)
    }
}
impl<T> ReportCacheEntryDyn for ReportCacheEntry<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Returns `self` as `&dyn Any` for downcasting to `ReportCacheEntry<T>`.
    fn as_any(&self) -> &dyn Any {
        let erased: &dyn Any = self;
        erased
    }

    /// `true` while the weakly held cell can still be upgraded.
    fn is_alive(&self) -> bool {
        let upgraded = self.weak.upgrade();
        upgraded.is_some()
    }
}
/// Cache entry for query and view maps; holds the map weakly so the cache
/// does not keep it alive.
struct MapCacheEntry {
// Weak handle to the cached map of type-erased items keyed by id.
weak: WeakCellMap<Arc<str>, AnyItemArc>,
}
/// Mutable state of one per-entity-type ingest buffer.
#[derive(Default)]
struct BufferedIngestState {
// Events accepted but not yet applied to the stores.
events: Vec<MEvent>,
// Set while a flush thread has been spawned and has not yet drained the buffer.
flush_scheduled: bool,
}
/// Ingest buffer for a single entity type; the state is guarded by a mutex
/// because it is touched from both the enqueueing caller and the flush thread.
struct BufferedIngestType {
state: Mutex<BufferedIngestState>,
}
impl BufferedIngestType {
    /// Creates an empty buffer with no flush scheduled.
    fn new() -> Self {
        let initial = BufferedIngestState::default();
        Self {
            state: Mutex::new(initial),
        }
    }
}
impl MapCacheEntry {
    /// Creates an entry that weakly tracks `map`.
    fn new(map: &FilteredCellMap) -> Self {
        let weak = map.downgrade();
        Self { weak }
    }

    /// Returns the tracked map as an immutable handle, or `None` once the
    /// weak handle can no longer be upgraded.
    fn get(&self) -> Option<FilteredCellMap> {
        let strong = self.weak.upgrade()?;
        Some(strong.lock())
    }
}
/// Shared server context: entity stores, handler lookup, relationship
/// forwarding, persistence, search indexing, peer clients and the
/// query/view/report caches.
///
/// Cloning is cheap in the sense that every collaborator is held behind an
/// `Arc` (or is itself a handle), so clones share the same underlying state.
#[derive(Clone)]
pub struct CellServerCtx {
/// Identifier of this host; stamped into produced events and server transactions.
pub host_id: Uuid,
/// Per-entity-type stores.
pub registry: Arc<StoreRegistry>,
/// Source of item parsers and ingest buffer policies.
pub handler_registry: Arc<HandlerRegistry>,
// Receives forwarded SET/DEL notifications unless the event options opt out.
relationship_manager: Arc<RelationshipManager>,
// Resolves the persister (if any) responsible for an entity type.
persisters: Arc<PersisterRouter>,
// Search index kept in step with every SET/DEL applied through this context.
search_index: Arc<SearchIndex>,
// Registered peer clients keyed by peer id.
peer_clients: Arc<DashMap<Arc<str>, Arc<MykoClient>>>,
// Counter bumped whenever a peer client is registered or removed.
peer_clients_tick: Cell<u64, CellMutable>,
// Optional channel that receives a copy of every produced event.
event_sink: Option<flume::Sender<MEvent>>,
// Weak cache of query maps keyed by the string built in `cache_key`.
query_cache: Arc<DashMap<String, MapCacheEntry>>,
// Weak cache of view maps keyed by the string built in `cache_key`.
view_cache: Arc<DashMap<String, MapCacheEntry>>,
// Weak cache of report cells keyed by the string built in `cache_key`.
report_cache: Arc<DashMap<String, Arc<dyn ReportCacheEntryDyn>>>,
// Pending buffered events per entity type.
ingest_buffers: Arc<DashMap<Arc<str>, Arc<BufferedIngestType>>>,
// Optional history replay provider, exposed through `history_replay()`.
history_replay: Option<Arc<dyn crate::server::HistoryReplayProvider>>,
}
impl CellServerCtx {
#[allow(clippy::too_many_arguments)]
pub fn new(
host_id: Uuid,
registry: Arc<StoreRegistry>,
handler_registry: Arc<HandlerRegistry>,
relationship_manager: Arc<RelationshipManager>,
persisters: Arc<PersisterRouter>,
search_index: Arc<SearchIndex>,
peer_clients: Arc<DashMap<Arc<str>, Arc<MykoClient>>>,
event_sink: Option<flume::Sender<MEvent>>,
history_replay: Option<Arc<dyn crate::server::HistoryReplayProvider>>,
) -> Self {
Self {
host_id,
registry,
handler_registry,
relationship_manager,
persisters,
search_index,
peer_clients,
peer_clients_tick: Cell::new(0).with_name("peer_clients_tick"),
event_sink,
query_cache: Arc::new(DashMap::new()),
view_cache: Arc::new(DashMap::new()),
report_cache: Arc::new(DashMap::new()),
ingest_buffers: Arc::new(DashMap::new()),
history_replay,
}
}
/// Builds the cache key shared by the query, view and report caches.
///
/// The key has the shape `{host_id}:{kind}:{id}:{hash}`, where `hash` is
/// `params.cache_key_hash()` rendered as lowercase hex, zero-padded to at
/// least 16 digits.
fn cache_key<T: CacheKey>(
    &self,
    kind: &str,
    id: &str,
    params: &T,
    request: &RequestContext,
) -> String {
    let payload_hash = params.cache_key_hash();
    let host = &request.host_id;
    format!("{host}:{kind}:{id}:{payload_hash:016x}")
}
pub fn search_index(&self) -> &Arc<SearchIndex> {
&self.search_index
}
/// Returns the history replay provider, if one was supplied at construction.
pub fn history_replay(&self) -> Option<&Arc<dyn crate::server::HistoryReplayProvider>> {
    Option::as_ref(&self.history_replay)
}
/// Registers `client` under `peer_id`, replacing any client already stored
/// for that id, and bumps the peer-client tick.
pub fn register_peer_client<S: AsRef<str>>(&self, peer_id: S, client: Arc<MykoClient>) {
    let key: Arc<str> = Arc::from(peer_id.as_ref());
    self.peer_clients.insert(key, client);
    let bumped = self.peer_clients_tick.get().saturating_add(1);
    self.peer_clients_tick.set(bumped);
}
/// Removes the client registered under `peer_id`.
///
/// The peer-client tick is bumped only when a client was actually removed.
pub fn unregister_peer_client(&self, peer_id: &str) {
    if self.peer_clients.remove(peer_id).is_none() {
        return;
    }
    let bumped = self.peer_clients_tick.get().saturating_add(1);
    self.peer_clients_tick.set(bumped);
}
/// Looks up the client registered under `peer_id`.
pub fn peer_client(&self, peer_id: &str) -> Option<Arc<MykoClient>> {
    let slot = self.peer_clients.get(peer_id)?;
    Some(Arc::clone(slot.value()))
}
/// Returns the connection status of the client registered under `peer_id`,
/// or `None` when no such client is registered.
pub fn peer_connection_status(&self, peer_id: &str) -> Option<ConnectionStatus> {
    let client = self.peer_client(peer_id)?;
    Some(client.get_connection_status_sync())
}
/// Returns an immutable handle to the counter that is bumped whenever a
/// peer client is registered or removed.
pub fn peer_clients_tick(&self) -> Cell<u64, CellImmutable> {
    let handle = self.peer_clients_tick.clone();
    handle.lock()
}
pub fn peer_client_count(&self) -> usize {
self.peer_clients.len()
}
pub fn persist_health(&self) -> Arc<PersistHealth> {
self.persisters.default_health()
}
pub fn query_cache_len(&self) -> usize {
self.query_cache.len()
}
pub fn view_cache_len(&self) -> usize {
self.view_cache.len()
}
pub fn report_cache_len(&self) -> usize {
self.report_cache.len()
}
/// Number of report cache entries that report themselves as alive.
pub fn report_cache_live_count(&self) -> usize {
    let mut live = 0usize;
    for slot in self.report_cache.iter() {
        if slot.value().is_alive() {
            live += 1;
        }
    }
    live
}
/// Number of query cache entries whose map can still be upgraded.
pub fn query_cache_live_count(&self) -> usize {
    let mut live = 0usize;
    for slot in self.query_cache.iter() {
        if slot.value().weak.upgrade().is_some() {
            live += 1;
        }
    }
    live
}
/// Number of view cache entries whose map can still be upgraded.
pub fn view_cache_live_count(&self) -> usize {
    let mut live = 0usize;
    for slot in self.view_cache.iter() {
        if slot.value().weak.upgrade().is_some() {
            live += 1;
        }
    }
    live
}
/// Drops cache entries whose weakly held map or cell is gone.
///
/// Returns the number of entries removed from the query, view and report
/// caches, in that order.
///
/// Removals are counted inside the `retain` callbacks rather than derived
/// from `len()` before and after: the caches are concurrent maps, so another
/// thread may insert during the sweep, which would make a
/// `before - after` subtraction underflow.
pub fn sweep_dead_cache_entries(&self) -> (usize, usize, usize) {
    let mut q_removed = 0usize;
    self.query_cache.retain(|_, entry| {
        let alive = entry.weak.upgrade().is_some();
        if !alive {
            q_removed += 1;
        }
        alive
    });

    let mut v_removed = 0usize;
    self.view_cache.retain(|_, entry| {
        let alive = entry.weak.upgrade().is_some();
        if !alive {
            v_removed += 1;
        }
        alive
    });

    let mut r_removed = 0usize;
    self.report_cache.retain(|_, entry| {
        let alive = entry.is_alive();
        if !alive {
            r_removed += 1;
        }
        alive
    });

    (q_removed, v_removed, r_removed)
}
/// Parses `json` into a type-erased item using the parser registered for
/// `entity_type`.
///
/// Returns `None` when no parser is registered for the type or when the
/// parser fails; the parse error itself is discarded.
pub fn parse_item(
    &self,
    entity_type: &str,
    json: &serde_json::Value,
) -> Option<Arc<dyn AnyItem>> {
    let parser = self.handler_registry.get_item_parser(entity_type)?;
    // The parser takes ownership of the value, hence the clone.
    let outcome = parser(json.clone());
    outcome.ok()
}
pub fn set<T>(&self, entity: &T) -> Result<(), PersistError>
where
T: Eventable + 'static,
{
self.set_with_options(entity, None)
}
/// Stores `entity` and propagates the change.
///
/// Steps, in order:
/// 1. insert a type-erased copy into the store for the entity's type,
/// 2. index it in the search index,
/// 3. forward the SET to the relationship manager unless
///    `prevent_relationship_updates` is set,
/// 4. persist / publish the SET unless `prevent_persist` is set.
///
/// `options` defaults to `EventOptions::default()` when `None`.
///
/// # Errors
///
/// Returns the first error raised by relationship forwarding or by
/// persistence. The store and search index have already been updated by
/// then.
pub fn set_with_options<T>(
    &self,
    entity: &T,
    options: Option<EventOptions>,
) -> Result<(), PersistError>
where
    T: Eventable + 'static,
{
    let options = options.unwrap_or_default();
    let id = entity.id();
    let entity_type = entity.entity_type();
    let item: Arc<dyn AnyItem> = Arc::new(entity.clone());
    self.registry
        .get_or_create(entity_type)
        .insert(id.clone(), item.clone());
    log::debug!("[entity] SET {} id={}", entity_type, id);
    self.search_index.index_item(&item);
    if !options.prevent_relationship_updates {
        self.relationship_manager.forward_set(item, self)?;
    }
    if !options.prevent_persist {
        self.produce_set(entity)?;
    }
    // Same closing trace line as the dyn SET path and the DEL paths emit.
    log::trace!("Published SET {}:{}", entity_type, id);
    Ok(())
}
pub fn del<T>(&self, entity: &T) -> Result<(), PersistError>
where
T: Eventable + Clone + 'static,
{
self.del_with_options(entity, None)
}
/// Deletes `entity` and propagates the change.
///
/// The entity is removed from its store and from the search index; the DEL
/// is then forwarded to the relationship manager and persisted / published,
/// each step being skipped when the matching `prevent_*` option is set.
///
/// # Errors
///
/// Returns the first error raised by relationship forwarding or by
/// persistence. The store and search index have already been updated by
/// then.
pub fn del_with_options<T>(
    &self,
    entity: &T,
    options: Option<EventOptions>,
) -> Result<(), PersistError>
where
    T: Eventable + Clone + 'static,
{
    let opts = options.unwrap_or_default();
    let kind = entity.entity_type();
    let key = entity.id();
    let erased: Arc<dyn AnyItem> = Arc::new(entity.clone());
    log::debug!("[entity] DEL {} id={}", kind, key);

    self.registry.get_or_create(kind).remove(&key);
    self.search_index.remove_entity(&key);

    if !opts.prevent_relationship_updates {
        self.relationship_manager
            .forward_del(Arc::clone(&erased), self)?;
    }
    if !opts.prevent_persist {
        self.produce_del(entity)?;
    }

    log::trace!("Published DEL {}:{}", kind, key);
    Ok(())
}
pub fn batch_set<T>(&self, entities: &[T]) -> Result<(), PersistError>
where
T: Eventable + Clone + 'static,
{
self.batch_set_with_options(entities, None)
}
/// Stores all `entities` of one type with a single bulk insert.
///
/// Every entity is indexed in the search index and then inserted into the
/// store via `insert_many`. Afterwards each item is forwarded to the
/// relationship manager and persisted / published, unless the matching
/// `prevent_*` option is set. An empty slice is a no-op.
///
/// # Errors
///
/// Returns the first error raised by relationship forwarding or by
/// persistence; items after the failing one are not forwarded/persisted,
/// but the store and search index already contain the whole batch.
pub fn batch_set_with_options<T>(
    &self,
    entities: &[T],
    options: Option<EventOptions>,
) -> Result<(), PersistError>
where
    T: Eventable + Clone + 'static,
{
    if entities.is_empty() {
        return Ok(());
    }
    let opts = options.unwrap_or_default();
    let kind = T::entity_name_static();
    let store = self.registry.get_or_create(kind);

    let total = entities.len();
    let mut entries: Vec<(Arc<str>, Arc<dyn AnyItem>)> = Vec::with_capacity(total);
    let mut erased: Vec<Arc<dyn AnyItem>> = Vec::with_capacity(total);
    for entity in entities {
        let item: Arc<dyn AnyItem> = Arc::new(entity.clone());
        log::debug!("[entity] SET {} id={}", kind, entity.id());
        self.search_index.index_item(&item);
        entries.push((entity.id(), Arc::clone(&item)));
        erased.push(item);
    }
    store.insert_many(entries);

    if !opts.prevent_relationship_updates {
        for item in &erased {
            self.relationship_manager
                .forward_set(Arc::clone(item), self)?;
        }
    }
    if !opts.prevent_persist {
        for item in &erased {
            self.produce_set_dyn(item)?;
        }
    }

    log::trace!("Published batch SET {} count={}", kind, erased.len());
    Ok(())
}
pub fn batch_del<T>(&self, entities: &[T]) -> Result<(), PersistError>
where
T: Eventable + Clone + 'static,
{
self.batch_del_with_options(entities, None)
}
/// Deletes all `entities` of one type with a single bulk removal.
///
/// Every entity is removed from the search index and then from the store
/// via `remove_many`. Afterwards the whole batch is forwarded to the
/// relationship manager and each item is persisted / published, unless the
/// matching `prevent_*` option is set. An empty slice is a no-op.
///
/// # Errors
///
/// Returns the first error raised by relationship forwarding or by
/// persistence; the store and search index have already been updated by
/// then.
pub fn batch_del_with_options<T>(
    &self,
    entities: &[T],
    options: Option<EventOptions>,
) -> Result<(), PersistError>
where
    T: Eventable + Clone + 'static,
{
    if entities.is_empty() {
        return Ok(());
    }
    let opts = options.unwrap_or_default();
    let kind = T::entity_name_static();
    let store = self.registry.get_or_create(kind);

    let total = entities.len();
    let mut keys: Vec<Arc<str>> = Vec::with_capacity(total);
    let mut erased: Vec<Arc<dyn AnyItem>> = Vec::with_capacity(total);
    for entity in entities {
        let item: Arc<dyn AnyItem> = Arc::new(entity.clone());
        let key = item.id();
        log::debug!("[entity] DEL {} id={}", kind, key);
        self.search_index.remove_entity(&key);
        keys.push(key);
        erased.push(item);
    }
    store.remove_many(keys);

    if !opts.prevent_relationship_updates {
        self.relationship_manager.forward_del_batch(&erased, self)?;
    }
    if !opts.prevent_persist {
        for item in &erased {
            self.produce_del_dyn(item)?;
        }
    }

    log::trace!("Published batch DEL {} count={}", kind, erased.len());
    Ok(())
}
pub fn set_dyn(&self, item: Arc<dyn AnyItem>) -> Result<(), PersistError> {
self.set_dyn_with_options(item, None)
}
/// Stores a type-erased `item` and propagates the change.
///
/// The item is inserted into the store for its entity type and indexed in
/// the search index; the SET is then forwarded to the relationship manager
/// and persisted / published, each step being skipped when the matching
/// `prevent_*` option is set.
///
/// # Errors
///
/// Returns the first error raised by relationship forwarding or by
/// persistence. The store and search index have already been updated by
/// then.
pub fn set_dyn_with_options(
    &self,
    item: Arc<dyn AnyItem>,
    options: Option<EventOptions>,
) -> Result<(), PersistError> {
    let opts = options.unwrap_or_default();
    let kind = item.entity_type();
    let key = item.id();
    log::debug!("[entity] SET {} id={}", kind, key);

    self.registry
        .get_or_create(kind)
        .insert(key.clone(), Arc::clone(&item));
    self.search_index.index_item(&item);

    if !opts.prevent_relationship_updates {
        self.relationship_manager
            .forward_set(Arc::clone(&item), self)?;
    }
    if !opts.prevent_persist {
        self.produce_set_dyn(&item)?;
    }

    log::trace!("Published SET {}:{}", kind, key);
    Ok(())
}
/// Stores a mixed batch of type-erased items.
///
/// Items are grouped by entity type (types are processed in sorted order
/// because the grouping uses a `BTreeMap`). For each type the items are
/// indexed, bulk-inserted via `insert_many`, then forwarded to the
/// relationship manager and persisted / published unless the matching
/// `prevent_*` option is set. An empty slice is a no-op.
///
/// # Errors
///
/// Returns the first error raised by relationship forwarding or by
/// persistence; entity types after the failing one are not processed.
pub fn batch_set_dyn_with_options(
    &self,
    items: &[Arc<dyn AnyItem>],
    options: Option<EventOptions>,
) -> Result<(), PersistError> {
    if items.is_empty() {
        return Ok(());
    }
    let opts = options.unwrap_or_default();

    let mut grouped: std::collections::BTreeMap<&'static str, Vec<Arc<dyn AnyItem>>> =
        std::collections::BTreeMap::new();
    for item in items {
        grouped
            .entry(item.entity_type())
            .or_default()
            .push(Arc::clone(item));
    }

    for (kind, group) in grouped {
        let store = self.registry.get_or_create(kind);
        let mut entries: Vec<(Arc<str>, Arc<dyn AnyItem>)> = Vec::with_capacity(group.len());
        for item in &group {
            log::debug!("[entity] SET {} id={}", kind, item.id());
            self.search_index.index_item(item);
            entries.push((item.id(), Arc::clone(item)));
        }
        store.insert_many(entries);

        if !opts.prevent_relationship_updates {
            for item in &group {
                self.relationship_manager
                    .forward_set(Arc::clone(item), self)?;
            }
        }
        if !opts.prevent_persist {
            for item in &group {
                self.produce_set_dyn(item)?;
            }
        }

        log::trace!("Published batch SET {} count={}", kind, group.len());
    }
    Ok(())
}
pub fn del_dyn(&self, item: Arc<dyn AnyItem>) -> Result<(), PersistError> {
self.del_dyn_with_options(item, None)
}
/// Deletes a type-erased `item` and propagates the change.
///
/// The item is removed from the store for its entity type and from the
/// search index; the DEL is then forwarded to the relationship manager and
/// persisted / published, each step being skipped when the matching
/// `prevent_*` option is set.
///
/// # Errors
///
/// Returns the first error raised by relationship forwarding or by
/// persistence. The store and search index have already been updated by
/// then.
pub fn del_dyn_with_options(
    &self,
    item: Arc<dyn AnyItem>,
    options: Option<EventOptions>,
) -> Result<(), PersistError> {
    let opts = options.unwrap_or_default();
    let kind = item.entity_type();
    let key = item.id();
    log::debug!("[entity] DEL {} id={}", kind, key);

    self.registry.get_or_create(kind).remove(&key);
    self.search_index.remove_entity(&key);

    if !opts.prevent_relationship_updates {
        self.relationship_manager
            .forward_del(Arc::clone(&item), self)?;
    }
    if !opts.prevent_persist {
        self.produce_del_dyn(&item)?;
    }

    log::trace!("Published DEL {}:{}", kind, key);
    Ok(())
}
/// Deletes a mixed batch of type-erased items.
///
/// Items are grouped by entity type (types are processed in sorted order
/// because the grouping uses a `BTreeMap`). For each type the items are
/// removed from the search index, bulk-removed from the store via
/// `remove_many`, then forwarded to the relationship manager as one batch
/// and persisted / published one by one, unless the matching `prevent_*`
/// option is set. An empty slice is a no-op.
///
/// # Errors
///
/// Returns the first error raised by relationship forwarding or by
/// persistence; entity types after the failing one are not processed.
pub fn batch_del_dyn_with_options(
    &self,
    items: &[Arc<dyn AnyItem>],
    options: Option<EventOptions>,
) -> Result<(), PersistError> {
    if items.is_empty() {
        return Ok(());
    }
    let options = options.unwrap_or_default();
    let mut items_by_type: std::collections::BTreeMap<&'static str, Vec<Arc<dyn AnyItem>>> =
        std::collections::BTreeMap::new();
    for item in items {
        items_by_type
            .entry(item.entity_type())
            .or_default()
            .push(item.clone());
    }
    for (entity_type, typed_items) in items_by_type {
        let store = self.registry.get_or_create(entity_type);
        let mut ids: Vec<Arc<str>> = Vec::with_capacity(typed_items.len());
        for item in &typed_items {
            let id = item.id();
            log::debug!("[entity] DEL {} id={}", entity_type, id);
            self.search_index.remove_entity(&id);
            ids.push(id);
        }
        store.remove_many(ids);
        if !options.prevent_relationship_updates {
            self.relationship_manager
                .forward_del_batch(&typed_items, self)?;
        }
        if !options.prevent_persist {
            for item in &typed_items {
                self.produce_del_dyn(item)?;
            }
        }
        // Per-type summary, mirroring the batch SET path and the typed
        // batch DEL path.
        log::trace!(
            "Published batch DEL {} count={}",
            entity_type,
            typed_items.len()
        );
    }
    Ok(())
}
/// Deletes the entity `entity_type`/`id` when only its id is known.
///
/// The current value (if any) is looked up first so that a full DEL event
/// can be persisted / published; when the entity is not in the store, the
/// removal still happens but only a warning is logged instead of a DEL
/// event.
///
/// NOTE(review): unlike the other delete paths, this one never forwards to
/// the relationship manager, so `prevent_relationship_updates` has no
/// effect here — confirm whether that is intentional.
///
/// # Errors
///
/// Returns the error raised while persisting the DEL event, if any.
pub fn del_by_id_with_options(
    &self,
    entity_type: &str,
    id: &str,
    options: Option<EventOptions>,
) -> Result<(), PersistError> {
    let opts = options.unwrap_or_default();
    let key: Arc<str> = id.into();

    // Capture the stored value before removing it; the DEL event needs it.
    let previous = self
        .registry
        .get(entity_type)
        .and_then(|store| store.get(&key).get());

    self.registry.get_or_create(entity_type).remove(&key);
    self.search_index.remove_entity(id);

    if !opts.prevent_persist {
        match previous {
            Some(item) => self.produce_del_dyn(&item)?,
            None => log::warn!(
                "del_by_id could not persist DEL without full entity: {}:{}",
                entity_type,
                id
            ),
        }
    }

    log::trace!("Published DEL {}:{}", entity_type, id);
    Ok(())
}
pub fn del_by_id(&self, entity_type: &str, id: &str) -> Result<(), PersistError> {
self.del_by_id_with_options(entity_type, id, None)
}
/// Applies a single wire event.
///
/// Returns `Ok(true)` when the event was accepted (applied immediately or
/// queued in an ingest buffer) and `Ok(false)` otherwise.
pub fn apply_event(&self, event: MEvent) -> Result<bool, PersistError> {
    let accepted = self.apply_event_batch(vec![event])?;
    Ok(accepted == 1)
}
/// Applies a batch of wire events, honouring each entity type's ingest
/// buffer policy.
///
/// Events whose type has no buffering policy are applied immediately;
/// events with a time-window policy are queued per entity type and flushed
/// later. The window used for a type is the one carried by the first event
/// of that type in this batch.
///
/// Returns the number of accepted events: those applied immediately plus
/// all those queued for buffering.
///
/// # Errors
///
/// Returns the error raised while applying the immediate events; in that
/// case the buffered events of this batch are not queued.
pub fn apply_event_batch(&self, events: Vec<MEvent>) -> Result<usize, PersistError> {
    if events.is_empty() {
        return Ok(0);
    }

    let mut immediate = Vec::new();
    let mut windowed: HashMap<Arc<str>, (u64, Vec<MEvent>)> = HashMap::new();
    for event in events {
        let policy = self
            .handler_registry
            .get_item_buffer_policy(&event.item_type);
        match policy {
            IngestBufferPolicy::None => immediate.push(event),
            IngestBufferPolicy::TimeWindow { window_ms } => {
                let kind: Arc<str> = Arc::from(event.item_type.as_str());
                let (_, queue) = windowed
                    .entry(kind)
                    .or_insert_with(|| (window_ms, Vec::new()));
                queue.push(event);
            }
        }
    }

    let mut accepted = 0usize;
    if !immediate.is_empty() {
        accepted += self.apply_event_batch_immediate(immediate)?;
    }
    for (kind, (window_ms, queued)) in windowed {
        accepted += queued.len();
        self.enqueue_buffered_events(kind, window_ms, queued);
    }
    Ok(accepted)
}
/// Applies `events` to the stores right away, bypassing ingest buffering.
///
/// Phases, in order:
/// 1. parse every event into an item (unparseable events are logged and
///    skipped),
/// 2. update the search index and group inserts/removals by entity type,
/// 3. bulk-insert, then bulk-remove, per entity type,
/// 4. forward SETs, then DELs (batched per type), to the relationship
///    manager unless the event opted out,
/// 5. persist / publish SETs, then DELs, unless the event opted out.
///
/// Returns the number of events that were parsed and applied.
///
/// NOTE(review): all SETs of a batch are applied before all DELs, regardless
/// of the order in which the events arrived. A batch containing
/// `DEL x` followed by `SET x` therefore ends with `x` removed — confirm
/// this is the intended semantics.
///
/// # Errors
///
/// Returns the first error raised by relationship forwarding or by
/// persistence; the stores and search index have already been updated by
/// then.
fn apply_event_batch_immediate(&self, events: Vec<MEvent>) -> Result<usize, PersistError> {
    if events.is_empty() {
        return Ok(0);
    }
    let input_len = events.len();

    // A successfully parsed event: the decoded item plus its effective options.
    struct ParsedOp {
        item: Arc<dyn AnyItem>,
        options: EventOptions,
    }

    let mut sets: Vec<ParsedOp> = Vec::new();
    let mut dels: Vec<ParsedOp> = Vec::new();
    for event in events {
        let options = event.options.clone().unwrap_or_default();
        match event.change_type {
            MEventType::SET => {
                if let Some(item) = self.parse_item(&event.item_type, &event.item) {
                    sets.push(ParsedOp { item, options });
                } else {
                    log::warn!(
                        "Unknown entity type or parse error for SET: {}",
                        event.item_type
                    );
                }
            }
            MEventType::DEL => {
                if let Some(item) = self.parse_item(&event.item_type, &event.item) {
                    dels.push(ParsedOp { item, options });
                } else {
                    log::warn!(
                        "Unknown entity type or parse error for DEL: {}",
                        event.item_type
                    );
                }
            }
        }
    }
    if sets.is_empty() && dels.is_empty() {
        return Ok(0);
    }
    log::trace!(
        target: "myko::server::context",
        "apply_event_batch parsed: input_events={} sets={} dels={}",
        input_len,
        sets.len(),
        dels.len()
    );

    // Group store mutations by entity type so each store sees one bulk call.
    let mut inserts_by_type: AnyItemEntriesByType = HashMap::new();
    let mut removes_by_type: HashMap<Arc<str>, Vec<Arc<str>>> = HashMap::new();
    for op in &sets {
        let entity_type: Arc<str> = op.item.entity_type().into();
        let id = op.item.id();
        log::debug!("[entity] SET {} id={}", entity_type, id);
        inserts_by_type
            .entry(entity_type)
            .or_default()
            .push((id, op.item.clone()));
        self.search_index.index_item(&op.item);
    }
    for op in &dels {
        let entity_type: Arc<str> = op.item.entity_type().into();
        let id = op.item.id();
        log::debug!("[entity] DEL {} id={}", entity_type, id);
        removes_by_type
            .entry(entity_type)
            .or_default()
            .push(id.clone());
        self.search_index.remove_entity(&id);
    }

    for (entity_type, entries) in inserts_by_type {
        log::trace!(
            target: "myko::server::context",
            "apply_event_batch reduce inserts: entity_type={} count={}",
            entity_type,
            entries.len()
        );
        let store = self.registry.get_or_create(entity_type.as_ref());
        let count = entries.len();
        // The before/after sizes exist only for the info log below, and each
        // one takes a full snapshot of the store. Skip both when info
        // logging is disabled so the ingest path does not pay for them.
        if log::log_enabled!(log::Level::Info) {
            let before = store.snapshot().len();
            store.insert_many(entries);
            let after = store.snapshot().len();
            if after != before || count > 100 {
                log::info!(
                    "insert_many {}: submitted={} store_before={} store_after={} net_new={}",
                    entity_type,
                    count,
                    before,
                    after,
                    after as i64 - before as i64
                );
            }
        } else {
            store.insert_many(entries);
        }
    }
    for (entity_type, keys) in removes_by_type {
        log::trace!(
            target: "myko::server::context",
            "apply_event_batch reduce removes: entity_type={} count={}",
            entity_type,
            keys.len()
        );
        let store = self.registry.get_or_create(entity_type.as_ref());
        store.remove_many(keys);
    }

    // Relationship forwarding: SETs one by one, DELs batched per entity type.
    for op in &sets {
        if !op.options.prevent_relationship_updates {
            self.relationship_manager
                .forward_set(op.item.clone(), self)?;
        }
    }
    let mut dels_with_relationships: HashMap<Arc<str>, Vec<Arc<dyn AnyItem>>> = HashMap::new();
    for op in &dels {
        if !op.options.prevent_relationship_updates {
            dels_with_relationships
                .entry(op.item.entity_type().into())
                .or_default()
                .push(op.item.clone());
        }
    }
    for (_, items) in dels_with_relationships {
        self.relationship_manager.forward_del_batch(&items, self)?;
    }

    // Persistence / publication.
    for op in &sets {
        if !op.options.prevent_persist {
            self.produce_set_dyn(&op.item)?;
        }
    }
    for op in &dels {
        if !op.options.prevent_persist {
            self.produce_del_dyn(&op.item)?;
        }
    }
    Ok(sets.len() + dels.len())
}
/// Returns the ingest buffer for `entity_type`, creating an empty one on
/// first use.
fn ingest_buffer_for(&self, entity_type: Arc<str>) -> Arc<BufferedIngestType> {
    let slot = self
        .ingest_buffers
        .entry(entity_type)
        .or_insert_with(|| Arc::new(BufferedIngestType::new()));
    Arc::clone(slot.value())
}
/// Queues `events` in the ingest buffer for `entity_type` and makes sure a
/// flush is scheduled.
///
/// If no flush is pending for the type, a thread is spawned that sleeps for
/// `window_ms` milliseconds and then flushes the buffer; events arriving in
/// the meantime ride along with that flush. If the buffer's mutex is
/// poisoned, the events are applied immediately instead of being queued,
/// and any error from doing so is logged.
fn enqueue_buffered_events(&self, entity_type: Arc<str>, window_ms: u64, events: Vec<MEvent>) {
    let buffer = self.ingest_buffer_for(entity_type.clone());

    let needs_flush_thread = {
        let Ok(mut state) = buffer.state.lock() else {
            log::error!(
                "Could not acquire ingest buffer lock for entity_type={}",
                entity_type
            );
            if let Err(e) = self.apply_event_batch_immediate(events) {
                log::error!("Failed to apply buffered events for {}: {}", entity_type, e);
            }
            return;
        };
        state.events.extend(events);
        // Mark a flush as pending and learn whether one already was.
        let already_scheduled = std::mem::replace(&mut state.flush_scheduled, true);
        !already_scheduled
    };

    if needs_flush_thread {
        let ctx = self.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(window_ms));
            ctx.flush_buffered_events_for_type(&entity_type);
        });
    }
}
/// Drains the ingest buffer for `entity_type` and applies the drained
/// events immediately.
///
/// Returns the number of events applied. Returns `0` when there is no
/// buffer for the type, the buffer is empty, its mutex is poisoned, or
/// applying the events fails (the failure is logged).
fn flush_buffered_events_for_type(&self, entity_type: &Arc<str>) -> usize {
    let Some(buffer) = self
        .ingest_buffers
        .get(entity_type.as_ref())
        .map(|slot| Arc::clone(slot.value()))
    else {
        return 0;
    };

    // Take the pending events while holding the lock, apply them after
    // releasing it.
    let drained = {
        let Ok(mut state) = buffer.state.lock() else {
            log::error!(
                "Could not acquire ingest buffer lock for flush entity_type={}",
                entity_type
            );
            return 0;
        };
        state.flush_scheduled = false;
        if state.events.is_empty() {
            return 0;
        }
        std::mem::take(&mut state.events)
    };

    log::trace!(
        target: "myko::server::context",
        "flush_buffered_events entity_type={} count={}",
        entity_type,
        drained.len()
    );

    self.apply_event_batch_immediate(drained)
        .unwrap_or_else(|e| {
            log::error!("Failed to flush buffered events for {}: {}", entity_type, e);
            0
        })
}
/// Test helper: flushes every ingest buffer now instead of waiting for the
/// scheduled flush, returning the total number of events applied.
#[cfg(test)]
fn flush_all_buffered_events(&self) -> usize {
    // Collect the keys first so the map is not being iterated while the
    // individual flushes look buffers up again.
    let pending_types: Vec<Arc<str>> = self
        .ingest_buffers
        .iter()
        .map(|slot| slot.key().clone())
        .collect();

    let mut flushed = 0usize;
    for entity_type in pending_types {
        flushed += self.flush_buffered_events_for_type(&entity_type);
    }
    flushed
}
/// Emits a SET event for `entity`.
///
/// The event is handed to the persister resolved for the entity type (if
/// any) and, separately, sent to the event sink (if configured). A failed
/// send to the sink is ignored.
///
/// # Errors
///
/// Returns the persister's error; the sink is not notified in that case.
fn produce_set<T: Eventable>(&self, entity: &T) -> Result<(), PersistError> {
    if let Some(persister) = self.persisters.resolve(T::entity_name_static()) {
        let host = self.host_id.to_string();
        let event = MEvent::from_item(entity, MEventType::SET, &host);
        persister.persist(event)?;
    }
    if let Some(sink) = self.event_sink.as_ref() {
        let host = self.host_id.to_string();
        let event = MEvent::from_item(entity, MEventType::SET, &host);
        let _ = sink.send(event);
    }
    Ok(())
}
/// Emits a DEL event for `entity`.
///
/// The event is handed to the persister resolved for the entity type (if
/// any) and, separately, sent to the event sink (if configured). A failed
/// send to the sink is ignored.
///
/// # Errors
///
/// Returns the persister's error; the sink is not notified in that case.
fn produce_del<T: Eventable>(&self, entity: &T) -> Result<(), PersistError> {
    if let Some(persister) = self.persisters.resolve(T::entity_name_static()) {
        let host = self.host_id.to_string();
        let event = MEvent::del(entity, &host);
        persister.persist(event)?;
    }
    if let Some(sink) = self.event_sink.as_ref() {
        let host = self.host_id.to_string();
        let event = MEvent::del(entity, &host);
        let _ = sink.send(event);
    }
    Ok(())
}
/// Emits a DEL event for a type-erased `item`.
///
/// The event is handed to the persister resolved for the item's entity type
/// (if any) and, separately, sent to the event sink (if configured). A
/// failed send to the sink is ignored.
///
/// # Errors
///
/// Returns the persister's error; the sink is not notified in that case.
fn produce_del_dyn(&self, item: &Arc<dyn AnyItem>) -> Result<(), PersistError> {
    if let Some(persister) = self.persisters.resolve(item.entity_type()) {
        let host = self.host_id.to_string();
        let event = MEvent::del_from_any(item, &host);
        persister.persist(event)?;
    }
    if let Some(sink) = self.event_sink.as_ref() {
        let host = self.host_id.to_string();
        let event = MEvent::del_from_any(item, &host);
        let _ = sink.send(event);
    }
    Ok(())
}
/// Emits a SET event for a type-erased `item`.
///
/// The event is built from the item's serialized value and handed to the
/// persister resolved for the item's entity type (if any) and, separately,
/// sent to the event sink (if configured). A failed send to the sink is
/// ignored.
///
/// # Errors
///
/// Returns the persister's error; the sink is not notified in that case.
fn produce_set_dyn(&self, item: &Arc<dyn AnyItem>) -> Result<(), PersistError> {
    if let Some(persister) = self.persisters.resolve(item.entity_type()) {
        let host = self.host_id.to_string();
        let event = MEvent::set_from_value(item.entity_type(), item.to_value(), &host);
        persister.persist(event)?;
    }
    if let Some(sink) = self.event_sink.as_ref() {
        let host = self.host_id.to_string();
        let event = MEvent::set_from_value(item.entity_type(), item.to_value(), &host);
        let _ = sink.send(event);
    }
    Ok(())
}
/// Returns the (possibly cached) result map for `query`, with items typed
/// as `Q::Item`.
///
/// This is `query_map_untyped` followed by a conversion of the type-erased
/// items to `Arc<Q::Item>`.
pub fn query_map<Q>(
    &self,
    query: Q,
    request: Arc<RequestContext>,
) -> CellMap<Arc<str>, Arc<Q::Item>, CellImmutable>
where
    Q: QueryFactory + QueryHandler + QueryParams + Clone + Send + Sync + 'static,
    Q::Item:
        Eventable + WithId + DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
{
    let untyped = self.query_map_untyped(query, request);
    typed_map_arc_from_any_item(untyped, "CellServerCtx::query_map")
}
/// Returns the type-erased result map for `query`, sharing a cached map
/// when one is still alive.
///
/// The cache key combines the request's host id, the query id and the
/// query's parameter hash. A cached entry whose map has been dropped is
/// evicted and the map is rebuilt through `Q::cell_factory`; the rebuilt
/// map is cached weakly before being returned.
///
/// # Panics
///
/// Panics if `Q::cell_factory` returns an error.
pub fn query_map_untyped<Q>(&self, query: Q, request: Arc<RequestContext>) -> FilteredCellMap
where
    Q: QueryFactory + QueryHandler + QueryParams + Clone + Send + Sync + 'static,
    Q::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
{
    let key = self.cache_key("query", Q::query_id_static().as_ref(), &query, &request);

    if let Some(cached) = self.query_cache.get(&key) {
        if let Some(shared) = cached.value().get() {
            return shared;
        }
        // Release the map guard before removing the stale entry.
        drop(cached);
        self.query_cache.remove(&key);
    }

    let tx = request.tx.clone();
    let any_query: Arc<dyn crate::query::AnyQuery> = Arc::new(QueryRequest::with_tx(query, tx));
    let server = Some(Arc::new(self.clone()));
    let built = Q::cell_factory(any_query, self.registry.clone(), request, server)
        .expect("query cell factory should not fail for typed query");

    self.query_cache.insert(key, MapCacheEntry::new(&built));
    built
}
/// Returns the type-erased result map for `view`, sharing a cached map when
/// one is still alive.
///
/// The cache key combines the request's host id, the view id and the view's
/// parameter hash. A cached entry whose map has been dropped is evicted and
/// the map is rebuilt through `V::cell_factory`; the rebuilt map is cached
/// weakly before being returned.
///
/// # Panics
///
/// Panics if `V::cell_factory` returns an error.
pub fn view_map_untyped<V>(&self, view: V, request: Arc<RequestContext>) -> FilteredViewCellMap
where
    V: ViewFactory + Clone + Send + Sync + 'static,
    V::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
{
    let key = self.cache_key("view", V::view_id_static().as_ref(), &view, &request);

    if let Some(cached) = self.view_cache.get(&key) {
        if let Some(shared) = cached.value().get() {
            return shared;
        }
        // Release the map guard before removing the stale entry.
        drop(cached);
        self.view_cache.remove(&key);
    }

    let tx = request.tx.clone();
    let any_view: Arc<dyn crate::view::AnyView> =
        Arc::new(crate::view::ViewRequest::with_tx(view, tx));
    let server = Arc::new(self.clone());
    let built = V::cell_factory(any_view, self.registry.clone(), request, server)
        .expect("view cell factory should not fail for typed view");

    self.view_cache.insert(key, MapCacheEntry::new(&built));
    built
}
/// Alias for `view_map_untyped`: returns the type-erased, possibly cached
/// result map for `view`.
pub fn view_map<V>(&self, view: V, request: Arc<RequestContext>) -> FilteredViewCellMap
where
    V: ViewFactory + Clone + Send + Sync + 'static,
    V::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
{
    let untyped = self.view_map_untyped(view, request);
    untyped
}
/// Returns the (possibly cached) result map for `view`, with items typed as
/// `V::Item`.
pub fn view<V>(&self, view: V, request: Arc<RequestContext>) -> TypedViewCellMap<V::Item>
where
    V: ViewFactory + Clone + Send + Sync + 'static,
    V::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
{
    let untyped = self.view_map_untyped(view, request);
    typed_map_arc_from_any_item(untyped, "CellServerCtx::view")
}
/// Returns the current value of the entity of type `T` with the given id,
/// or `None` when the store has no value for it.
///
/// This is a point-in-time read; the result does not track later changes.
pub fn entity_snapshot<T>(&self, id: &<T as WithTypedId>::Id) -> Option<Arc<T>>
where
    T: Eventable + WithTypedId + Send + Sync + 'static,
    <T as WithTypedId>::Id: hyphae::IdFor<T, MapKey = Arc<str>>,
{
    let store = self.registry.get_or_create(T::entity_name_static());
    let key = id.map_key();
    store
        .get_value(&key)
        .map(|item| downcast_any_item_arc::<T>(&item, "CellServerCtx::entity_snapshot"))
}
/// Returns the current values of all entities of type `T`.
///
/// This is a point-in-time read of the store's snapshot; the result does
/// not track later changes.
pub fn entity_snapshots<T>(&self) -> Vec<Arc<T>>
where
    T: Eventable + WithTypedId + Send + Sync + 'static,
    <T as WithTypedId>::Id: hyphae::IdFor<T, MapKey = Arc<str>>,
{
    let store = self.registry.get_or_create(T::entity_name_static());
    let mut typed = Vec::new();
    for (_, item) in store.snapshot() {
        typed.push(downcast_any_item_arc::<T>(
            &item,
            "CellServerCtx::entity_snapshots",
        ));
    }
    typed
}
/// Returns the current values of the entities of type `T` with the given
/// ids, in the order the ids are supplied.
///
/// Ids without a stored value are skipped, so the result may be shorter
/// than the input.
pub fn entity_snapshots_by_id<T>(
    &self,
    ids: impl IntoIterator<Item = <T as WithTypedId>::Id>,
) -> Vec<Arc<T>>
where
    T: Eventable + WithTypedId + Send + Sync + 'static,
    <T as WithTypedId>::Id: hyphae::IdFor<T, MapKey = Arc<str>>,
{
    let mut found = Vec::new();
    for id in ids {
        if let Some(entity) = self.entity_snapshot::<T>(&id) {
            found.push(entity);
        }
    }
    found
}
/// Evaluates `query` once against the current contents of the store for the
/// query's item type.
///
/// Every stored item is tested with `Q::test_entity`; the items that pass
/// are returned. This is a point-in-time read and bypasses the query cache.
pub fn query_snapshot<Q>(&self, query: Q, request: Arc<RequestContext>) -> Vec<Arc<Q::Item>>
where
    Q: QueryHandler + QueryParams + Clone + Send + Sync + 'static,
    Q::Item: DeserializeOwned + Clone + std::fmt::Debug + Send + Sync + 'static,
{
    let query_item_type = Q::query_item_type_static();
    let store = self.registry.get_or_create(&query_item_type);
    let query_context = Arc::new(QueryContext {
        req: request.clone(),
    });
    let shared_query = Arc::new(query);

    let mut matches = Vec::new();
    for (_, item) in store.snapshot() {
        let candidate = downcast_any_item_arc::<Q::Item>(&item, "CellServerCtx::query_snapshot");
        let probe = QueryTestCtx {
            item: candidate.clone(),
            query: shared_query.clone(),
            query_context: query_context.clone(),
        };
        if Q::test_entity(probe) {
            matches.push(candidate);
        }
    }
    matches
}
/// Returns the output cell for `report`, sharing a cached cell when one is
/// still alive.
///
/// The cache key combines the request's host id, the report id and the
/// report's parameter hash. A cached entry is reused only if it holds the
/// expected output type and its cell can still be upgraded; otherwise the
/// entry is evicted, the report is recomputed and the new cell is cached
/// weakly before being returned.
pub fn report<R>(
    &self,
    report: R,
    request: Arc<RequestContext>,
) -> Cell<Arc<R::Output>, CellImmutable>
where
    R: ReportHandler + ReportId + CacheKey + Clone + serde::Serialize + 'static,
{
    let key = self.cache_key("report", report.report_id().as_ref(), &report, &request);
    if let Some(existing) = self.report_cache.get(&key) {
        if let Some(entry) = existing
            .value()
            .as_any()
            .downcast_ref::<ReportCacheEntry<Arc<R::Output>>>()
            && let Some(shared) = entry.get()
        {
            return shared;
        }
        // Release the map guard before removing the stale entry.
        drop(existing);
        self.report_cache.remove(&key);
    }
    let nested_ctx = ReportContext::new(request, Arc::new(self.clone()));
    let built = report.compute(nested_ctx);
    // `key` is not needed afterwards, so it is moved into the map rather
    // than cloned.
    self.report_cache
        .insert(key, Arc::new(ReportCacheEntry::new(&built)));
    built
}
/// Creates a request context for work initiated by the server itself.
///
/// The context gets a fresh random transaction id, this context's host id
/// and the current UTC time; it has no client id, no lineage and no
/// windback.
pub fn new_server_transaction(&self) -> Arc<RequestContext> {
    let tx = Arc::<str>::from(Uuid::new_v4().to_string());
    let created_at = chrono::Utc::now().to_string();
    let context = RequestContext {
        tx,
        client_id: None,
        lineage: vec![],
        host_id: self.host_id,
        created_at,
        windback: None,
    };
    Arc::new(context)
}
}
/// Compact debug representation.
///
/// Only the host id is printed; the remaining fields are shared handles
/// without a useful textual form, so the output is marked as non-exhaustive
/// (`..`) instead of presenting the struct as empty.
impl std::fmt::Debug for CellServerCtx {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CellServerCtx")
            .field("host_id", &self.host_id)
            .finish_non_exhaustive()
    }
}
// Tests for the ingest path of `CellServerCtx::apply_event_batch`: one entity
// type without a buffer policy registration and one registered with a
// time-window policy.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde_json::json;
use uuid::Uuid;
use super::CellServerCtx;
use crate::{
common::with_id::WithId,
core::item::{
AnyItem, Eventable, IngestBufferPolicy, IngestBufferRegistration, ItemRegistration,
},
hyphae::Gettable,
search::SearchIndex,
server::{HandlerRegistry, RelationshipManager, persister::PersisterRouter},
store::StoreRegistry,
wire::{MEvent, MEventType},
};
// Test entity that is registered below with a time-window ingest buffer policy.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct BufferedTestItem {
id: Arc<str>,
value: i32,
}
impl WithId for BufferedTestItem {
fn id(&self) -> Arc<str> {
self.id.clone()
}
}
impl AnyItem for BufferedTestItem {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn entity_type(&self) -> &'static str {
"BufferedTestItem"
}
// Equal only to another `BufferedTestItem` with the same field values.
fn equals(&self, other: &dyn AnyItem) -> bool {
other
.as_any()
.downcast_ref::<Self>()
.map(|typed| self == typed)
.unwrap_or(false)
}
}
impl Eventable for BufferedTestItem {
const ENTITY_NAME_STATIC: &'static str = "BufferedTestItem";
}
// Registers the parser for `BufferedTestItem`.
inventory::submit! {
ItemRegistration {
entity_type: "BufferedTestItem",
crate_name: env!("CARGO_PKG_NAME"),
parse: BufferedTestItem::parse,
}
}
// Opts `BufferedTestItem` into buffering. The 60 s window is far longer than
// the test runs, so the test flushes the buffer explicitly.
inventory::submit! {
IngestBufferRegistration {
entity_type: "BufferedTestItem",
policy: IngestBufferPolicy::TimeWindow { window_ms: 60_000 },
}
}
// Test entity without any ingest buffer registration.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct ImmediateTestItem {
id: Arc<str>,
value: i32,
}
impl WithId for ImmediateTestItem {
fn id(&self) -> Arc<str> {
self.id.clone()
}
}
impl AnyItem for ImmediateTestItem {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn entity_type(&self) -> &'static str {
"ImmediateTestItem"
}
// Equal only to another `ImmediateTestItem` with the same field values.
fn equals(&self, other: &dyn AnyItem) -> bool {
other
.as_any()
.downcast_ref::<Self>()
.map(|typed| self == typed)
.unwrap_or(false)
}
}
impl Eventable for ImmediateTestItem {
const ENTITY_NAME_STATIC: &'static str = "ImmediateTestItem";
}
// Registers the parser for `ImmediateTestItem`.
inventory::submit! {
ItemRegistration {
entity_type: "ImmediateTestItem",
crate_name: env!("CARGO_PKG_NAME"),
parse: ImmediateTestItem::parse,
}
}
// Builds a context with fresh collaborators, no event sink and no history
// replay provider.
fn make_ctx() -> CellServerCtx {
CellServerCtx::new(
Uuid::new_v4(),
Arc::new(StoreRegistry::new()),
Arc::new(HandlerRegistry::new()),
Arc::new(RelationshipManager::new()),
Arc::new(PersisterRouter::default()),
Arc::new(SearchIndex::new()),
Arc::new(dashmap::DashMap::new()),
None,
None,
)
}
// A SET for an entity type without a buffer registration must be visible in
// the store as soon as `apply_event_batch` returns.
#[test]
fn apply_event_batch_keeps_default_entities_immediate() {
let ctx = make_ctx();
let applied = ctx
.apply_event_batch(vec![MEvent {
item: json!({
"id": "immediate-1",
"value": 7,
}),
change_type: MEventType::SET,
item_type: "ImmediateTestItem".to_string(),
created_at: "2026-03-12T00:00:00Z".to_string(),
tx: "tx-immediate".to_string(),
source_id: Some("test".to_string()),
options: None,
}])
.expect("apply_event_batch should succeed");
assert_eq!(applied, 1);
let store = ctx.registry.get_or_create("ImmediateTestItem");
assert!(store.get(&Arc::<str>::from("immediate-1")).get().is_some());
}
// A SET for a buffered entity type is counted as accepted but must not reach
// the store until the buffer is flushed.
#[test]
fn apply_event_batch_buffers_opted_in_entities() {
let ctx = make_ctx();
let applied = ctx
.apply_event_batch(vec![MEvent {
item: json!({
"id": "buffered-1",
"value": 42,
}),
change_type: MEventType::SET,
item_type: "BufferedTestItem".to_string(),
created_at: "2026-03-12T00:00:00Z".to_string(),
tx: "tx-buffered".to_string(),
source_id: Some("test".to_string()),
options: None,
}])
.expect("apply_event_batch should succeed");
assert_eq!(applied, 1);
let store = ctx.registry.get_or_create("BufferedTestItem");
// Still buffered: nothing in the store yet.
assert!(store.get(&Arc::<str>::from("buffered-1")).get().is_none());
let flushed = ctx.flush_all_buffered_events();
assert_eq!(flushed, 1);
assert!(store.get(&Arc::<str>::from("buffered-1")).get().is_some());
}
}