use std::{pin::Pin, sync::Arc};
use crate::{
backend::{Backend, ExecutionContext},
discovery::{KvWorkerMonitor, ModelManager, ModelWatcher},
engines::StreamingEngineAdapter,
entrypoint::{EngineConfig, RouterConfig},
http::service::metrics::Metrics,
kv_router::{
DirectRoutingRouter, KvPushRouter, KvRouter, PrefillRouter, metrics::RouterRequestMetrics,
},
migration::Migration,
model_card::ModelDeploymentCard,
namespace::NamespaceFilter,
preprocessor::{OpenAIPreprocessor, prompt::PromptFormatter},
protocols::common::llm_backend::{BackendOutput, LLMEngineOutput, PreprocessedRequest},
request_template::RequestTemplate,
types::{
Annotated,
openai::chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
OpenAIChatCompletionsStreamingEngine,
},
},
};
use anyhow::Context as _;
use dynamo_runtime::{
DistributedRuntime,
component::Client,
engine::{AsyncEngineStream, Data},
pipeline::{
Context, ManyOut, Operator, PushRouter, RouterMode, SegmentSource, ServiceBackend,
ServiceEngine, ServiceFrontend, SingleIn, Source,
},
};
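
/// A ready-to-serve OpenAI chat completions engine, plus the service
/// metadata (name, deployment card, request template) that callers need.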
pub struct PreparedEngine {
pub service_name: String,
pub engine: OpenAIChatCompletionsStreamingEngine,
pub inspect_template: bool,
pub card: Option<ModelDeploymentCard>,
pub request_template: Option<RequestTemplate>,
}
impl PreparedEngine {
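    /// Whether a deployment card is present and provides a tokenizer.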
    pub fn has_tokenizer(&self) -> bool {
        self.card
            .as_ref()
            .is_some_and(|card| card.has_tokenizer())
    }
}
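
/// Resolve an [`EngineConfig`] into a [`PreparedEngine`]:
///
/// - `Dynamic`: watch model discovery until a remote chat model registers,
///   then serve through its engine.
/// - `InProcessText`: wrap an in-process engine that does its own
///   pre-processing.
/// - `InProcessTokens`: wrap an in-process engine in the Dynamo
///   pre-processing pipeline built by [`build_pipeline`].
///
/// A minimal call sketch (how `runtime` and `engine_config` are obtained is
/// outside this module; the names are illustrative):
///
/// ```ignore
/// let prepared = prepare_engine(runtime.clone(), engine_config).await?;
/// tracing::info!("serving {}", prepared.service_name);
/// ```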
pub async fn prepare_engine(
distributed_runtime: DistributedRuntime,
engine_config: EngineConfig,
) -> anyhow::Result<PreparedEngine> {
match engine_config {
EngineConfig::Dynamic {
model: local_model, ..
} => {
let model_manager = Arc::new(ModelManager::new());
let metrics = Arc::new(Metrics::new());
let watch_obj = Arc::new(ModelWatcher::new(
distributed_runtime.clone(),
model_manager.clone(),
RouterConfig::default(),
local_model.migration_limit(),
None,
metrics,
));
let discovery = distributed_runtime.discovery();
let discovery_stream = discovery
.list_and_watch(
dynamo_runtime::discovery::DiscoveryQuery::AllModels,
Some(distributed_runtime.primary_token().clone()),
)
.await?;
let inner_watch_obj = watch_obj.clone();
let namespace_filter = NamespaceFilter::from_namespace_and_prefix(
local_model.namespace(),
local_model.namespace_prefix(),
);
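            // Drive the watcher in the background: it consumes discovery
            // events and registers matching models with the shared
            // ModelManager.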
let _watcher_task = tokio::spawn(async move {
inner_watch_obj
.watch(discovery_stream, namespace_filter)
.await;
});
tracing::info!("Waiting for remote model..");
let model_service_name = watch_obj.wait_for_chat_model().await;
tracing::info!("Connected to {model_service_name}");
let engine = model_manager.get_chat_completions_engine(&model_service_name)?;
Ok(PreparedEngine {
service_name: model_service_name,
engine,
inspect_template: false,
card: None,
request_template: local_model.request_template(),
})
}
EngineConfig::InProcessText { engine, model, .. } => {
let service_name = model.service_name().to_string();
tracing::debug!("Model: {service_name} with engine pre-processing");
let engine = Arc::new(StreamingEngineAdapter::new(engine));
Ok(PreparedEngine {
service_name,
engine,
inspect_template: false,
request_template: model.request_template(),
card: Some(model.into_card()),
})
}
EngineConfig::InProcessTokens {
engine: inner_engine,
model,
..
} => {
let pipeline = build_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(model.card(), inner_engine, model.card().tokenizer()?)
.await?;
let service_name = model.service_name().to_string();
tracing::debug!("Model: {service_name} with Dynamo pre-processing");
Ok(PreparedEngine {
service_name,
engine: pipeline,
inspect_template: true,
request_template: model.request_template(),
card: Some(model.into_card()),
})
}
}
}
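
/// Build the unrouted pipeline: a `ServiceFrontend` linked forward through
/// the OpenAI preprocessor and the tokenizer-backed `Backend` into the
/// engine, with the matching backward edges returning the response stream.
///
/// Instantiated with concrete request/response types, as in `prepare_engine`
/// above (shown here out of context):
///
/// ```ignore
/// let pipeline = build_pipeline::<
///     NvCreateChatCompletionRequest,
///     NvCreateChatCompletionStreamResponse,
/// >(model.card(), engine, model.card().tokenizer()?)
/// .await?;
/// ```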
pub async fn build_pipeline<Req, Resp>(
card: &ModelDeploymentCard,
engine: ExecutionContext,
tokenizer: crate::tokenizers::Tokenizer,
) -> anyhow::Result<Arc<ServiceFrontend<SingleIn<Req>, ManyOut<Annotated<Resp>>>>>
where
Req: Data,
Resp: Data,
OpenAIPreprocessor: Operator<
Context<Req>,
Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
Context<PreprocessedRequest>,
Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
>,
{
let frontend = ServiceFrontend::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
let PromptFormatter::OAI(formatter) = PromptFormatter::from_mdc(card)?;
let preprocessor =
OpenAIPreprocessor::new_with_parts(card.clone(), formatter, tokenizer.clone())?
.into_operator();
let backend = Backend::from_tokenizer(tokenizer).into_operator();
let engine = ServiceBackend::from_engine(engine);
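    // Forward edges carry requests toward the engine; backward edges carry
    // the response stream back out through the same operators in reverse.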
Ok(frontend
.link(preprocessor.forward_edge())?
.link(backend.forward_edge())?
.link(engine)?
.link(backend.backward_edge())?
.link(preprocessor.backward_edge())?
.link(frontend)?)
}
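
/// Build the routed pipeline for remote workers: like [`build_pipeline`],
/// but adding request migration, optional prefill routing, and a
/// `PushRouter` that distributes requests according to `router_mode`.
/// Constructs an `OpenAIPreprocessor` from the card, then delegates to
/// [`build_routed_pipeline_with_preprocessor`].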
#[allow(clippy::too_many_arguments)]
pub async fn build_routed_pipeline<Req, Resp>(
card: &ModelDeploymentCard,
client: &Client,
model_manager: Arc<crate::discovery::ModelManager>,
router_mode: RouterMode,
worker_monitor: Option<KvWorkerMonitor>,
chooser: Option<Arc<KvRouter>>,
tokenizer: crate::tokenizers::Tokenizer,
prefill_chooser: Option<Arc<PrefillRouter>>,
decode_fallback: bool,
migration_limit: u32,
metrics: Arc<Metrics>,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
where
Req: Data,
Resp: Data,
OpenAIPreprocessor: Operator<
Context<Req>,
Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
Context<PreprocessedRequest>,
Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
>,
{
let PromptFormatter::OAI(formatter) =
PromptFormatter::from_mdc(card).context("PromptFormatter.from_mdc")?;
let preprocessor =
OpenAIPreprocessor::new_with_parts(card.clone(), formatter, tokenizer.clone())
.context("OpenAIPreprocessor.new_with_parts")?;
build_routed_pipeline_with_preprocessor(
card,
client,
model_manager,
router_mode,
worker_monitor,
chooser,
preprocessor,
tokenizer,
prefill_chooser,
decode_fallback,
migration_limit,
metrics,
)
.await
}
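
/// As [`build_routed_pipeline`], but takes an already-constructed
/// `OpenAIPreprocessor` from the caller instead of building one from the
/// card.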
#[allow(clippy::too_many_arguments)]
pub async fn build_routed_pipeline_with_preprocessor<Req, Resp>(
card: &ModelDeploymentCard,
client: &Client,
model_manager: Arc<crate::discovery::ModelManager>,
router_mode: RouterMode,
worker_monitor: Option<KvWorkerMonitor>,
chooser: Option<Arc<KvRouter>>,
preprocessor: Arc<OpenAIPreprocessor>,
tokenizer: crate::tokenizers::Tokenizer,
prefill_chooser: Option<Arc<PrefillRouter>>,
decode_fallback: bool,
migration_limit: u32,
metrics: Arc<Metrics>,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
where
Req: Data,
Resp: Data,
OpenAIPreprocessor: Operator<
Context<Req>,
Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
Context<PreprocessedRequest>,
Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
>,
{
let frontend = SegmentSource::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
let preprocessor_op = preprocessor.into_operator();
let backend = Backend::from_tokenizer(tokenizer).into_operator();
let migration = Migration::from_mdc(card, migration_limit, metrics).into_operator();
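    // KV routing addresses workers through the chooser's client; all other
    // modes use the endpoint client directly.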
let router_client = if router_mode == RouterMode::KV {
let Some(ref chooser) = chooser else {
anyhow::bail!("RouterMode::KV requires KVRouter to not be null");
};
chooser.client().clone()
} else {
client.clone()
};
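    // If a worker monitor was supplied, hand its active-decode-blocks
    // threshold (and the monitor itself) to the PushRouter.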
let threshold_value = worker_monitor
.as_ref()
.map(|m| m.active_decode_blocks_threshold());
let monitor_arc =
worker_monitor.map(|m| Arc::new(m) as Arc<dyn dynamo_runtime::pipeline::WorkerLoadMonitor>);
let router =
PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_threshold(
router_client,
router_mode,
threshold_value,
monitor_arc,
)
.await?;
RouterRequestMetrics::from_component(client.endpoint.component());
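    // Wrap the router in the service backend matching the routing mode.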
let service_backend = match router_mode {
RouterMode::Direct => {
ServiceBackend::from_engine(Arc::new(DirectRoutingRouter::new(router)))
}
RouterMode::Random | RouterMode::RoundRobin => {
ServiceBackend::from_engine(Arc::new(router))
}
RouterMode::KV => {
let Some(chooser) = chooser else {
anyhow::bail!("RouterMode::KV requires KVRouter to not be null");
};
ServiceBackend::from_engine(Arc::new(KvPushRouter::new(router, chooser)))
}
};
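    // Without an explicit prefill router, link in a disabled one so the
    // pipeline always has the same shape.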
let prefill_chooser = prefill_chooser
.unwrap_or_else(|| PrefillRouter::disabled(model_manager, router_mode, decode_fallback));
let prefill_op = prefill_chooser.into_operator();
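    // Request path: preprocessor -> migration -> backend -> prefill ->
    // router, with the backward edges unwinding the same stages for
    // responses.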
let engine = frontend
.link(preprocessor_op.forward_edge())?
.link(migration.forward_edge())?
.link(backend.forward_edge())?
.link(prefill_op.forward_edge())?
.link(service_backend)?
.link(prefill_op.backward_edge())?
.link(backend.backward_edge())?
.link(migration.backward_edge())?
.link(preprocessor_op.backward_edge())?
.link(frontend)?;
Ok(engine)
}