camel_component_sql/
endpoint.rs1use std::sync::Arc;
2use tokio::sync::OnceCell;
3
4use camel_component_api::{BodyType, BoxProcessor, CamelError, RuntimeObservability};
5use camel_component_api::{Consumer, Endpoint, ProducerContext};
6use sqlx::AnyPool;
7
8use crate::config::SqlEndpointConfig;
9use crate::consumer::SqlConsumer;
10use crate::producer::SqlProducer;
11
12pub(crate) struct SqlEndpoint {
13 uri: String,
14 pub(crate) config: SqlEndpointConfig,
15 pub(crate) pool: Arc<OnceCell<AnyPool>>,
16}
17
18impl SqlEndpoint {
19 #[cfg(test)]
20 pub fn new(uri: String, config: SqlEndpointConfig) -> Self {
21 Self::new_with_pool(uri, config, Arc::new(OnceCell::new()))
22 }
23
24 pub fn new_with_pool(
25 uri: String,
26 config: SqlEndpointConfig,
27 pool: Arc<OnceCell<AnyPool>>,
28 ) -> Self {
29 Self { uri, config, pool }
30 }
31}
32
33impl Endpoint for SqlEndpoint {
34 fn uri(&self) -> &str {
35 &self.uri
36 }
37
38 fn create_producer(
39 &self,
40 rt: Arc<dyn RuntimeObservability>,
41 _ctx: &ProducerContext,
42 ) -> Result<BoxProcessor, CamelError> {
43 Ok(BoxProcessor::new(SqlProducer::new(
44 self.config.clone(),
45 Arc::clone(&self.pool),
46 rt,
47 )))
48 }
49
50 fn create_consumer(
51 &self,
52 rt: Arc<dyn RuntimeObservability>,
53 ) -> Result<Box<dyn Consumer>, CamelError> {
54 Ok(Box::new(SqlConsumer::new(
55 self.config.clone(),
56 Arc::clone(&self.pool),
57 rt,
58 )))
59 }
60
61 fn body_contract(&self) -> Option<BodyType> {
62 if !self.config.use_message_body_for_sql {
63 return None;
64 }
65 if self.config.batch {
66 Some(BodyType::Json)
67 } else {
68 Some(BodyType::Text)
69 }
70 }
71}
72
73#[cfg(test)]
74mod tests {
75 use camel_component_api::test_support::PanicRuntimeObservability;
76 fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
77 std::sync::Arc::new(PanicRuntimeObservability)
78 }
79 use super::*;
80 use crate::config::SqlEndpointConfig;
81 use camel_component_api::Endpoint;
82 use camel_component_api::UriConfig;
83
84 fn make_endpoint(use_body: bool, batch: bool) -> SqlEndpoint {
85 let mut config =
86 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
87 .expect("valid SQL endpoint config");
88 config.use_message_body_for_sql = use_body;
89 config.batch = batch;
90
91 SqlEndpoint::new("sql:select 1".to_string(), config)
92 }
93
94 #[test]
95 fn body_contract_returns_some_text_when_body_mode_enabled_no_batch() {
96 let endpoint = make_endpoint(true, false);
97 assert_eq!(endpoint.body_contract(), Some(BodyType::Text));
98 }
99
100 #[test]
101 fn body_contract_returns_some_json_when_batch_mode_enabled() {
102 let endpoint = make_endpoint(true, true);
103 assert_eq!(endpoint.body_contract(), Some(BodyType::Json));
104 }
105
106 #[test]
107 fn body_contract_returns_none_when_body_mode_disabled() {
108 let endpoint = make_endpoint(false, false);
109 assert_eq!(endpoint.body_contract(), None);
110 }
111
112 #[test]
113 fn body_contract_returns_none_when_batch_without_body_mode() {
114 let endpoint = make_endpoint(false, true);
115 assert_eq!(endpoint.body_contract(), None);
116 }
117
118 #[test]
119 fn new_stores_uri_and_config() {
120 let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
121 .expect("valid config");
122 let endpoint = SqlEndpoint::new("sql:select 1".to_string(), config.clone());
123 assert_eq!(endpoint.uri(), "sql:select 1");
124 assert_eq!(endpoint.config.db_url, "postgres://localhost/test");
125 }
126
127 #[test]
128 fn uri_returns_stored_uri() {
129 let endpoint = make_endpoint(false, false);
130 assert_eq!(endpoint.uri(), "sql:select 1");
131 }
132
133 #[test]
134 fn create_producer_returns_ok() {
135 let endpoint = make_endpoint(false, false);
136 let ctx = ProducerContext::new();
137 let result = endpoint.create_producer(test_rt(), &ctx);
138 assert!(result.is_ok());
139 }
140
141 #[test]
142 fn create_consumer_returns_ok() {
143 let endpoint = make_endpoint(false, false);
144 let result = endpoint.create_consumer(test_rt());
145 assert!(result.is_ok());
146 }
147
148 #[test]
149 fn pool_is_shared_via_arc() {
150 let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
151 .expect("valid config");
152 let endpoint = SqlEndpoint::new("sql:select 1".to_string(), config);
153 let pool_ref1 = Arc::clone(&endpoint.pool);
154 let pool_ref2 = Arc::clone(&endpoint.pool);
155 assert!(Arc::ptr_eq(&pool_ref1, &pool_ref2));
156 }
157}