camel_component_sql/
endpoint.rs1use std::sync::Arc;
2use tokio::sync::OnceCell;
3
4use camel_component_api::{BodyType, BoxProcessor, CamelError};
5use camel_component_api::{Consumer, Endpoint, ProducerContext};
6use sqlx::AnyPool;
7
8use crate::config::SqlEndpointConfig;
9use crate::consumer::SqlConsumer;
10use crate::producer::SqlProducer;
11
12pub(crate) struct SqlEndpoint {
13 uri: String,
14 pub(crate) config: SqlEndpointConfig,
15 pub(crate) pool: Arc<OnceCell<AnyPool>>,
16}
17
18impl SqlEndpoint {
19 pub fn new(uri: String, config: SqlEndpointConfig) -> Self {
20 Self {
21 uri,
22 config,
23 pool: Arc::new(OnceCell::new()),
24 }
25 }
26}
27
28impl Endpoint for SqlEndpoint {
29 fn uri(&self) -> &str {
30 &self.uri
31 }
32
33 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
34 Ok(BoxProcessor::new(SqlProducer::new(
35 self.config.clone(),
36 Arc::clone(&self.pool),
37 )))
38 }
39
40 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
41 Ok(Box::new(SqlConsumer::new(
42 self.config.clone(),
43 Arc::clone(&self.pool),
44 )))
45 }
46
47 fn body_contract(&self) -> Option<BodyType> {
48 if self.config.use_message_body_for_sql {
49 Some(BodyType::Text)
50 } else {
51 None
52 }
53 }
54}
55
56#[cfg(test)]
57mod tests {
58 use super::*;
59 use crate::config::SqlEndpointConfig;
60 use camel_component_api::Endpoint;
61 use camel_component_api::UriConfig;
62
63 fn make_endpoint(use_body: bool) -> SqlEndpoint {
64 let mut config =
65 SqlEndpointConfig::from_uri("sql:select 1?db_url=postgres://localhost/test")
66 .expect("valid SQL endpoint config");
67 config.use_message_body_for_sql = use_body;
68
69 SqlEndpoint::new("sql:select 1".to_string(), config)
70 }
71
72 #[test]
73 fn body_contract_returns_some_text_when_body_mode_enabled() {
74 let endpoint = make_endpoint(true);
75 assert_eq!(endpoint.body_contract(), Some(BodyType::Text));
76 }
77
78 #[test]
79 fn body_contract_returns_none_when_body_mode_disabled() {
80 let endpoint = make_endpoint(false);
81 assert_eq!(endpoint.body_contract(), None);
82 }
83}