Skip to main content

haystack_server/
connector.rs

1//! Connector for fetching entities from a remote Haystack server.
2//!
3//! # Overview
4//!
5//! A [`Connector`] maintains a persistent connection (HTTP or WebSocket) to a
6//! remote Haystack server, periodically syncs entities into a local cache, and
7//! proxies write operations (`hisRead`, `hisWrite`, `pointWrite`,
8//! `invokeAction`, `import`) to the remote when the target entity is owned by
9//! that connector.
10//!
11//! # Lifecycle
12//!
13//! 1. **Construction** — [`Connector::new`] creates a connector with an empty
14//!    cache from a [`ConnectorConfig`] (parsed from the TOML federation config).
15//! 2. **Connection** — on first sync, [`Connector::sync`] calls
16//!    `connect_persistent()` which tries WebSocket first, falling back to HTTP.
17//! 3. **Sync loop** — [`Connector::spawn_sync_task`] spawns a tokio task that
18//!    calls [`Connector::sync`] at an adaptive interval. Sync tries incremental
19//!    delta sync via the `changes` op first, falling back to full entity fetch.
20//! 4. **Cache** — [`Connector::update_cache`] replaces the cached entity list
21//!    and rebuilds the bitmap tag index and owned-ID set for fast lookups.
22//! 5. **Proxy** — write ops check [`Connector::owns`] and forward to the remote
23//!    server via `proxy_*` methods, stripping/re-adding the ID prefix as needed.
24//!
25//! # ID Prefixing
26//!
27//! When `id_prefix` is configured, all Ref values (`id`, `siteRef`, etc.) are
28//! prefixed on sync and stripped before proxying. See [`prefix_refs`] and
29//! [`strip_prefix_refs`].
30
31use std::collections::HashMap;
32use std::collections::HashSet;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
35
36use chrono::{DateTime, Utc};
37use parking_lot::RwLock;
38
39use haystack_client::HaystackClient;
40use haystack_client::transport::http::HttpTransport;
41use haystack_client::transport::ws::WsTransport;
42use haystack_core::data::{HDict, HGrid};
43use haystack_core::filter::{FilterNode, matches, parse_filter};
44use haystack_core::graph::bitmap::TagBitmapIndex;
45use haystack_core::graph::query_planner;
46use haystack_core::kinds::{HRef, Kind};
47
48/// Type-erased persistent connection to a remote Haystack server.
49///
50/// Wraps either an HTTP or WebSocket `HaystackClient`, allowing the connector
51/// to hold a single persistent connection regardless of transport.
52enum ConnectorClient {
53    Http(HaystackClient<HttpTransport>),
54    Ws(HaystackClient<WsTransport>),
55}
56
57impl ConnectorClient {
58    /// Call a Haystack op through the underlying client, returning the
59    /// response grid or an error string.
60    async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, String> {
61        match self {
62            ConnectorClient::Http(c) => c.call(op, req).await.map_err(|e| e.to_string()),
63            ConnectorClient::Ws(c) => c.call(op, req).await.map_err(|e| e.to_string()),
64        }
65    }
66
67    /// Read historical data for a point.
68    async fn his_read(&self, id: &str, range: &str) -> Result<HGrid, String> {
69        match self {
70            ConnectorClient::Http(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
71            ConnectorClient::Ws(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
72        }
73    }
74
75    /// Write historical data for a point.
76    async fn his_write(&self, id: &str, items: Vec<HDict>) -> Result<HGrid, String> {
77        match self {
78            ConnectorClient::Http(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
79            ConnectorClient::Ws(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
80        }
81    }
82
83    /// Write a value to a writable point.
84    async fn point_write(&self, id: &str, level: u8, val: Kind) -> Result<HGrid, String> {
85        match self {
86            ConnectorClient::Http(c) => c
87                .point_write(id, level, val)
88                .await
89                .map_err(|e| e.to_string()),
90            ConnectorClient::Ws(c) => c
91                .point_write(id, level, val)
92                .await
93                .map_err(|e| e.to_string()),
94        }
95    }
96
97    /// Invoke an action on an entity.
98    async fn invoke_action(&self, id: &str, action: &str, args: HDict) -> Result<HGrid, String> {
99        match self {
100            ConnectorClient::Http(c) => c
101                .invoke_action(id, action, args)
102                .await
103                .map_err(|e| e.to_string()),
104            ConnectorClient::Ws(c) => c
105                .invoke_action(id, action, args)
106                .await
107                .map_err(|e| e.to_string()),
108        }
109    }
110}
111
112/// Transport protocol used by a connector.
113#[derive(Debug, Clone, Copy, PartialEq, Eq)]
114#[repr(u8)]
115pub enum TransportMode {
116    Http = 0,
117    WebSocket = 1,
118}
119
120/// Configuration for a remote Haystack server connection.
121#[derive(Debug, Clone, serde::Deserialize)]
122pub struct ConnectorConfig {
123    /// Display name for this connector.
124    pub name: String,
125    /// Base URL of the remote Haystack API (e.g. "http://remote:8080/api").
126    pub url: String,
127    /// Username for SCRAM authentication.
128    pub username: String,
129    /// Password for SCRAM authentication.
130    pub password: String,
131    /// Optional tag prefix to namespace remote entity IDs (e.g. "remote1-").
132    pub id_prefix: Option<String>,
133    /// WebSocket URL (e.g. "ws://remote:8080/api/ws"). Derived from url if omitted.
134    pub ws_url: Option<String>,
135    /// Fallback polling interval in seconds (default: 60).
136    pub sync_interval_secs: Option<u64>,
137    /// Path to PEM client certificate for mTLS.
138    pub client_cert: Option<String>,
139    /// Path to PEM client private key for mTLS.
140    pub client_key: Option<String>,
141    /// Path to PEM CA certificate for server verification.
142    pub ca_cert: Option<String>,
143    /// Optional domain tag for scoping federated queries (e.g. "site:bldg-a").
144    #[serde(default)]
145    pub domain: Option<String>,
146}
147
148const MAX_DOMAIN_LEN: usize = 256;
149
150impl ConnectorConfig {
151    /// Validate configuration fields.
152    pub fn validate(&self) -> Result<(), String> {
153        if let Some(ref domain) = self.domain {
154            if domain.len() > MAX_DOMAIN_LEN {
155                return Err(format!(
156                    "domain too long: {} chars (max {MAX_DOMAIN_LEN})",
157                    domain.len()
158                ));
159            }
160            if !domain
161                .chars()
162                .all(|c| c.is_alphanumeric() || "-.:_".contains(c))
163            {
164                return Err(format!("domain contains invalid characters: {domain}"));
165            }
166        }
167        Ok(())
168    }
169}
170
171/// A connector that can fetch entities from a remote Haystack server.
172///
173/// Holds a persistent connection, a local entity cache with bitmap index,
174/// Consolidated cache state — entities, ownership set, bitmap index, and ID map
175/// bundled under a single lock for atomic reads/updates and consistency.
176struct CacheState {
177    entities: Vec<Arc<HDict>>,
178    owned_ids: HashSet<String>,
179    tag_index: TagBitmapIndex,
180    id_map: HashMap<String, usize>,
181}
182
183impl CacheState {
184    fn empty() -> Self {
185        Self {
186            entities: Vec::new(),
187            owned_ids: HashSet::new(),
188            tag_index: TagBitmapIndex::new(),
189            id_map: HashMap::new(),
190        }
191    }
192
193    /// Maximum number of tags per entity before the entity is skipped.
194    const MAX_ENTITY_TAGS: usize = 1_000;
195    /// Maximum entity ID length (bytes) before the entity is skipped.
196    const MAX_ENTITY_ID_LEN: usize = 256;
197
198    /// Build a fully indexed cache state from a list of entities.
199    /// Entities with an ID exceeding [`MAX_ENTITY_ID_LEN`] or more than
200    /// [`MAX_ENTITY_TAGS`] tags are silently skipped.
201    fn build(entities: Vec<HDict>) -> Self {
202        let mut owned_ids = HashSet::with_capacity(entities.len());
203        let mut tag_index = TagBitmapIndex::new();
204        let mut id_map = HashMap::with_capacity(entities.len());
205        let mut valid_entities = Vec::with_capacity(entities.len());
206
207        for entity in entities {
208            // Validate entity: skip oversized IDs or excessive tag counts
209            if let Some(Kind::Ref(r)) = entity.get("id") {
210                if r.val.len() > Self::MAX_ENTITY_ID_LEN {
211                    log::warn!("skipping entity with oversized ID ({} bytes)", r.val.len());
212                    continue;
213                }
214            }
215            if entity.len() > Self::MAX_ENTITY_TAGS {
216                log::warn!("skipping entity with too many tags ({})", entity.len());
217                continue;
218            }
219
220            let eid = valid_entities.len();
221            if let Some(Kind::Ref(r)) = entity.get("id") {
222                owned_ids.insert(r.val.clone());
223                id_map.insert(r.val.clone(), eid);
224            }
225            let tags: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
226            tag_index.add(eid, &tags);
227            valid_entities.push(Arc::new(entity));
228        }
229
230        Self {
231            entities: valid_entities,
232            owned_ids,
233            tag_index,
234            id_map,
235        }
236    }
237
238    /// Rebuild tag index and id_map from current entities.
239    fn rebuild_index(&mut self) {
240        self.owned_ids.clear();
241        self.tag_index = TagBitmapIndex::new();
242        self.id_map.clear();
243
244        for (eid, entity) in self.entities.iter().enumerate() {
245            if let Some(Kind::Ref(r)) = entity.get("id") {
246                self.owned_ids.insert(r.val.clone());
247                self.id_map.insert(r.val.clone(), eid);
248            }
249            let tags: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
250            self.tag_index.add(eid, &tags);
251        }
252    }
253}
254
255/// and state for adaptive sync and federation proxy operations. See the
256/// [module-level documentation](self) for the full lifecycle.
257/// Observable state of a federation connector.
258#[derive(Debug, Clone)]
259pub struct ConnectorState {
260    pub name: String,
261    pub connected: bool,
262    pub cache_version: u64,
263    pub entity_count: usize,
264    pub last_sync_ts: Option<i64>,
265    pub staleness_secs: Option<f64>,
266}
267
268pub struct Connector {
269    pub config: ConnectorConfig,
270    /// Consolidated cache state: entities, ownership index, bitmap tag index, and
271    /// ID→position map — all under a single lock for atomic consistency.
272    cache_state: RwLock<CacheState>,
273    /// Entity IDs being watched remotely (prefixed IDs).
274    remote_watch_ids: RwLock<HashSet<String>>,
275    /// Transport protocol (HTTP or WebSocket).
276    transport_mode: AtomicU8,
277    /// Whether the connector is currently connected (last sync succeeded).
278    connected: AtomicBool,
279    /// Timestamp of the last successful sync.
280    last_sync: RwLock<Option<DateTime<Utc>>>,
281    /// Remote graph version from last successful sync (for delta sync).
282    last_remote_version: RwLock<Option<u64>>,
283    /// Persistent client connection used by sync and background tasks.
284    /// Lazily initialized on first sync. Cleared on connection error.
285    /// Uses `tokio::sync::RwLock` because the guard is held across `.await`.
286    client: tokio::sync::RwLock<Option<ConnectorClient>>,
287    /// Current adaptive sync interval in seconds (adjusted based on change rate).
288    current_interval_secs: AtomicU64,
289    /// Number of entities from last sync (for change detection).
290    last_entity_count: AtomicU64,
291    /// Monotonically increasing version counter, bumped on each cache update.
292    cache_version: AtomicU64,
293}
294
295impl Connector {
296    /// Create a new connector with an empty cache.
297    ///
298    /// The connector starts disconnected; a persistent connection is
299    /// established lazily on the first [`sync`](Self::sync) call.
300    pub fn new(config: ConnectorConfig) -> Self {
301        let base_interval = config.effective_sync_interval_secs();
302        Self {
303            config,
304            cache_state: RwLock::new(CacheState::empty()),
305            remote_watch_ids: RwLock::new(HashSet::new()),
306            transport_mode: AtomicU8::new(TransportMode::Http as u8),
307            connected: AtomicBool::new(false),
308            last_sync: RwLock::new(None),
309            last_remote_version: RwLock::new(None),
310            client: tokio::sync::RwLock::new(None),
311            current_interval_secs: AtomicU64::new(base_interval),
312            last_entity_count: AtomicU64::new(0),
313            cache_version: AtomicU64::new(0),
314        }
315    }
316
317    /// Attempt to connect to the remote server. Tries WebSocket first,
318    /// falls back to HTTP.
319    async fn connect_persistent(&self) -> Result<(), String> {
320        // Try WebSocket first
321        let ws_url = self.config.effective_ws_url();
322        match HaystackClient::connect_ws(
323            &self.config.url,
324            &ws_url,
325            &self.config.username,
326            &self.config.password,
327        )
328        .await
329        {
330            Ok(ws_client) => {
331                *self.client.write().await = Some(ConnectorClient::Ws(ws_client));
332                self.transport_mode
333                    .store(TransportMode::WebSocket as u8, Ordering::Relaxed);
334                self.connected.store(true, Ordering::Relaxed);
335                log::info!("Connected to {} via WebSocket", self.config.name);
336                return Ok(());
337            }
338            Err(e) => {
339                log::warn!(
340                    "WS connection to {} failed: {}, trying HTTP",
341                    self.config.name,
342                    e
343                );
344            }
345        }
346
347        // Fall back to HTTP
348        match HaystackClient::connect(
349            &self.config.url,
350            &self.config.username,
351            &self.config.password,
352        )
353        .await
354        {
355            Ok(http_client) => {
356                *self.client.write().await = Some(ConnectorClient::Http(http_client));
357                self.transport_mode
358                    .store(TransportMode::Http as u8, Ordering::Relaxed);
359                self.connected.store(true, Ordering::Relaxed);
360                log::info!("Connected to {} via HTTP", self.config.name);
361                Ok(())
362            }
363            Err(e) => {
364                self.connected.store(false, Ordering::Relaxed);
365                Err(format!("connection failed: {e}"))
366            }
367        }
368    }
369
370    /// Connect to the remote server, fetch entities, and update cache.
371    ///
372    /// Tries incremental delta sync first (via the `changes` op) if we have a
373    /// known remote version. Falls back to full sync on first connect, when the
374    /// remote doesn't support `changes`, or when the version gap is too large.
375    ///
376    /// Returns the number of cached entities after sync, or an error string.
377    pub async fn sync(&self) -> Result<usize, String> {
378        // Ensure we have a connection
379        if self.client.read().await.is_none() {
380            self.connect_persistent().await?;
381        }
382
383        // Attempt delta sync if we have a previous version.
384        let maybe_ver = *self.last_remote_version.read();
385        if let Some(last_ver) = maybe_ver {
386            match self.try_delta_sync(last_ver).await {
387                Ok(count) => return Ok(count),
388                Err(e) => {
389                    log::debug!(
390                        "Delta sync failed for {}, falling back to full: {e}",
391                        self.config.name,
392                    );
393                }
394            }
395        }
396
397        // Full sync: read all entities
398        self.full_sync().await
399    }
400
401    /// Full sync: fetch all entities from remote and replace cache.
402    async fn full_sync(&self) -> Result<usize, String> {
403        let grid = {
404            let client = self.client.read().await;
405            let client = client.as_ref().ok_or("not connected")?;
406            client.call("read", &build_read_all_grid()).await
407        };
408
409        let grid = grid.map_err(|e| {
410            self.connected.store(false, Ordering::Relaxed);
411            format!("read failed: {e}")
412        })?;
413
414        let mut entities: Vec<HDict> = grid.rows.into_iter().collect();
415
416        // Apply id prefix if configured.
417        if let Some(ref prefix) = self.config.id_prefix {
418            for entity in &mut entities {
419                prefix_refs(entity, prefix);
420            }
421        }
422
423        let count = entities.len();
424        self.update_cache(entities);
425        *self.last_sync.write() = Some(Utc::now());
426        self.connected.store(true, Ordering::Relaxed);
427
428        // Probe remote version for next delta sync attempt.
429        self.probe_remote_version().await;
430
431        Ok(count)
432    }
433
434    /// Try to discover the remote graph version by calling `changes` with a
435    /// high version number. The response meta contains `curVer` which we store
436    /// for future delta syncs. Failures are silently ignored (remote may not
437    /// support the `changes` op).
438    async fn probe_remote_version(&self) {
439        let grid = {
440            let client = self.client.read().await;
441            let Some(client) = client.as_ref() else {
442                return;
443            };
444            client.call("changes", &build_changes_grid(u64::MAX)).await
445        };
446        if let Ok(grid) = grid
447            && let Some(Kind::Number(n)) = grid.meta.get("curVer")
448        {
449            *self.last_remote_version.write() = Some(n.val as u64);
450        }
451    }
452
453    /// Attempt incremental delta sync using the `changes` op.
454    ///
455    /// Sends the last known remote version to the server. The server returns
456    /// only the diffs since that version. We apply them incrementally to our
457    /// cache, avoiding a full entity set transfer.
458    async fn try_delta_sync(&self, since_version: u64) -> Result<usize, String> {
459        let changes_grid = build_changes_grid(since_version);
460        let grid = {
461            let client = self.client.read().await;
462            let client = client.as_ref().ok_or("not connected")?;
463            client.call("changes", &changes_grid).await
464        }
465        .map_err(|e| format!("changes op failed: {e}"))?;
466
467        // If the remote returned an error grid, fall back.
468        if grid.is_err() {
469            return Err("remote returned error grid for changes op".to_string());
470        }
471
472        // Extract current version from response meta.
473        let cur_ver = grid
474            .meta
475            .get("curVer")
476            .and_then(|k| {
477                if let Kind::Number(n) = k {
478                    Some(n.val as u64)
479                } else {
480                    None
481                }
482            })
483            .ok_or("changes response missing curVer in meta")?;
484
485        // No changes — nothing to do.
486        if grid.rows.is_empty() {
487            *self.last_remote_version.write() = Some(cur_ver);
488            *self.last_sync.write() = Some(Utc::now());
489            return Ok(self.cache_state.read().entities.len());
490        }
491
492        // Apply diffs in-place under a single write lock.
493        let mut state = self.cache_state.write();
494        let prefix = self.config.id_prefix.as_deref();
495
496        for row in &grid.rows {
497            let op = match row.get("op") {
498                Some(Kind::Str(s)) => s.as_str(),
499                _ => continue,
500            };
501            let ref_val = match row.get("ref") {
502                Some(Kind::Str(s)) => s.clone(),
503                _ => continue,
504            };
505
506            const MAX_ENTITY_TAGS: usize = 1_000;
507            const MAX_ENTITY_ID_LEN: usize = 256;
508
509            if ref_val.len() > MAX_ENTITY_ID_LEN {
510                log::warn!("skipping entity with oversized id: {} bytes", ref_val.len());
511                continue;
512            }
513
514            match op {
515                "add" | "update" => {
516                    if let Some(Kind::Dict(entity_box)) = row.get("entity") {
517                        let mut entity: HDict = (**entity_box).clone();
518                        if entity.len() > MAX_ENTITY_TAGS {
519                            log::warn!("skipping oversized entity with {} tags", entity.len());
520                            continue;
521                        }
522                        if let Some(pfx) = prefix {
523                            prefix_refs(&mut entity, pfx);
524                        }
525                        let entity_id = entity.get("id").and_then(|k| {
526                            if let Kind::Ref(r) = k {
527                                Some(r.val.clone())
528                            } else {
529                                None
530                            }
531                        });
532
533                        if let Some(ref eid) = entity_id {
534                            if let Some(&idx) = state.id_map.get(eid.as_str()) {
535                                // Update in place.
536                                state.entities[idx] = Arc::new(entity);
537                            } else {
538                                // Add new entity.
539                                let idx = state.entities.len();
540                                state.id_map.insert(eid.clone(), idx);
541                                state.entities.push(Arc::new(entity));
542                            }
543                        }
544                    }
545                }
546                "remove" => {
547                    let prefixed_ref = match prefix {
548                        Some(pfx) => format!("{pfx}{ref_val}"),
549                        None => ref_val,
550                    };
551                    if let Some(&idx) = state.id_map.get(prefixed_ref.as_str()) {
552                        // Swap-remove and fix up the id_map.
553                        let last_idx = state.entities.len() - 1;
554                        if idx != last_idx {
555                            let last_id = state.entities[last_idx].get("id").and_then(|k| {
556                                if let Kind::Ref(r) = k {
557                                    Some(r.val.clone())
558                                } else {
559                                    None
560                                }
561                            });
562                            state.entities.swap(idx, last_idx);
563                            if let Some(lid) = last_id {
564                                state.id_map.insert(lid, idx);
565                            }
566                        }
567                        state.entities.pop();
568                        state.id_map.remove(prefixed_ref.as_str());
569                    }
570                }
571                _ => {}
572            }
573        }
574
575        // Rebuild index after in-place mutations.
576        state.rebuild_index();
577
578        let count = state.entities.len();
579        drop(state);
580
581        self.cache_version.fetch_add(1, Ordering::Relaxed);
582        *self.last_remote_version.write() = Some(cur_ver);
583        *self.last_sync.write() = Some(Utc::now());
584        self.connected.store(true, Ordering::Relaxed);
585        Ok(count)
586    }
587
588    /// Replace the cached entities and rebuild the owned-ID index and bitmap tag index.
589    ///
590    /// Extracts all `id` Ref values from the given entities into a `HashSet`,
591    /// builds a bitmap tag index for fast filtered reads, then atomically
592    /// replaces the cache, ownership set, and index.
593    pub fn update_cache(&self, entities: Vec<HDict>) {
594        *self.cache_state.write() = CacheState::build(entities);
595        self.cache_version.fetch_add(1, Ordering::Relaxed);
596    }
597
598    /// Returns `true` if this connector owns an entity with the given ID.
599    pub fn owns(&self, id: &str) -> bool {
600        self.cache_state.read().owned_ids.contains(id)
601    }
602
603    /// Look up a single cached entity by ID using the O(1) id_map index.
604    pub fn get_cached_entity(&self, id: &str) -> Option<Arc<HDict>> {
605        let state = self.cache_state.read();
606        state
607            .id_map
608            .get(id)
609            .and_then(|&idx| state.entities.get(idx))
610            .map(Arc::clone)
611    }
612
613    /// Look up multiple cached entities by ID in a single pass.
614    /// Returns found entities and missing IDs.
615    pub fn batch_get_cached(&self, ids: &[&str]) -> (Vec<Arc<HDict>>, Vec<String>) {
616        let state = self.cache_state.read();
617        let mut found = Vec::with_capacity(ids.len());
618        let mut missing = Vec::new();
619        for &id in ids {
620            if let Some(&idx) = state.id_map.get(id) {
621                if let Some(entity) = state.entities.get(idx) {
622                    found.push(Arc::clone(entity));
623                } else {
624                    missing.push(id.to_string());
625                }
626            } else {
627                missing.push(id.to_string());
628            }
629        }
630        (found, missing)
631    }
632
633    /// Returns Arc-wrapped references to all cached entities (cheap pointer copies).
634    pub fn cached_entities(&self) -> Vec<Arc<HDict>> {
635        self.cache_state.read().entities.clone()
636    }
637
638    /// Returns the number of cached entities.
639    pub fn entity_count(&self) -> usize {
640        self.cache_state.read().entities.len()
641    }
642
643    /// Filter cached entities using the bitmap tag index for acceleration.
644    ///
645    /// Returns matching entities up to `limit` (0 = unlimited). Uses the same
646    /// two-phase approach as EntityGraph: bitmap candidates → full filter eval.
647    pub fn filter_cached(&self, filter_expr: &str, limit: usize) -> Result<Vec<HDict>, String> {
648        let ast = parse_filter(filter_expr).map_err(|e| format!("filter error: {e}"))?;
649        Ok(self
650            .filter_cached_with_ast(&ast, limit)
651            .into_iter()
652            .map(|arc| Arc::try_unwrap(arc).unwrap_or_else(|a| (*a).clone()))
653            .collect())
654    }
655
656    /// Filter cached entities using a pre-parsed filter AST.
657    ///
658    /// Avoids redundant parsing when the same filter is applied across
659    /// multiple connectors (e.g. federated reads). Returns Arc-wrapped
660    /// references for cheap cloning in federation pipelines.
661    pub fn filter_cached_with_ast(&self, ast: &FilterNode, limit: usize) -> Vec<Arc<HDict>> {
662        let effective_limit = if limit == 0 { usize::MAX } else { limit };
663
664        let state = self.cache_state.read();
665        let max_id = state.entities.len();
666
667        let candidates = query_planner::bitmap_candidates(ast, &state.tag_index, max_id);
668
669        let mut results = Vec::with_capacity(effective_limit.min(max_id));
670
671        if let Some(ref bitmap) = candidates {
672            for eid in TagBitmapIndex::iter_set_bits(bitmap) {
673                if results.len() >= effective_limit {
674                    break;
675                }
676                if let Some(entity) = state.entities.get(eid)
677                    && matches(ast, entity, None)
678                {
679                    results.push(Arc::clone(entity));
680                }
681            }
682        } else {
683            for entity in state.entities.iter() {
684                if results.len() >= effective_limit {
685                    break;
686                }
687                if matches(ast, entity, None) {
688                    results.push(Arc::clone(entity));
689                }
690            }
691        }
692
693        results
694    }
695
696    /// Add a federated entity ID to the remote watch set.
697    pub fn add_remote_watch(&self, prefixed_id: &str) {
698        self.remote_watch_ids
699            .write()
700            .insert(prefixed_id.to_string());
701    }
702
703    /// Remove a federated entity ID from the remote watch set.
704    pub fn remove_remote_watch(&self, prefixed_id: &str) {
705        self.remote_watch_ids.write().remove(prefixed_id);
706    }
707
708    /// Returns the number of entity IDs being watched remotely.
709    pub fn remote_watch_count(&self) -> usize {
710        self.remote_watch_ids.read().len()
711    }
712
713    /// Returns the current transport mode.
714    pub fn transport_mode(&self) -> TransportMode {
715        match self.transport_mode.load(Ordering::Relaxed) {
716            1 => TransportMode::WebSocket,
717            _ => TransportMode::Http,
718        }
719    }
720
721    /// Returns whether the connector is currently connected (last sync succeeded).
722    pub fn is_connected(&self) -> bool {
723        self.connected.load(Ordering::Relaxed)
724    }
725
726    /// Returns the timestamp of the last successful sync, if any.
727    pub fn last_sync_time(&self) -> Option<DateTime<Utc>> {
728        *self.last_sync.read()
729    }
730
731    /// Returns the current cache version counter.
732    pub fn cache_version(&self) -> u64 {
733        self.cache_version.load(Ordering::Relaxed)
734    }
735
736    /// Returns an observable snapshot of this connector's state.
737    pub fn state(&self) -> ConnectorState {
738        let now = Utc::now();
739        let last_sync = self.last_sync_time();
740        ConnectorState {
741            name: self.config.name.clone(),
742            connected: self.is_connected(),
743            cache_version: self.cache_version(),
744            entity_count: self.entity_count(),
745            last_sync_ts: last_sync.map(|ts| ts.timestamp()),
746            staleness_secs: last_sync.map(|ts| (now - ts).num_milliseconds() as f64 / 1000.0),
747        }
748    }
749
750    /// Strip the id_prefix from an entity ID.
751    fn strip_id(&self, prefixed_id: &str) -> String {
752        if let Some(ref prefix) = self.config.id_prefix {
753            prefixed_id
754                .strip_prefix(prefix.as_str())
755                .unwrap_or(prefixed_id)
756                .to_string()
757        } else {
758            prefixed_id.to_string()
759        }
760    }
761
762    /// Ensure the persistent client is connected, establishing a new
763    /// connection if needed. Returns an error if connection fails.
764    async fn ensure_connected(&self) -> Result<(), String> {
765        if self.client.read().await.is_none() {
766            self.connect_persistent().await?;
767        }
768        Ok(())
769    }
770
771    /// Handle a failed proxy operation by clearing the client to force
772    /// reconnection on the next call.
773    async fn on_proxy_error(&self, op_name: &str, e: String) -> String {
774        *self.client.write().await = None;
775        self.connected.store(false, Ordering::Relaxed);
776        format!("{op_name} failed: {e}")
777    }
778
779    /// Proxy a hisRead request to the remote server.
780    ///
781    /// Strips the ID prefix, calls `hisRead` on the remote, and returns the
782    /// response grid. Clears the client on connection error.
783    pub async fn proxy_his_read(&self, prefixed_id: &str, range: &str) -> Result<HGrid, String> {
784        self.ensure_connected().await?;
785        let id = self.strip_id(prefixed_id);
786        let guard = self.client.read().await;
787        let client = guard.as_ref().ok_or("not connected")?;
788        match client.his_read(&id, range).await {
789            Ok(grid) => Ok(grid),
790            Err(e) => {
791                drop(guard);
792                Err(self.on_proxy_error("hisRead", e).await)
793            }
794        }
795    }
796
797    /// Proxy a pointWrite request to the remote server.
798    ///
799    /// Strips the ID prefix, calls `pointWrite` on the remote with the given
800    /// level and value. Clears the client on connection error.
801    pub async fn proxy_point_write(
802        &self,
803        prefixed_id: &str,
804        level: u8,
805        val: &Kind,
806    ) -> Result<HGrid, String> {
807        self.ensure_connected().await?;
808        let id = self.strip_id(prefixed_id);
809        let val = val.clone();
810        let guard = self.client.read().await;
811        let client = guard.as_ref().ok_or("not connected")?;
812        match client.point_write(&id, level, val).await {
813            Ok(grid) => Ok(grid),
814            Err(e) => {
815                drop(guard);
816                Err(self.on_proxy_error("pointWrite", e).await)
817            }
818        }
819    }
820
821    /// Proxy a hisWrite request to the remote server.
822    ///
823    /// Strips the ID prefix, calls `hisWrite` on the remote with the given
824    /// time-series rows. Clears the client on connection error.
825    pub async fn proxy_his_write(
826        &self,
827        prefixed_id: &str,
828        items: Vec<HDict>,
829    ) -> Result<HGrid, String> {
830        self.ensure_connected().await?;
831        let id = self.strip_id(prefixed_id);
832        let guard = self.client.read().await;
833        let client = guard.as_ref().ok_or("not connected")?;
834        match client.his_write(&id, items).await {
835            Ok(grid) => Ok(grid),
836            Err(e) => {
837                drop(guard);
838                Err(self.on_proxy_error("hisWrite", e).await)
839            }
840        }
841    }
842
843    /// Proxy an import request for a single entity to the remote server.
844    ///
845    /// Strips the id prefix from the entity, wraps it in a single-row grid,
846    /// and calls the remote `import` op.
847    pub async fn proxy_import(&self, entity: &HDict) -> Result<HGrid, String> {
848        self.ensure_connected().await?;
849
850        let mut stripped = entity.clone();
851        if let Some(ref prefix) = self.config.id_prefix {
852            strip_prefix_refs(&mut stripped, prefix);
853        }
854
855        let col_names: Vec<String> = stripped.tag_names().map(|s| s.to_string()).collect();
856        let cols: Vec<haystack_core::data::HCol> = col_names
857            .iter()
858            .map(|n| haystack_core::data::HCol::new(n.as_str()))
859            .collect();
860        let grid = HGrid::from_parts(HDict::new(), cols, vec![stripped]);
861
862        let guard = self.client.read().await;
863        let client = guard.as_ref().ok_or("not connected")?;
864        match client.call("import", &grid).await {
865            Ok(grid) => Ok(grid),
866            Err(e) => {
867                drop(guard);
868                Err(self.on_proxy_error("import", e).await)
869            }
870        }
871    }
872
873    /// Proxy an invokeAction request to the remote server.
874    ///
875    /// Strips the ID prefix, calls `invokeAction` on the remote with the given
876    /// action name and arguments dict. Clears the client on connection error.
877    pub async fn proxy_invoke_action(
878        &self,
879        prefixed_id: &str,
880        action: &str,
881        args: HDict,
882    ) -> Result<HGrid, String> {
883        self.ensure_connected().await?;
884        let id = self.strip_id(prefixed_id);
885        let action = action.to_string();
886        let guard = self.client.read().await;
887        let client = guard.as_ref().ok_or("not connected")?;
888        match client.invoke_action(&id, &action, args).await {
889            Ok(grid) => Ok(grid),
890            Err(e) => {
891                drop(guard);
892                Err(self.on_proxy_error("invokeAction", e).await)
893            }
894        }
895    }
896
897    /// Spawn a background sync task for this connector.
898    ///
899    /// The task loops forever, syncing entities at the configured interval.
900    /// On error, the persistent client is cleared to force reconnection on
901    /// the next iteration.
902    pub fn spawn_sync_task(connector: Arc<Connector>) -> tokio::task::JoinHandle<()> {
903        let base_interval = connector.config.effective_sync_interval_secs();
904        let min_interval = base_interval / 2;
905        let max_interval = base_interval * 5;
906
907        tokio::spawn(async move {
908            loop {
909                let prev_count = connector.last_entity_count.load(Ordering::Relaxed);
910
911                match connector.sync().await {
912                    Ok(count) => {
913                        log::debug!("Synced {} entities from {}", count, connector.config.name);
914
915                        // Adaptive interval: increase when no changes, reset when changes detected.
916                        let current = connector.current_interval_secs.load(Ordering::Relaxed);
917                        let new_interval = if count as u64 == prev_count && prev_count > 0 {
918                            // No change — slow down (increase by 50%, capped at max).
919                            (current + current / 2).min(max_interval)
920                        } else {
921                            // Changes detected — reset to base interval.
922                            base_interval
923                        };
924                        connector
925                            .current_interval_secs
926                            .store(new_interval, Ordering::Relaxed);
927                        connector
928                            .last_entity_count
929                            .store(count as u64, Ordering::Relaxed);
930                    }
931                    Err(e) => {
932                        log::error!("Sync failed for {}: {}", connector.config.name, e);
933                        // Clear the client on error to force reconnection
934                        *connector.client.write().await = None;
935                        connector.connected.store(false, Ordering::Relaxed);
936                        // On error, use base interval for retry.
937                        connector
938                            .current_interval_secs
939                            .store(base_interval, Ordering::Relaxed);
940                    }
941                }
942
943                let sleep_secs = connector
944                    .current_interval_secs
945                    .load(Ordering::Relaxed)
946                    .max(min_interval);
947                tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await;
948            }
949        })
950    }
951}
952
953/// Build a `read` request grid with filter `"*"` (all entities).
954fn build_read_all_grid() -> HGrid {
955    use haystack_core::data::HCol;
956    let mut row = HDict::new();
957    row.set("filter", Kind::Str("*".to_string()));
958    HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row])
959}
960
961/// Build a request grid for the `changes` op with the given version.
962fn build_changes_grid(since_version: u64) -> HGrid {
963    use haystack_core::data::HCol;
964    use haystack_core::kinds::Number;
965    let mut row = HDict::new();
966    row.set(
967        "version",
968        Kind::Number(Number::unitless(since_version as f64)),
969    );
970    HGrid::from_parts(HDict::new(), vec![HCol::new("version")], vec![row])
971}
972
973impl ConnectorConfig {
974    /// Returns the WebSocket URL. If `ws_url` is set, returns it directly.
975    /// Otherwise derives from `url` by replacing `http://` with `ws://`
976    /// or `https://` with `wss://` and appending `/ws`.
977    pub fn effective_ws_url(&self) -> String {
978        if let Some(ref ws) = self.ws_url {
979            return ws.clone();
980        }
981        let ws = if self.url.starts_with("https://") {
982            self.url.replacen("https://", "wss://", 1)
983        } else {
984            self.url.replacen("http://", "ws://", 1)
985        };
986        format!("{ws}/ws")
987    }
988
989    /// Returns the sync interval in seconds. Defaults to 60 if not set.
990    pub fn effective_sync_interval_secs(&self) -> u64 {
991        self.sync_interval_secs.unwrap_or(60)
992    }
993}
994
995/// Prefix all Ref values in an entity dict.
996///
997/// Prefixes the `id` tag and any tag whose name ends with `Ref`
998/// (e.g. `siteRef`, `equipRef`, `floorRef`, `spaceRef`).
999pub fn prefix_refs(entity: &mut HDict, prefix: &str) {
1000    let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
1001
1002    for name in &tag_names {
1003        let should_prefix = name == "id" || name.ends_with("Ref");
1004        if !should_prefix {
1005            continue;
1006        }
1007
1008        if let Some(Kind::Ref(r)) = entity.get(name) {
1009            let new_val = format!("{}{}", prefix, r.val);
1010            let new_ref = HRef::new(new_val, r.dis.clone());
1011            entity.set(name.as_str(), Kind::Ref(new_ref));
1012        }
1013    }
1014}
1015
1016/// Strip a prefix from all Ref values in an entity dict.
1017///
1018/// The inverse of [`prefix_refs`]. Strips the given prefix from the `id` tag
1019/// and any tag whose name ends with `Ref`, but only if the Ref value actually
1020/// starts with the prefix. Preserves `dis` metadata on Refs.
1021pub fn strip_prefix_refs(entity: &mut HDict, prefix: &str) {
1022    let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
1023
1024    for name in &tag_names {
1025        let should_strip = name == "id" || name.ends_with("Ref");
1026        if !should_strip {
1027            continue;
1028        }
1029
1030        if let Some(Kind::Ref(r)) = entity.get(name)
1031            && let Some(stripped) = r.val.strip_prefix(prefix)
1032        {
1033            let new_ref = HRef::new(stripped.to_string(), r.dis.clone());
1034            entity.set(name.as_str(), Kind::Ref(new_ref));
1035        }
1036    }
1037}
1038
1039/// Encode an entity grid to HBF binary for efficient federation sync.
1040///
1041/// Provides a compact binary representation that is significantly smaller
1042/// than Zinc or JSON for bulk entity transfer.
1043pub fn encode_sync_payload(grid: &HGrid) -> Result<Vec<u8>, String> {
1044    haystack_core::codecs::encode_grid_binary(grid)
1045}
1046
1047/// Decode an entity grid from HBF binary received during federation sync.
1048pub fn decode_sync_payload(bytes: &[u8]) -> Result<HGrid, String> {
1049    haystack_core::codecs::decode_grid_binary(bytes)
1050}
1051
1052#[cfg(test)]
1053mod tests {
1054    use super::*;
1055    use haystack_core::kinds::HRef;
1056
1057    #[test]
1058    fn connector_new_empty_cache() {
1059        let config = ConnectorConfig {
1060            name: "test".to_string(),
1061            url: "http://localhost:8080/api".to_string(),
1062            username: "user".to_string(),
1063            password: "pass".to_string(),
1064            id_prefix: None,
1065            ws_url: None,
1066            sync_interval_secs: None,
1067            client_cert: None,
1068            client_key: None,
1069            ca_cert: None,
1070            domain: None,
1071        };
1072        let connector = Connector::new(config);
1073        assert_eq!(connector.entity_count(), 0);
1074        assert!(connector.cached_entities().is_empty());
1075    }
1076
1077    #[test]
1078    fn connector_config_deserialization() {
1079        let json = r#"{
1080            "name": "Remote Server",
1081            "url": "http://remote:8080/api",
1082            "username": "admin",
1083            "password": "secret",
1084            "id_prefix": "r1-"
1085        }"#;
1086        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
1087        assert_eq!(config.name, "Remote Server");
1088        assert_eq!(config.url, "http://remote:8080/api");
1089        assert_eq!(config.username, "admin");
1090        assert_eq!(config.password, "secret");
1091        assert_eq!(config.id_prefix, Some("r1-".to_string()));
1092    }
1093
1094    #[test]
1095    fn connector_config_deserialization_without_prefix() {
1096        let json = r#"{
1097            "name": "Remote",
1098            "url": "http://remote:8080/api",
1099            "username": "admin",
1100            "password": "secret"
1101        }"#;
1102        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
1103        assert_eq!(config.id_prefix, None);
1104    }
1105
1106    #[test]
1107    fn id_prefix_application() {
1108        let mut entity = HDict::new();
1109        entity.set("id", Kind::Ref(HRef::from_val("site-1")));
1110        entity.set("dis", Kind::Str("Main Site".to_string()));
1111        entity.set("site", Kind::Marker);
1112        entity.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
1113        entity.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
1114        entity.set(
1115            "floorRef",
1116            Kind::Ref(HRef::new("floor-1", Some("Floor 1".to_string()))),
1117        );
1118
1119        prefix_refs(&mut entity, "r1-");
1120
1121        // id should be prefixed
1122        match entity.get("id") {
1123            Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-site-1"),
1124            other => panic!("expected Ref, got {other:?}"),
1125        }
1126
1127        // siteRef should be prefixed
1128        match entity.get("siteRef") {
1129            Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-site-1"),
1130            other => panic!("expected Ref, got {other:?}"),
1131        }
1132
1133        // equipRef should be prefixed
1134        match entity.get("equipRef") {
1135            Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-equip-1"),
1136            other => panic!("expected Ref, got {other:?}"),
1137        }
1138
1139        // floorRef should be prefixed, preserving dis
1140        match entity.get("floorRef") {
1141            Some(Kind::Ref(r)) => {
1142                assert_eq!(r.val, "r1-floor-1");
1143                assert_eq!(r.dis, Some("Floor 1".to_string()));
1144            }
1145            other => panic!("expected Ref, got {other:?}"),
1146        }
1147
1148        // Non-ref tags should not be changed
1149        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
1150        assert_eq!(entity.get("site"), Some(&Kind::Marker));
1151    }
1152
1153    #[test]
1154    fn id_prefix_skips_non_ref_values() {
1155        let mut entity = HDict::new();
1156        entity.set("id", Kind::Ref(HRef::from_val("point-1")));
1157        // A tag ending in "Ref" but whose value is not actually a Ref
1158        entity.set("customRef", Kind::Str("not-a-ref".to_string()));
1159
1160        prefix_refs(&mut entity, "p-");
1161
1162        // id should be prefixed
1163        match entity.get("id") {
1164            Some(Kind::Ref(r)) => assert_eq!(r.val, "p-point-1"),
1165            other => panic!("expected Ref, got {other:?}"),
1166        }
1167
1168        // customRef is a Str, not a Ref, so it should be unchanged
1169        assert_eq!(
1170            entity.get("customRef"),
1171            Some(&Kind::Str("not-a-ref".to_string()))
1172        );
1173    }
1174
1175    #[test]
1176    fn connector_config_deserialization_full() {
1177        let json = r#"{
1178            "name": "Full Config",
1179            "url": "https://remote:8443/api",
1180            "username": "admin",
1181            "password": "secret",
1182            "id_prefix": "r1-",
1183            "ws_url": "wss://remote:8443/api/ws",
1184            "sync_interval_secs": 30,
1185            "client_cert": "/etc/certs/client.pem",
1186            "client_key": "/etc/certs/client-key.pem",
1187            "ca_cert": "/etc/certs/ca.pem"
1188        }"#;
1189        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
1190        assert_eq!(config.ws_url, Some("wss://remote:8443/api/ws".to_string()));
1191        assert_eq!(config.sync_interval_secs, Some(30));
1192        assert_eq!(
1193            config.client_cert,
1194            Some("/etc/certs/client.pem".to_string())
1195        );
1196        assert_eq!(
1197            config.client_key,
1198            Some("/etc/certs/client-key.pem".to_string())
1199        );
1200        assert_eq!(config.ca_cert, Some("/etc/certs/ca.pem".to_string()));
1201    }
1202
1203    #[test]
1204    fn strip_prefix_refs_reverses_prefix() {
1205        let mut entity = HDict::new();
1206        entity.set("id", Kind::Ref(HRef::from_val("r1-site-1")));
1207        entity.set("dis", Kind::Str("Main Site".to_string()));
1208        entity.set("site", Kind::Marker);
1209        entity.set("siteRef", Kind::Ref(HRef::from_val("r1-site-1")));
1210        entity.set("equipRef", Kind::Ref(HRef::from_val("r1-equip-1")));
1211
1212        strip_prefix_refs(&mut entity, "r1-");
1213
1214        match entity.get("id") {
1215            Some(Kind::Ref(r)) => assert_eq!(r.val, "site-1"),
1216            other => panic!("expected Ref, got {other:?}"),
1217        }
1218        match entity.get("siteRef") {
1219            Some(Kind::Ref(r)) => assert_eq!(r.val, "site-1"),
1220            other => panic!("expected Ref, got {other:?}"),
1221        }
1222        match entity.get("equipRef") {
1223            Some(Kind::Ref(r)) => assert_eq!(r.val, "equip-1"),
1224            other => panic!("expected Ref, got {other:?}"),
1225        }
1226        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
1227    }
1228
1229    #[test]
1230    fn strip_prefix_refs_ignores_non_matching() {
1231        let mut entity = HDict::new();
1232        entity.set("id", Kind::Ref(HRef::from_val("other-site-1")));
1233
1234        strip_prefix_refs(&mut entity, "r1-");
1235
1236        match entity.get("id") {
1237            Some(Kind::Ref(r)) => assert_eq!(r.val, "other-site-1"),
1238            other => panic!("expected Ref, got {other:?}"),
1239        }
1240    }
1241
1242    #[test]
1243    fn derive_ws_url_from_http() {
1244        let config = ConnectorConfig {
1245            name: "test".to_string(),
1246            url: "http://remote:8080/api".to_string(),
1247            username: "u".to_string(),
1248            password: "p".to_string(),
1249            id_prefix: None,
1250            ws_url: None,
1251            sync_interval_secs: None,
1252            client_cert: None,
1253            client_key: None,
1254            ca_cert: None,
1255            domain: None,
1256        };
1257        assert_eq!(config.effective_ws_url(), "ws://remote:8080/api/ws");
1258    }
1259
1260    #[test]
1261    fn derive_ws_url_from_https() {
1262        let config = ConnectorConfig {
1263            name: "test".to_string(),
1264            url: "https://remote:8443/api".to_string(),
1265            username: "u".to_string(),
1266            password: "p".to_string(),
1267            id_prefix: None,
1268            ws_url: None,
1269            sync_interval_secs: None,
1270            client_cert: None,
1271            client_key: None,
1272            ca_cert: None,
1273            domain: None,
1274        };
1275        assert_eq!(config.effective_ws_url(), "wss://remote:8443/api/ws");
1276    }
1277
1278    #[test]
1279    fn explicit_ws_url_overrides_derived() {
1280        let config = ConnectorConfig {
1281            name: "test".to_string(),
1282            url: "http://remote:8080/api".to_string(),
1283            username: "u".to_string(),
1284            password: "p".to_string(),
1285            id_prefix: None,
1286            ws_url: Some("ws://custom:9999/ws".to_string()),
1287            sync_interval_secs: None,
1288            client_cert: None,
1289            client_key: None,
1290            ca_cert: None,
1291            domain: None,
1292        };
1293        assert_eq!(config.effective_ws_url(), "ws://custom:9999/ws");
1294    }
1295
1296    #[test]
1297    fn connector_tracks_entity_ids_in_ownership() {
1298        let config = ConnectorConfig {
1299            name: "test".to_string(),
1300            url: "http://localhost:8080/api".to_string(),
1301            username: "user".to_string(),
1302            password: "pass".to_string(),
1303            id_prefix: Some("t-".to_string()),
1304            ws_url: None,
1305            sync_interval_secs: None,
1306            client_cert: None,
1307            client_key: None,
1308            ca_cert: None,
1309            domain: None,
1310        };
1311        let connector = Connector::new(config);
1312        assert!(!connector.owns("t-site-1"));
1313
1314        // Simulate populating cache
1315        {
1316            let mut entity = HDict::new();
1317            entity.set("id", Kind::Ref(HRef::from_val("t-site-1")));
1318            connector.update_cache(vec![entity]);
1319        }
1320
1321        assert!(connector.owns("t-site-1"));
1322        assert!(!connector.owns("other-1"));
1323    }
1324
1325    #[test]
1326    fn connector_new_defaults_transport_and_connected() {
1327        let config = ConnectorConfig {
1328            name: "test".to_string(),
1329            url: "http://localhost:8080/api".to_string(),
1330            username: "user".to_string(),
1331            password: "pass".to_string(),
1332            id_prefix: None,
1333            ws_url: None,
1334            sync_interval_secs: None,
1335            client_cert: None,
1336            client_key: None,
1337            ca_cert: None,
1338            domain: None,
1339        };
1340        let connector = Connector::new(config);
1341        assert_eq!(connector.transport_mode(), TransportMode::Http);
1342        assert!(!connector.is_connected());
1343        assert!(connector.last_sync_time().is_none());
1344    }
1345
1346    #[test]
1347    fn connector_config_new_fields_default_to_none() {
1348        let json = r#"{
1349            "name": "Minimal",
1350            "url": "http://remote:8080/api",
1351            "username": "user",
1352            "password": "pass"
1353        }"#;
1354        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
1355        assert_eq!(config.ws_url, None);
1356        assert_eq!(config.sync_interval_secs, None);
1357        assert_eq!(config.client_cert, None);
1358        assert_eq!(config.client_key, None);
1359        assert_eq!(config.ca_cert, None);
1360    }
1361
1362    #[test]
1363    fn remote_watch_add_and_remove() {
1364        let config = ConnectorConfig {
1365            name: "test".to_string(),
1366            url: "http://localhost:8080/api".to_string(),
1367            username: "user".to_string(),
1368            password: "pass".to_string(),
1369            id_prefix: Some("r-".to_string()),
1370            ws_url: None,
1371            sync_interval_secs: None,
1372            client_cert: None,
1373            client_key: None,
1374            ca_cert: None,
1375            domain: None,
1376        };
1377        let connector = Connector::new(config);
1378        assert_eq!(connector.remote_watch_count(), 0);
1379
1380        connector.add_remote_watch("r-site-1");
1381        assert_eq!(connector.remote_watch_count(), 1);
1382
1383        connector.add_remote_watch("r-equip-2");
1384        assert_eq!(connector.remote_watch_count(), 2);
1385
1386        // Duplicate add is idempotent (HashSet).
1387        connector.add_remote_watch("r-site-1");
1388        assert_eq!(connector.remote_watch_count(), 2);
1389
1390        connector.remove_remote_watch("r-site-1");
1391        assert_eq!(connector.remote_watch_count(), 1);
1392
1393        // Removing a non-existent ID is a no-op.
1394        connector.remove_remote_watch("r-nonexistent");
1395        assert_eq!(connector.remote_watch_count(), 1);
1396
1397        connector.remove_remote_watch("r-equip-2");
1398        assert_eq!(connector.remote_watch_count(), 0);
1399    }
1400
1401    fn make_test_entities() -> Vec<HDict> {
1402        let mut site = HDict::new();
1403        site.set("id", Kind::Ref(HRef::from_val("site-1")));
1404        site.set("site", Kind::Marker);
1405        site.set("dis", Kind::Str("Main Site".into()));
1406
1407        let mut equip = HDict::new();
1408        equip.set("id", Kind::Ref(HRef::from_val("equip-1")));
1409        equip.set("equip", Kind::Marker);
1410        equip.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
1411
1412        let mut point = HDict::new();
1413        point.set("id", Kind::Ref(HRef::from_val("point-1")));
1414        point.set("point", Kind::Marker);
1415        point.set("sensor", Kind::Marker);
1416        point.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
1417
1418        vec![site, equip, point]
1419    }
1420
1421    #[test]
1422    fn filter_cached_returns_matching_entities() {
1423        let config = ConnectorConfig {
1424            name: "test".to_string(),
1425            url: "http://localhost:8080/api".to_string(),
1426            username: "user".to_string(),
1427            password: "pass".to_string(),
1428            id_prefix: None,
1429            ws_url: None,
1430            sync_interval_secs: None,
1431            client_cert: None,
1432            client_key: None,
1433            ca_cert: None,
1434            domain: None,
1435        };
1436        let connector = Connector::new(config);
1437        connector.update_cache(make_test_entities());
1438
1439        // Filter for site
1440        let results = connector.filter_cached("site", 0).unwrap();
1441        assert_eq!(results.len(), 1);
1442        assert_eq!(
1443            results[0].get("id"),
1444            Some(&Kind::Ref(HRef::from_val("site-1")))
1445        );
1446
1447        // Filter for equip
1448        let results = connector.filter_cached("equip", 0).unwrap();
1449        assert_eq!(results.len(), 1);
1450
1451        // Filter for point and sensor
1452        let results = connector.filter_cached("point and sensor", 0).unwrap();
1453        assert_eq!(results.len(), 1);
1454
1455        // Filter for something that doesn't exist
1456        let results = connector.filter_cached("ahu", 0).unwrap();
1457        assert!(results.is_empty());
1458    }
1459
1460    #[test]
1461    fn filter_cached_respects_limit() {
1462        let config = ConnectorConfig {
1463            name: "test".to_string(),
1464            url: "http://localhost:8080/api".to_string(),
1465            username: "user".to_string(),
1466            password: "pass".to_string(),
1467            id_prefix: None,
1468            ws_url: None,
1469            sync_interval_secs: None,
1470            client_cert: None,
1471            client_key: None,
1472            ca_cert: None,
1473            domain: None,
1474        };
1475        let connector = Connector::new(config);
1476
1477        // Add multiple points
1478        let mut entities = Vec::new();
1479        for i in 0..10 {
1480            let mut p = HDict::new();
1481            p.set("id", Kind::Ref(HRef::from_val(format!("point-{i}"))));
1482            p.set("point", Kind::Marker);
1483            entities.push(p);
1484        }
1485        connector.update_cache(entities);
1486
1487        let results = connector.filter_cached("point", 3).unwrap();
1488        assert_eq!(results.len(), 3);
1489    }
1490
1491    #[test]
1492    fn filter_cached_or_query() {
1493        let config = ConnectorConfig {
1494            name: "test".to_string(),
1495            url: "http://localhost:8080/api".to_string(),
1496            username: "user".to_string(),
1497            password: "pass".to_string(),
1498            id_prefix: None,
1499            ws_url: None,
1500            sync_interval_secs: None,
1501            client_cert: None,
1502            client_key: None,
1503            ca_cert: None,
1504            domain: None,
1505        };
1506        let connector = Connector::new(config);
1507        connector.update_cache(make_test_entities());
1508
1509        // site or equip should return 2
1510        let results = connector.filter_cached("site or equip", 0).unwrap();
1511        assert_eq!(results.len(), 2);
1512    }
1513
1514    #[test]
1515    fn cache_version_starts_at_zero() {
1516        let connector = Connector::new(ConnectorConfig {
1517            name: "test".to_string(),
1518            url: "http://localhost:8080/api".to_string(),
1519            username: "user".to_string(),
1520            password: "pass".to_string(),
1521            id_prefix: None,
1522            ws_url: None,
1523            sync_interval_secs: None,
1524            client_cert: None,
1525            client_key: None,
1526            ca_cert: None,
1527            domain: None,
1528        });
1529        assert_eq!(connector.cache_version(), 0);
1530    }
1531
1532    #[test]
1533    fn cache_version_increments_on_update() {
1534        let connector = Connector::new(ConnectorConfig {
1535            name: "test".to_string(),
1536            url: "http://localhost:8080/api".to_string(),
1537            username: "user".to_string(),
1538            password: "pass".to_string(),
1539            id_prefix: None,
1540            ws_url: None,
1541            sync_interval_secs: None,
1542            client_cert: None,
1543            client_key: None,
1544            ca_cert: None,
1545            domain: None,
1546        });
1547        assert_eq!(connector.cache_version(), 0);
1548
1549        connector.update_cache(vec![]);
1550        assert_eq!(connector.cache_version(), 1);
1551
1552        let mut e = HDict::new();
1553        e.set("id", Kind::Ref(HRef::from_val("p-1")));
1554        connector.update_cache(vec![e]);
1555        assert_eq!(connector.cache_version(), 2);
1556    }
1557
1558    #[test]
1559    fn connector_state_populated() {
1560        let connector = Connector::new(ConnectorConfig {
1561            name: "alpha".to_string(),
1562            url: "http://localhost:8080/api".to_string(),
1563            username: "user".to_string(),
1564            password: "pass".to_string(),
1565            id_prefix: None,
1566            ws_url: None,
1567            sync_interval_secs: None,
1568            client_cert: None,
1569            client_key: None,
1570            ca_cert: None,
1571            domain: None,
1572        });
1573
1574        let st = connector.state();
1575        assert_eq!(st.name, "alpha");
1576        assert!(!st.connected);
1577        assert_eq!(st.cache_version, 0);
1578        assert_eq!(st.entity_count, 0);
1579        assert!(st.last_sync_ts.is_none());
1580        assert!(st.staleness_secs.is_none());
1581
1582        // After cache update, version and count change
1583        let mut e = HDict::new();
1584        e.set("id", Kind::Ref(HRef::from_val("s-1")));
1585        e.set("site", Kind::Marker);
1586        connector.update_cache(vec![e]);
1587
1588        let st = connector.state();
1589        assert_eq!(st.cache_version, 1);
1590        assert_eq!(st.entity_count, 1);
1591    }
1592}