1use core::fmt::{self, Debug};
31
32extern crate alloc;
33
34use alloc::{
35 string::{String, ToString},
36 sync::Arc,
37 vec::Vec,
38};
39
40#[cfg(feature = "std")]
41use alloc::format;
42
43use crate::DbResult;
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub enum SerializeError {
51 BufferTooSmall,
53
54 TypeMismatch,
56
57 InvalidData,
59}
60
61#[cfg(feature = "defmt")]
62impl defmt::Format for SerializeError {
63 fn format(&self, f: defmt::Formatter) {
64 match self {
65 Self::BufferTooSmall => defmt::write!(f, "BufferTooSmall"),
66 Self::TypeMismatch => defmt::write!(f, "TypeMismatch"),
67 Self::InvalidData => defmt::write!(f, "InvalidData"),
68 }
69 }
70}
71
72#[cfg(feature = "std")]
73impl std::fmt::Display for SerializeError {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 match self {
76 Self::BufferTooSmall => write!(f, "Output buffer too small"),
77 Self::TypeMismatch => write!(f, "Type mismatch in serializer"),
78 Self::InvalidData => write!(f, "Invalid data for serialization"),
79 }
80 }
81}
82
83#[cfg(feature = "std")]
84impl std::error::Error for SerializeError {}
85
86pub type SerializerFn =
103 Arc<dyn Fn(&dyn core::any::Any) -> Result<Vec<u8>, SerializeError> + Send + Sync>;
104
105#[derive(Clone, Debug, PartialEq)]
113pub struct ConnectorUrl {
114 pub scheme: String,
116
117 pub host: String,
119
120 pub port: Option<u16>,
122
123 pub path: Option<String>,
125
126 pub username: Option<String>,
128
129 pub password: Option<String>,
131
132 pub query_params: Vec<(String, String)>,
134}
135
136impl ConnectorUrl {
137 pub fn parse(url: &str) -> DbResult<Self> {
162 parse_connector_url(url)
163 }
164
165 pub fn default_port(&self) -> Option<u16> {
167 match self.scheme.as_str() {
168 "mqtt" | "ws" => Some(1883),
169 "mqtts" | "wss" => Some(8883),
170 "kafka" => Some(9092),
171 "http" => Some(80),
172 "https" => Some(443),
173 _ => None,
174 }
175 }
176
177 pub fn effective_port(&self) -> Option<u16> {
179 self.port.or_else(|| self.default_port())
180 }
181
182 pub fn is_secure(&self) -> bool {
184 matches!(self.scheme.as_str(), "mqtts" | "https" | "wss")
185 }
186
187 pub fn scheme(&self) -> &str {
189 &self.scheme
190 }
191
192 pub fn path(&self) -> &str {
194 self.path.as_deref().unwrap_or("/")
195 }
196}
197
198impl fmt::Display for ConnectorUrl {
199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200 write!(f, "{}://", self.scheme)?;
201
202 if let Some(ref username) = self.username {
203 write!(f, "{}", username)?;
204 if self.password.is_some() {
205 write!(f, ":****")?; }
207 write!(f, "@")?;
208 }
209
210 write!(f, "{}", self.host)?;
211
212 if let Some(port) = self.port {
213 write!(f, ":{}", port)?;
214 }
215
216 if let Some(ref path) = self.path {
217 if !path.starts_with('/') {
218 write!(f, "/")?;
219 }
220 write!(f, "{}", path)?;
221 }
222
223 Ok(())
224 }
225}
226
227#[derive(Clone)]
239pub enum ConnectorClient {
240 Mqtt(Arc<dyn core::any::Any + Send + Sync>),
242
243 Kafka(Arc<dyn core::any::Any + Send + Sync>),
245
246 Http(Arc<dyn core::any::Any + Send + Sync>),
248
249 Generic {
251 protocol: String,
252 client: Arc<dyn core::any::Any + Send + Sync>,
253 },
254}
255
256impl Debug for ConnectorClient {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 match self {
259 ConnectorClient::Mqtt(_) => write!(f, "ConnectorClient::Mqtt(..)"),
260 ConnectorClient::Kafka(_) => write!(f, "ConnectorClient::Kafka(..)"),
261 ConnectorClient::Http(_) => write!(f, "ConnectorClient::Http(..)"),
262 ConnectorClient::Generic { protocol, .. } => {
263 write!(f, "ConnectorClient::Generic({})", protocol)
264 }
265 }
266 }
267}
268
269impl ConnectorClient {
270 pub fn downcast_ref<T: 'static>(&self) -> Option<&T> {
282 match self {
283 ConnectorClient::Mqtt(arc) => arc.downcast_ref::<T>(),
284 ConnectorClient::Kafka(arc) => arc.downcast_ref::<T>(),
285 ConnectorClient::Http(arc) => arc.downcast_ref::<T>(),
286 ConnectorClient::Generic { client, .. } => client.downcast_ref::<T>(),
287 }
288 }
289}
290
291#[derive(Clone)]
296pub struct ConnectorLink {
297 pub url: ConnectorUrl,
299
300 pub config: Vec<(String, String)>,
302
303 pub serializer: Option<SerializerFn>,
312}
313
314impl Debug for ConnectorLink {
315 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
316 f.debug_struct("ConnectorLink")
317 .field("url", &self.url)
318 .field("config", &self.config)
319 .field(
320 "serializer",
321 &self.serializer.as_ref().map(|_| "<function>"),
322 )
323 .finish()
324 }
325}
326
327impl ConnectorLink {
328 pub fn new(url: ConnectorUrl) -> Self {
330 Self {
331 url,
332 config: Vec::new(),
333 serializer: None,
334 }
335 }
336
337 pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
339 self.config.push((key.into(), value.into()));
340 self
341 }
342}
343
344fn parse_connector_url(url: &str) -> DbResult<ConnectorUrl> {
349 use crate::DbError;
350
351 let (scheme, rest) = url.split_once("://").ok_or({
353 #[cfg(feature = "std")]
354 {
355 DbError::InvalidOperation {
356 operation: "parse_connector_url".into(),
357 reason: format!("Missing scheme in URL: {}", url),
358 }
359 }
360 #[cfg(not(feature = "std"))]
361 {
362 DbError::InvalidOperation {
363 _operation: (),
364 _reason: (),
365 }
366 }
367 })?;
368
369 let (credentials, host_part) = if let Some(at_idx) = rest.find('@') {
371 let creds = &rest[..at_idx];
372 let host = &rest[at_idx + 1..];
373 (Some(creds), host)
374 } else {
375 (None, rest)
376 };
377
378 let (username, password) = if let Some(creds) = credentials {
379 if let Some((user, pass)) = creds.split_once(':') {
380 (Some(user.to_string()), Some(pass.to_string()))
381 } else {
382 (Some(creds.to_string()), None)
383 }
384 } else {
385 (None, None)
386 };
387
388 let (host_port, path, query_params) = if let Some(slash_idx) = host_part.find('/') {
390 let hp = &host_part[..slash_idx];
391 let path_query = &host_part[slash_idx..];
392
393 let (path_part, query_part) = if let Some(q_idx) = path_query.find('?') {
395 (&path_query[..q_idx], Some(&path_query[q_idx + 1..]))
396 } else {
397 (path_query, None)
398 };
399
400 let params = if let Some(query) = query_part {
402 query
403 .split('&')
404 .filter_map(|pair| {
405 let (k, v) = pair.split_once('=')?;
406 Some((k.to_string(), v.to_string()))
407 })
408 .collect()
409 } else {
410 Vec::new()
411 };
412
413 (hp, Some(path_part.to_string()), params)
414 } else {
415 (host_part, None, Vec::new())
416 };
417
418 let (host, port) = if let Some(colon_idx) = host_port.rfind(':') {
420 let h = &host_port[..colon_idx];
421 let p = &host_port[colon_idx + 1..];
422 let port_num = p.parse::<u16>().ok();
423 (h.to_string(), port_num)
424 } else {
425 (host_port.to_string(), None)
426 };
427
428 Ok(ConnectorUrl {
429 scheme: scheme.to_string(),
430 host,
431 port,
432 path,
433 username,
434 password,
435 query_params,
436 })
437}
438
439#[cfg(test)]
440mod tests {
441 use super::*;
442 use alloc::format;
443
444 #[test]
445 fn test_parse_simple_mqtt() {
446 let url = ConnectorUrl::parse("mqtt://broker.example.com:1883").unwrap();
447 assert_eq!(url.scheme, "mqtt");
448 assert_eq!(url.host, "broker.example.com");
449 assert_eq!(url.port, Some(1883));
450 assert_eq!(url.username, None);
451 assert_eq!(url.password, None);
452 }
453
454 #[test]
455 fn test_parse_mqtt_with_credentials() {
456 let url = ConnectorUrl::parse("mqtt://user:pass@broker.example.com:1883").unwrap();
457 assert_eq!(url.scheme, "mqtt");
458 assert_eq!(url.host, "broker.example.com");
459 assert_eq!(url.port, Some(1883));
460 assert_eq!(url.username, Some("user".to_string()));
461 assert_eq!(url.password, Some("pass".to_string()));
462 }
463
464 #[test]
465 fn test_parse_https_with_path() {
466 let url = ConnectorUrl::parse("https://api.example.com:8443/events").unwrap();
467 assert_eq!(url.scheme, "https");
468 assert_eq!(url.host, "api.example.com");
469 assert_eq!(url.port, Some(8443));
470 assert_eq!(url.path, Some("/events".to_string()));
471 }
472
473 #[test]
474 fn test_parse_with_query_params() {
475 let url = ConnectorUrl::parse("http://api.example.com/data?key=value&foo=bar").unwrap();
476 assert_eq!(url.scheme, "http");
477 assert_eq!(url.host, "api.example.com");
478 assert_eq!(url.path, Some("/data".to_string()));
479 assert_eq!(url.query_params.len(), 2);
480 assert_eq!(
481 url.query_params[0],
482 ("key".to_string(), "value".to_string())
483 );
484 assert_eq!(url.query_params[1], ("foo".to_string(), "bar".to_string()));
485 }
486
487 #[test]
488 fn test_default_ports() {
489 let mqtt = ConnectorUrl::parse("mqtt://broker.local").unwrap();
490 assert_eq!(mqtt.default_port(), Some(1883));
491 assert_eq!(mqtt.effective_port(), Some(1883));
492
493 let https = ConnectorUrl::parse("https://api.example.com").unwrap();
494 assert_eq!(https.default_port(), Some(443));
495 assert_eq!(https.effective_port(), Some(443));
496 }
497
498 #[test]
499 fn test_is_secure() {
500 assert!(ConnectorUrl::parse("mqtts://broker.local")
501 .unwrap()
502 .is_secure());
503 assert!(ConnectorUrl::parse("https://api.example.com")
504 .unwrap()
505 .is_secure());
506 assert!(ConnectorUrl::parse("wss://ws.example.com")
507 .unwrap()
508 .is_secure());
509
510 assert!(!ConnectorUrl::parse("mqtt://broker.local")
511 .unwrap()
512 .is_secure());
513 assert!(!ConnectorUrl::parse("http://api.example.com")
514 .unwrap()
515 .is_secure());
516 assert!(!ConnectorUrl::parse("ws://ws.example.com")
517 .unwrap()
518 .is_secure());
519 }
520
521 #[test]
522 fn test_display_hides_password() {
523 let url = ConnectorUrl::parse("mqtt://user:secret@broker.local:1883").unwrap();
524 let display = format!("{}", url);
525 assert!(display.contains("user:****"));
526 assert!(!display.contains("secret"));
527 }
528
529 #[test]
530 fn test_parse_kafka_style() {
531 let url =
532 ConnectorUrl::parse("kafka://broker1.local:9092,broker2.local:9092/my-topic").unwrap();
533 assert_eq!(url.scheme, "kafka");
534 assert!(url.host.contains("broker1.local"));
538 assert!(url.host.contains("broker2.local"));
539 assert_eq!(url.path, Some("/my-topic".to_string()));
540 }
541
542 #[test]
543 fn test_parse_missing_scheme() {
544 let result = ConnectorUrl::parse("broker.example.com:1883");
545 assert!(result.is_err());
546 }
547}