Skip to main content

camel_api/
datasource.rs

1use std::any::Any;
2use std::fmt;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6
7use serde::{Deserialize, Serialize};
8
9use crate::error::CamelError;
10use crate::lifecycle::HealthStatus;
11
12#[derive(Clone, Deserialize, Serialize, PartialEq)]
13#[serde(deny_unknown_fields)]
14pub struct DatasourceConfig {
15    pub db_url: String,
16    #[serde(default)]
17    pub provider: Option<String>,
18    #[serde(default)]
19    pub max_connections: Option<u32>,
20    #[serde(default)]
21    pub min_connections: Option<u32>,
22    #[serde(default)]
23    pub idle_timeout_secs: Option<u64>,
24    #[serde(default)]
25    pub max_lifetime_secs: Option<u64>,
26    #[serde(default)]
27    pub ssl_mode: Option<String>,
28    #[serde(default)]
29    pub ssl_root_cert: Option<String>,
30    #[serde(default)]
31    pub ssl_cert: Option<String>,
32    #[serde(default)]
33    pub ssl_key: Option<String>,
34}
35
36impl fmt::Debug for DatasourceConfig {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        f.debug_struct("DatasourceConfig")
39            .field("db_url", &"[REDACTED]")
40            .field("provider", &self.provider)
41            .field("max_connections", &self.max_connections)
42            .field("min_connections", &self.min_connections)
43            .field("idle_timeout_secs", &self.idle_timeout_secs)
44            .field("max_lifetime_secs", &self.max_lifetime_secs)
45            .field("ssl_mode", &self.ssl_mode)
46            .field("ssl_root_cert", &self.ssl_root_cert)
47            .field("ssl_cert", &self.ssl_cert)
48            .field("ssl_key", &self.ssl_key.as_ref().map(|_| "[REDACTED]"))
49            .finish()
50    }
51}
52
53impl DatasourceConfig {
54    pub fn validate(&self) -> Result<(), CamelError> {
55        if self.db_url.trim().is_empty() {
56            return Err(CamelError::Config(
57                "datasource db_url cannot be empty".into(),
58            ));
59        }
60        Ok(())
61    }
62}
63
/// Named, type-erased reference to a datasource pool.
///
/// Cloning copies the two strings and bumps the `Arc` reference count, so all
/// clones share the same underlying value.
#[derive(Clone)]
pub struct DatasourceHandle {
    /// Name of the datasource this handle refers to.
    pub name: String,
    /// Name of the provider associated with the handle.
    pub provider: String,
    /// Type-erased payload; the concrete type is recovered by downcasting.
    inner: Arc<dyn Any + Send + Sync>,
}
70
71impl DatasourceHandle {
72    pub fn new(name: String, provider: String, inner: Arc<dyn Any + Send + Sync>) -> Self {
73        Self {
74            name,
75            provider,
76            inner,
77        }
78    }
79
80    pub fn downcast<T: 'static + Send + Sync>(&self) -> Result<Arc<T>, CamelError> {
81        self.inner.clone().downcast::<T>().map_err(|_| {
82            CamelError::ProcessorError(format!(
83                "datasource '{}' (provider '{}'): failed to downcast handle",
84                self.name, self.provider
85            ))
86        })
87    }
88}
89
90impl fmt::Debug for DatasourceHandle {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        f.debug_struct("DatasourceHandle")
93            .field("name", &self.name)
94            .field("provider", &self.provider)
95            .finish()
96    }
97}
98
/// Reference to a resource, identified by a kind and a name.
///
/// Hidden from generated documentation. `Debug` is derived: the derived
/// output is `ResourceRef { kind: "...", name: "..." }`, which is exactly what
/// the previous hand-written implementation produced.
#[doc(hidden)]
#[derive(Debug)]
pub struct ResourceRef {
    /// Kind of resource being referenced.
    pub kind: String,
    /// Name of the referenced resource.
    pub name: String,
}
113
114pub type CreatePoolResult = Result<Arc<dyn Any + Send + Sync>, CamelError>;
115pub type CreatePoolFuture<'a> = Pin<Box<dyn Future<Output = CreatePoolResult> + Send + 'a>>;
116pub type CheckFuture<'a> = Pin<Box<dyn Future<Output = HealthStatus> + Send + 'a>>;
117
118pub trait PoolFactory: Send + Sync + 'static {
119    fn create<'a>(&'a self, config: &'a DatasourceConfig) -> CreatePoolFuture<'a>;
120
121    fn check<'a>(&'a self, handle: &'a DatasourceHandle) -> CheckFuture<'a>;
122
123    fn supported_schemes(&self) -> &[&str];
124
125    fn matches(&self, config: &DatasourceConfig) -> bool {
126        self.supported_schemes().iter().any(|s| {
127            config.db_url.starts_with(&format!("{}://", s))
128                || config.db_url.starts_with(&format!("{}::", s))
129        })
130    }
131
132    fn name(&self) -> &'static str;
133}
134
135pub type GetPoolFuture<'a> =
136    Pin<Box<dyn Future<Output = Result<DatasourceHandle, CamelError>> + Send + 'a>>;
137
138pub trait DatasourceCatalog: Send + Sync {
139    fn get_config(&self, name: &str) -> Option<DatasourceConfig>;
140    fn get_pool<'a>(&'a self, name: &'a str) -> GetPoolFuture<'a>;
141    fn register_factory(&self, kind: &str, factory: Arc<dyn PoolFactory>)
142    -> Result<(), CamelError>;
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config for `url` with every optional setting left unset.
    fn config_for(url: &str) -> DatasourceConfig {
        DatasourceConfig {
            db_url: url.to_string(),
            provider: None,
            max_connections: None,
            min_connections: None,
            idle_timeout_secs: None,
            max_lifetime_secs: None,
            ssl_mode: None,
            ssl_root_cert: None,
            ssl_cert: None,
            ssl_key: None,
        }
    }

    // An empty URL must be rejected with a message that mentions "empty".
    #[test]
    fn datasource_config_validate_rejects_empty_db_url() {
        let outcome = config_for("").validate();
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("empty"));
    }

    // A non-empty URL with pool settings filled in passes validation.
    #[test]
    fn datasource_config_validate_accepts_valid() {
        let config = DatasourceConfig {
            max_connections: Some(10),
            min_connections: Some(2),
            idle_timeout_secs: Some(300),
            max_lifetime_secs: Some(1800),
            ..config_for("postgresql://localhost:5432/mydb")
        };
        assert!(config.validate().is_ok());
    }

    // Credentials embedded in the URL must never appear in Debug output.
    #[test]
    fn datasource_config_debug_redacts_db_url() {
        let config = config_for("postgresql://user:pass@localhost:5432/mydb");
        let rendered = format!("{:?}", config);
        assert!(
            rendered.contains("[REDACTED]"),
            "Debug output should redact db_url: {}",
            rendered
        );
        assert!(
            !rendered.contains("user:pass"),
            "Debug output should not contain credentials: {}",
            rendered
        );
    }

    // Asking for a type other than the stored one yields an error, not a panic.
    #[test]
    fn datasource_handle_downcast_fails_on_wrong_type() {
        let handle = DatasourceHandle::new("test".to_string(), "mock".to_string(), Arc::new(42u32));
        let attempt = handle.downcast::<String>();
        assert!(attempt.is_err());
        let failure = attempt.unwrap_err();
        assert!(failure.to_string().contains("failed to downcast"));
    }

    // The default `matches` accepts supported schemes and rejects others.
    #[test]
    fn pool_factory_matches_by_scheme() {
        struct PostgresFactory;

        impl PoolFactory for PostgresFactory {
            fn create<'a>(&'a self, _config: &'a DatasourceConfig) -> CreatePoolFuture<'a> {
                Box::pin(async { Ok(Arc::new("pool") as Arc<dyn Any + Send + Sync>) })
            }

            fn check<'a>(&'a self, _handle: &'a DatasourceHandle) -> CheckFuture<'a> {
                Box::pin(async { HealthStatus::Healthy })
            }

            fn supported_schemes(&self) -> &[&str] {
                &["postgresql", "postgres"]
            }

            fn name(&self) -> &'static str {
                "postgres"
            }
        }

        let factory = PostgresFactory;

        let pg_config = config_for("postgresql://localhost/mydb");
        assert!(factory.matches(&pg_config));

        let mysql_config = config_for("mysql://localhost/mydb");
        assert!(!factory.matches(&mysql_config));
    }
}