Skip to main content

camel_component_sql/
endpoint.rs

1use std::sync::Arc;
2use tokio::sync::OnceCell;
3
4use camel_api::datasource::DatasourceCatalog;
5use camel_component_api::{BodyType, BoxProcessor, CamelError, RuntimeObservability};
6use camel_component_api::{Consumer, Endpoint, ProducerContext};
7use sqlx::AnyPool;
8
9use crate::config::SqlEndpointConfig;
10use crate::consumer::SqlConsumer;
11use crate::producer::SqlProducer;
12
/// SQL endpoint: bundles the endpoint URI, its configuration, and the shared
/// handles that are passed on to every producer and consumer it creates.
pub(crate) struct SqlEndpoint {
    /// URI string handed back unchanged by `Endpoint::uri`.
    uri: String,
    /// Endpoint configuration; cloned into each producer and consumer.
    pub(crate) config: SqlEndpointConfig,
    /// Shared once-cell wrapping the `AnyPool`; the same `Arc` is handed to
    /// every producer and consumer. The endpoint code itself never fills the
    /// cell — presumably the producer/consumer does so on first use (confirm
    /// in those modules).
    pub(crate) pool: Arc<OnceCell<Arc<AnyPool>>>,
    /// Optional datasource catalog; `None` unless the endpoint was built with
    /// `new_with_pool_and_catalog`.
    pub(crate) catalog: Option<Arc<dyn DatasourceCatalog>>,
}
19
20impl SqlEndpoint {
21    #[cfg(test)]
22    pub fn new(uri: String, config: SqlEndpointConfig) -> Self {
23        Self::new_with_pool(uri, config, Arc::new(OnceCell::new()))
24    }
25
26    pub fn new_with_pool(
27        uri: String,
28        config: SqlEndpointConfig,
29        pool: Arc<OnceCell<Arc<AnyPool>>>,
30    ) -> Self {
31        Self {
32            uri,
33            config,
34            pool,
35            catalog: None,
36        }
37    }
38
39    pub fn new_with_pool_and_catalog(
40        uri: String,
41        config: SqlEndpointConfig,
42        pool: Arc<OnceCell<Arc<AnyPool>>>,
43        catalog: Arc<dyn DatasourceCatalog>,
44    ) -> Self {
45        Self {
46            uri,
47            config,
48            pool,
49            catalog: Some(catalog),
50        }
51    }
52}
53
54impl Endpoint for SqlEndpoint {
55    fn uri(&self) -> &str {
56        &self.uri
57    }
58
59    fn create_producer(
60        &self,
61        rt: Arc<dyn RuntimeObservability>,
62        ctx: &ProducerContext,
63    ) -> Result<BoxProcessor, CamelError> {
64        let route_id = ctx.route_id().unwrap_or("unknown").to_string();
65        Ok(BoxProcessor::new(SqlProducer::new(
66            self.config.clone(),
67            Arc::clone(&self.pool),
68            self.catalog.clone(),
69            rt,
70            route_id,
71        )))
72    }
73
74    fn create_consumer(
75        &self,
76        rt: Arc<dyn RuntimeObservability>,
77    ) -> Result<Box<dyn Consumer>, CamelError> {
78        Ok(Box::new(SqlConsumer::new(
79            self.config.clone(),
80            Arc::clone(&self.pool),
81            self.catalog.clone(),
82            rt,
83        )))
84    }
85
86    fn body_contract(&self) -> Option<BodyType> {
87        if !self.config.use_message_body_for_sql {
88            return None;
89        }
90        if self.config.batch {
91            Some(BodyType::Json)
92        } else {
93            Some(BodyType::Text)
94        }
95    }
96}
97
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::SqlEndpointConfig;
    use camel_component_api::test_support::PanicRuntimeObservability;
    use camel_component_api::Endpoint;
    use camel_component_api::UriConfig;

    /// Endpoint URI used by every test in this module.
    const ENDPOINT_URI: &str = "sql:select 1";
    /// Full URI (including `db_url`) that the test configuration is parsed from.
    const CONFIG_URI: &str = "sql:select 1?db_url=postgres://localhost/test";

    /// Runtime handle for tests, backed by the `PanicRuntimeObservability`
    /// test-support type.
    fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
        std::sync::Arc::new(PanicRuntimeObservability)
    }

    /// Parses the shared test configuration from `CONFIG_URI`.
    fn test_config() -> SqlEndpointConfig {
        SqlEndpointConfig::from_uri(CONFIG_URI).expect("valid SQL endpoint config")
    }

    /// Builds an endpoint whose body-mode and batch flags are set explicitly.
    fn make_endpoint(use_body: bool, batch: bool) -> SqlEndpoint {
        let mut config = test_config();
        config.use_message_body_for_sql = use_body;
        config.batch = batch;

        SqlEndpoint::new(ENDPOINT_URI.to_string(), config)
    }

    #[test]
    fn body_contract_returns_some_text_when_body_mode_enabled_no_batch() {
        let endpoint = make_endpoint(true, false);
        assert_eq!(endpoint.body_contract(), Some(BodyType::Text));
    }

    #[test]
    fn body_contract_returns_some_json_when_batch_mode_enabled() {
        let endpoint = make_endpoint(true, true);
        assert_eq!(endpoint.body_contract(), Some(BodyType::Json));
    }

    #[test]
    fn body_contract_returns_none_when_body_mode_disabled() {
        let endpoint = make_endpoint(false, false);
        assert_eq!(endpoint.body_contract(), None);
    }

    #[test]
    fn body_contract_returns_none_when_batch_without_body_mode() {
        let endpoint = make_endpoint(false, true);
        assert_eq!(endpoint.body_contract(), None);
    }

    #[test]
    fn new_stores_uri_and_config() {
        // The config is moved in directly; it is not needed afterwards, so no
        // clone is required.
        let endpoint = SqlEndpoint::new(ENDPOINT_URI.to_string(), test_config());
        assert_eq!(endpoint.uri(), "sql:select 1");
        assert_eq!(endpoint.config.db_url, "postgres://localhost/test");
    }

    #[test]
    fn new_leaves_catalog_unset() {
        let endpoint = SqlEndpoint::new(ENDPOINT_URI.to_string(), test_config());
        assert!(endpoint.catalog.is_none());
    }

    #[test]
    fn uri_returns_stored_uri() {
        let endpoint = make_endpoint(false, false);
        assert_eq!(endpoint.uri(), "sql:select 1");
    }

    #[test]
    fn create_producer_returns_ok() {
        let endpoint = make_endpoint(false, false);
        let ctx = ProducerContext::new();
        let result = endpoint.create_producer(test_rt(), &ctx);
        assert!(result.is_ok());
    }

    #[test]
    fn create_consumer_returns_ok() {
        let endpoint = make_endpoint(false, false);
        let result = endpoint.create_consumer(test_rt());
        assert!(result.is_ok());
    }

    #[test]
    fn pool_is_shared_via_arc() {
        // Compare the caller's handle with the one stored in the endpoint.
        // Comparing two clones of `endpoint.pool` with each other would always
        // succeed and therefore prove nothing about the constructor.
        let pool: Arc<OnceCell<Arc<AnyPool>>> = Arc::new(OnceCell::new());
        let endpoint =
            SqlEndpoint::new_with_pool(ENDPOINT_URI.to_string(), test_config(), Arc::clone(&pool));
        assert!(Arc::ptr_eq(&pool, &endpoint.pool));
    }
}