use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use anyhow::{Result, anyhow, bail};
use futures::future::BoxFuture;
use kvbm_common::LogicalLayoutHandle;
use kvbm_engine::object::ObjectBlockOps;
use kvbm_engine::worker::{
ConnectRemoteResponse, ImportMetadataResponse, RemoteDescriptor, SerializedLayoutResponse,
Worker, WorkerTransfers,
};
use kvbm_engine::{BlockId, InstanceId, SequenceHash};
use kvbm_physical::manager::{LayoutHandle, SerializedLayout};
use kvbm_physical::transfer::{PhysicalLayout, TransferCompleteNotification, TransferOptions};
use tokio::sync::Notify;
use velo::{Event, EventManager};
use super::bandwidth_sharing_model::{BandwidthSharingModel, TransferId};
use super::shared_g3::SharedG3Pool;
use super::shared_g4::SharedG4Store;
/// Direction of a simulated block transfer between two storage tiers.
///
/// Variants are named `<source>To<destination>`. The helper methods on this
/// type classify a direction as offload/onboard and as touching G3 or G4.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransferDirection {
/// G1 to G2; classified as an offload.
G1ToG2,
/// G2 to G1; classified as an onboard.
G2ToG1,
/// G2 to G3; classified as an offload.
G2ToG3,
/// G3 to G2; classified as an onboard.
G3ToG2,
/// G2 to G4; classified as an offload.
G2ToG4,
/// G4 to G2; classified as an onboard.
G4ToG2,
}
impl TransferDirection {
    /// Returns `true` when the transfer reads from or writes to G3.
    fn is_g3(self) -> bool {
        match self {
            Self::G2ToG3 | Self::G3ToG2 => true,
            Self::G1ToG2 | Self::G2ToG1 | Self::G2ToG4 | Self::G4ToG2 => false,
        }
    }

    /// Returns `true` when the transfer reads from or writes to G4.
    fn is_g4(self) -> bool {
        match self {
            Self::G2ToG4 | Self::G4ToG2 => true,
            Self::G1ToG2 | Self::G2ToG1 | Self::G2ToG3 | Self::G3ToG2 => false,
        }
    }

    /// Returns `true` for directions that move data to a higher-numbered tier
    /// (G1→G2, G2→G3, G2→G4); every other direction is an onboard.
    fn is_offload(self) -> bool {
        match self {
            Self::G1ToG2 | Self::G2ToG3 | Self::G2ToG4 => true,
            Self::G2ToG1 | Self::G3ToG2 | Self::G4ToG2 => false,
        }
    }

    /// Short human-readable name used in log output.
    fn label(self) -> &'static str {
        match self {
            // Offload directions.
            Self::G1ToG2 => "G1→G2",
            Self::G2ToG3 => "G2→G3",
            Self::G2ToG4 => "G2→G4",
            // Onboard directions.
            Self::G2ToG1 => "G2→G1",
            Self::G3ToG2 => "G3→G2",
            Self::G4ToG2 => "G4→G2",
        }
    }
}
/// Bookkeeping kept for one reserved transfer until it is drained as complete.
struct PipelineAwaiter {
/// Event triggered when the transfer is drained.
event: Event,
/// `owner_id` of the `MockWorker` that reserved the transfer.
owner_id: u64,
/// Direction the transfer was reserved with.
direction: TransferDirection,
/// Number of blocks the transfer was reserved for.
num_blocks: usize,
}
/// Simulation state for a pair of bandwidth links plus the waiters attached to
/// their in-flight transfers.
pub(crate) struct TransferState {
/// Bandwidth model that carries offload-direction transfers.
offload_bw: BandwidthSharingModel,
/// Bandwidth model that carries onboard-direction transfers and swap-ins.
onboard_bw: BandwidthSharingModel,
/// Awaiters keyed by transfer id; removed and triggered when the id drains.
awaiters: HashMap<TransferId, PipelineAwaiter>,
/// Swap-in completion flags keyed by transfer id; set to `true` when the id drains.
swap_in_flags: HashMap<TransferId, Arc<std::sync::atomic::AtomicBool>>,
}
/// Transfer and block totals split by offload vs. onboard direction.
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct DrainCounts {
/// Number of offload-direction transfers.
pub(crate) offload_transfers: usize,
/// Number of onboard-direction transfers.
pub(crate) onboard_transfers: usize,
/// Number of blocks carried by offload-direction transfers.
pub(crate) offload_blocks: usize,
/// Number of blocks carried by onboard-direction transfers.
pub(crate) onboard_blocks: usize,
}
impl DrainCounts {
fn into_tuple(self) -> (usize, usize, usize, usize) {
(
self.offload_transfers,
self.onboard_transfers,
self.offload_blocks,
self.onboard_blocks,
)
}
fn add_transfer(&mut self, direction: TransferDirection, blocks: usize) {
if direction.is_offload() {
self.offload_transfers += 1;
self.offload_blocks += blocks;
} else {
self.onboard_transfers += 1;
self.onboard_blocks += blocks;
}
}
pub(crate) fn add_counts(&mut self, other: DrainCounts) {
self.offload_transfers += other.offload_transfers;
self.onboard_transfers += other.onboard_transfers;
self.offload_blocks += other.offload_blocks;
self.onboard_blocks += other.onboard_blocks;
}
}
/// Result of one `MockWorker::drain_completions_summary` call.
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct DrainSummary {
/// Counts drained from the worker's own `TransferState`.
pub(crate) local: DrainCounts,
/// Value returned by the shared G3 pool's drain; default when no pool is attached.
pub(crate) shared_g3: SharedDrainCounts,
/// Value returned by the shared G4 store's drain; default when no store is attached.
pub(crate) shared_g4: SharedDrainCounts,
}
/// Drain counts reported for a shared (G3/G4) transfer state, plus extra
/// bookkeeping merged by `add_record` / `add_deferred_record`.
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct SharedDrainCounts {
/// Accumulated transfer and block counts.
pub(crate) counts: DrainCounts,
/// Accumulated deferred onboard blocks; `add_deferred_record` adds a record's
/// onboard blocks here before merging it.
pub(crate) deferred_onboard_blocks: usize,
/// Smallest baseline among merged records that carried offload blocks.
/// NOTE(review): what the baseline value measures is decided by the shared
/// pool code, which is not visible in this file — confirm there.
pub(crate) offload_registration_baseline: Option<u64>,
}
impl SharedDrainCounts {
pub(crate) fn add_record(&mut self, record: SharedDrainCounts) {
self.counts.add_counts(record.counts);
self.deferred_onboard_blocks += record.deferred_onboard_blocks;
if record.counts.offload_blocks > 0 {
self.offload_registration_baseline = match (
self.offload_registration_baseline,
record.offload_registration_baseline,
) {
(Some(existing), Some(incoming)) => Some(existing.min(incoming)),
(None, incoming) => incoming,
(existing, None) => existing,
};
}
}
pub(crate) fn add_deferred_record(&mut self, mut record: SharedDrainCounts) {
record.deferred_onboard_blocks += record.counts.onboard_blocks;
self.add_record(record);
}
}
/// Everything produced by one `TransferState::drain_completions` call.
#[derive(Debug, Default)]
pub(crate) struct DrainResult {
/// Totals across all drained transfers. Transfer counts cover every drained
/// id; block counts only cover ids that had an awaiter.
pub(crate) total: DrainCounts,
/// Per-owner counts, keyed by the awaiter's `owner_id`.
pub(crate) by_owner: HashMap<u64, DrainCounts>,
/// Id and direction of every drained transfer that had an awaiter.
pub(crate) completed: Vec<CompletedTransfer>,
}
/// Identifies one transfer that finished during a drain.
#[derive(Clone, Copy, Debug)]
pub(crate) struct CompletedTransfer {
/// Id assigned by the bandwidth model when the transfer started.
pub(crate) id: TransferId,
/// Direction the transfer was reserved with.
pub(crate) direction: TransferDirection,
}
impl TransferState {
pub(crate) fn new(offload_gbps: f64, onboard_gbps: f64) -> Self {
let id_counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
Self {
offload_bw: BandwidthSharingModel::new(offload_gbps, id_counter.clone()),
onboard_bw: BandwidthSharingModel::new(onboard_gbps, id_counter),
awaiters: HashMap::new(),
swap_in_flags: HashMap::new(),
}
}
pub(crate) fn earliest_finish(&self) -> Option<f64> {
self.offload_bw
.earliest_finish()
.into_iter()
.chain(self.onboard_bw.earliest_finish())
.reduce(f64::min)
}
pub(crate) fn earliest_offload_finish(&self) -> Option<f64> {
self.offload_bw.earliest_finish()
}
pub(crate) fn earliest_onboard_finish(&self) -> Option<f64> {
self.onboard_bw.earliest_finish()
}
pub(crate) fn drain_completions(&mut self, now_ms: f64, scope: &'static str) -> DrainResult {
let offload_before = self.offload_bw.active_count();
let onboard_before = self.onboard_bw.active_count();
let offload_drained = self.offload_bw.advance_to(now_ms);
let onboard_drained = self.onboard_bw.advance_to(now_ms);
let offload_drained_count = offload_drained.len();
let onboard_drained_count = onboard_drained.len();
let drained: Vec<TransferId> = offload_drained.into_iter().chain(onboard_drained).collect();
tracing::debug!(
now_ms,
offload_active_before = offload_before,
onboard_active_before = onboard_before,
drained_count = drained.len(),
offload_drained_count,
onboard_drained_count,
awaiter_map_size = self.awaiters.len(),
"kvbm-offload: drain transfer completions"
);
let mut awaiter_fired = 0usize;
let mut offload_awaiter_blocks = 0usize;
let mut onboard_awaiter_blocks = 0usize;
let mut swap_in_flipped = 0usize;
let mut result = DrainResult {
total: DrainCounts {
offload_transfers: offload_drained_count,
onboard_transfers: onboard_drained_count,
..Default::default()
},
by_owner: HashMap::new(),
completed: Vec::new(),
};
for id in drained {
if let Some(awaiter) = self.awaiters.remove(&id) {
tracing::debug!(
now_ms,
scope,
transfer_id = id,
direction = awaiter.direction.label(),
blocks = awaiter.num_blocks,
"kvbm-offload: mock transfer complete"
);
if awaiter.direction.is_offload() {
offload_awaiter_blocks += awaiter.num_blocks;
} else {
onboard_awaiter_blocks += awaiter.num_blocks;
}
result
.by_owner
.entry(awaiter.owner_id)
.or_default()
.add_transfer(awaiter.direction, awaiter.num_blocks);
result.completed.push(CompletedTransfer {
id,
direction: awaiter.direction,
});
let _ = awaiter.event.trigger();
awaiter_fired += 1;
}
if let Some(flag) = self.swap_in_flags.remove(&id) {
flag.store(true, Ordering::Release);
swap_in_flipped += 1;
}
}
tracing::debug!(
awaiter_fired,
offload_awaiter_blocks,
onboard_awaiter_blocks,
swap_in_flipped,
"kvbm-offload: fired completed transfer waiters"
);
result.total.offload_blocks = offload_awaiter_blocks;
result.total.onboard_blocks = onboard_awaiter_blocks;
result
}
}
// Process-wide counter that hands out `MockWorker::owner_id` values, starting at 1.
static NEXT_WORKER_ID: AtomicU64 = AtomicU64::new(1);
/// Converts a millisecond timestamp to whole microseconds.
///
/// The product is rounded to the nearest microsecond rather than truncated:
/// values such as `1.001` multiply to `1000.9999999999999` in `f64`, and
/// truncation would silently lose a microsecond. Negative and NaN inputs map
/// to `0`; values too large for `u64` saturate at `u64::MAX`.
fn ms_to_us(ms: f64) -> u64 {
    // `f64::max` returns the non-NaN operand, so NaN collapses to 0.0 here.
    let micros = (ms.max(0.0) * 1000.0).round();
    // Float-to-int `as` casts saturate, so an out-of-range value cannot wrap.
    micros as u64
}
/// Converts a whole-microsecond timestamp to fractional milliseconds.
fn us_to_ms(us: u64) -> f64 {
    let micros = us as f64;
    micros / 1000.0
}
/// Worker stand-in that simulates transfers against bandwidth models instead
/// of moving data, completing them when the caller drains at a given time.
pub struct MockWorker {
/// Id taken from `NEXT_WORKER_ID`; recorded on every awaiter this worker registers.
owner_id: u64,
/// Current simulated time in microseconds, written by `set_now_ms`.
now_us: Arc<AtomicU64>,
/// Local transfer state, used for directions that touch neither G3 nor G4.
pub(crate) state: Arc<Mutex<TransferState>>,
/// Source of the completion events handed back to callers.
event_manager: EventManager,
/// Number of transfers reserved through `reserve_transfer_with`.
reservation_count: AtomicU64,
/// Notified each time `reservation_count` is incremented.
reservation_notify: Arc<Notify>,
/// Bytes charged per block when sizing a transfer.
block_bytes: usize,
/// Handle returned by `Worker::g1_handle`.
g1_handle: Option<LayoutHandle>,
/// Handle returned by `Worker::g2_handle`.
g2_handle: Option<LayoutHandle>,
/// Shared G3 pool; required for G2↔G3 transfers.
shared_g3: Option<Arc<SharedG3Pool>>,
/// Shared G4 store; required for G2↔G4 transfers and object operations.
shared_g4: Option<Arc<SharedG4Store>>,
}
impl MockWorker {
    /// Creates a worker with a fresh `owner_id`, a clock at 0, and its own
    /// local transfer state.
    ///
    /// `block_bytes` is the size charged per block; `offload_gbps` and
    /// `onboard_gbps` configure the two local bandwidth models. The handles
    /// and shared stores are stored as given.
    pub(crate) fn new(
        block_bytes: usize,
        offload_gbps: f64,
        onboard_gbps: f64,
        g1_handle: Option<LayoutHandle>,
        g2_handle: Option<LayoutHandle>,
        shared_g3: Option<Arc<SharedG3Pool>>,
        shared_g4: Option<Arc<SharedG4Store>>,
    ) -> Self {
        Self {
            owner_id: NEXT_WORKER_ID.fetch_add(1, Ordering::Relaxed),
            now_us: Arc::new(AtomicU64::new(0)),
            state: Arc::new(Mutex::new(TransferState::new(offload_gbps, onboard_gbps))),
            event_manager: EventManager::local(),
            reservation_count: AtomicU64::new(0),
            reservation_notify: Arc::new(Notify::new()),
            block_bytes,
            g1_handle,
            g2_handle,
            shared_g3,
            shared_g4,
        }
    }

    /// Sets the simulated clock used by transfers started through the trait APIs.
    pub fn set_now_ms(&self, now_ms: f64) {
        self.now_us.store(ms_to_us(now_ms), Ordering::Release);
    }

    /// Returns the simulated clock in milliseconds.
    pub fn now_ms(&self) -> f64 {
        us_to_ms(self.now_us.load(Ordering::Acquire))
    }

    /// Number of transfers reserved so far through `reserve_transfer_with`.
    pub fn reservation_count(&self) -> u64 {
        self.reservation_count.load(Ordering::Acquire)
    }

    /// Handle to the notifier that fires on every successful reservation.
    pub(crate) fn reservation_notifier(&self) -> Arc<Notify> {
        self.reservation_notify.clone()
    }

    /// Drains completions up to `now_ms` and returns the local counts as
    /// `(offload_transfers, onboard_transfers, offload_blocks, onboard_blocks)`.
    ///
    /// Shared G3/G4 state is drained as well, but its counts are not returned.
    pub fn drain_completions(&self, now_ms: f64) -> (usize, usize, usize, usize) {
        self.drain_completions_summary(now_ms).local.into_tuple()
    }

    /// Drains the local state and any attached shared G3/G4 state up to `now_ms`.
    ///
    /// # Panics
    ///
    /// Panics if the local state mutex is poisoned.
    pub(crate) fn drain_completions_summary(&self, now_ms: f64) -> DrainSummary {
        let mut state = self.state.lock().expect("TransferState mutex poisoned");
        let local = state.drain_completions(now_ms, "worker").total;
        // Release the local lock before calling into the shared stores.
        drop(state);
        let shared_g3 = self
            .shared_g3
            .as_ref()
            .map(|shared_g3| shared_g3.drain_completions(now_ms, self.owner_id))
            .unwrap_or_default();
        let shared_g4 = self
            .shared_g4
            .as_ref()
            .map(|shared_g4| shared_g4.drain_completions(now_ms, self.owner_id))
            .unwrap_or_default();
        DrainSummary {
            local,
            shared_g3,
            shared_g4,
        }
    }

    /// Starts a swap-in of `num_blocks` blocks on the local onboard link and
    /// registers `complete` to be set to `true` when it drains.
    ///
    /// Pending local completions are drained to `now_ms` first. Unlike
    /// `reserve_transfer_with`, this neither registers an awaiter nor touches
    /// the reservation counter.
    ///
    /// # Panics
    ///
    /// Panics if the local state mutex is poisoned.
    pub fn reserve_swap_in(
        &self,
        now_ms: f64,
        num_blocks: usize,
        complete: Arc<std::sync::atomic::AtomicBool>,
    ) -> TransferId {
        let bytes = num_blocks.saturating_mul(self.block_bytes);
        let mut state = self.state.lock().expect("TransferState mutex poisoned");
        state.drain_completions(now_ms, "worker");
        let id = state.onboard_bw.start_transfer(now_ms, bytes);
        let next_deadline_ms = state.onboard_bw.earliest_finish();
        tracing::debug!(
            now_ms,
            scope = "worker",
            transfer_id = id,
            direction = TransferDirection::G2ToG1.label(),
            blocks = num_blocks,
            bytes,
            next_deadline_ms = ?next_deadline_ms,
            "kvbm-offload: reserve mock swap-in transfer"
        );
        state.swap_in_flags.insert(id, complete);
        id
    }

    /// Earliest finish time across local, shared G3 and shared G4 transfers.
    pub fn earliest_finish(&self) -> Option<f64> {
        let state = self.state.lock().expect("TransferState mutex poisoned");
        let local = state.earliest_finish();
        drop(state);
        local
            .into_iter()
            .chain(self.shared_g3.as_ref().and_then(|g3| g3.earliest_finish()))
            .chain(self.shared_g4.as_ref().and_then(|g4| g4.earliest_finish()))
            .reduce(f64::min)
    }

    /// Earliest finish time on the local offload link.
    pub(crate) fn earliest_local_offload_finish(&self) -> Option<f64> {
        let state = self.state.lock().expect("TransferState mutex poisoned");
        state.earliest_offload_finish()
    }

    /// Earliest offload finish reported by the shared G3 pool, if one is attached.
    pub(crate) fn earliest_shared_g3_offload_finish(&self) -> Option<f64> {
        self.shared_g3
            .as_ref()
            .and_then(|g3| g3.earliest_offload_finish())
    }

    /// Earliest offload finish reported by the shared G4 store, if one is attached.
    pub(crate) fn earliest_shared_g4_offload_finish(&self) -> Option<f64> {
        self.shared_g4
            .as_ref()
            .and_then(|g4| g4.earliest_offload_finish())
    }

    /// Earliest finish among local transfers (both links) and shared G3/G4
    /// onboard transfers; shared offloads are not considered.
    pub fn earliest_foreground_finish(&self) -> Option<f64> {
        let state = self.state.lock().expect("TransferState mutex poisoned");
        let local = state.earliest_finish();
        drop(state);
        local
            .into_iter()
            .chain(
                self.shared_g3
                    .as_ref()
                    .and_then(|g3| g3.earliest_onboard_finish()),
            )
            .chain(
                self.shared_g4
                    .as_ref()
                    .and_then(|g4| g4.earliest_onboard_finish()),
            )
            .reduce(f64::min)
    }

    /// Reserves a transfer and returns only its completion notification.
    ///
    /// See `reserve_transfer_with` for the details and error conditions.
    fn reserve_transfer(
        &self,
        direction: TransferDirection,
        now_ms: f64,
        num_blocks: usize,
    ) -> Result<TransferCompleteNotification> {
        self.reserve_transfer_with(direction, now_ms, num_blocks, |_| {})
            .map(|(_id, notification)| notification)
    }

    /// Starts a simulated transfer of `num_blocks` blocks at `now_ms` and
    /// registers an awaiter that is triggered when the transfer drains.
    ///
    /// G2↔G3 and G2↔G4 directions run against the shared store's transfer
    /// state; everything else uses the worker's local state. Pending
    /// completions are drained to `now_ms` before the transfer starts.
    /// `on_reserved` is called with the new id while the state lock is still
    /// held, after the awaiter has been inserted.
    ///
    /// # Errors
    ///
    /// Fails when the direction needs a shared store that is not attached, or
    /// when the completion event or its awaiter cannot be created. In every
    /// error case no transfer has been started and the reservation counter is
    /// unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the selected state mutex is poisoned.
    fn reserve_transfer_with(
        &self,
        direction: TransferDirection,
        now_ms: f64,
        num_blocks: usize,
        on_reserved: impl FnOnce(TransferId),
    ) -> Result<(TransferId, TransferCompleteNotification)> {
        let bytes = num_blocks.saturating_mul(self.block_bytes);
        let (state_arc, scope, already_drained) = if direction.is_g3() {
            let shared_g3 = self.shared_g3.as_ref().ok_or_else(|| {
                anyhow!("MockWorker: G2↔G3 transfer requested without shared G3")
            })?;
            shared_g3.drain_completions_to_pending(now_ms);
            (shared_g3.transfer_state(), "shared-g3", true)
        } else if direction.is_g4() {
            let shared_g4 = self.shared_g4.as_ref().ok_or_else(|| {
                anyhow!("MockWorker: G2↔G4 transfer requested without shared G4")
            })?;
            shared_g4.drain_completions_to_pending(now_ms);
            (shared_g4.transfer_state(), "shared-g4", true)
        } else {
            (self.state.clone(), "worker", false)
        };

        // Create the completion event before touching the bandwidth model. If
        // this fails after the transfer has started, the caller gets `Err`
        // while a transfer with no awaiter keeps consuming simulated bandwidth
        // and the reservation counter reports a reservation nobody holds.
        let event = self
            .event_manager
            .new_event()
            .map_err(|e| anyhow!("MockWorker: failed to allocate velo event: {e}"))?;
        let awaiter = event
            .awaiter()
            .map_err(|e| anyhow!("MockWorker: failed to build event awaiter: {e}"))?;

        let mut state = state_arc.lock().expect("TransferState mutex poisoned");
        if !already_drained {
            state.drain_completions(now_ms, scope);
        }
        let (id, next_deadline_ms) = if direction.is_offload() {
            let id = state.offload_bw.start_transfer(now_ms, bytes);
            (id, state.offload_bw.earliest_finish())
        } else {
            let id = state.onboard_bw.start_transfer(now_ms, bytes);
            (id, state.onboard_bw.earliest_finish())
        };
        tracing::debug!(
            now_ms,
            scope,
            transfer_id = id,
            direction = direction.label(),
            blocks = num_blocks,
            bytes,
            next_deadline_ms = ?next_deadline_ms,
            "kvbm-offload: reserve mock transfer"
        );
        self.reservation_count.fetch_add(1, Ordering::AcqRel);
        self.reservation_notify.notify_waiters();
        state.awaiters.insert(
            id,
            PipelineAwaiter {
                event,
                owner_id: self.owner_id,
                direction,
                num_blocks,
            },
        );
        // Runs under the state lock so the callback's bookkeeping is in place
        // before any drain can observe the new transfer.
        on_reserved(id);
        drop(state);
        Ok((id, TransferCompleteNotification::from_awaiter(awaiter)))
    }
}
/// Maps a `(src, dst)` layout pair onto the transfer direction it represents.
///
/// # Errors
///
/// Fails for every pair other than G1↔G2 and G2↔G3.
fn infer_direction(
    src: LogicalLayoutHandle,
    dst: LogicalLayoutHandle,
) -> Result<TransferDirection> {
    let direction = match (src, dst) {
        (LogicalLayoutHandle::G1, LogicalLayoutHandle::G2) => TransferDirection::G1ToG2,
        (LogicalLayoutHandle::G2, LogicalLayoutHandle::G1) => TransferDirection::G2ToG1,
        (LogicalLayoutHandle::G2, LogicalLayoutHandle::G3) => TransferDirection::G2ToG3,
        (LogicalLayoutHandle::G3, LogicalLayoutHandle::G2) => TransferDirection::G3ToG2,
        (s, d) => bail!(
            "MockWorker only simulates local G1↔G2 and G2↔G3 transfers; got src={:?} dst={:?}",
            s,
            d
        ),
    };
    Ok(direction)
}
impl WorkerTransfers for MockWorker {
    /// Reserves a simulated local transfer sized by the number of source
    /// blocks, at the worker's current simulated time.
    ///
    /// The destination block ids and options are ignored.
    fn execute_local_transfer(
        &self,
        src: LogicalLayoutHandle,
        dst: LogicalLayoutHandle,
        src_block_ids: Arc<[BlockId]>,
        _dst_block_ids: Arc<[BlockId]>,
        _options: TransferOptions,
    ) -> Result<TransferCompleteNotification> {
        let block_count = src_block_ids.len();
        // The clock is only read once the direction is known to be supported.
        infer_direction(src, dst)
            .and_then(|direction| self.reserve_transfer(direction, self.now_ms(), block_count))
    }

    /// Not supported by the mock; always fails.
    fn execute_remote_onboard(
        &self,
        _src: RemoteDescriptor,
        _dst: LogicalLayoutHandle,
        _dst_block_ids: Arc<[BlockId]>,
        _options: TransferOptions,
    ) -> Result<TransferCompleteNotification> {
        bail!(
            "MockWorker: execute_remote_onboard not supported (mocker simulates local G1↔G2/G2↔G3 only)"
        )
    }

    /// Not supported by the mock; always fails.
    fn execute_remote_offload(
        &self,
        _src: LogicalLayoutHandle,
        _src_block_ids: Arc<[BlockId]>,
        _dst: RemoteDescriptor,
        _options: TransferOptions,
    ) -> Result<TransferCompleteNotification> {
        bail!("MockWorker: execute_remote_offload not supported")
    }

    /// Not supported by the mock; always fails.
    fn connect_remote(
        &self,
        _instance_id: InstanceId,
        _metadata: Vec<SerializedLayout>,
    ) -> Result<ConnectRemoteResponse> {
        bail!("MockWorker: connect_remote not supported")
    }

    /// The mock never holds remote metadata, so this is always `false`.
    fn has_remote_metadata(&self, _instance_id: InstanceId) -> bool {
        false
    }

    /// Not supported by the mock; always fails.
    fn execute_remote_onboard_for_instance(
        &self,
        _instance_id: InstanceId,
        _remote_logical_type: LogicalLayoutHandle,
        _src_block_ids: Vec<BlockId>,
        _dst: LogicalLayoutHandle,
        _dst_block_ids: Arc<[BlockId]>,
        _options: TransferOptions,
    ) -> Result<TransferCompleteNotification> {
        bail!("MockWorker: execute_remote_onboard_for_instance not supported")
    }
}
impl Worker for MockWorker {
    /// Returns the G1 handle supplied at construction, if any.
    fn g1_handle(&self) -> Option<LayoutHandle> {
        self.g1_handle
    }

    /// Returns the G2 handle supplied at construction, if any.
    fn g2_handle(&self) -> Option<LayoutHandle> {
        self.g2_handle
    }

    /// The mock exposes no G3 handle.
    fn g3_handle(&self) -> Option<LayoutHandle> {
        Option::None
    }

    /// Not supported by the mock; always fails.
    fn export_metadata(&self) -> Result<SerializedLayoutResponse> {
        bail!("MockWorker: export_metadata not supported (mocker is single-instance)")
    }

    /// Not supported by the mock; always fails.
    fn import_metadata(&self, _metadata: SerializedLayout) -> Result<ImportMetadataResponse> {
        bail!("MockWorker: import_metadata not supported (mocker is single-instance)")
    }
}
impl ObjectBlockOps for MockWorker {
fn has_blocks(
&self,
keys: Vec<SequenceHash>,
) -> BoxFuture<'static, Vec<(SequenceHash, Option<usize>)>> {
let shared_g4 = self.shared_g4.clone();
Box::pin(async move {
keys.into_iter()
.map(|key| {
let size = shared_g4.as_ref().and_then(|store| store.has_object(&key));
(key, size)
})
.collect()
})
}
fn put_blocks(
&self,
keys: Vec<SequenceHash>,
src_layout: LogicalLayoutHandle,
block_ids: Vec<BlockId>,
) -> BoxFuture<'static, Vec<Result<SequenceHash, SequenceHash>>> {
if keys.is_empty() {
return Box::pin(async move { Vec::new() });
}
if src_layout != LogicalLayoutHandle::G2 || keys.len() != block_ids.len() {
tracing::warn!(
?src_layout,
keys = keys.len(),
block_ids = block_ids.len(),
"kvbm-offload: rejected mock G2→G4 object put"
);
return Box::pin(async move { keys.into_iter().map(Err).collect() });
}
let Some(shared_g4) = self.shared_g4.clone() else {
tracing::warn!("kvbm-offload: put_blocks called without shared G4 store");
return Box::pin(async move { keys.into_iter().map(Err).collect() });
};
let block_bytes = self.block_bytes;
let publish_keys = keys.clone();
let publish_store = shared_g4.clone();
let notification = match self.reserve_transfer_with(
TransferDirection::G2ToG4,
self.now_ms(),
keys.len(),
move |transfer_id| {
publish_store.register_pending_put(transfer_id, publish_keys, block_bytes);
},
) {
Ok((_transfer_id, notification)) => notification,
Err(error) => {
tracing::warn!(
error = %error,
"kvbm-offload: failed to reserve mock G2→G4 object put"
);
return Box::pin(async move { keys.into_iter().map(Err).collect() });
}
};
Box::pin(async move {
match notification.await {
Ok(()) => {
for key in &keys {
shared_g4.insert_object(*key, block_bytes);
}
keys.into_iter().map(Ok).collect()
}
Err(error) => {
tracing::warn!(
error = %error,
"kvbm-offload: mock G2→G4 object put did not complete"
);
keys.into_iter().map(Err).collect()
}
}
})
}
fn get_blocks(
&self,
keys: Vec<SequenceHash>,
dst_layout: LogicalLayoutHandle,
block_ids: Vec<BlockId>,
) -> BoxFuture<'static, Vec<Result<SequenceHash, SequenceHash>>> {
if keys.is_empty() {
return Box::pin(async move { Vec::new() });
}
if dst_layout != LogicalLayoutHandle::G2 || keys.len() != block_ids.len() {
tracing::warn!(
?dst_layout,
keys = keys.len(),
block_ids = block_ids.len(),
"kvbm-offload: rejected mock G4→G2 object get"
);
return Box::pin(async move { keys.into_iter().map(Err).collect() });
}
let Some(shared_g4) = self.shared_g4.clone() else {
tracing::warn!("kvbm-offload: get_blocks called without shared G4 store");
return Box::pin(async move { keys.into_iter().map(Err).collect() });
};
let present: Vec<bool> = keys
.iter()
.map(|key| shared_g4.has_object(key).is_some())
.collect();
let present_count = present.iter().filter(|found| **found).count();
if present_count == 0 {
return Box::pin(async move { keys.into_iter().map(Err).collect() });
}
let notification =
match self.reserve_transfer(TransferDirection::G4ToG2, self.now_ms(), present_count) {
Ok(notification) => notification,
Err(error) => {
tracing::warn!(
error = %error,
"kvbm-offload: failed to reserve mock G4→G2 object get"
);
return Box::pin(async move { keys.into_iter().map(Err).collect() });
}
};
Box::pin(async move {
match notification.await {
Ok(()) => keys
.into_iter()
.zip(present)
.map(|(key, found)| if found { Ok(key) } else { Err(key) })
.collect(),
Err(error) => {
tracing::warn!(
error = %error,
"kvbm-offload: mock G4→G2 object get did not complete"
);
keys.into_iter().map(Err).collect()
}
}
})
}
fn put_blocks_with_layout(
&self,
keys: Vec<SequenceHash>,
_layout: PhysicalLayout,
_block_ids: Vec<BlockId>,
) -> BoxFuture<'static, Vec<Result<SequenceHash, SequenceHash>>> {
Box::pin(async move { keys.into_iter().map(Err).collect() })
}
fn get_blocks_with_layout(
&self,
keys: Vec<SequenceHash>,
_layout: PhysicalLayout,
_block_ids: Vec<BlockId>,
) -> BoxFuture<'static, Vec<Result<SequenceHash, SequenceHash>>> {
Box::pin(async move { keys.into_iter().map(Err).collect() })
}
}
// Unit tests for `MockWorker`: each test reserves transfers at a simulated
// time and then drains at chosen times to check when completions are reported.
#[cfg(test)]
mod tests {
use super::super::config::KvbmOffloadConfig;
use super::super::shared_g3::shared_g3_test_guard;
use super::super::shared_g4::{SharedG4Store, shared_g4_test_guard};
use super::*;
// Tolerance for floating-point time comparisons.
const EPS: f64 = 1e-6;
// Worker with 1_000_000-byte blocks, 1.0 Gbps links, and no shared stores.
fn make_worker() -> MockWorker {
MockWorker::new(1_000_000, 1.0, 1.0, None, None, None, None)
}
// Two workers attached to the same shared G3 pool.
fn shared_g3_two_workers() -> (MockWorker, MockWorker) {
let config = KvbmOffloadConfig {
num_g3_blocks: Some(128),
block_size_bytes: Some(1_000_000),
bandwidth_g2_to_g3_gbps: 1.0,
bandwidth_g3_to_g2_gbps: 1.0,
..Default::default()
};
let shared_g3 = SharedG3Pool::get_or_create(&config).unwrap();
(
MockWorker::new(1_000_000, 1.0, 1.0, None, None, shared_g3.clone(), None),
MockWorker::new(1_000_000, 1.0, 1.0, None, None, shared_g3, None),
)
}
// One worker attached to a shared G4 store, plus a handle to that store.
fn shared_g4_worker() -> (MockWorker, Arc<SharedG4Store>) {
let config = KvbmOffloadConfig {
enable_g4_storage: true,
block_size_bytes: Some(1_000_000),
bandwidth_g2_to_g4_gbps: 1.0,
bandwidth_g4_to_g2_gbps: 1.0,
..Default::default()
};
let shared_g4 = SharedG4Store::get_or_create(&config)
.unwrap()
.expect("G4 enabled");
(
MockWorker::new(
1_000_000,
1.0,
1.0,
None,
None,
None,
Some(shared_g4.clone()),
),
shared_g4,
)
}
// Two workers attached to the same shared G4 store, plus a handle to that store.
fn shared_g4_two_workers() -> (MockWorker, MockWorker, Arc<SharedG4Store>) {
let config = KvbmOffloadConfig {
enable_g4_storage: true,
block_size_bytes: Some(1_000_000),
bandwidth_g2_to_g4_gbps: 1.0,
bandwidth_g4_to_g2_gbps: 1.0,
..Default::default()
};
let shared_g4 = SharedG4Store::get_or_create(&config)
.unwrap()
.expect("G4 enabled");
(
MockWorker::new(
1_000_000,
1.0,
1.0,
None,
None,
None,
Some(shared_g4.clone()),
),
MockWorker::new(
1_000_000,
1.0,
1.0,
None,
None,
None,
Some(shared_g4.clone()),
),
shared_g4,
)
}
// A single one-block G1→G2 transfer reserved at 0 ms resolves after a drain at 1.0 ms.
#[tokio::test]
async fn mock_worker_single_transfer_completes_on_tick() {
let worker = make_worker();
worker.set_now_ms(0.0);
let src_ids: Arc<[BlockId]> = Arc::from(vec![0usize]);
let dst_ids: Arc<[BlockId]> = Arc::from(vec![0usize]);
let notification = worker
.execute_local_transfer(
LogicalLayoutHandle::G1,
LogicalLayoutHandle::G2,
src_ids,
dst_ids,
TransferOptions::default(),
)
.expect("reservation should succeed");
// NOTE(review): `could_yield()` presumably reports that the notification is
// still pending — confirm against `TransferCompleteNotification`.
assert!(notification.could_yield());
worker.drain_completions(1.0);
notification
.await
.expect("transfer notification should resolve Ok after drain");
}
// Two one-block G1→G2 transfers reserved together are both still reported by
// `could_yield()` after a drain at 1.0 ms and both resolve after a drain at 2.0 ms.
#[tokio::test]
async fn mock_worker_two_concurrent_transfers_complete_at_2x() {
let worker = make_worker();
worker.set_now_ms(0.0);
let mk_ids = || -> Arc<[BlockId]> { Arc::from(vec![0usize]) };
let n1 = worker
.execute_local_transfer(
LogicalLayoutHandle::G1,
LogicalLayoutHandle::G2,
mk_ids(),
mk_ids(),
TransferOptions::default(),
)
.unwrap();
let n2 = worker
.execute_local_transfer(
LogicalLayoutHandle::G1,
LogicalLayoutHandle::G2,
mk_ids(),
mk_ids(),
TransferOptions::default(),
)
.unwrap();
worker.drain_completions(1.0);
assert!(n1.could_yield());
assert!(n2.could_yield());
worker.drain_completions(2.0);
n1.await.expect("n1 should resolve Ok");
n2.await.expect("n2 should resolve Ok");
}
// A G1→G3 request is rejected, and the error text names the supported G2↔G3 pair.
#[tokio::test]
async fn mock_worker_rejects_unsupported_directions() {
let worker = make_worker();
worker.set_now_ms(0.0);
let ids: Arc<[BlockId]> = Arc::from(vec![0usize]);
let result = worker.execute_local_transfer(
LogicalLayoutHandle::G1,
LogicalLayoutHandle::G3,
ids.clone(),
ids,
TransferOptions::default(),
);
let err = match result {
Ok(_) => panic!("G1→G3 must be rejected"),
Err(e) => e,
};
let msg = err.to_string();
assert!(msg.contains("G2↔G3"), "unexpected error: {msg}");
}
// Two workers on one shared G3 pool each start a one-block G2→G3 transfer at
// 0 ms; nothing is reported at 1.0 ms and each worker sees its own block at 2.0 ms.
#[tokio::test]
async fn mock_worker_g2_to_g3_bandwidth_is_shared_across_workers() {
let _guard = shared_g3_test_guard().await;
let (worker_a, worker_b) = shared_g3_two_workers();
let ids = || -> Arc<[BlockId]> { Arc::from(vec![0usize]) };
worker_a.set_now_ms(0.0);
worker_b.set_now_ms(0.0);
let a = worker_a
.execute_local_transfer(
LogicalLayoutHandle::G2,
LogicalLayoutHandle::G3,
ids(),
ids(),
TransferOptions::default(),
)
.unwrap();
let b = worker_b
.execute_local_transfer(
LogicalLayoutHandle::G2,
LogicalLayoutHandle::G3,
ids(),
ids(),
TransferOptions::default(),
)
.unwrap();
let early = worker_a.drain_completions_summary(1.0).shared_g3.counts;
assert_eq!(early.offload_blocks, 0);
assert!(a.could_yield());
assert!(b.could_yield());
let b_drained = worker_b.drain_completions_summary(2.0).shared_g3;
assert_eq!(b_drained.offload_registration_baseline, Some(0));
let b_drained = b_drained.counts;
assert_eq!(b_drained.offload_blocks, 1);
let a_drained = worker_a.drain_completions_summary(2.0).shared_g3;
assert_eq!(a_drained.offload_registration_baseline, Some(0));
let a_drained = a_drained.counts;
assert_eq!(a_drained.offload_blocks, 1);
a.await
.expect("worker A G2→G3 should complete at shared PS 2x");
b.await
.expect("worker B G2→G3 should complete at shared PS 2x");
}
// Worker B reserves at 1.0 ms, after worker A's transfer from 0 ms; worker A's
// later drain at 1.0 ms must still report A's offload block and baseline.
#[tokio::test]
async fn mock_worker_shared_prereservation_drain_preserves_owner_accounting() {
let _guard = shared_g3_test_guard().await;
let (worker_a, worker_b) = shared_g3_two_workers();
let ids = || -> Arc<[BlockId]> { Arc::from(vec![0usize]) };
worker_a.set_now_ms(0.0);
let a = worker_a
.execute_local_transfer(
LogicalLayoutHandle::G2,
LogicalLayoutHandle::G3,
ids(),
ids(),
TransferOptions::default(),
)
.unwrap();
worker_b.set_now_ms(1.0);
let b = worker_b
.execute_local_transfer(
LogicalLayoutHandle::G2,
LogicalLayoutHandle::G3,
ids(),
ids(),
TransferOptions::default(),
)
.unwrap();
let a_drained = worker_a.drain_completions_summary(1.0).shared_g3;
assert_eq!(a_drained.counts.offload_blocks, 1);
assert_eq!(a_drained.offload_registration_baseline, Some(0));
a.await
.expect("worker A G2→G3 should complete during worker B reservation");
let b_drained = worker_b.drain_completions_summary(2.0).shared_g3;
assert_eq!(b_drained.counts.offload_blocks, 1);
b.await
.expect("worker B G2→G3 should complete after its own reservation");
}
// Same shape as the previous test, but worker A's transfer is a G3→G2 onboard;
// its block must be reported both as an onboard block and as deferred.
#[tokio::test]
async fn mock_worker_shared_prereservation_drain_marks_deferred_onboard_blocks() {
let _guard = shared_g3_test_guard().await;
let (worker_a, worker_b) = shared_g3_two_workers();
let ids = || -> Arc<[BlockId]> { Arc::from(vec![0usize]) };
worker_a.set_now_ms(0.0);
let a = worker_a
.execute_local_transfer(
LogicalLayoutHandle::G3,
LogicalLayoutHandle::G2,
ids(),
ids(),
TransferOptions::default(),
)
.unwrap();
worker_b.set_now_ms(1.0);
let b = worker_b
.execute_local_transfer(
LogicalLayoutHandle::G2,
LogicalLayoutHandle::G3,
ids(),
ids(),
TransferOptions::default(),
)
.unwrap();
let a_drained = worker_a.drain_completions_summary(1.0).shared_g3;
assert_eq!(a_drained.counts.onboard_blocks, 1);
assert_eq!(a_drained.deferred_onboard_blocks, 1);
a.await
.expect("worker A G3→G2 should complete during worker B reservation");
let b_drained = worker_b.drain_completions_summary(2.0).shared_g3;
assert_eq!(b_drained.counts.offload_blocks, 1);
b.await
.expect("worker B G2→G3 should complete after its own reservation");
}
// A put is invisible in the shared G4 store until the drain at 1.0 ms, and is
// visible there before the put future itself is awaited.
#[tokio::test]
async fn mock_worker_object_put_updates_shared_g4_after_transfer() {
use dynamo_tokens::PositionalLineageHash;
let _guard = shared_g4_test_guard().await;
let (worker, shared_g4) = shared_g4_worker();
let plh = PositionalLineageHash::new(6_000, None, 0);
worker.set_now_ms(0.0);
let put = worker.put_blocks(vec![plh], LogicalLayoutHandle::G2, vec![0]);
assert_eq!(shared_g4.has_object(&plh), None);
let early = worker.drain_completions_summary(0.5).shared_g4.counts;
assert_eq!(early.offload_blocks, 0);
assert_eq!(shared_g4.has_object(&plh), None);
let drained = worker.drain_completions_summary(1.0).shared_g4.counts;
assert_eq!(drained.offload_blocks, 1);
assert_eq!(shared_g4.has_object(&plh), Some(1_000_000));
let results = put.await;
assert_eq!(results, vec![Ok(plh)]);
assert_eq!(shared_g4.has_object(&plh), Some(1_000_000));
}
// When a different worker drains the shared G4 store, the object is still
// published but the draining worker is not credited with the offload block.
#[tokio::test]
async fn mock_worker_object_put_visible_when_another_worker_drains_shared_g4() {
use dynamo_tokens::PositionalLineageHash;
let _guard = shared_g4_test_guard().await;
let (worker_a, worker_b, shared_g4) = shared_g4_two_workers();
let plh = PositionalLineageHash::new(6_050, None, 0);
worker_a.set_now_ms(0.0);
let put = worker_a.put_blocks(vec![plh], LogicalLayoutHandle::G2, vec![0]);
assert_eq!(shared_g4.has_object(&plh), None);
let b_drained = worker_b.drain_completions_summary(1.0).shared_g4.counts;
assert_eq!(
b_drained.offload_blocks, 0,
"worker B should not consume worker A's owner accounting"
);
assert_eq!(
shared_g4.has_object(&plh),
Some(1_000_000),
"shared G4 completion should publish the object even when another worker drains"
);
let results = put.await;
assert_eq!(results, vec![Ok(plh)]);
}
// A get for one present and one missing key charges a single onboard block and
// returns `Ok` for the present key and `Err` for the missing one.
#[tokio::test]
async fn mock_worker_object_get_charges_present_g4_blocks_only() {
use dynamo_tokens::PositionalLineageHash;
let _guard = shared_g4_test_guard().await;
let (worker, shared_g4) = shared_g4_worker();
let present = PositionalLineageHash::new(6_100, None, 0);
let missing = PositionalLineageHash::new(6_101, None, 1);
shared_g4.insert_object(present, 1_000_000);
worker.set_now_ms(0.0);
let get = worker.get_blocks(vec![present, missing], LogicalLayoutHandle::G2, vec![0, 1]);
let drained = worker.drain_completions_summary(1.0).shared_g4.counts;
assert_eq!(drained.onboard_blocks, 1);
let results = get.await;
assert_eq!(results, vec![Ok(present), Err(missing)]);
}
// A swap-in and an offload reserved on the same worker get different transfer ids.
#[tokio::test]
async fn mock_worker_offload_and_swap_in_share_id_keyspace() {
use std::sync::atomic::AtomicBool;
let worker = make_worker();
worker.set_now_ms(0.0);
let swap_id = worker.reserve_swap_in(0.0, 1, Arc::new(AtomicBool::new(false)));
let ids: Arc<[BlockId]> = Arc::from(vec![0usize]);
let _offload = worker
.execute_local_transfer(
LogicalLayoutHandle::G1,
LogicalLayoutHandle::G2,
ids.clone(),
ids,
TransferOptions::default(),
)
.unwrap();
let state = worker.state.lock().unwrap();
// Only the offload registers an awaiter, so the single key is the offload's id.
let awaiter_id = *state
.awaiters
.keys()
.next()
.expect("offload must register an awaiter");
assert_ne!(
awaiter_id, swap_id,
"offload and swap-in must draw distinct TransferIds"
);
}
// A swap-in flag stays `false` after a drain at 0.5 ms and is `true` after a drain at 1.0 ms.
#[tokio::test]
async fn mock_worker_swap_in_flag_flips_on_drain() {
use std::sync::atomic::{AtomicBool, Ordering};
let worker = make_worker();
worker.set_now_ms(0.0);
let complete = Arc::new(AtomicBool::new(false));
let _id = worker.reserve_swap_in(0.0, 1, complete.clone());
assert!(!complete.load(Ordering::Acquire));
worker.drain_completions(0.5);
assert!(
!complete.load(Ordering::Acquire),
"swap-in must not complete before its finish time"
);
worker.drain_completions(1.0);
assert!(
complete.load(Ordering::Acquire),
"swap-in flag must flip after drain past finish time"
);
}
// With one transfer on each local link, `earliest_finish` reports 1.0 ms.
#[tokio::test]
async fn mock_worker_earliest_finish_min_of_both_links() {
let worker = make_worker();
worker.set_now_ms(0.0);
let ids: Arc<[BlockId]> = Arc::from(vec![0usize]);
let _n1 = worker
.execute_local_transfer(
LogicalLayoutHandle::G1,
LogicalLayoutHandle::G2,
ids.clone(),
ids.clone(),
TransferOptions::default(),
)
.unwrap();
let _n2 = worker
.execute_local_transfer(
LogicalLayoutHandle::G2,
LogicalLayoutHandle::G1,
ids.clone(),
ids,
TransferOptions::default(),
)
.unwrap();
let earliest = worker.earliest_finish().unwrap();
assert!(
(earliest - 1.0).abs() < EPS,
"expected 1.0 ms, got {earliest}"
);
}
}