use crate::entrypoint::EngineConfig;
use crate::entrypoint::input::common;
use crate::request_template::RequestTemplate;
use crate::types::openai::chat_completions::{
NvCreateChatCompletionRequest, OpenAIChatCompletionsStreamingEngine,
};
use dynamo_async_openai::types::ChatCompletionMessageContent;
use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::pipeline::Context;
use futures::StreamExt;
use std::io::{ErrorKind, Write};
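
/// Default cap on generated tokens, used when no request template supplies one.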
const MAX_TOKENS: u32 = 8192;
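
/// Prepare the engine from `engine_config`, then run the chat loop.
/// If `single_prompt` is `Some`, answer it once and return; otherwise
/// prompt interactively until Ctrl-C.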
pub async fn run(
distributed_runtime: DistributedRuntime,
single_prompt: Option<String>,
engine_config: EngineConfig,
) -> anyhow::Result<()> {
let prepared_engine =
common::prepare_engine(distributed_runtime.clone(), engine_config).await?;
main_loop(
distributed_runtime,
&prepared_engine.service_name,
prepared_engine.engine,
single_prompt,
prepared_engine.inspect_template,
prepared_engine.request_template,
)
.await
}
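
/// Drive the chat REPL: read a prompt, stream the completion to stdout,
/// and fold the assistant's reply back into the conversation history.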
async fn main_loop(
distributed_runtime: DistributedRuntime,
service_name: &str,
engine: OpenAIChatCompletionsStreamingEngine,
mut initial_prompt: Option<String>,
_inspect_template: bool,
template: Option<RequestTemplate>,
) -> anyhow::Result<()> {
let cancel_token = distributed_runtime.primary_token();
if initial_prompt.is_none() {
tracing::info!("Ctrl-c to exit");
}
let theme = dialoguer::theme::ColorfulTheme::default();
let single = initial_prompt.is_some();
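// Line-edit history for the interactive input prompt.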
let mut history = dialoguer::BasicHistory::default();
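// Full conversation so far; resent with every request for multi-turn context.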
let mut messages = vec![];
while !cancel_token.is_cancelled() {
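// The first turn consumes the CLI prompt, if any; later turns read from stdin.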
let prompt = match initial_prompt.take() {
Some(p) => p,
None => {
let input_ui = dialoguer::Input::<String>::with_theme(&theme)
.history_with(&mut history)
.with_prompt("User");
match input_ui.interact_text() {
Ok(prompt) => prompt,
Err(dialoguer::Error::IO(err)) => {
match err.kind() {
ErrorKind::Interrupted => {
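// Ctrl-C during input; exit quietly.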
}
k => {
tracing::info!("IO error: {k}");
}
}
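// In either case, stop reading input and end the session.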
break;
}
}
}
};
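// Wrap the raw prompt as an OpenAI-style user message.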
let user_message = dynamo_async_openai::types::ChatCompletionRequestMessage::User(
dynamo_async_openai::types::ChatCompletionRequestUserMessage {
content: dynamo_async_openai::types::ChatCompletionRequestUserMessageContent::Text(
prompt,
),
name: None,
},
);
messages.push(user_message);
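// Build the streaming request. Template values, when present, override
// the model name, token cap, and temperature defaults.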
let inner = dynamo_async_openai::types::CreateChatCompletionRequestArgs::default()
.messages(messages.clone())
.model(
template
.as_ref()
.map_or_else(|| service_name.to_string(), |t| t.model.clone()),
)
.stream(true)
.max_completion_tokens(
template
.as_ref()
.map_or(MAX_TOKENS, |t| t.max_completion_tokens),
)
.temperature(template.as_ref().map_or(0.7, |t| t.temperature))
.n(1)
.build()?;
let req = NvCreateChatCompletionRequest {
inner,
common: Default::default(),
nvext: None,
chat_template_args: None,
media_io_kwargs: None,
unsupported_fields: Default::default(),
};
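// Send the request. On failure, log it and return to the prompt.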
let mut stream = match engine.generate(Context::new(req)).await {
Ok(stream) => stream,
Err(err) => {
tracing::error!(%err, "Request failed.");
continue;
}
};
let mut stdout = std::io::stdout();
let mut assistant_message = String::new();
let mut assistant_reasoning = String::new();
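// Stream deltas to stdout as they arrive, accumulating the full reply.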
while let Some(item) = stream.next().await {
if cancel_token.is_cancelled() {
break;
}
match (item.data.as_ref(), item.event.as_deref()) {
(Some(data), _) => {
// A streaming chunk may carry no choices; skip it rather than panic.
let Some(chat_comp) = data.choices.first() else {
continue;
};
if let Some(c) = &chat_comp.delta.content {
match c {
ChatCompletionMessageContent::Text(text) => {
let _ = stdout.write_all(text.as_bytes());
let _ = stdout.flush();
assistant_message += text;
}
ChatCompletionMessageContent::Parts(_) => {
let _ = stdout.write_all(b"[multimodal content]");
let _ = stdout.flush();
}
}
}
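// Collect reasoning tokens separately; they go into the history but are not printed.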
if let Some(reasoning) = &chat_comp.delta.reasoning_content {
assistant_reasoning += reasoning;
}
if let Some(reason) = chat_comp.finish_reason {
tracing::trace!("finish reason: {reason:?}");
break;
}
}
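// Engine-signaled error: the details travel in the item's comment field.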
(None, Some("error")) => {
for err in item.comment.unwrap_or_default() {
tracing::error!("Engine error: {err}");
}
}
(None, Some(annotation)) => {
tracing::debug!("Annotation. {annotation}: {:?}", item.comment);
}
_ => {
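// Every stream item carries data, an error event, or an annotation.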
unreachable!("Event from engine with no data, no error, no annotation.");
}
}
}
println!();
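// Record the assistant's reply so the next turn has full context.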
let assistant_content =
dynamo_async_openai::types::ChatCompletionRequestAssistantMessageContent::Text(
assistant_message,
);
let assistant_message = dynamo_async_openai::types::ChatCompletionRequestMessage::Assistant(
dynamo_async_openai::types::ChatCompletionRequestAssistantMessage {
content: Some(assistant_content),
reasoning_content: (!assistant_reasoning.is_empty()).then_some(
dynamo_async_openai::types::ReasoningContent::Text(assistant_reasoning),
),
..Default::default()
},
);
messages.push(assistant_message);
if single {
break;
}
}
println!();
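// Shut the runtime down and wait for cancellation to complete.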
distributed_runtime.shutdown();
cancel_token.cancelled().await;
Ok(())
}