camel_component_sql/
endpoint.rs1use std::sync::Arc;
2use tokio::sync::OnceCell;
3
4use camel_api::datasource::DatasourceCatalog;
5use camel_component_api::{BodyType, BoxProcessor, CamelError, RuntimeObservability};
6use camel_component_api::{Consumer, Endpoint, ProducerContext};
7use sqlx::AnyPool;
8
9use crate::config::SqlEndpointConfig;
10use crate::consumer::SqlConsumer;
11use crate::producer::SqlProducer;
12
13pub(crate) struct SqlEndpoint {
14 uri: String,
15 pub(crate) config: SqlEndpointConfig,
16 pub(crate) pool: Arc<OnceCell<Arc<AnyPool>>>,
17 pub(crate) catalog: Option<Arc<dyn DatasourceCatalog>>,
18}
19
20impl SqlEndpoint {
21 #[cfg(test)]
22 pub fn new(uri: String, config: SqlEndpointConfig) -> Self {
23 Self::new_with_pool(uri, config, Arc::new(OnceCell::new()))
24 }
25
26 pub fn new_with_pool(
27 uri: String,
28 config: SqlEndpointConfig,
29 pool: Arc<OnceCell<Arc<AnyPool>>>,
30 ) -> Self {
31 Self {
32 uri,
33 config,
34 pool,
35 catalog: None,
36 }
37 }
38
39 pub fn new_with_pool_and_catalog(
40 uri: String,
41 config: SqlEndpointConfig,
42 pool: Arc<OnceCell<Arc<AnyPool>>>,
43 catalog: Arc<dyn DatasourceCatalog>,
44 ) -> Self {
45 Self {
46 uri,
47 config,
48 pool,
49 catalog: Some(catalog),
50 }
51 }
52}
53
54impl Endpoint for SqlEndpoint {
55 fn uri(&self) -> &str {
56 &self.uri
57 }
58
59 fn create_producer(
60 &self,
61 rt: Arc<dyn RuntimeObservability>,
62 ctx: &ProducerContext,
63 ) -> Result<BoxProcessor, CamelError> {
64 let route_id = ctx.route_id().unwrap_or("unknown").to_string();
65 Ok(BoxProcessor::new(SqlProducer::new(
66 self.config.clone(),
67 Arc::clone(&self.pool),
68 self.catalog.clone(),
69 rt,
70 route_id,
71 )))
72 }
73
74 fn create_consumer(
75 &self,
76 rt: Arc<dyn RuntimeObservability>,
77 ) -> Result<Box<dyn Consumer>, CamelError> {
78 Ok(Box::new(SqlConsumer::new(
79 self.config.clone(),
80 Arc::clone(&self.pool),
81 self.catalog.clone(),
82 rt,
83 )))
84 }
85
86 fn body_contract(&self) -> Option<BodyType> {
87 if !self.config.use_message_body_for_sql {
88 return None;
89 }
90 if self.config.batch {
91 Some(BodyType::Json)
92 } else {
93 Some(BodyType::Text)
94 }
95 }
96}
97
98#[cfg(test)]
99mod tests {
100 use camel_component_api::test_support::PanicRuntimeObservability;
101 fn test_rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
102 std::sync::Arc::new(PanicRuntimeObservability)
103 }
104 use super::*;
105 use crate::config::SqlEndpointConfig;
106 use camel_component_api::Endpoint;
107 use camel_component_api::UriConfig;
108
109 fn make_endpoint(use_body: bool, batch: bool) -> SqlEndpoint {
110 let mut config =
111 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
112 .expect("valid SQL endpoint config");
113 config.use_message_body_for_sql = use_body;
114 config.batch = batch;
115
116 SqlEndpoint::new("sql:select 1".to_string(), config)
117 }
118
119 #[test]
120 fn body_contract_returns_some_text_when_body_mode_enabled_no_batch() {
121 let endpoint = make_endpoint(true, false);
122 assert_eq!(endpoint.body_contract(), Some(BodyType::Text));
123 }
124
125 #[test]
126 fn body_contract_returns_some_json_when_batch_mode_enabled() {
127 let endpoint = make_endpoint(true, true);
128 assert_eq!(endpoint.body_contract(), Some(BodyType::Json));
129 }
130
131 #[test]
132 fn body_contract_returns_none_when_body_mode_disabled() {
133 let endpoint = make_endpoint(false, false);
134 assert_eq!(endpoint.body_contract(), None);
135 }
136
137 #[test]
138 fn body_contract_returns_none_when_batch_without_body_mode() {
139 let endpoint = make_endpoint(false, true);
140 assert_eq!(endpoint.body_contract(), None);
141 }
142
143 #[test]
144 fn new_stores_uri_and_config() {
145 let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
146 .expect("valid config");
147 let endpoint = SqlEndpoint::new("sql:select 1".to_string(), config.clone());
148 assert_eq!(endpoint.uri(), "sql:select 1");
149 assert_eq!(endpoint.config.db_url, "postgres://localhost/test");
150 }
151
152 #[test]
153 fn uri_returns_stored_uri() {
154 let endpoint = make_endpoint(false, false);
155 assert_eq!(endpoint.uri(), "sql:select 1");
156 }
157
158 #[test]
159 fn create_producer_returns_ok() {
160 let endpoint = make_endpoint(false, false);
161 let ctx = ProducerContext::new();
162 let result = endpoint.create_producer(test_rt(), &ctx);
163 assert!(result.is_ok());
164 }
165
166 #[test]
167 fn create_consumer_returns_ok() {
168 let endpoint = make_endpoint(false, false);
169 let result = endpoint.create_consumer(test_rt());
170 assert!(result.is_ok());
171 }
172
173 #[test]
174 fn pool_is_shared_via_arc() {
175 let config = SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
176 .expect("valid config");
177 let endpoint = SqlEndpoint::new("sql:select 1".to_string(), config);
178 let pool_ref1 = Arc::clone(&endpoint.pool);
179 let pool_ref2 = Arc::clone(&endpoint.pool);
180 assert!(Arc::ptr_eq(&pool_ref1, &pool_ref2));
181 }
182}