1use std::collections::HashMap;
32use std::collections::HashSet;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
35
36use chrono::{DateTime, Utc};
37use parking_lot::RwLock;
38
39use haystack_client::HaystackClient;
40use haystack_client::transport::http::HttpTransport;
41use haystack_client::transport::ws::WsTransport;
42use haystack_core::data::{HDict, HGrid};
43use haystack_core::filter::{matches, parse_filter};
44use haystack_core::graph::bitmap::TagBitmapIndex;
45use haystack_core::graph::query_planner;
46use haystack_core::kinds::{HRef, Kind};
47
/// Transport-specific client handle owned by a [`Connector`].
///
/// Wraps a `HaystackClient` over one of the two supported transports so the
/// rest of the connector can issue ops without caring which one is active.
enum ConnectorClient {
    /// Client that talks to the remote over the HTTP transport.
    Http(HaystackClient<HttpTransport>),
    /// Client that talks to the remote over the WebSocket transport.
    Ws(HaystackClient<WsTransport>),
}
56
57impl ConnectorClient {
58 async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, String> {
61 match self {
62 ConnectorClient::Http(c) => c.call(op, req).await.map_err(|e| e.to_string()),
63 ConnectorClient::Ws(c) => c.call(op, req).await.map_err(|e| e.to_string()),
64 }
65 }
66
67 async fn his_read(&self, id: &str, range: &str) -> Result<HGrid, String> {
69 match self {
70 ConnectorClient::Http(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
71 ConnectorClient::Ws(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
72 }
73 }
74
75 async fn his_write(&self, id: &str, items: Vec<HDict>) -> Result<HGrid, String> {
77 match self {
78 ConnectorClient::Http(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
79 ConnectorClient::Ws(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
80 }
81 }
82
83 async fn point_write(&self, id: &str, level: u8, val: Kind) -> Result<HGrid, String> {
85 match self {
86 ConnectorClient::Http(c) => c
87 .point_write(id, level, val)
88 .await
89 .map_err(|e| e.to_string()),
90 ConnectorClient::Ws(c) => c
91 .point_write(id, level, val)
92 .await
93 .map_err(|e| e.to_string()),
94 }
95 }
96
97 async fn invoke_action(&self, id: &str, action: &str, args: HDict) -> Result<HGrid, String> {
99 match self {
100 ConnectorClient::Http(c) => c
101 .invoke_action(id, action, args)
102 .await
103 .map_err(|e| e.to_string()),
104 ConnectorClient::Ws(c) => c
105 .invoke_action(id, action, args)
106 .await
107 .map_err(|e| e.to_string()),
108 }
109 }
110}
111
/// Which transport a [`Connector`] is currently using.
///
/// The explicit `u8` discriminants let the value be stored in an `AtomicU8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TransportMode {
    /// Plain HTTP request/response transport (also the initial value).
    Http = 0,
    /// Persistent WebSocket transport.
    WebSocket = 1,
}
119
/// Deserializable settings describing one remote server to connect to.
///
/// NOTE(review): `Debug` is derived, so formatting this struct with `{:?}`
/// prints `password` in clear text — avoid logging whole configs.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ConnectorConfig {
    /// Human-readable connector name; used in log messages.
    pub name: String,
    /// Base HTTP(S) URL of the remote API.
    pub url: String,
    /// Username passed to the client when connecting.
    pub username: String,
    /// Password passed to the client when connecting.
    pub password: String,
    /// Optional string prepended to `id` and `*Ref` values of synced entities.
    pub id_prefix: Option<String>,
    /// Optional explicit WebSocket URL; when absent one is derived from `url`.
    pub ws_url: Option<String>,
    /// Optional base sync interval in seconds; defaults to 60 when absent.
    pub sync_interval_secs: Option<u64>,
    /// Presumably a path to a client certificate — not read anywhere in this
    /// file; TODO confirm where it is consumed.
    pub client_cert: Option<String>,
    /// Presumably a path to the client private key — not read in this file.
    pub client_key: Option<String>,
    /// Presumably a path to a CA certificate — not read in this file.
    pub ca_cert: Option<String>,
}
144
/// A connection to one remote server plus a local cache of its entities.
///
/// The synchronous state uses `parking_lot::RwLock`; only `client` uses the
/// async `tokio::sync::RwLock` because its guard is held across `.await`s.
pub struct Connector {
    /// Settings this connector was created with.
    pub config: ConnectorConfig,
    /// Entities from the most recent cache update.
    cache: RwLock<Vec<HDict>>,
    /// `id` values of every cached entity that has a `Ref` id.
    owned_ids: RwLock<HashSet<String>>,
    /// Tag bitmap index keyed by position in `cache`.
    cache_index: RwLock<TagBitmapIndex>,
    /// Maps an entity `id` value to its position in `cache`.
    cache_id_map: RwLock<HashMap<String, usize>>,
    /// Ids registered through `add_remote_watch`.
    remote_watch_ids: RwLock<HashSet<String>>,
    /// Current [`TransportMode`], stored as its `u8` discriminant.
    transport_mode: AtomicU8,
    /// Whether the last connection attempt / remote call succeeded.
    connected: AtomicBool,
    /// Time of the last successful sync, if any.
    last_sync: RwLock<Option<DateTime<Utc>>>,
    /// Last `curVer` reported by the remote `changes` op, if any.
    last_remote_version: RwLock<Option<u64>>,
    /// Active client, or `None` when disconnected.
    client: tokio::sync::RwLock<Option<ConnectorClient>>,
    /// Sync interval (seconds) currently used by the background sync task.
    current_interval_secs: AtomicU64,
    /// Entity count observed by the background sync task on its last success.
    last_entity_count: AtomicU64,
}
179
180impl Connector {
181 pub fn new(config: ConnectorConfig) -> Self {
186 let base_interval = config.effective_sync_interval_secs();
187 Self {
188 config,
189 cache: RwLock::new(Vec::new()),
190 owned_ids: RwLock::new(HashSet::new()),
191 cache_index: RwLock::new(TagBitmapIndex::new()),
192 cache_id_map: RwLock::new(HashMap::new()),
193 remote_watch_ids: RwLock::new(HashSet::new()),
194 transport_mode: AtomicU8::new(TransportMode::Http as u8),
195 connected: AtomicBool::new(false),
196 last_sync: RwLock::new(None),
197 last_remote_version: RwLock::new(None),
198 client: tokio::sync::RwLock::new(None),
199 current_interval_secs: AtomicU64::new(base_interval),
200 last_entity_count: AtomicU64::new(0),
201 }
202 }
203
204 async fn connect_persistent(&self) -> Result<(), String> {
207 let ws_url = self.config.effective_ws_url();
209 match HaystackClient::connect_ws(
210 &self.config.url,
211 &ws_url,
212 &self.config.username,
213 &self.config.password,
214 )
215 .await
216 {
217 Ok(ws_client) => {
218 *self.client.write().await = Some(ConnectorClient::Ws(ws_client));
219 self.transport_mode
220 .store(TransportMode::WebSocket as u8, Ordering::Relaxed);
221 self.connected.store(true, Ordering::Relaxed);
222 log::info!("Connected to {} via WebSocket", self.config.name);
223 return Ok(());
224 }
225 Err(e) => {
226 log::warn!(
227 "WS connection to {} failed: {}, trying HTTP",
228 self.config.name,
229 e
230 );
231 }
232 }
233
234 match HaystackClient::connect(
236 &self.config.url,
237 &self.config.username,
238 &self.config.password,
239 )
240 .await
241 {
242 Ok(http_client) => {
243 *self.client.write().await = Some(ConnectorClient::Http(http_client));
244 self.transport_mode
245 .store(TransportMode::Http as u8, Ordering::Relaxed);
246 self.connected.store(true, Ordering::Relaxed);
247 log::info!("Connected to {} via HTTP", self.config.name);
248 Ok(())
249 }
250 Err(e) => {
251 self.connected.store(false, Ordering::Relaxed);
252 Err(format!("connection failed: {e}"))
253 }
254 }
255 }
256
257 pub async fn sync(&self) -> Result<usize, String> {
265 if self.client.read().await.is_none() {
267 self.connect_persistent().await?;
268 }
269
270 let maybe_ver = *self.last_remote_version.read();
272 if let Some(last_ver) = maybe_ver {
273 match self.try_delta_sync(last_ver).await {
274 Ok(count) => return Ok(count),
275 Err(e) => {
276 log::debug!(
277 "Delta sync failed for {}, falling back to full: {e}",
278 self.config.name,
279 );
280 }
281 }
282 }
283
284 self.full_sync().await
286 }
287
288 async fn full_sync(&self) -> Result<usize, String> {
290 let grid = {
291 let client = self.client.read().await;
292 let client = client.as_ref().ok_or("not connected")?;
293 client.call("read", &build_read_all_grid()).await
294 };
295
296 let grid = grid.map_err(|e| {
297 self.connected.store(false, Ordering::Relaxed);
298 format!("read failed: {e}")
299 })?;
300
301 let mut entities: Vec<HDict> = grid.rows.into_iter().collect();
302
303 if let Some(ref prefix) = self.config.id_prefix {
305 for entity in &mut entities {
306 prefix_refs(entity, prefix);
307 }
308 }
309
310 let count = entities.len();
311 self.update_cache(entities);
312 *self.last_sync.write() = Some(Utc::now());
313 self.connected.store(true, Ordering::Relaxed);
314
315 self.probe_remote_version().await;
317
318 Ok(count)
319 }
320
321 async fn probe_remote_version(&self) {
326 let grid = {
327 let client = self.client.read().await;
328 let Some(client) = client.as_ref() else {
329 return;
330 };
331 client.call("changes", &build_changes_grid(u64::MAX)).await
332 };
333 if let Ok(grid) = grid {
334 if let Some(Kind::Number(n)) = grid.meta.get("curVer") {
335 *self.last_remote_version.write() = Some(n.val as u64);
336 }
337 }
338 }
339
340 async fn try_delta_sync(&self, since_version: u64) -> Result<usize, String> {
346 let changes_grid = build_changes_grid(since_version);
347 let grid = {
348 let client = self.client.read().await;
349 let client = client.as_ref().ok_or("not connected")?;
350 client.call("changes", &changes_grid).await
351 }
352 .map_err(|e| format!("changes op failed: {e}"))?;
353
354 if grid.is_err() {
356 return Err("remote returned error grid for changes op".to_string());
357 }
358
359 let cur_ver = grid
361 .meta
362 .get("curVer")
363 .and_then(|k| {
364 if let Kind::Number(n) = k {
365 Some(n.val as u64)
366 } else {
367 None
368 }
369 })
370 .ok_or("changes response missing curVer in meta")?;
371
372 if grid.rows.is_empty() {
374 *self.last_remote_version.write() = Some(cur_ver);
375 *self.last_sync.write() = Some(Utc::now());
376 return Ok(self.cache.read().len());
377 }
378
379 let mut cache = self.cache.read().clone();
381 let mut id_to_idx: HashMap<String, usize> = self.cache_id_map.read().clone();
382 let prefix = self.config.id_prefix.as_deref();
383
384 for row in &grid.rows {
385 let op = match row.get("op") {
386 Some(Kind::Str(s)) => s.as_str(),
387 _ => continue,
388 };
389 let ref_val = match row.get("ref") {
390 Some(Kind::Str(s)) => s.clone(),
391 _ => continue,
392 };
393
394 match op {
395 "add" | "update" => {
396 if let Some(Kind::Dict(entity_box)) = row.get("entity") {
397 let mut entity: HDict = (**entity_box).clone();
398 if let Some(pfx) = prefix {
399 prefix_refs(&mut entity, pfx);
400 }
401 let entity_id = entity.get("id").and_then(|k| {
403 if let Kind::Ref(r) = k {
404 Some(r.val.clone())
405 } else {
406 None
407 }
408 });
409
410 if let Some(ref eid) = entity_id {
411 if let Some(&idx) = id_to_idx.get(eid.as_str()) {
412 cache[idx] = entity;
414 } else {
415 let idx = cache.len();
417 id_to_idx.insert(eid.clone(), idx);
418 cache.push(entity);
419 }
420 }
421 }
422 }
423 "remove" => {
424 let prefixed_ref = match prefix {
425 Some(pfx) => format!("{pfx}{ref_val}"),
426 None => ref_val,
427 };
428 if let Some(&idx) = id_to_idx.get(prefixed_ref.as_str()) {
429 let last_idx = cache.len() - 1;
431 if idx != last_idx {
432 let last_id = cache[last_idx].get("id").and_then(|k| {
433 if let Kind::Ref(r) = k {
434 Some(r.val.clone())
435 } else {
436 None
437 }
438 });
439 cache.swap(idx, last_idx);
440 if let Some(lid) = last_id {
441 id_to_idx.insert(lid, idx);
442 }
443 }
444 cache.pop();
445 id_to_idx.remove(prefixed_ref.as_str());
446 }
447 }
448 _ => {}
449 }
450 }
451
452 let count = cache.len();
453 self.update_cache(cache);
454 *self.last_remote_version.write() = Some(cur_ver);
455 *self.last_sync.write() = Some(Utc::now());
456 self.connected.store(true, Ordering::Relaxed);
457 Ok(count)
458 }
459
460 pub fn update_cache(&self, entities: Vec<HDict>) {
466 let mut ids = HashSet::with_capacity(entities.len());
467 let mut tag_index = TagBitmapIndex::new();
468 let mut id_map = HashMap::with_capacity(entities.len());
469
470 for (eid, entity) in entities.iter().enumerate() {
471 if let Some(Kind::Ref(r)) = entity.get("id") {
472 ids.insert(r.val.clone());
473 id_map.insert(r.val.clone(), eid);
474 }
475
476 let tags: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
477 tag_index.add(eid, &tags);
478 }
479
480 *self.cache.write() = entities;
481 *self.owned_ids.write() = ids;
482 *self.cache_index.write() = tag_index;
483 *self.cache_id_map.write() = id_map;
484 }
485
486 pub fn owns(&self, id: &str) -> bool {
488 self.owned_ids.read().contains(id)
489 }
490
491 pub fn get_cached_entity(&self, id: &str) -> Option<HDict> {
493 let id_map = self.cache_id_map.read();
494 let cache = self.cache.read();
495 id_map.get(id).and_then(|&idx| cache.get(idx)).cloned()
496 }
497
498 pub fn batch_get_cached(&self, ids: &[&str]) -> (Vec<HDict>, Vec<String>) {
501 let id_map = self.cache_id_map.read();
502 let cache = self.cache.read();
503 let mut found = Vec::with_capacity(ids.len());
504 let mut missing = Vec::new();
505 for &id in ids {
506 if let Some(&idx) = id_map.get(id) {
507 if let Some(entity) = cache.get(idx) {
508 found.push(entity.clone());
509 } else {
510 missing.push(id.to_string());
511 }
512 } else {
513 missing.push(id.to_string());
514 }
515 }
516 (found, missing)
517 }
518
519 pub fn cached_entities(&self) -> Vec<HDict> {
521 self.cache.read().clone()
522 }
523
524 pub fn entity_count(&self) -> usize {
526 self.cache.read().len()
527 }
528
529 pub fn filter_cached(&self, filter_expr: &str, limit: usize) -> Result<Vec<HDict>, String> {
534 let effective_limit = if limit == 0 { usize::MAX } else { limit };
535
536 let ast = parse_filter(filter_expr).map_err(|e| format!("filter error: {e}"))?;
537
538 let cache = self.cache.read();
539 let tag_index = self.cache_index.read();
540 let max_id = cache.len();
541
542 let candidates = query_planner::bitmap_candidates(&ast, &tag_index, max_id);
543
544 let mut results = Vec::new();
545
546 if let Some(ref bitmap) = candidates {
547 for eid in TagBitmapIndex::iter_set_bits(bitmap) {
548 if results.len() >= effective_limit {
549 break;
550 }
551 if let Some(entity) = cache.get(eid) {
552 if matches(&ast, entity, None) {
553 results.push(entity.clone());
554 }
555 }
556 }
557 } else {
558 for entity in cache.iter() {
559 if results.len() >= effective_limit {
560 break;
561 }
562 if matches(&ast, entity, None) {
563 results.push(entity.clone());
564 }
565 }
566 }
567
568 Ok(results)
569 }
570
571 pub fn add_remote_watch(&self, prefixed_id: &str) {
573 self.remote_watch_ids
574 .write()
575 .insert(prefixed_id.to_string());
576 }
577
578 pub fn remove_remote_watch(&self, prefixed_id: &str) {
580 self.remote_watch_ids.write().remove(prefixed_id);
581 }
582
583 pub fn remote_watch_count(&self) -> usize {
585 self.remote_watch_ids.read().len()
586 }
587
588 pub fn transport_mode(&self) -> TransportMode {
590 match self.transport_mode.load(Ordering::Relaxed) {
591 1 => TransportMode::WebSocket,
592 _ => TransportMode::Http,
593 }
594 }
595
596 pub fn is_connected(&self) -> bool {
598 self.connected.load(Ordering::Relaxed)
599 }
600
601 pub fn last_sync_time(&self) -> Option<DateTime<Utc>> {
603 *self.last_sync.read()
604 }
605
606 fn strip_id(&self, prefixed_id: &str) -> String {
608 if let Some(ref prefix) = self.config.id_prefix {
609 prefixed_id
610 .strip_prefix(prefix.as_str())
611 .unwrap_or(prefixed_id)
612 .to_string()
613 } else {
614 prefixed_id.to_string()
615 }
616 }
617
618 async fn ensure_connected(&self) -> Result<(), String> {
621 if self.client.read().await.is_none() {
622 self.connect_persistent().await?;
623 }
624 Ok(())
625 }
626
627 async fn on_proxy_error(&self, op_name: &str, e: String) -> String {
630 *self.client.write().await = None;
631 self.connected.store(false, Ordering::Relaxed);
632 format!("{op_name} failed: {e}")
633 }
634
635 pub async fn proxy_his_read(&self, prefixed_id: &str, range: &str) -> Result<HGrid, String> {
640 self.ensure_connected().await?;
641 let id = self.strip_id(prefixed_id);
642 let guard = self.client.read().await;
643 let client = guard.as_ref().ok_or("not connected")?;
644 match client.his_read(&id, range).await {
645 Ok(grid) => Ok(grid),
646 Err(e) => {
647 drop(guard);
648 Err(self.on_proxy_error("hisRead", e).await)
649 }
650 }
651 }
652
653 pub async fn proxy_point_write(
658 &self,
659 prefixed_id: &str,
660 level: u8,
661 val: &Kind,
662 ) -> Result<HGrid, String> {
663 self.ensure_connected().await?;
664 let id = self.strip_id(prefixed_id);
665 let val = val.clone();
666 let guard = self.client.read().await;
667 let client = guard.as_ref().ok_or("not connected")?;
668 match client.point_write(&id, level, val).await {
669 Ok(grid) => Ok(grid),
670 Err(e) => {
671 drop(guard);
672 Err(self.on_proxy_error("pointWrite", e).await)
673 }
674 }
675 }
676
677 pub async fn proxy_his_write(
682 &self,
683 prefixed_id: &str,
684 items: Vec<HDict>,
685 ) -> Result<HGrid, String> {
686 self.ensure_connected().await?;
687 let id = self.strip_id(prefixed_id);
688 let guard = self.client.read().await;
689 let client = guard.as_ref().ok_or("not connected")?;
690 match client.his_write(&id, items).await {
691 Ok(grid) => Ok(grid),
692 Err(e) => {
693 drop(guard);
694 Err(self.on_proxy_error("hisWrite", e).await)
695 }
696 }
697 }
698
699 pub async fn proxy_import(&self, entity: &HDict) -> Result<HGrid, String> {
704 self.ensure_connected().await?;
705
706 let mut stripped = entity.clone();
707 if let Some(ref prefix) = self.config.id_prefix {
708 strip_prefix_refs(&mut stripped, prefix);
709 }
710
711 let col_names: Vec<String> = stripped.tag_names().map(|s| s.to_string()).collect();
712 let cols: Vec<haystack_core::data::HCol> = col_names
713 .iter()
714 .map(|n| haystack_core::data::HCol::new(n.as_str()))
715 .collect();
716 let grid = HGrid::from_parts(HDict::new(), cols, vec![stripped]);
717
718 let guard = self.client.read().await;
719 let client = guard.as_ref().ok_or("not connected")?;
720 match client.call("import", &grid).await {
721 Ok(grid) => Ok(grid),
722 Err(e) => {
723 drop(guard);
724 Err(self.on_proxy_error("import", e).await)
725 }
726 }
727 }
728
729 pub async fn proxy_invoke_action(
734 &self,
735 prefixed_id: &str,
736 action: &str,
737 args: HDict,
738 ) -> Result<HGrid, String> {
739 self.ensure_connected().await?;
740 let id = self.strip_id(prefixed_id);
741 let action = action.to_string();
742 let guard = self.client.read().await;
743 let client = guard.as_ref().ok_or("not connected")?;
744 match client.invoke_action(&id, &action, args).await {
745 Ok(grid) => Ok(grid),
746 Err(e) => {
747 drop(guard);
748 Err(self.on_proxy_error("invokeAction", e).await)
749 }
750 }
751 }
752
753 pub fn spawn_sync_task(connector: Arc<Connector>) -> tokio::task::JoinHandle<()> {
759 let base_interval = connector.config.effective_sync_interval_secs();
760 let min_interval = base_interval / 2;
761 let max_interval = base_interval * 5;
762
763 tokio::spawn(async move {
764 loop {
765 let prev_count = connector.last_entity_count.load(Ordering::Relaxed);
766
767 match connector.sync().await {
768 Ok(count) => {
769 log::debug!("Synced {} entities from {}", count, connector.config.name);
770
771 let current = connector.current_interval_secs.load(Ordering::Relaxed);
773 let new_interval = if count as u64 == prev_count && prev_count > 0 {
774 (current + current / 2).min(max_interval)
776 } else {
777 base_interval
779 };
780 connector
781 .current_interval_secs
782 .store(new_interval, Ordering::Relaxed);
783 connector
784 .last_entity_count
785 .store(count as u64, Ordering::Relaxed);
786 }
787 Err(e) => {
788 log::error!("Sync failed for {}: {}", connector.config.name, e);
789 *connector.client.write().await = None;
791 connector.connected.store(false, Ordering::Relaxed);
792 connector
794 .current_interval_secs
795 .store(base_interval, Ordering::Relaxed);
796 }
797 }
798
799 let sleep_secs = connector
800 .current_interval_secs
801 .load(Ordering::Relaxed)
802 .max(min_interval);
803 tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await;
804 }
805 })
806 }
807}
808
809fn build_read_all_grid() -> HGrid {
811 use haystack_core::data::HCol;
812 let mut row = HDict::new();
813 row.set("filter", Kind::Str("*".to_string()));
814 HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row])
815}
816
817fn build_changes_grid(since_version: u64) -> HGrid {
819 use haystack_core::data::HCol;
820 use haystack_core::kinds::Number;
821 let mut row = HDict::new();
822 row.set(
823 "version",
824 Kind::Number(Number::unitless(since_version as f64)),
825 );
826 HGrid::from_parts(HDict::new(), vec![HCol::new("version")], vec![row])
827}
828
829impl ConnectorConfig {
830 pub fn effective_ws_url(&self) -> String {
834 if let Some(ref ws) = self.ws_url {
835 return ws.clone();
836 }
837 let ws = if self.url.starts_with("https://") {
838 self.url.replacen("https://", "wss://", 1)
839 } else {
840 self.url.replacen("http://", "ws://", 1)
841 };
842 format!("{ws}/ws")
843 }
844
845 pub fn effective_sync_interval_secs(&self) -> u64 {
847 self.sync_interval_secs.unwrap_or(60)
848 }
849}
850
851pub fn prefix_refs(entity: &mut HDict, prefix: &str) {
856 let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
857
858 for name in &tag_names {
859 let should_prefix = name == "id" || name.ends_with("Ref");
860 if !should_prefix {
861 continue;
862 }
863
864 if let Some(Kind::Ref(r)) = entity.get(name) {
865 let new_val = format!("{}{}", prefix, r.val);
866 let new_ref = HRef::new(new_val, r.dis.clone());
867 entity.set(name.as_str(), Kind::Ref(new_ref));
868 }
869 }
870}
871
872pub fn strip_prefix_refs(entity: &mut HDict, prefix: &str) {
878 let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
879
880 for name in &tag_names {
881 let should_strip = name == "id" || name.ends_with("Ref");
882 if !should_strip {
883 continue;
884 }
885
886 if let Some(Kind::Ref(r)) = entity.get(name)
887 && let Some(stripped) = r.val.strip_prefix(prefix)
888 {
889 let new_ref = HRef::new(stripped.to_string(), r.dis.clone());
890 entity.set(name.as_str(), Kind::Ref(new_ref));
891 }
892 }
893}
894
#[cfg(test)]
mod tests {
    use super::*;
    use haystack_core::kinds::HRef;

    /// Baseline config: local server, no prefix, every optional field unset.
    fn base_config() -> ConnectorConfig {
        ConnectorConfig {
            name: "test".to_string(),
            url: "http://localhost:8080/api".to_string(),
            username: "user".to_string(),
            password: "pass".to_string(),
            id_prefix: None,
            ws_url: None,
            sync_interval_secs: None,
            client_cert: None,
            client_key: None,
            ca_cert: None,
        }
    }

    /// Baseline config with `id_prefix` set to `prefix`.
    fn prefixed_config(prefix: &str) -> ConnectorConfig {
        ConnectorConfig {
            id_prefix: Some(prefix.to_string()),
            ..base_config()
        }
    }

    /// Config used by the WebSocket URL derivation tests.
    fn remote_config(url: &str, ws_url: Option<&str>) -> ConnectorConfig {
        ConnectorConfig {
            url: url.to_string(),
            username: "u".to_string(),
            password: "p".to_string(),
            ws_url: ws_url.map(str::to_string),
            ..base_config()
        }
    }

    /// Asserts that `tag` on `entity` holds a `Ref` whose value is `expected`.
    #[track_caller]
    fn assert_ref_val(entity: &HDict, tag: &str, expected: &str) {
        match entity.get(tag) {
            Some(Kind::Ref(r)) => assert_eq!(r.val, expected),
            other => panic!("expected Ref, got {other:?}"),
        }
    }

    /// One site, one equip and one sensor point, linked by refs.
    fn make_test_entities() -> Vec<HDict> {
        let mut site = HDict::new();
        site.set("id", Kind::Ref(HRef::from_val("site-1")));
        site.set("site", Kind::Marker);
        site.set("dis", Kind::Str("Main Site".into()));

        let mut equip = HDict::new();
        equip.set("id", Kind::Ref(HRef::from_val("equip-1")));
        equip.set("equip", Kind::Marker);
        equip.set("siteRef", Kind::Ref(HRef::from_val("site-1")));

        let mut point = HDict::new();
        point.set("id", Kind::Ref(HRef::from_val("point-1")));
        point.set("point", Kind::Marker);
        point.set("sensor", Kind::Marker);
        point.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));

        vec![site, equip, point]
    }

    #[test]
    fn connector_new_empty_cache() {
        let connector = Connector::new(base_config());
        assert_eq!(connector.entity_count(), 0);
        assert!(connector.cached_entities().is_empty());
    }

    #[test]
    fn connector_config_deserialization() {
        let json = r#"{
            "name": "Remote Server",
            "url": "http://remote:8080/api",
            "username": "admin",
            "password": "secret",
            "id_prefix": "r1-"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.name, "Remote Server");
        assert_eq!(config.url, "http://remote:8080/api");
        assert_eq!(config.username, "admin");
        assert_eq!(config.password, "secret");
        assert_eq!(config.id_prefix, Some("r1-".to_string()));
    }

    #[test]
    fn connector_config_deserialization_without_prefix() {
        let json = r#"{
            "name": "Remote",
            "url": "http://remote:8080/api",
            "username": "admin",
            "password": "secret"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.id_prefix, None);
    }

    #[test]
    fn id_prefix_application() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("site-1")));
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
        entity.set(
            "floorRef",
            Kind::Ref(HRef::new("floor-1", Some("Floor 1".to_string()))),
        );

        prefix_refs(&mut entity, "r1-");

        assert_ref_val(&entity, "id", "r1-site-1");
        assert_ref_val(&entity, "siteRef", "r1-site-1");
        assert_ref_val(&entity, "equipRef", "r1-equip-1");

        // The display string must survive the rewrite.
        match entity.get("floorRef") {
            Some(Kind::Ref(r)) => {
                assert_eq!(r.val, "r1-floor-1");
                assert_eq!(r.dis, Some("Floor 1".to_string()));
            }
            other => panic!("expected Ref, got {other:?}"),
        }

        // Non-ref tags are untouched.
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
        assert_eq!(entity.get("site"), Some(&Kind::Marker));
    }

    #[test]
    fn id_prefix_skips_non_ref_values() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("point-1")));
        entity.set("customRef", Kind::Str("not-a-ref".to_string()));

        prefix_refs(&mut entity, "p-");

        assert_ref_val(&entity, "id", "p-point-1");
        // A `*Ref` tag holding a string is not a ref and must not change.
        assert_eq!(
            entity.get("customRef"),
            Some(&Kind::Str("not-a-ref".to_string()))
        );
    }

    #[test]
    fn connector_config_deserialization_full() {
        let json = r#"{
            "name": "Full Config",
            "url": "https://remote:8443/api",
            "username": "admin",
            "password": "secret",
            "id_prefix": "r1-",
            "ws_url": "wss://remote:8443/api/ws",
            "sync_interval_secs": 30,
            "client_cert": "/etc/certs/client.pem",
            "client_key": "/etc/certs/client-key.pem",
            "ca_cert": "/etc/certs/ca.pem"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.ws_url, Some("wss://remote:8443/api/ws".to_string()));
        assert_eq!(config.sync_interval_secs, Some(30));
        assert_eq!(
            config.client_cert,
            Some("/etc/certs/client.pem".to_string())
        );
        assert_eq!(
            config.client_key,
            Some("/etc/certs/client-key.pem".to_string())
        );
        assert_eq!(config.ca_cert, Some("/etc/certs/ca.pem".to_string()));
    }

    #[test]
    fn strip_prefix_refs_reverses_prefix() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("r1-equip-1")));

        strip_prefix_refs(&mut entity, "r1-");

        assert_ref_val(&entity, "id", "site-1");
        assert_ref_val(&entity, "siteRef", "site-1");
        assert_ref_val(&entity, "equipRef", "equip-1");
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
    }

    #[test]
    fn strip_prefix_refs_ignores_non_matching() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("other-site-1")));

        strip_prefix_refs(&mut entity, "r1-");

        assert_ref_val(&entity, "id", "other-site-1");
    }

    #[test]
    fn derive_ws_url_from_http() {
        let config = remote_config("http://remote:8080/api", None);
        assert_eq!(config.effective_ws_url(), "ws://remote:8080/api/ws");
    }

    #[test]
    fn derive_ws_url_from_https() {
        let config = remote_config("https://remote:8443/api", None);
        assert_eq!(config.effective_ws_url(), "wss://remote:8443/api/ws");
    }

    #[test]
    fn explicit_ws_url_overrides_derived() {
        let config = remote_config("http://remote:8080/api", Some("ws://custom:9999/ws"));
        assert_eq!(config.effective_ws_url(), "ws://custom:9999/ws");
    }

    #[test]
    fn connector_tracks_entity_ids_in_ownership() {
        let connector = Connector::new(prefixed_config("t-"));
        assert!(!connector.owns("t-site-1"));

        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("t-site-1")));
        connector.update_cache(vec![entity]);

        assert!(connector.owns("t-site-1"));
        assert!(!connector.owns("other-1"));
    }

    #[test]
    fn connector_new_defaults_transport_and_connected() {
        let connector = Connector::new(base_config());
        assert_eq!(connector.transport_mode(), TransportMode::Http);
        assert!(!connector.is_connected());
        assert!(connector.last_sync_time().is_none());
    }

    #[test]
    fn connector_config_new_fields_default_to_none() {
        let json = r#"{
            "name": "Minimal",
            "url": "http://remote:8080/api",
            "username": "user",
            "password": "pass"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.ws_url, None);
        assert_eq!(config.sync_interval_secs, None);
        assert_eq!(config.client_cert, None);
        assert_eq!(config.client_key, None);
        assert_eq!(config.ca_cert, None);
    }

    #[test]
    fn remote_watch_add_and_remove() {
        let connector = Connector::new(prefixed_config("r-"));
        assert_eq!(connector.remote_watch_count(), 0);

        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.add_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 2);

        // Adding a duplicate does not grow the set.
        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 2);

        connector.remove_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        // Removing an unknown id is a no-op.
        connector.remove_remote_watch("r-nonexistent");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.remove_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 0);
    }

    #[test]
    fn filter_cached_returns_matching_entities() {
        let connector = Connector::new(base_config());
        connector.update_cache(make_test_entities());

        let results = connector.filter_cached("site", 0).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].get("id"),
            Some(&Kind::Ref(HRef::from_val("site-1")))
        );

        let results = connector.filter_cached("equip", 0).unwrap();
        assert_eq!(results.len(), 1);

        let results = connector.filter_cached("point and sensor", 0).unwrap();
        assert_eq!(results.len(), 1);

        let results = connector.filter_cached("ahu", 0).unwrap();
        assert!(results.is_empty());
    }

    #[test]
    fn filter_cached_respects_limit() {
        let connector = Connector::new(base_config());

        let mut entities = Vec::new();
        for i in 0..10 {
            let mut p = HDict::new();
            p.set("id", Kind::Ref(HRef::from_val(&format!("point-{i}"))));
            p.set("point", Kind::Marker);
            entities.push(p);
        }
        connector.update_cache(entities);

        let results = connector.filter_cached("point", 3).unwrap();
        assert_eq!(results.len(), 3);
    }

    #[test]
    fn filter_cached_or_query() {
        let connector = Connector::new(base_config());
        connector.update_cache(make_test_entities());

        let results = connector.filter_cached("site or equip", 0).unwrap();
        assert_eq!(results.len(), 2);
    }
}