1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use anyhow::Result;
use tokio_util::sync::CancellationToken;
use dynamo_kv_router::PrefillLoadEstimator;
use dynamo_runtime::{
pipeline::{
AsyncEngineContextProvider, Context, ManyOut, Operator, RouterMode, ServerStreamingEngine,
SingleIn, async_trait,
},
protocols::{EndpointId, annotated::Annotated},
};
use crate::{
discovery::ModelManager,
protocols::common::{
llm_backend::{LLMEngineOutput, PreprocessedRequest},
timing::{RequestPhase, RequestTracker},
},
};
mod activation;
mod execution;
mod inner;
mod types;
use inner::InnerPrefillRouter;
pub use types::PrefillError;
use types::{PrefillOutcome, PrefillResolveDecision, build_decode_router_override};
/// PrefillRouter is a forward-only operator that sits between Migration and the decode router.
/// It optionally calls a prefill worker before routing to decode, extracting disaggregated_params
/// from the prefill response and injecting them into the decode request.
///
/// Modes:
/// - Query-only: `query_instance_id` annotation present → returns worker IDs without execution
/// - Pre-routed: `prefill_worker_id`/`decode_worker_id` set → routes to specified workers
/// - Normal: Worker IDs determined by router based on KV cache state
pub struct PrefillRouter {
    /// Inner router, populated once on activation (when prefill workers rendezvous).
    /// While unset, `generate()` treats the deployment as aggregated and routes
    /// straight to decode (or fails under `enforce_disagg`).
    prefill_router: OnceLock<InnerPrefillRouter>,
    /// Shared model registry — presumably used during activation to look up worker
    /// monitors (see `model_name`/`namespace` below); usage site not visible here.
    model_manager: Arc<ModelManager>,
    /// Endpoint identity, set at most once — presumably during activation; the
    /// write site is outside this view (TODO confirm).
    endpoint_id: OnceLock<EndpointId>,
    /// Cancelled on `Drop` to tear down the background activation task.
    cancel_token: CancellationToken,
    /// Routing strategy. Consulted in `generate()`: Direct mode requires a
    /// preselected prefill worker; non-KV modes advance the inner router's state
    /// after using a peeked worker.
    router_mode: RouterMode,
    /// When true, requests fail (PrefillError::NotActivated) instead of falling
    /// back to aggregated (decode-only) mode when no prefill router is available.
    enforce_disagg: bool,
    /// Optional estimator of prefill-side load — not referenced in this file's
    /// visible code; assumed to feed load-aware routing (TODO confirm usage site).
    prefill_load_estimator: Option<Arc<dyn PrefillLoadEstimator>>,
    /// Model name used to look up the worker monitor for prefill client registration
    model_name: String,
    /// Namespace used to look up the correct WorkerSet's worker monitor
    namespace: String,
    // Presumably flags EAGLE speculative decoding; not read in this file's
    // visible code — NOTE(review): confirm semantics at the consumer.
    is_eagle: bool,
    /// Set to true when all prefill workers die. Checked in generate() to prevent
    /// routing to dead workers. Cleared on reactivation when workers rejoin.
    deactivated: AtomicBool,
    /// Set to true when the prefill router has been activated (inner router populated).
    /// Used by `can_serve_requests()` to gate enforce_disagg readiness so a cold-started
    /// strict-disagg model isn't listed before the prefill has rendezvoused.
    activated: AtomicBool,
}
impl Drop for PrefillRouter {
    /// Tear down the background activation task when the router is dropped, so it
    /// does not keep running (and holding resources) after the operator is gone.
    fn drop(&mut self) {
        tracing::debug!("Dropping PrefillRouter, cancelling background activation task");
        self.cancel_token.cancel();
    }
}
#[async_trait]
impl
    Operator<
        SingleIn<PreprocessedRequest>,
        ManyOut<Annotated<LLMEngineOutput>>,
        SingleIn<PreprocessedRequest>,
        ManyOut<Annotated<LLMEngineOutput>>,
    > for PrefillRouter
{
    /// Route one request through the (optional) prefill stage, then forward to decode.
    ///
    /// Flow:
    /// 1. If no prefill router is activated (or it was deactivated), fall through to
    ///    `next` directly — aggregated mode — unless `enforce_disagg`, which fails.
    /// 2. Otherwise resolve a prefill worker. On the bootstrap path the prefill runs
    ///    in a background task and decode proceeds immediately; on the original path
    ///    prefill completion is awaited here.
    /// 3. The prefill outcome (bootstrap info or prefill result) is injected into the
    ///    decode request, which is then forwarded to `next` with its context preserved.
    async fn generate(
        &self,
        request: SingleIn<PreprocessedRequest>,
        next: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>>,
    ) -> Result<ManyOut<Annotated<LLMEngineOutput>>> {
        // Extract request data while preserving context
        let (mut req, context) = request.into_parts();
        let request_id = context.id().to_string();
        // Engine context is kept so the kill/stop state can be inspected after prefill.
        let engine_ctx = context.context();
        // Save original max_tokens for decode
        let original_max_tokens = req.stop_conditions.max_tokens;
        // If prefill router is not activated (no prefill workers discovered) or has been
        // deactivated (all prefill workers died), this is aggregated mode -- route directly
        // to decode. With --enforce-disagg, fail instead of falling back.
        if self.prefill_router.get().is_none() || self.deactivated.load(Ordering::Relaxed) {
            if self.enforce_disagg {
                return Err(anyhow::anyhow!(PrefillError::NotActivated));
            }
            return next.generate(context.map(|_| req)).await;
        }
        // Ensure tracker exists for routing decisions in disaggregated mode.
        // Create one if not provided by the upstream DeltaGenerator.
        if req.tracker.is_none() {
            req.tracker = Some(Arc::new(RequestTracker::new()));
        }
        // Safe unwrap: the tracker was just populated above if it was missing.
        let tracker = req.tracker.as_ref().unwrap();
        let prefill_phase_barrier = tracker.set_phase(RequestPhase::Prefill).await;
        // Prepare prefill request with max_tokens = 1 (clone after tracker is set)
        let mut prefill_req = req.clone();
        prefill_req.stop_conditions.max_tokens = Some(1);
        // Try to resolve prefill worker upfront: if we can get bootstrap info early,
        // spawn prefill in background and proceed to decode immediately.
        let preselected_worker = prefill_req
            .routing
            .as_ref()
            .and_then(|r| r.prefill_worker_id);
        if self.router_mode.is_direct_routing() && preselected_worker.is_none() {
            return Err(anyhow::anyhow!(
                "Prefill worker ID required in Direct routing mode but none found in request. \
                Expected prefill_worker_id to be set via x-prefill-instance-id header by external router (e.g., EPP)."
            ));
        }
        let prefill_result = match self
            .resolve_prefill_worker(&prefill_req, preselected_worker)
            .await
        {
            PrefillResolveDecision::Resolved {
                worker_id,
                dp_rank,
                bootstrap_info,
            } => {
                // Bootstrap optimization path: spawn prefill in background
                // We successfully used the peeked worker, so we must now advance the router state
                // to ensure the next request gets a different worker.
                if !self.router_mode.is_kv_routing()
                    && let Some(router) = self.prefill_router.get()
                {
                    router.select_next_worker();
                }
                let routing = prefill_req.routing_mut();
                routing.prefill_worker_id = Some(worker_id);
                routing.dp_rank = dp_rank;
                prefill_req.bootstrap_info = Some(bootstrap_info.clone());
                // NVBugs 5969206: Do NOT link prefill as child of engine context.
                // Kill propagation tears down the RPC transport, interrupting NIXL
                // KV cache transfers and leaking blocks permanently. The prefill
                // runs to completion independently; blocks are freed via the normal
                // completion path (state 21→22).
                // NOTE: This means prefill runs to completion even if the client
                // disconnects, wasting prefill compute. This is an accepted
                // trade-off (wasted compute vs permanent KV block leak). Future
                // work: add NIXL-level cancellation that properly frees blocks.
                let prefill_context = Context::with_id(prefill_req, request_id.clone());
                // Pass the phase barrier to the spawned task. It is released after routing
                // completes so worker recording finishes before phase changes to Decode.
                self.spawn_prefill_task(prefill_context, Some(worker_id), prefill_phase_barrier);
                Ok(PrefillOutcome::Bootstrap(bootstrap_info))
            }
            PrefillResolveDecision::Unavailable
            | PrefillResolveDecision::NotActivated
            | PrefillResolveDecision::NoBootstrapEndpoint => {
                // Original prefill path: wait for prefill to complete
                tracing::debug!("Using original prefill path");
                // Drop the phase barrier because we wait for prefill completion in this task,
                // so there is no race with set_phase(Decode) below.
                drop(prefill_phase_barrier);
                // NVBugs 5969206: Do NOT link prefill as child (same rationale as bootstrap path).
                let prefill_context = Context::with_id(prefill_req, request_id.clone());
                // In Direct mode, pass preselected_worker so execute_prefill uses
                // router.direct() instead of router.generate() (which bails in Direct mode).
                let (result, _worker_info) = Self::execute_prefill(
                    self.prefill_router.get().cloned(),
                    prefill_context,
                    preselected_worker,
                    None,
                )
                .await?;
                Ok(PrefillOutcome::Completed(result))
            }
        };
        // NVBugs 5969206: Do NOT abort decode routing when context is killed.
        // In disaggregated serving, the prefill may have completed and KV transfer
        // is in flight. Blocking decode here orphans the transfer (no receiver)
        // and leaks KV blocks permanently. The decode handler's
        // kv_transfer_complete_event guard will clean up after KV is received.
        // Log-only; decode routing must proceed for KV transfer cleanup.
        if engine_ctx.is_stopped() || engine_ctx.is_killed() {
            tracing::debug!(
                "Context {} killed/stopped after prefill, allowing decode routing for KV transfer",
                engine_ctx.id()
            );
        }
        // Handle prefill result
        match prefill_result {
            Ok(outcome) => {
                tracing::debug!("Prefill completed, proceeding to decode");
                // Set phase to Decode for the decode request.
                // In bootstrap path, this blocks until the spawned prefill task releases its
                // phase barrier after routing completes, ensuring correct worker attribution.
                if let Some(ref tracker) = req.tracker {
                    let _decode_permit = tracker.set_phase(RequestPhase::Decode).await;
                    // Permit is dropped immediately - decode proceeds, no need to hold it
                }
                let mut decode_req = req;
                match outcome {
                    PrefillOutcome::Bootstrap(info) => {
                        decode_req.bootstrap_info = Some(info);
                    }
                    PrefillOutcome::Completed(result) => {
                        decode_req.prefill_result = Some(result);
                    }
                }
                // Restore original max_tokens for decode
                decode_req.stop_conditions.max_tokens = original_max_tokens;
                // Set router_config_override for decode:
                // - overlap_score_weight = 0 (no KV cache overlap scoring for decode)
                // - assume_kv_reuse = false (generate random hashes since decode workers
                //   may already have blocks cached from prefill transfer)
                // - track_prefill_tokens = false (decode router should ignore prompt-side load)
                let existing_override = decode_req.router_config_override.take();
                decode_req.router_config_override =
                    Some(build_decode_router_override(existing_override));
                // Map the modified request through with preserved context
                let decode_request = context.map(|_| decode_req);
                next.generate(decode_request).await
            }
            // NOTE(review): both arms of the match above construct `Ok(..)`, and
            // execute_prefill errors propagate out via `?` — so these `Err` arms look
            // unreachable from the visible code. They do pin `prefill_result`'s error
            // type and act as a safety net; confirm before removing.
            Err(PrefillError::NotActivated) => {
                tracing::error!("Prefill router not activated, failing request");
                Err(anyhow::anyhow!(PrefillError::NotActivated))
            }
            Err(e) => {
                tracing::error!(error = %e, "Remote prefill failed, failing request");
                Err(anyhow::anyhow!(e))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use dynamo_kv_router::config::RouterConfigOverride;

    #[test]
    fn decode_router_override_disables_overlap_and_prefill_tracking() {
        // The decode override must force the decode-specific knobs while leaving
        // caller-provided settings (here: temperature) intact.
        let base = RouterConfigOverride {
            router_temperature: Some(0.7),
            ..Default::default()
        };
        let override_config = build_decode_router_override(Some(base));
        assert_eq!(override_config.overlap_score_weight, Some(0.0));
        assert_eq!(override_config.assume_kv_reuse, Some(false));
        assert_eq!(override_config.track_prefill_tokens, Some(false));
        assert_eq!(override_config.router_temperature, Some(0.7));
    }

    // -- Prefill death handling tests --

    /// Helper: build a disabled PrefillRouter so deactivation behavior can be
    /// exercised without any discovered prefill workers.
    fn make_test_router(enforce_disagg: bool) -> Arc<PrefillRouter> {
        let manager = Arc::new(crate::discovery::ModelManager::new());
        PrefillRouter::disabled(manager, RouterMode::RoundRobin, enforce_disagg)
    }

    #[test]
    fn test_deactivated_flag_blocks_when_enforce_disagg() {
        let router = make_test_router(true);
        // A disabled router was never activated, so strict disagg already refuses
        // traffic even before deactivate() is called.
        let servable_before = router.can_serve_requests();
        assert!(
            !servable_before,
            "enforce_disagg must block before prefill activation"
        );
        router.deactivate();
        assert!(router.is_deactivated());
        assert!(
            !router.can_serve_requests(),
            "deactivated + enforce_disagg must block"
        );
    }

    #[test]
    fn test_deactivated_flag_allows_fallback_no_enforce() {
        let router = make_test_router(false);
        router.deactivate();
        assert!(router.is_deactivated());
        // Without strict disagg the router keeps serving via aggregated fallback.
        assert!(
            router.can_serve_requests(),
            "deactivated + !enforce_disagg must allow fallback"
        );
    }

    #[test]
    fn test_reactivate_clears_deactivated_no_enforce() {
        let router = make_test_router(false);
        router.deactivate();
        // Fallback keeps the router servable even while deactivated (!enforce_disagg).
        assert!(router.can_serve_requests());
        router.reactivate();
        assert!(!router.is_deactivated());
        assert!(
            router.can_serve_requests(),
            "reactivated non-enforce router must serve requests"
        );
    }

    #[test]
    fn test_reactivate_clears_deactivated_enforce_needs_activation() {
        // disabled() never sets the activated flag, so enforce_disagg stays blocked.
        // In a real deployment, activate() sets the flag before the first
        // deactivate/reactivate cycle, so this only exercises the flag reset.
        let router = make_test_router(true);
        router.deactivate();
        assert!(!router.can_serve_requests());
        router.reactivate();
        assert!(!router.is_deactivated());
        assert!(
            !router.can_serve_requests(),
            "enforce_disagg without activation still can't serve"
        );
    }

    #[test]
    fn test_fresh_router_not_deactivated() {
        let router = make_test_router(true);
        assert!(!router.is_deactivated());
        // Strict disagg with no prefill activation means the model is not servable yet.
        assert!(!router.can_serve_requests());
    }

    #[test]
    fn test_fresh_router_no_enforce_disagg_can_serve() {
        let router = make_test_router(false);
        assert!(!router.is_deactivated());
        assert!(
            router.can_serve_requests(),
            "non-enforce_disagg router must be servable even without prefill activation"
        );
    }

    #[test]
    fn test_deactivate_is_idempotent() {
        // A second deactivate() must behave exactly like the first.
        let router = make_test_router(true);
        router.deactivate();
        router.deactivate();
        assert!(router.is_deactivated());
        assert!(!router.can_serve_requests());
    }
}