Skip to main content

haystack_server/
connector.rs

1//! Connector for fetching entities from a remote Haystack server.
2//!
3//! # Overview
4//!
5//! A [`Connector`] maintains a persistent connection (HTTP or WebSocket) to a
6//! remote Haystack server, periodically syncs entities into a local cache, and
7//! proxies write operations (`hisRead`, `hisWrite`, `pointWrite`,
8//! `invokeAction`, `import`) to the remote when the target entity is owned by
9//! that connector.
10//!
11//! # Lifecycle
12//!
13//! 1. **Construction** — [`Connector::new`] creates a connector with an empty
14//!    cache from a [`ConnectorConfig`] (parsed from the TOML federation config).
15//! 2. **Connection** — on first sync, [`Connector::sync`] calls
16//!    `connect_persistent()` which tries WebSocket first, falling back to HTTP.
17//! 3. **Sync loop** — [`Connector::spawn_sync_task`] spawns a tokio task that
18//!    calls [`Connector::sync`] at an adaptive interval. Sync tries incremental
19//!    delta sync via the `changes` op first, falling back to full entity fetch.
20//! 4. **Cache** — [`Connector::update_cache`] replaces the cached entity list
21//!    and rebuilds the bitmap tag index and owned-ID set for fast lookups.
22//! 5. **Proxy** — write ops check [`Connector::owns`] and forward to the remote
23//!    server via `proxy_*` methods, stripping/re-adding the ID prefix as needed.
24//!
25//! # ID Prefixing
26//!
27//! When `id_prefix` is configured, all Ref values (`id`, `siteRef`, etc.) are
28//! prefixed on sync and stripped before proxying. See [`prefix_refs`] and
29//! [`strip_prefix_refs`].
30
31use std::collections::HashMap;
32use std::collections::HashSet;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
35
36use chrono::{DateTime, Utc};
37use parking_lot::RwLock;
38
39use haystack_client::HaystackClient;
40use haystack_client::transport::http::HttpTransport;
41use haystack_client::transport::ws::WsTransport;
42use haystack_core::data::{HDict, HGrid};
43use haystack_core::filter::{matches, parse_filter};
44use haystack_core::graph::bitmap::TagBitmapIndex;
45use haystack_core::graph::query_planner;
46use haystack_core::kinds::{HRef, Kind};
47
48/// Type-erased persistent connection to a remote Haystack server.
49///
50/// Wraps either an HTTP or WebSocket `HaystackClient`, allowing the connector
51/// to hold a single persistent connection regardless of transport.
52enum ConnectorClient {
53    Http(HaystackClient<HttpTransport>),
54    Ws(HaystackClient<WsTransport>),
55}
56
57impl ConnectorClient {
58    /// Call a Haystack op through the underlying client, returning the
59    /// response grid or an error string.
60    async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, String> {
61        match self {
62            ConnectorClient::Http(c) => c.call(op, req).await.map_err(|e| e.to_string()),
63            ConnectorClient::Ws(c) => c.call(op, req).await.map_err(|e| e.to_string()),
64        }
65    }
66
67    /// Read historical data for a point.
68    async fn his_read(&self, id: &str, range: &str) -> Result<HGrid, String> {
69        match self {
70            ConnectorClient::Http(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
71            ConnectorClient::Ws(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
72        }
73    }
74
75    /// Write historical data for a point.
76    async fn his_write(&self, id: &str, items: Vec<HDict>) -> Result<HGrid, String> {
77        match self {
78            ConnectorClient::Http(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
79            ConnectorClient::Ws(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
80        }
81    }
82
83    /// Write a value to a writable point.
84    async fn point_write(&self, id: &str, level: u8, val: Kind) -> Result<HGrid, String> {
85        match self {
86            ConnectorClient::Http(c) => c
87                .point_write(id, level, val)
88                .await
89                .map_err(|e| e.to_string()),
90            ConnectorClient::Ws(c) => c
91                .point_write(id, level, val)
92                .await
93                .map_err(|e| e.to_string()),
94        }
95    }
96
97    /// Invoke an action on an entity.
98    async fn invoke_action(&self, id: &str, action: &str, args: HDict) -> Result<HGrid, String> {
99        match self {
100            ConnectorClient::Http(c) => c
101                .invoke_action(id, action, args)
102                .await
103                .map_err(|e| e.to_string()),
104            ConnectorClient::Ws(c) => c
105                .invoke_action(id, action, args)
106                .await
107                .map_err(|e| e.to_string()),
108        }
109    }
110}
111
112/// Transport protocol used by a connector.
113#[derive(Debug, Clone, Copy, PartialEq, Eq)]
114#[repr(u8)]
115pub enum TransportMode {
116    Http = 0,
117    WebSocket = 1,
118}
119
120/// Configuration for a remote Haystack server connection.
121#[derive(Debug, Clone, serde::Deserialize)]
122pub struct ConnectorConfig {
123    /// Display name for this connector.
124    pub name: String,
125    /// Base URL of the remote Haystack API (e.g. "http://remote:8080/api").
126    pub url: String,
127    /// Username for SCRAM authentication.
128    pub username: String,
129    /// Password for SCRAM authentication.
130    pub password: String,
131    /// Optional tag prefix to namespace remote entity IDs (e.g. "remote1-").
132    pub id_prefix: Option<String>,
133    /// WebSocket URL (e.g. "ws://remote:8080/api/ws"). Derived from url if omitted.
134    pub ws_url: Option<String>,
135    /// Fallback polling interval in seconds (default: 60).
136    pub sync_interval_secs: Option<u64>,
137    /// Path to PEM client certificate for mTLS.
138    pub client_cert: Option<String>,
139    /// Path to PEM client private key for mTLS.
140    pub client_key: Option<String>,
141    /// Path to PEM CA certificate for server verification.
142    pub ca_cert: Option<String>,
143}
144
145/// A connector that can fetch entities from a remote Haystack server.
146///
147/// Holds a persistent connection, a local entity cache with bitmap index,
148/// and state for adaptive sync and federation proxy operations. See the
149/// [module-level documentation](self) for the full lifecycle.
150pub struct Connector {
151    pub config: ConnectorConfig,
152    /// Cached entities from last sync.
153    cache: RwLock<Vec<HDict>>,
154    /// Set of entity IDs owned by this connector (populated during sync/update_cache).
155    owned_ids: RwLock<HashSet<String>>,
156    /// Bitmap tag index over cached entities for fast filtered reads.
157    cache_index: RwLock<TagBitmapIndex>,
158    /// Maps entity ref_val → numeric cache index for bitmap lookups.
159    cache_id_map: RwLock<HashMap<String, usize>>,
160    /// Entity IDs being watched remotely (prefixed IDs).
161    remote_watch_ids: RwLock<HashSet<String>>,
162    /// Transport protocol (HTTP or WebSocket).
163    transport_mode: AtomicU8,
164    /// Whether the connector is currently connected (last sync succeeded).
165    connected: AtomicBool,
166    /// Timestamp of the last successful sync.
167    last_sync: RwLock<Option<DateTime<Utc>>>,
168    /// Remote graph version from last successful sync (for delta sync).
169    last_remote_version: RwLock<Option<u64>>,
170    /// Persistent client connection used by sync and background tasks.
171    /// Lazily initialized on first sync. Cleared on connection error.
172    /// Uses `tokio::sync::RwLock` because the guard is held across `.await`.
173    client: tokio::sync::RwLock<Option<ConnectorClient>>,
174    /// Current adaptive sync interval in seconds (adjusted based on change rate).
175    current_interval_secs: AtomicU64,
176    /// Number of entities from last sync (for change detection).
177    last_entity_count: AtomicU64,
178}
179
180impl Connector {
181    /// Create a new connector with an empty cache.
182    ///
183    /// The connector starts disconnected; a persistent connection is
184    /// established lazily on the first [`sync`](Self::sync) call.
185    pub fn new(config: ConnectorConfig) -> Self {
186        let base_interval = config.effective_sync_interval_secs();
187        Self {
188            config,
189            cache: RwLock::new(Vec::new()),
190            owned_ids: RwLock::new(HashSet::new()),
191            cache_index: RwLock::new(TagBitmapIndex::new()),
192            cache_id_map: RwLock::new(HashMap::new()),
193            remote_watch_ids: RwLock::new(HashSet::new()),
194            transport_mode: AtomicU8::new(TransportMode::Http as u8),
195            connected: AtomicBool::new(false),
196            last_sync: RwLock::new(None),
197            last_remote_version: RwLock::new(None),
198            client: tokio::sync::RwLock::new(None),
199            current_interval_secs: AtomicU64::new(base_interval),
200            last_entity_count: AtomicU64::new(0),
201        }
202    }
203
204    /// Attempt to connect to the remote server. Tries WebSocket first,
205    /// falls back to HTTP.
206    async fn connect_persistent(&self) -> Result<(), String> {
207        // Try WebSocket first
208        let ws_url = self.config.effective_ws_url();
209        match HaystackClient::connect_ws(
210            &self.config.url,
211            &ws_url,
212            &self.config.username,
213            &self.config.password,
214        )
215        .await
216        {
217            Ok(ws_client) => {
218                *self.client.write().await = Some(ConnectorClient::Ws(ws_client));
219                self.transport_mode
220                    .store(TransportMode::WebSocket as u8, Ordering::Relaxed);
221                self.connected.store(true, Ordering::Relaxed);
222                log::info!("Connected to {} via WebSocket", self.config.name);
223                return Ok(());
224            }
225            Err(e) => {
226                log::warn!(
227                    "WS connection to {} failed: {}, trying HTTP",
228                    self.config.name,
229                    e
230                );
231            }
232        }
233
234        // Fall back to HTTP
235        match HaystackClient::connect(
236            &self.config.url,
237            &self.config.username,
238            &self.config.password,
239        )
240        .await
241        {
242            Ok(http_client) => {
243                *self.client.write().await = Some(ConnectorClient::Http(http_client));
244                self.transport_mode
245                    .store(TransportMode::Http as u8, Ordering::Relaxed);
246                self.connected.store(true, Ordering::Relaxed);
247                log::info!("Connected to {} via HTTP", self.config.name);
248                Ok(())
249            }
250            Err(e) => {
251                self.connected.store(false, Ordering::Relaxed);
252                Err(format!("connection failed: {e}"))
253            }
254        }
255    }
256
257    /// Connect to the remote server, fetch entities, and update cache.
258    ///
259    /// Tries incremental delta sync first (via the `changes` op) if we have a
260    /// known remote version. Falls back to full sync on first connect, when the
261    /// remote doesn't support `changes`, or when the version gap is too large.
262    ///
263    /// Returns the number of cached entities after sync, or an error string.
264    pub async fn sync(&self) -> Result<usize, String> {
265        // Ensure we have a connection
266        if self.client.read().await.is_none() {
267            self.connect_persistent().await?;
268        }
269
270        // Attempt delta sync if we have a previous version.
271        let maybe_ver = *self.last_remote_version.read();
272        if let Some(last_ver) = maybe_ver {
273            match self.try_delta_sync(last_ver).await {
274                Ok(count) => return Ok(count),
275                Err(e) => {
276                    log::debug!(
277                        "Delta sync failed for {}, falling back to full: {e}",
278                        self.config.name,
279                    );
280                }
281            }
282        }
283
284        // Full sync: read all entities
285        self.full_sync().await
286    }
287
288    /// Full sync: fetch all entities from remote and replace cache.
289    async fn full_sync(&self) -> Result<usize, String> {
290        let grid = {
291            let client = self.client.read().await;
292            let client = client.as_ref().ok_or("not connected")?;
293            client.call("read", &build_read_all_grid()).await
294        };
295
296        let grid = grid.map_err(|e| {
297            self.connected.store(false, Ordering::Relaxed);
298            format!("read failed: {e}")
299        })?;
300
301        let mut entities: Vec<HDict> = grid.rows.into_iter().collect();
302
303        // Apply id prefix if configured.
304        if let Some(ref prefix) = self.config.id_prefix {
305            for entity in &mut entities {
306                prefix_refs(entity, prefix);
307            }
308        }
309
310        let count = entities.len();
311        self.update_cache(entities);
312        *self.last_sync.write() = Some(Utc::now());
313        self.connected.store(true, Ordering::Relaxed);
314
315        // Probe remote version for next delta sync attempt.
316        self.probe_remote_version().await;
317
318        Ok(count)
319    }
320
321    /// Try to discover the remote graph version by calling `changes` with a
322    /// high version number. The response meta contains `curVer` which we store
323    /// for future delta syncs. Failures are silently ignored (remote may not
324    /// support the `changes` op).
325    async fn probe_remote_version(&self) {
326        let grid = {
327            let client = self.client.read().await;
328            let Some(client) = client.as_ref() else {
329                return;
330            };
331            client.call("changes", &build_changes_grid(u64::MAX)).await
332        };
333        if let Ok(grid) = grid {
334            if let Some(Kind::Number(n)) = grid.meta.get("curVer") {
335                *self.last_remote_version.write() = Some(n.val as u64);
336            }
337        }
338    }
339
340    /// Attempt incremental delta sync using the `changes` op.
341    ///
342    /// Sends the last known remote version to the server. The server returns
343    /// only the diffs since that version. We apply them incrementally to our
344    /// cache, avoiding a full entity set transfer.
345    async fn try_delta_sync(&self, since_version: u64) -> Result<usize, String> {
346        let changes_grid = build_changes_grid(since_version);
347        let grid = {
348            let client = self.client.read().await;
349            let client = client.as_ref().ok_or("not connected")?;
350            client.call("changes", &changes_grid).await
351        }
352        .map_err(|e| format!("changes op failed: {e}"))?;
353
354        // If the remote returned an error grid, fall back.
355        if grid.is_err() {
356            return Err("remote returned error grid for changes op".to_string());
357        }
358
359        // Extract current version from response meta.
360        let cur_ver = grid
361            .meta
362            .get("curVer")
363            .and_then(|k| {
364                if let Kind::Number(n) = k {
365                    Some(n.val as u64)
366                } else {
367                    None
368                }
369            })
370            .ok_or("changes response missing curVer in meta")?;
371
372        // No changes — nothing to do.
373        if grid.rows.is_empty() {
374            *self.last_remote_version.write() = Some(cur_ver);
375            *self.last_sync.write() = Some(Utc::now());
376            return Ok(self.cache.read().len());
377        }
378
379        // Apply diffs to a mutable copy of the cache.
380        let mut cache = self.cache.read().clone();
381        let mut id_to_idx: HashMap<String, usize> = self.cache_id_map.read().clone();
382        let prefix = self.config.id_prefix.as_deref();
383
384        for row in &grid.rows {
385            let op = match row.get("op") {
386                Some(Kind::Str(s)) => s.as_str(),
387                _ => continue,
388            };
389            let ref_val = match row.get("ref") {
390                Some(Kind::Str(s)) => s.clone(),
391                _ => continue,
392            };
393
394            match op {
395                "add" | "update" => {
396                    if let Some(Kind::Dict(entity_box)) = row.get("entity") {
397                        let mut entity: HDict = (**entity_box).clone();
398                        if let Some(pfx) = prefix {
399                            prefix_refs(&mut entity, pfx);
400                        }
401                        // Get the (potentially prefixed) ID.
402                        let entity_id = entity.get("id").and_then(|k| {
403                            if let Kind::Ref(r) = k {
404                                Some(r.val.clone())
405                            } else {
406                                None
407                            }
408                        });
409
410                        if let Some(ref eid) = entity_id {
411                            if let Some(&idx) = id_to_idx.get(eid.as_str()) {
412                                // Update in place.
413                                cache[idx] = entity;
414                            } else {
415                                // Add new entity.
416                                let idx = cache.len();
417                                id_to_idx.insert(eid.clone(), idx);
418                                cache.push(entity);
419                            }
420                        }
421                    }
422                }
423                "remove" => {
424                    let prefixed_ref = match prefix {
425                        Some(pfx) => format!("{pfx}{ref_val}"),
426                        None => ref_val,
427                    };
428                    if let Some(&idx) = id_to_idx.get(prefixed_ref.as_str()) {
429                        // Swap-remove and fix up the id_to_idx map.
430                        let last_idx = cache.len() - 1;
431                        if idx != last_idx {
432                            let last_id = cache[last_idx].get("id").and_then(|k| {
433                                if let Kind::Ref(r) = k {
434                                    Some(r.val.clone())
435                                } else {
436                                    None
437                                }
438                            });
439                            cache.swap(idx, last_idx);
440                            if let Some(lid) = last_id {
441                                id_to_idx.insert(lid, idx);
442                            }
443                        }
444                        cache.pop();
445                        id_to_idx.remove(prefixed_ref.as_str());
446                    }
447                }
448                _ => {}
449            }
450        }
451
452        let count = cache.len();
453        self.update_cache(cache);
454        *self.last_remote_version.write() = Some(cur_ver);
455        *self.last_sync.write() = Some(Utc::now());
456        self.connected.store(true, Ordering::Relaxed);
457        Ok(count)
458    }
459
460    /// Replace the cached entities and rebuild the owned-ID index and bitmap tag index.
461    ///
462    /// Extracts all `id` Ref values from the given entities into a `HashSet`,
463    /// builds a bitmap tag index for fast filtered reads, then atomically
464    /// replaces the cache, ownership set, and index.
465    pub fn update_cache(&self, entities: Vec<HDict>) {
466        let mut ids = HashSet::with_capacity(entities.len());
467        let mut tag_index = TagBitmapIndex::new();
468        let mut id_map = HashMap::with_capacity(entities.len());
469
470        for (eid, entity) in entities.iter().enumerate() {
471            if let Some(Kind::Ref(r)) = entity.get("id") {
472                ids.insert(r.val.clone());
473                id_map.insert(r.val.clone(), eid);
474            }
475
476            let tags: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
477            tag_index.add(eid, &tags);
478        }
479
480        *self.cache.write() = entities;
481        *self.owned_ids.write() = ids;
482        *self.cache_index.write() = tag_index;
483        *self.cache_id_map.write() = id_map;
484    }
485
486    /// Returns `true` if this connector owns an entity with the given ID.
487    pub fn owns(&self, id: &str) -> bool {
488        self.owned_ids.read().contains(id)
489    }
490
491    /// Look up a single cached entity by ID using the O(1) id_map index.
492    pub fn get_cached_entity(&self, id: &str) -> Option<HDict> {
493        let id_map = self.cache_id_map.read();
494        let cache = self.cache.read();
495        id_map.get(id).and_then(|&idx| cache.get(idx)).cloned()
496    }
497
498    /// Look up multiple cached entities by ID in a single pass.
499    /// Returns found entities and missing IDs.
500    pub fn batch_get_cached(&self, ids: &[&str]) -> (Vec<HDict>, Vec<String>) {
501        let id_map = self.cache_id_map.read();
502        let cache = self.cache.read();
503        let mut found = Vec::with_capacity(ids.len());
504        let mut missing = Vec::new();
505        for &id in ids {
506            if let Some(&idx) = id_map.get(id) {
507                if let Some(entity) = cache.get(idx) {
508                    found.push(entity.clone());
509                } else {
510                    missing.push(id.to_string());
511                }
512            } else {
513                missing.push(id.to_string());
514            }
515        }
516        (found, missing)
517    }
518
519    /// Returns a clone of all cached entities.
520    pub fn cached_entities(&self) -> Vec<HDict> {
521        self.cache.read().clone()
522    }
523
524    /// Returns the number of cached entities.
525    pub fn entity_count(&self) -> usize {
526        self.cache.read().len()
527    }
528
529    /// Filter cached entities using the bitmap tag index for acceleration.
530    ///
531    /// Returns matching entities up to `limit` (0 = unlimited). Uses the same
532    /// two-phase approach as EntityGraph: bitmap candidates → full filter eval.
533    pub fn filter_cached(&self, filter_expr: &str, limit: usize) -> Result<Vec<HDict>, String> {
534        let effective_limit = if limit == 0 { usize::MAX } else { limit };
535
536        let ast = parse_filter(filter_expr).map_err(|e| format!("filter error: {e}"))?;
537
538        let cache = self.cache.read();
539        let tag_index = self.cache_index.read();
540        let max_id = cache.len();
541
542        let candidates = query_planner::bitmap_candidates(&ast, &tag_index, max_id);
543
544        let mut results = Vec::new();
545
546        if let Some(ref bitmap) = candidates {
547            for eid in TagBitmapIndex::iter_set_bits(bitmap) {
548                if results.len() >= effective_limit {
549                    break;
550                }
551                if let Some(entity) = cache.get(eid) {
552                    if matches(&ast, entity, None) {
553                        results.push(entity.clone());
554                    }
555                }
556            }
557        } else {
558            for entity in cache.iter() {
559                if results.len() >= effective_limit {
560                    break;
561                }
562                if matches(&ast, entity, None) {
563                    results.push(entity.clone());
564                }
565            }
566        }
567
568        Ok(results)
569    }
570
571    /// Add a federated entity ID to the remote watch set.
572    pub fn add_remote_watch(&self, prefixed_id: &str) {
573        self.remote_watch_ids
574            .write()
575            .insert(prefixed_id.to_string());
576    }
577
578    /// Remove a federated entity ID from the remote watch set.
579    pub fn remove_remote_watch(&self, prefixed_id: &str) {
580        self.remote_watch_ids.write().remove(prefixed_id);
581    }
582
583    /// Returns the number of entity IDs being watched remotely.
584    pub fn remote_watch_count(&self) -> usize {
585        self.remote_watch_ids.read().len()
586    }
587
588    /// Returns the current transport mode.
589    pub fn transport_mode(&self) -> TransportMode {
590        match self.transport_mode.load(Ordering::Relaxed) {
591            1 => TransportMode::WebSocket,
592            _ => TransportMode::Http,
593        }
594    }
595
596    /// Returns whether the connector is currently connected (last sync succeeded).
597    pub fn is_connected(&self) -> bool {
598        self.connected.load(Ordering::Relaxed)
599    }
600
601    /// Returns the timestamp of the last successful sync, if any.
602    pub fn last_sync_time(&self) -> Option<DateTime<Utc>> {
603        *self.last_sync.read()
604    }
605
606    /// Strip the id_prefix from an entity ID.
607    fn strip_id(&self, prefixed_id: &str) -> String {
608        if let Some(ref prefix) = self.config.id_prefix {
609            prefixed_id
610                .strip_prefix(prefix.as_str())
611                .unwrap_or(prefixed_id)
612                .to_string()
613        } else {
614            prefixed_id.to_string()
615        }
616    }
617
618    /// Ensure the persistent client is connected, establishing a new
619    /// connection if needed. Returns an error if connection fails.
620    async fn ensure_connected(&self) -> Result<(), String> {
621        if self.client.read().await.is_none() {
622            self.connect_persistent().await?;
623        }
624        Ok(())
625    }
626
627    /// Handle a failed proxy operation by clearing the client to force
628    /// reconnection on the next call.
629    async fn on_proxy_error(&self, op_name: &str, e: String) -> String {
630        *self.client.write().await = None;
631        self.connected.store(false, Ordering::Relaxed);
632        format!("{op_name} failed: {e}")
633    }
634
635    /// Proxy a hisRead request to the remote server.
636    ///
637    /// Strips the ID prefix, calls `hisRead` on the remote, and returns the
638    /// response grid. Clears the client on connection error.
639    pub async fn proxy_his_read(&self, prefixed_id: &str, range: &str) -> Result<HGrid, String> {
640        self.ensure_connected().await?;
641        let id = self.strip_id(prefixed_id);
642        let guard = self.client.read().await;
643        let client = guard.as_ref().ok_or("not connected")?;
644        match client.his_read(&id, range).await {
645            Ok(grid) => Ok(grid),
646            Err(e) => {
647                drop(guard);
648                Err(self.on_proxy_error("hisRead", e).await)
649            }
650        }
651    }
652
653    /// Proxy a pointWrite request to the remote server.
654    ///
655    /// Strips the ID prefix, calls `pointWrite` on the remote with the given
656    /// level and value. Clears the client on connection error.
657    pub async fn proxy_point_write(
658        &self,
659        prefixed_id: &str,
660        level: u8,
661        val: &Kind,
662    ) -> Result<HGrid, String> {
663        self.ensure_connected().await?;
664        let id = self.strip_id(prefixed_id);
665        let val = val.clone();
666        let guard = self.client.read().await;
667        let client = guard.as_ref().ok_or("not connected")?;
668        match client.point_write(&id, level, val).await {
669            Ok(grid) => Ok(grid),
670            Err(e) => {
671                drop(guard);
672                Err(self.on_proxy_error("pointWrite", e).await)
673            }
674        }
675    }
676
677    /// Proxy a hisWrite request to the remote server.
678    ///
679    /// Strips the ID prefix, calls `hisWrite` on the remote with the given
680    /// time-series rows. Clears the client on connection error.
681    pub async fn proxy_his_write(
682        &self,
683        prefixed_id: &str,
684        items: Vec<HDict>,
685    ) -> Result<HGrid, String> {
686        self.ensure_connected().await?;
687        let id = self.strip_id(prefixed_id);
688        let guard = self.client.read().await;
689        let client = guard.as_ref().ok_or("not connected")?;
690        match client.his_write(&id, items).await {
691            Ok(grid) => Ok(grid),
692            Err(e) => {
693                drop(guard);
694                Err(self.on_proxy_error("hisWrite", e).await)
695            }
696        }
697    }
698
699    /// Proxy an import request for a single entity to the remote server.
700    ///
701    /// Strips the id prefix from the entity, wraps it in a single-row grid,
702    /// and calls the remote `import` op.
703    pub async fn proxy_import(&self, entity: &HDict) -> Result<HGrid, String> {
704        self.ensure_connected().await?;
705
706        let mut stripped = entity.clone();
707        if let Some(ref prefix) = self.config.id_prefix {
708            strip_prefix_refs(&mut stripped, prefix);
709        }
710
711        let col_names: Vec<String> = stripped.tag_names().map(|s| s.to_string()).collect();
712        let cols: Vec<haystack_core::data::HCol> = col_names
713            .iter()
714            .map(|n| haystack_core::data::HCol::new(n.as_str()))
715            .collect();
716        let grid = HGrid::from_parts(HDict::new(), cols, vec![stripped]);
717
718        let guard = self.client.read().await;
719        let client = guard.as_ref().ok_or("not connected")?;
720        match client.call("import", &grid).await {
721            Ok(grid) => Ok(grid),
722            Err(e) => {
723                drop(guard);
724                Err(self.on_proxy_error("import", e).await)
725            }
726        }
727    }
728
729    /// Proxy an invokeAction request to the remote server.
730    ///
731    /// Strips the ID prefix, calls `invokeAction` on the remote with the given
732    /// action name and arguments dict. Clears the client on connection error.
733    pub async fn proxy_invoke_action(
734        &self,
735        prefixed_id: &str,
736        action: &str,
737        args: HDict,
738    ) -> Result<HGrid, String> {
739        self.ensure_connected().await?;
740        let id = self.strip_id(prefixed_id);
741        let action = action.to_string();
742        let guard = self.client.read().await;
743        let client = guard.as_ref().ok_or("not connected")?;
744        match client.invoke_action(&id, &action, args).await {
745            Ok(grid) => Ok(grid),
746            Err(e) => {
747                drop(guard);
748                Err(self.on_proxy_error("invokeAction", e).await)
749            }
750        }
751    }
752
753    /// Spawn a background sync task for this connector.
754    ///
755    /// The task loops forever, syncing entities at the configured interval.
756    /// On error, the persistent client is cleared to force reconnection on
757    /// the next iteration.
758    pub fn spawn_sync_task(connector: Arc<Connector>) -> tokio::task::JoinHandle<()> {
759        let base_interval = connector.config.effective_sync_interval_secs();
760        let min_interval = base_interval / 2;
761        let max_interval = base_interval * 5;
762
763        tokio::spawn(async move {
764            loop {
765                let prev_count = connector.last_entity_count.load(Ordering::Relaxed);
766
767                match connector.sync().await {
768                    Ok(count) => {
769                        log::debug!("Synced {} entities from {}", count, connector.config.name);
770
771                        // Adaptive interval: increase when no changes, reset when changes detected.
772                        let current = connector.current_interval_secs.load(Ordering::Relaxed);
773                        let new_interval = if count as u64 == prev_count && prev_count > 0 {
774                            // No change — slow down (increase by 50%, capped at max).
775                            (current + current / 2).min(max_interval)
776                        } else {
777                            // Changes detected — reset to base interval.
778                            base_interval
779                        };
780                        connector
781                            .current_interval_secs
782                            .store(new_interval, Ordering::Relaxed);
783                        connector
784                            .last_entity_count
785                            .store(count as u64, Ordering::Relaxed);
786                    }
787                    Err(e) => {
788                        log::error!("Sync failed for {}: {}", connector.config.name, e);
789                        // Clear the client on error to force reconnection
790                        *connector.client.write().await = None;
791                        connector.connected.store(false, Ordering::Relaxed);
792                        // On error, use base interval for retry.
793                        connector
794                            .current_interval_secs
795                            .store(base_interval, Ordering::Relaxed);
796                    }
797                }
798
799                let sleep_secs = connector
800                    .current_interval_secs
801                    .load(Ordering::Relaxed)
802                    .max(min_interval);
803                tokio::time::sleep(std::time::Duration::from_secs(sleep_secs)).await;
804            }
805        })
806    }
807}
808
809/// Build a `read` request grid with filter `"*"` (all entities).
810fn build_read_all_grid() -> HGrid {
811    use haystack_core::data::HCol;
812    let mut row = HDict::new();
813    row.set("filter", Kind::Str("*".to_string()));
814    HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row])
815}
816
817/// Build a request grid for the `changes` op with the given version.
818fn build_changes_grid(since_version: u64) -> HGrid {
819    use haystack_core::data::HCol;
820    use haystack_core::kinds::Number;
821    let mut row = HDict::new();
822    row.set(
823        "version",
824        Kind::Number(Number::unitless(since_version as f64)),
825    );
826    HGrid::from_parts(HDict::new(), vec![HCol::new("version")], vec![row])
827}
828
829impl ConnectorConfig {
830    /// Returns the WebSocket URL. If `ws_url` is set, returns it directly.
831    /// Otherwise derives from `url` by replacing `http://` with `ws://`
832    /// or `https://` with `wss://` and appending `/ws`.
833    pub fn effective_ws_url(&self) -> String {
834        if let Some(ref ws) = self.ws_url {
835            return ws.clone();
836        }
837        let ws = if self.url.starts_with("https://") {
838            self.url.replacen("https://", "wss://", 1)
839        } else {
840            self.url.replacen("http://", "ws://", 1)
841        };
842        format!("{ws}/ws")
843    }
844
845    /// Returns the sync interval in seconds. Defaults to 60 if not set.
846    pub fn effective_sync_interval_secs(&self) -> u64 {
847        self.sync_interval_secs.unwrap_or(60)
848    }
849}
850
851/// Prefix all Ref values in an entity dict.
852///
853/// Prefixes the `id` tag and any tag whose name ends with `Ref`
854/// (e.g. `siteRef`, `equipRef`, `floorRef`, `spaceRef`).
855pub fn prefix_refs(entity: &mut HDict, prefix: &str) {
856    let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
857
858    for name in &tag_names {
859        let should_prefix = name == "id" || name.ends_with("Ref");
860        if !should_prefix {
861            continue;
862        }
863
864        if let Some(Kind::Ref(r)) = entity.get(name) {
865            let new_val = format!("{}{}", prefix, r.val);
866            let new_ref = HRef::new(new_val, r.dis.clone());
867            entity.set(name.as_str(), Kind::Ref(new_ref));
868        }
869    }
870}
871
872/// Strip a prefix from all Ref values in an entity dict.
873///
874/// The inverse of [`prefix_refs`]. Strips the given prefix from the `id` tag
875/// and any tag whose name ends with `Ref`, but only if the Ref value actually
876/// starts with the prefix. Preserves `dis` metadata on Refs.
877pub fn strip_prefix_refs(entity: &mut HDict, prefix: &str) {
878    let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
879
880    for name in &tag_names {
881        let should_strip = name == "id" || name.ends_with("Ref");
882        if !should_strip {
883            continue;
884        }
885
886        if let Some(Kind::Ref(r)) = entity.get(name)
887            && let Some(stripped) = r.val.strip_prefix(prefix)
888        {
889            let new_ref = HRef::new(stripped.to_string(), r.dis.clone());
890            entity.set(name.as_str(), Kind::Ref(new_ref));
891        }
892    }
893}
894
895#[cfg(test)]
896mod tests {
897    use super::*;
898    use haystack_core::kinds::HRef;
899
900    #[test]
901    fn connector_new_empty_cache() {
902        let config = ConnectorConfig {
903            name: "test".to_string(),
904            url: "http://localhost:8080/api".to_string(),
905            username: "user".to_string(),
906            password: "pass".to_string(),
907            id_prefix: None,
908            ws_url: None,
909            sync_interval_secs: None,
910            client_cert: None,
911            client_key: None,
912            ca_cert: None,
913        };
914        let connector = Connector::new(config);
915        assert_eq!(connector.entity_count(), 0);
916        assert!(connector.cached_entities().is_empty());
917    }
918
919    #[test]
920    fn connector_config_deserialization() {
921        let json = r#"{
922            "name": "Remote Server",
923            "url": "http://remote:8080/api",
924            "username": "admin",
925            "password": "secret",
926            "id_prefix": "r1-"
927        }"#;
928        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
929        assert_eq!(config.name, "Remote Server");
930        assert_eq!(config.url, "http://remote:8080/api");
931        assert_eq!(config.username, "admin");
932        assert_eq!(config.password, "secret");
933        assert_eq!(config.id_prefix, Some("r1-".to_string()));
934    }
935
936    #[test]
937    fn connector_config_deserialization_without_prefix() {
938        let json = r#"{
939            "name": "Remote",
940            "url": "http://remote:8080/api",
941            "username": "admin",
942            "password": "secret"
943        }"#;
944        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
945        assert_eq!(config.id_prefix, None);
946    }
947
948    #[test]
949    fn id_prefix_application() {
950        let mut entity = HDict::new();
951        entity.set("id", Kind::Ref(HRef::from_val("site-1")));
952        entity.set("dis", Kind::Str("Main Site".to_string()));
953        entity.set("site", Kind::Marker);
954        entity.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
955        entity.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
956        entity.set(
957            "floorRef",
958            Kind::Ref(HRef::new("floor-1", Some("Floor 1".to_string()))),
959        );
960
961        prefix_refs(&mut entity, "r1-");
962
963        // id should be prefixed
964        match entity.get("id") {
965            Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-site-1"),
966            other => panic!("expected Ref, got {other:?}"),
967        }
968
969        // siteRef should be prefixed
970        match entity.get("siteRef") {
971            Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-site-1"),
972            other => panic!("expected Ref, got {other:?}"),
973        }
974
975        // equipRef should be prefixed
976        match entity.get("equipRef") {
977            Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-equip-1"),
978            other => panic!("expected Ref, got {other:?}"),
979        }
980
981        // floorRef should be prefixed, preserving dis
982        match entity.get("floorRef") {
983            Some(Kind::Ref(r)) => {
984                assert_eq!(r.val, "r1-floor-1");
985                assert_eq!(r.dis, Some("Floor 1".to_string()));
986            }
987            other => panic!("expected Ref, got {other:?}"),
988        }
989
990        // Non-ref tags should not be changed
991        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
992        assert_eq!(entity.get("site"), Some(&Kind::Marker));
993    }
994
995    #[test]
996    fn id_prefix_skips_non_ref_values() {
997        let mut entity = HDict::new();
998        entity.set("id", Kind::Ref(HRef::from_val("point-1")));
999        // A tag ending in "Ref" but whose value is not actually a Ref
1000        entity.set("customRef", Kind::Str("not-a-ref".to_string()));
1001
1002        prefix_refs(&mut entity, "p-");
1003
1004        // id should be prefixed
1005        match entity.get("id") {
1006            Some(Kind::Ref(r)) => assert_eq!(r.val, "p-point-1"),
1007            other => panic!("expected Ref, got {other:?}"),
1008        }
1009
1010        // customRef is a Str, not a Ref, so it should be unchanged
1011        assert_eq!(
1012            entity.get("customRef"),
1013            Some(&Kind::Str("not-a-ref".to_string()))
1014        );
1015    }
1016
1017    #[test]
1018    fn connector_config_deserialization_full() {
1019        let json = r#"{
1020            "name": "Full Config",
1021            "url": "https://remote:8443/api",
1022            "username": "admin",
1023            "password": "secret",
1024            "id_prefix": "r1-",
1025            "ws_url": "wss://remote:8443/api/ws",
1026            "sync_interval_secs": 30,
1027            "client_cert": "/etc/certs/client.pem",
1028            "client_key": "/etc/certs/client-key.pem",
1029            "ca_cert": "/etc/certs/ca.pem"
1030        }"#;
1031        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
1032        assert_eq!(config.ws_url, Some("wss://remote:8443/api/ws".to_string()));
1033        assert_eq!(config.sync_interval_secs, Some(30));
1034        assert_eq!(
1035            config.client_cert,
1036            Some("/etc/certs/client.pem".to_string())
1037        );
1038        assert_eq!(
1039            config.client_key,
1040            Some("/etc/certs/client-key.pem".to_string())
1041        );
1042        assert_eq!(config.ca_cert, Some("/etc/certs/ca.pem".to_string()));
1043    }
1044
1045    #[test]
1046    fn strip_prefix_refs_reverses_prefix() {
1047        let mut entity = HDict::new();
1048        entity.set("id", Kind::Ref(HRef::from_val("r1-site-1")));
1049        entity.set("dis", Kind::Str("Main Site".to_string()));
1050        entity.set("site", Kind::Marker);
1051        entity.set("siteRef", Kind::Ref(HRef::from_val("r1-site-1")));
1052        entity.set("equipRef", Kind::Ref(HRef::from_val("r1-equip-1")));
1053
1054        strip_prefix_refs(&mut entity, "r1-");
1055
1056        match entity.get("id") {
1057            Some(Kind::Ref(r)) => assert_eq!(r.val, "site-1"),
1058            other => panic!("expected Ref, got {other:?}"),
1059        }
1060        match entity.get("siteRef") {
1061            Some(Kind::Ref(r)) => assert_eq!(r.val, "site-1"),
1062            other => panic!("expected Ref, got {other:?}"),
1063        }
1064        match entity.get("equipRef") {
1065            Some(Kind::Ref(r)) => assert_eq!(r.val, "equip-1"),
1066            other => panic!("expected Ref, got {other:?}"),
1067        }
1068        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
1069    }
1070
1071    #[test]
1072    fn strip_prefix_refs_ignores_non_matching() {
1073        let mut entity = HDict::new();
1074        entity.set("id", Kind::Ref(HRef::from_val("other-site-1")));
1075
1076        strip_prefix_refs(&mut entity, "r1-");
1077
1078        match entity.get("id") {
1079            Some(Kind::Ref(r)) => assert_eq!(r.val, "other-site-1"),
1080            other => panic!("expected Ref, got {other:?}"),
1081        }
1082    }
1083
1084    #[test]
1085    fn derive_ws_url_from_http() {
1086        let config = ConnectorConfig {
1087            name: "test".to_string(),
1088            url: "http://remote:8080/api".to_string(),
1089            username: "u".to_string(),
1090            password: "p".to_string(),
1091            id_prefix: None,
1092            ws_url: None,
1093            sync_interval_secs: None,
1094            client_cert: None,
1095            client_key: None,
1096            ca_cert: None,
1097        };
1098        assert_eq!(config.effective_ws_url(), "ws://remote:8080/api/ws");
1099    }
1100
1101    #[test]
1102    fn derive_ws_url_from_https() {
1103        let config = ConnectorConfig {
1104            name: "test".to_string(),
1105            url: "https://remote:8443/api".to_string(),
1106            username: "u".to_string(),
1107            password: "p".to_string(),
1108            id_prefix: None,
1109            ws_url: None,
1110            sync_interval_secs: None,
1111            client_cert: None,
1112            client_key: None,
1113            ca_cert: None,
1114        };
1115        assert_eq!(config.effective_ws_url(), "wss://remote:8443/api/ws");
1116    }
1117
1118    #[test]
1119    fn explicit_ws_url_overrides_derived() {
1120        let config = ConnectorConfig {
1121            name: "test".to_string(),
1122            url: "http://remote:8080/api".to_string(),
1123            username: "u".to_string(),
1124            password: "p".to_string(),
1125            id_prefix: None,
1126            ws_url: Some("ws://custom:9999/ws".to_string()),
1127            sync_interval_secs: None,
1128            client_cert: None,
1129            client_key: None,
1130            ca_cert: None,
1131        };
1132        assert_eq!(config.effective_ws_url(), "ws://custom:9999/ws");
1133    }
1134
1135    #[test]
1136    fn connector_tracks_entity_ids_in_ownership() {
1137        let config = ConnectorConfig {
1138            name: "test".to_string(),
1139            url: "http://localhost:8080/api".to_string(),
1140            username: "user".to_string(),
1141            password: "pass".to_string(),
1142            id_prefix: Some("t-".to_string()),
1143            ws_url: None,
1144            sync_interval_secs: None,
1145            client_cert: None,
1146            client_key: None,
1147            ca_cert: None,
1148        };
1149        let connector = Connector::new(config);
1150        assert!(!connector.owns("t-site-1"));
1151
1152        // Simulate populating cache
1153        {
1154            let mut entity = HDict::new();
1155            entity.set("id", Kind::Ref(HRef::from_val("t-site-1")));
1156            connector.update_cache(vec![entity]);
1157        }
1158
1159        assert!(connector.owns("t-site-1"));
1160        assert!(!connector.owns("other-1"));
1161    }
1162
1163    #[test]
1164    fn connector_new_defaults_transport_and_connected() {
1165        let config = ConnectorConfig {
1166            name: "test".to_string(),
1167            url: "http://localhost:8080/api".to_string(),
1168            username: "user".to_string(),
1169            password: "pass".to_string(),
1170            id_prefix: None,
1171            ws_url: None,
1172            sync_interval_secs: None,
1173            client_cert: None,
1174            client_key: None,
1175            ca_cert: None,
1176        };
1177        let connector = Connector::new(config);
1178        assert_eq!(connector.transport_mode(), TransportMode::Http);
1179        assert!(!connector.is_connected());
1180        assert!(connector.last_sync_time().is_none());
1181    }
1182
1183    #[test]
1184    fn connector_config_new_fields_default_to_none() {
1185        let json = r#"{
1186            "name": "Minimal",
1187            "url": "http://remote:8080/api",
1188            "username": "user",
1189            "password": "pass"
1190        }"#;
1191        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
1192        assert_eq!(config.ws_url, None);
1193        assert_eq!(config.sync_interval_secs, None);
1194        assert_eq!(config.client_cert, None);
1195        assert_eq!(config.client_key, None);
1196        assert_eq!(config.ca_cert, None);
1197    }
1198
1199    #[test]
1200    fn remote_watch_add_and_remove() {
1201        let config = ConnectorConfig {
1202            name: "test".to_string(),
1203            url: "http://localhost:8080/api".to_string(),
1204            username: "user".to_string(),
1205            password: "pass".to_string(),
1206            id_prefix: Some("r-".to_string()),
1207            ws_url: None,
1208            sync_interval_secs: None,
1209            client_cert: None,
1210            client_key: None,
1211            ca_cert: None,
1212        };
1213        let connector = Connector::new(config);
1214        assert_eq!(connector.remote_watch_count(), 0);
1215
1216        connector.add_remote_watch("r-site-1");
1217        assert_eq!(connector.remote_watch_count(), 1);
1218
1219        connector.add_remote_watch("r-equip-2");
1220        assert_eq!(connector.remote_watch_count(), 2);
1221
1222        // Duplicate add is idempotent (HashSet).
1223        connector.add_remote_watch("r-site-1");
1224        assert_eq!(connector.remote_watch_count(), 2);
1225
1226        connector.remove_remote_watch("r-site-1");
1227        assert_eq!(connector.remote_watch_count(), 1);
1228
1229        // Removing a non-existent ID is a no-op.
1230        connector.remove_remote_watch("r-nonexistent");
1231        assert_eq!(connector.remote_watch_count(), 1);
1232
1233        connector.remove_remote_watch("r-equip-2");
1234        assert_eq!(connector.remote_watch_count(), 0);
1235    }
1236
1237    fn make_test_entities() -> Vec<HDict> {
1238        let mut site = HDict::new();
1239        site.set("id", Kind::Ref(HRef::from_val("site-1")));
1240        site.set("site", Kind::Marker);
1241        site.set("dis", Kind::Str("Main Site".into()));
1242
1243        let mut equip = HDict::new();
1244        equip.set("id", Kind::Ref(HRef::from_val("equip-1")));
1245        equip.set("equip", Kind::Marker);
1246        equip.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
1247
1248        let mut point = HDict::new();
1249        point.set("id", Kind::Ref(HRef::from_val("point-1")));
1250        point.set("point", Kind::Marker);
1251        point.set("sensor", Kind::Marker);
1252        point.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
1253
1254        vec![site, equip, point]
1255    }
1256
1257    #[test]
1258    fn filter_cached_returns_matching_entities() {
1259        let config = ConnectorConfig {
1260            name: "test".to_string(),
1261            url: "http://localhost:8080/api".to_string(),
1262            username: "user".to_string(),
1263            password: "pass".to_string(),
1264            id_prefix: None,
1265            ws_url: None,
1266            sync_interval_secs: None,
1267            client_cert: None,
1268            client_key: None,
1269            ca_cert: None,
1270        };
1271        let connector = Connector::new(config);
1272        connector.update_cache(make_test_entities());
1273
1274        // Filter for site
1275        let results = connector.filter_cached("site", 0).unwrap();
1276        assert_eq!(results.len(), 1);
1277        assert_eq!(
1278            results[0].get("id"),
1279            Some(&Kind::Ref(HRef::from_val("site-1")))
1280        );
1281
1282        // Filter for equip
1283        let results = connector.filter_cached("equip", 0).unwrap();
1284        assert_eq!(results.len(), 1);
1285
1286        // Filter for point and sensor
1287        let results = connector.filter_cached("point and sensor", 0).unwrap();
1288        assert_eq!(results.len(), 1);
1289
1290        // Filter for something that doesn't exist
1291        let results = connector.filter_cached("ahu", 0).unwrap();
1292        assert!(results.is_empty());
1293    }
1294
1295    #[test]
1296    fn filter_cached_respects_limit() {
1297        let config = ConnectorConfig {
1298            name: "test".to_string(),
1299            url: "http://localhost:8080/api".to_string(),
1300            username: "user".to_string(),
1301            password: "pass".to_string(),
1302            id_prefix: None,
1303            ws_url: None,
1304            sync_interval_secs: None,
1305            client_cert: None,
1306            client_key: None,
1307            ca_cert: None,
1308        };
1309        let connector = Connector::new(config);
1310
1311        // Add multiple points
1312        let mut entities = Vec::new();
1313        for i in 0..10 {
1314            let mut p = HDict::new();
1315            p.set("id", Kind::Ref(HRef::from_val(&format!("point-{i}"))));
1316            p.set("point", Kind::Marker);
1317            entities.push(p);
1318        }
1319        connector.update_cache(entities);
1320
1321        let results = connector.filter_cached("point", 3).unwrap();
1322        assert_eq!(results.len(), 3);
1323    }
1324
1325    #[test]
1326    fn filter_cached_or_query() {
1327        let config = ConnectorConfig {
1328            name: "test".to_string(),
1329            url: "http://localhost:8080/api".to_string(),
1330            username: "user".to_string(),
1331            password: "pass".to_string(),
1332            id_prefix: None,
1333            ws_url: None,
1334            sync_interval_secs: None,
1335            client_cert: None,
1336            client_key: None,
1337            ca_cert: None,
1338        };
1339        let connector = Connector::new(config);
1340        connector.update_cache(make_test_entities());
1341
1342        // site or equip should return 2
1343        let results = connector.filter_cached("site or equip", 0).unwrap();
1344        assert_eq!(results.len(), 2);
1345    }
1346}