1use std::collections::HashSet;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
6
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9
10use haystack_client::HaystackClient;
11use haystack_client::transport::http::HttpTransport;
12use haystack_client::transport::ws::WsTransport;
13use haystack_core::data::{HDict, HGrid};
14use haystack_core::kinds::{HRef, Kind};
15
16enum ConnectorClient {
21 Http(HaystackClient<HttpTransport>),
22 Ws(HaystackClient<WsTransport>),
23}
24
25impl ConnectorClient {
26 async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, String> {
29 match self {
30 ConnectorClient::Http(c) => c.call(op, req).await.map_err(|e| e.to_string()),
31 ConnectorClient::Ws(c) => c.call(op, req).await.map_err(|e| e.to_string()),
32 }
33 }
34
35 async fn his_read(&self, id: &str, range: &str) -> Result<HGrid, String> {
37 match self {
38 ConnectorClient::Http(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
39 ConnectorClient::Ws(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
40 }
41 }
42
43 async fn his_write(&self, id: &str, items: Vec<HDict>) -> Result<HGrid, String> {
45 match self {
46 ConnectorClient::Http(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
47 ConnectorClient::Ws(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
48 }
49 }
50
51 async fn point_write(&self, id: &str, level: u8, val: Kind) -> Result<HGrid, String> {
53 match self {
54 ConnectorClient::Http(c) => c
55 .point_write(id, level, val)
56 .await
57 .map_err(|e| e.to_string()),
58 ConnectorClient::Ws(c) => c
59 .point_write(id, level, val)
60 .await
61 .map_err(|e| e.to_string()),
62 }
63 }
64
65 async fn invoke_action(&self, id: &str, action: &str, args: HDict) -> Result<HGrid, String> {
67 match self {
68 ConnectorClient::Http(c) => c
69 .invoke_action(id, action, args)
70 .await
71 .map_err(|e| e.to_string()),
72 ConnectorClient::Ws(c) => c
73 .invoke_action(id, action, args)
74 .await
75 .map_err(|e| e.to_string()),
76 }
77 }
78}
79
/// Which transport a connector is currently using.
///
/// The explicit `u8` discriminants allow the value to be stored in an
/// `AtomicU8` and mapped back again.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum TransportMode {
    /// HTTP transport (discriminant `0`).
    Http = 0,
    /// WebSocket transport (discriminant `1`).
    WebSocket = 1,
}
87
88#[derive(Debug, Clone, serde::Deserialize)]
90pub struct ConnectorConfig {
91 pub name: String,
93 pub url: String,
95 pub username: String,
97 pub password: String,
99 pub id_prefix: Option<String>,
101 pub ws_url: Option<String>,
103 pub sync_interval_secs: Option<u64>,
105 pub client_cert: Option<String>,
107 pub client_key: Option<String>,
109 pub ca_cert: Option<String>,
111}
112
113pub struct Connector {
115 pub config: ConnectorConfig,
116 cache: RwLock<Vec<HDict>>,
118 owned_ids: RwLock<HashSet<String>>,
120 remote_watch_ids: RwLock<HashSet<String>>,
122 transport_mode: AtomicU8,
124 connected: AtomicBool,
126 last_sync: RwLock<Option<DateTime<Utc>>>,
128 client: tokio::sync::RwLock<Option<ConnectorClient>>,
132}
133
134impl Connector {
135 pub fn new(config: ConnectorConfig) -> Self {
137 Self {
138 config,
139 cache: RwLock::new(Vec::new()),
140 owned_ids: RwLock::new(HashSet::new()),
141 remote_watch_ids: RwLock::new(HashSet::new()),
142 transport_mode: AtomicU8::new(TransportMode::Http as u8),
143 connected: AtomicBool::new(false),
144 last_sync: RwLock::new(None),
145 client: tokio::sync::RwLock::new(None),
146 }
147 }
148
149 async fn connect_persistent(&self) -> Result<(), String> {
152 let ws_url = self.config.effective_ws_url();
154 match HaystackClient::connect_ws(
155 &self.config.url,
156 &ws_url,
157 &self.config.username,
158 &self.config.password,
159 )
160 .await
161 {
162 Ok(ws_client) => {
163 *self.client.write().await = Some(ConnectorClient::Ws(ws_client));
164 self.transport_mode
165 .store(TransportMode::WebSocket as u8, Ordering::Relaxed);
166 self.connected.store(true, Ordering::Relaxed);
167 log::info!("Connected to {} via WebSocket", self.config.name);
168 return Ok(());
169 }
170 Err(e) => {
171 log::warn!(
172 "WS connection to {} failed: {}, trying HTTP",
173 self.config.name,
174 e
175 );
176 }
177 }
178
179 match HaystackClient::connect(
181 &self.config.url,
182 &self.config.username,
183 &self.config.password,
184 )
185 .await
186 {
187 Ok(http_client) => {
188 *self.client.write().await = Some(ConnectorClient::Http(http_client));
189 self.transport_mode
190 .store(TransportMode::Http as u8, Ordering::Relaxed);
191 self.connected.store(true, Ordering::Relaxed);
192 log::info!("Connected to {} via HTTP", self.config.name);
193 Ok(())
194 }
195 Err(e) => {
196 self.connected.store(false, Ordering::Relaxed);
197 Err(format!("connection failed: {e}"))
198 }
199 }
200 }
201
202 pub async fn sync(&self) -> Result<usize, String> {
208 if self.client.read().await.is_none() {
210 self.connect_persistent().await?;
211 }
212
213 let grid = {
215 let client = self.client.read().await;
216 let client = client.as_ref().ok_or("not connected")?;
217 client.call("read", &build_read_all_grid()).await
218 };
219
220 let grid = grid.map_err(|e| {
221 self.connected.store(false, Ordering::Relaxed);
222 format!("read failed: {e}")
223 })?;
224
225 let mut entities: Vec<HDict> = grid.rows.into_iter().collect();
226
227 if let Some(ref prefix) = self.config.id_prefix {
229 for entity in &mut entities {
230 prefix_refs(entity, prefix);
231 }
232 }
233
234 let count = entities.len();
235 self.update_cache(entities);
236 *self.last_sync.write() = Some(Utc::now());
237 self.connected.store(true, Ordering::Relaxed);
238 Ok(count)
239 }
240
241 pub fn update_cache(&self, entities: Vec<HDict>) {
246 let ids: HashSet<String> = entities
247 .iter()
248 .filter_map(|e| match e.get("id") {
249 Some(Kind::Ref(r)) => Some(r.val.clone()),
250 _ => None,
251 })
252 .collect();
253 *self.cache.write() = entities;
254 *self.owned_ids.write() = ids;
255 }
256
257 pub fn owns(&self, id: &str) -> bool {
259 self.owned_ids.read().contains(id)
260 }
261
262 pub fn cached_entities(&self) -> Vec<HDict> {
264 self.cache.read().clone()
265 }
266
267 pub fn entity_count(&self) -> usize {
269 self.cache.read().len()
270 }
271
272 pub fn add_remote_watch(&self, prefixed_id: &str) {
274 self.remote_watch_ids
275 .write()
276 .insert(prefixed_id.to_string());
277 }
278
279 pub fn remove_remote_watch(&self, prefixed_id: &str) {
281 self.remote_watch_ids.write().remove(prefixed_id);
282 }
283
284 pub fn remote_watch_count(&self) -> usize {
286 self.remote_watch_ids.read().len()
287 }
288
289 pub fn transport_mode(&self) -> TransportMode {
291 match self.transport_mode.load(Ordering::Relaxed) {
292 1 => TransportMode::WebSocket,
293 _ => TransportMode::Http,
294 }
295 }
296
297 pub fn is_connected(&self) -> bool {
299 self.connected.load(Ordering::Relaxed)
300 }
301
302 pub fn last_sync_time(&self) -> Option<DateTime<Utc>> {
304 *self.last_sync.read()
305 }
306
307 fn strip_id(&self, prefixed_id: &str) -> String {
309 if let Some(ref prefix) = self.config.id_prefix {
310 prefixed_id
311 .strip_prefix(prefix.as_str())
312 .unwrap_or(prefixed_id)
313 .to_string()
314 } else {
315 prefixed_id.to_string()
316 }
317 }
318
319 async fn ensure_connected(&self) -> Result<(), String> {
322 if self.client.read().await.is_none() {
323 self.connect_persistent().await?;
324 }
325 Ok(())
326 }
327
328 async fn on_proxy_error(&self, op_name: &str, e: String) -> String {
331 *self.client.write().await = None;
332 self.connected.store(false, Ordering::Relaxed);
333 format!("{op_name} failed: {e}")
334 }
335
336 pub async fn proxy_his_read(&self, prefixed_id: &str, range: &str) -> Result<HGrid, String> {
338 self.ensure_connected().await?;
339 let id = self.strip_id(prefixed_id);
340 let guard = self.client.read().await;
341 let client = guard.as_ref().ok_or("not connected")?;
342 match client.his_read(&id, range).await {
343 Ok(grid) => Ok(grid),
344 Err(e) => {
345 drop(guard);
346 Err(self.on_proxy_error("hisRead", e).await)
347 }
348 }
349 }
350
351 pub async fn proxy_point_write(
353 &self,
354 prefixed_id: &str,
355 level: u8,
356 val: &Kind,
357 ) -> Result<HGrid, String> {
358 self.ensure_connected().await?;
359 let id = self.strip_id(prefixed_id);
360 let val = val.clone();
361 let guard = self.client.read().await;
362 let client = guard.as_ref().ok_or("not connected")?;
363 match client.point_write(&id, level, val).await {
364 Ok(grid) => Ok(grid),
365 Err(e) => {
366 drop(guard);
367 Err(self.on_proxy_error("pointWrite", e).await)
368 }
369 }
370 }
371
372 pub async fn proxy_his_write(
374 &self,
375 prefixed_id: &str,
376 items: Vec<HDict>,
377 ) -> Result<HGrid, String> {
378 self.ensure_connected().await?;
379 let id = self.strip_id(prefixed_id);
380 let guard = self.client.read().await;
381 let client = guard.as_ref().ok_or("not connected")?;
382 match client.his_write(&id, items).await {
383 Ok(grid) => Ok(grid),
384 Err(e) => {
385 drop(guard);
386 Err(self.on_proxy_error("hisWrite", e).await)
387 }
388 }
389 }
390
391 pub async fn proxy_import(&self, entity: &HDict) -> Result<HGrid, String> {
396 self.ensure_connected().await?;
397
398 let mut stripped = entity.clone();
399 if let Some(ref prefix) = self.config.id_prefix {
400 strip_prefix_refs(&mut stripped, prefix);
401 }
402
403 let col_names: Vec<String> = stripped.tag_names().map(|s| s.to_string()).collect();
404 let cols: Vec<haystack_core::data::HCol> = col_names
405 .iter()
406 .map(|n| haystack_core::data::HCol::new(n.as_str()))
407 .collect();
408 let grid = HGrid::from_parts(HDict::new(), cols, vec![stripped]);
409
410 let guard = self.client.read().await;
411 let client = guard.as_ref().ok_or("not connected")?;
412 match client.call("import", &grid).await {
413 Ok(grid) => Ok(grid),
414 Err(e) => {
415 drop(guard);
416 Err(self.on_proxy_error("import", e).await)
417 }
418 }
419 }
420
421 pub async fn proxy_invoke_action(
423 &self,
424 prefixed_id: &str,
425 action: &str,
426 args: HDict,
427 ) -> Result<HGrid, String> {
428 self.ensure_connected().await?;
429 let id = self.strip_id(prefixed_id);
430 let action = action.to_string();
431 let guard = self.client.read().await;
432 let client = guard.as_ref().ok_or("not connected")?;
433 match client.invoke_action(&id, &action, args).await {
434 Ok(grid) => Ok(grid),
435 Err(e) => {
436 drop(guard);
437 Err(self.on_proxy_error("invokeAction", e).await)
438 }
439 }
440 }
441
442 pub fn spawn_sync_task(connector: Arc<Connector>) -> tokio::task::JoinHandle<()> {
448 let interval_secs = connector.config.effective_sync_interval_secs();
449 tokio::spawn(async move {
450 loop {
451 match connector.sync().await {
452 Ok(count) => {
453 log::debug!("Synced {} entities from {}", count, connector.config.name);
454 }
455 Err(e) => {
456 log::error!("Sync failed for {}: {}", connector.config.name, e);
457 *connector.client.write().await = None;
459 connector.connected.store(false, Ordering::Relaxed);
460 }
461 }
462 tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
463 }
464 })
465 }
466}
467
468fn build_read_all_grid() -> HGrid {
470 use haystack_core::data::HCol;
471 let mut row = HDict::new();
472 row.set("filter", Kind::Str("*".to_string()));
473 HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row])
474}
475
476impl ConnectorConfig {
477 pub fn effective_ws_url(&self) -> String {
481 if let Some(ref ws) = self.ws_url {
482 return ws.clone();
483 }
484 let ws = if self.url.starts_with("https://") {
485 self.url.replacen("https://", "wss://", 1)
486 } else {
487 self.url.replacen("http://", "ws://", 1)
488 };
489 format!("{ws}/ws")
490 }
491
492 pub fn effective_sync_interval_secs(&self) -> u64 {
494 self.sync_interval_secs.unwrap_or(60)
495 }
496}
497
498pub fn prefix_refs(entity: &mut HDict, prefix: &str) {
503 let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
504
505 for name in &tag_names {
506 let should_prefix = name == "id" || name.ends_with("Ref");
507 if !should_prefix {
508 continue;
509 }
510
511 if let Some(Kind::Ref(r)) = entity.get(name) {
512 let new_val = format!("{}{}", prefix, r.val);
513 let new_ref = HRef::new(new_val, r.dis.clone());
514 entity.set(name.as_str(), Kind::Ref(new_ref));
515 }
516 }
517}
518
519pub fn strip_prefix_refs(entity: &mut HDict, prefix: &str) {
525 let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
526
527 for name in &tag_names {
528 let should_strip = name == "id" || name.ends_with("Ref");
529 if !should_strip {
530 continue;
531 }
532
533 if let Some(Kind::Ref(r)) = entity.get(name)
534 && let Some(stripped) = r.val.strip_prefix(prefix)
535 {
536 let new_ref = HRef::new(stripped.to_string(), r.dis.clone());
537 entity.set(name.as_str(), Kind::Ref(new_ref));
538 }
539 }
540}
541
#[cfg(test)]
mod tests {
    use super::*;
    use haystack_core::kinds::HRef;

    /// Config for the local test server with only `id_prefix` varying.
    fn local_config(id_prefix: Option<&str>) -> ConnectorConfig {
        ConnectorConfig {
            name: "test".to_string(),
            url: "http://localhost:8080/api".to_string(),
            username: "user".to_string(),
            password: "pass".to_string(),
            id_prefix: id_prefix.map(str::to_string),
            ws_url: None,
            sync_interval_secs: None,
            client_cert: None,
            client_key: None,
            ca_cert: None,
        }
    }

    /// Config for a remote server with only `url` and `ws_url` varying.
    fn remote_config(url: &str, ws_url: Option<&str>) -> ConnectorConfig {
        ConnectorConfig {
            name: "test".to_string(),
            url: url.to_string(),
            username: "u".to_string(),
            password: "p".to_string(),
            id_prefix: None,
            ws_url: ws_url.map(str::to_string),
            sync_interval_secs: None,
            client_cert: None,
            client_key: None,
            ca_cert: None,
        }
    }

    /// Deserializes a config from JSON, panicking on invalid input.
    fn parse_config(json: &str) -> ConnectorConfig {
        serde_json::from_str(json).unwrap()
    }

    /// Asserts that `tag` on `entity` is a Ref whose value equals `expected`.
    #[track_caller]
    fn assert_ref_val(entity: &HDict, tag: &str, expected: &str) {
        match entity.get(tag) {
            Some(Kind::Ref(r)) => assert_eq!(r.val, expected),
            other => panic!("expected Ref, got {other:?}"),
        }
    }

    #[test]
    fn connector_new_empty_cache() {
        let connector = Connector::new(local_config(None));
        assert_eq!(connector.entity_count(), 0);
        assert!(connector.cached_entities().is_empty());
    }

    #[test]
    fn connector_config_deserialization() {
        let config = parse_config(
            r#"{
                "name": "Remote Server",
                "url": "http://remote:8080/api",
                "username": "admin",
                "password": "secret",
                "id_prefix": "r1-"
            }"#,
        );
        assert_eq!(config.name, "Remote Server");
        assert_eq!(config.url, "http://remote:8080/api");
        assert_eq!(config.username, "admin");
        assert_eq!(config.password, "secret");
        assert_eq!(config.id_prefix, Some("r1-".to_string()));
    }

    #[test]
    fn connector_config_deserialization_without_prefix() {
        let config = parse_config(
            r#"{
                "name": "Remote",
                "url": "http://remote:8080/api",
                "username": "admin",
                "password": "secret"
            }"#,
        );
        assert_eq!(config.id_prefix, None);
    }

    #[test]
    fn id_prefix_application() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("site-1")));
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
        entity.set(
            "floorRef",
            Kind::Ref(HRef::new("floor-1", Some("Floor 1".to_string()))),
        );

        prefix_refs(&mut entity, "r1-");

        assert_ref_val(&entity, "id", "r1-site-1");
        assert_ref_val(&entity, "siteRef", "r1-site-1");
        assert_ref_val(&entity, "equipRef", "r1-equip-1");

        // The display text of a Ref must survive prefixing.
        match entity.get("floorRef") {
            Some(Kind::Ref(r)) => {
                assert_eq!(r.val, "r1-floor-1");
                assert_eq!(r.dis, Some("Floor 1".to_string()));
            }
            other => panic!("expected Ref, got {other:?}"),
        }

        // Non-Ref tags are untouched.
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
        assert_eq!(entity.get("site"), Some(&Kind::Marker));
    }

    #[test]
    fn id_prefix_skips_non_ref_values() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("point-1")));
        entity.set("customRef", Kind::Str("not-a-ref".to_string()));

        prefix_refs(&mut entity, "p-");

        assert_ref_val(&entity, "id", "p-point-1");

        // A `*Ref` tag that is not actually a Ref is left alone.
        assert_eq!(
            entity.get("customRef"),
            Some(&Kind::Str("not-a-ref".to_string()))
        );
    }

    #[test]
    fn connector_config_deserialization_full() {
        let config = parse_config(
            r#"{
                "name": "Full Config",
                "url": "https://remote:8443/api",
                "username": "admin",
                "password": "secret",
                "id_prefix": "r1-",
                "ws_url": "wss://remote:8443/api/ws",
                "sync_interval_secs": 30,
                "client_cert": "/etc/certs/client.pem",
                "client_key": "/etc/certs/client-key.pem",
                "ca_cert": "/etc/certs/ca.pem"
            }"#,
        );
        assert_eq!(config.ws_url, Some("wss://remote:8443/api/ws".to_string()));
        assert_eq!(config.sync_interval_secs, Some(30));
        assert_eq!(
            config.client_cert,
            Some("/etc/certs/client.pem".to_string())
        );
        assert_eq!(
            config.client_key,
            Some("/etc/certs/client-key.pem".to_string())
        );
        assert_eq!(config.ca_cert, Some("/etc/certs/ca.pem".to_string()));
    }

    #[test]
    fn strip_prefix_refs_reverses_prefix() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("r1-equip-1")));

        strip_prefix_refs(&mut entity, "r1-");

        assert_ref_val(&entity, "id", "site-1");
        assert_ref_val(&entity, "siteRef", "site-1");
        assert_ref_val(&entity, "equipRef", "equip-1");
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
    }

    #[test]
    fn strip_prefix_refs_ignores_non_matching() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("other-site-1")));

        strip_prefix_refs(&mut entity, "r1-");

        assert_ref_val(&entity, "id", "other-site-1");
    }

    #[test]
    fn derive_ws_url_from_http() {
        let config = remote_config("http://remote:8080/api", None);
        assert_eq!(config.effective_ws_url(), "ws://remote:8080/api/ws");
    }

    #[test]
    fn derive_ws_url_from_https() {
        let config = remote_config("https://remote:8443/api", None);
        assert_eq!(config.effective_ws_url(), "wss://remote:8443/api/ws");
    }

    #[test]
    fn explicit_ws_url_overrides_derived() {
        let config = remote_config("http://remote:8080/api", Some("ws://custom:9999/ws"));
        assert_eq!(config.effective_ws_url(), "ws://custom:9999/ws");
    }

    #[test]
    fn connector_tracks_entity_ids_in_ownership() {
        let connector = Connector::new(local_config(Some("t-")));
        assert!(!connector.owns("t-site-1"));

        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("t-site-1")));
        connector.update_cache(vec![entity]);

        assert!(connector.owns("t-site-1"));
        assert!(!connector.owns("other-1"));
    }

    #[test]
    fn connector_new_defaults_transport_and_connected() {
        let connector = Connector::new(local_config(None));
        assert_eq!(connector.transport_mode(), TransportMode::Http);
        assert!(!connector.is_connected());
        assert!(connector.last_sync_time().is_none());
    }

    #[test]
    fn connector_config_new_fields_default_to_none() {
        let config = parse_config(
            r#"{
                "name": "Minimal",
                "url": "http://remote:8080/api",
                "username": "user",
                "password": "pass"
            }"#,
        );
        assert_eq!(config.ws_url, None);
        assert_eq!(config.sync_interval_secs, None);
        assert_eq!(config.client_cert, None);
        assert_eq!(config.client_key, None);
        assert_eq!(config.ca_cert, None);
    }

    #[test]
    fn remote_watch_add_and_remove() {
        let connector = Connector::new(local_config(Some("r-")));
        assert_eq!(connector.remote_watch_count(), 0);

        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.add_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 2);

        // Adding the same id again must not grow the set.
        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 2);

        connector.remove_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        // Removing an unknown id is a no-op.
        connector.remove_remote_watch("r-nonexistent");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.remove_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 0);
    }
}