Skip to main content

camel_processor/
endpoint_pipeline.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::sync::Mutex;
4
5use camel_api::{BoxProcessor, CamelError, EndpointPipelineConfig, EndpointResolver};
6
7struct EndpointCache {
8    map: HashMap<String, BoxProcessor>,
9    order: VecDeque<String>,
10}
11
12impl EndpointCache {
13    fn new() -> Self {
14        Self {
15            map: HashMap::new(),
16            order: VecDeque::new(),
17        }
18    }
19
20    fn get(&self, uri: &str) -> Option<BoxProcessor> {
21        self.map.get(uri).cloned()
22    }
23
24    fn insert(&mut self, uri: String, endpoint: BoxProcessor, capacity: usize) {
25        if self.map.contains_key(&uri) {
26            return;
27        }
28        if self.map.len() >= capacity
29            && let Some(oldest) = self.order.pop_front()
30        {
31            self.map.remove(&oldest);
32        }
33        self.order.push_back(uri.clone());
34        self.map.insert(uri, endpoint);
35    }
36}
37
38#[derive(Clone)]
39pub struct EndpointPipelineService {
40    resolver: EndpointResolver,
41    cache: Arc<Mutex<EndpointCache>>,
42    config: EndpointPipelineConfig,
43}
44
45impl EndpointPipelineService {
46    pub fn new(resolver: EndpointResolver, config: EndpointPipelineConfig) -> Self {
47        Self {
48            resolver,
49            cache: Arc::new(Mutex::new(EndpointCache::new())),
50            config,
51        }
52    }
53
54    pub fn resolve(&self, uri: &str) -> Result<Option<BoxProcessor>, CamelError> {
55        {
56            let cache_guard = self.cache.lock().unwrap();
57            if let Some(endpoint) = cache_guard.get(uri) {
58                return Ok(Some(endpoint));
59            }
60        }
61
62        match (self.resolver)(uri) {
63            Some(endpoint) => {
64                if self.config.cache_size > 0 {
65                    let mut cache_guard = self.cache.lock().unwrap();
66                    cache_guard.insert(uri.to_string(), endpoint.clone(), self.config.cache_size);
67                }
68                Ok(Some(endpoint))
69            }
70            None => {
71                if self.config.ignore_invalid_endpoints {
72                    tracing::debug!(uri = uri, "Skipping invalid endpoint");
73                    Ok(None)
74                } else {
75                    Err(CamelError::ProcessorError(format!(
76                        "Invalid endpoint: {}",
77                        uri
78                    )))
79                }
80            }
81        }
82    }
83}