camel_processor/
endpoint_pipeline.rs1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::sync::Mutex;
4
5use camel_api::{BoxProcessor, CamelError, EndpointPipelineConfig, EndpointResolver};
6
7struct EndpointCache {
8 map: HashMap<String, BoxProcessor>,
9 order: VecDeque<String>,
10}
11
12impl EndpointCache {
13 fn new() -> Self {
14 Self {
15 map: HashMap::new(),
16 order: VecDeque::new(),
17 }
18 }
19
20 fn get(&self, uri: &str) -> Option<BoxProcessor> {
21 self.map.get(uri).cloned()
22 }
23
24 fn insert(&mut self, uri: String, endpoint: BoxProcessor, capacity: usize) {
25 if self.map.contains_key(&uri) {
26 return;
27 }
28 if self.map.len() >= capacity
29 && let Some(oldest) = self.order.pop_front()
30 {
31 self.map.remove(&oldest);
32 }
33 self.order.push_back(uri.clone());
34 self.map.insert(uri, endpoint);
35 }
36}
37
38#[derive(Clone)]
39pub struct EndpointPipelineService {
40 resolver: EndpointResolver,
41 cache: Arc<Mutex<EndpointCache>>,
42 config: EndpointPipelineConfig,
43}
44
45impl EndpointPipelineService {
46 pub fn new(resolver: EndpointResolver, config: EndpointPipelineConfig) -> Self {
47 Self {
48 resolver,
49 cache: Arc::new(Mutex::new(EndpointCache::new())),
50 config,
51 }
52 }
53
54 pub fn resolve(&self, uri: &str) -> Result<Option<BoxProcessor>, CamelError> {
55 {
56 let cache_guard = self.cache.lock().unwrap();
57 if let Some(endpoint) = cache_guard.get(uri) {
58 return Ok(Some(endpoint));
59 }
60 }
61
62 match (self.resolver)(uri) {
63 Some(endpoint) => {
64 if self.config.cache_size > 0 {
65 let mut cache_guard = self.cache.lock().unwrap();
66 cache_guard.insert(uri.to_string(), endpoint.clone(), self.config.cache_size);
67 }
68 Ok(Some(endpoint))
69 }
70 None => {
71 if self.config.ignore_invalid_endpoints {
72 tracing::debug!(uri = uri, "Skipping invalid endpoint");
73 Ok(None)
74 } else {
75 Err(CamelError::ProcessorError(format!(
76 "Invalid endpoint: {}",
77 uri
78 )))
79 }
80 }
81 }
82 }
83}