Skip to main content

camel_component_surrealdb/
lib.rs

1//! camel-component-surrealdb — SurrealDB multi-model database component for rust-camel.
2//!
3//! Provides document, graph, and vector operations through the surrealdb crate.
4//! Integrates with the datasource domain via PoolFactory.
5
6pub mod bundle;
7pub mod config;
8pub mod consumer;
9pub mod endpoint;
10pub mod error;
11pub mod headers;
12pub mod polling;
13pub mod pool_factory;
14pub mod producer;
15pub mod query;
16pub mod vector;
17
18use std::sync::Arc;
19
20use camel_api::datasource::DatasourceCatalog;
21use camel_component_api::{CamelError, Component, ComponentContext, Endpoint};
22
23pub use bundle::SurrealDbBundle;
24pub use config::{SurrealDbEndpointConfig, SurrealDbOperation, VectorMetric};
25pub use error::SurrealDbError;
26pub use pool_factory::redact_db_url;
27
28/// SurrealDB component — factory for `surrealdb:` endpoints.
29///
30/// Supports document, graph, vector, and live query operations. Each
31/// endpoint references a named datasource from the catalog whose connection
32/// string is resolved at endpoint creation time.
33///
34/// When a `DatasourceCatalog` is present, the component resolves datasource
35/// names into connection URLs. The component also validates that `Live`
36/// operations use a WebSocket protocol (ws/wss), since SurrealDB's live
37/// queries require persistent connections.
38pub struct SurrealDbComponent {
39    catalog: Option<Arc<dyn DatasourceCatalog>>,
40}
41
42impl SurrealDbComponent {
43    /// Create a new component without a datasource catalog.
44    ///
45    /// Use this when endpoints reference their datasource directly via
46    /// `db_url` in the config, or when testing.
47    pub fn new() -> Self {
48        Self { catalog: None }
49    }
50
51    /// Attach a datasource catalog for named datasource resolution.
52    pub fn with_catalog(catalog: Arc<dyn DatasourceCatalog>) -> Self {
53        Self {
54            catalog: Some(catalog),
55        }
56    }
57
58    /// Access the optional datasource catalog.
59    pub fn catalog(&self) -> Option<&Arc<dyn DatasourceCatalog>> {
60        self.catalog.as_ref()
61    }
62}
63
64impl Default for SurrealDbComponent {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70impl Component for SurrealDbComponent {
71    fn scheme(&self) -> &str {
72        "surrealdb"
73    }
74
75    fn create_endpoint(
76        &self,
77        uri: &str,
78        _ctx: &dyn ComponentContext,
79    ) -> Result<Box<dyn Endpoint>, CamelError> {
80        // Step 1: Parse URI into SurrealDbEndpointConfig (validates scheme + operation + params)
81        let config = SurrealDbEndpointConfig::from_uri(uri)?;
82
83        // Step 2: Validate operation-specific required params (fail-fast)
84        config.validate().map_err(|e| {
85            CamelError::EndpointCreationFailed(format!("config validation failed: {e}"))
86        })?;
87
88        // Step 3: If the endpoint references a datasource name, resolve it from catalog
89        if !config.datasource.is_empty() {
90            let catalog = self.catalog.as_ref().ok_or_else(|| {
91                CamelError::EndpointCreationFailed(
92                    "datasource requires catalog — no datasource catalog configured".into(),
93                )
94            })?;
95
96            let ds_config = catalog.get_config(&config.datasource).ok_or_else(|| {
97                CamelError::EndpointCreationFailed(format!(
98                    "datasource '{}' not found in catalog",
99                    config.datasource
100                ))
101            })?;
102
103            // Step 4: LIVE operation requires WebSocket protocol (ws/wss)
104            if config.operation == crate::config::SurrealDbOperation::Live {
105                let url = &ds_config.db_url;
106                if !url.starts_with("ws://") && !url.starts_with("wss://") {
107                    return Err(CamelError::from(SurrealDbError::LiveRequiresWebSocket(
108                        redact_db_url(url),
109                    )));
110                }
111            }
112
113            // Step 5: Return the real endpoint
114            return Ok(Box::new(endpoint::SurrealDbEndpoint::new(
115                uri.to_string(),
116                config,
117                self.catalog.clone(),
118            )));
119        }
120
121        // No datasource name: return endpoint without catalog resolution
122        Ok(Box::new(endpoint::SurrealDbEndpoint::new(
123            uri.to_string(),
124            config,
125            self.catalog.clone(),
126        )))
127    }
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::datasource::{DatasourceConfig, GetPoolFuture, PoolFactory};
    use camel_component_api::NoOpComponentContext;

    /// Catalog stub that has a configuration for at most one datasource name.
    ///
    /// `known: None` makes every lookup miss.
    struct StubCatalog {
        known: Option<&'static str>,
    }

    impl camel_api::datasource::DatasourceCatalog for StubCatalog {
        fn get_config(&self, name: &str) -> Option<DatasourceConfig> {
            match self.known {
                Some(known) if known == name => Some(DatasourceConfig {
                    db_url: "ws://localhost:8000".into(),
                    provider: Some("surrealdb".into()),
                    max_connections: None,
                    min_connections: None,
                    idle_timeout_secs: None,
                    max_lifetime_secs: None,
                    ssl_mode: None,
                    ssl_root_cert: None,
                    ssl_cert: None,
                    ssl_key: None,
                    extra: std::collections::HashMap::new(),
                }),
                _ => None,
            }
        }

        fn get_pool<'a>(&'a self, _name: &'a str) -> GetPoolFuture<'a> {
            Box::pin(async { Err(CamelError::Config("no pool".into())) })
        }

        fn register_factory(
            &self,
            _kind: &str,
            _factory: Arc<dyn PoolFactory>,
        ) -> Result<(), CamelError> {
            Ok(())
        }
    }

    #[test]
    fn component_scheme_is_surrealdb() {
        assert_eq!(SurrealDbComponent::new().scheme(), "surrealdb");
    }

    #[test]
    fn component_default_is_new() {
        let from_default = SurrealDbComponent::default();
        let from_new = SurrealDbComponent::new();
        assert!(from_default.catalog.is_none());
        assert!(from_new.catalog.is_none());
    }

    #[test]
    fn component_rejects_invalid_uri() {
        let component = SurrealDbComponent::new();
        let outcome = component.create_endpoint("invalid", &NoOpComponentContext);
        assert!(outcome.is_err());
    }

    #[test]
    fn component_rejects_missing_datasource_in_uri() {
        // The URI carries no `datasource` parameter, so creation must fail.
        let component = SurrealDbComponent::new();
        let outcome = component.create_endpoint("surrealdb:query", &NoOpComponentContext);
        assert!(outcome.is_err());
    }

    #[test]
    fn component_create_endpoint_succeeds_with_catalog() {
        // A valid URI plus a catalog that knows "main" yields an endpoint
        // that reports the original URI.
        let uri = "surrealdb:query?datasource=main";
        let component = SurrealDbComponent::with_catalog(Arc::new(StubCatalog {
            known: Some("main"),
        }));

        let outcome = component.create_endpoint(uri, &NoOpComponentContext);
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap().uri(), uri);
    }

    #[test]
    fn component_with_catalog_survives_construction() {
        let component = SurrealDbComponent::with_catalog(Arc::new(StubCatalog { known: None }));
        assert!(component.catalog().is_some());
    }
}