dynamo_llm/entrypoint/input/common.rs

use std::pin::Pin;

use crate::{
    backend::{Backend, ExecutionContext},
    discovery::{MODEL_ROOT_PATH, ModelManager, ModelWatcher},
    engines::StreamingEngineAdapter,
    entrypoint::{self, EngineConfig},
    kv_router::{KvPushRouter, KvRouter},
    migration::Migration,
    model_card::ModelDeploymentCard,
    preprocessor::{OpenAIPreprocessor, prompt::PromptFormatter},
    protocols::common::llm_backend::{BackendOutput, LLMEngineOutput, PreprocessedRequest},
    request_template::RequestTemplate,
    types::{
        Annotated,
        openai::chat_completions::{
            NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
            OpenAIChatCompletionsStreamingEngine,
        },
    },
};

use dynamo_runtime::{
    DistributedRuntime, Runtime,
    component::Client,
    distributed::DistributedConfig,
    engine::{AsyncEngineStream, Data},
    pipeline::{
        Context, ManyOut, Operator, PushRouter, RouterMode, SegmentSource, ServiceBackend,
        ServiceEngine, ServiceFrontend, SingleIn, Source,
    },
};
use std::sync::Arc;

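/// A ready-to-serve chat completions engine together with the metadata that was
/// loaded or discovered alongside it.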
pub struct PreparedEngine {
    pub service_name: String,
    pub engine: OpenAIChatCompletionsStreamingEngine,
    pub inspect_template: bool,
    pub card: Option<ModelDeploymentCard>,
    pub request_template: Option<RequestTemplate>,
}

impl PreparedEngine {
    pub fn has_tokenizer(&self) -> bool {
        if let Some(card) = self.card.as_ref() {
            card.has_tokenizer()
        } else {
            false
        }
    }
}

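/// Turn an [`EngineConfig`] into a [`PreparedEngine`]:
/// - `Dynamic`: discover models over etcd and attach to the first chat model that appears.
/// - `StaticRemote`: connect to a known remote endpoint through a routed pipeline.
/// - `StaticFull`: use the engine as-is; it performs its own pre-processing.
/// - `StaticCore`: wrap the engine with Dynamo's pre-processing pipeline.
///
/// Minimal usage sketch (assumes a `Runtime` and an `EngineConfig` built elsewhere):
/// ```ignore
/// let prepared = prepare_engine(runtime, engine_config).await?;
/// tracing::info!("serving {}", prepared.service_name);
/// ```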
pub async fn prepare_engine(
    runtime: Runtime,
    engine_config: EngineConfig,
) -> anyhow::Result<PreparedEngine> {
    match engine_config {
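        // Dynamic discovery: watch etcd under MODEL_ROOT_PATH and attach to the first
        // chat model that registers.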
        EngineConfig::Dynamic(local_model) => {
            let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;

            let Some(etcd_client) = distributed_runtime.etcd_client() else {
                anyhow::bail!("Dynamic discovery requires etcd; cannot run in static mode.");
            };
            let model_manager = Arc::new(ModelManager::new());
            let watch_obj = Arc::new(ModelWatcher::new(
                distributed_runtime,
                model_manager.clone(),
                dynamo_runtime::pipeline::RouterMode::RoundRobin,
                None,
                None,
            ));
            let models_watcher = etcd_client.kv_get_and_watch_prefix(MODEL_ROOT_PATH).await?;
            let (_prefix, _watcher, receiver) = models_watcher.dissolve();

            let inner_watch_obj = watch_obj.clone();
            let _watcher_task = tokio::spawn(async move {
                inner_watch_obj.watch(receiver, None).await;
            });
            tracing::info!("Waiting for remote model...");

            let model_service_name = watch_obj.wait_for_chat_model().await;
            tracing::info!("Connected to {model_service_name}");
            let engine = model_manager.get_chat_completions_engine(&model_service_name)?;
            Ok(PreparedEngine {
                service_name: model_service_name,
                engine,
                inspect_template: false,
                card: None,
                request_template: local_model.request_template(),
            })
        }
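        // Static remote: the worker endpoint is known up front; build a routed pipeline
        // to it, optionally KV-cache aware.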
        EngineConfig::StaticRemote(local_model) => {
            let card = local_model.card();
            let router_mode = local_model.router_config().router_mode;

            let dst_config = DistributedConfig::from_settings(true);
            let distributed_runtime = DistributedRuntime::new(runtime, dst_config).await?;

            let endpoint_id = local_model.endpoint_id();
            let component = distributed_runtime
                .namespace(&endpoint_id.namespace)?
                .component(&endpoint_id.component)?;

            let client = component.endpoint(&endpoint_id.name).client().await?;

            let kv_chooser = if router_mode == RouterMode::KV {
                let model_manager = Arc::new(ModelManager::new());
                Some(
                    model_manager
                        .kv_chooser_for(
                            local_model.display_name(),
                            &component,
                            card.kv_cache_block_size,
                            Some(local_model.router_config().kv_router_config),
                        )
                        .await?,
                )
            } else {
                None
            };

            let hf_tokenizer = card.tokenizer_hf()?;
            let chat_engine = entrypoint::build_routed_pipeline::<
                NvCreateChatCompletionRequest,
                NvCreateChatCompletionStreamResponse,
            >(
                card,
                &client,
                router_mode,
                None,
                kv_chooser.clone(),
                hf_tokenizer,
            )
            .await?;

            let service_name = local_model.service_name().to_string();
            tracing::info!("Static connecting to {service_name}");
            Ok(PreparedEngine {
                service_name,
                engine: chat_engine,
                inspect_template: false,
                request_template: local_model.request_template(),
                card: Some(local_model.into_card()),
            })
        }
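        // Static full: the engine does its own pre-processing, so it only needs the
        // streaming adapter.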
        EngineConfig::StaticFull { engine, model, .. } => {
            let service_name = model.service_name().to_string();
            tracing::debug!("Model: {service_name} with engine pre-processing");
            let engine = Arc::new(StreamingEngineAdapter::new(engine));
            Ok(PreparedEngine {
                service_name,
                engine,
                inspect_template: false,
                request_template: model.request_template(),
                card: Some(model.into_card()),
            })
        }
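        // Static core: wrap the raw engine with Dynamo's OpenAI pre-processing and
        // tokenization pipeline.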
        EngineConfig::StaticCore {
            engine: inner_engine,
            model,
            ..
        } => {
            let pipeline = build_pipeline::<
                NvCreateChatCompletionRequest,
                NvCreateChatCompletionStreamResponse,
            >(model.card(), inner_engine, model.card().tokenizer_hf()?)
            .await?;

            let service_name = model.service_name().to_string();
            tracing::debug!("Model: {service_name} with Dynamo pre-processing");
            Ok(PreparedEngine {
                service_name,
                engine: pipeline,
                inspect_template: true,
                request_template: model.request_template(),
                card: Some(model.into_card()),
            })
        }
    }
}

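/// Assemble a local pipeline around an [`ExecutionContext`]: OpenAI requests are
/// pre-processed and tokenized on the way in, and the engine's output stream is
/// post-processed on the way back out.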
pub async fn build_pipeline<Req, Resp>(
    card: &ModelDeploymentCard,
    engine: ExecutionContext,
    hf_tokenizer: tokenizers::Tokenizer,
) -> anyhow::Result<Arc<ServiceFrontend<SingleIn<Req>, ManyOut<Annotated<Resp>>>>>
where
    Req: Data,
    Resp: Data,
    OpenAIPreprocessor: Operator<
        Context<Req>,
        Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
        Context<PreprocessedRequest>,
        Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
    >,
{
    let frontend = ServiceFrontend::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
    let PromptFormatter::OAI(formatter) = PromptFormatter::from_mdc(card)?;
    let preprocessor =
        OpenAIPreprocessor::new_with_parts(card.clone(), formatter, hf_tokenizer.clone())?
            .into_operator();
    let backend = Backend::from_tokenizer(hf_tokenizer).into_operator();
    let engine = ServiceBackend::from_engine(engine);

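    // Requests flow down the forward edges (preprocess, tokenize, execute); the
    // streamed responses return through the matching backward edges.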
    Ok(frontend
        .link(preprocessor.forward_edge())?
        .link(backend.forward_edge())?
        .link(engine)?
        .link(backend.backward_edge())?
        .link(preprocessor.backward_edge())?
        .link(frontend)?)
}

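/// Like [`build_routed_pipeline_with_preprocessor`], but builds the
/// [`OpenAIPreprocessor`] from the deployment card first.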
pub async fn build_routed_pipeline<Req, Resp>(
    card: &ModelDeploymentCard,
    client: &Client,
    router_mode: RouterMode,
    busy_threshold: Option<f64>,
    chooser: Option<Arc<KvRouter>>,
    hf_tokenizer: tokenizers::Tokenizer,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
where
    Req: Data,
    Resp: Data,
    OpenAIPreprocessor: Operator<
        Context<Req>,
        Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
        Context<PreprocessedRequest>,
        Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
    >,
{
    let PromptFormatter::OAI(formatter) = PromptFormatter::from_mdc(card)?;
    let preprocessor =
        OpenAIPreprocessor::new_with_parts(card.clone(), formatter, hf_tokenizer.clone())?;
    build_routed_pipeline_with_preprocessor(
        card,
        client,
        router_mode,
        busy_threshold,
        chooser,
        preprocessor,
        hf_tokenizer,
    )
    .await
}

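/// Build a pipeline that pushes pre-processed requests to remote workers through a
/// [`PushRouter`], with migration support and, in `RouterMode::KV`, KV-cache-aware
/// worker selection.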
pub async fn build_routed_pipeline_with_preprocessor<Req, Resp>(
    card: &ModelDeploymentCard,
    client: &Client,
    router_mode: RouterMode,
    busy_threshold: Option<f64>,
    chooser: Option<Arc<KvRouter>>,
    preprocessor: Arc<OpenAIPreprocessor>,
    hf_tokenizer: tokenizers::Tokenizer,
) -> anyhow::Result<ServiceEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>>>
where
    Req: Data,
    Resp: Data,
    OpenAIPreprocessor: Operator<
        Context<Req>,
        Pin<Box<dyn AsyncEngineStream<Annotated<Resp>>>>,
        Context<PreprocessedRequest>,
        Pin<Box<dyn AsyncEngineStream<Annotated<BackendOutput>>>>,
    >,
{
    let frontend = SegmentSource::<SingleIn<Req>, ManyOut<Annotated<Resp>>>::new();
    let preprocessor_op = preprocessor.into_operator();
    let backend = Backend::from_tokenizer(hf_tokenizer).into_operator();
    let migration = Migration::from_mdc(card).into_operator();
    let router =
        PushRouter::<PreprocessedRequest, Annotated<LLMEngineOutput>>::from_client_with_threshold(
            client.clone(),
            router_mode,
            busy_threshold,
        )
        .await?;
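    // KV mode wraps the push router with the KV-aware chooser; every other mode uses
    // the push router directly.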
    let service_backend = match router_mode {
        RouterMode::Random | RouterMode::RoundRobin | RouterMode::Direct(_) => {
            ServiceBackend::from_engine(Arc::new(router))
        }
        RouterMode::KV => {
            let Some(chooser) = chooser else {
                anyhow::bail!("RouterMode::KV requires a KvRouter to be provided");
            };
            let kv_push_router = KvPushRouter::new(router, chooser);
            ServiceBackend::from_engine(Arc::new(kv_push_router))
        }
    };

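    // Request path: frontend -> preprocessor -> backend -> migration -> router; the
    // response stream walks back through the same operators' backward edges.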
    let engine = frontend
        .link(preprocessor_op.forward_edge())?
        .link(backend.forward_edge())?
        .link(migration.forward_edge())?
        .link(service_backend)?
        .link(migration.backward_edge())?
        .link(backend.backward_edge())?
        .link(preprocessor_op.backward_edge())?
        .link(frontend)?;
    Ok(engine)
}