use std::any::Any;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::error::CamelError;
use crate::lifecycle::HealthStatus;
/// Connection settings for a single datasource, (de)serialized with serde.
///
/// Only `db_url` is required; every other field is optional and deserializes to `None` when
/// it is absent. Unknown keys are rejected during deserialization (`deny_unknown_fields`).
///
/// `Debug` is implemented by hand for this type (not derived) so that `db_url` and `ssl_key`
/// are never printed.
#[derive(Clone, Deserialize, Serialize, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct DatasourceConfig {
/// Connection URL. `validate` rejects an empty or whitespace-only value, and
/// `PoolFactory::matches` inspects its scheme prefix.
pub db_url: String,
/// NOTE(review): presumably an explicit provider name; it is not interpreted in this
/// file — confirm against the catalog implementation.
#[serde(default)]
pub provider: Option<String>,
/// NOTE(review): presumably the upper bound on pooled connections; the interpretation
/// is left to whatever consumes this config.
#[serde(default)]
pub max_connections: Option<u32>,
/// NOTE(review): presumably the lower bound on pooled connections. `validate` does not
/// compare it with `max_connections`.
#[serde(default)]
pub min_connections: Option<u32>,
/// NOTE(review): by its name an idle timeout in seconds — confirm against the consumer.
#[serde(default)]
pub idle_timeout_secs: Option<u64>,
/// NOTE(review): by its name a maximum connection lifetime in seconds — confirm against
/// the consumer.
#[serde(default)]
pub max_lifetime_secs: Option<u64>,
/// NOTE(review): accepted values are not defined in this file.
#[serde(default)]
pub ssl_mode: Option<String>,
/// NOTE(review): unclear from this file whether this holds a path or inline certificate
/// data. Printed as-is by the `Debug` impl.
#[serde(default)]
pub ssl_root_cert: Option<String>,
/// NOTE(review): unclear from this file whether this holds a path or inline certificate
/// data. Printed as-is by the `Debug` impl.
#[serde(default)]
pub ssl_cert: Option<String>,
/// Treated as a secret: the `Debug` impl prints only a placeholder when it is set.
#[serde(default)]
pub ssl_key: Option<String>,
}
impl fmt::Debug for DatasourceConfig {
    /// Formats the configuration without exposing secrets.
    ///
    /// `db_url` is always replaced by a placeholder because it may embed credentials.
    /// `ssl_key` is rendered as `Some("[REDACTED]")` when set and `None` otherwise, so only
    /// its presence is visible. All remaining fields are printed unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const REDACTED: &str = "[REDACTED]";

        // Keep the Some/None shape of the key while dropping its content.
        let masked_key = self.ssl_key.as_ref().and(Some(REDACTED));

        let mut out = f.debug_struct("DatasourceConfig");
        out.field("db_url", &REDACTED);
        out.field("provider", &self.provider);
        out.field("max_connections", &self.max_connections);
        out.field("min_connections", &self.min_connections);
        out.field("idle_timeout_secs", &self.idle_timeout_secs);
        out.field("max_lifetime_secs", &self.max_lifetime_secs);
        out.field("ssl_mode", &self.ssl_mode);
        out.field("ssl_root_cert", &self.ssl_root_cert);
        out.field("ssl_cert", &self.ssl_cert);
        out.field("ssl_key", &masked_key);
        out.finish()
    }
}
impl DatasourceConfig {
    /// Checks the configuration for an unusable connection URL.
    ///
    /// Only `db_url` is inspected; the optional fields are not validated here.
    ///
    /// # Errors
    ///
    /// Returns [`CamelError::Config`] when `db_url` is empty or consists solely of whitespace.
    pub fn validate(&self) -> Result<(), CamelError> {
        // A whitespace-only URL is treated the same as an empty one.
        match self.db_url.trim() {
            "" => Err(CamelError::Config(
                "datasource db_url cannot be empty".into(),
            )),
            _ => Ok(()),
        }
    }
}
/// Named handle around a type-erased, shared value (an `Arc<dyn Any + Send + Sync>`).
///
/// The concrete type is recovered with `downcast`. Cloning the handle clones the `Arc`, so
/// all clones share the same underlying value; `name` and `provider` are copied.
#[derive(Clone)]
pub struct DatasourceHandle {
/// Datasource name; included in downcast error messages and `Debug` output.
pub name: String,
/// Provider name; included in downcast error messages and `Debug` output.
pub provider: String,
// Private so that callers can only reach the value through `downcast`.
inner: Arc<dyn Any + Send + Sync>,
}
impl DatasourceHandle {
pub fn new(name: String, provider: String, inner: Arc<dyn Any + Send + Sync>) -> Self {
Self {
name,
provider,
inner,
}
}
pub fn downcast<T: 'static + Send + Sync>(&self) -> Result<Arc<T>, CamelError> {
self.inner.clone().downcast::<T>().map_err(|_| {
CamelError::ProcessorError(format!(
"datasource '{}' (provider '{}'): failed to downcast handle",
self.name, self.provider
))
})
}
}
impl fmt::Debug for DatasourceHandle {
    /// Prints `name` and `provider` only; the type-erased `inner` value is left out.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("DatasourceHandle");
        out.field("name", &self.name);
        out.field("provider", &self.provider);
        out.finish()
    }
}
/// Identifies a resource by its `kind` and `name`.
///
/// `Debug` is derived: the previous hand-written impl printed exactly the two fields in
/// declaration order, which is what the derive generates.
///
/// NOTE(review): the set of valid `kind` values is not defined in this file.
#[doc(hidden)]
#[derive(Debug)]
pub struct ResourceRef {
    /// Category of the referenced resource.
    pub kind: String,
    /// Name of the referenced resource.
    pub name: String,
}
/// Outcome of building a pool: the pool as a type-erased, shareable value, or a
/// [`CamelError`].
pub type CreatePoolResult = Result<Arc<dyn Any + Send + Sync>, CamelError>;
/// Boxed, pinned, `Send` future resolving to a [`CreatePoolResult`]; the return type of
/// [`PoolFactory::create`].
pub type CreatePoolFuture<'a> = Pin<Box<dyn Future<Output = CreatePoolResult> + Send + 'a>>;
/// Boxed, pinned, `Send` future resolving to a [`HealthStatus`]; the return type of
/// [`PoolFactory::check`].
pub type CheckFuture<'a> = Pin<Box<dyn Future<Output = HealthStatus> + Send + 'a>>;
/// Builds and health-checks pools for the URL schemes it declares.
///
/// Implementations must be `Send + Sync + 'static`.
pub trait PoolFactory: Send + Sync + 'static {
    /// Starts building a pool from `config`; the returned future resolves to the type-erased
    /// pool or a `CamelError`.
    fn create<'a>(&'a self, config: &'a DatasourceConfig) -> CreatePoolFuture<'a>;

    /// Starts a health check for `handle`; the returned future resolves to a `HealthStatus`.
    fn check<'a>(&'a self, handle: &'a DatasourceHandle) -> CheckFuture<'a>;

    /// URL schemes, without the `://` or `::` separator, that [`PoolFactory::matches`]
    /// accepts.
    fn supported_schemes(&self) -> &[&str];

    /// Returns `true` when `config.db_url` starts with one of the supported schemes followed
    /// directly by `://` or `::`.
    ///
    /// The comparison is case-sensitive and anchored at the start of the URL. The check
    /// borrows from the URL instead of formatting a candidate prefix per scheme, so it does
    /// not allocate.
    ///
    /// NOTE(review): the `::` form presumably covers URLs such as `sqlite::memory:` —
    /// confirm with the factories that rely on it.
    fn matches(&self, config: &DatasourceConfig) -> bool {
        let url = config.db_url.as_str();
        self.supported_schemes()
            .iter()
            .any(|&scheme| match url.strip_prefix(scheme) {
                Some(rest) => rest.starts_with("://") || rest.starts_with("::"),
                None => false,
            })
    }

    /// Static name of this factory.
    fn name(&self) -> &'static str;
}
/// Boxed, pinned, `Send` future resolving to a [`DatasourceHandle`] or a [`CamelError`]; the
/// return type of [`DatasourceCatalog::get_pool`].
pub type GetPoolFuture<'a> =
Pin<Box<dyn Future<Output = Result<DatasourceHandle, CamelError>> + Send + 'a>>;
/// Lookup and registration interface for datasources; implementations must be `Send + Sync`.
///
/// NOTE(review): no implementation is visible in this file. The method notes below describe
/// the signatures only and should be confirmed against the implementing type.
pub trait DatasourceCatalog: Send + Sync {
/// Returns an owned configuration for `name`, or `None` — presumably when no datasource
/// with that name is known.
fn get_config(&self, name: &str) -> Option<DatasourceConfig>;
/// Returns a future that resolves `name` to a [`DatasourceHandle`] or fails with a
/// [`CamelError`]. NOTE(review): whether pools are created lazily or cached is up to the
/// implementation.
fn get_pool<'a>(&'a self, name: &'a str) -> GetPoolFuture<'a>;
/// Registers `factory` under `kind`. The method takes `&self`, so an implementation that
/// stores the factory needs interior mutability. NOTE(review): the conditions under which
/// this returns an error are not visible here.
fn register_factory(&self, kind: &str, factory: Arc<dyn PoolFactory>)
-> Result<(), CamelError>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config that carries only `db_url`; every optional setting is `None`.
    fn config_for(db_url: &str) -> DatasourceConfig {
        DatasourceConfig {
            db_url: db_url.to_string(),
            provider: None,
            max_connections: None,
            min_connections: None,
            idle_timeout_secs: None,
            max_lifetime_secs: None,
            ssl_mode: None,
            ssl_root_cert: None,
            ssl_cert: None,
            ssl_key: None,
        }
    }

    #[test]
    fn datasource_config_validate_rejects_empty_db_url() {
        let outcome = config_for("").validate();
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("empty"));
    }

    #[test]
    fn datasource_config_validate_accepts_valid() {
        let config = DatasourceConfig {
            max_connections: Some(10),
            min_connections: Some(2),
            idle_timeout_secs: Some(300),
            max_lifetime_secs: Some(1800),
            ..config_for("postgresql://localhost:5432/mydb")
        };
        assert!(config.validate().is_ok());
    }

    #[test]
    fn datasource_config_debug_redacts_db_url() {
        let rendered = format!(
            "{:?}",
            config_for("postgresql://user:pass@localhost:5432/mydb")
        );
        assert!(
            rendered.contains("[REDACTED]"),
            "Debug output should redact db_url: {}",
            rendered
        );
        assert!(
            !rendered.contains("user:pass"),
            "Debug output should not contain credentials: {}",
            rendered
        );
    }

    #[test]
    fn datasource_handle_downcast_fails_on_wrong_type() {
        // The handle wraps a `u32`, so asking for a `String` must fail.
        let handle =
            DatasourceHandle::new("test".to_string(), "mock".to_string(), Arc::new(42u32));
        let outcome: Result<Arc<String>, CamelError> = handle.downcast();
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("failed to downcast"));
    }

    #[test]
    fn pool_factory_matches_by_scheme() {
        struct PostgresFactory;

        impl PoolFactory for PostgresFactory {
            fn create<'a>(&'a self, _config: &'a DatasourceConfig) -> CreatePoolFuture<'a> {
                Box::pin(async { Ok(Arc::new("pool") as Arc<dyn Any + Send + Sync>) })
            }

            fn check<'a>(&'a self, _handle: &'a DatasourceHandle) -> CheckFuture<'a> {
                Box::pin(async { HealthStatus::Healthy })
            }

            fn supported_schemes(&self) -> &[&str] {
                &["postgresql", "postgres"]
            }

            fn name(&self) -> &'static str {
                "postgres"
            }
        }

        let factory = PostgresFactory;

        let pg_config = config_for("postgresql://localhost/mydb");
        assert!(factory.matches(&pg_config));

        let mysql_config = config_for("mysql://localhost/mydb");
        assert!(!factory.matches(&mysql_config));
    }
}