Skip to main content

haystack_server/
connector.rs

1//! Connector for fetching entities from a remote Haystack server.
2//!
3//! # Overview
4//!
5//! A [`Connector`] maintains a persistent connection (HTTP or WebSocket) to a
6//! remote Haystack server, periodically syncs entities into a local cache, and
7//! proxies write operations (`hisRead`, `hisWrite`, `pointWrite`,
8//! `invokeAction`, `import`) to the remote when the target entity is owned by
9//! that connector.
10//!
11//! # Lifecycle
12//!
13//! 1. **Construction** — [`Connector::new`] creates a connector with an empty
14//!    cache from a [`ConnectorConfig`] (parsed from the TOML federation config).
15//! 2. **Connection** — on first sync, [`Connector::sync`] calls
16//!    `connect_persistent()` which tries WebSocket first, falling back to HTTP.
17//! 3. **Sync loop** — [`Connector::spawn_sync_task`] spawns a tokio task that
18//!    calls [`Connector::sync`] at an adaptive interval. Sync tries incremental
19//!    delta sync via the `changes` op first, falling back to full entity fetch.
20//! 4. **Cache** — [`Connector::update_cache`] replaces the cached entity list
21//!    and rebuilds the bitmap tag index and owned-ID set for fast lookups.
22//! 5. **Proxy** — write ops check [`Connector::owns`] and forward to the remote
23//!    server via `proxy_*` methods, stripping/re-adding the ID prefix as needed.
24//!
25//! # ID Prefixing
26//!
27//! When `id_prefix` is configured, all Ref values (`id`, `siteRef`, etc.) are
28//! prefixed on sync and stripped before proxying. See [`prefix_refs`] and
29//! [`strip_prefix_refs`].
30
31use std::collections::HashMap;
32use std::collections::HashSet;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
35
36use chrono::{DateTime, Utc};
37use parking_lot::RwLock;
38
39use haystack_client::HaystackClient;
40use haystack_client::transport::http::HttpTransport;
41use haystack_client::transport::ws::WsTransport;
42use haystack_core::data::{HDict, HGrid};
43use haystack_core::filter::{matches, parse_filter};
44use haystack_core::graph::bitmap::TagBitmapIndex;
45use haystack_core::graph::query_planner;
46use haystack_core::kinds::{HRef, Kind};
47
48/// Type-erased persistent connection to a remote Haystack server.
49///
50/// Wraps either an HTTP or WebSocket `HaystackClient`, allowing the connector
51/// to hold a single persistent connection regardless of transport.
52enum ConnectorClient {
53    Http(HaystackClient<HttpTransport>),
54    Ws(HaystackClient<WsTransport>),
55}
56
57impl ConnectorClient {
58    /// Call a Haystack op through the underlying client, returning the
59    /// response grid or an error string.
60    async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, String> {
61        match self {
62            ConnectorClient::Http(c) => c.call(op, req).await.map_err(|e| e.to_string()),
63            ConnectorClient::Ws(c) => c.call(op, req).await.map_err(|e| e.to_string()),
64        }
65    }
66
67    /// Read historical data for a point.
68    async fn his_read(&self, id: &str, range: &str) -> Result<HGrid, String> {
69        match self {
70            ConnectorClient::Http(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
71            ConnectorClient::Ws(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
72        }
73    }
74
75    /// Write historical data for a point.
76    async fn his_write(&self, id: &str, items: Vec<HDict>) -> Result<HGrid, String> {
77        match self {
78            ConnectorClient::Http(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
79            ConnectorClient::Ws(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
80        }
81    }
82
83    /// Write a value to a writable point.
84    async fn point_write(&self, id: &str, level: u8, val: Kind) -> Result<HGrid, String> {
85        match self {
86            ConnectorClient::Http(c) => c
87                .point_write(id, level, val)
88                .await
89                .map_err(|e| e.to_string()),
90            ConnectorClient::Ws(c) => c
91                .point_write(id, level, val)
92                .await
93                .map_err(|e| e.to_string()),
94        }
95    }
96
97    /// Invoke an action on an entity.
98    async fn invoke_action(&self, id: &str, action: &str, args: HDict) -> Result<HGrid, String> {
99        match self {
100            ConnectorClient::Http(c) => c
101                .invoke_action(id, action, args)
102                .await
103                .map_err(|e| e.to_string()),
104            ConnectorClient::Ws(c) => c
105                .invoke_action(id, action, args)
106                .await
107                .map_err(|e| e.to_string()),
108        }
109    }
110}
111
/// Which wire protocol a connector is currently using.
///
/// The explicit `u8` discriminants let the value be stored in an `AtomicU8`
/// and decoded again by [`Connector::transport_mode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TransportMode {
    Http = 0,
    WebSocket = 1,
}
119
120/// Configuration for a remote Haystack server connection.
121#[derive(Debug, Clone, serde::Deserialize)]
122pub struct ConnectorConfig {
123    /// Display name for this connector.
124    pub name: String,
125    /// Base URL of the remote Haystack API (e.g. "http://remote:8080/api").
126    pub url: String,
127    /// Username for SCRAM authentication.
128    pub username: String,
129    /// Password for SCRAM authentication.
130    pub password: String,
131    /// Optional tag prefix to namespace remote entity IDs (e.g. "remote1-").
132    pub id_prefix: Option<String>,
133    /// WebSocket URL (e.g. "ws://remote:8080/api/ws"). Derived from url if omitted.
134    pub ws_url: Option<String>,
135    /// Fallback polling interval in seconds (default: 60).
136    pub sync_interval_secs: Option<u64>,
137    /// Path to PEM client certificate for mTLS.
138    pub client_cert: Option<String>,
139    /// Path to PEM client private key for mTLS.
140    pub client_key: Option<String>,
141    /// Path to PEM CA certificate for server verification.
142    pub ca_cert: Option<String>,
143}
144
145/// A connector that can fetch entities from a remote Haystack server.
146///
147/// Holds a persistent connection, a local entity cache with bitmap index,
148/// Consolidated cache state — entities, ownership set, bitmap index, and ID map
149/// bundled under a single lock for atomic reads/updates and consistency.
150struct CacheState {
151    entities: Vec<HDict>,
152    owned_ids: HashSet<String>,
153    tag_index: TagBitmapIndex,
154    id_map: HashMap<String, usize>,
155}
156
157impl CacheState {
158    fn empty() -> Self {
159        Self {
160            entities: Vec::new(),
161            owned_ids: HashSet::new(),
162            tag_index: TagBitmapIndex::new(),
163            id_map: HashMap::new(),
164        }
165    }
166
167    /// Build a fully indexed cache state from a list of entities.
168    fn build(entities: Vec<HDict>) -> Self {
169        let mut owned_ids = HashSet::with_capacity(entities.len());
170        let mut tag_index = TagBitmapIndex::new();
171        let mut id_map = HashMap::with_capacity(entities.len());
172
173        for (eid, entity) in entities.iter().enumerate() {
174            if let Some(Kind::Ref(r)) = entity.get("id") {
175                owned_ids.insert(r.val.clone());
176                id_map.insert(r.val.clone(), eid);
177            }
178            let tags: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
179            tag_index.add(eid, &tags);
180        }
181
182        Self {
183            entities,
184            owned_ids,
185            tag_index,
186            id_map,
187        }
188    }
189
190    /// Rebuild tag index and id_map from current entities.
191    fn rebuild_index(&mut self) {
192        self.owned_ids.clear();
193        self.tag_index = TagBitmapIndex::new();
194        self.id_map.clear();
195
196        for (eid, entity) in self.entities.iter().enumerate() {
197            if let Some(Kind::Ref(r)) = entity.get("id") {
198                self.owned_ids.insert(r.val.clone());
199                self.id_map.insert(r.val.clone(), eid);
200            }
201            let tags: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
202            self.tag_index.add(eid, &tags);
203        }
204    }
205}
206
207/// and state for adaptive sync and federation proxy operations. See the
208/// [module-level documentation](self) for the full lifecycle.
209pub struct Connector {
210    pub config: ConnectorConfig,
211    /// Consolidated cache state: entities, ownership index, bitmap tag index, and
212    /// ID→position map — all under a single lock for atomic consistency.
213    cache_state: RwLock<CacheState>,
214    /// Entity IDs being watched remotely (prefixed IDs).
215    remote_watch_ids: RwLock<HashSet<String>>,
216    /// Transport protocol (HTTP or WebSocket).
217    transport_mode: AtomicU8,
218    /// Whether the connector is currently connected (last sync succeeded).
219    connected: AtomicBool,
220    /// Timestamp of the last successful sync.
221    last_sync: RwLock<Option<DateTime<Utc>>>,
222    /// Remote graph version from last successful sync (for delta sync).
223    last_remote_version: RwLock<Option<u64>>,
224    /// Persistent client connection used by sync and background tasks.
225    /// Lazily initialized on first sync. Cleared on connection error.
226    /// Uses `tokio::sync::RwLock` because the guard is held across `.await`.
227    client: tokio::sync::RwLock<Option<ConnectorClient>>,
228    /// Current adaptive sync interval in seconds (adjusted based on change rate).
229    current_interval_secs: AtomicU64,
230    /// Number of entities from last sync (for change detection).
231    last_entity_count: AtomicU64,
232}
233
234impl Connector {
235    /// Create a new connector with an empty cache.
236    ///
237    /// The connector starts disconnected; a persistent connection is
238    /// established lazily on the first [`sync`](Self::sync) call.
239    pub fn new(config: ConnectorConfig) -> Self {
240        let base_interval = config.effective_sync_interval_secs();
241        Self {
242            config,
243            cache_state: RwLock::new(CacheState::empty()),
244            remote_watch_ids: RwLock::new(HashSet::new()),
245            transport_mode: AtomicU8::new(TransportMode::Http as u8),
246            connected: AtomicBool::new(false),
247            last_sync: RwLock::new(None),
248            last_remote_version: RwLock::new(None),
249            client: tokio::sync::RwLock::new(None),
250            current_interval_secs: AtomicU64::new(base_interval),
251            last_entity_count: AtomicU64::new(0),
252        }
253    }
254
255    /// Attempt to connect to the remote server. Tries WebSocket first,
256    /// falls back to HTTP.
257    async fn connect_persistent(&self) -> Result<(), String> {
258        // Try WebSocket first
259        let ws_url = self.config.effective_ws_url();
260        match HaystackClient::connect_ws(
261            &self.config.url,
262            &ws_url,
263            &self.config.username,
264            &self.config.password,
265        )
266        .await
267        {
268            Ok(ws_client) => {
269                *self.client.write().await = Some(ConnectorClient::Ws(ws_client));
270                self.transport_mode
271                    .store(TransportMode::WebSocket as u8, Ordering::Relaxed);
272                self.connected.store(true, Ordering::Relaxed);
273                log::info!("Connected to {} via WebSocket", self.config.name);
274                return Ok(());
275            }
276            Err(e) => {
277                log::warn!(
278                    "WS connection to {} failed: {}, trying HTTP",
279                    self.config.name,
280                    e
281                );
282            }
283        }
284
285        // Fall back to HTTP
286        match HaystackClient::connect(
287            &self.config.url,
288            &self.config.username,
289            &self.config.password,
290        )
291        .await
292        {
293            Ok(http_client) => {
294                *self.client.write().await = Some(ConnectorClient::Http(http_client));
295                self.transport_mode
296                    .store(TransportMode::Http as u8, Ordering::Relaxed);
297                self.connected.store(true, Ordering::Relaxed);
298                log::info!("Connected to {} via HTTP", self.config.name);
299                Ok(())
300            }
301            Err(e) => {
302                self.connected.store(false, Ordering::Relaxed);
303                Err(format!("connection failed: {e}"))
304            }
305        }
306    }
307
308    /// Connect to the remote server, fetch entities, and update cache.
309    ///
310    /// Tries incremental delta sync first (via the `changes` op) if we have a
311    /// known remote version. Falls back to full sync on first connect, when the
312    /// remote doesn't support `changes`, or when the version gap is too large.
313    ///
314    /// Returns the number of cached entities after sync, or an error string.
315    pub async fn sync(&self) -> Result<usize, String> {
316        // Ensure we have a connection
317        if self.client.read().await.is_none() {
318            self.connect_persistent().await?;
319        }
320
321        // Attempt delta sync if we have a previous version.
322        let maybe_ver = *self.last_remote_version.read();
323        if let Some(last_ver) = maybe_ver {
324            match self.try_delta_sync(last_ver).await {
325                Ok(count) => return Ok(count),
326                Err(e) => {
327                    log::debug!(
328                        "Delta sync failed for {}, falling back to full: {e}",
329                        self.config.name,
330                    );
331                }
332            }
333        }
334
335        // Full sync: read all entities
336        self.full_sync().await
337    }
338
339    /// Full sync: fetch all entities from remote and replace cache.
340    async fn full_sync(&self) -> Result<usize, String> {
341        let grid = {
342            let client = self.client.read().await;
343            let client = client.as_ref().ok_or("not connected")?;
344            client.call("read", &build_read_all_grid()).await
345        };
346
347        let grid = grid.map_err(|e| {
348            self.connected.store(false, Ordering::Relaxed);
349            format!("read failed: {e}")
350        })?;
351
352        let mut entities: Vec<HDict> = grid.rows.into_iter().collect();
353
354        // Apply id prefix if configured.
355        if let Some(ref prefix) = self.config.id_prefix {
356            for entity in &mut entities {
357                prefix_refs(entity, prefix);
358            }
359        }
360
361        let count = entities.len();
362        self.update_cache(entities);
363        *self.last_sync.write() = Some(Utc::now());
364        self.connected.store(true, Ordering::Relaxed);
365
366        // Probe remote version for next delta sync attempt.
367        self.probe_remote_version().await;
368
369        Ok(count)
370    }
371
372    /// Try to discover the remote graph version by calling `changes` with a
373    /// high version number. The response meta contains `curVer` which we store
374    /// for future delta syncs. Failures are silently ignored (remote may not
375    /// support the `changes` op).
376    async fn probe_remote_version(&self) {
377        let grid = {
378            let client = self.client.read().await;
379            let Some(client) = client.as_ref() else {
380                return;
381            };
382            client.call("changes", &build_changes_grid(u64::MAX)).await
383        };
384        if let Ok(grid) = grid
385            && let Some(Kind::Number(n)) = grid.meta.get("curVer")
386        {
387            *self.last_remote_version.write() = Some(n.val as u64);
388        }
389    }
390
391    /// Attempt incremental delta sync using the `changes` op.
392    ///
393    /// Sends the last known remote version to the server. The server returns
394    /// only the diffs since that version. We apply them incrementally to our
395    /// cache, avoiding a full entity set transfer.
396    async fn try_delta_sync(&self, since_version: u64) -> Result<usize, String> {
397        let changes_grid = build_changes_grid(since_version);
398        let grid = {
399            let client = self.client.read().await;
400            let client = client.as_ref().ok_or("not connected")?;
401            client.call("changes", &changes_grid).await
402        }
403        .map_err(|e| format!("changes op failed: {e}"))?;
404
405        // If the remote returned an error grid, fall back.
406        if grid.is_err() {
407            return Err("remote returned error grid for changes op".to_string());
408        }
409
410        // Extract current version from response meta.
411        let cur_ver = grid
412            .meta
413            .get("curVer")
414            .and_then(|k| {
415                if let Kind::Number(n) = k {
416                    Some(n.val as u64)
417                } else {
418                    None
419                }
420            })
421            .ok_or("changes response missing curVer in meta")?;
422
423        // No changes — nothing to do.
424        if grid.rows.is_empty() {
425            *self.last_remote_version.write() = Some(cur_ver);
426            *self.last_sync.write() = Some(Utc::now());
427            return Ok(self.cache_state.read().entities.len());
428        }
429
430        // Apply diffs in-place under a single write lock.
431        let mut state = self.cache_state.write();
432        let prefix = self.config.id_prefix.as_deref();
433
434        for row in &grid.rows {
435            let op = match row.get("op") {
436                Some(Kind::Str(s)) => s.as_str(),
437                _ => continue,
438            };
439            let ref_val = match row.get("ref") {
440                Some(Kind::Str(s)) => s.clone(),
441                _ => continue,
442            };
443
444            match op {
445                "add" | "update" => {
446                    if let Some(Kind::Dict(entity_box)) = row.get("entity") {
447                        let mut entity: HDict = (**entity_box).clone();
448                        if let Some(pfx) = prefix {
449                            prefix_refs(&mut entity, pfx);
450                        }
451                        let entity_id = entity.get("id").and_then(|k| {
452                            if let Kind::Ref(r) = k {
453                                Some(r.val.clone())
454                            } else {
455                                None
456                            }
457                        });
458
459                        if let Some(ref eid) = entity_id {
460                            if let Some(&idx) = state.id_map.get(eid.as_str()) {
461                                // Update in place.
462                                state.entities[idx] = entity;
463                            } else {
464                                // Add new entity.
465                                let idx = state.entities.len();
466                                state.id_map.insert(eid.clone(), idx);
467                                state.entities.push(entity);
468                            }
469                        }
470                    }
471                }
472                "remove" => {
473                    let prefixed_ref = match prefix {
474                        Some(pfx) => format!("{pfx}{ref_val}"),
475                        None => ref_val,
476                    };
477                    if let Some(&idx) = state.id_map.get(prefixed_ref.as_str()) {
478                        // Swap-remove and fix up the id_map.
479                        let last_idx = state.entities.len() - 1;
480                        if idx != last_idx {
481                            let last_id = state.entities[last_idx].get("id").and_then(|k| {
482                                if let Kind::Ref(r) = k {
483                                    Some(r.val.clone())
484                                } else {
485                                    None
486                                }
487                            });
488                            state.entities.swap(idx, last_idx);
489                            if let Some(lid) = last_id {
490                                state.id_map.insert(lid, idx);
491                            }
492                        }
493                        state.entities.pop();
494                        state.id_map.remove(prefixed_ref.as_str());
495                    }
496                }
497                _ => {}
498            }
499        }
500
501        // Rebuild index after in-place mutations.
502        state.rebuild_index();
503
504        let count = state.entities.len();
505        drop(state);
506
507        *self.last_remote_version.write() = Some(cur_ver);
508        *self.last_sync.write() = Some(Utc::now());
509        self.connected.store(true, Ordering::Relaxed);
510        Ok(count)
511    }
512
513    /// Replace the cached entities and rebuild the owned-ID index and bitmap tag index.
514    ///
515    /// Extracts all `id` Ref values from the given entities into a `HashSet`,
516    /// builds a bitmap tag index for fast filtered reads, then atomically
517    /// replaces the cache, ownership set, and index.
518    pub fn update_cache(&self, entities: Vec<HDict>) {
519        *self.cache_state.write() = CacheState::build(entities);
520    }
521
522    /// Returns `true` if this connector owns an entity with the given ID.
523    pub fn owns(&self, id: &str) -> bool {
524        self.cache_state.read().owned_ids.contains(id)
525    }
526
527    /// Look up a single cached entity by ID using the O(1) id_map index.
528    pub fn get_cached_entity(&self, id: &str) -> Option<HDict> {
529        let state = self.cache_state.read();
530        state
531            .id_map
532            .get(id)
533            .and_then(|&idx| state.entities.get(idx))
534            .cloned()
535    }
536
537    /// Look up multiple cached entities by ID in a single pass.
538    /// Returns found entities and missing IDs.
539    pub fn batch_get_cached(&self, ids: &[&str]) -> (Vec<HDict>, Vec<String>) {
540        let state = self.cache_state.read();
541        let mut found = Vec::with_capacity(ids.len());
542        let mut missing = Vec::new();
543        for &id in ids {
544            if let Some(&idx) = state.id_map.get(id) {
545                if let Some(entity) = state.entities.get(idx) {
546                    found.push(entity.clone());
547                } else {
548                    missing.push(id.to_string());
549                }
550            } else {
551                missing.push(id.to_string());
552            }
553        }
554        (found, missing)
555    }
556
557    /// Returns a clone of all cached entities.
558    pub fn cached_entities(&self) -> Vec<HDict> {
559        self.cache_state.read().entities.clone()
560    }
561
562    /// Returns the number of cached entities.
563    pub fn entity_count(&self) -> usize {
564        self.cache_state.read().entities.len()
565    }
566
567    /// Filter cached entities using the bitmap tag index for acceleration.
568    ///
569    /// Returns matching entities up to `limit` (0 = unlimited). Uses the same
570    /// two-phase approach as EntityGraph: bitmap candidates → full filter eval.
571    pub fn filter_cached(&self, filter_expr: &str, limit: usize) -> Result<Vec<HDict>, String> {
572        let effective_limit = if limit == 0 { usize::MAX } else { limit };
573
574        let ast = parse_filter(filter_expr).map_err(|e| format!("filter error: {e}"))?;
575
576        let state = self.cache_state.read();
577        let max_id = state.entities.len();
578
579        let candidates = query_planner::bitmap_candidates(&ast, &state.tag_index, max_id);
580
581        let mut results = Vec::new();
582
583        if let Some(ref bitmap) = candidates {
584            for eid in TagBitmapIndex::iter_set_bits(bitmap) {
585                if results.len() >= effective_limit {
586                    break;
587                }
588                if let Some(entity) = state.entities.get(eid)
589                    && matches(&ast, entity, None)
590                {
591                    results.push(entity.clone());
592                }
593            }
594        } else {
595            for entity in state.entities.iter() {
596                if results.len() >= effective_limit {
597                    break;
598                }
599                if matches(&ast, entity, None) {
600                    results.push(entity.clone());
601                }
602            }
603        }
604
605        Ok(results)
606    }
607
608    /// Add a federated entity ID to the remote watch set.
609    pub fn add_remote_watch(&self, prefixed_id: &str) {
610        self.remote_watch_ids
611            .write()
612            .insert(prefixed_id.to_string());
613    }
614
615    /// Remove a federated entity ID from the remote watch set.
616    pub fn remove_remote_watch(&self, prefixed_id: &str) {
617        self.remote_watch_ids.write().remove(prefixed_id);
618    }
619
620    /// Returns the number of entity IDs being watched remotely.
621    pub fn remote_watch_count(&self) -> usize {
622        self.remote_watch_ids.read().len()
623    }
624
625    /// Returns the current transport mode.
626    pub fn transport_mode(&self) -> TransportMode {
627        match self.transport_mode.load(Ordering::Relaxed) {
628            1 => TransportMode::WebSocket,
629            _ => TransportMode::Http,
630        }
631    }
632
633    /// Returns whether the connector is currently connected (last sync succeeded).
634    pub fn is_connected(&self) -> bool {
635        self.connected.load(Ordering::Relaxed)
636    }
637
638    /// Returns the timestamp of the last successful sync, if any.
639    pub fn last_sync_time(&self) -> Option<DateTime<Utc>> {
640        *self.last_sync.read()
641    }
642
643    /// Strip the id_prefix from an entity ID.
644    fn strip_id(&self, prefixed_id: &str) -> String {
645        if let Some(ref prefix) = self.config.id_prefix {
646            prefixed_id
647                .strip_prefix(prefix.as_str())
648                .unwrap_or(prefixed_id)
649                .to_string()
650        } else {
651            prefixed_id.to_string()
652        }
653    }
654
655    /// Ensure the persistent client is connected, establishing a new
656    /// connection if needed. Returns an error if connection fails.
657    async fn ensure_connected(&self) -> Result<(), String> {
658        if self.client.read().await.is_none() {
659            self.connect_persistent().await?;
660        }
661        Ok(())
662    }
663
664    /// Handle a failed proxy operation by clearing the client to force
665    /// reconnection on the next call.
666    async fn on_proxy_error(&self, op_name: &str, e: String) -> String {
667        *self.client.write().await = None;
668        self.connected.store(false, Ordering::Relaxed);
669        format!("{op_name} failed: {e}")
670    }
671
672    /// Proxy a hisRead request to the remote server.
673    ///
674    /// Strips the ID prefix, calls `hisRead` on the remote, and returns the
675    /// response grid. Clears the client on connection error.
676    pub async fn proxy_his_read(&self, prefixed_id: &str, range: &str) -> Result<HGrid, String> {
677        self.ensure_connected().await?;
678        let id = self.strip_id(prefixed_id);
679        let guard = self.client.read().await;
680        let client = guard.as_ref().ok_or("not connected")?;
681        match client.his_read(&id, range).await {
682            Ok(grid) => Ok(grid),
683            Err(e) => {
684                drop(guard);
685                Err(self.on_proxy_error("hisRead", e).await)
686            }
687        }
688    }
689
690    /// Proxy a pointWrite request to the remote server.
691    ///
692    /// Strips the ID prefix, calls `pointWrite` on the remote with the given
693    /// level and value. Clears the client on connection error.
694    pub async fn proxy_point_write(
695        &self,
696        prefixed_id: &str,
697        level: u8,
698        val: &Kind,
699    ) -> Result<HGrid, String> {
700        self.ensure_connected().await?;
701        let id = self.strip_id(prefixed_id);
702        let val = val.clone();
703        let guard = self.client.read().await;
704        let client = guard.as_ref().ok_or("not connected")?;
705        match client.point_write(&id, level, val).await {
706            Ok(grid) => Ok(grid),
707            Err(e) => {
708                drop(guard);
709                Err(self.on_proxy_error("pointWrite", e).await)
710            }
711        }
712    }
713
714    /// Proxy a hisWrite request to the remote server.
715    ///
716    /// Strips the ID prefix, calls `hisWrite` on the remote with the given
717    /// time-series rows. Clears the client on connection error.
718    pub async fn proxy_his_write(
719        &self,
720        prefixed_id: &str,
721        items: Vec<HDict>,
722    ) -> Result<HGrid, String> {
723        self.ensure_connected().await?;
724        let id = self.strip_id(prefixed_id);
725        let guard = self.client.read().await;
726        let client = guard.as_ref().ok_or("not connected")?;
727        match client.his_write(&id, items).await {
728            Ok(grid) => Ok(grid),
729            Err(e) => {
730                drop(guard);
731                Err(self.on_proxy_error("hisWrite", e).await)
732            }
733        }
734    }
735
736    /// Proxy an import request for a single entity to the remote server.
737    ///
738    /// Strips the id prefix from the entity, wraps it in a single-row grid,
739    /// and calls the remote `import` op.
740    pub async fn proxy_import(&self, entity: &HDict) -> Result<HGrid, String> {
741        self.ensure_connected().await?;
742
743        let mut stripped = entity.clone();
744        if let Some(ref prefix) = self.config.id_prefix {
745            strip_prefix_refs(&mut stripped, prefix);
746        }
747
748        let col_names: Vec<String> = stripped.tag_names().map(|s| s.to_string()).collect();
749        let cols: Vec<haystack_core::data::HCol> = col_names
750            .iter()
751            .map(|n| haystack_core::data::HCol::new(n.as_str()))
752            .collect();
753        let grid = HGrid::from_parts(HDict::new(), cols, vec![stripped]);
754
755        let guard = self.client.read().await;
756        let client = guard.as_ref().ok_or("not connected")?;
757        match client.call("import", &grid).await {
758            Ok(grid) => Ok(grid),
759            Err(e) => {
760                drop(guard);
761                Err(self.on_proxy_error("import", e).await)
762            }
763        }
764    }
765
766    /// Proxy an invokeAction request to the remote server.
767    ///
768    /// Strips the ID prefix, calls `invokeAction` on the remote with the given
769    /// action name and arguments dict. Clears the client on connection error.
770    pub async fn proxy_invoke_action(
771        &self,
772        prefixed_id: &str,
773        action: &str,
774        args: HDict,
775    ) -> Result<HGrid, String> {
776        self.ensure_connected().await?;
777        let id = self.strip_id(prefixed_id);
778        let action = action.to_string();
779        let guard = self.client.read().await;
780        let client = guard.as_ref().ok_or("not connected")?;
781        match client.invoke_action(&id, &action, args).await {
782            Ok(grid) => Ok(grid),
783            Err(e) => {
784                drop(guard);
785                Err(self.on_proxy_error("invokeAction", e).await)
786            }
787        }
788    }
789
790    /// Spawn a background sync task for this connector.
791    ///
792    /// The task loops forever, syncing entities at the configured interval.
793    /// On error, the persistent client is cleared to force reconnection on
794    /// the next iteration.
795    pub fn spawn_sync_task(connector: Arc<Connector>) -> tokio::task::JoinHandle<()> {
796        let base_interval = connector.config.effective_sync_interval_secs();
797        let min_interval = base_interval / 2;
798        let max_interval = base_interval * 5;
799
800        tokio::spawn(async move {
801            loop {
802                let prev_count = connector.last_entity_count.load(Ordering::Relaxed);
803
804                match connector.sync().await {
805                    Ok(count) => {
806                        log::debug!("Synced {} entities from {}", count, connector.config.name);
807
808                        // Adaptive interval: increase when no changes, reset when changes detected.
809                        let current = connector.current_interval_secs.load(Ordering::Relaxed);
810                        let new_interval = if count as u64 == prev_count && prev_count > 0 {
811                            // No change — slow down (increase by 50%, capped at max).
812                            (current + current / 2).min(max_interval)
813                        } else {
814                            // Changes detected — reset to base interval.
815                            base_interval
816                        };
817                        connector
818                            .current_interval_secs
819                            .store(new_interval, Ordering::Relaxed);
820                        connector
821                            .last_entity_count
822                            .store(count as u64, Ordering::Relaxed);
823                    }
824                    Err(e) => {
825                        log::error!("Sync failed for {}: {}", connector.config.name, e);
826                        // Clear the client on error to force reconnection
827                        *connector.client.write().await = None;
828                        connector.connected.store(false, Ordering::Relaxed);
829                        // On error, use base interval for retry.
830                        connector
831                            .current_interval_secs
832                            .store(base_interval, Ordering::Relaxed);
833                    }
834                }
835
836                let sleep_secs = connector
837                    .current_interval_secs
838                    .load(Ordering::Relaxed)
839                    .max(min_interval);
840                tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await;
841            }
842        })
843    }
844}
845
846/// Build a `read` request grid with filter `"*"` (all entities).
847fn build_read_all_grid() -> HGrid {
848    use haystack_core::data::HCol;
849    let mut row = HDict::new();
850    row.set("filter", Kind::Str("*".to_string()));
851    HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row])
852}
853
854/// Build a request grid for the `changes` op with the given version.
855fn build_changes_grid(since_version: u64) -> HGrid {
856    use haystack_core::data::HCol;
857    use haystack_core::kinds::Number;
858    let mut row = HDict::new();
859    row.set(
860        "version",
861        Kind::Number(Number::unitless(since_version as f64)),
862    );
863    HGrid::from_parts(HDict::new(), vec![HCol::new("version")], vec![row])
864}
865
866impl ConnectorConfig {
867    /// Returns the WebSocket URL. If `ws_url` is set, returns it directly.
868    /// Otherwise derives from `url` by replacing `http://` with `ws://`
869    /// or `https://` with `wss://` and appending `/ws`.
870    pub fn effective_ws_url(&self) -> String {
871        if let Some(ref ws) = self.ws_url {
872            return ws.clone();
873        }
874        let ws = if self.url.starts_with("https://") {
875            self.url.replacen("https://", "wss://", 1)
876        } else {
877            self.url.replacen("http://", "ws://", 1)
878        };
879        format!("{ws}/ws")
880    }
881
882    /// Returns the sync interval in seconds. Defaults to 60 if not set.
883    pub fn effective_sync_interval_secs(&self) -> u64 {
884        self.sync_interval_secs.unwrap_or(60)
885    }
886}
887
888/// Prefix all Ref values in an entity dict.
889///
890/// Prefixes the `id` tag and any tag whose name ends with `Ref`
891/// (e.g. `siteRef`, `equipRef`, `floorRef`, `spaceRef`).
892pub fn prefix_refs(entity: &mut HDict, prefix: &str) {
893    let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
894
895    for name in &tag_names {
896        let should_prefix = name == "id" || name.ends_with("Ref");
897        if !should_prefix {
898            continue;
899        }
900
901        if let Some(Kind::Ref(r)) = entity.get(name) {
902            let new_val = format!("{}{}", prefix, r.val);
903            let new_ref = HRef::new(new_val, r.dis.clone());
904            entity.set(name.as_str(), Kind::Ref(new_ref));
905        }
906    }
907}
908
909/// Strip a prefix from all Ref values in an entity dict.
910///
911/// The inverse of [`prefix_refs`]. Strips the given prefix from the `id` tag
912/// and any tag whose name ends with `Ref`, but only if the Ref value actually
913/// starts with the prefix. Preserves `dis` metadata on Refs.
914pub fn strip_prefix_refs(entity: &mut HDict, prefix: &str) {
915    let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
916
917    for name in &tag_names {
918        let should_strip = name == "id" || name.ends_with("Ref");
919        if !should_strip {
920            continue;
921        }
922
923        if let Some(Kind::Ref(r)) = entity.get(name)
924            && let Some(stripped) = r.val.strip_prefix(prefix)
925        {
926            let new_ref = HRef::new(stripped.to_string(), r.dis.clone());
927            entity.set(name.as_str(), Kind::Ref(new_ref));
928        }
929    }
930}
931
#[cfg(test)]
mod tests {
    use super::*;
    use haystack_core::kinds::HRef;

    /// Build a config named `test` with the given endpoint and credentials;
    /// every optional field is left unset.
    fn config_with(url: &str, username: &str, password: &str) -> ConnectorConfig {
        ConnectorConfig {
            name: "test".to_string(),
            url: url.to_string(),
            username: username.to_string(),
            password: password.to_string(),
            id_prefix: None,
            ws_url: None,
            sync_interval_secs: None,
            client_cert: None,
            client_key: None,
            ca_cert: None,
        }
    }

    /// Config used by the cache/ownership/watch tests.
    fn local_config() -> ConnectorConfig {
        config_with("http://localhost:8080/api", "user", "pass")
    }

    /// Config used by the WebSocket URL derivation tests.
    fn remote_config() -> ConnectorConfig {
        config_with("http://remote:8080/api", "u", "p")
    }

    /// Return the Ref stored under `tag`, panicking if the tag is missing or
    /// holds any other kind of value.
    fn expect_ref<'a>(entity: &'a HDict, tag: &str) -> &'a HRef {
        match entity.get(tag) {
            Some(Kind::Ref(r)) => r,
            other => panic!("expected Ref, got {other:?}"),
        }
    }

    /// Start an entity dict that carries only an `id` Ref.
    fn entity_with_id(id: &str) -> HDict {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val(id)));
        entity
    }

    #[test]
    fn connector_new_empty_cache() {
        let connector = Connector::new(local_config());
        assert_eq!(connector.entity_count(), 0);
        assert!(connector.cached_entities().is_empty());
    }

    #[test]
    fn connector_config_deserialization() {
        let json = r#"{
            "name": "Remote Server",
            "url": "http://remote:8080/api",
            "username": "admin",
            "password": "secret",
            "id_prefix": "r1-"
        }"#;
        let parsed: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.name, "Remote Server");
        assert_eq!(parsed.url, "http://remote:8080/api");
        assert_eq!(parsed.username, "admin");
        assert_eq!(parsed.password, "secret");
        assert_eq!(parsed.id_prefix, Some("r1-".to_string()));
    }

    #[test]
    fn connector_config_deserialization_without_prefix() {
        let json = r#"{
            "name": "Remote",
            "url": "http://remote:8080/api",
            "username": "admin",
            "password": "secret"
        }"#;
        let parsed: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.id_prefix, None);
    }

    #[test]
    fn id_prefix_application() {
        let mut entity = entity_with_id("site-1");
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
        entity.set(
            "floorRef",
            Kind::Ref(HRef::new("floor-1", Some("Floor 1".to_string()))),
        );

        prefix_refs(&mut entity, "r1-");

        // `id` and every `*Ref` tag gain the prefix.
        for (tag, expected) in [
            ("id", "r1-site-1"),
            ("siteRef", "r1-site-1"),
            ("equipRef", "r1-equip-1"),
        ] {
            assert_eq!(expect_ref(&entity, tag).val, expected);
        }

        // `floorRef` is prefixed too, and keeps its display name.
        let floor = expect_ref(&entity, "floorRef");
        assert_eq!(floor.val, "r1-floor-1");
        assert_eq!(floor.dis, Some("Floor 1".to_string()));

        // Tags that are not Refs are untouched.
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
        assert_eq!(entity.get("site"), Some(&Kind::Marker));
    }

    #[test]
    fn id_prefix_skips_non_ref_values() {
        let mut entity = entity_with_id("point-1");
        // Named like a reference tag, but the value is a plain string.
        entity.set("customRef", Kind::Str("not-a-ref".to_string()));

        prefix_refs(&mut entity, "p-");

        assert_eq!(expect_ref(&entity, "id").val, "p-point-1");
        // The string value must come through unchanged.
        assert_eq!(
            entity.get("customRef"),
            Some(&Kind::Str("not-a-ref".to_string()))
        );
    }

    #[test]
    fn connector_config_deserialization_full() {
        let json = r#"{
            "name": "Full Config",
            "url": "https://remote:8443/api",
            "username": "admin",
            "password": "secret",
            "id_prefix": "r1-",
            "ws_url": "wss://remote:8443/api/ws",
            "sync_interval_secs": 30,
            "client_cert": "/etc/certs/client.pem",
            "client_key": "/etc/certs/client-key.pem",
            "ca_cert": "/etc/certs/ca.pem"
        }"#;
        let parsed: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.ws_url, Some("wss://remote:8443/api/ws".to_string()));
        assert_eq!(parsed.sync_interval_secs, Some(30));
        assert_eq!(
            parsed.client_cert,
            Some("/etc/certs/client.pem".to_string())
        );
        assert_eq!(
            parsed.client_key,
            Some("/etc/certs/client-key.pem".to_string())
        );
        assert_eq!(parsed.ca_cert, Some("/etc/certs/ca.pem".to_string()));
    }

    #[test]
    fn strip_prefix_refs_reverses_prefix() {
        let mut entity = entity_with_id("r1-site-1");
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("r1-equip-1")));

        strip_prefix_refs(&mut entity, "r1-");

        for (tag, expected) in [
            ("id", "site-1"),
            ("siteRef", "site-1"),
            ("equipRef", "equip-1"),
        ] {
            assert_eq!(expect_ref(&entity, tag).val, expected);
        }
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
    }

    #[test]
    fn strip_prefix_refs_ignores_non_matching() {
        let mut entity = entity_with_id("other-site-1");

        strip_prefix_refs(&mut entity, "r1-");

        assert_eq!(expect_ref(&entity, "id").val, "other-site-1");
    }

    #[test]
    fn derive_ws_url_from_http() {
        let config = remote_config();
        assert_eq!(config.effective_ws_url(), "ws://remote:8080/api/ws");
    }

    #[test]
    fn derive_ws_url_from_https() {
        let config = ConnectorConfig {
            url: "https://remote:8443/api".to_string(),
            ..remote_config()
        };
        assert_eq!(config.effective_ws_url(), "wss://remote:8443/api/ws");
    }

    #[test]
    fn explicit_ws_url_overrides_derived() {
        let config = ConnectorConfig {
            ws_url: Some("ws://custom:9999/ws".to_string()),
            ..remote_config()
        };
        assert_eq!(config.effective_ws_url(), "ws://custom:9999/ws");
    }

    #[test]
    fn connector_tracks_entity_ids_in_ownership() {
        let connector = Connector::new(ConnectorConfig {
            id_prefix: Some("t-".to_string()),
            ..local_config()
        });
        assert!(!connector.owns("t-site-1"));

        // Simulate a sync having populated the cache.
        connector.update_cache(vec![entity_with_id("t-site-1")]);

        assert!(connector.owns("t-site-1"));
        assert!(!connector.owns("other-1"));
    }

    #[test]
    fn connector_new_defaults_transport_and_connected() {
        let connector = Connector::new(local_config());
        assert_eq!(connector.transport_mode(), TransportMode::Http);
        assert!(!connector.is_connected());
        assert!(connector.last_sync_time().is_none());
    }

    #[test]
    fn connector_config_new_fields_default_to_none() {
        let json = r#"{
            "name": "Minimal",
            "url": "http://remote:8080/api",
            "username": "user",
            "password": "pass"
        }"#;
        let parsed: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(parsed.ws_url, None);
        assert_eq!(parsed.sync_interval_secs, None);
        assert_eq!(parsed.client_cert, None);
        assert_eq!(parsed.client_key, None);
        assert_eq!(parsed.ca_cert, None);
    }

    #[test]
    fn remote_watch_add_and_remove() {
        let connector = Connector::new(ConnectorConfig {
            id_prefix: Some("r-".to_string()),
            ..local_config()
        });
        assert_eq!(connector.remote_watch_count(), 0);

        // Two distinct IDs are both tracked.
        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);
        connector.add_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 2);

        // Adding an ID that is already watched changes nothing.
        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 2);

        connector.remove_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        // Removing an ID that was never watched changes nothing.
        connector.remove_remote_watch("r-nonexistent");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.remove_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 0);
    }

    /// One site, one equip under it, and one sensor point under the equip.
    fn make_test_entities() -> Vec<HDict> {
        let mut site = entity_with_id("site-1");
        site.set("site", Kind::Marker);
        site.set("dis", Kind::Str("Main Site".into()));

        let mut equip = entity_with_id("equip-1");
        equip.set("equip", Kind::Marker);
        equip.set("siteRef", Kind::Ref(HRef::from_val("site-1")));

        let mut point = entity_with_id("point-1");
        point.set("point", Kind::Marker);
        point.set("sensor", Kind::Marker);
        point.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));

        vec![site, equip, point]
    }

    #[test]
    fn filter_cached_returns_matching_entities() {
        let connector = Connector::new(local_config());
        connector.update_cache(make_test_entities());

        // A single site, and it is the one we inserted.
        let sites = connector.filter_cached("site", 0).unwrap();
        assert_eq!(sites.len(), 1);
        assert_eq!(
            sites[0].get("id"),
            Some(&Kind::Ref(HRef::from_val("site-1")))
        );

        // A single equip.
        let equips = connector.filter_cached("equip", 0).unwrap();
        assert_eq!(equips.len(), 1);

        // A single entity tagged both `point` and `sensor`.
        let sensors = connector.filter_cached("point and sensor", 0).unwrap();
        assert_eq!(sensors.len(), 1);

        // A tag nobody carries matches nothing.
        let ahus = connector.filter_cached("ahu", 0).unwrap();
        assert!(ahus.is_empty());
    }

    #[test]
    fn filter_cached_respects_limit() {
        let connector = Connector::new(local_config());

        // Ten points, all matching the filter below.
        let points: Vec<HDict> = (0..10)
            .map(|i| {
                let mut p = HDict::new();
                p.set("id", Kind::Ref(HRef::from_val(format!("point-{i}"))));
                p.set("point", Kind::Marker);
                p
            })
            .collect();
        connector.update_cache(points);

        let limited = connector.filter_cached("point", 3).unwrap();
        assert_eq!(limited.len(), 3);
    }

    #[test]
    fn filter_cached_or_query() {
        let connector = Connector::new(local_config());
        connector.update_cache(make_test_entities());

        // The site and the equip both match; the point does not.
        let matched = connector.filter_cached("site or equip", 0).unwrap();
        assert_eq!(matched.len(), 2);
    }
}