Skip to main content

haystack_server/
connector.rs

1//! Connector for fetching entities from a remote Haystack server.
2
3use std::collections::HashSet;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
6
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9
10use haystack_client::HaystackClient;
11use haystack_client::transport::http::HttpTransport;
12use haystack_client::transport::ws::WsTransport;
13use haystack_core::data::{HDict, HGrid};
14use haystack_core::kinds::{HRef, Kind};
15
16/// Type-erased persistent connection to a remote Haystack server.
17///
18/// Wraps either an HTTP or WebSocket `HaystackClient`, allowing the connector
19/// to hold a single persistent connection regardless of transport.
20enum ConnectorClient {
21    Http(HaystackClient<HttpTransport>),
22    Ws(HaystackClient<WsTransport>),
23}
24
25impl ConnectorClient {
26    /// Call a Haystack op through the underlying client, returning the
27    /// response grid or an error string.
28    async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, String> {
29        match self {
30            ConnectorClient::Http(c) => c.call(op, req).await.map_err(|e| e.to_string()),
31            ConnectorClient::Ws(c) => c.call(op, req).await.map_err(|e| e.to_string()),
32        }
33    }
34}
35
/// Which transport a connector is currently using to reach its remote.
///
/// The explicit `u8` discriminants allow the mode to be stored in an
/// `AtomicU8` and read back without locking.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TransportMode {
    /// Plain HTTP request/response.
    Http = 0,
    /// Persistent WebSocket connection.
    WebSocket = 1,
}
43
44/// Configuration for a remote Haystack server connection.
45#[derive(Debug, Clone, serde::Deserialize)]
46pub struct ConnectorConfig {
47    /// Display name for this connector.
48    pub name: String,
49    /// Base URL of the remote Haystack API (e.g. "http://remote:8080/api").
50    pub url: String,
51    /// Username for SCRAM authentication.
52    pub username: String,
53    /// Password for SCRAM authentication.
54    pub password: String,
55    /// Optional tag prefix to namespace remote entity IDs (e.g. "remote1-").
56    pub id_prefix: Option<String>,
57    /// WebSocket URL (e.g. "ws://remote:8080/api/ws"). Derived from url if omitted.
58    pub ws_url: Option<String>,
59    /// Fallback polling interval in seconds (default: 60).
60    pub sync_interval_secs: Option<u64>,
61    /// Path to PEM client certificate for mTLS.
62    pub client_cert: Option<String>,
63    /// Path to PEM client private key for mTLS.
64    pub client_key: Option<String>,
65    /// Path to PEM CA certificate for server verification.
66    pub ca_cert: Option<String>,
67}
68
69/// A connector that can fetch entities from a remote Haystack server.
70pub struct Connector {
71    pub config: ConnectorConfig,
72    /// Cached entities from last sync.
73    cache: RwLock<Vec<HDict>>,
74    /// Set of entity IDs owned by this connector (populated during sync/update_cache).
75    owned_ids: RwLock<HashSet<String>>,
76    /// Entity IDs being watched remotely (prefixed IDs).
77    remote_watch_ids: RwLock<HashSet<String>>,
78    /// Transport protocol (HTTP or WebSocket).
79    transport_mode: AtomicU8,
80    /// Whether the connector is currently connected (last sync succeeded).
81    connected: AtomicBool,
82    /// Timestamp of the last successful sync.
83    last_sync: RwLock<Option<DateTime<Utc>>>,
84    /// Persistent client connection used by sync and background tasks.
85    /// Lazily initialized on first sync. Cleared on connection error.
86    /// Uses `tokio::sync::RwLock` because the guard is held across `.await`.
87    client: tokio::sync::RwLock<Option<ConnectorClient>>,
88}
89
90impl Connector {
91    /// Create a new connector with an empty cache.
92    pub fn new(config: ConnectorConfig) -> Self {
93        Self {
94            config,
95            cache: RwLock::new(Vec::new()),
96            owned_ids: RwLock::new(HashSet::new()),
97            remote_watch_ids: RwLock::new(HashSet::new()),
98            transport_mode: AtomicU8::new(TransportMode::Http as u8),
99            connected: AtomicBool::new(false),
100            last_sync: RwLock::new(None),
101            client: tokio::sync::RwLock::new(None),
102        }
103    }
104
105    /// Attempt to connect to the remote server. Tries WebSocket first,
106    /// falls back to HTTP.
107    async fn connect_persistent(&self) -> Result<(), String> {
108        // Try WebSocket first
109        let ws_url = self.config.effective_ws_url();
110        match HaystackClient::connect_ws(
111            &self.config.url,
112            &ws_url,
113            &self.config.username,
114            &self.config.password,
115        )
116        .await
117        {
118            Ok(ws_client) => {
119                *self.client.write().await = Some(ConnectorClient::Ws(ws_client));
120                self.transport_mode
121                    .store(TransportMode::WebSocket as u8, Ordering::Relaxed);
122                self.connected.store(true, Ordering::Relaxed);
123                log::info!("Connected to {} via WebSocket", self.config.name);
124                return Ok(());
125            }
126            Err(e) => {
127                log::warn!(
128                    "WS connection to {} failed: {}, trying HTTP",
129                    self.config.name,
130                    e
131                );
132            }
133        }
134
135        // Fall back to HTTP
136        match HaystackClient::connect(
137            &self.config.url,
138            &self.config.username,
139            &self.config.password,
140        )
141        .await
142        {
143            Ok(http_client) => {
144                *self.client.write().await = Some(ConnectorClient::Http(http_client));
145                self.transport_mode
146                    .store(TransportMode::Http as u8, Ordering::Relaxed);
147                self.connected.store(true, Ordering::Relaxed);
148                log::info!("Connected to {} via HTTP", self.config.name);
149                Ok(())
150            }
151            Err(e) => {
152                self.connected.store(false, Ordering::Relaxed);
153                Err(format!("connection failed: {e}"))
154            }
155        }
156    }
157
158    /// Connect to the remote server, fetch all entities, apply id prefixing,
159    /// and store them in the cache. Returns the count of entities synced.
160    ///
161    /// Uses the persistent client if available, otherwise establishes a new
162    /// connection (WS-first, falling back to HTTP).
163    pub async fn sync(&self) -> Result<usize, String> {
164        // Ensure we have a connection
165        if self.client.read().await.is_none() {
166            self.connect_persistent().await?;
167        }
168
169        // Use the persistent client to read all entities
170        let grid = {
171            let client = self.client.read().await;
172            let client = client.as_ref().ok_or("not connected")?;
173            client.call("read", &build_read_all_grid()).await
174        };
175
176        let grid = grid.map_err(|e| {
177            self.connected.store(false, Ordering::Relaxed);
178            format!("read failed: {e}")
179        })?;
180
181        let mut entities: Vec<HDict> = grid.rows.into_iter().collect();
182
183        // Apply id prefix if configured.
184        if let Some(ref prefix) = self.config.id_prefix {
185            for entity in &mut entities {
186                prefix_refs(entity, prefix);
187            }
188        }
189
190        let count = entities.len();
191        self.update_cache(entities);
192        *self.last_sync.write() = Some(Utc::now());
193        self.connected.store(true, Ordering::Relaxed);
194        Ok(count)
195    }
196
197    /// Replace the cached entities and rebuild the owned-ID index.
198    ///
199    /// Extracts all `id` Ref values from the given entities into a `HashSet`,
200    /// then atomically replaces both the entity cache and the ownership set.
201    pub fn update_cache(&self, entities: Vec<HDict>) {
202        let ids: HashSet<String> = entities
203            .iter()
204            .filter_map(|e| match e.get("id") {
205                Some(Kind::Ref(r)) => Some(r.val.clone()),
206                _ => None,
207            })
208            .collect();
209        *self.cache.write() = entities;
210        *self.owned_ids.write() = ids;
211    }
212
213    /// Returns `true` if this connector owns an entity with the given ID.
214    pub fn owns(&self, id: &str) -> bool {
215        self.owned_ids.read().contains(id)
216    }
217
218    /// Returns a clone of all cached entities.
219    pub fn cached_entities(&self) -> Vec<HDict> {
220        self.cache.read().clone()
221    }
222
223    /// Returns the number of cached entities.
224    pub fn entity_count(&self) -> usize {
225        self.cache.read().len()
226    }
227
228    /// Add a federated entity ID to the remote watch set.
229    pub fn add_remote_watch(&self, prefixed_id: &str) {
230        self.remote_watch_ids
231            .write()
232            .insert(prefixed_id.to_string());
233    }
234
235    /// Remove a federated entity ID from the remote watch set.
236    pub fn remove_remote_watch(&self, prefixed_id: &str) {
237        self.remote_watch_ids.write().remove(prefixed_id);
238    }
239
240    /// Returns the number of entity IDs being watched remotely.
241    pub fn remote_watch_count(&self) -> usize {
242        self.remote_watch_ids.read().len()
243    }
244
245    /// Returns the current transport mode.
246    pub fn transport_mode(&self) -> TransportMode {
247        match self.transport_mode.load(Ordering::Relaxed) {
248            1 => TransportMode::WebSocket,
249            _ => TransportMode::Http,
250        }
251    }
252
253    /// Returns whether the connector is currently connected (last sync succeeded).
254    pub fn is_connected(&self) -> bool {
255        self.connected.load(Ordering::Relaxed)
256    }
257
258    /// Returns the timestamp of the last successful sync, if any.
259    pub fn last_sync_time(&self) -> Option<DateTime<Utc>> {
260        *self.last_sync.read()
261    }
262
263    /// Strip the id_prefix from an entity ID.
264    fn strip_id(&self, prefixed_id: &str) -> String {
265        if let Some(ref prefix) = self.config.id_prefix {
266            prefixed_id
267                .strip_prefix(prefix.as_str())
268                .unwrap_or(prefixed_id)
269                .to_string()
270        } else {
271            prefixed_id.to_string()
272        }
273    }
274
275    /// Create a fresh HTTP connection to the remote.
276    ///
277    // TODO: Reuse the persistent client for proxy methods once we have a
278    // better abstraction over ConnectorClient that exposes typed ops.
279    async fn connect_remote(
280        &self,
281    ) -> Result<HaystackClient<haystack_client::transport::http::HttpTransport>, String> {
282        HaystackClient::connect(
283            &self.config.url,
284            &self.config.username,
285            &self.config.password,
286        )
287        .await
288        .map_err(|e| format!("connection failed: {e}"))
289    }
290
291    /// Proxy a hisRead request to the remote server.
292    pub async fn proxy_his_read(&self, prefixed_id: &str, range: &str) -> Result<HGrid, String> {
293        let id = self.strip_id(prefixed_id);
294        let client = self.connect_remote().await?;
295        client
296            .his_read(&id, range)
297            .await
298            .map_err(|e| format!("hisRead failed: {e}"))
299    }
300
301    /// Proxy a pointWrite request to the remote server.
302    pub async fn proxy_point_write(
303        &self,
304        prefixed_id: &str,
305        level: u8,
306        val: &Kind,
307    ) -> Result<HGrid, String> {
308        let id = self.strip_id(prefixed_id);
309        let client = self.connect_remote().await?;
310        client
311            .point_write(&id, level, val.clone())
312            .await
313            .map_err(|e| format!("pointWrite failed: {e}"))
314    }
315
316    /// Proxy a hisWrite request to the remote server.
317    pub async fn proxy_his_write(
318        &self,
319        prefixed_id: &str,
320        items: Vec<HDict>,
321    ) -> Result<HGrid, String> {
322        let id = self.strip_id(prefixed_id);
323        let client = self.connect_remote().await?;
324        client
325            .his_write(&id, items)
326            .await
327            .map_err(|e| format!("hisWrite failed: {e}"))
328    }
329
330    /// Proxy an import request for a single entity to the remote server.
331    ///
332    /// Strips the id prefix from the entity, wraps it in a single-row grid,
333    /// and calls the remote `import` op.
334    pub async fn proxy_import(&self, entity: &HDict) -> Result<HGrid, String> {
335        use crate::connector::strip_prefix_refs;
336
337        let mut stripped = entity.clone();
338        if let Some(ref prefix) = self.config.id_prefix {
339            strip_prefix_refs(&mut stripped, prefix);
340        }
341
342        // Build a single-row grid with columns from the entity.
343        let col_names: Vec<String> = stripped.tag_names().map(|s| s.to_string()).collect();
344        let cols: Vec<haystack_core::data::HCol> = col_names
345            .iter()
346            .map(|n| haystack_core::data::HCol::new(n.as_str()))
347            .collect();
348        let grid = HGrid::from_parts(HDict::new(), cols, vec![stripped]);
349
350        let client = self.connect_remote().await?;
351        client
352            .call("import", &grid)
353            .await
354            .map_err(|e| format!("import failed: {e}"))
355    }
356
357    /// Proxy an invokeAction request to the remote server.
358    pub async fn proxy_invoke_action(
359        &self,
360        prefixed_id: &str,
361        action: &str,
362        args: HDict,
363    ) -> Result<HGrid, String> {
364        let id = self.strip_id(prefixed_id);
365        let client = self.connect_remote().await?;
366        client
367            .invoke_action(&id, action, args)
368            .await
369            .map_err(|e| format!("invokeAction failed: {e}"))
370    }
371
372    /// Spawn a background sync task for this connector.
373    ///
374    /// The task loops forever, syncing entities at the configured interval.
375    /// On error, the persistent client is cleared to force reconnection on
376    /// the next iteration.
377    pub fn spawn_sync_task(connector: Arc<Connector>) -> tokio::task::JoinHandle<()> {
378        let interval_secs = connector.config.effective_sync_interval_secs();
379        tokio::spawn(async move {
380            loop {
381                match connector.sync().await {
382                    Ok(count) => {
383                        log::debug!("Synced {} entities from {}", count, connector.config.name);
384                    }
385                    Err(e) => {
386                        log::error!("Sync failed for {}: {}", connector.config.name, e);
387                        // Clear the client on error to force reconnection
388                        *connector.client.write().await = None;
389                        connector.connected.store(false, Ordering::Relaxed);
390                    }
391                }
392                tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
393            }
394        })
395    }
396}
397
398/// Build a `read` request grid with filter `"*"` (all entities).
399fn build_read_all_grid() -> HGrid {
400    use haystack_core::data::HCol;
401    let mut row = HDict::new();
402    row.set("filter", Kind::Str("*".to_string()));
403    HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row])
404}
405
406impl ConnectorConfig {
407    /// Returns the WebSocket URL. If `ws_url` is set, returns it directly.
408    /// Otherwise derives from `url` by replacing `http://` with `ws://`
409    /// or `https://` with `wss://` and appending `/ws`.
410    pub fn effective_ws_url(&self) -> String {
411        if let Some(ref ws) = self.ws_url {
412            return ws.clone();
413        }
414        let ws = if self.url.starts_with("https://") {
415            self.url.replacen("https://", "wss://", 1)
416        } else {
417            self.url.replacen("http://", "ws://", 1)
418        };
419        format!("{ws}/ws")
420    }
421
422    /// Returns the sync interval in seconds. Defaults to 60 if not set.
423    pub fn effective_sync_interval_secs(&self) -> u64 {
424        self.sync_interval_secs.unwrap_or(60)
425    }
426}
427
428/// Prefix all Ref values in an entity dict.
429///
430/// Prefixes the `id` tag and any tag whose name ends with `Ref`
431/// (e.g. `siteRef`, `equipRef`, `floorRef`, `spaceRef`).
432pub fn prefix_refs(entity: &mut HDict, prefix: &str) {
433    let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
434
435    for name in &tag_names {
436        let should_prefix = name == "id" || name.ends_with("Ref");
437        if !should_prefix {
438            continue;
439        }
440
441        if let Some(Kind::Ref(r)) = entity.get(name) {
442            let new_val = format!("{}{}", prefix, r.val);
443            let new_ref = HRef::new(new_val, r.dis.clone());
444            entity.set(name.as_str(), Kind::Ref(new_ref));
445        }
446    }
447}
448
449/// Strip a prefix from all Ref values in an entity dict.
450///
451/// The inverse of [`prefix_refs`]. Strips the given prefix from the `id` tag
452/// and any tag whose name ends with `Ref`, but only if the Ref value actually
453/// starts with the prefix. Preserves `dis` metadata on Refs.
454pub fn strip_prefix_refs(entity: &mut HDict, prefix: &str) {
455    let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
456
457    for name in &tag_names {
458        let should_strip = name == "id" || name.ends_with("Ref");
459        if !should_strip {
460            continue;
461        }
462
463        if let Some(Kind::Ref(r)) = entity.get(name)
464            && let Some(stripped) = r.val.strip_prefix(prefix)
465        {
466            let new_ref = HRef::new(stripped.to_string(), r.dis.clone());
467            entity.set(name.as_str(), Kind::Ref(new_ref));
468        }
469    }
470}
471
#[cfg(test)]
mod tests {
    use super::*;
    use haystack_core::kinds::HRef;

    /// Build a config for `url` with only the required fields filled in and
    /// an optional id prefix; every other optional field is `None`.
    fn test_config(url: &str, id_prefix: Option<&str>) -> ConnectorConfig {
        ConnectorConfig {
            name: "test".to_string(),
            url: url.to_string(),
            username: "user".to_string(),
            password: "pass".to_string(),
            id_prefix: id_prefix.map(|p| p.to_string()),
            ws_url: None,
            sync_interval_secs: None,
            client_cert: None,
            client_key: None,
            ca_cert: None,
        }
    }

    /// Return the Ref stored under `tag`, panicking if the tag is missing or
    /// holds some other kind of value.
    fn expect_ref<'a>(entity: &'a HDict, tag: &str) -> &'a HRef {
        match entity.get(tag) {
            Some(Kind::Ref(r)) => r,
            other => panic!("expected Ref, got {other:?}"),
        }
    }

    #[test]
    fn connector_new_empty_cache() {
        let connector = Connector::new(test_config("http://localhost:8080/api", None));
        assert_eq!(connector.entity_count(), 0);
        assert!(connector.cached_entities().is_empty());
    }

    #[test]
    fn connector_config_deserialization() {
        let json = r#"{
            "name": "Remote Server",
            "url": "http://remote:8080/api",
            "username": "admin",
            "password": "secret",
            "id_prefix": "r1-"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.name, "Remote Server");
        assert_eq!(config.url, "http://remote:8080/api");
        assert_eq!(config.username, "admin");
        assert_eq!(config.password, "secret");
        assert_eq!(config.id_prefix, Some("r1-".to_string()));
    }

    #[test]
    fn connector_config_deserialization_without_prefix() {
        let json = r#"{
            "name": "Remote",
            "url": "http://remote:8080/api",
            "username": "admin",
            "password": "secret"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.id_prefix, None);
    }

    #[test]
    fn id_prefix_application() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("site-1")));
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
        entity.set(
            "floorRef",
            Kind::Ref(HRef::new("floor-1", Some("Floor 1".to_string()))),
        );

        prefix_refs(&mut entity, "r1-");

        // id and every *Ref tag should be prefixed
        assert_eq!(expect_ref(&entity, "id").val, "r1-site-1");
        assert_eq!(expect_ref(&entity, "siteRef").val, "r1-site-1");
        assert_eq!(expect_ref(&entity, "equipRef").val, "r1-equip-1");

        // floorRef should be prefixed, preserving dis
        let floor = expect_ref(&entity, "floorRef");
        assert_eq!(floor.val, "r1-floor-1");
        assert_eq!(floor.dis, Some("Floor 1".to_string()));

        // Non-ref tags should not be changed
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
        assert_eq!(entity.get("site"), Some(&Kind::Marker));
    }

    #[test]
    fn id_prefix_skips_non_ref_values() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("point-1")));
        // A tag ending in "Ref" but whose value is not actually a Ref
        entity.set("customRef", Kind::Str("not-a-ref".to_string()));

        prefix_refs(&mut entity, "p-");

        // id should be prefixed
        assert_eq!(expect_ref(&entity, "id").val, "p-point-1");

        // customRef is a Str, not a Ref, so it should be unchanged
        assert_eq!(
            entity.get("customRef"),
            Some(&Kind::Str("not-a-ref".to_string()))
        );
    }

    #[test]
    fn connector_config_deserialization_full() {
        let json = r#"{
            "name": "Full Config",
            "url": "https://remote:8443/api",
            "username": "admin",
            "password": "secret",
            "id_prefix": "r1-",
            "ws_url": "wss://remote:8443/api/ws",
            "sync_interval_secs": 30,
            "client_cert": "/etc/certs/client.pem",
            "client_key": "/etc/certs/client-key.pem",
            "ca_cert": "/etc/certs/ca.pem"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.ws_url, Some("wss://remote:8443/api/ws".to_string()));
        assert_eq!(config.sync_interval_secs, Some(30));
        assert_eq!(
            config.client_cert,
            Some("/etc/certs/client.pem".to_string())
        );
        assert_eq!(
            config.client_key,
            Some("/etc/certs/client-key.pem".to_string())
        );
        assert_eq!(config.ca_cert, Some("/etc/certs/ca.pem".to_string()));
    }

    #[test]
    fn strip_prefix_refs_reverses_prefix() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("r1-equip-1")));

        strip_prefix_refs(&mut entity, "r1-");

        assert_eq!(expect_ref(&entity, "id").val, "site-1");
        assert_eq!(expect_ref(&entity, "siteRef").val, "site-1");
        assert_eq!(expect_ref(&entity, "equipRef").val, "equip-1");
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
    }

    #[test]
    fn strip_prefix_refs_ignores_non_matching() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("other-site-1")));

        strip_prefix_refs(&mut entity, "r1-");

        assert_eq!(expect_ref(&entity, "id").val, "other-site-1");
    }

    #[test]
    fn derive_ws_url_from_http() {
        let config = test_config("http://remote:8080/api", None);
        assert_eq!(config.effective_ws_url(), "ws://remote:8080/api/ws");
    }

    #[test]
    fn derive_ws_url_from_https() {
        let config = test_config("https://remote:8443/api", None);
        assert_eq!(config.effective_ws_url(), "wss://remote:8443/api/ws");
    }

    #[test]
    fn explicit_ws_url_overrides_derived() {
        let config = ConnectorConfig {
            ws_url: Some("ws://custom:9999/ws".to_string()),
            ..test_config("http://remote:8080/api", None)
        };
        assert_eq!(config.effective_ws_url(), "ws://custom:9999/ws");
    }

    #[test]
    fn connector_tracks_entity_ids_in_ownership() {
        let connector = Connector::new(test_config("http://localhost:8080/api", Some("t-")));
        assert!(!connector.owns("t-site-1"));

        // Simulate populating cache
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("t-site-1")));
        connector.update_cache(vec![entity]);

        assert!(connector.owns("t-site-1"));
        assert!(!connector.owns("other-1"));
    }

    #[test]
    fn connector_new_defaults_transport_and_connected() {
        let connector = Connector::new(test_config("http://localhost:8080/api", None));
        assert_eq!(connector.transport_mode(), TransportMode::Http);
        assert!(!connector.is_connected());
        assert!(connector.last_sync_time().is_none());
    }

    #[test]
    fn connector_config_new_fields_default_to_none() {
        let json = r#"{
            "name": "Minimal",
            "url": "http://remote:8080/api",
            "username": "user",
            "password": "pass"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.ws_url, None);
        assert_eq!(config.sync_interval_secs, None);
        assert_eq!(config.client_cert, None);
        assert_eq!(config.client_key, None);
        assert_eq!(config.ca_cert, None);
    }

    #[test]
    fn remote_watch_add_and_remove() {
        let connector = Connector::new(test_config("http://localhost:8080/api", Some("r-")));
        assert_eq!(connector.remote_watch_count(), 0);

        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.add_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 2);

        // Duplicate add is idempotent (HashSet).
        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 2);

        connector.remove_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        // Removing a non-existent ID is a no-op.
        connector.remove_remote_watch("r-nonexistent");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.remove_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 0);
    }
}
813}