use clap::Parser;
use inferd_daemon::admin::StatusBroadcaster;
use inferd_daemon::config::{BackendKind, Cli};
#[cfg(any(feature = "llamacpp", feature = "openai", feature = "bedrock"))]
use inferd_daemon::config_file::BackendEntry;
#[cfg(feature = "bedrock")]
use inferd_daemon::config_file::BedrockInvokeEntry;
use inferd_daemon::config_file::ConfigFile;
#[cfg(feature = "llamacpp")]
use inferd_daemon::config_file::LlamacppEntry;
#[cfg(feature = "openai")]
use inferd_daemon::config_file::OpenaiCompatEntry;
#[cfg(unix)]
use inferd_daemon::endpoint::bind_admin_uds;
#[cfg(unix)]
use inferd_daemon::endpoint::bind_uds;
use inferd_daemon::endpoint::{bind_tcp, default_admin_addr};
use inferd_daemon::lifecycle::{AcceptContext, serve_tcp, wait_for_ready};
use inferd_daemon::lock::Lock;
use inferd_daemon::logx::{DEFAULT_ROTATE_BYTES, LogxLayer, LogxWriter, default_log_dir};
use inferd_daemon::router::Router;
use inferd_daemon::status::{LoadPhase, StatusEvent};
use inferd_engine::{Backend, mock::Mock};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info, warn};
use tracing_subscriber::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
/// Daemon entry point.
///
/// Startup order: tracing -> single-instance lock -> admin status listener ->
/// config file -> transport/auth resolution -> backend construction ->
/// readiness wait -> inference listeners (v2 and embed are spawned as tasks,
/// v1 is served in the foreground). When the v1 serve call returns
/// (presumably after the shutdown signal fires — see `serve_*`), the admin,
/// v2 and embed tasks get a bounded wait before the v1 result is propagated.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Tracing is installed first so every later startup step is logged.
install_tracing()?;
let cli = Cli::parse();
info!(
version = env!("CARGO_PKG_VERSION"),
"inferd-daemon starting"
);
// Make sure the lock file's parent directory exists before locking;
// a bare file name has an empty parent and needs no directory.
if let Some(parent) = cli.lock.parent()
&& !parent.as_os_str().is_empty()
{
std::fs::create_dir_all(parent)
.map_err(|e| anyhow::anyhow!("create runtime dir {:?}: {e}", parent))?;
}
// `_lock` is a named binding (not `_`), so the value lives until `main`
// returns — presumably releasing the lock on drop (confirm in `lock`).
let _lock =
Lock::acquire(&cli.lock).map_err(|e| anyhow::anyhow!("lock acquire failed: {e}"))?;
info!(path = %cli.lock.display(), "single-instance lock held");
// The admin listener starts before config/backends are processed, so the
// status events published while backends load already have a listener.
let broadcaster = Arc::new(StatusBroadcaster::new(StatusEvent::Starting));
let (admin_shutdown_tx, admin_handle) =
spawn_admin_listener(Arc::clone(&broadcaster), cli.admin_addr.clone()).await?;
// `None` when the file is missing or failed to load; CLI flags still apply.
let config = load_config_file(cli.config.as_deref());
let resolved_v1 = resolve_v1_transport(&cli, config.as_ref())?;
// For each of the following settings the CLI flag wins over the config file.
let resolved_v2_tcp: Option<String> = cli.v2_tcp.clone().or_else(|| {
config
.as_ref()
.and_then(|c| c.listen.as_ref())
.and_then(|l| l.tcp_v2.clone())
});
let resolved_embed_tcp: Option<String> = cli.embed_tcp.clone().or_else(|| {
config
.as_ref()
.and_then(|c| c.listen.as_ref())
.and_then(|l| l.tcp_embed.clone())
});
// Config stores the *name* of an env var holding the key; an empty value
// counts as unset.
let effective_api_key: Option<String> = cli.api_key.clone().or_else(|| {
config
.as_ref()
.and_then(|c| c.listen.as_ref())
.and_then(|l| l.api_key_env.as_deref())
.and_then(|name| std::env::var(name).ok())
.filter(|v| !v.is_empty())
});
// On backend construction failure, stop the admin listener (waiting at
// most 1s for its task) before returning the error.
let backends: Vec<Arc<dyn Backend>> =
match build_backends(&cli, config.as_ref(), Arc::clone(&broadcaster)).await {
Ok(b) => b,
Err(e) => {
let _ = admin_shutdown_tx.send(());
let _ = tokio::time::timeout(Duration::from_secs(1), admin_handle).await;
return Err(e);
}
};
for b in &backends {
info!(name = b.name(), "backend constructed");
}
// Advertise each backend's capabilities on the admin status stream.
for b in &backends {
let caps = b.capabilities();
broadcaster.publish(StatusEvent::Capabilities {
backend: b.name().to_string(),
v2: caps.v2,
vision: caps.vision,
audio: caps.audio,
tools: caps.tools,
thinking: caps.thinking,
embed: caps.embed,
accelerator: caps.accelerator.kind.as_str().to_string(),
gpu_layers: caps.accelerator.gpu_layers,
});
}
// The embed endpoint is only served when requested AND at least one
// backend advertises embedding support.
let embed_enabled = cli.embed && backends.iter().any(|b| b.capabilities().embed);
if cli.embed && !embed_enabled {
warn!(
"--embed requested but no registered backend advertises \
`capabilities().embed = true`; embed socket will not bind"
);
}
let router = Arc::new(Router::new(backends));
let waited = wait_for_ready(&router, Duration::from_secs(cli.ready_timeout_secs))
.await
.map_err(|e| anyhow::anyhow!("backend ready: {e}"))?;
info!(?waited, "backend ready");
// NOTE(review): `Ready` is published before any inference listener is
// bound (v2/embed below, v1 further down); a client reacting to `Ready`
// may briefly fail to connect. Confirm whether this ordering is intended.
broadcaster.publish(StatusEvent::Ready);
// One shutdown receiver per listener: v1 always, plus v2 / embed when
// enabled. The `remove(0)` order below must stay in step with `fanout`.
// Despite the `_tx` suffix these are `oneshot::Receiver`s.
let fanout = 1 + usize::from(cli.v2) + usize::from(embed_enabled);
let mut shutdown_rxs = install_shutdown_signal(fanout)?;
let inference_shutdown_tx = shutdown_rxs.remove(0);
let v2_shutdown_tx = if cli.v2 {
Some(shutdown_rxs.remove(0))
} else {
None
};
let embed_shutdown_tx = if embed_enabled {
Some(shutdown_rxs.remove(0))
} else {
None
};
// A single admission gate is shared by every listener through the
// cloned `AcceptContext`.
let admission = inferd_daemon::queue::Admission::new(cli.active_permits, cli.queue_depth);
info!(
active_permits = cli.active_permits,
queue_depth = cli.queue_depth,
capacity = admission.capacity(),
"admission gate configured"
);
let accept_ctx = AcceptContext {
expected_api_key: effective_api_key.clone(),
admission: Some(admission),
};
// Warn loudly when any TCP listener will be exposed without an API key.
let any_tcp = matches!(resolved_v1, ResolvedTransport::Tcp(_))
|| resolved_v2_tcp.is_some()
|| (embed_enabled && resolved_embed_tcp.is_some());
if any_tcp && accept_ctx.expected_api_key.is_some() {
info!("tcp api-key auth enabled (F-8)");
} else if any_tcp {
warn!(
"tcp listener has no api-key configured (CLI --api-key or \
config listen.api_key_env unset); any local process can \
connect (THREAT_MODEL F-8)"
);
}
// v2 and embed listeners run as background tasks; bind errors abort startup.
let v2_handle = if let Some(rx) = v2_shutdown_tx {
Some(
spawn_v2_listener(
&cli,
resolved_v2_tcp.as_deref(),
Arc::clone(&router),
accept_ctx.clone(),
rx,
)
.await?,
)
} else {
None
};
let embed_handle = if let Some(rx) = embed_shutdown_tx {
Some(
spawn_embed_listener(
&cli,
resolved_embed_tcp.as_deref(),
Arc::clone(&router),
accept_ctx.clone(),
rx,
)
.await?,
)
} else {
None
};
// The v1 listener is served in the foreground; `main` blocks here until
// its serve call returns.
let serve_result = match resolved_v1 {
ResolvedTransport::Tcp(addr) => {
let listener = bind_tcp(&addr).await?;
info!(addr = %listener.local_addr()?, "tcp listener bound");
serve_tcp(listener, router, accept_ctx, inference_shutdown_tx).await
}
#[cfg(unix)]
ResolvedTransport::Uds(path) => {
let listener = bind_uds(&path, cli.group.as_deref()).await?;
info!(path = %path.display(), "uds listener bound");
inferd_daemon::lifecycle::serve_uds(listener, router, accept_ctx, inference_shutdown_tx)
.await
}
// Unsupported on this platform: consume the otherwise-unused values,
// then fail.
#[cfg(not(unix))]
ResolvedTransport::Uds(path) => {
drop((path, router, accept_ctx, inference_shutdown_tx));
anyhow::bail!(
"Unix domain sockets are not supported on this platform; use --pipe or --tcp"
);
}
#[cfg(windows)]
ResolvedTransport::Pipe(path) => {
let first = inferd_daemon::endpoint::bind_named_pipe(&path, true)?;
info!(path = %path, "named pipe listener bound");
inferd_daemon::lifecycle::serve_named_pipe(
&path,
first,
router,
accept_ctx,
inference_shutdown_tx,
)
.await
}
#[cfg(not(windows))]
ResolvedTransport::Pipe(path) => {
drop((path, router, accept_ctx, inference_shutdown_tx));
anyhow::bail!(
"Windows named pipes are not supported on this platform; use --uds or --tcp"
);
}
};
// Drain: tell status subscribers, stop the admin listener, and give each
// background listener task up to 2s to finish. The v1 serve error (if any)
// is only propagated after this cleanup.
broadcaster.publish(StatusEvent::Draining);
let _ = admin_shutdown_tx.send(());
let _ = tokio::time::timeout(Duration::from_secs(2), admin_handle).await;
if let Some(handle) = v2_handle {
let _ = tokio::time::timeout(Duration::from_secs(2), handle).await;
}
if let Some(handle) = embed_handle {
let _ = tokio::time::timeout(Duration::from_secs(2), handle).await;
}
serve_result?;
info!("shutdown complete");
Ok(())
}
/// Binds the v2 protocol listener and spawns the task that serves it.
///
/// An explicit TCP address (`resolved_v2_tcp`) takes priority. Otherwise the
/// platform-local endpoint from `--v2-addr` (or `default_v2_addr()`) is used:
/// a Unix domain socket on unix, a named pipe on windows.
///
/// Bind failures are returned to the caller. Errors raised later by the serve
/// loop are only logged from inside the spawned task.
async fn spawn_v2_listener(
    cli: &Cli,
    resolved_v2_tcp: Option<&str>,
    router: Arc<Router>,
    accept_ctx: AcceptContext,
    shutdown_rx: tokio::sync::oneshot::Receiver<()>,
) -> anyhow::Result<tokio::task::JoinHandle<()>> {
    use inferd_daemon::endpoint::default_v2_addr;
    use inferd_daemon::lifecycle_v2;

    if let Some(tcp_addr) = resolved_v2_tcp {
        let tcp_listener = bind_tcp(tcp_addr).await?;
        info!(addr = %tcp_listener.local_addr()?, "v2 tcp listener bound");
        let task = tokio::spawn(async move {
            let outcome =
                lifecycle_v2::serve_tcp_v2(tcp_listener, router, accept_ctx, shutdown_rx).await;
            if let Err(e) = outcome {
                error!(error = ?e, "v2 tcp listener error");
            }
        });
        return Ok(task);
    }

    let local_path = cli.v2_addr.clone().unwrap_or_else(default_v2_addr);

    #[cfg(unix)]
    {
        let uds_listener = bind_uds(&local_path, cli.group.as_deref()).await?;
        info!(path = %local_path.display(), "v2 uds listener bound");
        let task = tokio::spawn(async move {
            let outcome =
                lifecycle_v2::serve_uds_v2(uds_listener, router, accept_ctx, shutdown_rx).await;
            if let Err(e) = outcome {
                error!(error = ?e, "v2 uds listener error");
            }
        });
        Ok(task)
    }
    #[cfg(windows)]
    {
        let Some(pipe_name) = local_path.to_str().map(str::to_owned) else {
            anyhow::bail!("v2 pipe path is not valid utf-8: {}", local_path.display());
        };
        let first_instance = inferd_daemon::endpoint::bind_named_pipe(&pipe_name, true)?;
        info!(path = %pipe_name, "v2 named pipe listener bound");
        let task = tokio::spawn(async move {
            let outcome = lifecycle_v2::serve_named_pipe_v2(
                &pipe_name,
                first_instance,
                router,
                accept_ctx,
                shutdown_rx,
            )
            .await;
            if let Err(e) = outcome {
                error!(error = ?e, "v2 named pipe listener error");
            }
        });
        Ok(task)
    }
    #[cfg(not(any(unix, windows)))]
    {
        // No local endpoint on this platform: consume the inputs and fail.
        drop((local_path, router, accept_ctx, shutdown_rx));
        anyhow::bail!("v2 endpoint requires unix or windows; use --v2-tcp instead")
    }
}
/// Binds the embedding listener and spawns the task that serves it.
///
/// An explicit TCP address (`resolved_embed_tcp`) takes priority. Otherwise
/// the platform-local endpoint from `--embed-addr` (or `default_embed_addr()`)
/// is used: a Unix domain socket on unix, a named pipe on windows.
///
/// Bind failures are returned to the caller. Errors raised later by the serve
/// loop are only logged from inside the spawned task.
async fn spawn_embed_listener(
    cli: &Cli,
    resolved_embed_tcp: Option<&str>,
    router: Arc<Router>,
    accept_ctx: AcceptContext,
    shutdown_rx: tokio::sync::oneshot::Receiver<()>,
) -> anyhow::Result<tokio::task::JoinHandle<()>> {
    use inferd_daemon::endpoint::default_embed_addr;
    use inferd_daemon::lifecycle_embed;

    if let Some(tcp_addr) = resolved_embed_tcp {
        let tcp_listener = bind_tcp(tcp_addr).await?;
        info!(addr = %tcp_listener.local_addr()?, "embed tcp listener bound");
        let task = tokio::spawn(async move {
            let outcome =
                lifecycle_embed::serve_tcp_embed(tcp_listener, router, accept_ctx, shutdown_rx)
                    .await;
            if let Err(e) = outcome {
                error!(error = ?e, "embed tcp listener error");
            }
        });
        return Ok(task);
    }

    let local_path = cli.embed_addr.clone().unwrap_or_else(default_embed_addr);

    #[cfg(unix)]
    {
        let uds_listener = bind_uds(&local_path, cli.group.as_deref()).await?;
        info!(path = %local_path.display(), "embed uds listener bound");
        let task = tokio::spawn(async move {
            let outcome =
                lifecycle_embed::serve_uds_embed(uds_listener, router, accept_ctx, shutdown_rx)
                    .await;
            if let Err(e) = outcome {
                error!(error = ?e, "embed uds listener error");
            }
        });
        Ok(task)
    }
    #[cfg(windows)]
    {
        let Some(pipe_name) = local_path.to_str().map(str::to_owned) else {
            anyhow::bail!("embed pipe path is not valid utf-8: {}", local_path.display());
        };
        let first_instance = inferd_daemon::endpoint::bind_named_pipe(&pipe_name, true)?;
        info!(path = %pipe_name, "embed named pipe listener bound");
        let task = tokio::spawn(async move {
            let outcome = lifecycle_embed::serve_named_pipe_embed(
                &pipe_name,
                first_instance,
                router,
                accept_ctx,
                shutdown_rx,
            )
            .await;
            if let Err(e) = outcome {
                error!(error = ?e, "embed named pipe listener error");
            }
        });
        Ok(task)
    }
    #[cfg(not(any(unix, windows)))]
    {
        // No local endpoint on this platform: consume the inputs and fail.
        drop((local_path, router, accept_ctx, shutdown_rx));
        anyhow::bail!("embed endpoint requires unix or windows; use --embed-tcp instead")
    }
}
/// Binds the admin/status endpoint and spawns the task that serves it.
///
/// The endpoint is `cli_addr` if given, otherwise `default_admin_addr()`: a
/// Unix domain socket on unix, a named pipe on windows. Returns the sender
/// that stops the admin listener together with its task handle.
///
/// # Errors
/// Fails when the endpoint cannot be bound, when the windows pipe path is not
/// valid UTF-8, or on platforms that are neither unix nor windows.
async fn spawn_admin_listener(
    broadcaster: Arc<StatusBroadcaster>,
    cli_addr: Option<PathBuf>,
) -> anyhow::Result<(
    tokio::sync::oneshot::Sender<()>,
    tokio::task::JoinHandle<()>,
)> {
    let addr = cli_addr.unwrap_or_else(default_admin_addr);
    // Explicit `()` so the channel type is still inferable on the fallback
    // platform, where neither half reaches a typed call.
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    #[cfg(unix)]
    {
        let listener = bind_admin_uds(&addr)
            .await
            .map_err(|e| anyhow::anyhow!("bind admin socket {}: {e}", addr.display()))?;
        info!(path = %addr.display(), "admin uds listener bound");
        // `broadcaster` is owned here, so move it into the task (no extra clone).
        let handle = tokio::spawn(async move {
            if let Err(e) =
                inferd_daemon::admin::serve_admin_uds(listener, broadcaster, shutdown_rx).await
            {
                error!(error = ?e, "admin uds listener error");
            }
        });
        Ok((shutdown_tx, handle))
    }
    #[cfg(windows)]
    {
        let path_str = addr
            .to_str()
            .ok_or_else(|| {
                anyhow::anyhow!("admin pipe path is not valid utf-8: {}", addr.display())
            })?
            .to_string();
        let first = inferd_daemon::endpoint::bind_admin_pipe(&path_str, true)
            .map_err(|e| anyhow::anyhow!("bind admin pipe {}: {e}", path_str))?;
        info!(path = %path_str, "admin pipe listener bound");
        let handle = tokio::spawn(async move {
            if let Err(e) =
                inferd_daemon::admin::serve_admin_pipe(&path_str, first, broadcaster, shutdown_rx)
                    .await
            {
                error!(error = ?e, "admin pipe listener error");
            }
        });
        Ok((shutdown_tx, handle))
    }
    // Mirrors the v2/embed listeners: without this arm the function has no
    // tail expression (and fails to compile) on non-unix, non-windows targets.
    #[cfg(not(any(unix, windows)))]
    {
        drop((addr, broadcaster, shutdown_tx, shutdown_rx));
        anyhow::bail!("admin endpoint requires unix or windows")
    }
}
fn load_config_file(cli_path: Option<&std::path::Path>) -> Option<ConfigFile> {
let path = cli_path
.map(std::path::Path::to_path_buf)
.unwrap_or_else(inferd_daemon::config_file::default_config_path);
match ConfigFile::load(&path) {
Ok(cfg) => {
info!(path = %path.display(), "loaded config file");
Some(cfg)
}
Err(inferd_daemon::config_file::ConfigError::NotFound(_)) => {
info!(path = %path.display(), "no config file; using CLI flags only");
None
}
Err(e) => {
error!(error = %e, "config file load failed");
None
}
}
}
/// The v1 inference transport chosen by `resolve_v1_transport`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ResolvedTransport {
    /// TCP address string, handed to `bind_tcp`.
    Tcp(String),
    /// Unix domain socket path; only servable on unix targets.
    Uds(PathBuf),
    /// Named-pipe name; only servable on windows targets.
    Pipe(String),
}
/// Picks the transport for the v1 inference listener.
///
/// Precedence: `--tcp`, then `--uds`, then `--pipe` on the CLI, then
/// `listen.tcp` from the config file. Errors when none of them is set.
fn resolve_v1_transport(
    cli: &Cli,
    config: Option<&ConfigFile>,
) -> anyhow::Result<ResolvedTransport> {
    let cli_choice = cli
        .tcp
        .as_deref()
        .map(|addr| ResolvedTransport::Tcp(addr.to_string()))
        .or_else(|| cli.uds.as_ref().map(|p| ResolvedTransport::Uds(p.clone())))
        .or_else(|| cli.pipe.as_ref().map(|p| ResolvedTransport::Pipe(p.clone())));
    if let Some(transport) = cli_choice {
        return Ok(transport);
    }

    let config_tcp = config
        .and_then(|c| c.listen.as_ref())
        .and_then(|l| l.tcp.as_deref());
    match config_tcp {
        Some(addr) => {
            info!(addr = %addr, "tcp listener from config (listen.tcp)");
            Ok(ResolvedTransport::Tcp(addr.to_string()))
        }
        None => anyhow::bail!(
            "no transport configured: pass --tcp / --uds / --pipe on the CLI, \
             or set `listen.tcp` in ~/.inferd/config.json"
        ),
    }
}
/// Constructs the backends the router will serve.
///
/// `--backend mock` always yields a single in-process mock. For every other
/// backend kind, a loaded config file takes precedence: each entry of
/// `cfg.resolved_backends()` is built in order (honouring `cfg.auto_pull`).
/// The backend-specific CLI flags are only consulted when no config file was
/// loaded.
///
/// NOTE(review): if `resolved_backends()` can be empty, this returns an empty
/// list and the router starts with no backends — confirm that case is handled
/// (e.g. by `wait_for_ready`).
async fn build_backends(
    cli: &Cli,
    // `config` is only read by the feature-gated arm below, i.e. it is unused
    // exactly when none of the three backend features is enabled.
    #[cfg_attr(
        not(any(feature = "llamacpp", feature = "openai", feature = "bedrock")),
        allow(unused_variables)
    )]
    config: Option<&ConfigFile>,
    broadcaster: Arc<StatusBroadcaster>,
) -> anyhow::Result<Vec<Arc<dyn Backend>>> {
    match cli.backend {
        BackendKind::Mock => {
            // Emit one loading phase with a placeholder path so status
            // consumers see the same event sequence as for real backends.
            broadcaster.publish(StatusEvent::LoadingModel {
                phase: LoadPhase::CheckingLocal {
                    path: PathBuf::from("(mock)"),
                },
            });
            Ok(vec![Arc::new(Mock::new())])
        }
        #[cfg(any(feature = "llamacpp", feature = "openai", feature = "bedrock"))]
        _ => {
            if let Some(cfg) = config {
                let entries = cfg.resolved_backends();
                let auto_pull = cfg.auto_pull;
                let mut out: Vec<Arc<dyn Backend>> = Vec::with_capacity(entries.len());
                for entry in entries {
                    let b = build_entry(&entry, cfg, auto_pull, Arc::clone(&broadcaster)).await?;
                    out.push(b);
                }
                return Ok(out);
            }
            match cli.backend {
                BackendKind::Mock => unreachable!("handled above"),
                #[cfg(feature = "llamacpp")]
                BackendKind::Llamacpp => {
                    let b = build_llamacpp_cli_only(cli, Arc::clone(&broadcaster)).await?;
                    Ok(vec![b])
                }
                #[cfg(feature = "openai")]
                BackendKind::OpenaiCompat => {
                    let b = build_openai_compat_cli_only(cli, Arc::clone(&broadcaster))?;
                    Ok(vec![b])
                }
                #[cfg(feature = "bedrock")]
                BackendKind::BedrockInvoke => {
                    let b = build_bedrock_invoke_cli_only(cli, Arc::clone(&broadcaster))?;
                    Ok(vec![b])
                }
            }
        }
    }
}
/// Constructs one backend from a config-file entry, dispatching on its kind.
///
/// An entry whose backend feature was not compiled into this binary yields
/// an error that names the missing Cargo feature.
#[cfg(any(feature = "llamacpp", feature = "openai", feature = "bedrock"))]
async fn build_entry(
    entry: &BackendEntry,
    #[cfg_attr(not(feature = "llamacpp"), allow(unused_variables))] cfg: &ConfigFile,
    #[cfg_attr(not(feature = "llamacpp"), allow(unused_variables))] auto_pull: bool,
    broadcaster: Arc<StatusBroadcaster>,
) -> anyhow::Result<Arc<dyn Backend>> {
    match entry {
        #[cfg(feature = "bedrock")]
        BackendEntry::BedrockInvoke(e) => build_bedrock_invoke_entry(e, broadcaster),
        #[cfg(not(feature = "bedrock"))]
        BackendEntry::BedrockInvoke(_) => Err(anyhow::anyhow!(
            "config declares a `kind: bedrock-invoke` backend but this \
             daemon binary was built without the `bedrock` feature"
        )),

        #[cfg(feature = "openai")]
        BackendEntry::OpenaiCompat(e) => build_openai_compat_entry(e, broadcaster),
        #[cfg(not(feature = "openai"))]
        BackendEntry::OpenaiCompat(_) => Err(anyhow::anyhow!(
            "config declares a `kind: openai-compat` backend but this \
             daemon binary was built without the `openai` feature"
        )),

        // llama.cpp is the only kind that needs the store config and may
        // download, hence the extra arguments and the `.await`.
        #[cfg(feature = "llamacpp")]
        BackendEntry::Llamacpp(e) => build_llamacpp_entry(e, cfg, auto_pull, broadcaster).await,
        #[cfg(not(feature = "llamacpp"))]
        BackendEntry::Llamacpp(_) => Err(anyhow::anyhow!(
            "config declares a `kind: llamacpp` backend but this daemon \
             binary was built without the `llamacpp` feature"
        )),
    }
}
/// Builds an OpenAI-compatible HTTP backend from a config-file entry.
///
/// The API key is resolved via `resolve_openai_api_key`. A single
/// `CheckingLocal` status phase is published, with a synthetic
/// `(openai-compat: <base_url> / <model>)` label in place of a filesystem path.
#[cfg(feature = "openai")]
fn build_openai_compat_entry(
    entry: &OpenaiCompatEntry,
    broadcaster: Arc<StatusBroadcaster>,
) -> anyhow::Result<Arc<dyn Backend>> {
    use inferd_engine::openai_compat::{OpenAiCompat, OpenAiCompatConfig};

    let key = resolve_openai_api_key(entry.api_key_env.as_deref());

    let status_label = format!("(openai-compat: {} / {})", entry.base_url, entry.model);
    broadcaster.publish(StatusEvent::LoadingModel {
        phase: LoadPhase::CheckingLocal {
            path: PathBuf::from(status_label),
        },
    });

    let settings = OpenAiCompatConfig {
        base_url: entry.base_url.clone(),
        api_key: key,
        model: entry.model.clone(),
        timeout: Duration::from_secs(entry.timeout_secs),
    };
    let client = OpenAiCompat::new(settings)
        .map_err(|e| anyhow::anyhow!("openai-compat init failed for {}: {e}", entry.name))?;
    let shared: Arc<dyn Backend> = Arc::new(client);
    Ok(shared)
}
/// Resolves the API key for an OpenAI-compatible backend.
///
/// Lookup order:
/// 1. the env var named by `api_key_env` (if given);
/// 2. `INFERD_OPENAI_API_KEY`;
/// 3. `OPENAI_API_KEY`.
///
/// Unset, non-UTF-8 and *empty* values are skipped, so an exported-but-blank
/// variable falls through to the next source. This matches the empty-value
/// filtering applied to `listen.api_key_env` and `bearer_token_env`. Returns
/// an empty string when nothing is found — presumably meaning "no auth" for
/// keyless local servers (TODO confirm in `openai_compat`).
#[cfg(feature = "openai")]
fn resolve_openai_api_key(api_key_env: Option<&str>) -> String {
    fn non_empty_var(name: &str) -> Option<String> {
        std::env::var(name).ok().filter(|v| !v.is_empty())
    }
    api_key_env
        .and_then(non_empty_var)
        .or_else(|| non_empty_var("INFERD_OPENAI_API_KEY"))
        .or_else(|| non_empty_var("OPENAI_API_KEY"))
        .unwrap_or_default()
}
/// Builds an OpenAI-compatible backend purely from CLI flags (no config file).
///
/// Requires `--openai-base-url` and `--openai-model`. The API key comes from
/// `--openai-api-key` when given. Otherwise it uses the same env fallback
/// chain as config-declared backends (`resolve_openai_api_key`):
/// `INFERD_OPENAI_API_KEY`, then `OPENAI_API_KEY`. Previously this path only
/// looked at `OPENAI_API_KEY`, so the daemon-specific variable was silently
/// ignored in CLI-only mode.
#[cfg(feature = "openai")]
fn build_openai_compat_cli_only(
    cli: &Cli,
    broadcaster: Arc<StatusBroadcaster>,
) -> anyhow::Result<Arc<dyn Backend>> {
    use inferd_engine::openai_compat::{OpenAiCompat, OpenAiCompatConfig};
    let base_url = cli.openai_base_url.as_deref().ok_or_else(|| {
        anyhow::anyhow!(
            "--backend openai-compat requires --openai-base-url \
             (e.g. https://api.openai.com, http://localhost:11434)"
        )
    })?;
    let model = cli.openai_model.as_deref().ok_or_else(|| {
        anyhow::anyhow!(
            "--backend openai-compat requires --openai-model \
             (e.g. gpt-4o-mini, llama3.1:8b)"
        )
    })?;
    // Explicit flag wins; otherwise share the env lookup with config entries.
    let api_key = cli
        .openai_api_key
        .clone()
        .unwrap_or_else(|| resolve_openai_api_key(None));
    broadcaster.publish(StatusEvent::LoadingModel {
        phase: LoadPhase::CheckingLocal {
            path: PathBuf::from(format!("(openai-compat: {base_url} / {model})")),
        },
    });
    let backend = OpenAiCompat::new(OpenAiCompatConfig {
        base_url: base_url.to_string(),
        api_key,
        model: model.to_string(),
        timeout: Duration::from_secs(cli.openai_timeout_secs),
    })
    .map_err(|e| anyhow::anyhow!("openai-compat init failed: {e}"))?;
    Ok(Arc::new(backend))
}
/// Builds an AWS Bedrock invoke backend from a config-file entry.
///
/// Auth: a non-empty value of the env var named by `bearer_token_env` selects
/// Bearer auth. Otherwise `resolve_bedrock_auth` falls back to SigV4 from the
/// `AWS_*` variables. An empty `endpoint` is treated as "no override".
#[cfg(feature = "bedrock")]
fn build_bedrock_invoke_entry(
    entry: &BedrockInvokeEntry,
    broadcaster: Arc<StatusBroadcaster>,
) -> anyhow::Result<Arc<dyn Backend>> {
    use inferd_engine::bedrock_invoke::{BedrockInvoke, BedrockInvokeConfig};

    let bearer_token: Option<String> = match entry.bearer_token_env.as_deref() {
        Some(var_name) => std::env::var(var_name).ok().filter(|v| !v.is_empty()),
        None => None,
    };
    let Some(auth) = resolve_bedrock_auth(bearer_token.as_deref()) else {
        anyhow::bail!(
            "bedrock-invoke {:?}: no auth credentials. Set the env var named in \
             `bearer_token_env` (Bearer auth) or AWS_ACCESS_KEY_ID / \
             AWS_SECRET_ACCESS_KEY (SigV4)",
            entry.name
        );
    };

    let status_label = format!("(bedrock-invoke: {} / {})", entry.region, entry.model_id);
    broadcaster.publish(StatusEvent::LoadingModel {
        phase: LoadPhase::CheckingLocal {
            path: PathBuf::from(status_label),
        },
    });

    let settings = BedrockInvokeConfig {
        region: entry.region.clone(),
        model_id: entry.model_id.clone(),
        auth,
        timeout: Duration::from_secs(entry.timeout_secs),
        endpoint_override: entry.endpoint.clone().filter(|s| !s.is_empty()),
    };
    let client = BedrockInvoke::new(settings)
        .map_err(|e| anyhow::anyhow!("bedrock-invoke init failed for {}: {e}", entry.name))?;
    let shared: Arc<dyn Backend> = Arc::new(client);
    Ok(shared)
}
/// Builds an AWS Bedrock invoke backend purely from CLI flags (no config file).
///
/// Requires `--bedrock-region` and `--bedrock-model-id`. A non-empty
/// `--bedrock-bearer-token` selects Bearer auth. Otherwise
/// `resolve_bedrock_auth` falls back to SigV4 from the `AWS_*` variables.
#[cfg(feature = "bedrock")]
fn build_bedrock_invoke_cli_only(
    cli: &Cli,
    broadcaster: Arc<StatusBroadcaster>,
) -> anyhow::Result<Arc<dyn Backend>> {
    use inferd_engine::bedrock_invoke::{BedrockInvoke, BedrockInvokeConfig};

    let Some(region) = cli.bedrock_region.as_deref() else {
        anyhow::bail!(
            "--backend bedrock-invoke requires --bedrock-region \
             (e.g. us-east-1, eu-central-1)"
        );
    };
    let Some(model_id) = cli.bedrock_model_id.as_deref() else {
        anyhow::bail!(
            "--backend bedrock-invoke requires --bedrock-model-id \
             (e.g. anthropic.claude-3-5-sonnet-20241022-v2:0)"
        );
    };

    let bearer_token = match cli.bedrock_bearer_token.as_deref() {
        Some(token) if !token.is_empty() => Some(token),
        _ => None,
    };
    let Some(auth) = resolve_bedrock_auth(bearer_token) else {
        anyhow::bail!(
            "bedrock-invoke: no auth credentials. Set --bedrock-bearer-token / \
             AWS_BEARER_TOKEN_BEDROCK (Bearer auth) or AWS_ACCESS_KEY_ID / \
             AWS_SECRET_ACCESS_KEY (SigV4)"
        );
    };

    broadcaster.publish(StatusEvent::LoadingModel {
        phase: LoadPhase::CheckingLocal {
            path: PathBuf::from(format!("(bedrock-invoke: {region} / {model_id})")),
        },
    });

    let settings = BedrockInvokeConfig {
        region: region.to_string(),
        model_id: model_id.to_string(),
        auth,
        timeout: Duration::from_secs(cli.bedrock_timeout_secs),
        endpoint_override: cli.bedrock_endpoint.clone().filter(|s| !s.is_empty()),
    };
    let client = BedrockInvoke::new(settings)
        .map_err(|e| anyhow::anyhow!("bedrock-invoke init failed: {e}"))?;
    let shared: Arc<dyn Backend> = Arc::new(client);
    Ok(shared)
}
/// Chooses Bedrock credentials.
///
/// A non-empty `bearer` token wins. Otherwise SigV4 credentials are read from
/// `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`, both of which must be set
/// and non-empty. `AWS_SESSION_TOKEN` is attached when present and non-empty.
/// Returns `None` when no usable credentials exist.
#[cfg(feature = "bedrock")]
fn resolve_bedrock_auth(
    bearer: Option<&str>,
) -> Option<inferd_engine::bedrock_invoke::BedrockAuth> {
    use inferd_engine::bedrock_invoke::BedrockAuth;

    if let Some(token) = bearer.filter(|t| !t.is_empty()) {
        return Some(BedrockAuth::BearerToken(token.to_owned()));
    }

    let key_id = std::env::var("AWS_ACCESS_KEY_ID").ok()?;
    let secret = std::env::var("AWS_SECRET_ACCESS_KEY").ok()?;
    let both_present = !key_id.is_empty() && !secret.is_empty();
    if !both_present {
        return None;
    }

    let session = match std::env::var("AWS_SESSION_TOKEN") {
        Ok(v) if !v.is_empty() => Some(v),
        _ => None,
    };
    Some(BedrockAuth::Sigv4 {
        access_key_id: key_id,
        secret_access_key: secret,
        session_token: session,
    })
}
/// Builds a llama.cpp backend from a config-file entry.
///
/// The expected model digest is parsed first, so a malformed hash fails
/// before any download. With `auto_pull` the model is fetched into the model
/// store on a blocking thread; otherwise the blob must already be present.
/// Status phases are then published in order: `Mmap`, then `KvCache`.
#[cfg(feature = "llamacpp")]
async fn build_llamacpp_entry(
    entry: &LlamacppEntry,
    cfg: &ConfigFile,
    auto_pull: bool,
    broadcaster: Arc<StatusBroadcaster>,
) -> anyhow::Result<Arc<dyn Backend>> {
    use inferd_daemon::fetch::{ModelSpec, fetch_model};
    use inferd_daemon::store::{ModelStore, default_models_home};
    use inferd_engine::llamacpp::{LlamaCpp, LlamaCppConfig};

    let spec: ModelSpec = (&entry.model).into();
    let expected_sha256 = parse_sha256_hex(&entry.model.sha256)?;
    let store = match cfg.models_home.as_ref() {
        Some(p) => ModelStore::open(p),
        None => ModelStore::open(default_models_home()),
    };

    let model_path = if !auto_pull {
        let local_blob = store.blob_path(&spec.sha256_hex);
        if !local_blob.exists() {
            anyhow::bail!(
                "model {} not present in store at {} and auto_pull is disabled. \
                 Run `inferdctl pull` or set auto_pull: true in config.",
                entry.name,
                local_blob.display()
            );
        }
        broadcaster.publish(StatusEvent::LoadingModel {
            phase: LoadPhase::CheckingLocal {
                path: local_blob.clone(),
            },
        });
        local_blob
    } else {
        // `fetch_model` runs on a blocking thread, which needs owned copies.
        let (fetch_spec, fetch_store, fetch_bcast) =
            (spec.clone(), store.clone(), Arc::clone(&broadcaster));
        tokio::task::spawn_blocking(move || fetch_model(&fetch_spec, &fetch_store, &fetch_bcast))
            .await
            .map_err(|e| anyhow::anyhow!("fetch task join: {e}"))?
            .map_err(|e| anyhow::anyhow!("fetch failed for {}: {e}", entry.name))?
    };

    for phase in [
        LoadPhase::Mmap {
            path: model_path.clone(),
        },
        LoadPhase::KvCache { n_ctx: entry.n_ctx },
    ] {
        broadcaster.publish(StatusEvent::LoadingModel { phase });
    }

    let settings = LlamaCppConfig {
        model_path,
        model_sha256: Some(expected_sha256),
        n_ctx: entry.n_ctx,
        n_gpu_layers: entry.n_gpu_layers,
        embed: entry.embed,
        embed_pooling: entry.embed_pooling,
        embed_n_ctx: entry.embed_n_ctx,
        ..Default::default()
    };
    let engine = LlamaCpp::new(settings)
        .map_err(|e| anyhow::anyhow!("llamacpp init failed for {}: {e}", entry.name))?;
    let shared: Arc<dyn Backend> = Arc::new(engine);
    Ok(shared)
}
/// Builds a llama.cpp backend purely from CLI flags (no config file loaded).
///
/// Requires `--model-path` and `--model-sha256`. Unlike the config path this
/// never downloads: the model file must already exist at `--model-path`.
/// Status phases are published in order: `CheckingLocal`, `Mmap`, `KvCache`.
#[cfg(feature = "llamacpp")]
async fn build_llamacpp_cli_only(
    cli: &Cli,
    broadcaster: Arc<StatusBroadcaster>,
) -> anyhow::Result<Arc<dyn Backend>> {
    use inferd_engine::llamacpp::{LlamaCpp, LlamaCppConfig};

    let Some(model_path) = cli.model_path.as_ref() else {
        anyhow::bail!(
            "--backend llamacpp needs either ~/.inferd/config.json or \
             --model-path/--model-sha256 CLI flags"
        );
    };
    let Some(sha_hex) = cli.model_sha256.as_ref() else {
        anyhow::bail!("--model-sha256 is required for --backend llamacpp");
    };
    let expected_sha256 = parse_sha256_hex(sha_hex)?;

    broadcaster.publish(StatusEvent::LoadingModel {
        phase: LoadPhase::CheckingLocal {
            path: model_path.clone(),
        },
    });
    if !model_path.exists() {
        anyhow::bail!("model not present at {} (CLI-only mode)", model_path.display());
    }
    for phase in [
        LoadPhase::Mmap {
            path: model_path.clone(),
        },
        LoadPhase::KvCache { n_ctx: cli.n_ctx },
    ] {
        broadcaster.publish(StatusEvent::LoadingModel { phase });
    }

    let settings = LlamaCppConfig {
        model_path: model_path.clone(),
        model_sha256: Some(expected_sha256),
        n_ctx: cli.n_ctx,
        n_gpu_layers: cli.n_gpu_layers,
        ..Default::default()
    };
    let engine =
        LlamaCpp::new(settings).map_err(|e| anyhow::anyhow!("llamacpp init failed: {e}"))?;
    let shared: Arc<dyn Backend> = Arc::new(engine);
    Ok(shared)
}
/// Decodes a 64-character hex string (either case) into a 32-byte SHA-256
/// digest.
///
/// # Errors
/// Fails when the input is not exactly 64 bytes long, or on the first
/// (left-most) byte that is not an ASCII hex digit.
#[cfg(feature = "llamacpp")]
fn parse_sha256_hex(s: &str) -> anyhow::Result<[u8; 32]> {
    let raw = s.as_bytes();
    if raw.len() != 64 {
        anyhow::bail!("model.sha256 must be 64 hex chars (got {})", s.len());
    }
    let mut digest = [0u8; 32];
    for (slot, pair) in digest.iter_mut().zip(raw.chunks_exact(2)) {
        // High nibble first, so errors are reported left to right.
        *slot = (hex_digit(pair[0])? << 4) | hex_digit(pair[1])?;
    }
    Ok(digest)
}
/// Converts one ASCII hex digit (`0-9`, `a-f`, `A-F`) into its value `0..=15`.
///
/// # Errors
/// Any other byte is rejected, with the byte shown as a `char` in the message.
#[cfg(feature = "llamacpp")]
fn hex_digit(b: u8) -> anyhow::Result<u8> {
    let ch = b as char;
    match ch.to_digit(16) {
        // `to_digit(16)` yields at most 15, so the narrowing cast is lossless.
        Some(value) => Ok(value as u8),
        None => anyhow::bail!("invalid hex digit: {:?}", ch),
    }
}
/// Installs the global `tracing` subscriber.
///
/// The level filter comes from `INFERD_LOG` (default `info`; an unparsable
/// value also falls back to `info`). Events go to two layers: a compact
/// stderr formatter, and a `LogxLayer` writing rotating files under
/// `default_log_dir()` (rotation size `DEFAULT_ROTATE_BYTES`).
///
/// # Errors
/// Fails when the log directory/file cannot be opened (the error names the
/// directory), or when a global subscriber is already installed. The latter
/// previously panicked via `init()`, even though this function returns
/// `Result`.
fn install_tracing() -> anyhow::Result<()> {
    let filter = EnvFilter::try_from_env("INFERD_LOG").unwrap_or_else(|_| EnvFilter::new("info"));
    let stderr_layer = tracing_subscriber::fmt::layer()
        .with_writer(std::io::stderr)
        .compact();
    let log_dir = default_log_dir();
    let writer = LogxWriter::open(&log_dir, "inferd", DEFAULT_ROTATE_BYTES)
        .map_err(|e| anyhow::anyhow!("open log dir {:?}: {e}", log_dir))?;
    let logx_layer = LogxLayer::new(Arc::new(writer));
    tracing_subscriber::registry()
        .with(filter)
        .with(stderr_layer)
        .with(logx_layer)
        .try_init()
        .map_err(|e| anyhow::anyhow!("install tracing subscriber: {e}"))?;
    Ok(())
}
/// Creates `fanout` one-shot shutdown receivers that all fire when the process
/// receives SIGTERM or SIGINT (unix) or Ctrl-C (other platforms).
///
/// On unix the signal handlers are registered eagerly, so a registration
/// failure is returned as an error instead of being discovered later inside
/// the background task. If waiting for the signal fails at runtime, the
/// failure is logged and the senders are kept alive forever. A dropped
/// `oneshot::Sender` resolves its `Receiver` with `RecvError`, which a
/// listener awaiting the receiver would likely treat as a shutdown request.
/// That is the opposite of the logged "will not fire".
///
/// Must be called from within a tokio runtime.
fn install_shutdown_signal(
    fanout: usize,
) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<()>>> {
    let (txs, rxs): (Vec<_>, Vec<_>) = (0..fanout)
        .map(|_| tokio::sync::oneshot::channel::<()>())
        .unzip();

    #[cfg(unix)]
    let (mut sigterm, mut sigint) = {
        use tokio::signal::unix::{SignalKind, signal};
        let sigterm = signal(SignalKind::terminate())
            .map_err(|e| anyhow::anyhow!("install SIGTERM handler: {e}"))?;
        let sigint = signal(SignalKind::interrupt())
            .map_err(|e| anyhow::anyhow!("install SIGINT handler: {e}"))?;
        (sigterm, sigint)
    };

    tokio::spawn(async move {
        #[cfg(unix)]
        let result: Result<(), std::io::Error> = {
            tokio::select! {
                _ = sigterm.recv() => {},
                _ = sigint.recv() => {},
            }
            Ok(())
        };
        #[cfg(not(unix))]
        let result: Result<(), std::io::Error> = tokio::signal::ctrl_c().await;

        match result {
            Ok(()) => {
                for tx in txs {
                    let _ = tx.send(());
                }
            }
            Err(e) => {
                error!(error = ?e, "signal handler failed; shutdown channels will not fire");
                // Hold the senders so receivers stay pending instead of
                // resolving with `RecvError`.
                let _held = txs;
                std::future::pending::<()>().await;
            }
        }
    });
    Ok(rxs)
}