1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use serde::{Deserialize, Serialize};
9
10use crate::error::CamelError;
11use crate::lifecycle::HealthStatus;
12
13#[derive(Clone, Deserialize, Serialize, PartialEq)]
14#[serde(deny_unknown_fields)]
15pub struct DatasourceConfig {
16 pub db_url: String,
17 #[serde(default)]
18 pub provider: Option<String>,
19 #[serde(default)]
20 pub max_connections: Option<u32>,
21 #[serde(default)]
22 pub min_connections: Option<u32>,
23 #[serde(default)]
24 pub idle_timeout_secs: Option<u64>,
25 #[serde(default)]
26 pub max_lifetime_secs: Option<u64>,
27 #[serde(default)]
28 pub ssl_mode: Option<String>,
29 #[serde(default)]
30 pub ssl_root_cert: Option<String>,
31 #[serde(default)]
32 pub ssl_cert: Option<String>,
33 #[serde(default)]
34 pub ssl_key: Option<String>,
35 #[serde(default)]
39 pub extra: HashMap<String, toml::Value>,
40}
41
42impl fmt::Debug for DatasourceConfig {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 f.debug_struct("DatasourceConfig")
45 .field("db_url", &"[REDACTED]")
46 .field("provider", &self.provider)
47 .field("max_connections", &self.max_connections)
48 .field("min_connections", &self.min_connections)
49 .field("idle_timeout_secs", &self.idle_timeout_secs)
50 .field("max_lifetime_secs", &self.max_lifetime_secs)
51 .field("ssl_mode", &self.ssl_mode)
52 .field("ssl_root_cert", &self.ssl_root_cert)
53 .field("ssl_cert", &self.ssl_cert)
54 .field("ssl_key", &self.ssl_key.as_ref().map(|_| "[REDACTED]"))
55 .field("extra", &"[REDACTED]")
56 .finish()
57 }
58}
59
60impl DatasourceConfig {
61 pub fn validate(&self) -> Result<(), CamelError> {
62 if self.db_url.trim().is_empty() {
63 return Err(CamelError::Config(
64 "datasource db_url cannot be empty".into(),
65 ));
66 }
67 Ok(())
68 }
69}
70
/// Named, type-erased handle to a datasource pool.
///
/// The pool itself is stored as `Arc<dyn Any + Send + Sync>`, so cloning the
/// handle clones the `Arc` (both handles share the same pool object) rather
/// than the pool.
#[derive(Clone)]
pub struct DatasourceHandle {
    /// Name of the datasource this handle belongs to.
    pub name: String,
    /// Name of the provider that produced the pool.
    pub provider: String,
    /// Type-erased pool; kept private so it is not exposed as a public field.
    inner: Arc<dyn Any + Send + Sync>,
}
77
78impl DatasourceHandle {
79 pub fn new(name: String, provider: String, inner: Arc<dyn Any + Send + Sync>) -> Self {
80 Self {
81 name,
82 provider,
83 inner,
84 }
85 }
86
87 pub fn downcast<T: 'static + Send + Sync>(&self) -> Result<Arc<T>, CamelError> {
88 self.inner.clone().downcast::<T>().map_err(|_| {
89 CamelError::ProcessorError(format!(
90 "datasource '{}' (provider '{}'): failed to downcast handle",
91 self.name, self.provider
92 ))
93 })
94 }
95}
96
97impl fmt::Debug for DatasourceHandle {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 f.debug_struct("DatasourceHandle")
100 .field("name", &self.name)
101 .field("provider", &self.provider)
102 .finish()
103 }
104}
105
/// Reference to a resource, identified by a `kind` and a `name`.
///
/// `Debug` is derived; the output lists both fields, exactly as the previous
/// hand-written implementation did.
// NOTE(review): the set of valid `kind` values is not defined in this file.
#[doc(hidden)]
#[derive(Debug)]
pub struct ResourceRef {
    /// Category of the referenced resource.
    pub kind: String,
    /// Name of the referenced resource within its kind.
    pub name: String,
}
120
121pub type CreatePoolResult = Result<Arc<dyn Any + Send + Sync>, CamelError>;
122pub type CreatePoolFuture<'a> = Pin<Box<dyn Future<Output = CreatePoolResult> + Send + 'a>>;
123pub type CheckFuture<'a> = Pin<Box<dyn Future<Output = HealthStatus> + Send + 'a>>;
124
125pub trait PoolFactory: Send + Sync + 'static {
126 fn create<'a>(&'a self, config: &'a DatasourceConfig) -> CreatePoolFuture<'a>;
127
128 fn check<'a>(&'a self, handle: &'a DatasourceHandle) -> CheckFuture<'a>;
129
130 fn supported_schemes(&self) -> &[&str];
131
132 fn matches(&self, config: &DatasourceConfig) -> bool {
133 self.supported_schemes().iter().any(|s| {
134 config.db_url.starts_with(&format!("{}://", s))
135 || config.db_url.starts_with(&format!("{}::", s))
136 })
137 }
138
139 fn name(&self) -> &'static str;
140}
141
142pub type GetPoolFuture<'a> =
143 Pin<Box<dyn Future<Output = Result<DatasourceHandle, CamelError>> + Send + 'a>>;
144
145pub trait DatasourceCatalog: Send + Sync {
146 fn get_config(&self, name: &str) -> Option<DatasourceConfig>;
147 fn get_pool<'a>(&'a self, name: &'a str) -> GetPoolFuture<'a>;
148 fn register_factory(&self, kind: &str, factory: Arc<dyn PoolFactory>)
149 -> Result<(), CamelError>;
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config with the given URL, every optional field unset and an
    /// empty `extra` map. Tests override individual fields with struct-update
    /// syntax.
    fn config_for(db_url: &str) -> DatasourceConfig {
        DatasourceConfig {
            db_url: db_url.to_string(),
            provider: None,
            max_connections: None,
            min_connections: None,
            idle_timeout_secs: None,
            max_lifetime_secs: None,
            ssl_mode: None,
            ssl_root_cert: None,
            ssl_cert: None,
            ssl_key: None,
            extra: HashMap::new(),
        }
    }

    #[test]
    fn datasource_config_validate_rejects_empty_db_url() {
        let outcome = config_for("").validate();
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("empty"));
    }

    #[test]
    fn datasource_config_validate_accepts_valid() {
        let config = DatasourceConfig {
            max_connections: Some(10),
            min_connections: Some(2),
            idle_timeout_secs: Some(300),
            max_lifetime_secs: Some(1800),
            ..config_for("postgresql://localhost:5432/mydb")
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn datasource_config_debug_redacts_db_url() {
        let config = config_for("postgresql://user:pass@localhost:5432/mydb");
        let debug_str = format!("{:?}", config);
        assert!(
            debug_str.contains("[REDACTED]"),
            "Debug output should redact db_url: {}",
            debug_str
        );
        assert!(
            !debug_str.contains("user:pass"),
            "Debug output should not contain credentials: {}",
            debug_str
        );
    }

    #[test]
    fn datasource_handle_downcast_fails_on_wrong_type() {
        // The handle stores a u32, so asking for a String must fail.
        let handle = DatasourceHandle::new("test".to_string(), "mock".to_string(), Arc::new(42u32));
        let result: Result<Arc<String>, CamelError> = handle.downcast();
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("failed to downcast"));
    }

    #[test]
    fn pool_factory_matches_by_scheme() {
        struct PostgresFactory;
        impl PoolFactory for PostgresFactory {
            fn create<'a>(&'a self, _config: &'a DatasourceConfig) -> CreatePoolFuture<'a> {
                Box::pin(async { Ok(Arc::new("pool") as Arc<dyn Any + Send + Sync>) })
            }
            fn check<'a>(&'a self, _handle: &'a DatasourceHandle) -> CheckFuture<'a> {
                Box::pin(async { HealthStatus::Healthy })
            }
            fn supported_schemes(&self) -> &[&str] {
                &["postgresql", "postgres"]
            }
            fn name(&self) -> &'static str {
                "postgres"
            }
        }

        let factory = PostgresFactory;

        let pg_config = config_for("postgresql://localhost/mydb");
        assert!(factory.matches(&pg_config));

        let mysql_config = config_for("mysql://localhost/mydb");
        assert!(!factory.matches(&mysql_config));
    }

    #[test]
    fn datasource_config_extra_defaults_empty() {
        let config = config_for("ws://localhost:8000");
        assert!(config.extra.is_empty());
    }

    #[test]
    fn datasource_config_extra_deserializes_from_toml() {
        let toml_str = r#"
db_url = "ws://localhost:8000"
provider = "surrealdb"

[extra]
namespace = "camel"
database = "runtime"
"#;
        let config: DatasourceConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(config.db_url, "ws://localhost:8000");
        assert_eq!(config.provider.as_deref(), Some("surrealdb"));
        assert_eq!(config.extra.len(), 2);
        assert_eq!(
            config.extra.get("namespace").and_then(|v| v.as_str()),
            Some("camel")
        );
    }

    #[test]
    fn datasource_config_extra_backward_compat_without_extra_block() {
        let toml_str = r#"
db_url = "ws://localhost:8000"
"#;
        let config: DatasourceConfig = toml::from_str(toml_str).unwrap();
        assert_eq!(config.db_url, "ws://localhost:8000");
        assert!(
            config.extra.is_empty(),
            "TOML without [extra] must deserialize with empty extra (#[serde(default)])"
        );
    }

    #[test]
    fn datasource_config_debug_redacts_extra() {
        let mut extra = HashMap::new();
        extra.insert(
            "password".to_string(),
            toml::Value::String("secret123".to_string()),
        );
        let config = DatasourceConfig {
            extra,
            ..config_for("ws://localhost:8000")
        };
        let debug_str = format!("{:?}", config);
        assert!(
            debug_str.contains("[REDACTED]"),
            "extra should be redacted in Debug: {}",
            debug_str
        );
        assert!(
            !debug_str.contains("secret123"),
            "password must not appear in Debug: {}",
            debug_str
        );
    }
}