1use std::collections::HashSet;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
6
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9
10use haystack_client::HaystackClient;
11use haystack_client::transport::http::HttpTransport;
12use haystack_client::transport::ws::WsTransport;
13use haystack_core::data::{HDict, HGrid};
14use haystack_core::kinds::{HRef, Kind};
15
16enum ConnectorClient {
21 Http(HaystackClient<HttpTransport>),
22 Ws(HaystackClient<WsTransport>),
23}
24
25impl ConnectorClient {
26 async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, String> {
29 match self {
30 ConnectorClient::Http(c) => c.call(op, req).await.map_err(|e| e.to_string()),
31 ConnectorClient::Ws(c) => c.call(op, req).await.map_err(|e| e.to_string()),
32 }
33 }
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38#[repr(u8)]
39pub enum TransportMode {
40 Http = 0,
41 WebSocket = 1,
42}
43
44#[derive(Debug, Clone, serde::Deserialize)]
46pub struct ConnectorConfig {
47 pub name: String,
49 pub url: String,
51 pub username: String,
53 pub password: String,
55 pub id_prefix: Option<String>,
57 pub ws_url: Option<String>,
59 pub sync_interval_secs: Option<u64>,
61 pub client_cert: Option<String>,
63 pub client_key: Option<String>,
65 pub ca_cert: Option<String>,
67}
68
69pub struct Connector {
71 pub config: ConnectorConfig,
72 cache: RwLock<Vec<HDict>>,
74 owned_ids: RwLock<HashSet<String>>,
76 remote_watch_ids: RwLock<HashSet<String>>,
78 transport_mode: AtomicU8,
80 connected: AtomicBool,
82 last_sync: RwLock<Option<DateTime<Utc>>>,
84 client: tokio::sync::RwLock<Option<ConnectorClient>>,
88}
89
90impl Connector {
91 pub fn new(config: ConnectorConfig) -> Self {
93 Self {
94 config,
95 cache: RwLock::new(Vec::new()),
96 owned_ids: RwLock::new(HashSet::new()),
97 remote_watch_ids: RwLock::new(HashSet::new()),
98 transport_mode: AtomicU8::new(TransportMode::Http as u8),
99 connected: AtomicBool::new(false),
100 last_sync: RwLock::new(None),
101 client: tokio::sync::RwLock::new(None),
102 }
103 }
104
105 async fn connect_persistent(&self) -> Result<(), String> {
108 let ws_url = self.config.effective_ws_url();
110 match HaystackClient::connect_ws(
111 &self.config.url,
112 &ws_url,
113 &self.config.username,
114 &self.config.password,
115 )
116 .await
117 {
118 Ok(ws_client) => {
119 *self.client.write().await = Some(ConnectorClient::Ws(ws_client));
120 self.transport_mode
121 .store(TransportMode::WebSocket as u8, Ordering::Relaxed);
122 self.connected.store(true, Ordering::Relaxed);
123 log::info!("Connected to {} via WebSocket", self.config.name);
124 return Ok(());
125 }
126 Err(e) => {
127 log::warn!(
128 "WS connection to {} failed: {}, trying HTTP",
129 self.config.name,
130 e
131 );
132 }
133 }
134
135 match HaystackClient::connect(
137 &self.config.url,
138 &self.config.username,
139 &self.config.password,
140 )
141 .await
142 {
143 Ok(http_client) => {
144 *self.client.write().await = Some(ConnectorClient::Http(http_client));
145 self.transport_mode
146 .store(TransportMode::Http as u8, Ordering::Relaxed);
147 self.connected.store(true, Ordering::Relaxed);
148 log::info!("Connected to {} via HTTP", self.config.name);
149 Ok(())
150 }
151 Err(e) => {
152 self.connected.store(false, Ordering::Relaxed);
153 Err(format!("connection failed: {e}"))
154 }
155 }
156 }
157
158 pub async fn sync(&self) -> Result<usize, String> {
164 if self.client.read().await.is_none() {
166 self.connect_persistent().await?;
167 }
168
169 let grid = {
171 let client = self.client.read().await;
172 let client = client.as_ref().ok_or("not connected")?;
173 client.call("read", &build_read_all_grid()).await
174 };
175
176 let grid = grid.map_err(|e| {
177 self.connected.store(false, Ordering::Relaxed);
178 format!("read failed: {e}")
179 })?;
180
181 let mut entities: Vec<HDict> = grid.rows.into_iter().collect();
182
183 if let Some(ref prefix) = self.config.id_prefix {
185 for entity in &mut entities {
186 prefix_refs(entity, prefix);
187 }
188 }
189
190 let count = entities.len();
191 self.update_cache(entities);
192 *self.last_sync.write() = Some(Utc::now());
193 self.connected.store(true, Ordering::Relaxed);
194 Ok(count)
195 }
196
197 pub fn update_cache(&self, entities: Vec<HDict>) {
202 let ids: HashSet<String> = entities
203 .iter()
204 .filter_map(|e| match e.get("id") {
205 Some(Kind::Ref(r)) => Some(r.val.clone()),
206 _ => None,
207 })
208 .collect();
209 *self.cache.write() = entities;
210 *self.owned_ids.write() = ids;
211 }
212
213 pub fn owns(&self, id: &str) -> bool {
215 self.owned_ids.read().contains(id)
216 }
217
218 pub fn cached_entities(&self) -> Vec<HDict> {
220 self.cache.read().clone()
221 }
222
223 pub fn entity_count(&self) -> usize {
225 self.cache.read().len()
226 }
227
228 pub fn add_remote_watch(&self, prefixed_id: &str) {
230 self.remote_watch_ids
231 .write()
232 .insert(prefixed_id.to_string());
233 }
234
235 pub fn remove_remote_watch(&self, prefixed_id: &str) {
237 self.remote_watch_ids.write().remove(prefixed_id);
238 }
239
240 pub fn remote_watch_count(&self) -> usize {
242 self.remote_watch_ids.read().len()
243 }
244
245 pub fn transport_mode(&self) -> TransportMode {
247 match self.transport_mode.load(Ordering::Relaxed) {
248 1 => TransportMode::WebSocket,
249 _ => TransportMode::Http,
250 }
251 }
252
253 pub fn is_connected(&self) -> bool {
255 self.connected.load(Ordering::Relaxed)
256 }
257
258 pub fn last_sync_time(&self) -> Option<DateTime<Utc>> {
260 *self.last_sync.read()
261 }
262
263 fn strip_id(&self, prefixed_id: &str) -> String {
265 if let Some(ref prefix) = self.config.id_prefix {
266 prefixed_id
267 .strip_prefix(prefix.as_str())
268 .unwrap_or(prefixed_id)
269 .to_string()
270 } else {
271 prefixed_id.to_string()
272 }
273 }
274
275 async fn connect_remote(
280 &self,
281 ) -> Result<HaystackClient<haystack_client::transport::http::HttpTransport>, String> {
282 HaystackClient::connect(
283 &self.config.url,
284 &self.config.username,
285 &self.config.password,
286 )
287 .await
288 .map_err(|e| format!("connection failed: {e}"))
289 }
290
291 pub async fn proxy_his_read(&self, prefixed_id: &str, range: &str) -> Result<HGrid, String> {
293 let id = self.strip_id(prefixed_id);
294 let client = self.connect_remote().await?;
295 client
296 .his_read(&id, range)
297 .await
298 .map_err(|e| format!("hisRead failed: {e}"))
299 }
300
301 pub async fn proxy_point_write(
303 &self,
304 prefixed_id: &str,
305 level: u8,
306 val: &Kind,
307 ) -> Result<HGrid, String> {
308 let id = self.strip_id(prefixed_id);
309 let client = self.connect_remote().await?;
310 client
311 .point_write(&id, level, val.clone())
312 .await
313 .map_err(|e| format!("pointWrite failed: {e}"))
314 }
315
316 pub async fn proxy_his_write(
318 &self,
319 prefixed_id: &str,
320 items: Vec<HDict>,
321 ) -> Result<HGrid, String> {
322 let id = self.strip_id(prefixed_id);
323 let client = self.connect_remote().await?;
324 client
325 .his_write(&id, items)
326 .await
327 .map_err(|e| format!("hisWrite failed: {e}"))
328 }
329
330 pub async fn proxy_import(&self, entity: &HDict) -> Result<HGrid, String> {
335 use crate::connector::strip_prefix_refs;
336
337 let mut stripped = entity.clone();
338 if let Some(ref prefix) = self.config.id_prefix {
339 strip_prefix_refs(&mut stripped, prefix);
340 }
341
342 let col_names: Vec<String> = stripped.tag_names().map(|s| s.to_string()).collect();
344 let cols: Vec<haystack_core::data::HCol> = col_names
345 .iter()
346 .map(|n| haystack_core::data::HCol::new(n.as_str()))
347 .collect();
348 let grid = HGrid::from_parts(HDict::new(), cols, vec![stripped]);
349
350 let client = self.connect_remote().await?;
351 client
352 .call("import", &grid)
353 .await
354 .map_err(|e| format!("import failed: {e}"))
355 }
356
357 pub async fn proxy_invoke_action(
359 &self,
360 prefixed_id: &str,
361 action: &str,
362 args: HDict,
363 ) -> Result<HGrid, String> {
364 let id = self.strip_id(prefixed_id);
365 let client = self.connect_remote().await?;
366 client
367 .invoke_action(&id, action, args)
368 .await
369 .map_err(|e| format!("invokeAction failed: {e}"))
370 }
371
372 pub fn spawn_sync_task(connector: Arc<Connector>) -> tokio::task::JoinHandle<()> {
378 let interval_secs = connector.config.effective_sync_interval_secs();
379 tokio::spawn(async move {
380 loop {
381 match connector.sync().await {
382 Ok(count) => {
383 log::debug!("Synced {} entities from {}", count, connector.config.name);
384 }
385 Err(e) => {
386 log::error!("Sync failed for {}: {}", connector.config.name, e);
387 *connector.client.write().await = None;
389 connector.connected.store(false, Ordering::Relaxed);
390 }
391 }
392 tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
393 }
394 })
395 }
396}
397
398fn build_read_all_grid() -> HGrid {
400 use haystack_core::data::HCol;
401 let mut row = HDict::new();
402 row.set("filter", Kind::Str("*".to_string()));
403 HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row])
404}
405
406impl ConnectorConfig {
407 pub fn effective_ws_url(&self) -> String {
411 if let Some(ref ws) = self.ws_url {
412 return ws.clone();
413 }
414 let ws = if self.url.starts_with("https://") {
415 self.url.replacen("https://", "wss://", 1)
416 } else {
417 self.url.replacen("http://", "ws://", 1)
418 };
419 format!("{ws}/ws")
420 }
421
422 pub fn effective_sync_interval_secs(&self) -> u64 {
424 self.sync_interval_secs.unwrap_or(60)
425 }
426}
427
428pub fn prefix_refs(entity: &mut HDict, prefix: &str) {
433 let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
434
435 for name in &tag_names {
436 let should_prefix = name == "id" || name.ends_with("Ref");
437 if !should_prefix {
438 continue;
439 }
440
441 if let Some(Kind::Ref(r)) = entity.get(name) {
442 let new_val = format!("{}{}", prefix, r.val);
443 let new_ref = HRef::new(new_val, r.dis.clone());
444 entity.set(name.as_str(), Kind::Ref(new_ref));
445 }
446 }
447}
448
449pub fn strip_prefix_refs(entity: &mut HDict, prefix: &str) {
455 let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
456
457 for name in &tag_names {
458 let should_strip = name == "id" || name.ends_with("Ref");
459 if !should_strip {
460 continue;
461 }
462
463 if let Some(Kind::Ref(r)) = entity.get(name)
464 && let Some(stripped) = r.val.strip_prefix(prefix)
465 {
466 let new_ref = HRef::new(stripped.to_string(), r.dis.clone());
467 entity.set(name.as_str(), Kind::Ref(new_ref));
468 }
469 }
470}
471
472#[cfg(test)]
473mod tests {
474 use super::*;
475 use haystack_core::kinds::HRef;
476
477 #[test]
478 fn connector_new_empty_cache() {
479 let config = ConnectorConfig {
480 name: "test".to_string(),
481 url: "http://localhost:8080/api".to_string(),
482 username: "user".to_string(),
483 password: "pass".to_string(),
484 id_prefix: None,
485 ws_url: None,
486 sync_interval_secs: None,
487 client_cert: None,
488 client_key: None,
489 ca_cert: None,
490 };
491 let connector = Connector::new(config);
492 assert_eq!(connector.entity_count(), 0);
493 assert!(connector.cached_entities().is_empty());
494 }
495
496 #[test]
497 fn connector_config_deserialization() {
498 let json = r#"{
499 "name": "Remote Server",
500 "url": "http://remote:8080/api",
501 "username": "admin",
502 "password": "secret",
503 "id_prefix": "r1-"
504 }"#;
505 let config: ConnectorConfig = serde_json::from_str(json).unwrap();
506 assert_eq!(config.name, "Remote Server");
507 assert_eq!(config.url, "http://remote:8080/api");
508 assert_eq!(config.username, "admin");
509 assert_eq!(config.password, "secret");
510 assert_eq!(config.id_prefix, Some("r1-".to_string()));
511 }
512
513 #[test]
514 fn connector_config_deserialization_without_prefix() {
515 let json = r#"{
516 "name": "Remote",
517 "url": "http://remote:8080/api",
518 "username": "admin",
519 "password": "secret"
520 }"#;
521 let config: ConnectorConfig = serde_json::from_str(json).unwrap();
522 assert_eq!(config.id_prefix, None);
523 }
524
525 #[test]
526 fn id_prefix_application() {
527 let mut entity = HDict::new();
528 entity.set("id", Kind::Ref(HRef::from_val("site-1")));
529 entity.set("dis", Kind::Str("Main Site".to_string()));
530 entity.set("site", Kind::Marker);
531 entity.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
532 entity.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
533 entity.set(
534 "floorRef",
535 Kind::Ref(HRef::new("floor-1", Some("Floor 1".to_string()))),
536 );
537
538 prefix_refs(&mut entity, "r1-");
539
540 match entity.get("id") {
542 Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-site-1"),
543 other => panic!("expected Ref, got {other:?}"),
544 }
545
546 match entity.get("siteRef") {
548 Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-site-1"),
549 other => panic!("expected Ref, got {other:?}"),
550 }
551
552 match entity.get("equipRef") {
554 Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-equip-1"),
555 other => panic!("expected Ref, got {other:?}"),
556 }
557
558 match entity.get("floorRef") {
560 Some(Kind::Ref(r)) => {
561 assert_eq!(r.val, "r1-floor-1");
562 assert_eq!(r.dis, Some("Floor 1".to_string()));
563 }
564 other => panic!("expected Ref, got {other:?}"),
565 }
566
567 assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
569 assert_eq!(entity.get("site"), Some(&Kind::Marker));
570 }
571
572 #[test]
573 fn id_prefix_skips_non_ref_values() {
574 let mut entity = HDict::new();
575 entity.set("id", Kind::Ref(HRef::from_val("point-1")));
576 entity.set("customRef", Kind::Str("not-a-ref".to_string()));
578
579 prefix_refs(&mut entity, "p-");
580
581 match entity.get("id") {
583 Some(Kind::Ref(r)) => assert_eq!(r.val, "p-point-1"),
584 other => panic!("expected Ref, got {other:?}"),
585 }
586
587 assert_eq!(
589 entity.get("customRef"),
590 Some(&Kind::Str("not-a-ref".to_string()))
591 );
592 }
593
594 #[test]
595 fn connector_config_deserialization_full() {
596 let json = r#"{
597 "name": "Full Config",
598 "url": "https://remote:8443/api",
599 "username": "admin",
600 "password": "secret",
601 "id_prefix": "r1-",
602 "ws_url": "wss://remote:8443/api/ws",
603 "sync_interval_secs": 30,
604 "client_cert": "/etc/certs/client.pem",
605 "client_key": "/etc/certs/client-key.pem",
606 "ca_cert": "/etc/certs/ca.pem"
607 }"#;
608 let config: ConnectorConfig = serde_json::from_str(json).unwrap();
609 assert_eq!(config.ws_url, Some("wss://remote:8443/api/ws".to_string()));
610 assert_eq!(config.sync_interval_secs, Some(30));
611 assert_eq!(
612 config.client_cert,
613 Some("/etc/certs/client.pem".to_string())
614 );
615 assert_eq!(
616 config.client_key,
617 Some("/etc/certs/client-key.pem".to_string())
618 );
619 assert_eq!(config.ca_cert, Some("/etc/certs/ca.pem".to_string()));
620 }
621
622 #[test]
623 fn strip_prefix_refs_reverses_prefix() {
624 let mut entity = HDict::new();
625 entity.set("id", Kind::Ref(HRef::from_val("r1-site-1")));
626 entity.set("dis", Kind::Str("Main Site".to_string()));
627 entity.set("site", Kind::Marker);
628 entity.set("siteRef", Kind::Ref(HRef::from_val("r1-site-1")));
629 entity.set("equipRef", Kind::Ref(HRef::from_val("r1-equip-1")));
630
631 strip_prefix_refs(&mut entity, "r1-");
632
633 match entity.get("id") {
634 Some(Kind::Ref(r)) => assert_eq!(r.val, "site-1"),
635 other => panic!("expected Ref, got {other:?}"),
636 }
637 match entity.get("siteRef") {
638 Some(Kind::Ref(r)) => assert_eq!(r.val, "site-1"),
639 other => panic!("expected Ref, got {other:?}"),
640 }
641 match entity.get("equipRef") {
642 Some(Kind::Ref(r)) => assert_eq!(r.val, "equip-1"),
643 other => panic!("expected Ref, got {other:?}"),
644 }
645 assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
646 }
647
648 #[test]
649 fn strip_prefix_refs_ignores_non_matching() {
650 let mut entity = HDict::new();
651 entity.set("id", Kind::Ref(HRef::from_val("other-site-1")));
652
653 strip_prefix_refs(&mut entity, "r1-");
654
655 match entity.get("id") {
656 Some(Kind::Ref(r)) => assert_eq!(r.val, "other-site-1"),
657 other => panic!("expected Ref, got {other:?}"),
658 }
659 }
660
661 #[test]
662 fn derive_ws_url_from_http() {
663 let config = ConnectorConfig {
664 name: "test".to_string(),
665 url: "http://remote:8080/api".to_string(),
666 username: "u".to_string(),
667 password: "p".to_string(),
668 id_prefix: None,
669 ws_url: None,
670 sync_interval_secs: None,
671 client_cert: None,
672 client_key: None,
673 ca_cert: None,
674 };
675 assert_eq!(config.effective_ws_url(), "ws://remote:8080/api/ws");
676 }
677
678 #[test]
679 fn derive_ws_url_from_https() {
680 let config = ConnectorConfig {
681 name: "test".to_string(),
682 url: "https://remote:8443/api".to_string(),
683 username: "u".to_string(),
684 password: "p".to_string(),
685 id_prefix: None,
686 ws_url: None,
687 sync_interval_secs: None,
688 client_cert: None,
689 client_key: None,
690 ca_cert: None,
691 };
692 assert_eq!(config.effective_ws_url(), "wss://remote:8443/api/ws");
693 }
694
695 #[test]
696 fn explicit_ws_url_overrides_derived() {
697 let config = ConnectorConfig {
698 name: "test".to_string(),
699 url: "http://remote:8080/api".to_string(),
700 username: "u".to_string(),
701 password: "p".to_string(),
702 id_prefix: None,
703 ws_url: Some("ws://custom:9999/ws".to_string()),
704 sync_interval_secs: None,
705 client_cert: None,
706 client_key: None,
707 ca_cert: None,
708 };
709 assert_eq!(config.effective_ws_url(), "ws://custom:9999/ws");
710 }
711
712 #[test]
713 fn connector_tracks_entity_ids_in_ownership() {
714 let config = ConnectorConfig {
715 name: "test".to_string(),
716 url: "http://localhost:8080/api".to_string(),
717 username: "user".to_string(),
718 password: "pass".to_string(),
719 id_prefix: Some("t-".to_string()),
720 ws_url: None,
721 sync_interval_secs: None,
722 client_cert: None,
723 client_key: None,
724 ca_cert: None,
725 };
726 let connector = Connector::new(config);
727 assert!(!connector.owns("t-site-1"));
728
729 {
731 let mut entity = HDict::new();
732 entity.set("id", Kind::Ref(HRef::from_val("t-site-1")));
733 connector.update_cache(vec![entity]);
734 }
735
736 assert!(connector.owns("t-site-1"));
737 assert!(!connector.owns("other-1"));
738 }
739
740 #[test]
741 fn connector_new_defaults_transport_and_connected() {
742 let config = ConnectorConfig {
743 name: "test".to_string(),
744 url: "http://localhost:8080/api".to_string(),
745 username: "user".to_string(),
746 password: "pass".to_string(),
747 id_prefix: None,
748 ws_url: None,
749 sync_interval_secs: None,
750 client_cert: None,
751 client_key: None,
752 ca_cert: None,
753 };
754 let connector = Connector::new(config);
755 assert_eq!(connector.transport_mode(), TransportMode::Http);
756 assert!(!connector.is_connected());
757 assert!(connector.last_sync_time().is_none());
758 }
759
760 #[test]
761 fn connector_config_new_fields_default_to_none() {
762 let json = r#"{
763 "name": "Minimal",
764 "url": "http://remote:8080/api",
765 "username": "user",
766 "password": "pass"
767 }"#;
768 let config: ConnectorConfig = serde_json::from_str(json).unwrap();
769 assert_eq!(config.ws_url, None);
770 assert_eq!(config.sync_interval_secs, None);
771 assert_eq!(config.client_cert, None);
772 assert_eq!(config.client_key, None);
773 assert_eq!(config.ca_cert, None);
774 }
775
776 #[test]
777 fn remote_watch_add_and_remove() {
778 let config = ConnectorConfig {
779 name: "test".to_string(),
780 url: "http://localhost:8080/api".to_string(),
781 username: "user".to_string(),
782 password: "pass".to_string(),
783 id_prefix: Some("r-".to_string()),
784 ws_url: None,
785 sync_interval_secs: None,
786 client_cert: None,
787 client_key: None,
788 ca_cert: None,
789 };
790 let connector = Connector::new(config);
791 assert_eq!(connector.remote_watch_count(), 0);
792
793 connector.add_remote_watch("r-site-1");
794 assert_eq!(connector.remote_watch_count(), 1);
795
796 connector.add_remote_watch("r-equip-2");
797 assert_eq!(connector.remote_watch_count(), 2);
798
799 connector.add_remote_watch("r-site-1");
801 assert_eq!(connector.remote_watch_count(), 2);
802
803 connector.remove_remote_watch("r-site-1");
804 assert_eq!(connector.remote_watch_count(), 1);
805
806 connector.remove_remote_watch("r-nonexistent");
808 assert_eq!(connector.remote_watch_count(), 1);
809
810 connector.remove_remote_watch("r-equip-2");
811 assert_eq!(connector.remote_watch_count(), 0);
812 }
813}