1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use super::events::EventManager;
use super::*;
use dynamo_runtime::config::environment_names::kvbm::cpu_cache as env_cpu_cache;
use dynamo_runtime::config::environment_names::kvbm::disk_cache as env_disk_cache;
use prometheus::Registry;
#[derive(Debug, Clone)]
pub enum NixlOptions {
/// Enable NIXL and create a new NIXL agent
Enabled,
/// Enable NIXL and use the provided NIXL agent
EnabledWithAgent(NixlAgent),
/// Disable NIXL
Disabled,
}
#[derive(Debug, Clone, Builder, Validate)]
#[builder(pattern = "owned")]
pub struct KvManagerRuntimeConfig {
pub worker_id: u64,
#[builder(default)]
pub cancellation_token: CancellationToken,
#[builder(default = "NixlOptions::Enabled")]
pub nixl: NixlOptions,
#[builder(default)]
pub async_runtime: Option<Arc<tokio::runtime::Runtime>>,
#[builder(default = "Arc::new(Registry::new())")]
pub metrics_registry: Arc<Registry>,
}
impl KvManagerRuntimeConfig {
pub fn builder() -> KvManagerRuntimeConfigBuilder {
KvManagerRuntimeConfigBuilder::default()
}
}
impl KvManagerRuntimeConfigBuilder {
pub fn enable_nixl(mut self) -> Self {
self.nixl = Some(NixlOptions::Enabled);
self
}
pub fn use_nixl_agent(mut self, agent: NixlAgent) -> Self {
self.nixl = Some(NixlOptions::EnabledWithAgent(agent));
self
}
pub fn disable_nixl(mut self) -> Self {
self.nixl = Some(NixlOptions::Disabled);
self
}
}
#[derive(Debug, Clone, Builder, Validate)]
#[builder(pattern = "owned")]
pub struct KvManagerModelConfig {
#[validate(range(min = 1))]
pub num_layers: usize,
#[validate(range(min = 1, max = 2))]
pub outer_dim: usize,
#[validate(range(min = 1))]
pub page_size: usize,
#[validate(range(min = 1))]
pub inner_dim: usize,
#[builder(default = "2")]
pub dtype_width_bytes: usize,
}
impl KvManagerModelConfig {
pub fn builder() -> KvManagerModelConfigBuilder {
KvManagerModelConfigBuilder::default()
}
}
#[derive(Debug, Clone)]
pub enum BlockParallelismStrategy {
/// KV blocks are sharded across all workers.
/// This reduces the memory footprint and computational cost of each worker; however,
/// requires extra communication between workers.
LeaderWorkerSharded,
}
#[derive(Builder, Validate)]
#[builder(pattern = "owned", build_fn(validate = "Self::validate"))]
pub struct KvManagerLayoutConfig<S: Storage + NixlRegisterableStorage> {
/// The number of blocks to allocate
#[validate(range(min = 1))]
pub num_blocks: usize,
/// The type of layout to use
#[builder(default = "LayoutType::FullyContiguous")]
pub layout_type: LayoutType,
/// Storage for the blocks
/// If provided, the blocks will be allocated from the provided storage
/// Otherwise, the blocks will be allocated from
#[builder(default)]
pub storage: Option<Vec<S>>,
/// If provided, the blocks will be allocated from the provided allocator
/// This option is mutually exclusive with the `storage` option
#[builder(default, setter(custom))]
pub allocator: Option<Arc<dyn StorageAllocator<S>>>,
/// The type of block parallelism strategy to use
#[builder(default)]
pub logical: Option<BlockParallelismStrategy>,
/// The offload filter to use (if any).
/// This dictates which blocks will be offloaded to the next-lowest cache level.
#[builder(default = "None")]
pub offload_filter: Option<Arc<dyn OffloadFilter>>,
}
impl<S: Storage + NixlRegisterableStorage> KvManagerLayoutConfig<S> {
/// Create a new builder for the KvManagerLayoutConfig
pub fn builder() -> KvManagerLayoutConfigBuilder<S> {
KvManagerLayoutConfigBuilder::default()
}
}
// Implement the validation and build functions on the generated builder type
// Note: derive_builder generates KvManagerBlockConfigBuilder<S>
impl<S: Storage + NixlRegisterableStorage> KvManagerLayoutConfigBuilder<S> {
/// Custom setter for the `allocator` field
pub fn allocator(mut self, allocator: impl StorageAllocator<S> + 'static) -> Self {
self.allocator = Some(Some(Arc::new(allocator)));
self
}
// Validation function
fn validate(&self) -> Result<(), String> {
match (
self.storage.is_some(),
self.allocator.is_some(),
self.logical.is_some(),
) {
(true, false, false) | (false, true, false) | (false, false, true) => Ok(()), // XOR condition met
(false, false, false) => {
Err("Must provide either `storage` or `allocator` or `logical`.".to_string())
}
_ => Err(
"Only one selection of either `storage` and `allocator` or `logical`.".to_string(),
),
}
}
}
/// Configuration for the KvBlockManager
#[derive(Builder, Validate)]
#[builder(pattern = "owned")]
pub struct KvBlockManagerConfig {
/// Runtime configuration
///
/// This provides core runtime configuration for the KvBlockManager.
pub runtime: KvManagerRuntimeConfig,
/// Model configuration
///
/// This provides model-specific configuration for the KvBlockManager, specifically,
/// the number of layers and the size of the inner dimension which is directly related
/// to the type of attention used by the model.
///
/// Included in this configuration is also the page_size, i.e. the number of tokens that will
/// be represented in each "paged" KV block.
pub model: KvManagerModelConfig,
/// Specific configuration for the device layout
///
/// This includes the number of blocks and the layout of the data into the device memory/storage.
#[builder(default, setter(strip_option))]
pub device_layout: Option<KvManagerLayoutConfig<DeviceStorage>>,
/// Specific configuration for the host layout
///
/// This includes the number of blocks and the layout of the data into the host memory/storage.
#[builder(default, setter(strip_option))]
pub host_layout: Option<KvManagerLayoutConfig<PinnedStorage>>,
// Specific configuration for the disk layout
#[builder(default, setter(strip_option))]
pub disk_layout: Option<KvManagerLayoutConfig<DiskStorage>>,
/// Event manager to handle block related events
#[builder(default)]
pub event_manager: Option<Arc<dyn EventManager>>,
/// Channel to reset the block manager to a specific cache level
#[builder(default)]
pub block_reset_channel: Option<BlockResetChannel>,
/// Optional KVBM-level metrics for tracking offload/onboard operations
#[builder(default)]
pub kvbm_metrics: Option<crate::block_manager::metrics_kvbm::KvbmMetrics>,
/// Optional KV Event Consolidator Configuration
///
/// If provided, KVBM will create a KV Event Consolidator that deduplicates
/// KV cache events from vLLM (G1) and KVBM (G2/G3) before sending to the router.
/// This is used when `--connector kvbm` is enabled with prefix caching.
#[builder(default, setter(custom))]
pub consolidator_config:
Option<crate::block_manager::kv_consolidator::KvEventConsolidatorConfig>,
}
impl KvBlockManagerConfig {
/// Create a new builder for the KvBlockManagerConfig
pub fn builder() -> KvBlockManagerConfigBuilder {
KvBlockManagerConfigBuilder::default()
}
}
impl KvBlockManagerConfigBuilder {
/// Set the consolidator config using individual parameters
pub fn consolidator_config(
mut self,
engine_endpoint: String,
output_endpoint: Option<String>,
engine_source: crate::block_manager::kv_consolidator::EventSource,
) -> Self {
let config = match engine_source {
crate::block_manager::kv_consolidator::EventSource::Vllm => {
let output_ep = output_endpoint.expect("output_endpoint is required for vLLM");
crate::block_manager::kv_consolidator::KvEventConsolidatorConfig::new_vllm(
engine_endpoint,
output_ep,
)
}
crate::block_manager::kv_consolidator::EventSource::Trtllm => {
// output_endpoint is the ZMQ endpoint where consolidator publishes
// Worker-side publishers subscribe to this and forward to NATS
let output_ep = output_endpoint.expect(
"output_endpoint (consolidated_event_endpoint) is required for TensorRT-LLM",
);
crate::block_manager::kv_consolidator::KvEventConsolidatorConfig::new_trtllm(
engine_endpoint,
output_ep,
)
}
crate::block_manager::kv_consolidator::EventSource::Kvbm => {
// This case should never be reached - consolidator_config() is only called with
// EventSource::Vllm or EventSource::Trtllm. EventSource::Kvbm is used when KVBM
// sends events TO the consolidator (via DynamoEventManager), but KVBM is never
// the engine_source that publishes events via ZMQ that the consolidator subscribes to.
unreachable!(
"consolidator_config() should never be called with EventSource::Kvbm. \
KVBM events are sent directly to the consolidator handle, not via ZMQ."
)
}
};
// With setter(custom), the builder field is Option<Option<T>>, so we need Some(Some(...))
self.consolidator_config = Some(Some(config));
self
}
}
/// Determines if CPU memory (G2) should be bypassed for direct G1->G3 (Device->Disk) offloading.
///
/// Returns `true` if:
/// - Disk cache env vars are set (`DYN_KVBM_DISK_CACHE_GB` or `DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS`)
/// AND their values are non-zero
/// - AND CPU cache env vars are NOT set (`DYN_KVBM_CPU_CACHE_GB` or `DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS`)
/// OR their values are zero (treated as not set)
pub fn should_bypass_cpu_cache() -> bool {
let cpu_cache_gb_set = std::env::var(env_cpu_cache::DYN_KVBM_CPU_CACHE_GB)
.ok()
.and_then(|v| v.parse::<u64>().ok())
.map(|v| v > 0)
.unwrap_or(false);
let cpu_cache_override_set =
std::env::var(env_cpu_cache::DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS)
.ok()
.and_then(|v| v.parse::<usize>().ok())
.map(|v| v > 0)
.unwrap_or(false);
let disk_cache_gb_set = std::env::var(env_disk_cache::DYN_KVBM_DISK_CACHE_GB)
.ok()
.and_then(|v| v.parse::<u64>().ok())
.map(|v| v > 0)
.unwrap_or(false);
let disk_cache_override_set =
std::env::var(env_disk_cache::DYN_KVBM_DISK_CACHE_OVERRIDE_NUM_BLOCKS)
.ok()
.and_then(|v| v.parse::<usize>().ok())
.map(|v| v > 0)
.unwrap_or(false);
let cpu_cache_set = cpu_cache_gb_set || cpu_cache_override_set;
let disk_cache_set = disk_cache_gb_set || disk_cache_override_set;
disk_cache_set && !cpu_cache_set
}