Skip to main content

camel_api/
datasource.rs

1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use serde::{Deserialize, Serialize};
9
10use crate::error::CamelError;
11use crate::lifecycle::HealthStatus;
12
13#[derive(Clone, Deserialize, Serialize, PartialEq)]
14#[serde(deny_unknown_fields)]
15pub struct DatasourceConfig {
16    pub db_url: String,
17    #[serde(default)]
18    pub provider: Option<String>,
19    #[serde(default)]
20    pub max_connections: Option<u32>,
21    #[serde(default)]
22    pub min_connections: Option<u32>,
23    #[serde(default)]
24    pub idle_timeout_secs: Option<u64>,
25    #[serde(default)]
26    pub max_lifetime_secs: Option<u64>,
27    #[serde(default)]
28    pub ssl_mode: Option<String>,
29    #[serde(default)]
30    pub ssl_root_cert: Option<String>,
31    #[serde(default)]
32    pub ssl_cert: Option<String>,
33    #[serde(default)]
34    pub ssl_key: Option<String>,
35    /// Generic key-value pairs for database-specific configuration.
36    /// Components read their bespoke fields from here.
37    /// SQL ignores this; SurrealDB reads namespace/database/username/password.
38    #[serde(default)]
39    pub extra: HashMap<String, toml::Value>,
40}
41
42impl fmt::Debug for DatasourceConfig {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        f.debug_struct("DatasourceConfig")
45            .field("db_url", &"[REDACTED]")
46            .field("provider", &self.provider)
47            .field("max_connections", &self.max_connections)
48            .field("min_connections", &self.min_connections)
49            .field("idle_timeout_secs", &self.idle_timeout_secs)
50            .field("max_lifetime_secs", &self.max_lifetime_secs)
51            .field("ssl_mode", &self.ssl_mode)
52            .field("ssl_root_cert", &self.ssl_root_cert)
53            .field("ssl_cert", &self.ssl_cert)
54            .field("ssl_key", &self.ssl_key.as_ref().map(|_| "[REDACTED]"))
55            .field("extra", &"[REDACTED]")
56            .finish()
57    }
58}
59
60impl DatasourceConfig {
61    pub fn validate(&self) -> Result<(), CamelError> {
62        if self.db_url.trim().is_empty() {
63            return Err(CamelError::Config(
64                "datasource db_url cannot be empty".into(),
65            ));
66        }
67        Ok(())
68    }
69}
70
/// Named, type-erased reference to a shared datasource resource.
///
/// The payload is stored as `Arc<dyn Any + Send + Sync>`, so cloning a handle
/// shares the same underlying value rather than copying it. Callers recover
/// the concrete type with `DatasourceHandle::downcast`.
#[derive(Clone)]
pub struct DatasourceHandle {
    /// Datasource name; included in downcast error messages and `Debug` output.
    pub name: String,
    /// Provider label; included in downcast error messages and `Debug` output.
    pub provider: String,
    /// The type-erased payload handed to `DatasourceHandle::new`.
    inner: Arc<dyn Any + Send + Sync>,
}
77
78impl DatasourceHandle {
79    pub fn new(name: String, provider: String, inner: Arc<dyn Any + Send + Sync>) -> Self {
80        Self {
81            name,
82            provider,
83            inner,
84        }
85    }
86
87    pub fn downcast<T: 'static + Send + Sync>(&self) -> Result<Arc<T>, CamelError> {
88        self.inner.clone().downcast::<T>().map_err(|_| {
89            CamelError::ProcessorError(format!(
90                "datasource '{}' (provider '{}'): failed to downcast handle",
91                self.name, self.provider
92            ))
93        })
94    }
95}
96
97impl fmt::Debug for DatasourceHandle {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        f.debug_struct("DatasourceHandle")
100            .field("name", &self.name)
101            .field("provider", &self.provider)
102            .finish()
103    }
104}
105
/// A `(kind, name)` pair referring to a resource.
///
/// Hidden from generated documentation. `Debug` is derived: the derived
/// implementation prints the struct name followed by both fields in
/// declaration order, which is exactly what a hand-written
/// `debug_struct("ResourceRef")` over the same two fields produces.
///
/// NOTE(review): nothing in this module constructs or consumes this type;
/// the meaning of `kind` is defined by its users elsewhere.
#[doc(hidden)]
#[derive(Debug)]
pub struct ResourceRef {
    /// Resource category.
    pub kind: String,
    /// Resource name within that category.
    pub name: String,
}
120
121pub type CreatePoolResult = Result<Arc<dyn Any + Send + Sync>, CamelError>;
122pub type CreatePoolFuture<'a> = Pin<Box<dyn Future<Output = CreatePoolResult> + Send + 'a>>;
123pub type CheckFuture<'a> = Pin<Box<dyn Future<Output = HealthStatus> + Send + 'a>>;
124
125pub trait PoolFactory: Send + Sync + 'static {
126    fn create<'a>(&'a self, config: &'a DatasourceConfig) -> CreatePoolFuture<'a>;
127
128    fn check<'a>(&'a self, handle: &'a DatasourceHandle) -> CheckFuture<'a>;
129
130    fn supported_schemes(&self) -> &[&str];
131
132    fn matches(&self, config: &DatasourceConfig) -> bool {
133        self.supported_schemes().iter().any(|s| {
134            config.db_url.starts_with(&format!("{}://", s))
135                || config.db_url.starts_with(&format!("{}::", s))
136        })
137    }
138
139    fn name(&self) -> &'static str;
140}
141
142pub type GetPoolFuture<'a> =
143    Pin<Box<dyn Future<Output = Result<DatasourceHandle, CamelError>> + Send + 'a>>;
144
145pub trait DatasourceCatalog: Send + Sync {
146    fn get_config(&self, name: &str) -> Option<DatasourceConfig>;
147    fn get_pool<'a>(&'a self, name: &'a str) -> GetPoolFuture<'a>;
148    fn register_factory(&self, kind: &str, factory: Arc<dyn PoolFactory>)
149    -> Result<(), CamelError>;
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config whose only populated field is `db_url`: every optional
    /// field is `None` and `extra` is empty. Tests override individual fields
    /// with struct-update syntax.
    fn config_with_url(db_url: &str) -> DatasourceConfig {
        DatasourceConfig {
            db_url: db_url.to_string(),
            provider: None,
            max_connections: None,
            min_connections: None,
            idle_timeout_secs: None,
            max_lifetime_secs: None,
            ssl_mode: None,
            ssl_root_cert: None,
            ssl_cert: None,
            ssl_key: None,
            extra: HashMap::new(),
        }
    }

    #[test]
    fn datasource_config_validate_rejects_empty_db_url() {
        let result = config_with_url("").validate();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("empty"));
    }

    #[test]
    fn datasource_config_validate_rejects_blank_db_url() {
        // `validate` trims before checking, so whitespace-only is also empty.
        let result = config_with_url("  \t ").validate();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("empty"));
    }

    #[test]
    fn datasource_config_validate_accepts_valid() {
        let config = DatasourceConfig {
            max_connections: Some(10),
            min_connections: Some(2),
            idle_timeout_secs: Some(300),
            max_lifetime_secs: Some(1800),
            ..config_with_url("postgresql://localhost:5432/mydb")
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn datasource_config_debug_redacts_db_url() {
        let config = config_with_url("postgresql://user:pass@localhost:5432/mydb");
        let debug_str = format!("{:?}", config);
        // Check the field itself: a bare "[REDACTED]" search would also be
        // satisfied by the always-redacted `extra` field.
        assert!(
            debug_str.contains(r#"db_url: "[REDACTED]""#),
            "Debug output should redact db_url: {}",
            debug_str
        );
        assert!(
            !debug_str.contains("user:pass"),
            "Debug output should not contain credentials: {}",
            debug_str
        );
    }

    #[test]
    fn datasource_config_debug_redacts_ssl_key() {
        let config = DatasourceConfig {
            ssl_key: Some("-----BEGIN PRIVATE KEY-----".to_string()),
            ..config_with_url("postgresql://localhost/mydb")
        };
        let debug_str = format!("{:?}", config);
        assert!(
            debug_str.contains(r#"ssl_key: Some("[REDACTED]")"#),
            "ssl_key should be redacted in Debug: {}",
            debug_str
        );
        assert!(
            !debug_str.contains("BEGIN PRIVATE KEY"),
            "ssl_key value must not appear in Debug: {}",
            debug_str
        );
    }

    #[test]
    fn datasource_handle_downcast_fails_on_wrong_type() {
        let handle = DatasourceHandle::new("test".to_string(), "mock".to_string(), Arc::new(42u32));
        let result: Result<Arc<String>, CamelError> = handle.downcast();
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("failed to downcast"));
    }

    #[test]
    fn datasource_handle_downcast_succeeds_on_matching_type() {
        let handle = DatasourceHandle::new("test".to_string(), "mock".to_string(), Arc::new(42u32));
        assert!(matches!(handle.downcast::<u32>(), Ok(value) if *value == 42));
    }

    #[test]
    fn pool_factory_matches_by_scheme() {
        struct PostgresFactory;
        impl PoolFactory for PostgresFactory {
            fn create<'a>(&'a self, _config: &'a DatasourceConfig) -> CreatePoolFuture<'a> {
                Box::pin(async { Ok(Arc::new("pool") as Arc<dyn Any + Send + Sync>) })
            }
            fn check<'a>(&'a self, _handle: &'a DatasourceHandle) -> CheckFuture<'a> {
                Box::pin(async { HealthStatus::Healthy })
            }
            fn supported_schemes(&self) -> &[&str] {
                &["postgresql", "postgres"]
            }
            fn name(&self) -> &'static str {
                "postgres"
            }
        }

        let factory = PostgresFactory;

        assert!(factory.matches(&config_with_url("postgresql://localhost/mydb")));
        assert!(factory.matches(&config_with_url("postgres://localhost/mydb")));
        // The `scheme::` form is accepted as well as `scheme://`.
        assert!(factory.matches(&config_with_url("postgres::memory")));

        assert!(!factory.matches(&config_with_url("mysql://localhost/mydb")));
        // A scheme that is not followed by a separator must not match.
        assert!(!factory.matches(&config_with_url("postgresql-localhost/mydb")));
    }

    #[test]
    fn datasource_config_extra_defaults_empty() {
        let config = config_with_url("ws://localhost:8000");
        assert!(config.extra.is_empty());
    }

    #[test]
    fn datasource_config_extra_deserializes_from_toml() {
        let toml_str = r#"
db_url = "ws://localhost:8000"
provider = "surrealdb"

[extra]
namespace = "camel"
database = "runtime"
"#;
        let config: DatasourceConfig = toml::from_str(toml_str).unwrap(); // allow-unwrap
        assert_eq!(config.db_url, "ws://localhost:8000");
        assert_eq!(config.provider.as_deref(), Some("surrealdb"));
        assert_eq!(config.extra.len(), 2);
        assert_eq!(
            config.extra.get("namespace").and_then(|v| v.as_str()),
            Some("camel")
        );
    }

    #[test]
    fn datasource_config_extra_backward_compat_without_extra_block() {
        let toml_str = r#"
db_url = "ws://localhost:8000"
"#;
        let config: DatasourceConfig = toml::from_str(toml_str).unwrap(); // allow-unwrap
        assert_eq!(config.db_url, "ws://localhost:8000");
        assert!(
            config.extra.is_empty(),
            "TOML without [extra] must deserialize with empty extra (#[serde(default)])"
        );
    }

    #[test]
    fn datasource_config_debug_redacts_extra() {
        let mut extra = HashMap::new();
        extra.insert(
            "password".to_string(),
            toml::Value::String("secret123".to_string()),
        );
        let config = DatasourceConfig {
            extra,
            ..config_with_url("ws://localhost:8000")
        };
        let debug_str = format!("{:?}", config);
        // Check the field itself: a bare "[REDACTED]" search would also be
        // satisfied by the always-redacted `db_url` field.
        assert!(
            debug_str.contains(r#"extra: "[REDACTED]""#),
            "extra should be redacted in Debug: {}",
            debug_str
        );
        assert!(
            !debug_str.contains("secret123"),
            "password must not appear in Debug: {}",
            debug_str
        );
    }
}