camel-component-llm 0.18.0

LLM chat, embeddings, tool calling, multi-turn, response cache, cost observability, retry, and concurrency control via OpenAI, Ollama, or Mock
Documentation
use std::sync::Arc;
use std::time::Duration;

use camel_component_api::{
    BoxProcessor, CamelError, Consumer, Endpoint, NetworkRetryPolicy, ProducerContext,
    RuntimeObservability,
};
use tokio::sync::Semaphore;

use crate::LlmGlobalConfig;
use crate::config::LlmEndpointConfig;
use crate::cost::PricingTable;
use crate::producer::LlmProducer;
use crate::producer_cache::ProducerCache;
use crate::provider::LlmProvider;
use crate::provider_factory::ProviderMap;

/// Pick the provider name that applies to an endpoint.
///
/// The endpoint's own `provider` setting takes precedence; when it is absent
/// the global `default_provider` is used instead. The returned string borrows
/// from whichever of the two configs supplied it.
///
/// Called from `create_endpoint` in lib.rs (fail-fast validation) and from
/// `resolve_provider` (the actual provider lookup at producer creation time),
/// so both paths agree on the precedence rule.
///
/// # Errors
///
/// Returns `CamelError::InvalidUri` when neither the endpoint nor the global
/// config names a provider.
pub(crate) fn resolve_provider_name<'a>(
    config: &'a LlmEndpointConfig,
    global: &'a LlmGlobalConfig,
) -> Result<&'a str, CamelError> {
    // An explicit per-endpoint provider always wins.
    if let Some(explicit) = config.provider.as_deref() {
        return Ok(explicit);
    }

    // Otherwise fall back to the global default, if one is configured.
    match global.default_provider.as_deref() {
        Some(fallback) => Ok(fallback),
        None => Err(CamelError::InvalidUri(
            "no provider specified and no default_provider configured".into(),
        )),
    }
}

/// Endpoint of the llm component.
///
/// Holds the endpoint-level configuration together with shared handles to the
/// provider map and the global configuration; `create_producer` combines them
/// into an `LlmProducer`.
pub struct LlmEndpoint {
    /// URI string handed back by `Endpoint::uri`.
    uri: String,
    /// Endpoint-level configuration; consulted first when resolving the
    /// provider name and cloned into every producer this endpoint creates.
    pub config: LlmEndpointConfig,
    /// Shared provider instances, looked up by provider name.
    providers: Arc<ProviderMap>,
    /// Shared global configuration: supplies the default provider name, the
    /// per-provider settings, the global timeout, and `max_prompt_bytes`.
    global_config: Arc<LlmGlobalConfig>,
}

impl LlmEndpoint {
    /// Create an endpoint from its URI, its endpoint-level configuration, and
    /// shared handles to the provider map and the global configuration.
    ///
    /// No validation happens here; provider resolution is deferred to
    /// `resolve_provider`.
    pub fn new(
        uri: String,
        config: LlmEndpointConfig,
        providers: Arc<ProviderMap>,
        global_config: Arc<LlmGlobalConfig>,
    ) -> Self {
        Self {
            uri,
            config,
            providers,
            global_config,
        }
    }

    /// Look up the provider for this endpoint together with its per-provider
    /// settings from the global configuration.
    ///
    /// The returned tuple is, in order:
    ///
    /// 1. the provider instance,
    /// 2. `max_concurrency`,
    /// 3. `timeout_secs`,
    /// 4. the network retry policy,
    /// 5. the pricing table,
    /// 6. cache settings as `(cache_ttl_secs, cache_max_entries)`, present only
    ///    when a cache TTL is configured.
    ///
    /// Every setting is `None` when the provider has no entry in
    /// `global_config.providers` or the entry does not define it.
    ///
    /// # Errors
    ///
    /// Returns `CamelError::InvalidUri` when no provider name can be resolved
    /// or when the resolved name has no instance in the provider map.
    // The wide tuple is only consumed by `create_producer` in this file, so it
    // is kept as-is rather than introducing a dedicated type.
    #[allow(clippy::type_complexity)]
    fn resolve_provider(
        &self,
    ) -> Result<
        (
            Arc<dyn LlmProvider>,
            Option<usize>,
            Option<u64>,
            Option<NetworkRetryPolicy>,
            Option<PricingTable>,
            Option<(u64, Option<usize>)>, // (cache_ttl_secs, cache_max_entries)
        ),
        CamelError,
    > {
        let name = resolve_provider_name(&self.config, &self.global_config)?;
        let provider = self.providers.get(name).cloned().ok_or_else(|| {
            CamelError::InvalidUri(format!("provider '{}' not found in config", name))
        })?;

        // Fetch the per-provider settings entry once and read every setting
        // from it, instead of repeating the same map lookup per setting.
        let provider_cfg = self.global_config.providers.get(name);

        let max_concurrency = provider_cfg.and_then(|pc| pc.max_concurrency());
        let provider_timeout = provider_cfg.and_then(|pc| pc.timeout_secs());
        let network_retry = provider_cfg.and_then(|pc| pc.network_retry());
        let pricing = provider_cfg.and_then(|pc| pc.pricing());

        // Caching is enabled by the TTL alone; the entry limit is optional and
        // only travels along when a TTL is present.
        let cache_ttl = provider_cfg.and_then(|pc| pc.cache_ttl_secs());
        let cache_max = provider_cfg.and_then(|pc| pc.cache_max_entries());
        let cache_info = cache_ttl.map(|ttl| (ttl, cache_max));

        Ok((
            provider,
            max_concurrency,
            provider_timeout,
            network_retry,
            pricing,
            cache_info,
        ))
    }
}

impl Endpoint for LlmEndpoint {
    /// The URI this endpoint was created with.
    fn uri(&self) -> &str {
        self.uri.as_str()
    }

    /// Assemble an `LlmProducer` for this endpoint and box it as a processor.
    ///
    /// The provider and its per-provider settings come from
    /// `resolve_provider`; the route id is taken from `ctx`, falling back to
    /// `"unknown"` when the context carries none.
    ///
    /// # Errors
    ///
    /// Propagates the error from `resolve_provider` when the provider cannot
    /// be resolved.
    fn create_producer(
        &self,
        rt: Arc<dyn RuntimeObservability>,
        ctx: &ProducerContext,
    ) -> Result<BoxProcessor, CamelError> {
        let (
            provider,
            concurrency_limit,
            provider_timeout_secs,
            retry_policy,
            pricing_table,
            cache_settings,
        ) = self.resolve_provider()?;

        let route_id = String::from(ctx.route_id().unwrap_or("unknown"));

        // A semaphore is only created when a concurrency limit is configured.
        let semaphore = concurrency_limit.map(Semaphore::new).map(Arc::new);

        // Effective timeout: the provider-level value takes precedence over
        // the global one; with neither set there is no timeout.
        let timeout = provider_timeout_secs
            .or(self.global_config.timeout_secs)
            .map(Duration::from_secs);

        let pricing = pricing_table.map(Arc::new);

        // Caching is on but no entry limit was given: flag it before the
        // cache is built.
        if matches!(cache_settings, Some((_, None))) {
            tracing::warn!(
                route_id = %route_id,
                "cache_max_entries is not set — cache is unbounded (high-cardinality prompts may cause memory growth)"
            );
        }
        let cache = cache_settings.map(|(ttl_secs, max_entries)| {
            Arc::new(ProducerCache::new(Duration::from_secs(ttl_secs), max_entries))
        });

        let producer = LlmProducer::new(
            self.config.clone(),
            provider,
            self.global_config.max_prompt_bytes,
            route_id,
        )
        .with_semaphore(semaphore)
        .with_timeout(timeout)
        .with_retry(retry_policy)
        .with_pricing(pricing)
        .with_cache(cache)
        .with_observability(Some(rt))
        .build();

        Ok(BoxProcessor::new(producer))
    }

    /// Consumers are not supported by this component; always returns
    /// `CamelError::InvalidUri`.
    fn create_consumer(
        &self,
        _rt: Arc<dyn RuntimeObservability>,
    ) -> Result<Box<dyn Consumer>, CamelError> {
        let reason = "llm component does not support consumers in MVP";
        Err(CamelError::InvalidUri(reason.into()))
    }
}