use std::sync::Arc;
use std::time::Duration;
use camel_component_api::{
BoxProcessor, CamelError, Consumer, Endpoint, NetworkRetryPolicy, ProducerContext,
RuntimeObservability,
};
use tokio::sync::Semaphore;
use crate::LlmGlobalConfig;
use crate::config::LlmEndpointConfig;
use crate::cost::PricingTable;
use crate::producer::LlmProducer;
use crate::producer_cache::ProducerCache;
use crate::provider::LlmProvider;
use crate::provider_factory::ProviderMap;
/// Picks the provider name to use for an endpoint.
///
/// The endpoint's own `provider` setting takes precedence; when it is absent the
/// global `default_provider` is used instead.
///
/// # Errors
///
/// Returns [`CamelError::InvalidUri`] when neither the endpoint nor the global
/// configuration names a provider.
pub(crate) fn resolve_provider_name<'a>(
    config: &'a LlmEndpointConfig,
    global: &'a LlmGlobalConfig,
) -> Result<&'a str, CamelError> {
    let endpoint_choice = config.provider.as_deref();
    let global_fallback = global.default_provider.as_deref();
    match endpoint_choice.or(global_fallback) {
        Some(name) => Ok(name),
        None => Err(CamelError::InvalidUri(
            "no provider specified and no default_provider configured".into(),
        )),
    }
}
/// An `llm:` endpoint: the parsed endpoint configuration together with the
/// provider instances and global settings needed to build producers.
pub struct LlmEndpoint {
    /// The endpoint URI exactly as supplied to [`LlmEndpoint::new`].
    uri: String,
    /// Parsed per-endpoint configuration.
    pub config: LlmEndpointConfig,
    /// Provider instances keyed by provider name.
    providers: Arc<ProviderMap>,
    /// Component-wide settings: default provider, global limits and
    /// per-provider settings.
    global_config: Arc<LlmGlobalConfig>,
}

impl LlmEndpoint {
    /// Creates an endpoint from its URI, parsed config, provider map and global config.
    pub fn new(
        uri: String,
        config: LlmEndpointConfig,
        providers: Arc<ProviderMap>,
        global_config: Arc<LlmGlobalConfig>,
    ) -> Self {
        Self {
            uri,
            config,
            providers,
            global_config,
        }
    }

    /// Resolves the provider for this endpoint along with its per-provider settings.
    ///
    /// The returned tuple is, in order:
    /// 1. the provider instance,
    /// 2. `max_concurrency`,
    /// 3. the provider-level `timeout_secs`,
    /// 4. the network retry policy,
    /// 5. the pricing table,
    /// 6. `(cache_ttl_secs, cache_max_entries)`, present only when a cache TTL is set.
    ///
    /// Every per-provider setting is `None` when the global config has no entry
    /// for the resolved provider name.
    ///
    /// # Errors
    ///
    /// Returns [`CamelError::InvalidUri`] when no provider name can be resolved,
    /// or when the resolved name is missing from the provider map.
    // A private helper consumed in exactly one place, so a dedicated result
    // struct is not worth the extra type.
    #[allow(clippy::type_complexity)]
    fn resolve_provider(
        &self,
    ) -> Result<
        (
            Arc<dyn LlmProvider>,
            Option<usize>,
            Option<u64>,
            Option<NetworkRetryPolicy>,
            Option<PricingTable>,
            Option<(u64, Option<usize>)>, // (cache_ttl_secs, cache_max_entries)
        ),
        CamelError,
    > {
        let name = resolve_provider_name(&self.config, &self.global_config)?;
        let provider = self.providers.get(name).cloned().ok_or_else(|| {
            CamelError::InvalidUri(format!("provider '{name}' not found in config"))
        })?;

        // Look the per-provider settings up once. The result is an `Option<&_>`,
        // which is `Copy`, so it can be reused for every field below.
        let provider_config = self.global_config.providers.get(name);

        let max_concurrency = provider_config.and_then(|pc| pc.max_concurrency());
        let provider_timeout = provider_config.and_then(|pc| pc.timeout_secs());
        let network_retry = provider_config.and_then(|pc| pc.network_retry());
        let pricing = provider_config.and_then(|pc| pc.pricing());
        let cache_ttl = provider_config.and_then(|pc| pc.cache_ttl_secs());
        let cache_max = provider_config.and_then(|pc| pc.cache_max_entries());
        // Caching is enabled only by a TTL; a max-entries setting on its own is ignored.
        let cache_info = cache_ttl.map(|ttl| (ttl, cache_max));

        Ok((
            provider,
            max_concurrency,
            provider_timeout,
            network_retry,
            pricing,
            cache_info,
        ))
    }
}
impl Endpoint for LlmEndpoint {
    /// Returns the URI this endpoint was created with.
    fn uri(&self) -> &str {
        &self.uri
    }

    /// Builds an [`LlmProducer`] for this endpoint.
    ///
    /// Resolves the provider and its per-provider settings, then wires in:
    /// - a concurrency-limiting semaphore when `max_concurrency` is configured;
    /// - a timeout taken from the provider-level `timeout_secs`, falling back to
    ///   the global `timeout_secs`;
    /// - the network retry policy, the pricing table and a response cache when
    ///   they are configured;
    /// - the runtime observability handle `rt`.
    ///
    /// # Errors
    ///
    /// Returns [`CamelError::InvalidUri`] when the provider cannot be resolved,
    /// or when `max_concurrency` is `0` or larger than [`Semaphore::MAX_PERMITS`].
    fn create_producer(
        &self,
        rt: Arc<dyn RuntimeObservability>,
        ctx: &ProducerContext,
    ) -> Result<BoxProcessor, CamelError> {
        let (provider, max_concurrency, provider_timeout, network_retry, pricing, cache_info) =
            self.resolve_provider()?;
        let route_id = ctx.route_id().unwrap_or("unknown").to_string();

        // A zero-permit semaphore can never be acquired unless permits are added
        // later, and nothing here adds any. `Semaphore::new` also panics above
        // `MAX_PERMITS`. Reject both cases up front instead of hanging or panicking.
        if let Some(n) = max_concurrency.filter(|&n| n == 0 || n > Semaphore::MAX_PERMITS) {
            return Err(CamelError::InvalidUri(format!(
                "max_concurrency must be between 1 and {}, got {n}",
                Semaphore::MAX_PERMITS
            )));
        }
        let semaphore = max_concurrency.map(|n| Arc::new(Semaphore::new(n)));

        // The provider-level timeout wins; otherwise use the global one.
        let timeout_secs = provider_timeout.or(self.global_config.timeout_secs);
        let timeout = timeout_secs.map(Duration::from_secs);
        let pricing = pricing.map(Arc::new);
        let cache = cache_info.map(|(ttl_secs, max_entries)| {
            if max_entries.is_none() {
                tracing::warn!(
                    route_id = %route_id,
                    "cache_max_entries is not set — cache is unbounded (high-cardinality prompts may cause memory growth)"
                );
            }
            Arc::new(ProducerCache::new(Duration::from_secs(ttl_secs), max_entries))
        });

        let producer = LlmProducer::new(
            self.config.clone(),
            provider,
            self.global_config.max_prompt_bytes,
            route_id,
        )
        .with_semaphore(semaphore)
        .with_timeout(timeout)
        .with_retry(network_retry)
        .with_pricing(pricing)
        .with_cache(cache)
        .with_observability(Some(rt))
        .build();
        Ok(BoxProcessor::new(producer))
    }

    /// Consumers are not supported by the `llm` component.
    ///
    /// # Errors
    ///
    /// Always returns [`CamelError::InvalidUri`].
    fn create_consumer(
        &self,
        _rt: Arc<dyn RuntimeObservability>,
    ) -> Result<Box<dyn Consumer>, CamelError> {
        Err(CamelError::InvalidUri(
            "llm component does not support consumers in MVP".into(),
        ))
    }
}