dynamo_llm/entrypoint/input/common.rs

// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::pin::Pin;

use crate::{
    backend::{Backend, ExecutionContext},
    discovery::{MODEL_ROOT_PATH, ModelManager, ModelWatcher},
    engines::StreamingEngineAdapter,
    entrypoint::{self, EngineConfig},
    kv_router::{KvPushRouter, KvRouter},
    migration::Migration,
    model_card::ModelDeploymentCard,
    preprocessor::{OpenAIPreprocessor, prompt::PromptFormatter},
    protocols::common::llm_backend::{BackendOutput, LLMEngineOutput, PreprocessedRequest},
    request_template::RequestTemplate,
    types::{
        Annotated,
        openai::chat_completions::{
            NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
            OpenAIChatCompletionsStreamingEngine,
        },
    },
};

use dynamo_runtime::{
    DistributedRuntime, Runtime,
    component::Client,
    distributed::DistributedConfig,
    engine::{AsyncEngineStream, Data},
    pipeline::{
        Context, ManyOut, Operator, PushRouter, RouterMode, SegmentSource, ServiceBackend,
        ServiceEngine, ServiceFrontend, SingleIn, Source,
    },
};
use std::sync::Arc;

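/// A ready-to-use streaming engine for OpenAI chat completions, together with the
/// metadata needed to drive it (service name, optional model card, optional request template).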
pub struct PreparedEngine {
    pub service_name: String,
    pub engine: OpenAIChatCompletionsStreamingEngine,
    pub inspect_template: bool,
    pub card: Option<ModelDeploymentCard>,
    pub request_template: Option<RequestTemplate>,
}

impl PreparedEngine {
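    /// Returns true if a model deployment card is present and it provides a tokenizer.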
    pub fn has_tokenizer(&self) -> bool {
        self.card.as_ref().is_some_and(|card| card.has_tokenizer())
    }
}

/// Turns an `EngineConfig` into a `StreamingEngine` that supports OpenAI chat completions and completions.
pub async fn prepare_engine(
    runtime: Runtime,
    engine_config: EngineConfig,
) -> anyhow::Result<PreparedEngine> {
    match engine_config {
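        // Dynamic: discover models registered in etcd at runtime and use the first chat model
        // that appears.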
        EngineConfig::Dynamic(local_model) => {
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;

            let Some(etcd_client) = distributed_runtime.etcd_client() else {
                anyhow::bail!("Cannot use dynamic discovery in static mode (no etcd client available).");
            };
            let model_manager = Arc::new(ModelManager::new());
            let watch_obj = Arc::new(ModelWatcher::new(
                distributed_runtime,
                model_manager.clone(),
                dynamo_runtime::pipeline::RouterMode::RoundRobin,
                None,
                None,
            ));
            let models_watcher = etcd_client.kv_get_and_watch_prefix(MODEL_ROOT_PATH).await?;
            let (_prefix, _watcher, receiver) = models_watcher.dissolve();

            let inner_watch_obj = watch_obj.clone();
            let _watcher_task = tokio::spawn(async move {
                inner_watch_obj.watch(receiver, None).await;
            });
            tracing::info!("Waiting for remote model...");

            // TODO: We use the first model to appear; usually there is only one.
            // We should add slash commands to the text input: `/model <name>` to choose,
            // `/models` to list, and notifications when models are added / removed.

            let model_service_name = watch_obj.wait_for_chat_model().await;
            tracing::info!("Connected to {model_service_name}");
            let engine = model_manager.get_chat_completions_engine(&model_service_name)?;
            Ok(PreparedEngine {
                service_name: model_service_name,
                engine,
                inspect_template: false,
                card: None,
                request_template: local_model.request_template(),
            })
        }
        EngineConfig::StaticRemote(local_model) => {
            // For now we only do ModelType::Backend.
            // For batch/text we only do Chat Completions.

            // The card should have been loaded at the 'build' phase earlier.
            let card = local_model.card();
            let router_mode = local_model.router_config().router_mode;

            let dst_config = DistributedConfig::from_settings(true);
            let distributed_runtime = DistributedRuntime::new(runtime, dst_config).await?;

            let endpoint_id = local_model.endpoint_id();
            let component = distributed_runtime
                .namespace(&endpoint_id.namespace)?
                .component(&endpoint_id.component)?;

            let client = component.endpoint(&endpoint_id.name).client().await?;

            let kv_chooser = if router_mode == RouterMode::KV {
                let model_manager = Arc::new(ModelManager::new());
                Some(
                    model_manager
                        .kv_chooser_for(
                            local_model.display_name(),
                            &component,
                            card.kv_cache_block_size,
                            Some(local_model.router_config().kv_router_config),
                        )
                        .await?,
                )
            } else {
                None
            };

            let hf_tokenizer = card.tokenizer_hf()?;
            let chat_engine = entrypoint::build_routed_pipeline::<
                NvCreateChatCompletionRequest,
                NvCreateChatCompletionStreamResponse,
            >(
                card,
                &client,
                router_mode,
                None,
                kv_chooser.clone(),
                hf_tokenizer,
            )
            .await?;

            let service_name = local_model.service_name().to_string();
            tracing::info!("Static connecting to {service_name}");
            Ok(PreparedEngine {
                service_name,
                engine: chat_engine,
                inspect_template: false,
                request_template: local_model.request_template(),
                card: Some(local_model.into_card()),
            })
        }
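        // StaticFull: the engine handles its own pre-processing, so it only needs to be wrapped
        // in the streaming adapter.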
        EngineConfig::StaticFull { engine, model, .. } => {
            let service_name = model.service_name().to_string();
            tracing::debug!("Model: {service_name} with engine pre-processing");
            let engine = Arc::new(StreamingEngineAdapter::new(engine));
            Ok(PreparedEngine {
                service_name,
                engine,
                inspect_template: false,
                request_template: model.request_template(),
                card: Some(model.into_card()),
            })
        }
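        // StaticCore: wrap the engine in the Dynamo pre/post-processing pipeline built by
        // `build_pipeline`.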
        EngineConfig::StaticCore {
            engine: inner_engine,
            model,
            ..
        } => {
            let pipeline = build_pipeline::<
                NvCreateChatCompletionRequest,
                NvCreateChatCompletionStreamResponse,
            >(model.card(), inner_engine, model.card().tokenizer_hf()?)
            .await?;

            let service_name = model.service_name().to_string();
            tracing::debug!("Model: {service_name} with Dynamo pre-processing");
            Ok(PreparedEngine {
                service_name,
                engine: pipeline,
                inspect_template: true,
                request_template: model.request_template(),
                card: Some(model.into_card()),
            })
        }
    }
}

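/// Builds a local pipeline around an in-process engine:
/// `frontend -> preprocessor -> backend -> engine -> backend -> preprocessor -> frontend`.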
pub async fn build_pipeline<Req, Resp>(
    card: &ModelDeploymentCard,
    engine: ExecutionContext,
    hf_tokenizer: tokenizers::Tokenizer,
) -> anyhow::Result<Arc<ServiceFrontend<SingleIn<Req>, ManyOut<Annotated<Resp>>>>>
where
    Req: Data,
    Resp: Data,
    OpenAIPreprocessor: Operator<
            Context<Req>,
            Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
            Context<PreprocessedRequest>,
            Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
        >,
{
    let frontend = ServiceFrontend::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
    let PromptFormatter::OAI(formatter) = PromptFormatter::from_mdc(card)?;
    let preprocessor =
        OpenAIPreprocessor::new_with_parts(card.clone(), formatter, hf_tokenizer.clone())?
            .into_operator();
    let backend = Backend::from_tokenizer(hf_tokenizer).into_operator();
    let engine = ServiceBackend::from_engine(engine);

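    // Link the forward (request) edges through to the engine, then the backward (response)
    // edges back out to the frontend.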
    Ok(frontend
        .link(preprocessor.forward_edge())?
        .link(backend.forward_edge())?
        .link(engine)?
        .link(backend.backward_edge())?
        .link(preprocessor.backward_edge())?
        .link(frontend)?)
}

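/// Builds a pipeline that routes preprocessed requests to remote workers through a `PushRouter`,
/// constructing the OpenAI preprocessor from the model deployment card.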
pub async fn build_routed_pipeline<Req, Resp>(
    card: &ModelDeploymentCard,
    client: &Client,
    router_mode: RouterMode,
    busy_threshold: Option<f64>,
    chooser: Option<Arc<KvRouter>>,
    hf_tokenizer: tokenizers::Tokenizer,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
where
    Req: Data,
    Resp: Data,
    OpenAIPreprocessor: Operator<
            Context<Req>,
            Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
            Context<PreprocessedRequest>,
            Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
        >,
{
    let PromptFormatter::OAI(formatter) = PromptFormatter::from_mdc(card)?;
    let preprocessor =
        OpenAIPreprocessor::new_with_parts(card.clone(), formatter, hf_tokenizer.clone())?;
    build_routed_pipeline_with_preprocessor(
        card,
        client,
        router_mode,
        busy_threshold,
        chooser,
        preprocessor,
        hf_tokenizer,
    )
    .await
}

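/// Like `build_routed_pipeline`, but takes an already-constructed preprocessor. The request path is:
/// `frontend -> preprocessor -> backend -> migration -> router`, with responses flowing back
/// through the same stages in reverse.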
pub async fn build_routed_pipeline_with_preprocessor<Req, Resp>(
    card: &ModelDeploymentCard,
    client: &Client,
    router_mode: RouterMode,
    busy_threshold: Option<f64>,
    chooser: Option<Arc<KvRouter>>,
    preprocessor: Arc<OpenAIPreprocessor>,
    hf_tokenizer: tokenizers::Tokenizer,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
where
    Req: Data,
    Resp: Data,
    OpenAIPreprocessor: Operator<
            Context<Req>,
            Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
            Context<PreprocessedRequest>,
            Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
        >,
{
    let frontend = SegmentSource::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
    let preprocessor_op = preprocessor.into_operator();
    let backend = Backend::from_tokenizer(hf_tokenizer).into_operator();
    let migration = Migration::from_mdc(card).into_operator();
    let router =
        PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_threshold(
            client.clone(),
            router_mode,
            busy_threshold,
        )
        .await?;
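    // KV routing wraps the push router in a `KvPushRouter` so requests are steered by the
    // KV-aware chooser; the other modes use the push router's own selection directly.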
    let service_backend = match router_mode {
        RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => {
            ServiceBackend::from_engine(Arc::new(router))
        }
        RouterMode::KV => {
            let Some(chooser) = chooser else {
                anyhow::bail!("RouterMode::KV requires a KvRouter; none was provided");
            };
            let kv_push_router = KvPushRouter::new(router, chooser);
            ServiceBackend::from_engine(Arc::new(kv_push_router))
        }
    };

    let engine = frontend
        .link(preprocessor_op.forward_edge())?
        .link(backend.forward_edge())?
        .link(migration.forward_edge())?
        .link(service_backend)?
        .link(migration.backward_edge())?
        .link(backend.backward_edge())?
        .link(preprocessor_op.backward_edge())?
        .link(frontend)?;
    Ok(engine)
}