1use std::collections::HashMap;
32use std::collections::HashSet;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
35
36use chrono::{DateTime, Utc};
37use parking_lot::RwLock;
38
39use haystack_client::HaystackClient;
40use haystack_client::transport::http::HttpTransport;
41use haystack_client::transport::ws::WsTransport;
42use haystack_core::data::{HDict, HGrid};
43use haystack_core::filter::{FilterNode, matches, parse_filter};
44use haystack_core::graph::bitmap::TagBitmapIndex;
45use haystack_core::graph::query_planner;
46use haystack_core::kinds::{HRef, Kind};
47
/// A Haystack client bound to one of the two transports this file uses.
///
/// Holding both concrete `HaystackClient` instantiations in a single enum lets
/// `Connector` store either one in the same field and dispatch with `match`.
enum ConnectorClient {
    /// Client talking over the HTTP transport.
    Http(HaystackClient<HttpTransport>),
    /// Client talking over the WebSocket transport.
    Ws(HaystackClient<WsTransport>),
}
56
57impl ConnectorClient {
58 async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, String> {
61 match self {
62 ConnectorClient::Http(c) => c.call(op, req).await.map_err(|e| e.to_string()),
63 ConnectorClient::Ws(c) => c.call(op, req).await.map_err(|e| e.to_string()),
64 }
65 }
66
67 async fn his_read(&self, id: &str, range: &str) -> Result<HGrid, String> {
69 match self {
70 ConnectorClient::Http(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
71 ConnectorClient::Ws(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
72 }
73 }
74
75 async fn his_write(&self, id: &str, items: Vec<HDict>) -> Result<HGrid, String> {
77 match self {
78 ConnectorClient::Http(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
79 ConnectorClient::Ws(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
80 }
81 }
82
83 async fn point_write(&self, id: &str, level: u8, val: Kind) -> Result<HGrid, String> {
85 match self {
86 ConnectorClient::Http(c) => c
87 .point_write(id, level, val)
88 .await
89 .map_err(|e| e.to_string()),
90 ConnectorClient::Ws(c) => c
91 .point_write(id, level, val)
92 .await
93 .map_err(|e| e.to_string()),
94 }
95 }
96
97 async fn invoke_action(&self, id: &str, action: &str, args: HDict) -> Result<HGrid, String> {
99 match self {
100 ConnectorClient::Http(c) => c
101 .invoke_action(id, action, args)
102 .await
103 .map_err(|e| e.to_string()),
104 ConnectorClient::Ws(c) => c
105 .invoke_action(id, action, args)
106 .await
107 .map_err(|e| e.to_string()),
108 }
109 }
110}
111
/// Which transport a connector is currently using.
///
/// The explicit `u8` discriminants allow the mode to be stored in an
/// `AtomicU8` and decoded again by `Connector::transport_mode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TransportMode {
    /// HTTP transport (discriminant 0).
    Http = 0,
    /// WebSocket transport (discriminant 1).
    WebSocket = 1,
}
119
/// Deserializable settings for one remote connector.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ConnectorConfig {
    /// Name of the connector; used in log messages and reported in `ConnectorState`.
    pub name: String,
    /// Base URL passed to the client when connecting.
    pub url: String,
    /// User name passed to the client when connecting.
    pub username: String,
    /// Password passed to the client when connecting.
    pub password: String,
    /// Optional prefix prepended to `id` and `*Ref` values of synced entities
    /// and stripped again before proxied calls.
    pub id_prefix: Option<String>,
    /// Explicit WebSocket URL; when absent one is derived from `url`.
    pub ws_url: Option<String>,
    /// Sync interval in seconds; `effective_sync_interval_secs` falls back to 60.
    pub sync_interval_secs: Option<u64>,
    /// NOTE(review): not read anywhere in the visible code — presumably a
    /// path to a TLS client certificate; confirm against the consumer.
    pub client_cert: Option<String>,
    /// NOTE(review): not read anywhere in the visible code — presumably a
    /// path to the TLS client key; confirm against the consumer.
    pub client_key: Option<String>,
    /// NOTE(review): not read anywhere in the visible code — presumably a
    /// path to a CA certificate; confirm against the consumer.
    pub ca_cert: Option<String>,
    /// Optional domain string; its length and character set are checked by `validate`.
    #[serde(default)]
    pub domain: Option<String>,
}
147
/// Largest `domain.len()` (a byte count) that `ConnectorConfig::validate` accepts.
const MAX_DOMAIN_LEN: usize = 256;
149
150impl ConnectorConfig {
151 pub fn validate(&self) -> Result<(), String> {
153 if let Some(ref domain) = self.domain {
154 if domain.len() > MAX_DOMAIN_LEN {
155 return Err(format!(
156 "domain too long: {} chars (max {MAX_DOMAIN_LEN})",
157 domain.len()
158 ));
159 }
160 if !domain
161 .chars()
162 .all(|c| c.is_alphanumeric() || "-.:_".contains(c))
163 {
164 return Err(format!("domain contains invalid characters: {domain}"));
165 }
166 }
167 Ok(())
168 }
169}
170
/// The cached entities together with the lookup structures derived from them.
///
/// The position of an entity in `entities` is the entity id used as the value
/// in `id_map` and as the key passed to `tag_index`.
struct CacheState {
    /// Cached entities, shared via `Arc` so readers can hold them without copying.
    entities: Vec<Arc<HDict>>,
    /// `id` ref values of all cached entities that carry a `Ref` id.
    owned_ids: HashSet<String>,
    /// Bitmap index from tag name to entity positions.
    tag_index: TagBitmapIndex,
    /// Maps an entity's `id` ref value to its position in `entities`.
    id_map: HashMap<String, usize>,
}
182
183impl CacheState {
184 fn empty() -> Self {
185 Self {
186 entities: Vec::new(),
187 owned_ids: HashSet::new(),
188 tag_index: TagBitmapIndex::new(),
189 id_map: HashMap::new(),
190 }
191 }
192
193 const MAX_ENTITY_TAGS: usize = 1_000;
195 const MAX_ENTITY_ID_LEN: usize = 256;
197
198 fn build(entities: Vec<HDict>) -> Self {
202 let mut owned_ids = HashSet::with_capacity(entities.len());
203 let mut tag_index = TagBitmapIndex::new();
204 let mut id_map = HashMap::with_capacity(entities.len());
205 let mut valid_entities = Vec::with_capacity(entities.len());
206
207 for entity in entities {
208 if let Some(Kind::Ref(r)) = entity.get("id") {
210 if r.val.len() > Self::MAX_ENTITY_ID_LEN {
211 log::warn!("skipping entity with oversized ID ({} bytes)", r.val.len());
212 continue;
213 }
214 }
215 if entity.len() > Self::MAX_ENTITY_TAGS {
216 log::warn!("skipping entity with too many tags ({})", entity.len());
217 continue;
218 }
219
220 let eid = valid_entities.len();
221 if let Some(Kind::Ref(r)) = entity.get("id") {
222 owned_ids.insert(r.val.clone());
223 id_map.insert(r.val.clone(), eid);
224 }
225 let tags: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
226 tag_index.add(eid, &tags);
227 valid_entities.push(Arc::new(entity));
228 }
229
230 Self {
231 entities: valid_entities,
232 owned_ids,
233 tag_index,
234 id_map,
235 }
236 }
237
238 fn rebuild_index(&mut self) {
240 self.owned_ids.clear();
241 self.tag_index = TagBitmapIndex::new();
242 self.id_map.clear();
243
244 for (eid, entity) in self.entities.iter().enumerate() {
245 if let Some(Kind::Ref(r)) = entity.get("id") {
246 self.owned_ids.insert(r.val.clone());
247 self.id_map.insert(r.val.clone(), eid);
248 }
249 let tags: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
250 self.tag_index.add(eid, &tags);
251 }
252 }
253}
254
/// Point-in-time snapshot of a connector, as produced by `Connector::state`.
#[derive(Debug, Clone)]
pub struct ConnectorState {
    /// Connector name copied from its configuration.
    pub name: String,
    /// Value of the connector's `connected` flag when the snapshot was taken.
    pub connected: bool,
    /// Value of the cache version counter when the snapshot was taken.
    pub cache_version: u64,
    /// Number of entities in the cache.
    pub entity_count: usize,
    /// `timestamp()` of the last recorded sync, or `None` if none was recorded.
    pub last_sync_ts: Option<i64>,
    /// Time elapsed since the last recorded sync, in seconds with millisecond
    /// resolution; `None` if no sync was recorded.
    pub staleness_secs: Option<f64>,
}
267
/// A connection to one remote server plus a local cache of its entities.
pub struct Connector {
    /// Settings this connector was created with.
    pub config: ConnectorConfig,
    /// Cached entities and their indexes; replaced wholesale by `update_cache`
    /// and edited in place by delta sync.
    cache_state: RwLock<CacheState>,
    /// Ids registered through `add_remote_watch`.
    remote_watch_ids: RwLock<HashSet<String>>,
    /// Current `TransportMode`, stored as its `u8` discriminant.
    transport_mode: AtomicU8,
    /// Set to `true` after a successful connect/sync and to `false` on failures.
    connected: AtomicBool,
    /// Time of the most recent successful sync, if any.
    last_sync: RwLock<Option<DateTime<Utc>>>,
    /// Last `curVer` value read from a `changes` response; `None` until one is seen.
    last_remote_version: RwLock<Option<u64>>,
    /// The live client, or `None` when not connected. An async lock because it
    /// is held across `.await` points while a request is in flight.
    client: tokio::sync::RwLock<Option<ConnectorClient>>,
    /// Sync interval, in seconds, currently used by the background sync task.
    current_interval_secs: AtomicU64,
    /// Entity count returned by the previous successful background sync.
    last_entity_count: AtomicU64,
    /// Counter incremented each time the cache contents are replaced or modified.
    cache_version: AtomicU64,
}
294
295impl Connector {
296 pub fn new(config: ConnectorConfig) -> Self {
301 let base_interval = config.effective_sync_interval_secs();
302 Self {
303 config,
304 cache_state: RwLock::new(CacheState::empty()),
305 remote_watch_ids: RwLock::new(HashSet::new()),
306 transport_mode: AtomicU8::new(TransportMode::Http as u8),
307 connected: AtomicBool::new(false),
308 last_sync: RwLock::new(None),
309 last_remote_version: RwLock::new(None),
310 client: tokio::sync::RwLock::new(None),
311 current_interval_secs: AtomicU64::new(base_interval),
312 last_entity_count: AtomicU64::new(0),
313 cache_version: AtomicU64::new(0),
314 }
315 }
316
317 async fn connect_persistent(&self) -> Result<(), String> {
320 let ws_url = self.config.effective_ws_url();
322 match HaystackClient::connect_ws(
323 &self.config.url,
324 &ws_url,
325 &self.config.username,
326 &self.config.password,
327 )
328 .await
329 {
330 Ok(ws_client) => {
331 *self.client.write().await = Some(ConnectorClient::Ws(ws_client));
332 self.transport_mode
333 .store(TransportMode::WebSocket as u8, Ordering::Relaxed);
334 self.connected.store(true, Ordering::Relaxed);
335 log::info!("Connected to {} via WebSocket", self.config.name);
336 return Ok(());
337 }
338 Err(e) => {
339 log::warn!(
340 "WS connection to {} failed: {}, trying HTTP",
341 self.config.name,
342 e
343 );
344 }
345 }
346
347 match HaystackClient::connect(
349 &self.config.url,
350 &self.config.username,
351 &self.config.password,
352 )
353 .await
354 {
355 Ok(http_client) => {
356 *self.client.write().await = Some(ConnectorClient::Http(http_client));
357 self.transport_mode
358 .store(TransportMode::Http as u8, Ordering::Relaxed);
359 self.connected.store(true, Ordering::Relaxed);
360 log::info!("Connected to {} via HTTP", self.config.name);
361 Ok(())
362 }
363 Err(e) => {
364 self.connected.store(false, Ordering::Relaxed);
365 Err(format!("connection failed: {e}"))
366 }
367 }
368 }
369
370 pub async fn sync(&self) -> Result<usize, String> {
378 if self.client.read().await.is_none() {
380 self.connect_persistent().await?;
381 }
382
383 let maybe_ver = *self.last_remote_version.read();
385 if let Some(last_ver) = maybe_ver {
386 match self.try_delta_sync(last_ver).await {
387 Ok(count) => return Ok(count),
388 Err(e) => {
389 log::debug!(
390 "Delta sync failed for {}, falling back to full: {e}",
391 self.config.name,
392 );
393 }
394 }
395 }
396
397 self.full_sync().await
399 }
400
401 async fn full_sync(&self) -> Result<usize, String> {
403 let grid = {
404 let client = self.client.read().await;
405 let client = client.as_ref().ok_or("not connected")?;
406 client.call("read", &build_read_all_grid()).await
407 };
408
409 let grid = grid.map_err(|e| {
410 self.connected.store(false, Ordering::Relaxed);
411 format!("read failed: {e}")
412 })?;
413
414 let mut entities: Vec<HDict> = grid.rows.into_iter().collect();
415
416 if let Some(ref prefix) = self.config.id_prefix {
418 for entity in &mut entities {
419 prefix_refs(entity, prefix);
420 }
421 }
422
423 let count = entities.len();
424 self.update_cache(entities);
425 *self.last_sync.write() = Some(Utc::now());
426 self.connected.store(true, Ordering::Relaxed);
427
428 self.probe_remote_version().await;
430
431 Ok(count)
432 }
433
434 async fn probe_remote_version(&self) {
439 let grid = {
440 let client = self.client.read().await;
441 let Some(client) = client.as_ref() else {
442 return;
443 };
444 client.call("changes", &build_changes_grid(u64::MAX)).await
445 };
446 if let Ok(grid) = grid
447 && let Some(Kind::Number(n)) = grid.meta.get("curVer")
448 {
449 *self.last_remote_version.write() = Some(n.val as u64);
450 }
451 }
452
453 async fn try_delta_sync(&self, since_version: u64) -> Result<usize, String> {
459 let changes_grid = build_changes_grid(since_version);
460 let grid = {
461 let client = self.client.read().await;
462 let client = client.as_ref().ok_or("not connected")?;
463 client.call("changes", &changes_grid).await
464 }
465 .map_err(|e| format!("changes op failed: {e}"))?;
466
467 if grid.is_err() {
469 return Err("remote returned error grid for changes op".to_string());
470 }
471
472 let cur_ver = grid
474 .meta
475 .get("curVer")
476 .and_then(|k| {
477 if let Kind::Number(n) = k {
478 Some(n.val as u64)
479 } else {
480 None
481 }
482 })
483 .ok_or("changes response missing curVer in meta")?;
484
485 if grid.rows.is_empty() {
487 *self.last_remote_version.write() = Some(cur_ver);
488 *self.last_sync.write() = Some(Utc::now());
489 return Ok(self.cache_state.read().entities.len());
490 }
491
492 let mut state = self.cache_state.write();
494 let prefix = self.config.id_prefix.as_deref();
495
496 for row in &grid.rows {
497 let op = match row.get("op") {
498 Some(Kind::Str(s)) => s.as_str(),
499 _ => continue,
500 };
501 let ref_val = match row.get("ref") {
502 Some(Kind::Str(s)) => s.clone(),
503 _ => continue,
504 };
505
506 const MAX_ENTITY_TAGS: usize = 1_000;
507 const MAX_ENTITY_ID_LEN: usize = 256;
508
509 if ref_val.len() > MAX_ENTITY_ID_LEN {
510 log::warn!("skipping entity with oversized id: {} bytes", ref_val.len());
511 continue;
512 }
513
514 match op {
515 "add" | "update" => {
516 if let Some(Kind::Dict(entity_box)) = row.get("entity") {
517 let mut entity: HDict = (**entity_box).clone();
518 if entity.len() > MAX_ENTITY_TAGS {
519 log::warn!("skipping oversized entity with {} tags", entity.len());
520 continue;
521 }
522 if let Some(pfx) = prefix {
523 prefix_refs(&mut entity, pfx);
524 }
525 let entity_id = entity.get("id").and_then(|k| {
526 if let Kind::Ref(r) = k {
527 Some(r.val.clone())
528 } else {
529 None
530 }
531 });
532
533 if let Some(ref eid) = entity_id {
534 if let Some(&idx) = state.id_map.get(eid.as_str()) {
535 state.entities[idx] = Arc::new(entity);
537 } else {
538 let idx = state.entities.len();
540 state.id_map.insert(eid.clone(), idx);
541 state.entities.push(Arc::new(entity));
542 }
543 }
544 }
545 }
546 "remove" => {
547 let prefixed_ref = match prefix {
548 Some(pfx) => format!("{pfx}{ref_val}"),
549 None => ref_val,
550 };
551 if let Some(&idx) = state.id_map.get(prefixed_ref.as_str()) {
552 let last_idx = state.entities.len() - 1;
554 if idx != last_idx {
555 let last_id = state.entities[last_idx].get("id").and_then(|k| {
556 if let Kind::Ref(r) = k {
557 Some(r.val.clone())
558 } else {
559 None
560 }
561 });
562 state.entities.swap(idx, last_idx);
563 if let Some(lid) = last_id {
564 state.id_map.insert(lid, idx);
565 }
566 }
567 state.entities.pop();
568 state.id_map.remove(prefixed_ref.as_str());
569 }
570 }
571 _ => {}
572 }
573 }
574
575 state.rebuild_index();
577
578 let count = state.entities.len();
579 drop(state);
580
581 self.cache_version.fetch_add(1, Ordering::Relaxed);
582 *self.last_remote_version.write() = Some(cur_ver);
583 *self.last_sync.write() = Some(Utc::now());
584 self.connected.store(true, Ordering::Relaxed);
585 Ok(count)
586 }
587
588 pub fn update_cache(&self, entities: Vec<HDict>) {
594 *self.cache_state.write() = CacheState::build(entities);
595 self.cache_version.fetch_add(1, Ordering::Relaxed);
596 }
597
598 pub fn owns(&self, id: &str) -> bool {
600 self.cache_state.read().owned_ids.contains(id)
601 }
602
603 pub fn get_cached_entity(&self, id: &str) -> Option<Arc<HDict>> {
605 let state = self.cache_state.read();
606 state
607 .id_map
608 .get(id)
609 .and_then(|&idx| state.entities.get(idx))
610 .map(Arc::clone)
611 }
612
613 pub fn batch_get_cached(&self, ids: &[&str]) -> (Vec<Arc<HDict>>, Vec<String>) {
616 let state = self.cache_state.read();
617 let mut found = Vec::with_capacity(ids.len());
618 let mut missing = Vec::new();
619 for &id in ids {
620 if let Some(&idx) = state.id_map.get(id) {
621 if let Some(entity) = state.entities.get(idx) {
622 found.push(Arc::clone(entity));
623 } else {
624 missing.push(id.to_string());
625 }
626 } else {
627 missing.push(id.to_string());
628 }
629 }
630 (found, missing)
631 }
632
633 pub fn cached_entities(&self) -> Vec<Arc<HDict>> {
635 self.cache_state.read().entities.clone()
636 }
637
638 pub fn entity_count(&self) -> usize {
640 self.cache_state.read().entities.len()
641 }
642
643 pub fn filter_cached(&self, filter_expr: &str, limit: usize) -> Result<Vec<HDict>, String> {
648 let ast = parse_filter(filter_expr).map_err(|e| format!("filter error: {e}"))?;
649 Ok(self
650 .filter_cached_with_ast(&ast, limit)
651 .into_iter()
652 .map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|a| (*a).clone()))
653 .collect())
654 }
655
656 pub fn filter_cached_with_ast(&self, ast: &FilterNode, limit: usize) -> Vec<Arc<HDict>> {
662 let effective_limit = if limit == 0 { usize::MAX } else { limit };
663
664 let state = self.cache_state.read();
665 let max_id = state.entities.len();
666
667 let candidates = query_planner::bitmap_candidates(ast, &state.tag_index, max_id);
668
669 let mut results = Vec::with_capacity(effective_limit.min(max_id));
670
671 if let Some(ref bitmap) = candidates {
672 for eid in TagBitmapIndex::iter_set_bits(bitmap) {
673 if results.len() >= effective_limit {
674 break;
675 }
676 if let Some(entity) = state.entities.get(eid)
677 && matches(ast, entity, None)
678 {
679 results.push(Arc::clone(entity));
680 }
681 }
682 } else {
683 for entity in state.entities.iter() {
684 if results.len() >= effective_limit {
685 break;
686 }
687 if matches(ast, entity, None) {
688 results.push(Arc::clone(entity));
689 }
690 }
691 }
692
693 results
694 }
695
696 pub fn add_remote_watch(&self, prefixed_id: &str) {
698 self.remote_watch_ids
699 .write()
700 .insert(prefixed_id.to_string());
701 }
702
703 pub fn remove_remote_watch(&self, prefixed_id: &str) {
705 self.remote_watch_ids.write().remove(prefixed_id);
706 }
707
708 pub fn remote_watch_count(&self) -> usize {
710 self.remote_watch_ids.read().len()
711 }
712
713 pub fn transport_mode(&self) -> TransportMode {
715 match self.transport_mode.load(Ordering::Relaxed) {
716 1 => TransportMode::WebSocket,
717 _ => TransportMode::Http,
718 }
719 }
720
721 pub fn is_connected(&self) -> bool {
723 self.connected.load(Ordering::Relaxed)
724 }
725
726 pub fn last_sync_time(&self) -> Option<DateTime<Utc>> {
728 *self.last_sync.read()
729 }
730
731 pub fn cache_version(&self) -> u64 {
733 self.cache_version.load(Ordering::Relaxed)
734 }
735
736 pub fn state(&self) -> ConnectorState {
738 let now = Utc::now();
739 let last_sync = self.last_sync_time();
740 ConnectorState {
741 name: self.config.name.clone(),
742 connected: self.is_connected(),
743 cache_version: self.cache_version(),
744 entity_count: self.entity_count(),
745 last_sync_ts: last_sync.map(|ts| ts.timestamp()),
746 staleness_secs: last_sync.map(|ts| (now - ts).num_milliseconds() as f64 / 1000.0),
747 }
748 }
749
750 fn strip_id(&self, prefixed_id: &str) -> String {
752 if let Some(ref prefix) = self.config.id_prefix {
753 prefixed_id
754 .strip_prefix(prefix.as_str())
755 .unwrap_or(prefixed_id)
756 .to_string()
757 } else {
758 prefixed_id.to_string()
759 }
760 }
761
762 async fn ensure_connected(&self) -> Result<(), String> {
765 if self.client.read().await.is_none() {
766 self.connect_persistent().await?;
767 }
768 Ok(())
769 }
770
771 async fn on_proxy_error(&self, op_name: &str, e: String) -> String {
774 *self.client.write().await = None;
775 self.connected.store(false, Ordering::Relaxed);
776 format!("{op_name} failed: {e}")
777 }
778
779 pub async fn proxy_his_read(&self, prefixed_id: &str, range: &str) -> Result<HGrid, String> {
784 self.ensure_connected().await?;
785 let id = self.strip_id(prefixed_id);
786 let guard = self.client.read().await;
787 let client = guard.as_ref().ok_or("not connected")?;
788 match client.his_read(&id, range).await {
789 Ok(grid) => Ok(grid),
790 Err(e) => {
791 drop(guard);
792 Err(self.on_proxy_error("hisRead", e).await)
793 }
794 }
795 }
796
797 pub async fn proxy_point_write(
802 &self,
803 prefixed_id: &str,
804 level: u8,
805 val: &Kind,
806 ) -> Result<HGrid, String> {
807 self.ensure_connected().await?;
808 let id = self.strip_id(prefixed_id);
809 let val = val.clone();
810 let guard = self.client.read().await;
811 let client = guard.as_ref().ok_or("not connected")?;
812 match client.point_write(&id, level, val).await {
813 Ok(grid) => Ok(grid),
814 Err(e) => {
815 drop(guard);
816 Err(self.on_proxy_error("pointWrite", e).await)
817 }
818 }
819 }
820
821 pub async fn proxy_his_write(
826 &self,
827 prefixed_id: &str,
828 items: Vec<HDict>,
829 ) -> Result<HGrid, String> {
830 self.ensure_connected().await?;
831 let id = self.strip_id(prefixed_id);
832 let guard = self.client.read().await;
833 let client = guard.as_ref().ok_or("not connected")?;
834 match client.his_write(&id, items).await {
835 Ok(grid) => Ok(grid),
836 Err(e) => {
837 drop(guard);
838 Err(self.on_proxy_error("hisWrite", e).await)
839 }
840 }
841 }
842
843 pub async fn proxy_import(&self, entity: &HDict) -> Result<HGrid, String> {
848 self.ensure_connected().await?;
849
850 let mut stripped = entity.clone();
851 if let Some(ref prefix) = self.config.id_prefix {
852 strip_prefix_refs(&mut stripped, prefix);
853 }
854
855 let col_names: Vec<String> = stripped.tag_names().map(|s| s.to_string()).collect();
856 let cols: Vec<haystack_core::data::HCol> = col_names
857 .iter()
858 .map(|n| haystack_core::data::HCol::new(n.as_str()))
859 .collect();
860 let grid = HGrid::from_parts(HDict::new(), cols, vec![stripped]);
861
862 let guard = self.client.read().await;
863 let client = guard.as_ref().ok_or("not connected")?;
864 match client.call("import", &grid).await {
865 Ok(grid) => Ok(grid),
866 Err(e) => {
867 drop(guard);
868 Err(self.on_proxy_error("import", e).await)
869 }
870 }
871 }
872
873 pub async fn proxy_invoke_action(
878 &self,
879 prefixed_id: &str,
880 action: &str,
881 args: HDict,
882 ) -> Result<HGrid, String> {
883 self.ensure_connected().await?;
884 let id = self.strip_id(prefixed_id);
885 let action = action.to_string();
886 let guard = self.client.read().await;
887 let client = guard.as_ref().ok_or("not connected")?;
888 match client.invoke_action(&id, &action, args).await {
889 Ok(grid) => Ok(grid),
890 Err(e) => {
891 drop(guard);
892 Err(self.on_proxy_error("invokeAction", e).await)
893 }
894 }
895 }
896
897 pub fn spawn_sync_task(connector: Arc<Connector>) -> tokio::task::JoinHandle<()> {
903 let base_interval = connector.config.effective_sync_interval_secs();
904 let min_interval = base_interval / 2;
905 let max_interval = base_interval * 5;
906
907 tokio::spawn(async move {
908 loop {
909 let prev_count = connector.last_entity_count.load(Ordering::Relaxed);
910
911 match connector.sync().await {
912 Ok(count) => {
913 log::debug!("Synced {} entities from {}", count, connector.config.name);
914
915 let current = connector.current_interval_secs.load(Ordering::Relaxed);
917 let new_interval = if count as u64 == prev_count && prev_count > 0 {
918 (current + current / 2).min(max_interval)
920 } else {
921 base_interval
923 };
924 connector
925 .current_interval_secs
926 .store(new_interval, Ordering::Relaxed);
927 connector
928 .last_entity_count
929 .store(count as u64, Ordering::Relaxed);
930 }
931 Err(e) => {
932 log::error!("Sync failed for {}: {}", connector.config.name, e);
933 *connector.client.write().await = None;
935 connector.connected.store(false, Ordering::Relaxed);
936 connector
938 .current_interval_secs
939 .store(base_interval, Ordering::Relaxed);
940 }
941 }
942
943 let sleep_secs = connector
944 .current_interval_secs
945 .load(Ordering::Relaxed)
946 .max(min_interval);
947 tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await;
948 }
949 })
950 }
951}
952
953fn build_read_all_grid() -> HGrid {
955 use haystack_core::data::HCol;
956 let mut row = HDict::new();
957 row.set("filter", Kind::Str("*".to_string()));
958 HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row])
959}
960
961fn build_changes_grid(since_version: u64) -> HGrid {
963 use haystack_core::data::HCol;
964 use haystack_core::kinds::Number;
965 let mut row = HDict::new();
966 row.set(
967 "version",
968 Kind::Number(Number::unitless(since_version as f64)),
969 );
970 HGrid::from_parts(HDict::new(), vec![HCol::new("version")], vec![row])
971}
972
973impl ConnectorConfig {
974 pub fn effective_ws_url(&self) -> String {
978 if let Some(ref ws) = self.ws_url {
979 return ws.clone();
980 }
981 let ws = if self.url.starts_with("https://") {
982 self.url.replacen("https://", "wss://", 1)
983 } else {
984 self.url.replacen("http://", "ws://", 1)
985 };
986 format!("{ws}/ws")
987 }
988
989 pub fn effective_sync_interval_secs(&self) -> u64 {
991 self.sync_interval_secs.unwrap_or(60)
992 }
993}
994
995pub fn prefix_refs(entity: &mut HDict, prefix: &str) {
1000 let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
1001
1002 for name in &tag_names {
1003 let should_prefix = name == "id" || name.ends_with("Ref");
1004 if !should_prefix {
1005 continue;
1006 }
1007
1008 if let Some(Kind::Ref(r)) = entity.get(name) {
1009 let new_val = format!("{}{}", prefix, r.val);
1010 let new_ref = HRef::new(new_val, r.dis.clone());
1011 entity.set(name.as_str(), Kind::Ref(new_ref));
1012 }
1013 }
1014}
1015
1016pub fn strip_prefix_refs(entity: &mut HDict, prefix: &str) {
1022 let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
1023
1024 for name in &tag_names {
1025 let should_strip = name == "id" || name.ends_with("Ref");
1026 if !should_strip {
1027 continue;
1028 }
1029
1030 if let Some(Kind::Ref(r)) = entity.get(name)
1031 && let Some(stripped) = r.val.strip_prefix(prefix)
1032 {
1033 let new_ref = HRef::new(stripped.to_string(), r.dis.clone());
1034 entity.set(name.as_str(), Kind::Ref(new_ref));
1035 }
1036 }
1037}
1038
/// Encodes `grid` into bytes by delegating to
/// `haystack_core::codecs::encode_grid_binary`.
///
/// # Errors
/// Returns whatever error string the codec reports.
pub fn encode_sync_payload(grid: &HGrid) -> Result<Vec<u8>, String> {
    haystack_core::codecs::encode_grid_binary(grid)
}
1046
/// Decodes a grid from `bytes` by delegating to
/// `haystack_core::codecs::decode_grid_binary`.
///
/// # Errors
/// Returns whatever error string the codec reports.
pub fn decode_sync_payload(bytes: &[u8]) -> Result<HGrid, String> {
    haystack_core::codecs::decode_grid_binary(bytes)
}
1051
1052#[cfg(test)]
1053mod tests {
1054 use super::*;
1055 use haystack_core::kinds::HRef;
1056
1057 #[test]
1058 fn connector_new_empty_cache() {
1059 let config = ConnectorConfig {
1060 name: "test".to_string(),
1061 url: "http://localhost:8080/api".to_string(),
1062 username: "user".to_string(),
1063 password: "pass".to_string(),
1064 id_prefix: None,
1065 ws_url: None,
1066 sync_interval_secs: None,
1067 client_cert: None,
1068 client_key: None,
1069 ca_cert: None,
1070 domain: None,
1071 };
1072 let connector = Connector::new(config);
1073 assert_eq!(connector.entity_count(), 0);
1074 assert!(connector.cached_entities().is_empty());
1075 }
1076
1077 #[test]
1078 fn connector_config_deserialization() {
1079 let json = r#"{
1080 "name": "Remote Server",
1081 "url": "http://remote:8080/api",
1082 "username": "admin",
1083 "password": "secret",
1084 "id_prefix": "r1-"
1085 }"#;
1086 let config: ConnectorConfig = serde_json::from_str(json).unwrap();
1087 assert_eq!(config.name, "Remote Server");
1088 assert_eq!(config.url, "http://remote:8080/api");
1089 assert_eq!(config.username, "admin");
1090 assert_eq!(config.password, "secret");
1091 assert_eq!(config.id_prefix, Some("r1-".to_string()));
1092 }
1093
1094 #[test]
1095 fn connector_config_deserialization_without_prefix() {
1096 let json = r#"{
1097 "name": "Remote",
1098 "url": "http://remote:8080/api",
1099 "username": "admin",
1100 "password": "secret"
1101 }"#;
1102 let config: ConnectorConfig = serde_json::from_str(json).unwrap();
1103 assert_eq!(config.id_prefix, None);
1104 }
1105
1106 #[test]
1107 fn id_prefix_application() {
1108 let mut entity = HDict::new();
1109 entity.set("id", Kind::Ref(HRef::from_val("site-1")));
1110 entity.set("dis", Kind::Str("Main Site".to_string()));
1111 entity.set("site", Kind::Marker);
1112 entity.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
1113 entity.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
1114 entity.set(
1115 "floorRef",
1116 Kind::Ref(HRef::new("floor-1", Some("Floor 1".to_string()))),
1117 );
1118
1119 prefix_refs(&mut entity, "r1-");
1120
1121 match entity.get("id") {
1123 Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-site-1"),
1124 other => panic!("expected Ref, got {other:?}"),
1125 }
1126
1127 match entity.get("siteRef") {
1129 Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-site-1"),
1130 other => panic!("expected Ref, got {other:?}"),
1131 }
1132
1133 match entity.get("equipRef") {
1135 Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-equip-1"),
1136 other => panic!("expected Ref, got {other:?}"),
1137 }
1138
1139 match entity.get("floorRef") {
1141 Some(Kind::Ref(r)) => {
1142 assert_eq!(r.val, "r1-floor-1");
1143 assert_eq!(r.dis, Some("Floor 1".to_string()));
1144 }
1145 other => panic!("expected Ref, got {other:?}"),
1146 }
1147
1148 assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
1150 assert_eq!(entity.get("site"), Some(&Kind::Marker));
1151 }
1152
1153 #[test]
1154 fn id_prefix_skips_non_ref_values() {
1155 let mut entity = HDict::new();
1156 entity.set("id", Kind::Ref(HRef::from_val("point-1")));
1157 entity.set("customRef", Kind::Str("not-a-ref".to_string()));
1159
1160 prefix_refs(&mut entity, "p-");
1161
1162 match entity.get("id") {
1164 Some(Kind::Ref(r)) => assert_eq!(r.val, "p-point-1"),
1165 other => panic!("expected Ref, got {other:?}"),
1166 }
1167
1168 assert_eq!(
1170 entity.get("customRef"),
1171 Some(&Kind::Str("not-a-ref".to_string()))
1172 );
1173 }
1174
1175 #[test]
1176 fn connector_config_deserialization_full() {
1177 let json = r#"{
1178 "name": "Full Config",
1179 "url": "https://remote:8443/api",
1180 "username": "admin",
1181 "password": "secret",
1182 "id_prefix": "r1-",
1183 "ws_url": "wss://remote:8443/api/ws",
1184 "sync_interval_secs": 30,
1185 "client_cert": "/etc/certs/client.pem",
1186 "client_key": "/etc/certs/client-key.pem",
1187 "ca_cert": "/etc/certs/ca.pem"
1188 }"#;
1189 let config: ConnectorConfig = serde_json::from_str(json).unwrap();
1190 assert_eq!(config.ws_url, Some("wss://remote:8443/api/ws".to_string()));
1191 assert_eq!(config.sync_interval_secs, Some(30));
1192 assert_eq!(
1193 config.client_cert,
1194 Some("/etc/certs/client.pem".to_string())
1195 );
1196 assert_eq!(
1197 config.client_key,
1198 Some("/etc/certs/client-key.pem".to_string())
1199 );
1200 assert_eq!(config.ca_cert, Some("/etc/certs/ca.pem".to_string()));
1201 }
1202
1203 #[test]
1204 fn strip_prefix_refs_reverses_prefix() {
1205 let mut entity = HDict::new();
1206 entity.set("id", Kind::Ref(HRef::from_val("r1-site-1")));
1207 entity.set("dis", Kind::Str("Main Site".to_string()));
1208 entity.set("site", Kind::Marker);
1209 entity.set("siteRef", Kind::Ref(HRef::from_val("r1-site-1")));
1210 entity.set("equipRef", Kind::Ref(HRef::from_val("r1-equip-1")));
1211
1212 strip_prefix_refs(&mut entity, "r1-");
1213
1214 match entity.get("id") {
1215 Some(Kind::Ref(r)) => assert_eq!(r.val, "site-1"),
1216 other => panic!("expected Ref, got {other:?}"),
1217 }
1218 match entity.get("siteRef") {
1219 Some(Kind::Ref(r)) => assert_eq!(r.val, "site-1"),
1220 other => panic!("expected Ref, got {other:?}"),
1221 }
1222 match entity.get("equipRef") {
1223 Some(Kind::Ref(r)) => assert_eq!(r.val, "equip-1"),
1224 other => panic!("expected Ref, got {other:?}"),
1225 }
1226 assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
1227 }
1228
1229 #[test]
1230 fn strip_prefix_refs_ignores_non_matching() {
1231 let mut entity = HDict::new();
1232 entity.set("id", Kind::Ref(HRef::from_val("other-site-1")));
1233
1234 strip_prefix_refs(&mut entity, "r1-");
1235
1236 match entity.get("id") {
1237 Some(Kind::Ref(r)) => assert_eq!(r.val, "other-site-1"),
1238 other => panic!("expected Ref, got {other:?}"),
1239 }
1240 }
1241
1242 #[test]
1243 fn derive_ws_url_from_http() {
1244 let config = ConnectorConfig {
1245 name: "test".to_string(),
1246 url: "http://remote:8080/api".to_string(),
1247 username: "u".to_string(),
1248 password: "p".to_string(),
1249 id_prefix: None,
1250 ws_url: None,
1251 sync_interval_secs: None,
1252 client_cert: None,
1253 client_key: None,
1254 ca_cert: None,
1255 domain: None,
1256 };
1257 assert_eq!(config.effective_ws_url(), "ws://remote:8080/api/ws");
1258 }
1259
1260 #[test]
1261 fn derive_ws_url_from_https() {
1262 let config = ConnectorConfig {
1263 name: "test".to_string(),
1264 url: "https://remote:8443/api".to_string(),
1265 username: "u".to_string(),
1266 password: "p".to_string(),
1267 id_prefix: None,
1268 ws_url: None,
1269 sync_interval_secs: None,
1270 client_cert: None,
1271 client_key: None,
1272 ca_cert: None,
1273 domain: None,
1274 };
1275 assert_eq!(config.effective_ws_url(), "wss://remote:8443/api/ws");
1276 }
1277
1278 #[test]
1279 fn explicit_ws_url_overrides_derived() {
1280 let config = ConnectorConfig {
1281 name: "test".to_string(),
1282 url: "http://remote:8080/api".to_string(),
1283 username: "u".to_string(),
1284 password: "p".to_string(),
1285 id_prefix: None,
1286 ws_url: Some("ws://custom:9999/ws".to_string()),
1287 sync_interval_secs: None,
1288 client_cert: None,
1289 client_key: None,
1290 ca_cert: None,
1291 domain: None,
1292 };
1293 assert_eq!(config.effective_ws_url(), "ws://custom:9999/ws");
1294 }
1295
/// `owns` must report an entity ID only after an entity carrying that ID has
/// been pushed through `update_cache`.
#[test]
fn connector_tracks_entity_ids_in_ownership() {
    let connector = Connector::new(ConnectorConfig {
        name: String::from("test"),
        url: String::from("http://localhost:8080/api"),
        username: String::from("user"),
        password: String::from("pass"),
        id_prefix: Some(String::from("t-")),
        ws_url: None,
        sync_interval_secs: None,
        client_cert: None,
        client_key: None,
        ca_cert: None,
        domain: None,
    });

    // Nothing has been cached yet, so the ID is not owned.
    assert!(!connector.owns("t-site-1"));

    // Cache a single entity whose `id` tag is the ID checked above.
    let mut cached_site = HDict::new();
    cached_site.set("id", Kind::Ref(HRef::from_val("t-site-1")));
    connector.update_cache(vec![cached_site]);

    // The cached ID is now owned; an ID that was never cached is not.
    assert!(connector.owns("t-site-1"));
    assert!(!connector.owns("other-1"));
}
1324
/// A freshly constructed connector must report HTTP transport, a disconnected
/// state, and no recorded sync time.
#[test]
fn connector_new_defaults_transport_and_connected() {
    let fresh = Connector::new(ConnectorConfig {
        name: String::from("test"),
        url: String::from("http://localhost:8080/api"),
        username: String::from("user"),
        password: String::from("pass"),
        id_prefix: None,
        ws_url: None,
        sync_interval_secs: None,
        client_cert: None,
        client_key: None,
        ca_cert: None,
        domain: None,
    });

    let mode = fresh.transport_mode();
    assert_eq!(mode, TransportMode::Http);

    let connected = fresh.is_connected();
    assert!(!connected);

    let last_sync = fresh.last_sync_time();
    assert!(last_sync.is_none());
}
1345
/// Deserializing a JSON config that carries only the four basic fields must
/// succeed and leave the optional WebSocket, sync-interval and certificate
/// fields as `None`.
#[test]
fn connector_config_new_fields_default_to_none() {
    // Only name/url/username/password are present in the document.
    let minimal_json = r#"{
        "name": "Minimal",
        "url": "http://remote:8080/api",
        "username": "user",
        "password": "pass"
    }"#;

    let parsed: ConnectorConfig = serde_json::from_str(minimal_json).unwrap();

    assert!(parsed.ws_url.is_none());
    assert!(parsed.sync_interval_secs.is_none());
    assert!(parsed.client_cert.is_none());
    assert!(parsed.client_key.is_none());
    assert!(parsed.ca_cert.is_none());
}
1361
/// Walks the remote-watch set through adds and removes, checking the reported
/// count after every step (including a duplicate add and an unknown remove).
#[test]
fn remote_watch_add_and_remove() {
    let connector = Connector::new(ConnectorConfig {
        name: String::from("test"),
        url: String::from("http://localhost:8080/api"),
        username: String::from("user"),
        password: String::from("pass"),
        id_prefix: Some(String::from("r-")),
        ws_url: None,
        sync_interval_secs: None,
        client_cert: None,
        client_key: None,
        ca_cert: None,
        domain: None,
    });

    // Shorthand for reading the current number of watched IDs.
    let watched = || connector.remote_watch_count();

    // Starts empty.
    assert_eq!(watched(), 0);

    // Two distinct IDs raise the count to two.
    connector.add_remote_watch("r-site-1");
    assert_eq!(watched(), 1);
    connector.add_remote_watch("r-equip-2");
    assert_eq!(watched(), 2);

    // Re-adding an ID that is already watched leaves the count unchanged.
    connector.add_remote_watch("r-site-1");
    assert_eq!(watched(), 2);

    // Removing a watched ID drops the count by one.
    connector.remove_remote_watch("r-site-1");
    assert_eq!(watched(), 1);

    // Removing an ID that was never watched leaves the count unchanged.
    connector.remove_remote_watch("r-nonexistent");
    assert_eq!(watched(), 1);

    // Removing the last watched ID brings the count back to zero.
    connector.remove_remote_watch("r-equip-2");
    assert_eq!(watched(), 0);
}
1400
1401 fn make_test_entities() -> Vec<HDict> {
1402 let mut site = HDict::new();
1403 site.set("id", Kind::Ref(HRef::from_val("site-1")));
1404 site.set("site", Kind::Marker);
1405 site.set("dis", Kind::Str("Main Site".into()));
1406
1407 let mut equip = HDict::new();
1408 equip.set("id", Kind::Ref(HRef::from_val("equip-1")));
1409 equip.set("equip", Kind::Marker);
1410 equip.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
1411
1412 let mut point = HDict::new();
1413 point.set("id", Kind::Ref(HRef::from_val("point-1")));
1414 point.set("point", Kind::Marker);
1415 point.set("sensor", Kind::Marker);
1416 point.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
1417
1418 vec![site, equip, point]
1419 }
1420
/// Runs several filters against the cached three-entity fixture and checks how
/// many entities each one matches.
///
/// Every query passes a limit of `0`, which still returns matches here
/// (presumably "no limit"; confirm against `filter_cached`).
#[test]
fn filter_cached_returns_matching_entities() {
    let connector = Connector::new(ConnectorConfig {
        name: String::from("test"),
        url: String::from("http://localhost:8080/api"),
        username: String::from("user"),
        password: String::from("pass"),
        id_prefix: None,
        ws_url: None,
        sync_interval_secs: None,
        client_cert: None,
        client_key: None,
        ca_cert: None,
        domain: None,
    });
    connector.update_cache(make_test_entities());

    // Marker filter on `site`: exactly one hit, and it is `site-1`.
    let site_hits = connector.filter_cached("site", 0).unwrap();
    assert_eq!(site_hits.len(), 1);
    let expected_id = Kind::Ref(HRef::from_val("site-1"));
    assert_eq!(site_hits[0].get("id"), Some(&expected_id));

    // Marker filter on `equip`: exactly one hit.
    let equip_hits = connector.filter_cached("equip", 0).unwrap();
    assert_eq!(equip_hits.len(), 1);

    // Conjunction of two markers that the fixture's point both carries.
    let sensor_hits = connector.filter_cached("point and sensor", 0).unwrap();
    assert_eq!(sensor_hits.len(), 1);

    // A tag no fixture entity has yields an empty (but successful) result.
    let ahu_hits = connector.filter_cached("ahu", 0).unwrap();
    assert!(ahu_hits.is_empty());
}
1459
/// With ten matching entities cached, a limit of 3 must cap the result at
/// three entries.
#[test]
fn filter_cached_respects_limit() {
    let connector = Connector::new(ConnectorConfig {
        name: String::from("test"),
        url: String::from("http://localhost:8080/api"),
        username: String::from("user"),
        password: String::from("pass"),
        id_prefix: None,
        ws_url: None,
        sync_interval_secs: None,
        client_cert: None,
        client_key: None,
        ca_cert: None,
        domain: None,
    });

    // Ten `point` entities with IDs `point-0` through `point-9`.
    let entities: Vec<HDict> = (0..10)
        .map(|i| {
            let mut point = HDict::new();
            point.set("id", Kind::Ref(HRef::from_val(format!("point-{i}"))));
            point.set("point", Kind::Marker);
            point
        })
        .collect();
    connector.update_cache(entities);

    let limited = connector.filter_cached("point", 3).unwrap();
    assert_eq!(limited.len(), 3);
}
1490
/// An `or` filter over two markers must return every fixture entity carrying
/// either one: here the site and the equip, but not the point.
#[test]
fn filter_cached_or_query() {
    let connector = Connector::new(ConnectorConfig {
        name: String::from("test"),
        url: String::from("http://localhost:8080/api"),
        username: String::from("user"),
        password: String::from("pass"),
        id_prefix: None,
        ws_url: None,
        sync_interval_secs: None,
        client_cert: None,
        client_key: None,
        ca_cert: None,
        domain: None,
    });
    connector.update_cache(make_test_entities());

    let either = connector.filter_cached("site or equip", 0).unwrap();
    assert_eq!(either.len(), 2);
}
1513
/// A connector that has never had its cache updated must report version 0.
#[test]
fn cache_version_starts_at_zero() {
    let config = ConnectorConfig {
        name: String::from("test"),
        url: String::from("http://localhost:8080/api"),
        username: String::from("user"),
        password: String::from("pass"),
        id_prefix: None,
        ws_url: None,
        sync_interval_secs: None,
        client_cert: None,
        client_key: None,
        ca_cert: None,
        domain: None,
    };
    let connector = Connector::new(config);

    let initial_version = connector.cache_version();
    assert_eq!(initial_version, 0);
}
1531
/// Each call to `update_cache` must bump the cache version by one, whether the
/// supplied entity list is empty or not.
#[test]
fn cache_version_increments_on_update() {
    let config = ConnectorConfig {
        name: String::from("test"),
        url: String::from("http://localhost:8080/api"),
        username: String::from("user"),
        password: String::from("pass"),
        id_prefix: None,
        ws_url: None,
        sync_interval_secs: None,
        client_cert: None,
        client_key: None,
        ca_cert: None,
        domain: None,
    };
    let connector = Connector::new(config);
    assert_eq!(connector.cache_version(), 0);

    // An update with no entities still counts as an update.
    connector.update_cache(Vec::new());
    assert_eq!(connector.cache_version(), 1);

    // A second update, this time carrying one entity.
    let mut single = HDict::new();
    single.set("id", Kind::Ref(HRef::from_val("p-1")));
    connector.update_cache(vec![single]);
    assert_eq!(connector.cache_version(), 2);
}
1557
/// Checks the snapshot returned by `state()` both before any cache update and
/// after one update containing a single entity.
#[test]
fn connector_state_populated() {
    let config = ConnectorConfig {
        name: String::from("alpha"),
        url: String::from("http://localhost:8080/api"),
        username: String::from("user"),
        password: String::from("pass"),
        id_prefix: None,
        ws_url: None,
        sync_interval_secs: None,
        client_cert: None,
        client_key: None,
        ca_cert: None,
        domain: None,
    };
    let connector = Connector::new(config);

    // Before any update: configured name, disconnected, empty cache, no sync info.
    let before = connector.state();
    assert_eq!(before.name, "alpha");
    assert!(!before.connected);
    assert_eq!(before.cache_version, 0);
    assert_eq!(before.entity_count, 0);
    assert!(before.last_sync_ts.is_none());
    assert!(before.staleness_secs.is_none());

    // Push one `site` entity into the cache.
    let mut site = HDict::new();
    site.set("id", Kind::Ref(HRef::from_val("s-1")));
    site.set("site", Kind::Marker);
    connector.update_cache(vec![site]);

    // After the update: version and entity count both reflect the single entity.
    let after = connector.state();
    assert_eq!(after.cache_version, 1);
    assert_eq!(after.entity_count, 1);
}
1592}