Skip to main content

camel_processor/
endpoint_pipeline.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::sync::Mutex;
4
5use camel_api::{BoxProcessor, CamelError, EndpointPipelineConfig, EndpointResolver};
6
7struct EndpointCache {
8    map: HashMap<String, BoxProcessor>,
9    order: VecDeque<String>,
10}
11
12impl EndpointCache {
13    fn new() -> Self {
14        Self {
15            map: HashMap::new(),
16            order: VecDeque::new(),
17        }
18    }
19
20    fn get(&self, uri: &str) -> Option<BoxProcessor> {
21        self.map.get(uri).cloned()
22    }
23
24    fn insert(&mut self, uri: String, endpoint: BoxProcessor, capacity: usize) {
25        if self.map.contains_key(&uri) {
26            return;
27        }
28        if self.map.len() >= capacity
29            && let Some(oldest) = self.order.pop_front()
30        {
31            self.map.remove(&oldest);
32        }
33        self.order.push_back(uri.clone());
34        self.map.insert(uri, endpoint);
35    }
36}
37
38#[derive(Clone)]
39pub struct EndpointPipelineService {
40    resolver: EndpointResolver,
41    cache: Arc<Mutex<EndpointCache>>,
42    config: EndpointPipelineConfig,
43}
44
45impl EndpointPipelineService {
46    // TODO(PROC-004): Add metrics instrumentation on endpoint dispatch — processed count,
47    // error count, and latency are not yet instrumented. Consider wiring a MetricsCollector
48    // here to track resolve() call counts, cache hit/miss rates, and resolution latency.
49    pub fn new(resolver: EndpointResolver, config: EndpointPipelineConfig) -> Self {
50        Self {
51            resolver,
52            cache: Arc::new(Mutex::new(EndpointCache::new())),
53            config,
54        }
55    }
56
57    pub fn resolve(&self, uri: &str) -> Result<Option<BoxProcessor>, CamelError> {
58        {
59            let cache_guard = self.cache.lock().unwrap(); // allow-unwrap
60            if let Some(endpoint) = cache_guard.get(uri) {
61                return Ok(Some(endpoint));
62            }
63        }
64
65        match (self.resolver)(uri) {
66            Some(endpoint) => {
67                if self.config.cache_size > 0 {
68                    let mut cache_guard = self.cache.lock().unwrap(); // allow-unwrap
69                    cache_guard.insert(uri.to_string(), endpoint.clone(), self.config.cache_size);
70                }
71                Ok(Some(endpoint))
72            }
73            None => {
74                if self.config.ignore_invalid_endpoints {
75                    tracing::debug!(uri = uri, "Skipping invalid endpoint");
76                    Ok(None)
77                } else {
78                    Err(CamelError::ProcessorError(format!(
79                        "Invalid endpoint: {}",
80                        uri
81                    )))
82                }
83            }
84        }
85    }
86}