1use std::any::Any;
2use std::fmt;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6
7use serde::{Deserialize, Serialize};
8
9use crate::error::CamelError;
10use crate::lifecycle::HealthStatus;
11
/// Connection settings for a single datasource.
///
/// Deserialization rejects unknown keys (`deny_unknown_fields`). Every field
/// other than `db_url` is optional and is `None` when absent from the input.
#[derive(Clone, Deserialize, Serialize, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct DatasourceConfig {
    /// Connection URL. Required; `validate` rejects an empty or
    /// whitespace-only value, and the `Debug` impl never prints it.
    pub db_url: String,
    /// Optional provider name.
    /// NOTE(review): presumably selects a pool factory explicitly instead of
    /// relying on the URL scheme — confirm against the catalog implementation.
    #[serde(default)]
    pub provider: Option<String>,
    /// Optional connection-count upper bound (presumably the pool size cap;
    /// interpretation is left to the pool factory).
    #[serde(default)]
    pub max_connections: Option<u32>,
    /// Optional connection-count lower bound (presumably the number of
    /// connections kept open; interpretation is left to the pool factory).
    #[serde(default)]
    pub min_connections: Option<u32>,
    /// Optional idle timeout, in seconds per the field name.
    #[serde(default)]
    pub idle_timeout_secs: Option<u64>,
    /// Optional maximum connection lifetime, in seconds per the field name.
    #[serde(default)]
    pub max_lifetime_secs: Option<u64>,
    /// Optional SSL mode string.
    /// NOTE(review): accepted values are not validated here — verify with the
    /// pool factory that consumes it.
    #[serde(default)]
    pub ssl_mode: Option<String>,
    /// Optional SSL root certificate setting.
    /// NOTE(review): whether this is a file path or inline PEM is not visible
    /// here — confirm.
    #[serde(default)]
    pub ssl_root_cert: Option<String>,
    /// Optional client SSL certificate setting (path vs. inline content
    /// unconfirmed, see `ssl_root_cert`).
    #[serde(default)]
    pub ssl_cert: Option<String>,
    /// Optional client SSL key setting. Treated as sensitive: the `Debug`
    /// impl masks it whenever it is set.
    #[serde(default)]
    pub ssl_key: Option<String>,
}
35
36impl fmt::Debug for DatasourceConfig {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 f.debug_struct("DatasourceConfig")
39 .field("db_url", &"[REDACTED]")
40 .field("provider", &self.provider)
41 .field("max_connections", &self.max_connections)
42 .field("min_connections", &self.min_connections)
43 .field("idle_timeout_secs", &self.idle_timeout_secs)
44 .field("max_lifetime_secs", &self.max_lifetime_secs)
45 .field("ssl_mode", &self.ssl_mode)
46 .field("ssl_root_cert", &self.ssl_root_cert)
47 .field("ssl_cert", &self.ssl_cert)
48 .field("ssl_key", &self.ssl_key.as_ref().map(|_| "[REDACTED]"))
49 .finish()
50 }
51}
52
53impl DatasourceConfig {
54 pub fn validate(&self) -> Result<(), CamelError> {
55 if self.db_url.trim().is_empty() {
56 return Err(CamelError::Config(
57 "datasource db_url cannot be empty".into(),
58 ));
59 }
60 Ok(())
61 }
62}
63
/// Cloneable, type-erased handle to a datasource's pool object.
///
/// The concrete value lives behind `Arc<dyn Any + Send + Sync>`, so clones of
/// the handle share the same underlying value. Use `downcast` to recover the
/// concrete type.
#[derive(Clone)]
pub struct DatasourceHandle {
    /// Datasource name; appears in `Debug` output and downcast error messages.
    pub name: String,
    /// Provider name; appears in `Debug` output and downcast error messages.
    pub provider: String,
    /// Type-erased pool value. Private: only reachable through `downcast`.
    inner: Arc<dyn Any + Send + Sync>,
}
70
71impl DatasourceHandle {
72 pub fn new(name: String, provider: String, inner: Arc<dyn Any + Send + Sync>) -> Self {
73 Self {
74 name,
75 provider,
76 inner,
77 }
78 }
79
80 pub fn downcast<T: 'static + Send + Sync>(&self) -> Result<Arc<T>, CamelError> {
81 self.inner.clone().downcast::<T>().map_err(|_| {
82 CamelError::ProcessorError(format!(
83 "datasource '{}' (provider '{}'): failed to downcast handle",
84 self.name, self.provider
85 ))
86 })
87 }
88}
89
90impl fmt::Debug for DatasourceHandle {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 f.debug_struct("DatasourceHandle")
93 .field("name", &self.name)
94 .field("provider", &self.provider)
95 .finish()
96 }
97}
98
/// Reference to a resource, identified by a `kind` and a `name`.
///
/// Excluded from generated documentation via `#[doc(hidden)]`. `Debug` is
/// derived; it prints both fields verbatim, exactly as the previous
/// hand-written implementation did.
#[doc(hidden)]
#[derive(Debug)]
pub struct ResourceRef {
    /// Resource category.
    /// NOTE(review): the set of valid kinds is not visible here — confirm with callers.
    pub kind: String,
    /// Name of the resource within its kind.
    pub name: String,
}
113
/// Outcome of building a pool: the pool as a shared, type-erased value, or a
/// [`CamelError`].
pub type CreatePoolResult = Result<Arc<dyn Any + Send + Sync>, CamelError>;
/// Boxed, pinned, `Send` future that resolves to a [`CreatePoolResult`] and
/// may borrow data for `'a`.
pub type CreatePoolFuture<'a> = Pin<Box<dyn Future<Output = CreatePoolResult> + Send + 'a>>;
/// Boxed, pinned, `Send` future that resolves to a [`HealthStatus`] and may
/// borrow data for `'a`.
pub type CheckFuture<'a> = Pin<Box<dyn Future<Output = HealthStatus> + Send + 'a>>;
117
118pub trait PoolFactory: Send + Sync + 'static {
119 fn create<'a>(&'a self, config: &'a DatasourceConfig) -> CreatePoolFuture<'a>;
120
121 fn check<'a>(&'a self, handle: &'a DatasourceHandle) -> CheckFuture<'a>;
122
123 fn supported_schemes(&self) -> &[&str];
124
125 fn matches(&self, config: &DatasourceConfig) -> bool {
126 self.supported_schemes().iter().any(|s| {
127 config.db_url.starts_with(&format!("{}://", s))
128 || config.db_url.starts_with(&format!("{}::", s))
129 })
130 }
131
132 fn name(&self) -> &'static str;
133}
134
/// Boxed, pinned, `Send` future that resolves to a [`DatasourceHandle`] or a
/// [`CamelError`] and may borrow data for `'a`.
pub type GetPoolFuture<'a> =
    Pin<Box<dyn Future<Output = Result<DatasourceHandle, CamelError>> + Send + 'a>>;
137
/// Lookup and registration interface for datasource configurations, pool
/// handles, and pool factories. Implementors must be `Send + Sync`.
pub trait DatasourceCatalog: Send + Sync {
    /// Returns the configuration for `name` by value, or `None`
    /// (presumably when no datasource with that name is known — confirm with
    /// the implementation).
    fn get_config(&self, name: &str) -> Option<DatasourceConfig>;
    /// Asynchronously resolves the [`DatasourceHandle`] for `name`.
    /// NOTE(review): whether the pool is created lazily or cached is decided
    /// by the implementation and is not visible here.
    fn get_pool<'a>(&'a self, name: &'a str) -> GetPoolFuture<'a>;
    /// Registers `factory` under `kind`.
    /// NOTE(review): the conditions that produce `Err` (e.g. duplicate
    /// registration) are defined by the implementation — confirm.
    fn register_factory(&self, kind: &str, factory: Arc<dyn PoolFactory>)
    -> Result<(), CamelError>;
}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config for `url` with every optional setting left unset, so
    /// each test only spells out the fields it actually cares about.
    fn config_for(url: &str) -> DatasourceConfig {
        DatasourceConfig {
            db_url: url.to_string(),
            provider: None,
            max_connections: None,
            min_connections: None,
            idle_timeout_secs: None,
            max_lifetime_secs: None,
            ssl_mode: None,
            ssl_root_cert: None,
            ssl_cert: None,
            ssl_key: None,
        }
    }

    /// An empty URL must be rejected with a message mentioning "empty".
    #[test]
    fn datasource_config_validate_rejects_empty_db_url() {
        let outcome = config_for("").validate();
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("empty"));
    }

    /// A URL plus pool-sizing options passes validation.
    #[test]
    fn datasource_config_validate_accepts_valid() {
        let config = DatasourceConfig {
            max_connections: Some(10),
            min_connections: Some(2),
            idle_timeout_secs: Some(300),
            max_lifetime_secs: Some(1800),
            ..config_for("postgresql://localhost:5432/mydb")
        };
        assert!(config.validate().is_ok());
    }

    /// Credentials embedded in the URL must never show up in `Debug` output.
    #[test]
    fn datasource_config_debug_redacts_db_url() {
        let config = config_for("postgresql://user:pass@localhost:5432/mydb");
        let debug_str = format!("{:?}", config);
        assert!(
            debug_str.contains("[REDACTED]"),
            "Debug output should redact db_url: {}",
            debug_str
        );
        assert!(
            !debug_str.contains("user:pass"),
            "Debug output should not contain credentials: {}",
            debug_str
        );
    }

    /// Asking for a type other than the stored one yields a descriptive error.
    #[test]
    fn datasource_handle_downcast_fails_on_wrong_type() {
        let handle = DatasourceHandle::new("test".to_string(), "mock".to_string(), Arc::new(42u32));
        let result: Result<Arc<String>, CamelError> = handle.downcast();
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("failed to downcast"));
    }

    /// The default `matches` accepts supported schemes and rejects others.
    #[test]
    fn pool_factory_matches_by_scheme() {
        struct PostgresFactory;

        impl PoolFactory for PostgresFactory {
            fn create<'a>(&'a self, _config: &'a DatasourceConfig) -> CreatePoolFuture<'a> {
                Box::pin(async { Ok(Arc::new("pool") as Arc<dyn Any + Send + Sync>) })
            }

            fn check<'a>(&'a self, _handle: &'a DatasourceHandle) -> CheckFuture<'a> {
                Box::pin(async { HealthStatus::Healthy })
            }

            fn supported_schemes(&self) -> &[&str] {
                &["postgresql", "postgres"]
            }

            fn name(&self) -> &'static str {
                "postgres"
            }
        }

        let factory = PostgresFactory;

        let pg_config = config_for("postgresql://localhost/mydb");
        assert!(factory.matches(&pg_config));

        let mysql_config = config_for("mysql://localhost/mydb");
        assert!(!factory.matches(&mysql_config));
    }
}