#![cfg(feature = "bench")]
use std::sync::Arc;
use myko::{
bench_entities::{BenchItem, GetBenchItemsByQuery, PartialBenchItem, SwitchMapReport},
hyphae::{Cell, Gettable, Mutable, SwitchMapExt},
search::SearchIndex,
server::{CellServerCtx, HandlerRegistry, RelationshipManager, persister::PersisterRouter},
store::StoreRegistry,
wire::{MEvent, MEventType},
};
use uuid::Uuid;
/// Builds a `CellServerCtx` for the tests in this file.
///
/// The context receives a random v4 UUID and freshly constructed registries, managers and
/// indexes; the two trailing optional arguments are left as `None`.
fn make_ctx() -> CellServerCtx {
    let fresh_uuid = Uuid::new_v4();
    let stores = Arc::new(StoreRegistry::new());
    let handlers = Arc::new(HandlerRegistry::new());
    let relationships = Arc::new(RelationshipManager::new());
    let persisters = Arc::new(PersisterRouter::default());
    let search_index = Arc::new(SearchIndex::new());
    // The key/value types of this map are inferred from the `CellServerCtx::new` signature.
    let shared_map = Arc::new(dashmap::DashMap::new());

    CellServerCtx::new(
        fresh_uuid,
        stores,
        handlers,
        relationships,
        persisters,
        search_index,
        shared_map,
        None,
        None,
    )
}
/// Writes one `BenchItem` into `ctx` by applying a single-event batch.
///
/// The item is named `item-<id>` and is wrapped in a `SET` event built with `tx-<id>` as the
/// final argument of `MEvent::from_item`.
///
/// # Panics
///
/// Panics if `apply_event_batch` does not succeed (its result is unwrapped).
fn insert_bench_item(ctx: &CellServerCtx, id: &str, category: &str, value: i64) {
    let tx_id = format!("tx-{}", id);
    let item = BenchItem {
        id: id.into(),
        name: format!("item-{}", id),
        category: category.to_owned(),
        value,
    };
    let set_event = MEvent::from_item(&item, MEventType::SET, &tx_id);
    ctx.apply_event_batch(vec![set_event]).unwrap();
}
/// Issues a `query_map` from inside a `switch_map` closure for a rotating category, then
/// checks that dropping the switched cell and its selector lets
/// `sweep_dead_cache_entries` remove at least one query cache entry.
#[test]
fn query_map_inside_switch_map_cache_entries_become_reclaimable() {
    let ctx = make_ctx();

    // Seed ten items in each of the three categories.
    for n in 0..10 {
        insert_bench_item(&ctx, &format!("a-{}", n), "alpha", n);
        insert_bench_item(&ctx, &format!("b-{}", n), "beta", n);
        insert_bench_item(&ctx, &format!("g-{}", n), "gamma", n);
    }

    let names = ["alpha", "beta", "gamma"];
    let selector = Cell::new(0usize);
    let switched = {
        // The clone is moved into the closure; the outer `ctx` stays usable for inspection.
        let ctx = ctx.clone();
        selector.switch_map(move |idx| {
            let category = names[*idx % names.len()].to_string();
            let request = ctx.new_server_transaction();
            let query = GetBenchItemsByQuery(PartialBenchItem {
                category: Some(category),
                ..Default::default()
            });
            ctx.query_map(query, request).items()
        })
    };

    assert_eq!(switched.get().len(), 10);
    let len_initial = ctx.query_cache_len();

    // Cycle the selector through 30 further values.
    for next in 1..=30 {
        selector.set(next);
    }

    assert_eq!(switched.get().len(), 10);
    let len_switched = ctx.query_cache_len();
    assert!(
        len_switched >= len_initial,
        "cache should have grown or stayed the same"
    );

    // Release both ends of the switch_map pipeline before sweeping.
    drop(switched);
    drop(selector);

    let (q_removed, _, _) = ctx.sweep_dead_cache_entries();
    let len_final = ctx.query_cache_len();
    assert!(
        q_removed > 0,
        "sweep should have removed dead query cache entries, but removed 0 \
         (cache before={}, after switch={}, final={})",
        len_initial,
        len_switched,
        len_final,
    );
}
/// Re-triggers a `switch_map` 50 times with a closure that always issues the same
/// `query_map` parameters, and asserts the query cache length grows by at most 5 relative
/// to its length after the first evaluation.
#[test]
fn query_map_same_params_inside_switch_map_reuses_cache() {
    let ctx = make_ctx();
    for n in 0..5 {
        let id = format!("item-{}", n);
        insert_bench_item(&ctx, &id, "tools", n);
    }

    let trigger = Cell::new(0u64);
    let switched = {
        // The clone is moved into the closure; the outer `ctx` stays usable for inspection.
        let ctx = ctx.clone();
        trigger.switch_map(move |_| {
            let request = ctx.new_server_transaction();
            // Identical parameters on every invocation.
            let query = GetBenchItemsByQuery(PartialBenchItem {
                category: Some("tools".to_string()),
                ..Default::default()
            });
            let tools = ctx.query_map(query, request);
            tools.items()
        })
    };

    assert_eq!(switched.get().len(), 5);
    let len_baseline = ctx.query_cache_len();

    for tick in 1..=50 {
        trigger.set(tick);
    }

    assert_eq!(switched.get().len(), 5);
    let len_after = ctx.query_cache_len();
    assert!(
        len_after <= len_baseline + 5,
        "cache grew from {} to {} after 50 same-param switches — \
         expected cache reuse, got accumulation",
        len_baseline,
        len_after,
    );
}
/// Interleaves selector changes with store writes while a `switch_map` + `query_map`
/// pipeline is alive, then asserts that no query cache entry is counted as live once the
/// pipeline is dropped and the cache is swept.
#[test]
fn query_map_inside_switch_map_with_active_store_mutations() {
    let ctx = make_ctx();
    for n in 0..10 {
        insert_bench_item(&ctx, &format!("a-{}", n), "alpha", n);
        insert_bench_item(&ctx, &format!("b-{}", n), "beta", n);
    }

    let categories = ["alpha", "beta"];
    let selector = Cell::new(0usize);
    let switched = {
        // The clone is moved into the closure; the outer `ctx` stays usable for inspection.
        let ctx = ctx.clone();
        selector.switch_map(move |idx| {
            let category = categories[*idx % categories.len()].to_string();
            let request = ctx.new_server_transaction();
            let query = GetBenchItemsByQuery(PartialBenchItem {
                category: Some(category),
                ..Default::default()
            });
            let matching = ctx.query_map(query, request);
            matching.items()
        })
    };
    assert_eq!(switched.get().len(), 10);

    for round in 0..20usize {
        selector.set(round + 1);
        // The written category is indexed by `round`, while the selector was just set to
        // `round + 1`.
        let cat = categories[round % categories.len()];
        // Ids reuse the seeded "<first letter>-<n>" scheme, so ids 0..5 match seeded ids.
        let initial = cat.chars().next().unwrap();
        for slot in 0..5usize {
            let id = format!("{}-{}", initial, slot);
            let value = (round * 10 + slot) as i64;
            insert_bench_item(&ctx, &id, cat, value);
        }
    }

    // Snapshot the cache state for the failure message before tearing the pipeline down.
    let live_before_drop = ctx.query_cache_live_count();
    let total_before_drop = ctx.query_cache_len();

    drop(switched);
    drop(selector);

    let (q_removed, _, _) = ctx.sweep_dead_cache_entries();
    let live_after = ctx.query_cache_live_count();
    assert_eq!(
        live_after, 0,
        "expected 0 live cache entries after dropping switch_map, found {} \
         (total before drop={}, live before drop={}, swept={})",
        live_after, total_before_drop, live_before_drop, q_removed,
    );
}
/// Checks `query_cache_live_count` step by step: 0 before any query, 1 and 2 after two
/// `query_map` calls with different categories, then back to 1 and 0 as each returned map
/// is dropped and the cache is swept.
#[test]
fn query_cache_live_count_tracks_reachable_entries() {
    let ctx = make_ctx();
    for n in 0..3 {
        insert_bench_item(&ctx, &format!("item-{}", n), "test", n);
    }

    // Builds the by-category query used for both lookups below.
    let by_category = |name: &str| {
        GetBenchItemsByQuery(PartialBenchItem {
            category: Some(name.to_string()),
            ..Default::default()
        })
    };

    assert_eq!(ctx.query_cache_live_count(), 0);

    let first_request = ctx.new_server_transaction();
    let first_map = ctx.query_map(by_category("test"), first_request);
    assert_eq!(ctx.query_cache_live_count(), 1);

    let second_request = ctx.new_server_transaction();
    let second_map = ctx.query_map(by_category("other"), second_request);
    assert_eq!(ctx.query_cache_live_count(), 2);

    // Release the maps one at a time, sweeping after each.
    drop(first_map);
    ctx.sweep_dead_cache_entries();
    assert_eq!(ctx.query_cache_live_count(), 1);

    drop(second_map);
    ctx.sweep_dead_cache_entries();
    assert_eq!(ctx.query_cache_live_count(), 0);
}
/// Creates a `SwitchMapReport` cell, mutates the store while it is alive, and asserts that
/// after dropping the cell and sweeping, neither the query cache nor the report cache has
/// any live entries.
///
/// NOTE(review): going by the test name, `SwitchMapReport` presumably uses `switch_map`
/// over `query_map` internally; that implementation is not visible in this file.
#[test]
fn report_with_switch_map_query_map_cleans_up_cache() {
    let ctx = make_ctx();
    for n in 0..10 {
        insert_bench_item(&ctx, &format!("item-{}", n), "alpha", n);
    }

    let queries_before = ctx.query_cache_len();
    let reports_before = ctx.report_cache_len();

    let request = ctx.new_server_transaction();
    let report = SwitchMapReport {
        category: "alpha".to_string(),
    };
    let report_cell = ctx.report(report, request);
    assert_eq!(report_cell.get().len(), 10);

    // Creating the report must have added entries to both caches.
    let queries_after_report = ctx.query_cache_len();
    let reports_after_report = ctx.report_cache_len();
    assert!(queries_after_report > queries_before);
    assert!(reports_after_report > reports_before);

    // Add twenty more items to the reported category while the cell is alive.
    for round in 0..20_i64 {
        insert_bench_item(&ctx, &format!("new-{}", round), "alpha", 100 + round);
    }
    assert_eq!(report_cell.get().len(), 30);
    let queries_during = ctx.query_cache_len();

    drop(report_cell);

    let (q_swept, _, r_swept) = ctx.sweep_dead_cache_entries();
    let live_queries = ctx.query_cache_live_count();
    let live_reports = ctx.report_cache_live_count();
    assert_eq!(
        live_queries, 0,
        "expected 0 live query cache entries after dropping report, found {} \
         (cache grew to {} during mutations, swept {})",
        live_queries, queries_during, q_swept,
    );
    assert_eq!(
        live_reports, 0,
        "expected 0 live report cache entries after dropping report, found {} (swept {})",
        live_reports, r_swept,
    );
}
/// Inserts 100 items while a `SwitchMapReport` cell is alive and asserts that, after a
/// sweep, at most 10 query cache entries are counted as live; then asserts the live count
/// falls to 0 once the cell is dropped and the cache is swept again.
#[test]
fn report_switch_map_cache_bounded_during_active_mutations() {
    let ctx = make_ctx();
    for n in 0..5 {
        insert_bench_item(&ctx, &format!("item-{}", n), "beta", n);
    }

    let request = ctx.new_server_transaction();
    let report = SwitchMapReport {
        category: "beta".to_string(),
    };
    let report_cell = ctx.report(report, request);
    assert_eq!(report_cell.get().len(), 5);

    // One hundred single-item writes into the reported category.
    for round in 0..100_i64 {
        let id = format!("stress-{}", round);
        insert_bench_item(&ctx, &id, "beta", 1000 + round);
    }
    assert_eq!(report_cell.get().len(), 105);

    // Sweep while the report is still held, so only entries that remain live are counted.
    ctx.sweep_dead_cache_entries();
    let live_entries = ctx.query_cache_live_count();
    let total_entries = ctx.query_cache_len();
    assert!(
        live_entries <= 10,
        "live query cache entries ({}) should be bounded during active use, \
         but found {} total cache entries — old entries are being kept alive",
        live_entries,
        total_entries,
    );

    drop(report_cell);
    ctx.sweep_dead_cache_entries();
    assert_eq!(
        ctx.query_cache_live_count(),
        0,
        "all entries should be dead after drop"
    );
}