1use std::collections::HashSet;
4use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
5use std::sync::Arc;
6
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9
10use haystack_client::HaystackClient;
11use haystack_client::transport::http::HttpTransport;
12use haystack_client::transport::ws::WsTransport;
13use haystack_core::data::{HDict, HGrid};
14use haystack_core::kinds::{HRef, Kind};
15
/// Transport-specific client handle stored by a [`Connector`].
///
/// Wraps a `HaystackClient` over either the HTTP or the WebSocket transport so
/// a single field can hold whichever one was established.
enum ConnectorClient {
    /// Client backed by `HttpTransport`.
    Http(HaystackClient<HttpTransport>),
    /// Client backed by `WsTransport`.
    Ws(HaystackClient<WsTransport>),
}
24
25impl ConnectorClient {
26 async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, String> {
29 match self {
30 ConnectorClient::Http(c) => c.call(op, req).await.map_err(|e| e.to_string()),
31 ConnectorClient::Ws(c) => c.call(op, req).await.map_err(|e| e.to_string()),
32 }
33 }
34}
35
/// Which transport a [`Connector`] is currently using.
///
/// The explicit `u8` discriminants are what `Connector` stores in its
/// `AtomicU8` field and decodes again in `Connector::transport_mode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TransportMode {
    /// Plain HTTP client (also the initial value before any connection).
    Http = 0,
    /// WebSocket client.
    WebSocket = 1,
}
43
/// Deserializable settings describing one remote server to connect to.
///
/// Only `name`, `url`, `username` and `password` are required; every
/// `Option` field may be omitted from the serialized form.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ConnectorConfig {
    /// Label used in this connector's log messages.
    pub name: String,
    /// Base URL passed to the client's connect functions.
    pub url: String,
    /// User name passed to the client's connect functions.
    pub username: String,
    /// Password passed to the client's connect functions.
    pub password: String,
    /// Optional string prepended to `id` and `*Ref` values of synced entities
    /// and stripped again before proxying requests to the remote.
    pub id_prefix: Option<String>,
    /// Optional explicit WebSocket URL; when absent one is derived from `url`
    /// (see `effective_ws_url`).
    pub ws_url: Option<String>,
    /// Optional sync period in seconds (see `effective_sync_interval_secs`
    /// for the fallback).
    pub sync_interval_secs: Option<u64>,
    /// NOTE(review): presumably a client-certificate path for TLS; it is not
    /// read anywhere in the visible code — confirm against callers.
    pub client_cert: Option<String>,
    /// NOTE(review): presumably the private key matching `client_cert`; not
    /// read anywhere in the visible code — confirm.
    pub client_key: Option<String>,
    /// NOTE(review): presumably a CA certificate path; not read anywhere in
    /// the visible code — confirm.
    pub ca_cert: Option<String>,
}
68
/// Runtime state for one configured remote: its settings, the entities most
/// recently fetched from it, and the client used to reach it.
///
/// All mutable state sits behind locks or atomics, so the methods take `&self`.
pub struct Connector {
    /// Settings this connector was created with.
    pub config: ConnectorConfig,
    /// Entities stored by the latest `update_cache` call.
    cache: RwLock<Vec<HDict>>,
    /// `id` ref values of the entities currently in `cache`.
    owned_ids: RwLock<HashSet<String>>,
    /// Ids registered through `add_remote_watch` and not yet removed.
    remote_watch_ids: RwLock<HashSet<String>>,
    /// Current [`TransportMode`], stored as its `u8` discriminant.
    transport_mode: AtomicU8,
    /// Set after a successful connect or sync, cleared on failure.
    connected: AtomicBool,
    /// Time of the last successful `sync`, or `None` if none has completed.
    last_sync: RwLock<Option<DateTime<Utc>>>,
    /// Client kept between syncs; `None` until a connection is established
    /// and again after the sync task resets it on failure.
    client: tokio::sync::RwLock<Option<ConnectorClient>>,
}
89
90impl Connector {
91 pub fn new(config: ConnectorConfig) -> Self {
93 Self {
94 config,
95 cache: RwLock::new(Vec::new()),
96 owned_ids: RwLock::new(HashSet::new()),
97 remote_watch_ids: RwLock::new(HashSet::new()),
98 transport_mode: AtomicU8::new(TransportMode::Http as u8),
99 connected: AtomicBool::new(false),
100 last_sync: RwLock::new(None),
101 client: tokio::sync::RwLock::new(None),
102 }
103 }
104
105 async fn connect_persistent(&self) -> Result<(), String> {
108 let ws_url = self.config.effective_ws_url();
110 match HaystackClient::connect_ws(
111 &self.config.url,
112 &ws_url,
113 &self.config.username,
114 &self.config.password,
115 )
116 .await
117 {
118 Ok(ws_client) => {
119 *self.client.write().await = Some(ConnectorClient::Ws(ws_client));
120 self.transport_mode
121 .store(TransportMode::WebSocket as u8, Ordering::Relaxed);
122 self.connected.store(true, Ordering::Relaxed);
123 log::info!("Connected to {} via WebSocket", self.config.name);
124 return Ok(());
125 }
126 Err(e) => {
127 log::warn!(
128 "WS connection to {} failed: {}, trying HTTP",
129 self.config.name,
130 e
131 );
132 }
133 }
134
135 match HaystackClient::connect(
137 &self.config.url,
138 &self.config.username,
139 &self.config.password,
140 )
141 .await
142 {
143 Ok(http_client) => {
144 *self.client.write().await = Some(ConnectorClient::Http(http_client));
145 self.transport_mode
146 .store(TransportMode::Http as u8, Ordering::Relaxed);
147 self.connected.store(true, Ordering::Relaxed);
148 log::info!("Connected to {} via HTTP", self.config.name);
149 Ok(())
150 }
151 Err(e) => {
152 self.connected.store(false, Ordering::Relaxed);
153 Err(format!("connection failed: {e}"))
154 }
155 }
156 }
157
158 pub async fn sync(&self) -> Result<usize, String> {
164 if self.client.read().await.is_none() {
166 self.connect_persistent().await?;
167 }
168
169 let grid = {
171 let client = self.client.read().await;
172 let client = client.as_ref().ok_or("not connected")?;
173 client.call("read", &build_read_all_grid()).await
174 };
175
176 let grid = grid.map_err(|e| {
177 self.connected.store(false, Ordering::Relaxed);
178 format!("read failed: {e}")
179 })?;
180
181 let mut entities: Vec<HDict> = grid.rows.into_iter().collect();
182
183 if let Some(ref prefix) = self.config.id_prefix {
185 for entity in &mut entities {
186 prefix_refs(entity, prefix);
187 }
188 }
189
190 let count = entities.len();
191 self.update_cache(entities);
192 *self.last_sync.write() = Some(Utc::now());
193 self.connected.store(true, Ordering::Relaxed);
194 Ok(count)
195 }
196
197 pub fn update_cache(&self, entities: Vec<HDict>) {
202 let ids: HashSet<String> = entities
203 .iter()
204 .filter_map(|e| match e.get("id") {
205 Some(Kind::Ref(r)) => Some(r.val.clone()),
206 _ => None,
207 })
208 .collect();
209 *self.cache.write() = entities;
210 *self.owned_ids.write() = ids;
211 }
212
213 pub fn owns(&self, id: &str) -> bool {
215 self.owned_ids.read().contains(id)
216 }
217
218 pub fn cached_entities(&self) -> Vec<HDict> {
220 self.cache.read().clone()
221 }
222
223 pub fn entity_count(&self) -> usize {
225 self.cache.read().len()
226 }
227
228 pub fn add_remote_watch(&self, prefixed_id: &str) {
230 self.remote_watch_ids
231 .write()
232 .insert(prefixed_id.to_string());
233 }
234
235 pub fn remove_remote_watch(&self, prefixed_id: &str) {
237 self.remote_watch_ids.write().remove(prefixed_id);
238 }
239
240 pub fn remote_watch_count(&self) -> usize {
242 self.remote_watch_ids.read().len()
243 }
244
245 pub fn transport_mode(&self) -> TransportMode {
247 match self.transport_mode.load(Ordering::Relaxed) {
248 1 => TransportMode::WebSocket,
249 _ => TransportMode::Http,
250 }
251 }
252
253 pub fn is_connected(&self) -> bool {
255 self.connected.load(Ordering::Relaxed)
256 }
257
258 pub fn last_sync_time(&self) -> Option<DateTime<Utc>> {
260 *self.last_sync.read()
261 }
262
263 fn strip_id(&self, prefixed_id: &str) -> String {
265 if let Some(ref prefix) = self.config.id_prefix {
266 prefixed_id
267 .strip_prefix(prefix.as_str())
268 .unwrap_or(prefixed_id)
269 .to_string()
270 } else {
271 prefixed_id.to_string()
272 }
273 }
274
275 async fn connect_remote(
280 &self,
281 ) -> Result<
282 HaystackClient<haystack_client::transport::http::HttpTransport>,
283 String,
284 > {
285 HaystackClient::connect(
286 &self.config.url,
287 &self.config.username,
288 &self.config.password,
289 )
290 .await
291 .map_err(|e| format!("connection failed: {e}"))
292 }
293
294 pub async fn proxy_his_read(
296 &self,
297 prefixed_id: &str,
298 range: &str,
299 ) -> Result<HGrid, String> {
300 let id = self.strip_id(prefixed_id);
301 let client = self.connect_remote().await?;
302 client
303 .his_read(&id, range)
304 .await
305 .map_err(|e| format!("hisRead failed: {e}"))
306 }
307
308 pub async fn proxy_point_write(
310 &self,
311 prefixed_id: &str,
312 level: u8,
313 val: &Kind,
314 ) -> Result<HGrid, String> {
315 let id = self.strip_id(prefixed_id);
316 let client = self.connect_remote().await?;
317 client
318 .point_write(&id, level, val.clone())
319 .await
320 .map_err(|e| format!("pointWrite failed: {e}"))
321 }
322
323 pub async fn proxy_his_write(
325 &self,
326 prefixed_id: &str,
327 items: Vec<HDict>,
328 ) -> Result<HGrid, String> {
329 let id = self.strip_id(prefixed_id);
330 let client = self.connect_remote().await?;
331 client
332 .his_write(&id, items)
333 .await
334 .map_err(|e| format!("hisWrite failed: {e}"))
335 }
336
337 pub async fn proxy_import(&self, entity: &HDict) -> Result<HGrid, String> {
342 use crate::connector::strip_prefix_refs;
343
344 let mut stripped = entity.clone();
345 if let Some(ref prefix) = self.config.id_prefix {
346 strip_prefix_refs(&mut stripped, prefix);
347 }
348
349 let col_names: Vec<String> = stripped.tag_names().map(|s| s.to_string()).collect();
351 let cols: Vec<haystack_core::data::HCol> =
352 col_names.iter().map(|n| haystack_core::data::HCol::new(n.as_str())).collect();
353 let grid = HGrid::from_parts(HDict::new(), cols, vec![stripped]);
354
355 let client = self.connect_remote().await?;
356 client
357 .call("import", &grid)
358 .await
359 .map_err(|e| format!("import failed: {e}"))
360 }
361
362 pub async fn proxy_invoke_action(
364 &self,
365 prefixed_id: &str,
366 action: &str,
367 args: HDict,
368 ) -> Result<HGrid, String> {
369 let id = self.strip_id(prefixed_id);
370 let client = self.connect_remote().await?;
371 client
372 .invoke_action(&id, action, args)
373 .await
374 .map_err(|e| format!("invokeAction failed: {e}"))
375 }
376
377 pub fn spawn_sync_task(connector: Arc<Connector>) -> tokio::task::JoinHandle<()> {
383 let interval_secs = connector.config.effective_sync_interval_secs();
384 tokio::spawn(async move {
385 loop {
386 match connector.sync().await {
387 Ok(count) => {
388 log::debug!(
389 "Synced {} entities from {}",
390 count,
391 connector.config.name
392 );
393 }
394 Err(e) => {
395 log::error!("Sync failed for {}: {}", connector.config.name, e);
396 *connector.client.write().await = None;
398 connector.connected.store(false, Ordering::Relaxed);
399 }
400 }
401 tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
402 }
403 })
404 }
405}
406
407fn build_read_all_grid() -> HGrid {
409 use haystack_core::data::HCol;
410 let mut row = HDict::new();
411 row.set("filter", Kind::Str("*".to_string()));
412 HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row])
413}
414
415impl ConnectorConfig {
416 pub fn effective_ws_url(&self) -> String {
420 if let Some(ref ws) = self.ws_url {
421 return ws.clone();
422 }
423 let ws = if self.url.starts_with("https://") {
424 self.url.replacen("https://", "wss://", 1)
425 } else {
426 self.url.replacen("http://", "ws://", 1)
427 };
428 format!("{ws}/ws")
429 }
430
431 pub fn effective_sync_interval_secs(&self) -> u64 {
433 self.sync_interval_secs.unwrap_or(60)
434 }
435}
436
437pub fn prefix_refs(entity: &mut HDict, prefix: &str) {
442 let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
443
444 for name in &tag_names {
445 let should_prefix = name == "id" || name.ends_with("Ref");
446 if !should_prefix {
447 continue;
448 }
449
450 if let Some(Kind::Ref(r)) = entity.get(name) {
451 let new_val = format!("{}{}", prefix, r.val);
452 let new_ref = HRef::new(new_val, r.dis.clone());
453 entity.set(name.as_str(), Kind::Ref(new_ref));
454 }
455 }
456}
457
458pub fn strip_prefix_refs(entity: &mut HDict, prefix: &str) {
464 let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
465
466 for name in &tag_names {
467 let should_strip = name == "id" || name.ends_with("Ref");
468 if !should_strip {
469 continue;
470 }
471
472 if let Some(Kind::Ref(r)) = entity.get(name)
473 && let Some(stripped) = r.val.strip_prefix(prefix)
474 {
475 let new_ref = HRef::new(stripped.to_string(), r.dis.clone());
476 entity.set(name.as_str(), Kind::Ref(new_ref));
477 }
478 }
479}
480
#[cfg(test)]
mod tests {
    use super::*;
    use haystack_core::kinds::HRef;

    /// Builds a config for `url` in which only `id_prefix` may be set.
    fn config_for(url: &str, id_prefix: Option<&str>) -> ConnectorConfig {
        ConnectorConfig {
            name: "test".to_string(),
            url: url.to_string(),
            username: "user".to_string(),
            password: "pass".to_string(),
            id_prefix: id_prefix.map(str::to_string),
            ws_url: None,
            sync_interval_secs: None,
            client_cert: None,
            client_key: None,
            ca_cert: None,
        }
    }

    /// Builds a connector against the local test URL.
    fn local_connector(id_prefix: Option<&str>) -> Connector {
        Connector::new(config_for("http://localhost:8080/api", id_prefix))
    }

    /// Asserts that `tag` on `entity` is a ref whose value is `expected`.
    fn assert_ref_val(entity: &HDict, tag: &str, expected: &str) {
        match entity.get(tag) {
            Some(Kind::Ref(r)) => assert_eq!(r.val, expected),
            other => panic!("expected Ref, got {other:?}"),
        }
    }

    #[test]
    fn connector_new_empty_cache() {
        let connector = local_connector(None);
        assert_eq!(connector.entity_count(), 0);
        assert!(connector.cached_entities().is_empty());
    }

    #[test]
    fn connector_config_deserialization() {
        let json = r#"{
            "name": "Remote Server",
            "url": "http://remote:8080/api",
            "username": "admin",
            "password": "secret",
            "id_prefix": "r1-"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.name, "Remote Server");
        assert_eq!(config.url, "http://remote:8080/api");
        assert_eq!(config.username, "admin");
        assert_eq!(config.password, "secret");
        assert_eq!(config.id_prefix, Some("r1-".to_string()));
    }

    #[test]
    fn connector_config_deserialization_without_prefix() {
        let json = r#"{
            "name": "Remote",
            "url": "http://remote:8080/api",
            "username": "admin",
            "password": "secret"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.id_prefix, None);
    }

    #[test]
    fn id_prefix_application() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("site-1")));
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
        entity.set(
            "floorRef",
            Kind::Ref(HRef::new("floor-1", Some("Floor 1".to_string()))),
        );

        prefix_refs(&mut entity, "r1-");

        assert_ref_val(&entity, "id", "r1-site-1");
        assert_ref_val(&entity, "siteRef", "r1-site-1");
        assert_ref_val(&entity, "equipRef", "r1-equip-1");

        // The display string must survive the rewrite.
        match entity.get("floorRef") {
            Some(Kind::Ref(r)) => {
                assert_eq!(r.val, "r1-floor-1");
                assert_eq!(r.dis, Some("Floor 1".to_string()));
            }
            other => panic!("expected Ref, got {other:?}"),
        }

        // Non-ref tags are untouched.
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
        assert_eq!(entity.get("site"), Some(&Kind::Marker));
    }

    #[test]
    fn id_prefix_skips_non_ref_values() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("point-1")));
        entity.set("customRef", Kind::Str("not-a-ref".to_string()));

        prefix_refs(&mut entity, "p-");

        assert_ref_val(&entity, "id", "p-point-1");
        assert_eq!(
            entity.get("customRef"),
            Some(&Kind::Str("not-a-ref".to_string()))
        );
    }

    #[test]
    fn connector_config_deserialization_full() {
        let json = r#"{
            "name": "Full Config",
            "url": "https://remote:8443/api",
            "username": "admin",
            "password": "secret",
            "id_prefix": "r1-",
            "ws_url": "wss://remote:8443/api/ws",
            "sync_interval_secs": 30,
            "client_cert": "/etc/certs/client.pem",
            "client_key": "/etc/certs/client-key.pem",
            "ca_cert": "/etc/certs/ca.pem"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.ws_url, Some("wss://remote:8443/api/ws".to_string()));
        assert_eq!(config.sync_interval_secs, Some(30));
        assert_eq!(config.client_cert, Some("/etc/certs/client.pem".to_string()));
        assert_eq!(config.client_key, Some("/etc/certs/client-key.pem".to_string()));
        assert_eq!(config.ca_cert, Some("/etc/certs/ca.pem".to_string()));
    }

    #[test]
    fn strip_prefix_refs_reverses_prefix() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("r1-equip-1")));

        strip_prefix_refs(&mut entity, "r1-");

        assert_ref_val(&entity, "id", "site-1");
        assert_ref_val(&entity, "siteRef", "site-1");
        assert_ref_val(&entity, "equipRef", "equip-1");
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
    }

    #[test]
    fn strip_prefix_refs_ignores_non_matching() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("other-site-1")));

        strip_prefix_refs(&mut entity, "r1-");

        assert_ref_val(&entity, "id", "other-site-1");
    }

    #[test]
    fn derive_ws_url_from_http() {
        let config = config_for("http://remote:8080/api", None);
        assert_eq!(config.effective_ws_url(), "ws://remote:8080/api/ws");
    }

    #[test]
    fn derive_ws_url_from_https() {
        let config = config_for("https://remote:8443/api", None);
        assert_eq!(config.effective_ws_url(), "wss://remote:8443/api/ws");
    }

    #[test]
    fn explicit_ws_url_overrides_derived() {
        let config = ConnectorConfig {
            ws_url: Some("ws://custom:9999/ws".to_string()),
            ..config_for("http://remote:8080/api", None)
        };
        assert_eq!(config.effective_ws_url(), "ws://custom:9999/ws");
    }

    #[test]
    fn connector_tracks_entity_ids_in_ownership() {
        let connector = local_connector(Some("t-"));
        assert!(!connector.owns("t-site-1"));

        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("t-site-1")));
        connector.update_cache(vec![entity]);

        assert!(connector.owns("t-site-1"));
        assert!(!connector.owns("other-1"));
    }

    #[test]
    fn connector_new_defaults_transport_and_connected() {
        let connector = local_connector(None);
        assert_eq!(connector.transport_mode(), TransportMode::Http);
        assert!(!connector.is_connected());
        assert!(connector.last_sync_time().is_none());
    }

    #[test]
    fn connector_config_new_fields_default_to_none() {
        let json = r#"{
            "name": "Minimal",
            "url": "http://remote:8080/api",
            "username": "user",
            "password": "pass"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.ws_url, None);
        assert_eq!(config.sync_interval_secs, None);
        assert_eq!(config.client_cert, None);
        assert_eq!(config.client_key, None);
        assert_eq!(config.ca_cert, None);
    }

    #[test]
    fn remote_watch_add_and_remove() {
        let connector = local_connector(Some("r-"));
        assert_eq!(connector.remote_watch_count(), 0);

        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.add_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 2);

        // Adding a duplicate does not grow the set.
        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 2);

        connector.remove_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        // Removing an unknown id is a no-op.
        connector.remove_remote_watch("r-nonexistent");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.remove_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 0);
    }
}