Skip to main content

camel_component_sql/
endpoint.rs

1use std::sync::Arc;
2use tokio::sync::OnceCell;
3
4use camel_component_api::{BodyType, BoxProcessor, CamelError};
5use camel_component_api::{Consumer, Endpoint, ProducerContext};
6use sqlx::AnyPool;
7
8use crate::config::SqlEndpointConfig;
9use crate::consumer::SqlConsumer;
10use crate::producer::SqlProducer;
11
12pub(crate) struct SqlEndpoint {
13    uri: String,
14    pub(crate) config: SqlEndpointConfig,
15    pub(crate) pool: Arc<OnceCell<AnyPool>>,
16}
17
18impl SqlEndpoint {
19    pub fn new(uri: String, config: SqlEndpointConfig) -> Self {
20        Self {
21            uri,
22            config,
23            pool: Arc::new(OnceCell::new()),
24        }
25    }
26}
27
28impl Endpoint for SqlEndpoint {
29    fn uri(&self) -> &str {
30        &self.uri
31    }
32
33    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
34        Ok(BoxProcessor::new(SqlProducer::new(
35            self.config.clone(),
36            Arc::clone(&self.pool),
37        )))
38    }
39
40    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
41        Ok(Box::new(SqlConsumer::new(
42            self.config.clone(),
43            Arc::clone(&self.pool),
44        )))
45    }
46
47    fn body_contract(&self) -> Option<BodyType> {
48        if self.config.use_message_body_for_sql {
49            Some(BodyType::Text)
50        } else {
51            None
52        }
53    }
54}
55
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SqlEndpointConfig;
    use camel_component_api::Endpoint;
    use camel_component_api::UriConfig;

    /// URI string stored on every endpoint built by these tests.
    const ENDPOINT_URI: &str = "sql:select 1";
    /// Full URI (including the `db_url` parameter) parsed into the test config.
    const CONFIG_URI: &str = "sql:select 1?db_url=postgres://localhost/test";

    /// Parses [`CONFIG_URI`] into a configuration, panicking if it is rejected.
    fn make_config() -> SqlEndpointConfig {
        SqlEndpointConfig::from_uri(CONFIG_URI).expect("valid SQL endpoint config")
    }

    /// Builds an endpoint whose `use_message_body_for_sql` flag is `use_body`.
    fn make_endpoint(use_body: bool) -> SqlEndpoint {
        let mut config = make_config();
        config.use_message_body_for_sql = use_body;

        SqlEndpoint::new(ENDPOINT_URI.to_string(), config)
    }

    #[test]
    fn body_contract_returns_some_text_when_body_mode_enabled() {
        let endpoint = make_endpoint(true);
        assert_eq!(endpoint.body_contract(), Some(BodyType::Text));
    }

    #[test]
    fn body_contract_returns_none_when_body_mode_disabled() {
        let endpoint = make_endpoint(false);
        assert_eq!(endpoint.body_contract(), None);
    }

    #[test]
    fn new_stores_uri_and_config() {
        // The config is moved into the endpoint; no clone is needed because
        // the test only inspects it through the endpoint afterwards.
        let endpoint = SqlEndpoint::new(ENDPOINT_URI.to_string(), make_config());
        assert_eq!(endpoint.uri(), "sql:select 1");
        assert_eq!(endpoint.config.db_url, "postgres://localhost/test");
    }

    #[test]
    fn uri_returns_stored_uri() {
        let endpoint = make_endpoint(false);
        assert_eq!(endpoint.uri(), "sql:select 1");
    }

    #[test]
    fn create_producer_returns_ok() {
        let endpoint = make_endpoint(false);
        let ctx = ProducerContext::new();
        let result = endpoint.create_producer(&ctx);
        assert!(result.is_ok());
    }

    #[test]
    fn create_consumer_returns_ok() {
        let endpoint = make_endpoint(false);
        let result = endpoint.create_consumer();
        assert!(result.is_ok());
    }

    #[test]
    fn pool_is_shared_via_arc() {
        let endpoint = SqlEndpoint::new(ENDPOINT_URI.to_string(), make_config());
        // Two handles cloned from the endpoint must point at the same cell.
        let pool_ref1 = Arc::clone(&endpoint.pool);
        let pool_ref2 = Arc::clone(&endpoint.pool);
        assert!(Arc::ptr_eq(&pool_ref1, &pool_ref2));
    }
}
123}