dynamo_llm/engines.rs

// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::env;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use async_stream::stream;
use async_trait::async_trait;

use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, ResponseStream};
use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn};
use dynamo_runtime::protocols::annotated::Annotated;

use crate::backend::ExecutionContext;
use crate::preprocessor::BackendInput;
use crate::protocols::common::llm_backend::LLMEngineOutput;
use crate::protocols::openai::chat_completions::{
    NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
};
use crate::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine;

//
// The engines are each in their own crate under `lib/engines`
//

#[derive(Debug, Clone)]
pub struct MultiNodeConfig {
    /// How many nodes / hosts we are using
    pub num_nodes: u32,
    /// Unique consecutive integer to identify this node
    pub node_rank: u32,
    /// host:port of head / control node
    pub leader_addr: String,
}

impl Default for MultiNodeConfig {
    fn default() -> Self {
        MultiNodeConfig {
            num_nodes: 1,
            node_rank: 0,
            leader_addr: "".to_string(),
        }
    }
}
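// Example (illustrative values, not a real deployment): a two-node setup where
// this process is the node with rank 1 and the head node listens on 10.0.0.1:5000:
//
//     let cfg = MultiNodeConfig {
//         num_nodes: 2,
//         node_rank: 1,
//         leader_addr: "10.0.0.1:5000".to_string(),
//     };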

//
// Example echo engines
//

/// How long to sleep between echoed tokens.
/// Default is 10ms, which gives us 100 tok/s.
/// Can be configured via the DYN_TOKEN_ECHO_DELAY_MS environment variable.
pub static TOKEN_ECHO_DELAY: LazyLock<Duration> = LazyLock::new(|| {
    const DEFAULT_DELAY_MS: u64 = 10;

    let delay_ms = env::var("DYN_TOKEN_ECHO_DELAY_MS")
        .ok()
        .and_then(|val| val.parse::<u64>().ok())
        .unwrap_or(DEFAULT_DELAY_MS);

    Duration::from_millis(delay_ms)
});
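// The delay is read once on first access (LazyLock), so the variable must be
// set before the first token is emitted. For example (shell, illustrative;
// substitute your own launcher):
//
//     DYN_TOKEN_ECHO_DELAY_MS=1 <your-launcher>   # ~1000 tok/s
//
// Unset or unparseable values fall back to the 10ms default.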

/// Engine that accepts pre-processed requests and echoes the tokens back as the response.
/// The response will include the full prompt template.
/// Useful for testing pre-processing.
struct EchoEngineCore {}
pub fn make_engine_core() -> ExecutionContext {
    Arc::new(EchoEngineCore {})
}

#[async_trait]
impl AsyncEngine<SingleIn<BackendInput>, ManyOut<Annotated<LLMEngineOutput>>, Error>
    for EchoEngineCore
{
    async fn generate(
        &self,
        incoming_request: SingleIn<BackendInput>,
    ) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> {
        let (request, context) = incoming_request.into_parts();
        let ctx = context.context();

        let output = stream! {
            for tok in request.token_ids {
                tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
                yield delta_core(tok);
            }
            yield Annotated::from_data(LLMEngineOutput::stop());
        };
        Ok(ResponseStream::new(Box::pin(output), ctx))
    }
}
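// Stream contract, as implemented above: one Annotated frame per echoed token
// id, followed by a terminal LLMEngineOutput::stop() frame.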

fn delta_core(tok: u32) -> Annotated<LLMEngineOutput> {
    let delta = LLMEngineOutput {
        token_ids: vec![tok],
        tokens: None,
        text: None,
        cum_log_probs: None,
        log_probs: None,
        finish_reason: None,
    };
    Annotated::from_data(delta)
}
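// Each frame carries exactly one token id and nothing else, e.g. (illustrative,
// assuming `Annotated::from_data` stores the payload in the `data` field, as the
// struct literals below suggest):
//
//     let frame = delta_core(42);
//     assert_eq!(frame.data.unwrap().token_ids, vec![42]);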

/// Engine that accepts un-preprocessed requests and echoes the prompt back as the response.
/// Useful for testing ingress such as service-http.
struct EchoEngineFull {}
pub fn make_engine_full() -> OpenAIChatCompletionsStreamingEngine {
    Arc::new(EchoEngineFull {})
}

#[async_trait]
impl
    AsyncEngine<
        SingleIn<NvCreateChatCompletionRequest>,
        ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
        Error,
    > for EchoEngineFull
{
    async fn generate(
        &self,
        incoming_request: SingleIn<NvCreateChatCompletionRequest>,
    ) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
        let (request, context) = incoming_request.transfer(());
        let deltas = request.response_generator();
        let ctx = context.context();
        // Use the most recent message; bail instead of panicking on an empty list.
        let Some(req) = request.inner.messages.into_iter().next_back() else {
            anyhow::bail!("Request has no messages");
        };

        let prompt = match req {
            async_openai::types::ChatCompletionRequestMessage::User(user_msg) => {
                match user_msg.content {
                    async_openai::types::ChatCompletionRequestUserMessageContent::Text(prompt) => {
                        prompt
                    }
                    _ => anyhow::bail!("Invalid request content field, expected Content::Text"),
                }
            }
            _ => anyhow::bail!("Invalid request type, expected User message"),
        };

        let output = stream! {
            let mut id = 1;
            for c in prompt.chars() {
                // We are returning characters, not tokens, so there will be some postprocessing overhead.
                tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
                let inner = deltas.create_choice(0, Some(c.to_string()), None, None);
                let response = NvCreateChatCompletionStreamResponse { inner };
                yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
                id += 1;
            }

            let inner = deltas.create_choice(0, None, Some(async_openai::types::FinishReason::Stop), None);
            let response = NvCreateChatCompletionStreamResponse { inner };
            yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
        };

        Ok(ResponseStream::new(Box::pin(output), ctx))
    }
}
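// A minimal usage sketch (hypothetical wiring; the surrounding service code is
// not shown in this file):
//
//     let engine: OpenAIChatCompletionsStreamingEngine = make_engine_full();
//     // `engine` implements AsyncEngine<SingleIn<NvCreateChatCompletionRequest>,
//     // ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error>, so an
//     // ingress can call `engine.generate(request).await?` and stream the
//     // resulting frames back to the client.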