camel_processor/
endpoint_pipeline.rs1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3use std::sync::Mutex;
4
5use camel_api::{BoxProcessor, CamelError, EndpointPipelineConfig, EndpointResolver};
6
7struct EndpointCache {
8 map: HashMap<String, BoxProcessor>,
9 order: VecDeque<String>,
10}
11
12impl EndpointCache {
13 fn new() -> Self {
14 Self {
15 map: HashMap::new(),
16 order: VecDeque::new(),
17 }
18 }
19
20 fn get(&self, uri: &str) -> Option<BoxProcessor> {
21 self.map.get(uri).cloned()
22 }
23
24 fn insert(&mut self, uri: String, endpoint: BoxProcessor, capacity: usize) {
25 if self.map.contains_key(&uri) {
26 return;
27 }
28 if self.map.len() >= capacity
29 && let Some(oldest) = self.order.pop_front()
30 {
31 self.map.remove(&oldest);
32 }
33 self.order.push_back(uri.clone());
34 self.map.insert(uri, endpoint);
35 }
36}
37
38#[derive(Clone)]
39pub struct EndpointPipelineService {
40 resolver: EndpointResolver,
41 cache: Arc<Mutex<EndpointCache>>,
42 config: EndpointPipelineConfig,
43}
44
45impl EndpointPipelineService {
46 pub fn new(resolver: EndpointResolver, config: EndpointPipelineConfig) -> Self {
50 Self {
51 resolver,
52 cache: Arc::new(Mutex::new(EndpointCache::new())),
53 config,
54 }
55 }
56
57 pub fn resolve(&self, uri: &str) -> Result<Option<BoxProcessor>, CamelError> {
58 {
59 let cache_guard = self.cache.lock().unwrap(); if let Some(endpoint) = cache_guard.get(uri) {
61 return Ok(Some(endpoint));
62 }
63 }
64
65 match (self.resolver)(uri) {
66 Some(endpoint) => {
67 if self.config.cache_size > 0 {
68 let mut cache_guard = self.cache.lock().unwrap(); cache_guard.insert(uri.to_string(), endpoint.clone(), self.config.cache_size);
70 }
71 Ok(Some(endpoint))
72 }
73 None => {
74 if self.config.ignore_invalid_endpoints {
75 tracing::debug!(uri = uri, "Skipping invalid endpoint");
76 Ok(None)
77 } else {
78 Err(CamelError::ProcessorError(format!(
79 "Invalid endpoint: {}",
80 uri
81 )))
82 }
83 }
84 }
85 }
86}