use std::{
fmt,
io::{IsTerminal as _, Read as _},
str::FromStr,
};
mod common;
pub use common::{build_routed_pipeline, build_routed_pipeline_with_preprocessor};
pub mod endpoint;
pub mod grpc;
pub mod http;
pub mod text;
use dynamo_runtime::protocols::ENDPOINT_SCHEME;
#[derive(PartialEq)]
pub enum Input {
Http,
Stdin,
Text,
Endpoint(String),
Grpc,
}
impl FromStr for Input {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Input::try_from(s)
}
}
impl TryFrom<&str> for Input {
type Error = anyhow::Error;
fn try_from(s: &str) -> anyhow::Result<Self> {
match s {
"http" => Ok(Input::Http),
"grpc" => Ok(Input::Grpc),
"text" => Ok(Input::Text),
"stdin" => Ok(Input::Stdin),
endpoint_path if endpoint_path.starts_with(ENDPOINT_SCHEME) => {
Ok(Input::Endpoint(endpoint_path.to_string()))
}
e => Err(anyhow::anyhow!("Invalid in= option '{e}'")),
}
}
}
impl fmt::Display for Input {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let s = match self {
Input::Http => "http",
Input::Grpc => "grpc",
Input::Text => "text",
Input::Stdin => "stdin",
Input::Endpoint(path) => path,
};
write!(f, "{s}")
}
}
impl Default for Input {
fn default() -> Self {
if std::io::stdin().is_terminal() {
Input::Text
} else {
Input::Stdin
}
}
}
pub async fn run_input(
drt: dynamo_runtime::DistributedRuntime,
in_opt: Input,
engine_config: super::EngineConfig,
) -> anyhow::Result<()> {
if let Err(e) = crate::agents::trace::init_from_env_with_shutdown(drt.child_token()).await {
tracing::warn!(error = %e, "Agent trace initialization failed; continuing without trace sink");
}
if let Err(e) = crate::agents::trace::start_tool_event_ingest_from_policy(
drt.clone(),
engine_config.local_model(),
)
.await
{
tracing::warn!(error = %e, "Agent tool event ingest initialization failed; continuing without tool traces");
}
if let Err(e) = crate::audit::init_from_env_with_shutdown(drt.child_token()).await {
tracing::warn!(error = %e, "Audit initialization failed; continuing without audit sink");
}
match in_opt {
Input::Http => {
http::run(drt, engine_config).await?;
}
Input::Grpc => {
grpc::run(drt, engine_config).await?;
}
Input::Text => {
text::run(drt, None, engine_config).await?;
}
Input::Stdin => {
let mut prompt = String::new();
std::io::stdin().read_to_string(&mut prompt).unwrap();
text::run(drt, Some(prompt), engine_config).await?;
}
Input::Endpoint(path) => {
endpoint::run(drt, path, engine_config).await?;
}
}
Ok(())
}