Skip to main content

camel_component_sql/
endpoint.rs

1use std::sync::Arc;
2use tokio::sync::OnceCell;
3
4use camel_component_api::{BodyType, BoxProcessor, CamelError};
5use camel_component_api::{Consumer, Endpoint, ProducerContext};
6use sqlx::AnyPool;
7
8use crate::config::SqlEndpointConfig;
9use crate::consumer::SqlConsumer;
10use crate::producer::SqlProducer;
11
12pub(crate) struct SqlEndpoint {
13    uri: String,
14    pub(crate) config: SqlEndpointConfig,
15    pub(crate) pool: Arc<OnceCell<AnyPool>>,
16}
17
18impl SqlEndpoint {
19    #[cfg(test)]
20    pub fn new(uri: String, config: SqlEndpointConfig) -> Self {
21        Self::new_with_pool(uri, config, Arc::new(OnceCell::new()))
22    }
23
24    pub fn new_with_pool(
25        uri: String,
26        config: SqlEndpointConfig,
27        pool: Arc<OnceCell<AnyPool>>,
28    ) -> Self {
29        Self { uri, config, pool }
30    }
31}
32
33impl Endpoint for SqlEndpoint {
34    fn uri(&self) -> &str {
35        &self.uri
36    }
37
38    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
39        Ok(BoxProcessor::new(SqlProducer::new(
40            self.config.clone(),
41            Arc::clone(&self.pool),
42        )))
43    }
44
45    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
46        Ok(Box::new(SqlConsumer::new(
47            self.config.clone(),
48            Arc::clone(&self.pool),
49        )))
50    }
51
52    fn body_contract(&self) -> Option<BodyType> {
53        if self.config.use_message_body_for_sql {
54            Some(BodyType::Text)
55        } else {
56            None
57        }
58    }
59}
60
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SqlEndpointConfig;
    use camel_component_api::Endpoint;
    use camel_component_api::UriConfig;

    /// Builds an endpoint for `sql:select 1` with `use_message_body_for_sql`
    /// forced to `use_body`.
    fn make_endpoint(use_body: bool) -> SqlEndpoint {
        let mut config =
            SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
                .expect("valid SQL endpoint config");
        config.use_message_body_for_sql = use_body;

        SqlEndpoint::new("sql:select 1".to_string(), config)
    }

    #[test]
    fn body_contract_returns_some_text_when_body_mode_enabled() {
        let endpoint = make_endpoint(true);
        assert_eq!(endpoint.body_contract(), Some(BodyType::Text));
    }

    #[test]
    fn body_contract_returns_none_when_body_mode_disabled() {
        let endpoint = make_endpoint(false);
        assert_eq!(endpoint.body_contract(), None);
    }

    #[test]
    fn new_stores_uri_and_config() {
        let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
            .expect("valid config");
        // The config is not needed afterwards, so it is moved rather than cloned.
        let endpoint = SqlEndpoint::new("sql:select 1".to_string(), config);
        assert_eq!(endpoint.uri(), "sql:select 1");
        assert_eq!(endpoint.config.db_url, "postgres://localhost/test");
    }

    #[test]
    fn uri_returns_stored_uri() {
        let endpoint = make_endpoint(false);
        assert_eq!(endpoint.uri(), "sql:select 1");
    }

    #[test]
    fn create_producer_returns_ok() {
        let endpoint = make_endpoint(false);
        let ctx = ProducerContext::new();
        let result = endpoint.create_producer(&ctx);
        assert!(result.is_ok());
    }

    #[test]
    fn create_consumer_returns_ok() {
        let endpoint = make_endpoint(false);
        let result = endpoint.create_consumer();
        assert!(result.is_ok());
    }

    /// Checks that `new_with_pool` keeps the exact `Arc` it is given instead of
    /// allocating a new cell. Comparing two clones of `endpoint.pool` with each
    /// other could never fail, so the endpoint's handle is compared against an
    /// externally owned one.
    #[test]
    fn pool_is_shared_via_arc() {
        let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
            .expect("valid config");
        let shared: Arc<OnceCell<AnyPool>> = Arc::new(OnceCell::new());
        let endpoint =
            SqlEndpoint::new_with_pool("sql:select 1".to_string(), config, Arc::clone(&shared));
        assert!(Arc::ptr_eq(&endpoint.pool, &shared));
    }
}