camel_component_surrealdb/
lib.rs1pub mod bundle;
7pub mod config;
8pub mod consumer;
9pub mod endpoint;
10pub mod error;
11pub mod headers;
12pub mod polling;
13pub mod pool_factory;
14pub mod producer;
15pub mod query;
16pub mod vector;
17
18use std::sync::Arc;
19
20use camel_api::datasource::DatasourceCatalog;
21use camel_component_api::{CamelError, Component, ComponentContext, Endpoint};
22
23pub use bundle::SurrealDbBundle;
24pub use config::{SurrealDbEndpointConfig, SurrealDbOperation, VectorMetric};
25pub use error::SurrealDbError;
26pub use pool_factory::redact_db_url;
27
28pub struct SurrealDbComponent {
39 catalog: Option<Arc<dyn DatasourceCatalog>>,
40}
41
42impl SurrealDbComponent {
43 pub fn new() -> Self {
48 Self { catalog: None }
49 }
50
51 pub fn with_catalog(catalog: Arc<dyn DatasourceCatalog>) -> Self {
53 Self {
54 catalog: Some(catalog),
55 }
56 }
57
58 pub fn catalog(&self) -> Option<&Arc<dyn DatasourceCatalog>> {
60 self.catalog.as_ref()
61 }
62}
63
64impl Default for SurrealDbComponent {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl Component for SurrealDbComponent {
71 fn scheme(&self) -> &str {
72 "surrealdb"
73 }
74
75 fn create_endpoint(
76 &self,
77 uri: &str,
78 _ctx: &dyn ComponentContext,
79 ) -> Result<Box<dyn Endpoint>, CamelError> {
80 let config = SurrealDbEndpointConfig::from_uri(uri)?;
82
83 config.validate().map_err(|e| {
85 CamelError::EndpointCreationFailed(format!("config validation failed: {e}"))
86 })?;
87
88 if !config.datasource.is_empty() {
90 let catalog = self.catalog.as_ref().ok_or_else(|| {
91 CamelError::EndpointCreationFailed(
92 "datasource requires catalog — no datasource catalog configured".into(),
93 )
94 })?;
95
96 let ds_config = catalog.get_config(&config.datasource).ok_or_else(|| {
97 CamelError::EndpointCreationFailed(format!(
98 "datasource '{}' not found in catalog",
99 config.datasource
100 ))
101 })?;
102
103 if config.operation == crate::config::SurrealDbOperation::Live {
105 let url = &ds_config.db_url;
106 if !url.starts_with("ws://") && !url.starts_with("wss://") {
107 return Err(CamelError::from(SurrealDbError::LiveRequiresWebSocket(
108 redact_db_url(url),
109 )));
110 }
111 }
112
113 return Ok(Box::new(endpoint::SurrealDbEndpoint::new(
115 uri.to_string(),
116 config,
117 self.catalog.clone(),
118 )));
119 }
120
121 Ok(Box::new(endpoint::SurrealDbEndpoint::new(
123 uri.to_string(),
124 config,
125 self.catalog.clone(),
126 )))
127 }
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::datasource::{DatasourceConfig, GetPoolFuture, PoolFactory};
    use camel_component_api::NoOpComponentContext;

    /// In-memory catalog shared by the tests below.
    ///
    /// It optionally knows a single datasource called `main`, never hands
    /// out a pool, and accepts every factory registration.
    struct StubCatalog {
        serves_main: bool,
    }

    impl DatasourceCatalog for StubCatalog {
        fn get_config(&self, name: &str) -> Option<DatasourceConfig> {
            if !(self.serves_main && name == "main") {
                return None;
            }
            Some(DatasourceConfig {
                db_url: "ws://localhost:8000".into(),
                provider: Some("surrealdb".into()),
                max_connections: None,
                min_connections: None,
                idle_timeout_secs: None,
                max_lifetime_secs: None,
                ssl_mode: None,
                ssl_root_cert: None,
                ssl_cert: None,
                ssl_key: None,
                extra: std::collections::HashMap::new(),
            })
        }

        fn get_pool<'a>(&'a self, _name: &'a str) -> GetPoolFuture<'a> {
            Box::pin(async { Err(CamelError::Config("no pool".into())) })
        }

        fn register_factory(
            &self,
            _kind: &str,
            _factory: Arc<dyn PoolFactory>,
        ) -> Result<(), CamelError> {
            Ok(())
        }
    }

    #[test]
    fn component_scheme_is_surrealdb() {
        let component = SurrealDbComponent::new();
        assert_eq!(component.scheme(), "surrealdb");
    }

    #[test]
    fn component_default_is_new() {
        let from_default = SurrealDbComponent::default();
        let from_new = SurrealDbComponent::new();
        assert!(from_default.catalog.is_none());
        assert!(from_new.catalog.is_none());
    }

    #[test]
    fn component_rejects_invalid_uri() {
        let component = SurrealDbComponent::new();
        let ctx = NoOpComponentContext;
        assert!(component.create_endpoint("invalid", &ctx).is_err());
    }

    #[test]
    fn component_rejects_missing_datasource_in_uri() {
        // A URI without a `datasource` parameter must not yield an endpoint.
        let component = SurrealDbComponent::new();
        let ctx = NoOpComponentContext;
        assert!(component.create_endpoint("surrealdb:query", &ctx).is_err());
    }

    #[test]
    fn component_create_endpoint_succeeds_with_catalog() {
        let catalog = Arc::new(StubCatalog { serves_main: true });
        let component = SurrealDbComponent::with_catalog(catalog);
        let ctx = NoOpComponentContext;

        let created = component.create_endpoint("surrealdb:query?datasource=main", &ctx);
        assert!(created.is_ok());
        let endpoint = created.unwrap();
        assert_eq!(endpoint.uri(), "surrealdb:query?datasource=main");
    }

    #[test]
    fn component_with_catalog_survives_construction() {
        let catalog = Arc::new(StubCatalog { serves_main: false });
        let component = SurrealDbComponent::with_catalog(catalog);
        assert!(component.catalog().is_some());
    }
}