1use std::collections::HashMap;
32use std::collections::HashSet;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
35
36use chrono::{DateTime, Utc};
37use parking_lot::RwLock;
38
39use haystack_client::HaystackClient;
40use haystack_client::transport::http::HttpTransport;
41use haystack_client::transport::ws::WsTransport;
42use haystack_core::data::{HDict, HGrid};
43use haystack_core::filter::{matches, parse_filter};
44use haystack_core::graph::bitmap::TagBitmapIndex;
45use haystack_core::graph::query_planner;
46use haystack_core::kinds::{HRef, Kind};
47
48enum ConnectorClient {
53 Http(HaystackClient<HttpTransport>),
54 Ws(HaystackClient<WsTransport>),
55}
56
57impl ConnectorClient {
58 async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, String> {
61 match self {
62 ConnectorClient::Http(c) => c.call(op, req).await.map_err(|e| e.to_string()),
63 ConnectorClient::Ws(c) => c.call(op, req).await.map_err(|e| e.to_string()),
64 }
65 }
66
67 async fn his_read(&self, id: &str, range: &str) -> Result<HGrid, String> {
69 match self {
70 ConnectorClient::Http(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
71 ConnectorClient::Ws(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
72 }
73 }
74
75 async fn his_write(&self, id: &str, items: Vec<HDict>) -> Result<HGrid, String> {
77 match self {
78 ConnectorClient::Http(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
79 ConnectorClient::Ws(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
80 }
81 }
82
83 async fn point_write(&self, id: &str, level: u8, val: Kind) -> Result<HGrid, String> {
85 match self {
86 ConnectorClient::Http(c) => c
87 .point_write(id, level, val)
88 .await
89 .map_err(|e| e.to_string()),
90 ConnectorClient::Ws(c) => c
91 .point_write(id, level, val)
92 .await
93 .map_err(|e| e.to_string()),
94 }
95 }
96
97 async fn invoke_action(&self, id: &str, action: &str, args: HDict) -> Result<HGrid, String> {
99 match self {
100 ConnectorClient::Http(c) => c
101 .invoke_action(id, action, args)
102 .await
103 .map_err(|e| e.to_string()),
104 ConnectorClient::Ws(c) => c
105 .invoke_action(id, action, args)
106 .await
107 .map_err(|e| e.to_string()),
108 }
109 }
110}
111
/// Transport a connector is currently using to reach its remote.
///
/// The discriminants are pinned because the value is stored as a raw `u8`
/// inside an atomic and decoded again on read.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransportMode {
    /// Requests go over HTTP.
    Http = 0,
    /// Requests go over a WebSocket connection.
    WebSocket = 1,
}
119
120#[derive(Debug, Clone, serde::Deserialize)]
122pub struct ConnectorConfig {
123 pub name: String,
125 pub url: String,
127 pub username: String,
129 pub password: String,
131 pub id_prefix: Option<String>,
133 pub ws_url: Option<String>,
135 pub sync_interval_secs: Option<u64>,
137 pub client_cert: Option<String>,
139 pub client_key: Option<String>,
141 pub ca_cert: Option<String>,
143}
144
145struct CacheState {
151 entities: Vec<HDict>,
152 owned_ids: HashSet<String>,
153 tag_index: TagBitmapIndex,
154 id_map: HashMap<String, usize>,
155}
156
157impl CacheState {
158 fn empty() -> Self {
159 Self {
160 entities: Vec::new(),
161 owned_ids: HashSet::new(),
162 tag_index: TagBitmapIndex::new(),
163 id_map: HashMap::new(),
164 }
165 }
166
167 fn build(entities: Vec<HDict>) -> Self {
169 let mut owned_ids = HashSet::with_capacity(entities.len());
170 let mut tag_index = TagBitmapIndex::new();
171 let mut id_map = HashMap::with_capacity(entities.len());
172
173 for (eid, entity) in entities.iter().enumerate() {
174 if let Some(Kind::Ref(r)) = entity.get("id") {
175 owned_ids.insert(r.val.clone());
176 id_map.insert(r.val.clone(), eid);
177 }
178 let tags: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
179 tag_index.add(eid, &tags);
180 }
181
182 Self {
183 entities,
184 owned_ids,
185 tag_index,
186 id_map,
187 }
188 }
189
190 fn rebuild_index(&mut self) {
192 self.owned_ids.clear();
193 self.tag_index = TagBitmapIndex::new();
194 self.id_map.clear();
195
196 for (eid, entity) in self.entities.iter().enumerate() {
197 if let Some(Kind::Ref(r)) = entity.get("id") {
198 self.owned_ids.insert(r.val.clone());
199 self.id_map.insert(r.val.clone(), eid);
200 }
201 let tags: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
202 self.tag_index.add(eid, &tags);
203 }
204 }
205}
206
207pub struct Connector {
210 pub config: ConnectorConfig,
211 cache_state: RwLock<CacheState>,
214 remote_watch_ids: RwLock<HashSet<String>>,
216 transport_mode: AtomicU8,
218 connected: AtomicBool,
220 last_sync: RwLock<Option<DateTime<Utc>>>,
222 last_remote_version: RwLock<Option<u64>>,
224 client: tokio::sync::RwLock<Option<ConnectorClient>>,
228 current_interval_secs: AtomicU64,
230 last_entity_count: AtomicU64,
232}
233
234impl Connector {
235 pub fn new(config: ConnectorConfig) -> Self {
240 let base_interval = config.effective_sync_interval_secs();
241 Self {
242 config,
243 cache_state: RwLock::new(CacheState::empty()),
244 remote_watch_ids: RwLock::new(HashSet::new()),
245 transport_mode: AtomicU8::new(TransportMode::Http as u8),
246 connected: AtomicBool::new(false),
247 last_sync: RwLock::new(None),
248 last_remote_version: RwLock::new(None),
249 client: tokio::sync::RwLock::new(None),
250 current_interval_secs: AtomicU64::new(base_interval),
251 last_entity_count: AtomicU64::new(0),
252 }
253 }
254
255 async fn connect_persistent(&self) -> Result<(), String> {
258 let ws_url = self.config.effective_ws_url();
260 match HaystackClient::connect_ws(
261 &self.config.url,
262 &ws_url,
263 &self.config.username,
264 &self.config.password,
265 )
266 .await
267 {
268 Ok(ws_client) => {
269 *self.client.write().await = Some(ConnectorClient::Ws(ws_client));
270 self.transport_mode
271 .store(TransportMode::WebSocket as u8, Ordering::Relaxed);
272 self.connected.store(true, Ordering::Relaxed);
273 log::info!("Connected to {} via WebSocket", self.config.name);
274 return Ok(());
275 }
276 Err(e) => {
277 log::warn!(
278 "WS connection to {} failed: {}, trying HTTP",
279 self.config.name,
280 e
281 );
282 }
283 }
284
285 match HaystackClient::connect(
287 &self.config.url,
288 &self.config.username,
289 &self.config.password,
290 )
291 .await
292 {
293 Ok(http_client) => {
294 *self.client.write().await = Some(ConnectorClient::Http(http_client));
295 self.transport_mode
296 .store(TransportMode::Http as u8, Ordering::Relaxed);
297 self.connected.store(true, Ordering::Relaxed);
298 log::info!("Connected to {} via HTTP", self.config.name);
299 Ok(())
300 }
301 Err(e) => {
302 self.connected.store(false, Ordering::Relaxed);
303 Err(format!("connection failed: {e}"))
304 }
305 }
306 }
307
308 pub async fn sync(&self) -> Result<usize, String> {
316 if self.client.read().await.is_none() {
318 self.connect_persistent().await?;
319 }
320
321 let maybe_ver = *self.last_remote_version.read();
323 if let Some(last_ver) = maybe_ver {
324 match self.try_delta_sync(last_ver).await {
325 Ok(count) => return Ok(count),
326 Err(e) => {
327 log::debug!(
328 "Delta sync failed for {}, falling back to full: {e}",
329 self.config.name,
330 );
331 }
332 }
333 }
334
335 self.full_sync().await
337 }
338
339 async fn full_sync(&self) -> Result<usize, String> {
341 let grid = {
342 let client = self.client.read().await;
343 let client = client.as_ref().ok_or("not connected")?;
344 client.call("read", &build_read_all_grid()).await
345 };
346
347 let grid = grid.map_err(|e| {
348 self.connected.store(false, Ordering::Relaxed);
349 format!("read failed: {e}")
350 })?;
351
352 let mut entities: Vec<HDict> = grid.rows.into_iter().collect();
353
354 if let Some(ref prefix) = self.config.id_prefix {
356 for entity in &mut entities {
357 prefix_refs(entity, prefix);
358 }
359 }
360
361 let count = entities.len();
362 self.update_cache(entities);
363 *self.last_sync.write() = Some(Utc::now());
364 self.connected.store(true, Ordering::Relaxed);
365
366 self.probe_remote_version().await;
368
369 Ok(count)
370 }
371
372 async fn probe_remote_version(&self) {
377 let grid = {
378 let client = self.client.read().await;
379 let Some(client) = client.as_ref() else {
380 return;
381 };
382 client.call("changes", &build_changes_grid(u64::MAX)).await
383 };
384 if let Ok(grid) = grid
385 && let Some(Kind::Number(n)) = grid.meta.get("curVer")
386 {
387 *self.last_remote_version.write() = Some(n.val as u64);
388 }
389 }
390
391 async fn try_delta_sync(&self, since_version: u64) -> Result<usize, String> {
397 let changes_grid = build_changes_grid(since_version);
398 let grid = {
399 let client = self.client.read().await;
400 let client = client.as_ref().ok_or("not connected")?;
401 client.call("changes", &changes_grid).await
402 }
403 .map_err(|e| format!("changes op failed: {e}"))?;
404
405 if grid.is_err() {
407 return Err("remote returned error grid for changes op".to_string());
408 }
409
410 let cur_ver = grid
412 .meta
413 .get("curVer")
414 .and_then(|k| {
415 if let Kind::Number(n) = k {
416 Some(n.val as u64)
417 } else {
418 None
419 }
420 })
421 .ok_or("changes response missing curVer in meta")?;
422
423 if grid.rows.is_empty() {
425 *self.last_remote_version.write() = Some(cur_ver);
426 *self.last_sync.write() = Some(Utc::now());
427 return Ok(self.cache_state.read().entities.len());
428 }
429
430 let mut state = self.cache_state.write();
432 let prefix = self.config.id_prefix.as_deref();
433
434 for row in &grid.rows {
435 let op = match row.get("op") {
436 Some(Kind::Str(s)) => s.as_str(),
437 _ => continue,
438 };
439 let ref_val = match row.get("ref") {
440 Some(Kind::Str(s)) => s.clone(),
441 _ => continue,
442 };
443
444 match op {
445 "add" | "update" => {
446 if let Some(Kind::Dict(entity_box)) = row.get("entity") {
447 let mut entity: HDict = (**entity_box).clone();
448 if let Some(pfx) = prefix {
449 prefix_refs(&mut entity, pfx);
450 }
451 let entity_id = entity.get("id").and_then(|k| {
452 if let Kind::Ref(r) = k {
453 Some(r.val.clone())
454 } else {
455 None
456 }
457 });
458
459 if let Some(ref eid) = entity_id {
460 if let Some(&idx) = state.id_map.get(eid.as_str()) {
461 state.entities[idx] = entity;
463 } else {
464 let idx = state.entities.len();
466 state.id_map.insert(eid.clone(), idx);
467 state.entities.push(entity);
468 }
469 }
470 }
471 }
472 "remove" => {
473 let prefixed_ref = match prefix {
474 Some(pfx) => format!("{pfx}{ref_val}"),
475 None => ref_val,
476 };
477 if let Some(&idx) = state.id_map.get(prefixed_ref.as_str()) {
478 let last_idx = state.entities.len() - 1;
480 if idx != last_idx {
481 let last_id = state.entities[last_idx].get("id").and_then(|k| {
482 if let Kind::Ref(r) = k {
483 Some(r.val.clone())
484 } else {
485 None
486 }
487 });
488 state.entities.swap(idx, last_idx);
489 if let Some(lid) = last_id {
490 state.id_map.insert(lid, idx);
491 }
492 }
493 state.entities.pop();
494 state.id_map.remove(prefixed_ref.as_str());
495 }
496 }
497 _ => {}
498 }
499 }
500
501 state.rebuild_index();
503
504 let count = state.entities.len();
505 drop(state);
506
507 *self.last_remote_version.write() = Some(cur_ver);
508 *self.last_sync.write() = Some(Utc::now());
509 self.connected.store(true, Ordering::Relaxed);
510 Ok(count)
511 }
512
513 pub fn update_cache(&self, entities: Vec<HDict>) {
519 *self.cache_state.write() = CacheState::build(entities);
520 }
521
522 pub fn owns(&self, id: &str) -> bool {
524 self.cache_state.read().owned_ids.contains(id)
525 }
526
527 pub fn get_cached_entity(&self, id: &str) -> Option<HDict> {
529 let state = self.cache_state.read();
530 state
531 .id_map
532 .get(id)
533 .and_then(|&idx| state.entities.get(idx))
534 .cloned()
535 }
536
537 pub fn batch_get_cached(&self, ids: &[&str]) -> (Vec<HDict>, Vec<String>) {
540 let state = self.cache_state.read();
541 let mut found = Vec::with_capacity(ids.len());
542 let mut missing = Vec::new();
543 for &id in ids {
544 if let Some(&idx) = state.id_map.get(id) {
545 if let Some(entity) = state.entities.get(idx) {
546 found.push(entity.clone());
547 } else {
548 missing.push(id.to_string());
549 }
550 } else {
551 missing.push(id.to_string());
552 }
553 }
554 (found, missing)
555 }
556
557 pub fn cached_entities(&self) -> Vec<HDict> {
559 self.cache_state.read().entities.clone()
560 }
561
562 pub fn entity_count(&self) -> usize {
564 self.cache_state.read().entities.len()
565 }
566
567 pub fn filter_cached(&self, filter_expr: &str, limit: usize) -> Result<Vec<HDict>, String> {
572 let effective_limit = if limit == 0 { usize::MAX } else { limit };
573
574 let ast = parse_filter(filter_expr).map_err(|e| format!("filter error: {e}"))?;
575
576 let state = self.cache_state.read();
577 let max_id = state.entities.len();
578
579 let candidates = query_planner::bitmap_candidates(&ast, &state.tag_index, max_id);
580
581 let mut results = Vec::new();
582
583 if let Some(ref bitmap) = candidates {
584 for eid in TagBitmapIndex::iter_set_bits(bitmap) {
585 if results.len() >= effective_limit {
586 break;
587 }
588 if let Some(entity) = state.entities.get(eid)
589 && matches(&ast, entity, None)
590 {
591 results.push(entity.clone());
592 }
593 }
594 } else {
595 for entity in state.entities.iter() {
596 if results.len() >= effective_limit {
597 break;
598 }
599 if matches(&ast, entity, None) {
600 results.push(entity.clone());
601 }
602 }
603 }
604
605 Ok(results)
606 }
607
608 pub fn add_remote_watch(&self, prefixed_id: &str) {
610 self.remote_watch_ids
611 .write()
612 .insert(prefixed_id.to_string());
613 }
614
615 pub fn remove_remote_watch(&self, prefixed_id: &str) {
617 self.remote_watch_ids.write().remove(prefixed_id);
618 }
619
620 pub fn remote_watch_count(&self) -> usize {
622 self.remote_watch_ids.read().len()
623 }
624
625 pub fn transport_mode(&self) -> TransportMode {
627 match self.transport_mode.load(Ordering::Relaxed) {
628 1 => TransportMode::WebSocket,
629 _ => TransportMode::Http,
630 }
631 }
632
633 pub fn is_connected(&self) -> bool {
635 self.connected.load(Ordering::Relaxed)
636 }
637
638 pub fn last_sync_time(&self) -> Option<DateTime<Utc>> {
640 *self.last_sync.read()
641 }
642
643 fn strip_id(&self, prefixed_id: &str) -> String {
645 if let Some(ref prefix) = self.config.id_prefix {
646 prefixed_id
647 .strip_prefix(prefix.as_str())
648 .unwrap_or(prefixed_id)
649 .to_string()
650 } else {
651 prefixed_id.to_string()
652 }
653 }
654
655 async fn ensure_connected(&self) -> Result<(), String> {
658 if self.client.read().await.is_none() {
659 self.connect_persistent().await?;
660 }
661 Ok(())
662 }
663
664 async fn on_proxy_error(&self, op_name: &str, e: String) -> String {
667 *self.client.write().await = None;
668 self.connected.store(false, Ordering::Relaxed);
669 format!("{op_name} failed: {e}")
670 }
671
672 pub async fn proxy_his_read(&self, prefixed_id: &str, range: &str) -> Result<HGrid, String> {
677 self.ensure_connected().await?;
678 let id = self.strip_id(prefixed_id);
679 let guard = self.client.read().await;
680 let client = guard.as_ref().ok_or("not connected")?;
681 match client.his_read(&id, range).await {
682 Ok(grid) => Ok(grid),
683 Err(e) => {
684 drop(guard);
685 Err(self.on_proxy_error("hisRead", e).await)
686 }
687 }
688 }
689
690 pub async fn proxy_point_write(
695 &self,
696 prefixed_id: &str,
697 level: u8,
698 val: &Kind,
699 ) -> Result<HGrid, String> {
700 self.ensure_connected().await?;
701 let id = self.strip_id(prefixed_id);
702 let val = val.clone();
703 let guard = self.client.read().await;
704 let client = guard.as_ref().ok_or("not connected")?;
705 match client.point_write(&id, level, val).await {
706 Ok(grid) => Ok(grid),
707 Err(e) => {
708 drop(guard);
709 Err(self.on_proxy_error("pointWrite", e).await)
710 }
711 }
712 }
713
714 pub async fn proxy_his_write(
719 &self,
720 prefixed_id: &str,
721 items: Vec<HDict>,
722 ) -> Result<HGrid, String> {
723 self.ensure_connected().await?;
724 let id = self.strip_id(prefixed_id);
725 let guard = self.client.read().await;
726 let client = guard.as_ref().ok_or("not connected")?;
727 match client.his_write(&id, items).await {
728 Ok(grid) => Ok(grid),
729 Err(e) => {
730 drop(guard);
731 Err(self.on_proxy_error("hisWrite", e).await)
732 }
733 }
734 }
735
736 pub async fn proxy_import(&self, entity: &HDict) -> Result<HGrid, String> {
741 self.ensure_connected().await?;
742
743 let mut stripped = entity.clone();
744 if let Some(ref prefix) = self.config.id_prefix {
745 strip_prefix_refs(&mut stripped, prefix);
746 }
747
748 let col_names: Vec<String> = stripped.tag_names().map(|s| s.to_string()).collect();
749 let cols: Vec<haystack_core::data::HCol> = col_names
750 .iter()
751 .map(|n| haystack_core::data::HCol::new(n.as_str()))
752 .collect();
753 let grid = HGrid::from_parts(HDict::new(), cols, vec![stripped]);
754
755 let guard = self.client.read().await;
756 let client = guard.as_ref().ok_or("not connected")?;
757 match client.call("import", &grid).await {
758 Ok(grid) => Ok(grid),
759 Err(e) => {
760 drop(guard);
761 Err(self.on_proxy_error("import", e).await)
762 }
763 }
764 }
765
766 pub async fn proxy_invoke_action(
771 &self,
772 prefixed_id: &str,
773 action: &str,
774 args: HDict,
775 ) -> Result<HGrid, String> {
776 self.ensure_connected().await?;
777 let id = self.strip_id(prefixed_id);
778 let action = action.to_string();
779 let guard = self.client.read().await;
780 let client = guard.as_ref().ok_or("not connected")?;
781 match client.invoke_action(&id, &action, args).await {
782 Ok(grid) => Ok(grid),
783 Err(e) => {
784 drop(guard);
785 Err(self.on_proxy_error("invokeAction", e).await)
786 }
787 }
788 }
789
790 pub fn spawn_sync_task(connector: Arc<Connector>) -> tokio::task::JoinHandle<()> {
796 let base_interval = connector.config.effective_sync_interval_secs();
797 let min_interval = base_interval / 2;
798 let max_interval = base_interval * 5;
799
800 tokio::spawn(async move {
801 loop {
802 let prev_count = connector.last_entity_count.load(Ordering::Relaxed);
803
804 match connector.sync().await {
805 Ok(count) => {
806 log::debug!("Synced {} entities from {}", count, connector.config.name);
807
808 let current = connector.current_interval_secs.load(Ordering::Relaxed);
810 let new_interval = if count as u64 == prev_count && prev_count > 0 {
811 (current + current / 2).min(max_interval)
813 } else {
814 base_interval
816 };
817 connector
818 .current_interval_secs
819 .store(new_interval, Ordering::Relaxed);
820 connector
821 .last_entity_count
822 .store(count as u64, Ordering::Relaxed);
823 }
824 Err(e) => {
825 log::error!("Sync failed for {}: {}", connector.config.name, e);
826 *connector.client.write().await = None;
828 connector.connected.store(false, Ordering::Relaxed);
829 connector
831 .current_interval_secs
832 .store(base_interval, Ordering::Relaxed);
833 }
834 }
835
836 let sleep_secs = connector
837 .current_interval_secs
838 .load(Ordering::Relaxed)
839 .max(min_interval);
840 tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await;
841 }
842 })
843 }
844}
845
846fn build_read_all_grid() -> HGrid {
848 use haystack_core::data::HCol;
849 let mut row = HDict::new();
850 row.set("filter", Kind::Str("*".to_string()));
851 HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row])
852}
853
854fn build_changes_grid(since_version: u64) -> HGrid {
856 use haystack_core::data::HCol;
857 use haystack_core::kinds::Number;
858 let mut row = HDict::new();
859 row.set(
860 "version",
861 Kind::Number(Number::unitless(since_version as f64)),
862 );
863 HGrid::from_parts(HDict::new(), vec![HCol::new("version")], vec![row])
864}
865
866impl ConnectorConfig {
867 pub fn effective_ws_url(&self) -> String {
871 if let Some(ref ws) = self.ws_url {
872 return ws.clone();
873 }
874 let ws = if self.url.starts_with("https://") {
875 self.url.replacen("https://", "wss://", 1)
876 } else {
877 self.url.replacen("http://", "ws://", 1)
878 };
879 format!("{ws}/ws")
880 }
881
882 pub fn effective_sync_interval_secs(&self) -> u64 {
884 self.sync_interval_secs.unwrap_or(60)
885 }
886}
887
888pub fn prefix_refs(entity: &mut HDict, prefix: &str) {
893 let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
894
895 for name in &tag_names {
896 let should_prefix = name == "id" || name.ends_with("Ref");
897 if !should_prefix {
898 continue;
899 }
900
901 if let Some(Kind::Ref(r)) = entity.get(name) {
902 let new_val = format!("{}{}", prefix, r.val);
903 let new_ref = HRef::new(new_val, r.dis.clone());
904 entity.set(name.as_str(), Kind::Ref(new_ref));
905 }
906 }
907}
908
909pub fn strip_prefix_refs(entity: &mut HDict, prefix: &str) {
915 let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
916
917 for name in &tag_names {
918 let should_strip = name == "id" || name.ends_with("Ref");
919 if !should_strip {
920 continue;
921 }
922
923 if let Some(Kind::Ref(r)) = entity.get(name)
924 && let Some(stripped) = r.val.strip_prefix(prefix)
925 {
926 let new_ref = HRef::new(stripped.to_string(), r.dis.clone());
927 entity.set(name.as_str(), Kind::Ref(new_ref));
928 }
929 }
930}
931
#[cfg(test)]
mod tests {
    use super::*;
    use haystack_core::kinds::HRef;

    /// Builds a minimal config pointing at a local URL, with the given id
    /// prefix and every optional setting unset.
    fn test_config(id_prefix: Option<&str>) -> ConnectorConfig {
        ConnectorConfig {
            name: "test".to_string(),
            url: "http://localhost:8080/api".to_string(),
            username: "user".to_string(),
            password: "pass".to_string(),
            id_prefix: id_prefix.map(|p| p.to_string()),
            ws_url: None,
            sync_interval_secs: None,
            client_cert: None,
            client_key: None,
            ca_cert: None,
        }
    }

    /// Like `test_config(None)` but with a specific `url` and `ws_url`.
    fn config_with_url(url: &str, ws_url: Option<&str>) -> ConnectorConfig {
        ConnectorConfig {
            url: url.to_string(),
            ws_url: ws_url.map(|w| w.to_string()),
            ..test_config(None)
        }
    }

    /// Returns the value of the `Ref` stored under `tag`; panics when the tag
    /// is missing or holds anything else.
    #[track_caller]
    fn ref_val<'a>(entity: &'a HDict, tag: &str) -> &'a str {
        match entity.get(tag) {
            Some(Kind::Ref(r)) => r.val.as_str(),
            other => panic!("expected Ref, got {other:?}"),
        }
    }

    /// Three sample entities: a site, an equip and a sensor point.
    fn make_test_entities() -> Vec<HDict> {
        let mut site = HDict::new();
        site.set("id", Kind::Ref(HRef::from_val("site-1")));
        site.set("site", Kind::Marker);
        site.set("dis", Kind::Str("Main Site".into()));

        let mut equip = HDict::new();
        equip.set("id", Kind::Ref(HRef::from_val("equip-1")));
        equip.set("equip", Kind::Marker);
        equip.set("siteRef", Kind::Ref(HRef::from_val("site-1")));

        let mut point = HDict::new();
        point.set("id", Kind::Ref(HRef::from_val("point-1")));
        point.set("point", Kind::Marker);
        point.set("sensor", Kind::Marker);
        point.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));

        vec![site, equip, point]
    }

    #[test]
    fn connector_new_empty_cache() {
        let connector = Connector::new(test_config(None));
        assert_eq!(connector.entity_count(), 0);
        assert!(connector.cached_entities().is_empty());
    }

    #[test]
    fn connector_config_deserialization() {
        let json = r#"{
            "name": "Remote Server",
            "url": "http://remote:8080/api",
            "username": "admin",
            "password": "secret",
            "id_prefix": "r1-"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.name, "Remote Server");
        assert_eq!(config.url, "http://remote:8080/api");
        assert_eq!(config.username, "admin");
        assert_eq!(config.password, "secret");
        assert_eq!(config.id_prefix, Some("r1-".to_string()));
    }

    #[test]
    fn connector_config_deserialization_without_prefix() {
        let json = r#"{
            "name": "Remote",
            "url": "http://remote:8080/api",
            "username": "admin",
            "password": "secret"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.id_prefix, None);
    }

    #[test]
    fn id_prefix_application() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("site-1")));
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
        entity.set(
            "floorRef",
            Kind::Ref(HRef::new("floor-1", Some("Floor 1".to_string()))),
        );

        prefix_refs(&mut entity, "r1-");

        assert_eq!(ref_val(&entity, "id"), "r1-site-1");
        assert_eq!(ref_val(&entity, "siteRef"), "r1-site-1");
        assert_eq!(ref_val(&entity, "equipRef"), "r1-equip-1");

        // The display name must survive prefixing.
        match entity.get("floorRef") {
            Some(Kind::Ref(r)) => {
                assert_eq!(r.val, "r1-floor-1");
                assert_eq!(r.dis, Some("Floor 1".to_string()));
            }
            other => panic!("expected Ref, got {other:?}"),
        }

        // Non-ref tags are untouched.
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
        assert_eq!(entity.get("site"), Some(&Kind::Marker));
    }

    #[test]
    fn id_prefix_skips_non_ref_values() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("point-1")));
        entity.set("customRef", Kind::Str("not-a-ref".to_string()));

        prefix_refs(&mut entity, "p-");

        assert_eq!(ref_val(&entity, "id"), "p-point-1");

        // A `*Ref` tag holding a string is left alone.
        assert_eq!(
            entity.get("customRef"),
            Some(&Kind::Str("not-a-ref".to_string()))
        );
    }

    #[test]
    fn connector_config_deserialization_full() {
        let json = r#"{
            "name": "Full Config",
            "url": "https://remote:8443/api",
            "username": "admin",
            "password": "secret",
            "id_prefix": "r1-",
            "ws_url": "wss://remote:8443/api/ws",
            "sync_interval_secs": 30,
            "client_cert": "/etc/certs/client.pem",
            "client_key": "/etc/certs/client-key.pem",
            "ca_cert": "/etc/certs/ca.pem"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.ws_url, Some("wss://remote:8443/api/ws".to_string()));
        assert_eq!(config.sync_interval_secs, Some(30));
        assert_eq!(
            config.client_cert,
            Some("/etc/certs/client.pem".to_string())
        );
        assert_eq!(
            config.client_key,
            Some("/etc/certs/client-key.pem".to_string())
        );
        assert_eq!(config.ca_cert, Some("/etc/certs/ca.pem".to_string()));
    }

    #[test]
    fn strip_prefix_refs_reverses_prefix() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("r1-equip-1")));

        strip_prefix_refs(&mut entity, "r1-");

        assert_eq!(ref_val(&entity, "id"), "site-1");
        assert_eq!(ref_val(&entity, "siteRef"), "site-1");
        assert_eq!(ref_val(&entity, "equipRef"), "equip-1");
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
    }

    #[test]
    fn strip_prefix_refs_ignores_non_matching() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("other-site-1")));

        strip_prefix_refs(&mut entity, "r1-");

        assert_eq!(ref_val(&entity, "id"), "other-site-1");
    }

    #[test]
    fn derive_ws_url_from_http() {
        let config = config_with_url("http://remote:8080/api", None);
        assert_eq!(config.effective_ws_url(), "ws://remote:8080/api/ws");
    }

    #[test]
    fn derive_ws_url_from_https() {
        let config = config_with_url("https://remote:8443/api", None);
        assert_eq!(config.effective_ws_url(), "wss://remote:8443/api/ws");
    }

    #[test]
    fn explicit_ws_url_overrides_derived() {
        let config = config_with_url("http://remote:8080/api", Some("ws://custom:9999/ws"));
        assert_eq!(config.effective_ws_url(), "ws://custom:9999/ws");
    }

    #[test]
    fn connector_tracks_entity_ids_in_ownership() {
        let connector = Connector::new(test_config(Some("t-")));
        assert!(!connector.owns("t-site-1"));

        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("t-site-1")));
        connector.update_cache(vec![entity]);

        assert!(connector.owns("t-site-1"));
        assert!(!connector.owns("other-1"));
    }

    #[test]
    fn connector_new_defaults_transport_and_connected() {
        let connector = Connector::new(test_config(None));
        assert_eq!(connector.transport_mode(), TransportMode::Http);
        assert!(!connector.is_connected());
        assert!(connector.last_sync_time().is_none());
    }

    #[test]
    fn connector_config_new_fields_default_to_none() {
        let json = r#"{
            "name": "Minimal",
            "url": "http://remote:8080/api",
            "username": "user",
            "password": "pass"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.ws_url, None);
        assert_eq!(config.sync_interval_secs, None);
        assert_eq!(config.client_cert, None);
        assert_eq!(config.client_key, None);
        assert_eq!(config.ca_cert, None);
    }

    #[test]
    fn remote_watch_add_and_remove() {
        let connector = Connector::new(test_config(Some("r-")));
        assert_eq!(connector.remote_watch_count(), 0);

        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.add_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 2);

        // Adding a duplicate does not change the count.
        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 2);

        connector.remove_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        // Removing an unknown id is a no-op.
        connector.remove_remote_watch("r-nonexistent");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.remove_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 0);
    }

    #[test]
    fn filter_cached_returns_matching_entities() {
        let connector = Connector::new(test_config(None));
        connector.update_cache(make_test_entities());

        let results = connector.filter_cached("site", 0).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].get("id"),
            Some(&Kind::Ref(HRef::from_val("site-1")))
        );

        let results = connector.filter_cached("equip", 0).unwrap();
        assert_eq!(results.len(), 1);

        let results = connector.filter_cached("point and sensor", 0).unwrap();
        assert_eq!(results.len(), 1);

        let results = connector.filter_cached("ahu", 0).unwrap();
        assert!(results.is_empty());
    }

    #[test]
    fn filter_cached_respects_limit() {
        let connector = Connector::new(test_config(None));

        let entities: Vec<HDict> = (0..10)
            .map(|i| {
                let mut p = HDict::new();
                p.set("id", Kind::Ref(HRef::from_val(format!("point-{i}"))));
                p.set("point", Kind::Marker);
                p
            })
            .collect();
        connector.update_cache(entities);

        let results = connector.filter_cached("point", 3).unwrap();
        assert_eq!(results.len(), 3);
    }

    #[test]
    fn filter_cached_or_query() {
        let connector = Connector::new(test_config(None));
        connector.update_cache(make_test_entities());

        let results = connector.filter_cached("site or equip", 0).unwrap();
        assert_eq!(results.len(), 2);
    }
}
1383}