camel_component_sql/
endpoint.rs1use std::sync::Arc;
2use tokio::sync::OnceCell;
3
4use camel_component_api::{BodyType, BoxProcessor, CamelError};
5use camel_component_api::{Consumer, Endpoint, ProducerContext};
6use sqlx::AnyPool;
7
8use crate::config::SqlEndpointConfig;
9use crate::consumer::SqlConsumer;
10use crate::producer::SqlProducer;
11
12pub(crate) struct SqlEndpoint {
13 uri: String,
14 pub(crate) config: SqlEndpointConfig,
15 pub(crate) pool: Arc<OnceCell<AnyPool>>,
16}
17
18impl SqlEndpoint {
19 #[cfg(test)]
20 pub fn new(uri: String, config: SqlEndpointConfig) -> Self {
21 Self::new_with_pool(uri, config, Arc::new(OnceCell::new()))
22 }
23
24 pub fn new_with_pool(
25 uri: String,
26 config: SqlEndpointConfig,
27 pool: Arc<OnceCell<AnyPool>>,
28 ) -> Self {
29 Self { uri, config, pool }
30 }
31}
32
33impl Endpoint for SqlEndpoint {
34 fn uri(&self) -> &str {
35 &self.uri
36 }
37
38 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
39 Ok(BoxProcessor::new(SqlProducer::new(
40 self.config.clone(),
41 Arc::clone(&self.pool),
42 )))
43 }
44
45 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
46 Ok(Box::new(SqlConsumer::new(
47 self.config.clone(),
48 Arc::clone(&self.pool),
49 )))
50 }
51
52 fn body_contract(&self) -> Option<BodyType> {
53 if !self.config.use_message_body_for_sql {
54 return None;
55 }
56 if self.config.batch {
57 Some(BodyType::Json)
58 } else {
59 Some(BodyType::Text)
60 }
61 }
62}
63
64#[cfg(test)]
65mod tests {
66 use super::*;
67 use crate::config::SqlEndpointConfig;
68 use camel_component_api::Endpoint;
69 use camel_component_api::UriConfig;
70
71 fn make_endpoint(use_body: bool, batch: bool) -> SqlEndpoint {
72 let mut config =
73 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
74 .expect("valid SQL endpoint config");
75 config.use_message_body_for_sql = use_body;
76 config.batch = batch;
77
78 SqlEndpoint::new("sql:select 1".to_string(), config)
79 }
80
81 #[test]
82 fn body_contract_returns_some_text_when_body_mode_enabled_no_batch() {
83 let endpoint = make_endpoint(true, false);
84 assert_eq!(endpoint.body_contract(), Some(BodyType::Text));
85 }
86
87 #[test]
88 fn body_contract_returns_some_json_when_batch_mode_enabled() {
89 let endpoint = make_endpoint(true, true);
90 assert_eq!(endpoint.body_contract(), Some(BodyType::Json));
91 }
92
93 #[test]
94 fn body_contract_returns_none_when_body_mode_disabled() {
95 let endpoint = make_endpoint(false, false);
96 assert_eq!(endpoint.body_contract(), None);
97 }
98
99 #[test]
100 fn body_contract_returns_none_when_batch_without_body_mode() {
101 let endpoint = make_endpoint(false, true);
102 assert_eq!(endpoint.body_contract(), None);
103 }
104
105 #[test]
106 fn new_stores_uri_and_config() {
107 let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
108 .expect("valid config");
109 let endpoint = SqlEndpoint::new("sql:select 1".to_string(), config.clone());
110 assert_eq!(endpoint.uri(), "sql:select 1");
111 assert_eq!(endpoint.config.db_url, "postgres://localhost/test");
112 }
113
114 #[test]
115 fn uri_returns_stored_uri() {
116 let endpoint = make_endpoint(false, false);
117 assert_eq!(endpoint.uri(), "sql:select 1");
118 }
119
120 #[test]
121 fn create_producer_returns_ok() {
122 let endpoint = make_endpoint(false, false);
123 let ctx = ProducerContext::new();
124 let result = endpoint.create_producer(&ctx);
125 assert!(result.is_ok());
126 }
127
128 #[test]
129 fn create_consumer_returns_ok() {
130 let endpoint = make_endpoint(false, false);
131 let result = endpoint.create_consumer();
132 assert!(result.is_ok());
133 }
134
135 #[test]
136 fn pool_is_shared_via_arc() {
137 let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
138 .expect("valid config");
139 let endpoint = SqlEndpoint::new("sql:select 1".to_string(), config);
140 let pool_ref1 = Arc::clone(&endpoint.pool);
141 let pool_ref2 = Arc::clone(&endpoint.pool);
142 assert!(Arc::ptr_eq(&pool_ref1, &pool_ref2));
143 }
144}