1use std::env;
17use std::sync::Arc;
18use std::sync::LazyLock;
19use std::time::Duration;
20
21use async_stream::stream;
22use async_trait::async_trait;
23
24use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, ResponseStream};
25use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn};
26use dynamo_runtime::protocols::annotated::Annotated;
27
28use crate::backend::ExecutionContext;
29use crate::preprocessor::BackendInput;
30use crate::protocols::common::llm_backend::LLMEngineOutput;
31use crate::protocols::openai::chat_completions::{
32 NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
33};
34use crate::types::openai::chat_completions::OpenAIChatCompletionsStreamingEngine;
35
/// Placement of this process within a multi-node deployment.
#[derive(Debug, Clone)]
pub struct MultiNodeConfig {
    /// Total number of nodes participating in the deployment.
    pub num_nodes: u32,
    /// Zero-based rank of this node.
    pub node_rank: u32,
    /// Address of the leader node; empty when running single-node.
    pub leader_addr: String,
}

impl Default for MultiNodeConfig {
    /// Single-node defaults: one node, rank 0, no leader address.
    fn default() -> Self {
        Self {
            num_nodes: 1,
            node_rank: 0,
            leader_addr: String::new(),
        }
    }
}
59
/// Pause inserted before each echoed token/character is yielded.
///
/// Overridable via the `DYN_TOKEN_ECHO_DELAY_MS` environment variable
/// (milliseconds); an unset or unparsable value falls back to 10 ms.
/// Evaluated once, lazily, on first access.
pub static TOKEN_ECHO_DELAY: LazyLock<Duration> = LazyLock::new(|| {
    const DEFAULT_DELAY_MS: u64 = 10;

    let delay_ms = match env::var("DYN_TOKEN_ECHO_DELAY_MS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_DELAY_MS),
        Err(_) => DEFAULT_DELAY_MS,
    };

    Duration::from_millis(delay_ms)
});
77
78struct EchoEngineCore {}
82pub fn make_engine_core() -> ExecutionContext {
83 Arc::new(EchoEngineCore {})
84}
85
86#[async_trait]
87impl AsyncEngine<SingleIn<BackendInput>, ManyOut<Annotated<LLMEngineOutput>>, Error>
88 for EchoEngineCore
89{
90 async fn generate(
91 &self,
92 incoming_request: SingleIn<BackendInput>,
93 ) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> {
94 let (request, context) = incoming_request.into_parts();
95 let ctx = context.context();
96
97 let output = stream! {
98 for tok in request.token_ids {
99 tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
100 yield delta_core(tok);
101 }
102 yield Annotated::from_data(LLMEngineOutput::stop());
103 };
104 Ok(ResponseStream::new(Box::pin(output), ctx))
105 }
106}
107
108fn delta_core(tok: u32) -> Annotated<LLMEngineOutput> {
109 let delta = LLMEngineOutput {
110 token_ids: vec![tok],
111 tokens: None,
112 text: None,
113 cum_log_probs: None,
114 log_probs: None,
115 finish_reason: None,
116 };
117 Annotated::from_data(delta)
118}
119
120struct EchoEngineFull {}
123pub fn make_engine_full() -> OpenAIChatCompletionsStreamingEngine {
124 Arc::new(EchoEngineFull {})
125}
126
127#[async_trait]
128impl
129 AsyncEngine<
130 SingleIn<NvCreateChatCompletionRequest>,
131 ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
132 Error,
133 > for EchoEngineFull
134{
135 async fn generate(
136 &self,
137 incoming_request: SingleIn<NvCreateChatCompletionRequest>,
138 ) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
139 let (request, context) = incoming_request.transfer(());
140 let deltas = request.response_generator();
141 let ctx = context.context();
142 let req = request.inner.messages.into_iter().next_back().unwrap();
143
144 let prompt = match req {
145 async_openai::types::ChatCompletionRequestMessage::User(user_msg) => {
146 match user_msg.content {
147 async_openai::types::ChatCompletionRequestUserMessageContent::Text(prompt) => {
148 prompt
149 }
150 _ => anyhow::bail!("Invalid request content field, expected Content::Text"),
151 }
152 }
153 _ => anyhow::bail!("Invalid request type, expected User message"),
154 };
155
156 let output = stream! {
157 let mut id = 1;
158 for c in prompt.chars() {
159 tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
161 let inner = deltas.create_choice(0, Some(c.to_string()), None, None);
162 let response = NvCreateChatCompletionStreamResponse {
163 inner,
164 };
165 yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None };
166 id += 1;
167 }
168
169 let inner = deltas.create_choice(0, None, Some(async_openai::types::FinishReason::Stop), None);
170 let response = NvCreateChatCompletionStreamResponse {
171 inner,
172 };
173 yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
174 };
175
176 Ok(ResponseStream::new(Box::pin(output), ctx))
177 }
178}