dynamo_llm/entrypoint/
input.rs1use std::{
11 fmt,
12 io::{IsTerminal as _, Read as _},
13 path::PathBuf,
14 str::FromStr,
15};
16
17pub mod batch;
18mod common;
19pub use common::{build_routed_pipeline, build_routed_pipeline_with_preprocessor};
20pub mod endpoint;
21pub mod grpc;
22pub mod http;
23pub mod text;
24
25use dynamo_runtime::protocols::ENDPOINT_SCHEME;
26use either::Either;
27
28const BATCH_PREFIX: &str = "batch:";
29
/// Where requests come from, as selected by the `in=` option.
///
/// Values are parsed from text via [`FromStr`] / [`TryFrom<&str>`] and
/// dispatched to the matching input module by [`run_input`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Input {
    /// Parsed from `"http"`; handled by the `http` input module.
    Http,

    /// Parsed from `"stdin"`; all of standard input is read as a single
    /// prompt and handed to the `text` input module.
    Stdin,

    /// Parsed from `"text"`; handled by the `text` input module with no
    /// initial prompt.
    Text,

    /// Parsed from any value starting with `ENDPOINT_SCHEME`. Holds the full
    /// string as given, scheme included. Running it requires a
    /// `DistributedRuntime`.
    Endpoint(String),

    /// Parsed from `batch:<path>`. Holds the path with the `batch:` prefix
    /// removed; handled by the `batch` input module.
    Batch(PathBuf),

    /// Parsed from `"grpc"`; handled by the `grpc` input module.
    Grpc,
}
51
52impl FromStr for Input {
53 type Err = anyhow::Error;
54
55 fn from_str(s: &str) -> Result<Self, Self::Err> {
56 Input::try_from(s)
57 }
58}
59
60impl TryFrom<&str> for Input {
61 type Error = anyhow::Error;
62
63 fn try_from(s: &str) -> anyhow::Result<Self> {
64 match s {
65 "http" => Ok(Input::Http),
66 "grpc" => Ok(Input::Grpc),
67 "text" => Ok(Input::Text),
68 "stdin" => Ok(Input::Stdin),
69 endpoint_path if endpoint_path.starts_with(ENDPOINT_SCHEME) => {
70 Ok(Input::Endpoint(endpoint_path.to_string()))
71 }
72 batch_patch if batch_patch.starts_with(BATCH_PREFIX) => {
73 let path = batch_patch.strip_prefix(BATCH_PREFIX).unwrap();
74 Ok(Input::Batch(PathBuf::from(path)))
75 }
76 e => Err(anyhow::anyhow!("Invalid in= option '{e}'")),
77 }
78 }
79}
80
81impl fmt::Display for Input {
82 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83 let s = match self {
84 Input::Http => "http",
85 Input::Grpc => "grpc",
86 Input::Text => "text",
87 Input::Stdin => "stdin",
88 Input::Endpoint(path) => path,
89 Input::Batch(path) => &path.display().to_string(),
90 };
91 write!(f, "{s}")
92 }
93}
94
95impl Default for Input {
96 fn default() -> Self {
97 if std::io::stdin().is_terminal() {
98 Input::Text
99 } else {
100 Input::Stdin
101 }
102 }
103}
104
105pub async fn run_input(
110 rt: Either<dynamo_runtime::Runtime, dynamo_runtime::DistributedRuntime>,
111 in_opt: Input,
112 engine_config: super::EngineConfig,
113) -> anyhow::Result<()> {
114 let runtime = match &rt {
115 Either::Left(rt) => rt.clone(),
116 Either::Right(drt) => drt.runtime().clone(),
117 };
118 match in_opt {
119 Input::Http => {
120 http::run(runtime, engine_config).await?;
121 }
122 Input::Grpc => {
123 grpc::run(runtime, engine_config).await?;
124 }
125 Input::Text => {
126 text::run(runtime, None, engine_config).await?;
127 }
128 Input::Stdin => {
129 let mut prompt = String::new();
130 std::io::stdin().read_to_string(&mut prompt).unwrap();
131 text::run(runtime, Some(prompt), engine_config).await?;
132 }
133 Input::Batch(path) => {
134 batch::run(runtime, path, engine_config).await?;
135 }
136 Input::Endpoint(path) => {
137 let Either::Right(distributed_runtime) = rt else {
138 anyhow::bail!("Input::Endpoint requires passing a DistributedRuntime");
139 };
140 endpoint::run(distributed_runtime, path, engine_config).await?;
141 }
142 }
143 Ok(())
144}