camel-processor 0.6.2

Message processors for rust-camel
Documentation
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::Mutex;

use camel_api::{BoxProcessor, CamelError, EndpointPipelineConfig, EndpointResolver};

struct EndpointCache {
    map: HashMap<String, BoxProcessor>,
    order: VecDeque<String>,
}

impl EndpointCache {
    fn new() -> Self {
        Self {
            map: HashMap::new(),
            order: VecDeque::new(),
        }
    }

    fn get(&self, uri: &str) -> Option<BoxProcessor> {
        self.map.get(uri).cloned()
    }

    fn insert(&mut self, uri: String, endpoint: BoxProcessor, capacity: usize) {
        if self.map.contains_key(&uri) {
            return;
        }
        if self.map.len() >= capacity
            && let Some(oldest) = self.order.pop_front()
        {
            self.map.remove(&oldest);
        }
        self.order.push_back(uri.clone());
        self.map.insert(uri, endpoint);
    }
}

#[derive(Clone)]
pub struct EndpointPipelineService {
    resolver: EndpointResolver,
    cache: Arc<Mutex<EndpointCache>>,
    config: EndpointPipelineConfig,
}

impl EndpointPipelineService {
    pub fn new(resolver: EndpointResolver, config: EndpointPipelineConfig) -> Self {
        Self {
            resolver,
            cache: Arc::new(Mutex::new(EndpointCache::new())),
            config,
        }
    }

    pub fn resolve(&self, uri: &str) -> Result<Option<BoxProcessor>, CamelError> {
        {
            let cache_guard = self.cache.lock().unwrap();
            if let Some(endpoint) = cache_guard.get(uri) {
                return Ok(Some(endpoint));
            }
        }

        match (self.resolver)(uri) {
            Some(endpoint) => {
                if self.config.cache_size > 0 {
                    let mut cache_guard = self.cache.lock().unwrap();
                    cache_guard.insert(uri.to_string(), endpoint.clone(), self.config.cache_size);
                }
                Ok(Some(endpoint))
            }
            None => {
                if self.config.ignore_invalid_endpoints {
                    tracing::debug!(uri = uri, "Skipping invalid endpoint");
                    Ok(None)
                } else {
                    Err(CamelError::ProcessorError(format!(
                        "Invalid endpoint: {}",
                        uri
                    )))
                }
            }
        }
    }
}