use std::sync::Arc;
use anyhow::Result;
use dynamo_kv_router::protocols::WorkerWithDpRank;
use dynamo_runtime::{
pipeline::{AsyncEngine, ManyOut, PushRouter, SingleIn},
protocols::annotated::Annotated,
};
use crate::{
kv_router::KvPushRouter,
protocols::common::{
llm_backend::{LLMEngineOutput, PreprocessedRequest},
timing::RequestPhase,
},
};
/// Router used to dispatch prefill requests, wrapping one of two router implementations.
///
/// Both variants keep their router behind an `Arc`, so cloning an `InnerPrefillRouter`
/// is cheap and the clone shares the same underlying router.
#[derive(Clone)]
pub(super) enum InnerPrefillRouter {
    /// A [`KvPushRouter`]; the sticky-worker helpers on this type only act on this variant.
    KvRouter(Arc<KvPushRouter>),
    /// A plain [`PushRouter`] from [`PreprocessedRequest`] to annotated [`LLMEngineOutput`].
    SimpleRouter(Arc<PushRouter<PreprocessedRequest, Annotated<LLMEngineOutput>>>),
}
impl InnerPrefillRouter {
pub(super) async fn generate_to_worker(
&self,
request: SingleIn<PreprocessedRequest>,
target_worker: Option<u64>,
) -> Result<ManyOut<Annotated<LLMEngineOutput>>> {
match (self, target_worker) {
(InnerPrefillRouter::KvRouter(router), _) => router.generate(request).await,
(InnerPrefillRouter::SimpleRouter(router), Some(worker_id)) => {
router.direct(request, worker_id).await
}
(InnerPrefillRouter::SimpleRouter(router), None) => router.generate(request).await,
}
}
pub(super) fn select_next_worker(&self) -> Option<u64> {
match self {
InnerPrefillRouter::SimpleRouter(router) => router.select_next_worker(),
InnerPrefillRouter::KvRouter(_) => None,
}
}
pub(super) fn sticky_worker_for_prefill(
&self,
request: &PreprocessedRequest,
) -> Option<WorkerWithDpRank> {
match self {
InnerPrefillRouter::KvRouter(router) => router
.sticky
.worker_for_phase(request, RequestPhase::Prefill),
InnerPrefillRouter::SimpleRouter(_) => None,
}
}
pub(super) async fn validate_sticky_prefill_worker(
&self,
context_id: &str,
request: &PreprocessedRequest,
worker: WorkerWithDpRank,
) -> Result<WorkerWithDpRank> {
match self {
InnerPrefillRouter::KvRouter(router) => Ok(router
.validate_sticky_worker_for_phase(
context_id,
request,
RequestPhase::Prefill,
worker,
)
.await?),
InnerPrefillRouter::SimpleRouter(_) => Ok(worker),
}
}
pub(super) fn unbind_ineligible_sticky_prefill_worker(
&self,
context_id: &str,
request: &PreprocessedRequest,
worker: WorkerWithDpRank,
) -> bool {
match self {
InnerPrefillRouter::KvRouter(router) => router
.unbind_ineligible_sticky_worker_for_phase(
context_id,
request,
RequestPhase::Prefill,
worker,
),
InnerPrefillRouter::SimpleRouter(_) => false,
}
}
pub(super) fn refresh_sticky_prefill_worker(&self, request: &PreprocessedRequest) {
if let InnerPrefillRouter::KvRouter(router) = self {
router
.sticky
.refresh_worker_for_phase(request, RequestPhase::Prefill);
}
}
}