use std::sync::Arc;
#[cfg(feature = "kvbm-offload")]
use std::sync::Mutex;
use dynamo_kv_router::protocols::{
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheRemoveData, KvCacheStoreData,
KvCacheStoredBlockData, LocalBlockHash, StorageTier,
};
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{BlockHash, PositionalLineageHash, SequenceHash};
use kvbm_logical::registry::BlockRegistry;
use kvbm_logical::tinylfu::TinyLFUTracker;
use kvbm_logical::{BlockManager, ImmutableBlock, MutableBlock};
use rustc_hash::FxHashMap;
use uuid::Uuid;
use crate::common::kv_cache_trace;
use crate::common::protocols::{
G1, KvEventPublishers, MockerEvictionBackend, MoveBlock, PrefillCost,
};
use crate::common::sequence::ActiveSequence;
#[cfg(feature = "kvbm-offload")]
use crate::kvbm_offload::{
G2BlockEventMetadata, G2OffloadBlock, G2RouterEvent, MockOffloadEngine, SwapInHandle,
};
#[cfg(feature = "kvbm-offload")]
pub enum BatchSwapInOutcome {
NoHits,
Scheduled {
handle: SwapInHandle,
destination_slots: Vec<MutableBlock<G1>>,
},
BlockedOnG1Offload,
}
#[cfg(feature = "kvbm-offload")]
pub struct SwapInRegistrationOutcome {
pub consumed_entries: usize,
}
#[cfg(feature = "kvbm-offload")]
pub(crate) struct SwapInRegistrationBlock {
pub(crate) seq_hash: SequenceHash,
pub(crate) plh: PositionalLineageHash,
pub(crate) local_hash: BlockHash,
pub(crate) token_ids: Option<Vec<u32>>,
}
#[cfg(feature = "kvbm-offload")]
enum SwapInSlotReservation {
Reserved(Vec<MutableBlock<G1>>),
BlockedOnG1Offload,
NoCapacity,
}
enum UseOutcome {
ActiveHit,
InactiveHit,
NewStore,
}
enum G1AllocationAttempt {
Allocated {
mutable: MutableBlock<G1>,
evicted_plhs: Vec<PositionalLineageHash>,
},
BlockedOnOffload {
evicted_plhs: Vec<PositionalLineageHash>,
source_slots: Vec<MutableBlock<G1>>,
},
}
#[derive(Clone)]
struct RegisteredBlockInfo {
seq_hash: SequenceHash,
#[cfg_attr(not(feature = "kvbm-offload"), allow(dead_code))]
block_id: usize,
#[cfg_attr(not(feature = "kvbm-offload"), allow(dead_code))]
parent_hash: Option<SequenceHash>,
#[cfg_attr(not(feature = "kvbm-offload"), allow(dead_code))]
local_hash: BlockHash,
#[cfg_attr(not(feature = "kvbm-offload"), allow(dead_code))]
token_ids: Option<Vec<u32>>,
}
pub struct KvManager {
block_manager: BlockManager<G1>,
max_capacity: usize,
block_size: usize,
kv_event_publishers: KvEventPublishers,
dp_rank: u32,
next_event_id: u64,
active_partial: FxHashMap<Uuid, MutableBlock<G1>>,
active_full: FxHashMap<SequenceHash, Vec<ImmutableBlock<G1>>>,
registered_blocks: FxHashMap<PositionalLineageHash, RegisteredBlockInfo>,
#[cfg(feature = "kvbm-offload")]
offload_engine: Option<Arc<Mutex<MockOffloadEngine>>>,
}
impl KvManager {
pub fn new_with_event_sink(
max_capacity: usize,
block_size: usize,
kv_event_publishers: KvEventPublishers,
dp_rank: u32,
) -> Self {
Self::new_with_eviction_backend(
max_capacity,
block_size,
kv_event_publishers,
dp_rank,
MockerEvictionBackend::default(),
)
}
pub fn new_with_eviction_backend(
max_capacity: usize,
block_size: usize,
kv_event_publishers: KvEventPublishers,
dp_rank: u32,
eviction_backend: MockerEvictionBackend,
) -> Self {
debug_assert!(max_capacity > 0, "max_capacity must be > 0");
let mut registry_builder = BlockRegistry::builder();
if matches!(eviction_backend, MockerEvictionBackend::MultiLru) {
let tracker = Arc::new(TinyLFUTracker::new(max_capacity));
registry_builder = registry_builder.frequency_tracker(tracker);
}
let registry = registry_builder.build();
let mut mgr_builder = BlockManager::builder()
.block_count(max_capacity)
.block_size(block_size)
.registry(registry);
mgr_builder = match eviction_backend {
MockerEvictionBackend::Lineage => mgr_builder.with_lineage_backend(),
MockerEvictionBackend::Lru => mgr_builder.with_lru_backend(),
MockerEvictionBackend::MultiLru => mgr_builder.with_multi_lru_backend(),
};
let block_manager = mgr_builder.build().expect("BlockManager build failed");
if !kv_event_publishers.is_empty() {
tracing::info!(
"KvManager initialized with event sink for DP rank {dp_rank} with block_size {block_size}, eviction={eviction_backend:?}"
);
}
Self {
block_manager,
max_capacity,
block_size,
kv_event_publishers,
dp_rank,
next_event_id: 0,
active_partial: FxHashMap::default(),
active_full: FxHashMap::default(),
registered_blocks: FxHashMap::default(),
#[cfg(feature = "kvbm-offload")]
offload_engine: None,
}
}
#[cfg(feature = "kvbm-offload")]
pub fn attach_new_offload_engine(
&mut self,
engine: MockOffloadEngine,
) -> Arc<Mutex<MockOffloadEngine>> {
let shared = Arc::new(Mutex::new(engine));
self.offload_engine = Some(shared.clone());
shared
}
#[cfg(feature = "kvbm-offload")]
pub fn has_offload_engine(&self) -> bool {
self.offload_engine.is_some()
}
#[cfg(feature = "kvbm-offload")]
pub fn tick_offload_engine(&mut self, now_ms: f64) {
let g2_events = if let Some(engine_arc) = self.offload_engine.as_ref() {
let engine = engine_arc.lock().expect("offload engine mutex poisoned");
engine.tick(now_ms);
engine.drain_g2_router_events()
} else {
Vec::new()
};
self.publish_g2_router_events(g2_events);
}
#[cfg(feature = "kvbm-offload")]
pub fn earliest_offload_deadline(&self) -> Option<f64> {
let engine_arc = self.offload_engine.as_ref()?;
let engine = engine_arc.lock().expect("offload engine mutex poisoned");
engine.earliest_pending_deadline()
}
#[cfg(feature = "kvbm-offload")]
fn enqueue_evictions_to_g2(
&mut self,
evicted: &[G2OffloadBlock],
source_slots: Vec<MutableBlock<G1>>,
) -> Vec<G2RouterEvent> {
let Some(engine_arc) = self.offload_engine.as_ref() else {
drop(source_slots);
return Vec::new();
};
if evicted.is_empty() {
drop(source_slots);
return Vec::new();
}
let mut engine = engine_arc.lock().expect("offload engine mutex poisoned");
engine.enqueue_g1_evictions_with_metadata(evicted, source_slots, None);
engine.drain_g2_router_events()
}
#[cfg(feature = "kvbm-offload")]
pub(crate) fn register_swapped_in_blocks(
&mut self,
entries: Vec<SwapInRegistrationBlock>,
initial_parent_hash: Option<SequenceHash>,
destination_slots: Vec<MutableBlock<G1>>,
) -> SwapInRegistrationOutcome {
let total_entries = entries.len();
let mut stored_seq_hashes = Vec::with_capacity(total_entries);
let mut stored_local_hashes = Vec::with_capacity(total_entries);
let mut stored_token_ids = Vec::with_capacity(total_entries);
let mut stored_parent_hash = initial_parent_hash;
let mut metadata_parent_hash = initial_parent_hash;
let mut consumed_entries = 0usize;
let mut destination_slots = destination_slots.into_iter();
for entry in entries {
let Some(mutable) = destination_slots.next() else {
tracing::warn!(
consumed_entries,
entries = total_entries,
"kvbm-offload: swap-in registration ran out of reserved G1 slots"
);
break;
};
if self.active_full.contains_key(&entry.seq_hash) {
drop(mutable);
if !stored_seq_hashes.is_empty() {
self.publish_swap_in_stored_batch(
&mut stored_seq_hashes,
&mut stored_local_hashes,
&mut stored_token_ids,
stored_parent_hash,
);
}
stored_parent_hash = Some(entry.seq_hash);
metadata_parent_hash = Some(entry.seq_hash);
consumed_entries += 1;
continue;
}
let presence = self
.block_manager
.block_registry()
.check_presence::<G1>(&[entry.plh]);
if presence.first().is_some_and(|(_, p)| *p) {
drop(mutable);
if !stored_seq_hashes.is_empty() {
self.publish_swap_in_stored_batch(
&mut stored_seq_hashes,
&mut stored_local_hashes,
&mut stored_token_ids,
stored_parent_hash,
);
}
stored_parent_hash = Some(entry.seq_hash);
metadata_parent_hash = Some(entry.seq_hash);
consumed_entries += 1;
continue;
}
let complete = mutable
.stage(entry.plh, self.block_size)
.expect("stage failed during swap-in registration");
let immutable = self.block_manager.register_block(complete);
let block_id = immutable.block_id();
drop(immutable);
let registry_token_ids = entry.token_ids.clone();
if let Some(token_ids) = entry.token_ids {
stored_token_ids.push(token_ids);
}
self.registered_blocks.insert(
entry.plh,
RegisteredBlockInfo {
seq_hash: entry.seq_hash,
block_id,
parent_hash: metadata_parent_hash,
local_hash: entry.local_hash,
token_ids: registry_token_ids,
},
);
stored_seq_hashes.push(entry.seq_hash);
stored_local_hashes.push(entry.local_hash);
metadata_parent_hash = Some(entry.seq_hash);
consumed_entries += 1;
}
if !stored_seq_hashes.is_empty() {
self.publish_swap_in_stored_batch(
&mut stored_seq_hashes,
&mut stored_local_hashes,
&mut stored_token_ids,
stored_parent_hash,
);
}
SwapInRegistrationOutcome { consumed_entries }
}
#[cfg(feature = "kvbm-offload")]
fn publish_swap_in_stored_batch(
&mut self,
stored_seq_hashes: &mut Vec<SequenceHash>,
stored_local_hashes: &mut Vec<BlockHash>,
stored_token_ids: &mut Vec<Vec<u32>>,
parent_hash: Option<SequenceHash>,
) {
if stored_seq_hashes.is_empty() {
return;
}
let full_blocks = std::mem::take(stored_seq_hashes);
let local_hashes = std::mem::take(stored_local_hashes);
let token_ids = if stored_token_ids.len() == full_blocks.len() {
Some(std::mem::take(stored_token_ids))
} else {
stored_token_ids.clear();
None
};
self.publish_kv_event(full_blocks, &local_hashes, parent_hash, true, token_ids);
}
#[cfg(feature = "kvbm-offload")]
pub fn try_batch_swap_in(
&mut self,
remaining_plhs: &[PositionalLineageHash],
now_ms: Option<f64>,
) -> BatchSwapInOutcome {
let Some(engine_arc) = self.offload_engine.clone() else {
return BatchSwapInOutcome::NoHits;
};
let Some(prepared) = ({
let mut engine = engine_arc.lock().expect("offload engine mutex poisoned");
engine.prepare_onboard_prefix(remaining_plhs)
}) else {
return BatchSwapInOutcome::NoHits;
};
let block_count = prepared.block_count();
let destination_slots = match self.reserve_swap_in_destination_slots(block_count) {
SwapInSlotReservation::Reserved(slots) => slots,
SwapInSlotReservation::BlockedOnG1Offload => {
return BatchSwapInOutcome::BlockedOnG1Offload;
}
SwapInSlotReservation::NoCapacity => return BatchSwapInOutcome::NoHits,
};
let handle = {
let mut engine = engine_arc.lock().expect("offload engine mutex poisoned");
engine.start_onboard_prefix(prepared, now_ms)
};
BatchSwapInOutcome::Scheduled {
handle,
destination_slots,
}
}
#[cfg(feature = "kvbm-offload")]
pub(crate) fn try_pin_g1_prefix(
&mut self,
prefix_plhs: &[PositionalLineageHash],
) -> Option<Vec<ImmutableBlock<G1>>> {
if prefix_plhs.is_empty() {
return Some(Vec::new());
}
let pins = self.block_manager.match_blocks(prefix_plhs);
if pins.len() == prefix_plhs.len() {
Some(pins)
} else {
None
}
}
fn publish_kv_event(
&mut self,
full_blocks: Vec<SequenceHash>,
local_hashes: &[BlockHash],
parent_hash: Option<u64>,
is_store: bool,
token_ids: Option<Vec<Vec<u32>>>,
) {
self.publish_kv_event_for_tier(
full_blocks,
local_hashes,
parent_hash,
is_store,
token_ids,
StorageTier::Device,
);
}
fn publish_kv_event_for_tier(
&mut self,
full_blocks: Vec<SequenceHash>,
local_hashes: &[BlockHash],
parent_hash: Option<u64>,
is_store: bool,
token_ids: Option<Vec<Vec<u32>>>,
storage_tier: StorageTier,
) {
if full_blocks.is_empty() {
return;
}
kv_cache_trace::log_vllm_trace(
if is_store { "allocation" } else { "eviction" },
self.dp_rank,
self.block_size,
self.num_active_blocks(),
self.num_inactive_blocks(),
self.max_capacity,
);
if self.kv_event_publishers.is_empty() {
return;
}
let event_data = if is_store {
debug_assert!(
local_hashes.is_empty() || local_hashes.len() == full_blocks.len(),
"publish_kv_event: local_hashes must be empty or 1:1 with full_blocks ({} vs {})",
local_hashes.len(),
full_blocks.len(),
);
KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: parent_hash.map(ExternalSequenceBlockHash),
start_position: None,
blocks: full_blocks
.into_iter()
.enumerate()
.map(|(i, global_hash)| KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(global_hash),
tokens_hash: LocalBlockHash(
local_hashes.get(i).copied().unwrap_or_default(),
),
mm_extra_info: None,
})
.collect(),
})
} else {
KvCacheEventData::Removed(KvCacheRemoveData {
block_hashes: full_blocks
.into_iter()
.map(ExternalSequenceBlockHash)
.collect(),
})
};
let event_id = self.next_event_id;
self.next_event_id += 1;
let event = KvCacheEvent {
event_id,
data: event_data,
dp_rank: self.dp_rank,
};
if let Err(e) = self.kv_event_publishers.publish_with_storage_tier(
event,
token_ids.as_deref(),
storage_tier,
) {
tracing::warn!("Failed to publish KV event: {e}");
}
}
#[cfg(feature = "kvbm-offload")]
fn publish_g2_router_events(&mut self, events: Vec<G2RouterEvent>) {
for event in events {
match event {
G2RouterEvent::Stored(meta) => {
self.publish_kv_event_for_tier(
vec![meta.seq_hash],
&[meta.local_hash],
meta.parent_hash,
true,
meta.token_ids.map(|ids| vec![ids]),
StorageTier::HostPinned,
);
}
G2RouterEvent::Removed { seq_hash } => {
self.publish_kv_event_for_tier(
vec![seq_hash],
&[],
None,
false,
None,
StorageTier::HostPinned,
);
}
}
}
}
pub fn process(&mut self, event: &MoveBlock) -> usize {
match event {
MoveBlock::Use(blocks, local_hashes, plhs, token_ids, parent) => self.process_use(
blocks,
local_hashes,
plhs,
token_ids.as_deref(),
parent.as_ref(),
),
MoveBlock::Deref(hashes) => {
self.process_deref(hashes);
1
}
MoveBlock::Promote(uuid, seq_hash, parent_hash, local_hash, plh, token_ids) => {
self.process_promote(
*uuid,
*seq_hash,
*parent_hash,
*local_hash,
*plh,
token_ids.clone(),
);
1
}
}
}
fn allocate_one_g1_slot(&mut self) -> Option<G1AllocationAttempt> {
let (mut alloc, evicted_plhs) = self.block_manager.allocate_blocks_with_evictions(1)?;
let mutable = alloc.pop().expect("allocate_blocks(1) returned no block");
if self.should_block_on_g1_offload(&evicted_plhs) {
return Some(G1AllocationAttempt::BlockedOnOffload {
evicted_plhs,
source_slots: vec![mutable],
});
}
Some(G1AllocationAttempt::Allocated {
mutable,
evicted_plhs,
})
}
#[cfg(feature = "kvbm-offload")]
fn reserve_swap_in_destination_slots(&mut self, count: usize) -> SwapInSlotReservation {
let mut destination_slots = Vec::with_capacity(count);
let mut evicted_plhs = Vec::new();
let mut blocked_evicted_plhs = Vec::new();
let mut blocked_source_slots = Vec::new();
while destination_slots.len() < count {
let Some(allocation) = self.allocate_one_g1_slot() else {
self.handle_evictions(evicted_plhs);
return SwapInSlotReservation::NoCapacity;
};
match allocation {
G1AllocationAttempt::Allocated {
mutable,
evicted_plhs: evicted,
} => {
evicted_plhs.extend(evicted);
destination_slots.push(mutable);
}
G1AllocationAttempt::BlockedOnOffload {
evicted_plhs: evicted,
source_slots,
} => {
blocked_evicted_plhs.extend(evicted);
blocked_source_slots.extend(source_slots);
let remaining_allocations = count - destination_slots.len();
self.extend_blocked_g1_offload_batch(
&mut blocked_evicted_plhs,
&mut blocked_source_slots,
remaining_allocations,
);
drop(destination_slots);
self.handle_evictions(evicted_plhs);
self.handle_evictions_with_source_slots(
blocked_evicted_plhs,
blocked_source_slots,
);
return SwapInSlotReservation::BlockedOnG1Offload;
}
}
}
self.handle_evictions(evicted_plhs);
SwapInSlotReservation::Reserved(destination_slots)
}
fn full_block_present_in_g1(
&self,
seq_hash: &SequenceHash,
plh: PositionalLineageHash,
) -> bool {
if self.active_full.contains_key(seq_hash) {
return true;
}
let presence = self
.block_manager
.block_registry()
.check_presence::<G1>(&[plh]);
presence.first().is_some_and(|(_, present)| *present)
}
fn pending_use_allocations(
&self,
blocks: &[UniqueBlock],
plhs: &[PositionalLineageHash],
mut plh_idx: usize,
) -> usize {
let mut allocations = 0usize;
for block in blocks {
match block {
UniqueBlock::FullBlock(seq_hash) => {
let Some(plh) = plhs.get(plh_idx).copied() else {
break;
};
plh_idx += 1;
if !self.full_block_present_in_g1(seq_hash, plh) {
allocations += 1;
}
}
UniqueBlock::PartialBlock(uuid) => {
if !self.active_partial.contains_key(uuid) {
allocations += 1;
}
}
}
}
allocations
}
fn extend_blocked_g1_offload_batch(
&mut self,
evicted_plhs: &mut Vec<PositionalLineageHash>,
source_slots: &mut Vec<MutableBlock<G1>>,
max_source_slots: usize,
) {
while source_slots.len() < max_source_slots {
let Some((mut alloc, evicted)) = self.block_manager.allocate_blocks_with_evictions(1)
else {
return;
};
let mutable = alloc.pop().expect("allocate_blocks(1) returned no block");
if !self.should_block_on_g1_offload(&evicted) {
drop(mutable);
return;
}
evicted_plhs.extend(evicted);
source_slots.push(mutable);
}
}
#[cfg(feature = "kvbm-offload")]
fn should_block_on_g1_offload(&self, evicted_plhs: &[PositionalLineageHash]) -> bool {
self.offload_engine.is_some() && !evicted_plhs.is_empty()
}
#[cfg(not(feature = "kvbm-offload"))]
fn should_block_on_g1_offload(&self, _evicted_plhs: &[PositionalLineageHash]) -> bool {
false
}
fn process_use(
&mut self,
blocks: &[UniqueBlock],
local_hashes: &[BlockHash],
plhs: &[PositionalLineageHash],
token_ids: Option<&[Vec<u32>]>,
parent: Option<&UniqueBlock>,
) -> usize {
let expected_full_blocks = blocks
.iter()
.filter(|b| matches!(b, UniqueBlock::FullBlock(_)))
.count();
assert_eq!(
plhs.len(),
expected_full_blocks,
"Use: plhs.len() must match FullBlock count in blocks"
);
assert!(
local_hashes.is_empty() || local_hashes.len() == expected_full_blocks,
"Use: local_hashes must be empty or match FullBlock count ({} vs {})",
local_hashes.len(),
expected_full_blocks,
);
assert!(
token_ids.is_none_or(|ids| ids.len() == expected_full_blocks),
"Use: token_ids must be absent or match FullBlock count ({} vs {})",
token_ids.map_or(0, |ids| ids.len()),
expected_full_blocks,
);
let mut blocks_stored = Vec::<SequenceHash>::new();
let mut stored_local_hashes = Vec::<BlockHash>::new();
let mut stored_token_ids: Option<Vec<Vec<u32>>> = token_ids.map(|_| Vec::new());
let mut evicted_plhs = Vec::<PositionalLineageHash>::new();
let mut blocked_evicted_plhs = Vec::<PositionalLineageHash>::new();
let mut blocked_source_slots = Vec::<MutableBlock<G1>>::new();
let mut parent_block: Option<&UniqueBlock> = parent;
let mut metadata_parent_hash: Option<SequenceHash> = match parent {
None => None,
Some(UniqueBlock::FullBlock(block)) => Some(*block),
Some(UniqueBlock::PartialBlock(_)) => panic!("parent block cannot be partial"),
};
let mut plh_idx = 0usize;
let mut allocated = 0usize;
for (i, block) in blocks.iter().enumerate() {
let mut current_full_idx: Option<usize> = None;
let outcome = match block {
UniqueBlock::FullBlock(seq_hash) => {
let full_idx = plh_idx;
current_full_idx = Some(full_idx);
if let Some(vec) = self.active_full.get_mut(seq_hash) {
let cloned = vec[0].clone();
vec.push(cloned);
plh_idx += 1;
UseOutcome::ActiveHit
} else {
let plh = plhs[plh_idx];
plh_idx += 1;
if let Some(immutable) =
self.block_manager.match_blocks(&[plh]).into_iter().next()
{
self.active_full
.entry(*seq_hash)
.or_default()
.push(immutable);
UseOutcome::InactiveHit
} else {
let Some(allocation) = self.allocate_one_g1_slot() else {
break; };
let mutable = match allocation {
G1AllocationAttempt::Allocated {
mutable,
evicted_plhs: evicted,
} => {
evicted_plhs.extend(evicted);
mutable
}
G1AllocationAttempt::BlockedOnOffload {
evicted_plhs: evicted,
source_slots,
} => {
blocked_evicted_plhs.extend(evicted);
blocked_source_slots.extend(source_slots);
let remaining_allocations = self.pending_use_allocations(
&blocks[i + 1..],
plhs,
plh_idx,
);
self.extend_blocked_g1_offload_batch(
&mut blocked_evicted_plhs,
&mut blocked_source_slots,
1 + remaining_allocations,
);
break;
}
};
let complete =
mutable.stage(plh, self.block_size).expect("stage failed");
let immutable = self.block_manager.register_block(complete);
let block_id = immutable.block_id();
self.active_full
.entry(*seq_hash)
.or_default()
.push(immutable);
self.registered_blocks.insert(
plh,
RegisteredBlockInfo {
seq_hash: *seq_hash,
block_id,
parent_hash: metadata_parent_hash,
local_hash: local_hashes
.get(full_idx)
.copied()
.unwrap_or_default(),
token_ids: token_ids.and_then(|ids| ids.get(full_idx).cloned()),
},
);
UseOutcome::NewStore
}
}
}
UniqueBlock::PartialBlock(uuid) => {
if self.active_partial.contains_key(uuid) {
UseOutcome::ActiveHit
} else {
let Some(allocation) = self.allocate_one_g1_slot() else {
break;
};
let mutable = match allocation {
G1AllocationAttempt::Allocated {
mutable,
evicted_plhs: evicted,
} => {
evicted_plhs.extend(evicted);
mutable
}
G1AllocationAttempt::BlockedOnOffload {
evicted_plhs: evicted,
source_slots,
} => {
blocked_evicted_plhs.extend(evicted);
blocked_source_slots.extend(source_slots);
let remaining_allocations =
self.pending_use_allocations(&blocks[i + 1..], plhs, plh_idx);
self.extend_blocked_g1_offload_batch(
&mut blocked_evicted_plhs,
&mut blocked_source_slots,
1 + remaining_allocations,
);
break;
}
};
self.active_partial.insert(*uuid, mutable);
UseOutcome::ActiveHit
}
}
};
match outcome {
UseOutcome::ActiveHit | UseOutcome::InactiveHit => {
if matches!(block, UniqueBlock::FullBlock(_)) {
parent_block = Some(block);
}
}
UseOutcome::NewStore => {
if let UniqueBlock::FullBlock(seq_hash) = block {
blocks_stored.push(*seq_hash);
let full_idx =
current_full_idx.expect("NewStore is only emitted for full blocks");
if let Some(lh) = local_hashes.get(full_idx) {
stored_local_hashes.push(*lh);
}
if let (Some(ref mut stids), Some(ids)) =
(stored_token_ids.as_mut(), token_ids)
{
stids.push(ids[full_idx].clone());
}
}
}
}
if let UniqueBlock::FullBlock(seq_hash) = block {
metadata_parent_hash = Some(*seq_hash);
}
allocated += 1;
}
let parent_hash = match parent_block {
None => None,
Some(UniqueBlock::FullBlock(block)) => Some(*block),
Some(UniqueBlock::PartialBlock(_)) => panic!("parent block cannot be partial"),
};
self.publish_kv_event(
blocks_stored,
&stored_local_hashes,
parent_hash,
true,
stored_token_ids,
);
self.handle_evictions(evicted_plhs);
self.handle_evictions_with_source_slots(blocked_evicted_plhs, blocked_source_slots);
allocated
}
fn handle_evictions(&mut self, evicted_plhs: Vec<PositionalLineageHash>) {
self.handle_evictions_with_source_slots(evicted_plhs, Vec::new());
}
fn handle_evictions_with_source_slots(
&mut self,
evicted_plhs: Vec<PositionalLineageHash>,
source_slots: Vec<MutableBlock<G1>>,
) {
if evicted_plhs.is_empty() {
drop(source_slots);
return;
}
let mut evicted_seq_hashes = Vec::with_capacity(evicted_plhs.len());
#[cfg(feature = "kvbm-offload")]
let mut offload_blocks = Vec::with_capacity(evicted_plhs.len());
for plh in evicted_plhs {
let Some(info) = self.registered_blocks.remove(&plh) else {
continue;
};
evicted_seq_hashes.push(info.seq_hash);
#[cfg(feature = "kvbm-offload")]
offload_blocks.push(G2OffloadBlock {
block_id: info.block_id,
plh,
metadata: G2BlockEventMetadata {
seq_hash: info.seq_hash,
parent_hash: info.parent_hash,
local_hash: info.local_hash,
token_ids: info.token_ids,
},
});
}
#[cfg(feature = "kvbm-offload")]
let g2_events = {
if !source_slots.is_empty() && source_slots.len() != offload_blocks.len() {
tracing::warn!(
source_slots = source_slots.len(),
offload_blocks = offload_blocks.len(),
"kvbm-offload: source-slot hold count does not match offload block count"
);
}
self.enqueue_evictions_to_g2(&offload_blocks, source_slots)
};
#[cfg(not(feature = "kvbm-offload"))]
drop(source_slots);
if !evicted_seq_hashes.is_empty() {
self.publish_kv_event(evicted_seq_hashes, &[], None, false, None);
}
#[cfg(feature = "kvbm-offload")]
self.publish_g2_router_events(g2_events);
}
fn process_deref(&mut self, blocks: &[UniqueBlock]) {
for block in blocks {
match block {
UniqueBlock::PartialBlock(uuid) => {
self.active_partial
.remove(uuid)
.expect("Deref: partial block not in active pool");
}
UniqueBlock::FullBlock(seq_hash) => {
let vec = self
.active_full
.get_mut(seq_hash)
.expect("Deref: full block not in active pool");
vec.pop();
if vec.is_empty() {
self.active_full.remove(seq_hash);
}
}
}
}
}
fn process_promote(
&mut self,
uuid: Uuid,
seq_hash: SequenceHash,
parent_hash: Option<u64>,
local_hash: BlockHash,
plh: PositionalLineageHash,
token_ids: Option<Vec<u32>>,
) {
let mutable = self
.active_partial
.remove(&uuid)
.expect("Promote: partial block not found");
let is_new = if let Some(vec) = self.active_full.get_mut(&seq_hash) {
drop(mutable);
let existing = vec[0].clone();
vec.push(existing);
false
} else if let Some(immutable) = self.block_manager.match_blocks(&[plh]).into_iter().next() {
drop(mutable);
self.active_full.insert(seq_hash, vec![immutable]);
false
} else {
let complete = mutable
.stage(plh, self.block_size)
.expect("stage failed during promote");
let immutable = self.block_manager.register_block(complete);
let block_id = immutable.block_id();
self.active_full.insert(seq_hash, vec![immutable]);
self.registered_blocks.insert(
plh,
RegisteredBlockInfo {
seq_hash,
block_id,
parent_hash,
local_hash,
token_ids: token_ids.clone(),
},
);
true
};
if is_new {
self.publish_kv_event(
vec![seq_hash],
&[local_hash],
parent_hash,
true,
token_ids.map(|t| vec![t]),
);
}
}
pub fn num_active_blocks(&self) -> usize {
self.block_manager.total_blocks() - self.block_manager.available_blocks()
}
pub fn num_active_block_refs(&self) -> usize {
self.active_partial.len() + self.active_full.values().map(|v| v.len()).sum::<usize>()
}
pub fn get_active_perc(&self) -> f64 {
self.num_active_blocks() as f64 / self.max_capacity as f64
}
pub fn num_inactive_blocks(&self) -> usize {
self.block_manager.metrics().snapshot().inactive_pool_size as usize
}
pub fn max_capacity(&self) -> usize {
self.max_capacity
}
pub fn block_size(&self) -> usize {
self.block_size
}
pub fn dp_rank(&self) -> u32 {
self.dp_rank
}
pub fn get_prefill_cost(&self, sequence: &ActiveSequence) -> PrefillCost {
let seq_blocks = sequence.unique_blocks();
let overlap_blocks = if sequence.enable_prefix_caching() {
let plhs = sequence.positional_lineage_hashes();
let mut overlap = 0;
for (i, block) in seq_blocks.iter().enumerate() {
match block {
UniqueBlock::FullBlock(seq_hash) => {
if self.active_full.contains_key(seq_hash) {
overlap += 1;
continue;
}
let Some(plh) = plhs.get(i) else {
break;
};
if self.registered_blocks.contains_key(plh) {
overlap += 1;
} else {
break;
}
}
UniqueBlock::PartialBlock(_) => break,
}
}
overlap
} else {
0
};
let new_blocks = seq_blocks.len() - overlap_blocks;
let cached_tokens = (overlap_blocks * self.block_size).min(sequence.num_input_tokens());
let new_tokens = sequence.num_input_tokens() - cached_tokens;
PrefillCost {
new_blocks,
new_tokens,
cached_tokens,
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Mutex;
use super::*;
use crate::common::protocols::KvCacheEventSink;
#[derive(Default)]
struct CapturingSink {
events: Mutex<Vec<KvCacheEvent>>,
}
impl KvCacheEventSink for CapturingSink {
fn publish(&self, event: KvCacheEvent) -> anyhow::Result<()> {
self.events.lock().unwrap().push(event);
Ok(())
}
}
fn make_mgr(capacity: usize, block_size: usize) -> KvManager {
KvManager::new_with_event_sink(capacity, block_size, KvEventPublishers::default(), 0)
}
fn make_mgr_capturing(capacity: usize, block_size: usize) -> (KvManager, Arc<CapturingSink>) {
let sink = Arc::new(CapturingSink::default());
let publishers = KvEventPublishers::new(Some(sink.clone() as _), None);
(
KvManager::new_with_event_sink(capacity, block_size, publishers, 0),
sink,
)
}
fn make_mgr_capturing_with_backend(
capacity: usize,
block_size: usize,
backend: MockerEvictionBackend,
) -> (KvManager, Arc<CapturingSink>) {
let sink = Arc::new(CapturingSink::default());
let publishers = KvEventPublishers::new(Some(sink.clone() as _), None);
(
KvManager::new_with_eviction_backend(capacity, block_size, publishers, 0, backend),
sink,
)
}
fn plh(v: u64) -> PositionalLineageHash {
PositionalLineageHash::new(v, None, 0)
}
fn lineage_plh(id: u64) -> PositionalLineageHash {
match id {
0 => PositionalLineageHash::new(0, None, 0),
1 => PositionalLineageHash::new(1, Some(0), 1),
2 => PositionalLineageHash::new(2, Some(1), 2),
3 => PositionalLineageHash::new(3, Some(2), 3),
4 => PositionalLineageHash::new(4, Some(3), 4),
5 => PositionalLineageHash::new(5, Some(1), 2),
6 => PositionalLineageHash::new(6, Some(5), 3),
7 => PositionalLineageHash::new(7, Some(2), 3),
8 => PositionalLineageHash::new(8, Some(7), 4),
9 => PositionalLineageHash::new(9, Some(8), 5),
10 => PositionalLineageHash::new(10, None, 0),
11 => PositionalLineageHash::new(11, Some(10), 1),
12 => PositionalLineageHash::new(12, Some(11), 2),
13 => PositionalLineageHash::new(13, None, 0),
_ => plh(id),
}
}
fn use_full(mgr: &mut KvManager, seq_hash: u64, p: PositionalLineageHash) -> usize {
mgr.process(&MoveBlock::Use(
vec![UniqueBlock::FullBlock(seq_hash)],
vec![],
vec![p],
None,
None,
))
}
fn use_partial(mgr: &mut KvManager, uuid: Uuid) -> usize {
mgr.process(&MoveBlock::Use(
vec![UniqueBlock::PartialBlock(uuid)],
vec![],
vec![],
None,
None,
))
}
fn deref_full(mgr: &mut KvManager, seq_hash: u64) {
mgr.process(&MoveBlock::Deref(vec![UniqueBlock::FullBlock(seq_hash)]));
}
fn deref_partial(mgr: &mut KvManager, uuid: Uuid) {
mgr.process(&MoveBlock::Deref(vec![UniqueBlock::PartialBlock(uuid)]));
}
#[test]
fn test_use_single_full_block() {
let mut mgr = make_mgr(10, 16);
assert_eq!(use_full(&mut mgr, 1, plh(100)), 1);
assert_eq!(mgr.num_active_blocks(), 1);
}
#[test]
fn use_rejects_short_token_ids_before_mutating_state() {
let (mut mgr, sink) = make_mgr_capturing(10, 4);
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
mgr.process(&MoveBlock::Use(
vec![UniqueBlock::FullBlock(1), UniqueBlock::FullBlock(2)],
vec![101, 102],
vec![plh(100), plh(200)],
Some(vec![vec![1, 2, 3, 4]]),
None,
));
}));
assert!(result.is_err());
assert_eq!(mgr.num_active_blocks(), 0);
assert!(mgr.active_full.is_empty());
assert!(sink.events.lock().unwrap().is_empty());
}
#[test]
fn test_duplicate_use_bumps_refcount() {
let mut mgr = make_mgr(10, 16);
use_full(&mut mgr, 1, plh(100));
use_full(&mut mgr, 1, plh(100));
assert_eq!(mgr.num_active_blocks(), 1);
assert_eq!(mgr.num_active_block_refs(), 2);
}
#[test]
fn test_capacity_exhaustion_returns_partial() {
let mut mgr = make_mgr(4, 16);
for i in 0..4 {
assert_eq!(use_full(&mut mgr, i, plh(i + 100)), 1);
}
assert_eq!(use_full(&mut mgr, 4, plh(500)), 0);
}
#[test]
fn test_deref_returns_to_inactive() {
let mut mgr = make_mgr(4, 16);
use_full(&mut mgr, 1, plh(100));
deref_full(&mut mgr, 1);
assert_eq!(mgr.num_active_blocks(), 0);
}
#[test]
fn test_inactive_reuse_via_match_blocks() {
let mut mgr = make_mgr(10, 16);
let p = plh(100);
use_full(&mut mgr, 1, p);
deref_full(&mut mgr, 1);
assert_eq!(use_full(&mut mgr, 2, p), 1);
}
#[test]
fn test_eviction_frees_inactive_for_new_allocation() {
let mut mgr = make_mgr(4, 16);
for i in 0..4 {
use_full(&mut mgr, i, plh(i + 100));
}
for i in 0..4 {
deref_full(&mut mgr, i);
}
for i in 10..14 {
assert_eq!(use_full(&mut mgr, i, plh(i + 1000)), 1);
}
assert_eq!(mgr.num_active_blocks(), 4);
}
#[test]
fn test_promote_basic() {
let mut mgr = make_mgr(10, 16);
let uuid = Uuid::new_v4();
use_partial(&mut mgr, uuid);
mgr.process(&MoveBlock::Promote(uuid, 42, None, 0, plh(500), None));
assert_eq!(mgr.num_active_blocks(), 1);
assert!(mgr.active_partial.is_empty());
assert!(mgr.active_full.contains_key(&42));
}
#[test]
#[should_panic(expected = "Promote: partial block not found")]
fn test_promote_nonexistent_panics() {
let mut mgr = make_mgr(10, 16);
mgr.process(&MoveBlock::Promote(
Uuid::new_v4(),
42,
None,
0,
plh(500),
None,
));
}
#[test]
fn test_deref_partial_returns_to_reset() {
let mut mgr = make_mgr(10, 16);
let uuid = Uuid::new_v4();
use_partial(&mut mgr, uuid);
assert_eq!(mgr.active_partial.len(), 1);
deref_partial(&mut mgr, uuid);
assert!(mgr.active_partial.is_empty());
assert_eq!(mgr.num_active_block_refs(), 0);
}
#[test]
fn test_prefill_cost_no_overlap() {
let mgr = make_mgr(10, 16);
let tokens: Vec<u32> = (0..35).collect();
let seq = ActiveSequence::new(tokens, 10, Some(16), true, false);
let cost = mgr.get_prefill_cost(&seq);
assert_eq!(cost.new_blocks, seq.unique_blocks().len());
assert_eq!(cost.new_tokens, 35);
}
#[test]
fn test_eviction_backend_lru_and_multi_lru() {
for backend in [MockerEvictionBackend::Lru, MockerEvictionBackend::MultiLru] {
let mut mgr = KvManager::new_with_eviction_backend(
4,
16,
KvEventPublishers::default(),
0,
backend,
);
for i in 0..4u64 {
assert_eq!(use_full(&mut mgr, i, plh(i + 100)), 1);
}
for i in 0..4u64 {
deref_full(&mut mgr, i);
}
for i in 10..14u64 {
assert_eq!(
use_full(&mut mgr, i, plh(i + 1000)),
1,
"backend={backend:?}"
);
}
assert_eq!(mgr.num_active_blocks(), 4);
}
}
#[test]
fn test_failure_on_max_capacity() {
fn use_batch(mgr: &mut KvManager, ids: &[u64]) -> usize {
let blocks: Vec<_> = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect();
let plhs: Vec<_> = ids.iter().map(|&id| plh(id)).collect();
mgr.process(&MoveBlock::Use(blocks, vec![], plhs, None, None))
}
let mut mgr = make_mgr(10, 16);
let ids: Vec<u64> = (0..10).collect();
assert_eq!(use_batch(&mut mgr, &ids), 10, "all 10 should allocate");
assert_eq!(mgr.num_active_blocks(), 10);
assert_eq!(
use_batch(&mut mgr, &[10]),
0,
"over-capacity Use must return 0"
);
}
#[test]
fn test_block_lifecycle_stringent() {
fn use_blocks(mgr: &mut KvManager, ids: &[u64]) -> usize {
let blocks: Vec<_> = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect();
let plhs: Vec<_> = ids.iter().map(|&id| lineage_plh(id)).collect();
mgr.process(&MoveBlock::Use(blocks, vec![], plhs, None, None))
}
fn deref_blocks(mgr: &mut KvManager, ids: &[u64]) {
let blocks = ids.iter().map(|&id| UniqueBlock::FullBlock(id)).collect();
mgr.process(&MoveBlock::Deref(blocks));
}
fn refcount(mgr: &KvManager, id: u64) -> usize {
mgr.active_full.get(&id).map(|v| v.len()).unwrap_or(0)
}
fn assert_active(mgr: &KvManager, expected: &[(u64, usize)]) {
let distinct = expected.len();
let total_refs: usize = expected.iter().map(|&(_, r)| r).sum();
assert_eq!(
mgr.num_active_blocks(),
distinct,
"distinct active-block count mismatch; expected={expected:?}"
);
assert_eq!(
mgr.num_active_block_refs(),
total_refs,
"active handle-refcount mismatch; expected={expected:?}"
);
for &(id, r) in expected {
assert_eq!(refcount(mgr, id), r, "block {id} refcount mismatch");
}
}
fn assert_inactive_blocks(mgr: &KvManager, expected_ids: &[u64]) {
assert_eq!(
mgr.num_inactive_blocks(),
expected_ids.len(),
"inactive count mismatch; expected={expected_ids:?}"
);
let plhs: Vec<_> = expected_ids.iter().map(|&id| lineage_plh(id)).collect();
let presence = mgr
.block_manager
.block_registry()
.check_presence::<G1>(&plhs);
for ((_, present), &id) in presence.iter().zip(expected_ids.iter()) {
assert!(
*present,
"block {id} expected in inactive pool, not found in registry"
);
assert!(
!mgr.active_full.contains_key(&id),
"block {id} expected inactive but is in active pool"
);
}
}
fn drain_events(sink: &Arc<CapturingSink>) -> Vec<KvCacheEvent> {
std::mem::take(&mut *sink.events.lock().unwrap())
}
fn assert_stored_event(
event: &KvCacheEvent,
expected_blocks: &[u64],
expected_parent: Option<u64>,
) {
let KvCacheEventData::Stored(data) = &event.data else {
panic!("expected Stored event, got {:?}", event.data);
};
let actual_blocks: Vec<u64> =
data.blocks.iter().map(|block| block.block_hash.0).collect();
assert_eq!(actual_blocks, expected_blocks, "stored blocks mismatch");
assert_eq!(
data.parent_hash.map(|hash| hash.0),
expected_parent,
"stored parent_hash mismatch"
);
}
fn assert_removed_event(event: &KvCacheEvent, expected_blocks: &[u64]) {
let KvCacheEventData::Removed(data) = &event.data else {
panic!("expected Removed event, got {:?}", event.data);
};
let actual_blocks: Vec<u64> = data.block_hashes.iter().map(|hash| hash.0).collect();
assert_eq!(actual_blocks, expected_blocks, "removed blocks mismatch");
}
let (mut mgr, sink) =
make_mgr_capturing_with_backend(10, 16, MockerEvictionBackend::Lineage);
assert_eq!(use_blocks(&mut mgr, &[0, 1, 2, 3, 4]), 5);
let events = drain_events(&sink);
assert_eq!(events.len(), 1, "expected one Stored event for [0..=4]");
assert_stored_event(&events[0], &[0, 1, 2, 3, 4], None);
assert_eq!(use_blocks(&mut mgr, &[0, 1, 5, 6]), 4);
let events = drain_events(&sink);
assert_eq!(events.len(), 1, "expected one Stored event for [5, 6]");
assert_stored_event(&events[0], &[5, 6], Some(1));
assert_active(
&mgr,
&[(0, 2), (1, 2), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1)],
);
deref_blocks(&mut mgr, &[4, 3, 2, 1, 0]);
let events = drain_events(&sink);
assert!(events.is_empty(), "Deref should not emit KV events");
assert_active(&mgr, &[(0, 1), (1, 1), (5, 1), (6, 1)]);
assert_inactive_blocks(&mgr, &[2, 3, 4]);
deref_blocks(&mut mgr, &[6, 5, 1, 0]);
let events = drain_events(&sink);
assert!(events.is_empty(), "Deref should not emit KV events");
assert_active(&mgr, &[]);
assert_inactive_blocks(&mgr, &[0, 1, 2, 3, 4, 5, 6]);
assert_eq!(use_blocks(&mut mgr, &[0, 1, 2, 7, 8, 9]), 6);
let events = drain_events(&sink);
assert_eq!(events.len(), 1, "expected one Stored event for [7, 8, 9]");
assert_stored_event(&events[0], &[7, 8, 9], Some(2));
assert_active(&mgr, &[(0, 1), (1, 1), (2, 1), (7, 1), (8, 1), (9, 1)]);
assert_inactive_blocks(&mgr, &[3, 4, 5, 6]);
assert_eq!(use_blocks(&mut mgr, &[10, 11, 12]), 3);
let events = drain_events(&sink);
assert_eq!(
events.len(),
2,
"expected Stored + Removed for [10, 11, 12]"
);
assert_stored_event(&events[0], &[10, 11, 12], None);
assert_removed_event(&events[1], &[4, 3, 6]);
assert_active(
&mgr,
&[
(0, 1),
(1, 1),
(2, 1),
(7, 1),
(8, 1),
(9, 1),
(10, 1),
(11, 1),
(12, 1),
],
);
assert_inactive_blocks(&mgr, &[5]);
assert_eq!(use_blocks(&mut mgr, &[13]), 1);
let events = drain_events(&sink);
assert_eq!(events.len(), 2, "expected Stored + Removed for [13]");
assert_stored_event(&events[0], &[13], None);
assert_removed_event(&events[1], &[5]);
assert_active(
&mgr,
&[
(0, 1),
(1, 1),
(2, 1),
(7, 1),
(8, 1),
(9, 1),
(10, 1),
(11, 1),
(12, 1),
(13, 1),
],
);
assert_eq!(mgr.num_inactive_blocks(), 0);
}
#[test]
fn test_chunked_prefill_parent_hash() {
let block_size = 64;
let tokens: Vec<u32> = (0..512).collect(); let mut seq = ActiveSequence::new(tokens, 100, Some(block_size), true, false);
let (mut mgr, sink) = make_mgr_capturing(256, block_size);
let signal = seq.prepare_allocation(256).unwrap();
mgr.process(&signal);
seq.commit_allocation(256);
let signal = seq.prepare_allocation(512).unwrap();
mgr.process(&signal);
seq.commit_allocation(512);
let events = sink.events.lock().unwrap();
assert_eq!(events.len(), 2, "expected two Stored events");
let KvCacheEventData::Stored(ref store1) = events[0].data else {
panic!("expected Stored event");
};
assert!(
store1.parent_hash.is_none(),
"first chunk should have no parent_hash"
);
let KvCacheEventData::Stored(ref store2) = events[1].data else {
panic!("expected Stored event");
};
let UniqueBlock::FullBlock(expected_hash) = seq.unique_blocks()[3].clone() else {
panic!("expected FullBlock at index 3");
};
assert_eq!(
store2.parent_hash,
Some(ExternalSequenceBlockHash(expected_hash)),
"second chunk's parent_hash should be block 3's seq_hash"
);
}
#[test]
fn test_repreempt_after_partial_recompute_only_frees_reallocated_blocks() {
let mut seq = ActiveSequence::new((0..6).collect(), 16, Some(4), true, false);
let mut mgr = make_mgr(16, 4);
let signal = seq.take_creation_signal().unwrap();
assert_eq!(mgr.process(&signal), 2);
for _ in 0..3 {
let signals = seq.generate();
for signal in &signals {
mgr.process(signal);
}
if seq.generated_tokens() < seq.max_output_tokens() {
seq.commit_allocation(seq.len());
}
}
assert_eq!(mgr.num_active_blocks(), 3);
let first_reset = seq.reset_with_signal();
for signal in &first_reset {
mgr.process(signal);
}
assert_eq!(mgr.num_active_blocks(), 0);
let prompt_only = seq.prepare_allocation(seq.num_input_tokens()).unwrap();
assert_eq!(mgr.process(&prompt_only), 2);
seq.commit_allocation(seq.num_input_tokens());
assert_eq!(mgr.num_active_blocks(), 2);
let second_reset = seq.reset_with_signal();
for signal in &second_reset {
mgr.process(signal);
}
assert_eq!(mgr.num_active_blocks(), 0);
}
#[test]
fn test_inactive_hit_does_not_republish_stored() {
let (mut mgr, sink) = make_mgr_capturing(4, 16);
use_full(&mut mgr, 1, plh(100));
deref_full(&mut mgr, 1);
use_full(&mut mgr, 1, plh(100));
let events = sink.events.lock().unwrap();
let stored_count = events
.iter()
.filter(|e| matches!(e.data, KvCacheEventData::Stored(_)))
.count();
let removed_count = events
.iter()
.filter(|e| matches!(e.data, KvCacheEventData::Removed(_)))
.count();
assert_eq!(stored_count, 1, "reactivation must not re-emit Stored");
assert_eq!(removed_count, 0, "Deref must not emit Removed");
}
#[test]
fn test_stored_suffix_anchors_to_last_reused_block() {
let (mut mgr, sink) = make_mgr_capturing(8, 16);
mgr.process(&MoveBlock::Use(
vec![UniqueBlock::FullBlock(10), UniqueBlock::FullBlock(11)],
vec![],
vec![plh(10), plh(11)],
None,
None,
));
deref_full(&mut mgr, 10);
deref_full(&mut mgr, 11);
sink.events.lock().unwrap().clear();
mgr.process(&MoveBlock::Use(
vec![
UniqueBlock::FullBlock(10),
UniqueBlock::FullBlock(11),
UniqueBlock::FullBlock(12),
],
vec![],
vec![plh(10), plh(11), plh(12)],
None,
None, ));
let events = sink.events.lock().unwrap();
assert_eq!(events.len(), 1, "only new suffix should fire a Stored");
let KvCacheEventData::Stored(ref data) = events[0].data else {
panic!("expected Stored");
};
assert_eq!(data.blocks.len(), 1, "Stored must only include C");
assert_eq!(data.blocks[0].block_hash, ExternalSequenceBlockHash(12));
assert_eq!(
data.parent_hash,
Some(ExternalSequenceBlockHash(11)),
"parent_hash must anchor to last reused full block (B=11)"
);
}
#[cfg(feature = "kvbm-offload")]
#[test]
fn test_swap_in_registration_anchors_suffix_to_reused_prefix_parent() {
let (mut mgr, sink) = make_mgr_capturing(8, 16);
let slots = match mgr.reserve_swap_in_destination_slots(2) {
SwapInSlotReservation::Reserved(slots) => slots,
SwapInSlotReservation::BlockedOnG1Offload => {
panic!("fresh manager should not need G1 offload")
}
SwapInSlotReservation::NoCapacity => panic!("fresh manager should have capacity"),
};
let entries = vec![
SwapInRegistrationBlock {
seq_hash: 12,
plh: plh(12),
local_hash: 120,
token_ids: Some(vec![1; 16]),
},
SwapInRegistrationBlock {
seq_hash: 13,
plh: plh(13),
local_hash: 130,
token_ids: Some(vec![2; 16]),
},
];
let entries_len = entries.len();
let outcome = mgr.register_swapped_in_blocks(entries, Some(11), slots);
assert_eq!(
outcome.consumed_entries, entries_len,
"all reserved swap-in slots should be consumed"
);
let events = sink.events.lock().unwrap();
assert_eq!(events.len(), 1, "swap-in suffix should publish one Stored");
let KvCacheEventData::Stored(ref data) = events[0].data else {
panic!("expected Stored");
};
assert_eq!(
data.parent_hash,
Some(ExternalSequenceBlockHash(11)),
"swapped-in suffix must anchor to the last reused prefix block"
);
let blocks: Vec<u64> = data.blocks.iter().map(|block| block.block_hash.0).collect();
assert_eq!(blocks, vec![12, 13]);
let local_hashes: Vec<u64> = data
.blocks
.iter()
.map(|block| block.tokens_hash.0)
.collect();
assert_eq!(local_hashes, vec![120, 130]);
assert_eq!(
mgr.num_inactive_blocks(),
2,
"registered swap-in blocks should land in inactive G1"
);
}
#[test]
fn test_shared_prefix_distinct_vs_refcount() {
let mut mgr = make_mgr(8, 16);
mgr.process(&MoveBlock::Use(
vec![
UniqueBlock::FullBlock(10),
UniqueBlock::FullBlock(11),
UniqueBlock::FullBlock(12),
],
vec![],
vec![plh(10), plh(11), plh(12)],
None,
None,
));
assert_eq!(mgr.num_active_blocks(), 3);
assert_eq!(mgr.num_active_block_refs(), 3);
mgr.process(&MoveBlock::Use(
vec![
UniqueBlock::FullBlock(10),
UniqueBlock::FullBlock(11),
UniqueBlock::FullBlock(13),
],
vec![],
vec![plh(10), plh(11), plh(13)],
None,
None,
));
assert_eq!(
mgr.num_active_blocks(),
4,
"shared prefix must not inflate distinct count"
);
assert_eq!(
mgr.num_active_block_refs(),
6,
"handle count should reflect per-request refcount"
);
}
#[test]
fn test_random_plh_stable_across_preempt_retry() {
let block_size = 16;
let tokens: Vec<u32> = (0..64).collect();
let mut seq = ActiveSequence::new(tokens, 100, Some(block_size), false, false);
let (mut mgr, sink) = make_mgr_capturing(8, block_size);
let signal = seq.take_creation_signal().unwrap();
assert_eq!(mgr.process(&signal), 4);
assert_eq!(mgr.num_active_blocks(), 4);
let reset_signals = seq.reset_with_signal();
for signal in &reset_signals {
mgr.process(signal);
}
assert_eq!(mgr.num_active_blocks(), 0);
assert_eq!(mgr.num_inactive_blocks(), 4);
let signal = seq.take_creation_signal().unwrap();
assert_eq!(mgr.process(&signal), 4);
assert_eq!(mgr.num_active_blocks(), 4);
assert_eq!(mgr.num_inactive_blocks(), 0);
let events = sink.events.lock().unwrap();
let stored_count = events
.iter()
.filter(|e| matches!(e.data, KvCacheEventData::Stored(_)))
.count();
assert_eq!(
stored_count, 1,
"preempted request should self-match on re-admit (no duplicate Stored)"
);
}
#[test]
fn test_eviction_emits_exact_removed_event() {
let (mut mgr, sink) = make_mgr_capturing(2, 16);
mgr.process(&MoveBlock::Use(
vec![UniqueBlock::FullBlock(10), UniqueBlock::FullBlock(11)],
vec![],
vec![plh(10), plh(11)],
None,
None,
));
deref_full(&mut mgr, 10);
deref_full(&mut mgr, 11);
assert_eq!(mgr.num_active_blocks(), 0);
assert_eq!(mgr.num_inactive_blocks(), 2);
sink.events.lock().unwrap().clear();
use_full(&mut mgr, 12, plh(12));
let events = sink.events.lock().unwrap();
let removed: Vec<u64> = events
.iter()
.filter_map(|e| match &e.data {
KvCacheEventData::Removed(data) => Some(
data.block_hashes
.iter()
.map(|ExternalSequenceBlockHash(h)| *h)
.collect::<Vec<_>>(),
),
_ => None,
})
.flatten()
.collect();
let stored_count = events
.iter()
.filter(|e| matches!(e.data, KvCacheEventData::Stored(_)))
.count();
assert_eq!(
removed.len(),
1,
"exactly one block should be reported as evicted"
);
assert!(
removed[0] == 10 || removed[0] == 11,
"evicted hash must be one we seeded ({}), got {}",
"10 or 11",
removed[0]
);
assert_eq!(stored_count, 1, "one Stored event for the fresh block 12");
}
#[cfg(feature = "kvbm-offload")]
mod offload {
use super::*;
use crate::common::protocols::{RawKvEvent, RawKvEventSink};
use crate::kvbm_offload::{KvbmOffloadConfig, MockOffloadEngine};
use std::sync::{Arc, Mutex};
#[derive(Default)]
struct TierCapturingSink {
events: Mutex<Vec<(StorageTier, KvCacheEvent)>>,
}
impl TierCapturingSink {
fn clear(&self) {
self.events.lock().unwrap().clear();
}
fn take(&self) -> Vec<(StorageTier, KvCacheEvent)> {
std::mem::take(&mut *self.events.lock().unwrap())
}
}
impl KvCacheEventSink for TierCapturingSink {
fn publish(&self, event: KvCacheEvent) -> anyhow::Result<()> {
self.publish_with_storage_tier(event, StorageTier::Device)
}
fn publish_with_storage_tier(
&self,
event: KvCacheEvent,
storage_tier: StorageTier,
) -> anyhow::Result<()> {
self.events.lock().unwrap().push((storage_tier, event));
Ok(())
}
}
#[derive(Default)]
struct RawCapturingSink {
events: Mutex<Vec<RawKvEvent>>,
}
impl RawCapturingSink {
fn clear(&self) {
self.events.lock().unwrap().clear();
}
fn take(&self) -> Vec<RawKvEvent> {
std::mem::take(&mut *self.events.lock().unwrap())
}
}
impl RawKvEventSink for RawCapturingSink {
fn publish(&self, event: RawKvEvent) -> anyhow::Result<()> {
self.events.lock().unwrap().push(event);
Ok(())
}
}
fn make_mgr_tier_capturing(
capacity: usize,
block_size: usize,
) -> (KvManager, Arc<TierCapturingSink>) {
let sink = Arc::new(TierCapturingSink::default());
let publishers = KvEventPublishers::new(Some(sink.clone() as _), None);
(
KvManager::new_with_event_sink(capacity, block_size, publishers, 0),
sink,
)
}
fn make_mgr_raw_capturing(
capacity: usize,
block_size: usize,
) -> (KvManager, Arc<RawCapturingSink>) {
let sink = Arc::new(RawCapturingSink::default());
let publishers = KvEventPublishers::new(None, Some(sink.clone() as _));
(
KvManager::new_with_event_sink(capacity, block_size, publishers, 0),
sink,
)
}
fn attach_test_offload_engine(
mgr: &mut KvManager,
num_g2_blocks: usize,
block_size_tokens: usize,
) {
let config = KvbmOffloadConfig {
num_g2_blocks,
block_size_tokens,
block_size_bytes: Some(1_000_000),
bandwidth_g1_to_g2_gbps: 1.0,
..Default::default()
};
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
let mut engine = rt
.block_on(MockOffloadEngine::new(config))
.expect("engine build");
engine.attach_runtime(rt);
mgr.attach_new_offload_engine(engine);
}
fn use_full_with_hash(
mgr: &mut KvManager,
seq_hash: u64,
p: PositionalLineageHash,
local_hash: BlockHash,
token_ids: Vec<u32>,
) -> usize {
mgr.process(&MoveBlock::Use(
vec![UniqueBlock::FullBlock(seq_hash)],
vec![local_hash],
vec![p],
Some(vec![token_ids]),
None,
))
}
fn has_removed(
events: &[(StorageTier, KvCacheEvent)],
storage_tier: StorageTier,
seq_hash: u64,
) -> bool {
events.iter().any(|(tier, event)| {
*tier == storage_tier
&& matches!(
&event.data,
KvCacheEventData::Removed(data)
if data.block_hashes.contains(&ExternalSequenceBlockHash(seq_hash))
)
})
}
fn stored_block(
events: &[(StorageTier, KvCacheEvent)],
storage_tier: StorageTier,
seq_hash: u64,
) -> Option<KvCacheStoredBlockData> {
events.iter().find_map(|(tier, event)| {
if *tier != storage_tier {
return None;
}
let KvCacheEventData::Stored(data) = &event.data else {
return None;
};
data.blocks
.iter()
.find(|block| block.block_hash == ExternalSequenceBlockHash(seq_hash))
.cloned()
})
}
fn raw_stored_with_token_ids(
events: &[RawKvEvent],
storage_tier: StorageTier,
seq_hash: u64,
token_ids: &[u32],
) -> bool {
let expected_token_ids = vec![token_ids.to_vec()];
events.iter().any(|event| {
event.storage_tier == storage_tier
&& event.block_token_ids.as_ref() == Some(&expected_token_ids)
&& matches!(
&event.event.data,
KvCacheEventData::Stored(data)
if data.blocks.iter().any(|block| {
block.block_hash == ExternalSequenceBlockHash(seq_hash)
})
)
})
}
#[test]
fn fresh_manager_has_no_offload_engine() {
let mgr = make_mgr(8, 4);
assert!(!mgr.has_offload_engine());
}
#[tokio::test]
async fn attach_new_offload_engine_wires_in_after_construction() {
let mut mgr = make_mgr(16, 4);
assert!(!mgr.has_offload_engine());
let engine = MockOffloadEngine::new(KvbmOffloadConfig::default())
.await
.expect("engine build");
mgr.attach_new_offload_engine(engine);
assert!(mgr.has_offload_engine());
}
#[test]
fn g2_completion_publishes_host_pinned_stored_event() {
let (mut mgr, sink) = make_mgr_tier_capturing(1, 4);
attach_test_offload_engine(&mut mgr, 1, 4);
assert_eq!(
use_full_with_hash(&mut mgr, 1, plh(1), 101, vec![1, 2, 3, 4]),
1
);
deref_full(&mut mgr, 1);
sink.clear();
assert_eq!(
use_full_with_hash(&mut mgr, 2, plh(2), 202, vec![5, 6, 7, 8]),
0
);
let immediate = sink.take();
assert!(
has_removed(&immediate, StorageTier::Device, 1),
"G1 eviction should publish a Device-tier Removed event"
);
assert!(
stored_block(&immediate, StorageTier::HostPinned, 1).is_none(),
"G2 Stored must not publish before the transfer completes"
);
let deadline = mgr
.earliest_offload_deadline()
.expect("G1→G2 offload should expose a completion deadline");
mgr.tick_offload_engine(deadline);
let completed = sink.take();
let stored = stored_block(&completed, StorageTier::HostPinned, 1)
.expect("G2 completion should publish HostPinned Stored");
assert_eq!(stored.tokens_hash, LocalBlockHash(101));
}
#[test]
fn g2_eviction_publishes_host_pinned_removed_event() {
let (mut mgr, sink) = make_mgr_tier_capturing(1, 4);
attach_test_offload_engine(&mut mgr, 1, 4);
assert_eq!(
use_full_with_hash(&mut mgr, 1, plh(1), 101, vec![1, 2, 3, 4]),
1
);
deref_full(&mut mgr, 1);
assert_eq!(
use_full_with_hash(&mut mgr, 2, plh(2), 202, vec![5, 6, 7, 8]),
0
);
let deadline = mgr
.earliest_offload_deadline()
.expect("first G1→G2 offload should expose a deadline");
mgr.tick_offload_engine(deadline);
assert_eq!(
use_full_with_hash(&mut mgr, 2, plh(2), 202, vec![5, 6, 7, 8]),
1
);
deref_full(&mut mgr, 2);
sink.clear();
assert_eq!(
use_full_with_hash(&mut mgr, 3, plh(3), 303, vec![9, 10, 11, 12]),
0
);
let deadline = mgr
.earliest_offload_deadline()
.expect("second G1→G2 offload should expose a deadline");
mgr.tick_offload_engine(deadline);
let events = sink.take();
assert!(
has_removed(&events, StorageTier::HostPinned, 1),
"G2 capacity eviction should publish HostPinned Removed"
);
assert!(
stored_block(&events, StorageTier::HostPinned, 2).is_some(),
"second G2 completion should publish HostPinned Stored for block 2"
);
}
#[test]
fn reoffloaded_swapped_in_block_keeps_token_ids_for_g2_raw_event() {
let (mut mgr, sink) = make_mgr_raw_capturing(1, 4);
attach_test_offload_engine(&mut mgr, 1, 4);
let slots = match mgr.reserve_swap_in_destination_slots(1) {
SwapInSlotReservation::Reserved(slots) => slots,
SwapInSlotReservation::BlockedOnG1Offload => {
panic!("fresh manager should not need G1 offload")
}
SwapInSlotReservation::NoCapacity => panic!("fresh manager should have capacity"),
};
let token_ids = vec![1, 2, 3, 4];
let entries = vec![SwapInRegistrationBlock {
seq_hash: 1,
plh: plh(1),
local_hash: 101,
token_ids: Some(token_ids.clone()),
}];
let outcome = mgr.register_swapped_in_blocks(entries, None, slots);
assert_eq!(outcome.consumed_entries, 1);
sink.clear();
assert_eq!(
use_full_with_hash(&mut mgr, 2, plh(2), 202, vec![5, 6, 7, 8]),
0
);
let deadline = mgr
.earliest_offload_deadline()
.expect("G1→G2 offload should expose a deadline");
mgr.tick_offload_engine(deadline);
let events = sink.take();
assert!(
raw_stored_with_token_ids(&events, StorageTier::HostPinned, 1, &token_ids),
"re-offloaded swapped-in block should preserve token ids for HostPinned raw Stored"
);
}
#[test]
fn g1_eviction_offload_holds_source_slot_until_complete() {
let mut mgr = make_mgr(1, 4);
let config = KvbmOffloadConfig {
block_size_tokens: 4,
block_size_bytes: Some(1_000_000),
bandwidth_g1_to_g2_gbps: 1.0,
..Default::default()
};
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
let mut engine = rt
.block_on(MockOffloadEngine::new(config))
.expect("engine build");
engine.attach_runtime(rt);
mgr.attach_new_offload_engine(engine);
assert_eq!(use_full(&mut mgr, 1, plh(1)), 1);
deref_full(&mut mgr, 1);
assert_eq!(mgr.num_active_blocks(), 0);
assert_eq!(mgr.num_inactive_blocks(), 1);
assert_eq!(use_full(&mut mgr, 2, plh(2)), 0);
assert_eq!(
mgr.num_active_blocks(),
1,
"quarantined source slot must count against G1 capacity"
);
let deadline = mgr
.earliest_offload_deadline()
.expect("G1→G2 offload should expose a stall-advance deadline");
mgr.tick_offload_engine(deadline);
assert_eq!(
mgr.num_active_blocks(),
0,
"source slot should release after transfer completion"
);
assert_eq!(use_full(&mut mgr, 2, plh(2)), 1);
assert_eq!(mgr.num_active_blocks(), 1);
}
#[test]
fn try_batch_swap_in_returns_no_hits_without_engine() {
let mut mgr = make_mgr(8, 4);
let plhs = [plh(1), plh(2), plh(3)];
let outcome = mgr.try_batch_swap_in(&plhs, None);
assert!(matches!(outcome, BatchSwapInOutcome::NoHits));
}
}
}