use std::future::Future;
use std::net::TcpListener;
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Duration;
use anyhow::Result;
use dynamo_tokens::{BlockHash, SequenceHash as RouterSequenceHash};
use futures::Stream;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::task::noop_waker_ref;
use kvbm_engine::leader::{FindMatchesOptions, InstanceLeader, Leader, StagingMode};
use kvbm_engine::offload::{
ExternalBlock, OffloadEngine, PendingTracker, PipelineBuilder, PresenceFilter, SourceBlocks,
TransferHandle, TransferStatus,
};
use kvbm_engine::worker::Worker;
use kvbm_engine::{BlockId, G1 as EngineG1, G2, SequenceHash};
use kvbm_logical::blocks::{ImmutableBlock, MutableBlock};
use kvbm_logical::events::{EventsManager, KvCacheEvent as LogicalKvCacheEvent};
use kvbm_logical::manager::{BlockManager, FrequencyTrackingCapacity};
use kvbm_logical::pools::BlockDuplicationPolicy;
use kvbm_logical::registry::BlockRegistry;
use rustc_hash::FxHashMap;
use crate::common::protocols::G1 as MockerG1;
use super::config::KvbmOffloadConfig;
use super::worker::MockWorker;
/// Upper bound on a single pipeline barrier wait: `with_barrier_timeout`
/// reports `false` once this much time has elapsed without the wait resolving.
const PIPELINE_BARRIER_TIMEOUT: Duration = Duration::from_secs(1);
/// Handle to a G2→G1 swap-in started by `MockOffloadEngine::start_onboard_prefix`.
///
/// The handle owns the matched G2 blocks, so they stay alive for as long as the
/// handle itself does.
pub struct SwapInHandle {
    /// Completion flag shared with the worker that was asked to reserve the swap-in.
    complete: Arc<AtomicBool>,
    /// Number of blocks covered by this swap-in.
    block_count: usize,
    /// G2 blocks retained purely through ownership; never read.
    _g2_blocks: Vec<ImmutableBlock<G2>>,
}

impl SwapInHandle {
    /// Reports whether the shared completion flag has been set.
    pub fn is_complete(&self) -> bool {
        use std::sync::atomic::Ordering;

        self.complete.load(Ordering::Acquire)
    }

    /// Number of blocks being swapped in.
    pub fn block_count(&self) -> usize {
        self.block_count
    }
}
/// Outcome of a successful G2 prefix lookup, ready to be handed to
/// `MockOffloadEngine::start_onboard_prefix`.
pub struct PreparedSwapIn {
/// Number of hashes the caller asked about (`plhs.len()` at lookup time).
requested_blocks: usize,
/// G2 blocks returned by the lookup; `prepare_onboard_prefix` never builds
/// this struct with an empty list.
g2_blocks: Vec<ImmutableBlock<G2>>,
}
impl PreparedSwapIn {
/// Number of G2 blocks held by this prepared swap-in.
pub fn block_count(&self) -> usize {
self.g2_blocks.len()
}
}
/// Router-facing description of a block offloaded to G2.
///
/// Stored per block hash by `remember_g2_event_metadata` and replayed when a G2
/// lifecycle event is translated into a `G2RouterEvent`.
#[derive(Clone, Debug)]
pub(crate) struct G2BlockEventMetadata {
/// Router-side sequence hash; this is the value reported in `G2RouterEvent::Removed`.
pub(crate) seq_hash: RouterSequenceHash,
/// NOTE(review): presumably the router sequence hash of the preceding block,
/// `None` for the first block of a sequence — confirm against the producer.
pub(crate) parent_hash: Option<RouterSequenceHash>,
/// NOTE(review): presumably the hash of this block's own tokens — confirm.
pub(crate) local_hash: BlockHash,
/// Optional token ids carried alongside the hashes.
pub(crate) token_ids: Option<Vec<u32>>,
}
/// One evicted G1 block queued for offload to G2, plus the metadata needed to
/// announce it to the router later.
#[derive(Clone, Debug)]
pub(crate) struct G2OffloadBlock {
/// Block id handed to the offload engine as the external source block.
pub(crate) block_id: BlockId,
/// Engine-side sequence hash; also the key under which `metadata` is remembered.
pub(crate) plh: SequenceHash,
/// Router metadata associated with this block.
pub(crate) metadata: G2BlockEventMetadata,
}
/// Router-level event derived from a G2 lifecycle event by `drain_g2_router_events`.
#[derive(Clone, Debug)]
pub(crate) enum G2RouterEvent {
/// Emitted for a `Create` lifecycle event whose hash has remembered metadata.
Stored(G2BlockEventMetadata),
/// Emitted for a `Remove` lifecycle event whose hash had remembered metadata;
/// the metadata entry is discarded at the same time.
Removed { seq_hash: RouterSequenceHash },
}
/// An enqueued G1→G2 transfer together with the G1 source slots handed in with it.
///
/// The slots are held only through ownership: dropping the entry drops them.
struct PendingG1ToG2 {
    /// Handle used to observe the transfer's progress.
    handle: TransferHandle,
    /// Source slots kept alive until the entry is pruned; never read.
    _source_slots: Vec<MutableBlock<MockerG1>>,
}

impl PendingG1ToG2 {
    /// Decides whether this entry (and therefore its source slots) may be dropped.
    ///
    /// * A completed transfer is always releasable.
    /// * With no passed blocks, the entry is releasable once the status is no
    ///   longer `Evaluating`.
    /// * Otherwise the completed and failed blocks together must account for
    ///   every passed block.
    fn source_slots_releasable(&self) -> bool {
        let handle = &self.handle;
        if handle.is_complete() {
            return true;
        }
        match handle.passed_blocks().len() {
            0 => !matches!(handle.status(), TransferStatus::Evaluating),
            passed => {
                let settled = handle.completed_blocks().len() + handle.failed_blocks().len();
                settled >= passed
            }
        }
    }
}
/// Mock offload engine that drives a G1→G2 offload pipeline against a
/// `MockWorker` and serves G2→G1 swap-ins from the G2 block manager.
// `dead_code` is allowed because several fields are stored but not read by the
// methods of this type.
#[allow(dead_code)]
pub struct MockOffloadEngine {
/// Configuration this engine was built from.
config: KvbmOffloadConfig,
/// Offload engine configured with the G1→G2 pipeline.
engine: OffloadEngine,
/// Leader used for local G2 prefix lookups.
leader: Arc<InstanceLeader>,
/// Mock worker providing the clock, reservations, and completion draining.
worker: Arc<MockWorker>,
/// Block registry shared by the leader, the pipeline, and the G2 manager.
registry: Arc<BlockRegistry>,
/// Block manager for the G2 tier.
g2_manager: Arc<BlockManager<G2>>,
/// Enqueued G1→G2 transfers whose source slots have not been released yet.
pending_g1_to_g2: Mutex<Vec<PendingG1ToG2>>,
/// Subscription to G2 lifecycle events, polled without blocking.
g2_event_stream: Mutex<Pin<Box<dyn Stream<Item = LogicalKvCacheEvent> + Send>>>,
/// Router metadata remembered per engine-side block hash.
g2_event_metadata: Mutex<FxHashMap<SequenceHash, G2BlockEventMetadata>>,
/// Optional runtime attached via `attach_runtime`; the barrier waits block on
/// it, and `Drop` disposes of it.
_runtime: Option<tokio::runtime::Runtime>,
}
impl MockOffloadEngine {
/// Builds a mock offload engine with only the G1→G2 pipeline configured.
///
/// Construction order: local messenger → G2 events manager (subscribed to
/// before it is handed to the registry) → registry → G2 block manager → mock
/// worker → instance leader → G1→G2 pipeline → offload engine.
///
/// No runtime is attached; use `attach_runtime` for that.
///
/// # Errors
/// Propagates failures from messenger creation and from the `InstanceLeader`
/// and `OffloadEngine` builders.
///
/// # Panics
/// Panics if the G2 block manager cannot be built from `config`.
pub async fn new(config: KvbmOffloadConfig) -> Result<Self> {
    let messenger = create_local_messenger().await?;

    // Subscribe first, then hand the events manager over to the registry.
    let g2_events_manager = Arc::new(EventsManager::builder().build());
    let g2_event_stream = Box::pin(g2_events_manager.subscribe());
    let registry = Arc::new(build_registry(g2_events_manager));

    // `&Arc<BlockRegistry>` deref-coerces to the `&BlockRegistry` the helper expects.
    let g2_manager = Arc::new(build_g2_block_manager(
        config.num_g2_blocks,
        config.block_size_tokens,
        &registry,
    ));

    let worker = Arc::new(MockWorker::new(
        config.block_size_bytes.unwrap_or(0),
        config.bandwidth_g1_to_g2_gbps,
        config.bandwidth_g2_to_g1_gbps,
        None,
        None,
    ));
    let worker_for_leader: Arc<dyn Worker> = worker.clone();

    let leader = Arc::new(
        InstanceLeader::builder()
            .messenger(messenger)
            .registry((*registry).clone())
            .g2_manager(g2_manager.clone())
            .worker(worker_for_leader)
            .build()?,
    );

    // The presence filter and the pipeline share one pending tracker.
    let g1_to_g2_pending = Arc::new(PendingTracker::new());
    let g1_to_g2_presence = PresenceFilter::<EngineG1, G2>::new(registry.clone())
        .with_pending_tracker(g1_to_g2_pending.clone());
    // `offload_batch_size` is used for both the batch size and the concurrency
    // cap; `initial_runnable_transfer_batches` relies on the same pairing.
    let g1_to_g2_pipeline = PipelineBuilder::<EngineG1, G2>::new()
        .policy(Arc::new(g1_to_g2_presence))
        .pending_tracker(g1_to_g2_pending)
        .batch_size(config.offload_batch_size)
        .max_concurrent_transfers(config.offload_batch_size)
        .build();

    let engine = OffloadEngine::builder(leader.clone())
        .with_registry(registry.clone())
        .with_g2_manager(g2_manager.clone())
        .with_g1_to_g2_pipeline(g1_to_g2_pipeline)
        .build()?;

    Ok(Self {
        config,
        engine,
        leader,
        worker,
        registry,
        g2_manager,
        pending_g1_to_g2: Mutex::new(Vec::new()),
        g2_event_stream: Mutex::new(g2_event_stream),
        g2_event_metadata: Mutex::new(FxHashMap::default()),
        _runtime: None,
    })
}
/// Stores `rt` on the engine, replacing any previously attached runtime.
///
/// `wait_on_attached_runtime` blocks on this runtime, and the `Drop` impl is
/// responsible for disposing of it.
pub fn attach_runtime(&mut self, rt: tokio::runtime::Runtime) {
self._runtime = Some(rt);
}
/// Records the router metadata of each block under its engine-side hash,
/// overwriting any entry already stored for that hash.
///
/// # Panics
/// Panics if the metadata mutex is poisoned.
fn remember_g2_event_metadata(&self, blocks: &[G2OffloadBlock]) {
    let mut known = self
        .g2_event_metadata
        .lock()
        .expect("G2 event metadata mutex poisoned");
    known.extend(
        blocks
            .iter()
            .map(|block| (block.plh, block.metadata.clone())),
    );
}
/// Collects every G2 lifecycle event that is ready right now, without blocking.
///
/// The stream is polled with a no-op waker and polling stops at the first
/// result that is not `Ready(Some(_))`.
///
/// # Panics
/// Panics if the event-stream mutex is poisoned.
fn drain_g2_lifecycle_events(&self) -> Vec<LogicalKvCacheEvent> {
    let mut guard = self
        .g2_event_stream
        .lock()
        .expect("G2 event stream mutex poisoned");
    let mut cx = Context::from_waker(noop_waker_ref());
    let mut drained = Vec::new();
    loop {
        match guard.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(event)) => drained.push(event),
            Poll::Ready(None) | Poll::Pending => break,
        }
    }
    drained
}
/// Translates the currently available G2 lifecycle events into router events.
///
/// * `Create` becomes `Stored` with a clone of the remembered metadata.
/// * `Remove` becomes `Removed` and discards the remembered metadata.
///
/// Events whose hash has no remembered metadata are dropped. The metadata
/// mutex is only taken when at least one lifecycle event was drained.
///
/// # Panics
/// Panics if either the event-stream or the metadata mutex is poisoned.
pub(crate) fn drain_g2_router_events(&self) -> Vec<G2RouterEvent> {
    let lifecycle_events = self.drain_g2_lifecycle_events();
    if lifecycle_events.is_empty() {
        return Vec::new();
    }

    let mut known = self
        .g2_event_metadata
        .lock()
        .expect("G2 event metadata mutex poisoned");
    let router_events: Vec<G2RouterEvent> = lifecycle_events
        .into_iter()
        .filter_map(|event| match event {
            LogicalKvCacheEvent::Create(plh) => {
                known.get(&plh).cloned().map(G2RouterEvent::Stored)
            }
            LogicalKvCacheEvent::Remove(plh) => {
                known.remove(&plh).map(|meta| G2RouterEvent::Removed {
                    seq_hash: meta.seq_hash,
                })
            }
        })
        .collect();
    router_events
}
/// Awaits `wait` for at most `PIPELINE_BARRIER_TIMEOUT`.
///
/// Returns the future's own result when it finishes in time and `false` when
/// the timeout elapses first.
async fn with_barrier_timeout<F>(wait: F) -> bool
where
    F: Future<Output = bool>,
{
    match tokio::time::timeout(PIPELINE_BARRIER_TIMEOUT, wait).await {
        Ok(outcome) => outcome,
        Err(_elapsed) => false,
    }
}
/// Drives `wait` to completion (bounded by the barrier timeout) on the attached
/// runtime, blocking the calling thread.
///
/// * No attached runtime: returns `true` without polling `wait`.
/// * Called outside any Tokio runtime: blocks directly on the attached runtime.
/// * Called from a multi-thread Tokio runtime: blocks via `block_in_place`.
/// * Called from any other runtime flavor: returns `true` without polling `wait`.
fn wait_on_attached_runtime<F>(&self, wait: F) -> bool
where
    F: Future<Output = bool>,
{
    use tokio::runtime::{Handle, RuntimeFlavor};

    let Some(rt) = self._runtime.as_ref() else {
        return true;
    };
    match Handle::try_current() {
        Err(_) => rt.block_on(Self::with_barrier_timeout(wait)),
        Ok(current) => match current.runtime_flavor() {
            RuntimeFlavor::MultiThread => {
                tokio::task::block_in_place(|| rt.block_on(Self::with_barrier_timeout(wait)))
            }
            _ => true,
        },
    }
}
/// Waits until the transfer's status is no longer `Evaluating`.
///
/// Returns `false` if the status channel closes while still evaluating or the
/// barrier timeout elapses; otherwise returns whatever
/// `wait_on_attached_runtime` reports.
fn wait_for_policy_evaluation(&self, handle: &TransferHandle) -> bool {
    let mut status_rx = handle.subscribe_status();
    let left_evaluating = async move {
        while matches!(*status_rx.borrow(), TransferStatus::Evaluating) {
            if status_rx.changed().await.is_err() {
                return false;
            }
        }
        true
    };
    self.wait_on_attached_runtime(left_evaluating)
}
/// Waits until the worker has recorded at least `target_reservation_count`
/// reservations or `handle` reports completion.
///
/// Returns `false` when the wait gives up: the worker already exposes a pending
/// finish time while the target is unmet, the status channel closes, or the
/// barrier timeout elapses. As with every barrier wait, the result is `true`
/// without any polling when `wait_on_attached_runtime` decides not to block.
fn wait_for_reservations_or_completion(
&self,
handle: &TransferHandle,
target_reservation_count: u64,
) -> bool {
let reservation_notify = self.worker.reservation_notifier();
let mut status = handle.subscribe_status();
self.wait_on_attached_runtime(async move {
loop {
// Success conditions are re-checked after every wakeup.
if handle.is_complete()
|| self.worker.reservation_count() >= target_reservation_count
{
return true;
}
// NOTE(review): presumably a pending finish time means the worker has
// scheduled work and no further reservations are expected until the
// clock advances — confirm against `MockWorker`.
if self.worker.earliest_finish().is_some() {
return false;
}
// NOTE(review): `notified()` is created after the checks above; if the
// worker signals with `notify_waiters`, a notification landing in between
// would be missed and the wait would only end at the barrier timeout.
tokio::select! {
_ = reservation_notify.notified() => {}
changed = status.changed() => {
if changed.is_err() {
return false;
}
}
}
}
})
}
/// Waits until the pending G1→G2 handles report at least
/// `expected_settled_blocks` completed-or-failed blocks in total.
///
/// Returns `false` when there are no pending handles and the target is unmet,
/// when every change subscription reports an error, or when the barrier timeout
/// elapses.
///
/// # Panics
/// Panics if the pending-handles mutex is poisoned.
fn wait_for_settled_g1_to_g2_blocks(&self, expected_settled_blocks: usize) -> bool {
let pending = self
.pending_g1_to_g2
.lock()
.expect("pending G1→G2 handles mutex poisoned");
// Snapshot the handles so the mutex is not held while waiting.
let handles: Vec<TransferHandle> = pending
.iter()
.map(|pending| pending.handle.clone())
.collect();
drop(pending);
// Subscribe before the first count below, so updates that land in between
// are still seen by `changed()`.
let mut completed: Vec<_> = handles
.iter()
.map(TransferHandle::subscribe_completed)
.collect();
let mut failed: Vec<_> = handles
.iter()
.map(TransferHandle::subscribe_failed)
.collect();
self.wait_on_attached_runtime(async move {
loop {
let settled_blocks: usize = handles
.iter()
.map(|handle| handle.completed_blocks().len() + handle.failed_blocks().len())
.sum();
if settled_blocks >= expected_settled_blocks {
return true;
}
if handles.is_empty() {
return false;
}
// Wait for the first subscription that reports a successful change;
// subscriptions that return an error are skipped.
let mut changes = FuturesUnordered::new();
for rx in completed.iter_mut() {
changes.push(rx.changed());
}
for rx in failed.iter_mut() {
changes.push(rx.changed());
}
let mut observed_change = false;
while let Some(changed) = changes.next().await {
if changed.is_ok() {
observed_change = true;
break;
}
}
// Every subscription errored, so no further progress can be observed.
if !observed_change {
return false;
}
}
})
}
/// Number of transfer batches that can start immediately for `passed_blocks`
/// blocks: the batch count (rounded up), capped by the concurrency limit.
///
/// `offload_batch_size` (floored at 1) serves as both the batch size and the
/// concurrency limit, matching how the pipeline is configured in `new`.
fn initial_runnable_transfer_batches(&self, passed_blocks: usize) -> usize {
    let batch_limit = self.config.offload_batch_size.max(1);
    match passed_blocks {
        0 => 0,
        blocks => blocks.div_ceil(batch_limit).min(batch_limit),
    }
}
/// Total number of completed-or-failed blocks across all pending G1→G2 handles.
///
/// # Panics
/// Panics if the pending-handles mutex is poisoned.
fn pending_g1_to_g2_settled_blocks(&self) -> usize {
    let entries = self
        .pending_g1_to_g2
        .lock()
        .expect("pending G1→G2 handles mutex poisoned");
    let mut settled = 0usize;
    for entry in entries.iter() {
        settled += entry.handle.completed_blocks().len() + entry.handle.failed_blocks().len();
    }
    settled
}
/// Drops every pending entry whose source slots are releasable, keeping the rest.
///
/// # Panics
/// Panics if the pending-handles mutex is poisoned.
fn prune_releasable_g1_to_g2_sources(&self) {
    self.pending_g1_to_g2
        .lock()
        .expect("pending G1→G2 handles mutex poisoned")
        .retain(|entry| !entry.source_slots_releasable());
}
/// Advances the mock clock to `now_ms` and processes whatever completed by then.
///
/// Steps:
/// 1. Move the worker clock and snapshot the number of settled blocks.
/// 2. Drain worker completions up to `now_ms`.
/// 3. If offload completions with a non-zero block count were drained, wait for
///    the pipeline to publish them, logging a warning if it does not.
/// 4. Release the source slots of every pending transfer that no longer needs them.
pub fn tick(&self, now_ms: f64) {
    self.worker.set_now_ms(now_ms);
    let settled_before = self.pending_g1_to_g2_settled_blocks();
    let (offload_drained, _, offload_drained_blocks, _) = self.worker.drain_completions(now_ms);

    let must_wait_for_publish = offload_drained > 0 && offload_drained_blocks > 0;
    if must_wait_for_publish
        && !self.wait_for_settled_g1_to_g2_blocks(settled_before + offload_drained_blocks)
    {
        tracing::warn!(
            now_ms,
            offload_drained,
            offload_drained_blocks,
            settled_before,
            settled_after = self.pending_g1_to_g2_settled_blocks(),
            "kvbm-offload: pipeline did not publish drained transfers"
        );
    }

    // Pruning happens on every tick, whether or not anything was drained.
    self.prune_releasable_g1_to_g2_sources();
}
/// Returns the worker's earliest pending finish time, or `None` when the worker
/// reports nothing pending.
// NOTE(review): presumably expressed in milliseconds on the same clock as
// `tick(now_ms)` — confirm against `MockWorker::earliest_finish`.
pub fn earliest_pending_deadline(&self) -> Option<f64> {
self.worker.earliest_finish()
}
/// Queues evicted G1 blocks for offload to G2 and remembers their router metadata.
///
/// `source_slots` are kept alive alongside the transfer until they become
/// releasable. With an empty `evicted` slice nothing is enqueued and the slots
/// are dropped immediately. When `now_ms` is `Some`, the worker clock is set to
/// it before enqueueing.
pub(crate) fn enqueue_g1_evictions_with_metadata(
    &mut self,
    evicted: &[G2OffloadBlock],
    source_slots: Vec<MutableBlock<MockerG1>>,
    now_ms: Option<f64>,
) {
    if evicted.is_empty() {
        drop(source_slots);
        return;
    }

    self.remember_g2_event_metadata(evicted);

    // The engine only needs the (block id, hash) pair of each block.
    let mut engine_blocks = Vec::with_capacity(evicted.len());
    for block in evicted {
        engine_blocks.push((block.block_id, block.plh));
    }
    self.enqueue_g1_evictions_holding_sources(&engine_blocks, source_slots, now_ms);
}
/// Enqueues `evicted` blocks on the G1→G2 pipeline and tracks the resulting
/// handle together with `source_slots`.
///
/// After enqueueing, this waits (best effort; the wait results are ignored) for
/// policy evaluation to finish and then for the initially runnable batches to
/// be reserved on the worker, or for the transfer to complete.
///
/// # Panics
/// Panics if the G1→G2 pipeline is not configured or the pending-handles mutex
/// is poisoned.
fn enqueue_g1_evictions_holding_sources(
&mut self,
evicted: &[(BlockId, SequenceHash)],
source_slots: Vec<MutableBlock<MockerG1>>,
now_ms: Option<f64>,
) {
// Nothing to enqueue; `source_slots` is dropped on return.
if evicted.is_empty() {
return;
}
if let Some(ms) = now_ms {
self.worker.set_now_ms(ms);
}
tracing::debug!(
now_ms = self.worker.now_ms(),
blocks = evicted.len(),
"kvbm-offload: G1→G2 enqueue evictions"
);
// Snapshot taken before enqueueing so the wait target below can be expressed
// relative to it.
let reservation_count_before = self.worker.reservation_count();
let source: SourceBlocks<EngineG1> = SourceBlocks::External(
evicted
.iter()
.map(|(block_id, seq_hash)| ExternalBlock::<EngineG1>::new(*block_id, *seq_hash))
.collect(),
);
let handle = self
.engine
.enqueue_g1_to_g2(source)
.expect("G1→G2 pipeline must be configured at engine construction");
// Track the handle and its source slots before any waiting happens.
{
let mut pending = self
.pending_g1_to_g2
.lock()
.expect("pending G1→G2 handles mutex poisoned");
pending.push(PendingG1ToG2 {
handle: handle.clone(),
_source_slots: source_slots,
});
}
self.wait_for_policy_evaluation(&handle);
// Only wait for reservations when at least one batch passed the policy.
let target_reservation_count = reservation_count_before
+ self.initial_runnable_transfer_batches(handle.passed_blocks().len()) as u64;
if target_reservation_count > reservation_count_before {
self.wait_for_reservations_or_completion(&handle, target_reservation_count);
}
// A transfer that already finished can give its source slots back right away.
if handle.is_complete() {
self.prune_releasable_g1_to_g2_sources();
}
}
/// Looks up `plhs` in G2 (local only, `Hold` staging) and packages the matched
/// blocks for a later swap-in.
///
/// Returns `None` when `plhs` is empty or the lookup yields no G2 blocks.
///
/// # Panics
/// Panics if the lookup fails or the result does not expose G2 blocks.
pub fn prepare_onboard_prefix(&mut self, plhs: &[SequenceHash]) -> Option<PreparedSwapIn> {
    if plhs.is_empty() {
        return None;
    }

    let lookup = FindMatchesOptions {
        search_remote: false,
        staging_mode: StagingMode::Hold,
    };
    let mut matches = self
        .leader
        .find_matches_with_options(plhs, lookup)
        .expect("find_matches_with_options must not fail on local-only Hold lookup");
    let g2_blocks = matches
        .take_g2_blocks()
        .expect("Ready variant must yield G2 blocks on Hold + !search_remote path");

    if !g2_blocks.is_empty() {
        return Some(PreparedSwapIn {
            requested_blocks: plhs.len(),
            g2_blocks,
        });
    }

    tracing::debug!(
        plhs_len = plhs.len(),
        "kvbm-offload: G2→G1 lookup MISS (0 blocks in G2)"
    );
    None
}
pub fn start_onboard_prefix(
&mut self,
prepared: PreparedSwapIn,
now_ms: Option<f64>,
) -> SwapInHandle {
let block_count = prepared.g2_blocks.len();
let now_ms = now_ms.unwrap_or_else(|| self.worker.now_ms());
self.worker.set_now_ms(now_ms);
tracing::debug!(
now_ms,
plhs_len = prepared.requested_blocks,
block_count,
"kvbm-offload: G2→G1 swap-in HIT"
);
let complete = Arc::new(AtomicBool::new(false));
self.worker
.reserve_swap_in(now_ms, block_count, complete.clone());
SwapInHandle {
complete,
block_count,
_g2_blocks: prepared.g2_blocks,
}
}
/// Test-only accessor for the G2 block manager.
#[cfg(test)]
pub(crate) fn g2_manager(&self) -> &Arc<BlockManager<G2>> {
&self.g2_manager
}
}
/// Disposes of the attached runtime, if any.
///
/// When the drop happens inside a Tokio runtime context, the attached runtime
/// is moved to a helper thread and dropped there; the helper is joined and its
/// result ignored. Outside any runtime context it is dropped in place.
impl Drop for MockOffloadEngine {
    fn drop(&mut self) {
        if let Some(rt) = self._runtime.take() {
            let inside_tokio = tokio::runtime::Handle::try_current().is_ok();
            if inside_tokio {
                let helper = std::thread::spawn(move || drop(rt));
                let _ = helper.join();
            } else {
                drop(rt);
            }
        }
    }
}
/// Creates a messenger backed by a TCP transport listening on an ephemeral
/// loopback port (`127.0.0.1:0`).
///
/// After the messenger is built, the function sleeps for 100 ms before returning.
// NOTE(review): the sleep presumably gives the transport time to come up — confirm.
///
/// # Errors
/// Propagates socket bind, transport build, and messenger build failures.
async fn create_local_messenger() -> Result<Arc<velo::Messenger>> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let tcp_transport = velo::backend::tcp::TcpTransportBuilder::new()
        .from_listener(listener)?
        .build()?;
    let transport: Arc<dyn velo::backend::Transport> = Arc::new(tcp_transport);

    let builder = velo::Messenger::builder().add_transport(transport);
    let messenger = builder.build().await?;

    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(messenger)
}
/// Builds a block registry that uses a `Medium`-capacity frequency tracker and
/// is wired to `events_manager`.
fn build_registry(events_manager: Arc<EventsManager>) -> BlockRegistry {
BlockRegistry::builder()
.frequency_tracker(FrequencyTrackingCapacity::Medium.create_tracker())
.event_manager(events_manager)
.build()
}
/// Builds the G2 block manager with `block_count` blocks of `block_size_tokens`
/// tokens each, bound to a clone of `registry`, rejecting duplicate blocks and
/// using the lineage backend.
///
/// # Panics
/// Panics if the builder rejects the configuration.
fn build_g2_block_manager(
    block_count: usize,
    block_size_tokens: usize,
    registry: &BlockRegistry,
) -> BlockManager<G2> {
    let builder = BlockManager::<G2>::builder()
        .block_count(block_count)
        .block_size(block_size_tokens)
        .registry(registry.clone())
        .duplication_policy(BlockDuplicationPolicy::Reject)
        .with_lineage_backend();
    let built = builder.build();
    built.expect("BlockManager<G2> should build with valid config")
}
// Unit tests for `MockOffloadEngine`: construction, idle ticking, runtime
// attachment, and the G2→G1 swap-in path.
#[cfg(test)]
mod tests {
use super::*;
// A default-configured engine builds with only the G1→G2 pipeline present and
// nothing pending.
#[tokio::test]
async fn mock_offload_engine_new_builds_end_to_end() {
let config = KvbmOffloadConfig::default();
let engine = MockOffloadEngine::new(config)
.await
.expect("construction should succeed");
assert!(engine.engine.has_g1_to_g2());
assert!(!engine.engine.has_g2_to_g3());
assert!(!engine.engine.has_g2_to_g4());
assert_eq!(engine.earliest_pending_deadline(), None);
}
// Ticking with no work queued must leave no pending deadline.
#[tokio::test]
async fn tick_is_noop_when_idle() {
let engine = MockOffloadEngine::new(KvbmOffloadConfig::default())
.await
.unwrap();
engine.tick(100.0);
engine.tick(1_000_000.0);
assert_eq!(engine.earliest_pending_deadline(), None);
}
// Synchronous ("offline") usage: build the engine on a dedicated runtime, then
// hand that runtime to the engine. Dropping the engine at the end of the test
// also exercises the `Drop` path taken outside any runtime context.
#[test]
fn offline_runtime_attach_pattern() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
let mut engine = rt
.block_on(MockOffloadEngine::new(KvbmOffloadConfig::default()))
.expect("offline construction succeeds");
engine.attach_runtime(rt);
assert_eq!(engine.earliest_pending_deadline(), None);
}
// An empty hash list short-circuits to `None`.
#[tokio::test]
async fn prepare_onboard_prefix_empty_input_returns_none() {
let mut engine = MockOffloadEngine::new(KvbmOffloadConfig::default())
.await
.unwrap();
assert!(engine.prepare_onboard_prefix(&[]).is_none());
}
// Hashes that were never registered in G2 produce no prepared swap-in.
#[tokio::test]
async fn prepare_onboard_prefix_returns_none_when_g2_empty() {
use dynamo_tokens::PositionalLineageHash;
let mut engine = MockOffloadEngine::new(KvbmOffloadConfig::default())
.await
.unwrap();
let hashes: Vec<SequenceHash> = (0..5)
.map(|i| PositionalLineageHash::new(i as u64, None, 0))
.collect();
assert!(engine.prepare_onboard_prefix(&hashes).is_none());
}
// End-to-end swap-in: register one block in G2, start the swap-in at t = 0,
// and check that the completion flag flips only once the clock reaches the
// computed finish time.
#[tokio::test]
async fn start_onboard_prefix_pins_g2_blocks_until_handle_drops() {
use dynamo_tokens::PositionalLineageHash;
use kvbm_logical::MutableBlock;
let config = KvbmOffloadConfig {
block_size_bytes: Some(1_000_000),
bandwidth_g2_to_g1_gbps: 1.0,
..Default::default()
};
let mut engine = MockOffloadEngine::new(config).await.unwrap();
engine.tick(0.0);
// Allocate, stage, and register a single block directly in G2.
let plh = PositionalLineageHash::new(42, None, 0);
let (mut alloc, _evicted) = engine
.g2_manager
.allocate_blocks_with_evictions(1)
.expect("G2 allocate");
let mutable: MutableBlock<G2> = alloc.pop().unwrap();
let complete = mutable
.stage(plh, engine.g2_manager.block_size())
.expect("G2 stage");
drop(engine.g2_manager.register_block(complete));
let prepared = engine
.prepare_onboard_prefix(&[plh])
.expect("G2 prefix match must produce a prepared swap-in");
let handle = engine.start_onboard_prefix(prepared, Some(0.0));
assert_eq!(handle.block_count(), 1);
assert!(!handle.is_complete());
// One 1 MB block at 1 GB/s is expected to finish at t = 1.0 ms.
let deadline = engine
.earliest_pending_deadline()
.expect("swap-in must appear in earliest_finish");
assert!(
(deadline - 1.0).abs() < 1e-6,
"1 MB / 1 GB/s = 1.0 ms, got {deadline}"
);
engine.tick(0.5);
assert!(
!handle.is_complete(),
"swap-in must remain pending before finish time"
);
engine.tick(1.0);
assert!(
handle.is_complete(),
"swap-in bit must flip once tick advances past finish"
);
}
}