Skip to main content

camel_component_sql/
endpoint.rs

1use std::sync::Arc;
2use tokio::sync::OnceCell;
3
4use camel_component_api::{BodyType, BoxProcessor, CamelError, RuntimeObservability};
5use camel_component_api::{Consumer, Endpoint, ProducerContext};
6use sqlx::AnyPool;
7
8use crate::config::SqlEndpointConfig;
9use crate::consumer::SqlConsumer;
10use crate::producer::SqlProducer;
11
12pub(crate) struct SqlEndpoint {
13    uri: String,
14    pub(crate) config: SqlEndpointConfig,
15    pub(crate) pool: Arc<OnceCell<AnyPool>>,
16}
17
18impl SqlEndpoint {
19    #[cfg(test)]
20    pub fn new(uri: String, config: SqlEndpointConfig) -> Self {
21        Self::new_with_pool(uri, config, Arc::new(OnceCell::new()))
22    }
23
24    pub fn new_with_pool(
25        uri: String,
26        config: SqlEndpointConfig,
27        pool: Arc<OnceCell<AnyPool>>,
28    ) -> Self {
29        Self { uri, config, pool }
30    }
31}
32
33impl Endpoint for SqlEndpoint {
34    fn uri(&self) -> &str {
35        &self.uri
36    }
37
38    fn create_producer(
39        &self,
40        rt: Arc<dyn RuntimeObservability>,
41        _ctx: &ProducerContext,
42    ) -> Result<BoxProcessor, CamelError> {
43        Ok(BoxProcessor::new(SqlProducer::new(
44            self.config.clone(),
45            Arc::clone(&self.pool),
46            rt,
47        )))
48    }
49
50    fn create_consumer(
51        &self,
52        rt: Arc<dyn RuntimeObservability>,
53    ) -> Result<Box<dyn Consumer>, CamelError> {
54        Ok(Box::new(SqlConsumer::new(
55            self.config.clone(),
56            Arc::clone(&self.pool),
57            rt,
58        )))
59    }
60
61    fn body_contract(&self) -> Option<BodyType> {
62        if !self.config.use_message_body_for_sql {
63            return None;
64        }
65        if self.config.batch {
66            Some(BodyType::Json)
67        } else {
68            Some(BodyType::Text)
69        }
70    }
71}
72
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SqlEndpointConfig;
    use camel_component_api::test_support::PanicRuntimeObservability;
    use camel_component_api::Endpoint;
    use camel_component_api::UriConfig;

    /// Full URI (with query options) that the test configuration is parsed from.
    const CONFIG_URI: &str = "sql:select 1?db_url=postgres://localhost/test";
    /// URI string stored on the endpoint under test.
    const ENDPOINT_URI: &str = "sql:select 1";

    /// Runtime-observability stand-in passed to producer/consumer creation.
    fn test_rt() -> Arc<dyn RuntimeObservability> {
        Arc::new(PanicRuntimeObservability)
    }

    /// Parses the shared test configuration.
    fn parse_config() -> SqlEndpointConfig {
        SqlEndpointConfig::from_uri(CONFIG_URI).expect("valid SQL endpoint config")
    }

    /// Builds an endpoint with the two flags that drive `body_contract`.
    fn make_endpoint(use_body: bool, batch: bool) -> SqlEndpoint {
        let mut config = parse_config();
        config.use_message_body_for_sql = use_body;
        config.batch = batch;

        SqlEndpoint::new(ENDPOINT_URI.to_string(), config)
    }

    #[test]
    fn body_contract_returns_some_text_when_body_mode_enabled_no_batch() {
        let endpoint = make_endpoint(true, false);
        assert_eq!(endpoint.body_contract(), Some(BodyType::Text));
    }

    #[test]
    fn body_contract_returns_some_json_when_batch_mode_enabled() {
        let endpoint = make_endpoint(true, true);
        assert_eq!(endpoint.body_contract(), Some(BodyType::Json));
    }

    #[test]
    fn body_contract_returns_none_when_body_mode_disabled() {
        let endpoint = make_endpoint(false, false);
        assert_eq!(endpoint.body_contract(), None);
    }

    #[test]
    fn body_contract_returns_none_when_batch_without_body_mode() {
        let endpoint = make_endpoint(false, true);
        assert_eq!(endpoint.body_contract(), None);
    }

    #[test]
    fn new_stores_uri_and_config() {
        // The config is moved in: it is not needed afterwards, so no clone.
        let endpoint = SqlEndpoint::new(ENDPOINT_URI.to_string(), parse_config());
        assert_eq!(endpoint.uri(), ENDPOINT_URI);
        assert_eq!(endpoint.config.db_url, "postgres://localhost/test");
    }

    #[test]
    fn uri_returns_stored_uri() {
        let endpoint = make_endpoint(false, false);
        assert_eq!(endpoint.uri(), ENDPOINT_URI);
    }

    #[test]
    fn create_producer_returns_ok() {
        let endpoint = make_endpoint(false, false);
        let ctx = ProducerContext::new();
        let result = endpoint.create_producer(test_rt(), &ctx);
        assert!(result.is_ok());
    }

    #[test]
    fn create_consumer_returns_ok() {
        let endpoint = make_endpoint(false, false);
        let result = endpoint.create_consumer(test_rt());
        assert!(result.is_ok());
    }

    #[test]
    fn pool_is_shared_via_arc() {
        // Comparing two clones of `endpoint.pool` with each other can never
        // fail. Instead, hand in a cell owned by the test and check that the
        // endpoint holds that very allocation rather than a fresh one.
        let shared: Arc<OnceCell<AnyPool>> = Arc::new(OnceCell::new());
        let endpoint = SqlEndpoint::new_with_pool(
            ENDPOINT_URI.to_string(),
            parse_config(),
            Arc::clone(&shared),
        );
        assert!(Arc::ptr_eq(&shared, &endpoint.pool));
    }
}