dynamo_llm/entrypoint/
input.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module contains tools to gather a prompt from a user, forward it to an engine and return
5//! the response.
6//! See the Input enum for the inputs available. Input::Http (OpenAI compatible HTTP server)
7//! and Input::Text (interactive chat) are good places to start.
8//! The main entry point is `run_input`.
9
10use std::{
11    fmt,
12    io::{IsTerminal as _, Read as _},
13    path::PathBuf,
14    str::FromStr,
15};
16
17pub mod batch;
18mod common;
19pub use common::{build_routed_pipeline, build_routed_pipeline_with_preprocessor};
20pub mod endpoint;
21pub mod grpc;
22pub mod http;
23pub mod text;
24
25use dynamo_runtime::protocols::ENDPOINT_SCHEME;
26use either::Either;
27
/// Prefix of the `in=` option value that selects batch mode: `batch:<path>`.
const BATCH_PREFIX: &str = "batch:";
29
/// The various ways of connecting prompts to an engine.
///
/// Usually obtained by parsing the `in=` option string (see the `FromStr` / `TryFrom<&str>`
/// impls); [`run_input`] drives the engine for each variant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Input {
    /// Run an OpenAI compatible HTTP server.
    Http,

    /// Single prompt read from stdin.
    Stdin,

    /// Interactive chat.
    Text,

    /// Pull requests from a namespace/component/endpoint path.
    ///
    /// When produced by parsing, this holds the full option string, endpoint scheme included.
    Endpoint(String),

    /// Batch mode. Run all the prompts, write the outputs, exit.
    Batch(PathBuf),

    /// Run a KServe compatible gRPC server.
    Grpc,
}
51
52impl FromStr for Input {
53    type Err = anyhow::Error;
54
55    fn from_str(s: &str) -> Result<Self, Self::Err> {
56        Input::try_from(s)
57    }
58}
59
60impl TryFrom<&str> for Input {
61    type Error = anyhow::Error;
62
63    fn try_from(s: &str) -> anyhow::Result<Self> {
64        match s {
65            "http" => Ok(Input::Http),
66            "grpc" => Ok(Input::Grpc),
67            "text" => Ok(Input::Text),
68            "stdin" => Ok(Input::Stdin),
69            endpoint_path if endpoint_path.starts_with(ENDPOINT_SCHEME) => {
70                Ok(Input::Endpoint(endpoint_path.to_string()))
71            }
72            batch_patch if batch_patch.starts_with(BATCH_PREFIX) => {
73                let path = batch_patch.strip_prefix(BATCH_PREFIX).unwrap();
74                Ok(Input::Batch(PathBuf::from(path)))
75            }
76            e => Err(anyhow::anyhow!("Invalid in= option '{e}'")),
77        }
78    }
79}
80
81impl fmt::Display for Input {
82    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83        let s = match self {
84            Input::Http => "http",
85            Input::Grpc => "grpc",
86            Input::Text => "text",
87            Input::Stdin => "stdin",
88            Input::Endpoint(path) => path,
89            Input::Batch(path) => &path.display().to_string(),
90        };
91        write!(f, "{s}")
92    }
93}
94
95impl Default for Input {
96    fn default() -> Self {
97        if std::io::stdin().is_terminal() {
98            Input::Text
99        } else {
100            Input::Stdin
101        }
102    }
103}
104
105/// Run the given engine (EngineConfig) connected to an input.
106/// Does not return until the input exits.
107/// For Input::Endpoint pass a DistributedRuntime. For everything else pass either a Runtime or a
108/// DistributedRuntime.
109pub async fn run_input(
110    rt: Either<dynamo_runtime::Runtime, dynamo_runtime::DistributedRuntime>,
111    in_opt: Input,
112    engine_config: super::EngineConfig,
113) -> anyhow::Result<()> {
114    let runtime = match &rt {
115        Either::Left(rt) => rt.clone(),
116        Either::Right(drt) => drt.runtime().clone(),
117    };
118    match in_opt {
119        Input::Http => {
120            http::run(runtime, engine_config).await?;
121        }
122        Input::Grpc => {
123            grpc::run(runtime, engine_config).await?;
124        }
125        Input::Text => {
126            text::run(runtime, None, engine_config).await?;
127        }
128        Input::Stdin => {
129            let mut prompt = String::new();
130            std::io::stdin().read_to_string(&mut prompt).unwrap();
131            text::run(runtime, Some(prompt), engine_config).await?;
132        }
133        Input::Batch(path) => {
134            batch::run(runtime, path, engine_config).await?;
135        }
136        Input::Endpoint(path) => {
137            let Either::Right(distributed_runtime) = rt else {
138                anyhow::bail!("Input::Endpoint requires passing a DistributedRuntime");
139            };
140            endpoint::run(distributed_runtime, path, engine_config).await?;
141        }
142    }
143    Ok(())
144}