Skip to main content

haystack_server/
connector.rs

1//! Connector for fetching entities from a remote Haystack server.
2
3use std::collections::HashSet;
4use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
5use std::sync::Arc;
6
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9
10use haystack_client::HaystackClient;
11use haystack_client::transport::http::HttpTransport;
12use haystack_client::transport::ws::WsTransport;
13use haystack_core::data::{HDict, HGrid};
14use haystack_core::kinds::{HRef, Kind};
15
16/// Type-erased persistent connection to a remote Haystack server.
17///
18/// Wraps either an HTTP or WebSocket `HaystackClient`, allowing the connector
19/// to hold a single persistent connection regardless of transport.
20enum ConnectorClient {
21    Http(HaystackClient<HttpTransport>),
22    Ws(HaystackClient<WsTransport>),
23}
24
25impl ConnectorClient {
26    /// Call a Haystack op through the underlying client, returning the
27    /// response grid or an error string.
28    async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, String> {
29        match self {
30            ConnectorClient::Http(c) => c.call(op, req).await.map_err(|e| e.to_string()),
31            ConnectorClient::Ws(c) => c.call(op, req).await.map_err(|e| e.to_string()),
32        }
33    }
34}
35
/// Which transport protocol a connector is using.
///
/// The explicit `u8` discriminants let the value be stored in an `AtomicU8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum TransportMode {
    /// Plain HTTP request/response transport.
    Http = 0,
    /// WebSocket transport.
    WebSocket = 1,
}
43
44/// Configuration for a remote Haystack server connection.
45#[derive(Debug, Clone, serde::Deserialize)]
46pub struct ConnectorConfig {
47    /// Display name for this connector.
48    pub name: String,
49    /// Base URL of the remote Haystack API (e.g. "http://remote:8080/api").
50    pub url: String,
51    /// Username for SCRAM authentication.
52    pub username: String,
53    /// Password for SCRAM authentication.
54    pub password: String,
55    /// Optional tag prefix to namespace remote entity IDs (e.g. "remote1-").
56    pub id_prefix: Option<String>,
57    /// WebSocket URL (e.g. "ws://remote:8080/api/ws"). Derived from url if omitted.
58    pub ws_url: Option<String>,
59    /// Fallback polling interval in seconds (default: 60).
60    pub sync_interval_secs: Option<u64>,
61    /// Path to PEM client certificate for mTLS.
62    pub client_cert: Option<String>,
63    /// Path to PEM client private key for mTLS.
64    pub client_key: Option<String>,
65    /// Path to PEM CA certificate for server verification.
66    pub ca_cert: Option<String>,
67}
68
69/// A connector that can fetch entities from a remote Haystack server.
70pub struct Connector {
71    pub config: ConnectorConfig,
72    /// Cached entities from last sync.
73    cache: RwLock<Vec<HDict>>,
74    /// Set of entity IDs owned by this connector (populated during sync/update_cache).
75    owned_ids: RwLock<HashSet<String>>,
76    /// Entity IDs being watched remotely (prefixed IDs).
77    remote_watch_ids: RwLock<HashSet<String>>,
78    /// Transport protocol (HTTP or WebSocket).
79    transport_mode: AtomicU8,
80    /// Whether the connector is currently connected (last sync succeeded).
81    connected: AtomicBool,
82    /// Timestamp of the last successful sync.
83    last_sync: RwLock<Option<DateTime<Utc>>>,
84    /// Persistent client connection used by sync and background tasks.
85    /// Lazily initialized on first sync. Cleared on connection error.
86    /// Uses `tokio::sync::RwLock` because the guard is held across `.await`.
87    client: tokio::sync::RwLock<Option<ConnectorClient>>,
88}
89
90impl Connector {
91    /// Create a new connector with an empty cache.
92    pub fn new(config: ConnectorConfig) -> Self {
93        Self {
94            config,
95            cache: RwLock::new(Vec::new()),
96            owned_ids: RwLock::new(HashSet::new()),
97            remote_watch_ids: RwLock::new(HashSet::new()),
98            transport_mode: AtomicU8::new(TransportMode::Http as u8),
99            connected: AtomicBool::new(false),
100            last_sync: RwLock::new(None),
101            client: tokio::sync::RwLock::new(None),
102        }
103    }
104
105    /// Attempt to connect to the remote server. Tries WebSocket first,
106    /// falls back to HTTP.
107    async fn connect_persistent(&self) -> Result<(), String> {
108        // Try WebSocket first
109        let ws_url = self.config.effective_ws_url();
110        match HaystackClient::connect_ws(
111            &self.config.url,
112            &ws_url,
113            &self.config.username,
114            &self.config.password,
115        )
116        .await
117        {
118            Ok(ws_client) => {
119                *self.client.write().await = Some(ConnectorClient::Ws(ws_client));
120                self.transport_mode
121                    .store(TransportMode::WebSocket as u8, Ordering::Relaxed);
122                self.connected.store(true, Ordering::Relaxed);
123                log::info!("Connected to {} via WebSocket", self.config.name);
124                return Ok(());
125            }
126            Err(e) => {
127                log::warn!(
128                    "WS connection to {} failed: {}, trying HTTP",
129                    self.config.name,
130                    e
131                );
132            }
133        }
134
135        // Fall back to HTTP
136        match HaystackClient::connect(
137            &self.config.url,
138            &self.config.username,
139            &self.config.password,
140        )
141        .await
142        {
143            Ok(http_client) => {
144                *self.client.write().await = Some(ConnectorClient::Http(http_client));
145                self.transport_mode
146                    .store(TransportMode::Http as u8, Ordering::Relaxed);
147                self.connected.store(true, Ordering::Relaxed);
148                log::info!("Connected to {} via HTTP", self.config.name);
149                Ok(())
150            }
151            Err(e) => {
152                self.connected.store(false, Ordering::Relaxed);
153                Err(format!("connection failed: {e}"))
154            }
155        }
156    }
157
158    /// Connect to the remote server, fetch all entities, apply id prefixing,
159    /// and store them in the cache. Returns the count of entities synced.
160    ///
161    /// Uses the persistent client if available, otherwise establishes a new
162    /// connection (WS-first, falling back to HTTP).
163    pub async fn sync(&self) -> Result<usize, String> {
164        // Ensure we have a connection
165        if self.client.read().await.is_none() {
166            self.connect_persistent().await?;
167        }
168
169        // Use the persistent client to read all entities
170        let grid = {
171            let client = self.client.read().await;
172            let client = client.as_ref().ok_or("not connected")?;
173            client.call("read", &build_read_all_grid()).await
174        };
175
176        let grid = grid.map_err(|e| {
177            self.connected.store(false, Ordering::Relaxed);
178            format!("read failed: {e}")
179        })?;
180
181        let mut entities: Vec<HDict> = grid.rows.into_iter().collect();
182
183        // Apply id prefix if configured.
184        if let Some(ref prefix) = self.config.id_prefix {
185            for entity in &mut entities {
186                prefix_refs(entity, prefix);
187            }
188        }
189
190        let count = entities.len();
191        self.update_cache(entities);
192        *self.last_sync.write() = Some(Utc::now());
193        self.connected.store(true, Ordering::Relaxed);
194        Ok(count)
195    }
196
197    /// Replace the cached entities and rebuild the owned-ID index.
198    ///
199    /// Extracts all `id` Ref values from the given entities into a `HashSet`,
200    /// then atomically replaces both the entity cache and the ownership set.
201    pub fn update_cache(&self, entities: Vec<HDict>) {
202        let ids: HashSet<String> = entities
203            .iter()
204            .filter_map(|e| match e.get("id") {
205                Some(Kind::Ref(r)) => Some(r.val.clone()),
206                _ => None,
207            })
208            .collect();
209        *self.cache.write() = entities;
210        *self.owned_ids.write() = ids;
211    }
212
213    /// Returns `true` if this connector owns an entity with the given ID.
214    pub fn owns(&self, id: &str) -> bool {
215        self.owned_ids.read().contains(id)
216    }
217
218    /// Returns a clone of all cached entities.
219    pub fn cached_entities(&self) -> Vec<HDict> {
220        self.cache.read().clone()
221    }
222
223    /// Returns the number of cached entities.
224    pub fn entity_count(&self) -> usize {
225        self.cache.read().len()
226    }
227
228    /// Add a federated entity ID to the remote watch set.
229    pub fn add_remote_watch(&self, prefixed_id: &str) {
230        self.remote_watch_ids
231            .write()
232            .insert(prefixed_id.to_string());
233    }
234
235    /// Remove a federated entity ID from the remote watch set.
236    pub fn remove_remote_watch(&self, prefixed_id: &str) {
237        self.remote_watch_ids.write().remove(prefixed_id);
238    }
239
240    /// Returns the number of entity IDs being watched remotely.
241    pub fn remote_watch_count(&self) -> usize {
242        self.remote_watch_ids.read().len()
243    }
244
245    /// Returns the current transport mode.
246    pub fn transport_mode(&self) -> TransportMode {
247        match self.transport_mode.load(Ordering::Relaxed) {
248            1 => TransportMode::WebSocket,
249            _ => TransportMode::Http,
250        }
251    }
252
253    /// Returns whether the connector is currently connected (last sync succeeded).
254    pub fn is_connected(&self) -> bool {
255        self.connected.load(Ordering::Relaxed)
256    }
257
258    /// Returns the timestamp of the last successful sync, if any.
259    pub fn last_sync_time(&self) -> Option<DateTime<Utc>> {
260        *self.last_sync.read()
261    }
262
263    /// Strip the id_prefix from an entity ID.
264    fn strip_id(&self, prefixed_id: &str) -> String {
265        if let Some(ref prefix) = self.config.id_prefix {
266            prefixed_id
267                .strip_prefix(prefix.as_str())
268                .unwrap_or(prefixed_id)
269                .to_string()
270        } else {
271            prefixed_id.to_string()
272        }
273    }
274
275    /// Create a fresh HTTP connection to the remote.
276    ///
277    // TODO: Reuse the persistent client for proxy methods once we have a
278    // better abstraction over ConnectorClient that exposes typed ops.
279    async fn connect_remote(
280        &self,
281    ) -> Result<
282        HaystackClient<haystack_client::transport::http::HttpTransport>,
283        String,
284    > {
285        HaystackClient::connect(
286            &self.config.url,
287            &self.config.username,
288            &self.config.password,
289        )
290        .await
291        .map_err(|e| format!("connection failed: {e}"))
292    }
293
294    /// Proxy a hisRead request to the remote server.
295    pub async fn proxy_his_read(
296        &self,
297        prefixed_id: &str,
298        range: &str,
299    ) -> Result<HGrid, String> {
300        let id = self.strip_id(prefixed_id);
301        let client = self.connect_remote().await?;
302        client
303            .his_read(&id, range)
304            .await
305            .map_err(|e| format!("hisRead failed: {e}"))
306    }
307
308    /// Proxy a pointWrite request to the remote server.
309    pub async fn proxy_point_write(
310        &self,
311        prefixed_id: &str,
312        level: u8,
313        val: &Kind,
314    ) -> Result<HGrid, String> {
315        let id = self.strip_id(prefixed_id);
316        let client = self.connect_remote().await?;
317        client
318            .point_write(&id, level, val.clone())
319            .await
320            .map_err(|e| format!("pointWrite failed: {e}"))
321    }
322
323    /// Proxy a hisWrite request to the remote server.
324    pub async fn proxy_his_write(
325        &self,
326        prefixed_id: &str,
327        items: Vec<HDict>,
328    ) -> Result<HGrid, String> {
329        let id = self.strip_id(prefixed_id);
330        let client = self.connect_remote().await?;
331        client
332            .his_write(&id, items)
333            .await
334            .map_err(|e| format!("hisWrite failed: {e}"))
335    }
336
337    /// Proxy an import request for a single entity to the remote server.
338    ///
339    /// Strips the id prefix from the entity, wraps it in a single-row grid,
340    /// and calls the remote `import` op.
341    pub async fn proxy_import(&self, entity: &HDict) -> Result<HGrid, String> {
342        use crate::connector::strip_prefix_refs;
343
344        let mut stripped = entity.clone();
345        if let Some(ref prefix) = self.config.id_prefix {
346            strip_prefix_refs(&mut stripped, prefix);
347        }
348
349        // Build a single-row grid with columns from the entity.
350        let col_names: Vec<String> = stripped.tag_names().map(|s| s.to_string()).collect();
351        let cols: Vec<haystack_core::data::HCol> =
352            col_names.iter().map(|n| haystack_core::data::HCol::new(n.as_str())).collect();
353        let grid = HGrid::from_parts(HDict::new(), cols, vec![stripped]);
354
355        let client = self.connect_remote().await?;
356        client
357            .call("import", &grid)
358            .await
359            .map_err(|e| format!("import failed: {e}"))
360    }
361
362    /// Proxy an invokeAction request to the remote server.
363    pub async fn proxy_invoke_action(
364        &self,
365        prefixed_id: &str,
366        action: &str,
367        args: HDict,
368    ) -> Result<HGrid, String> {
369        let id = self.strip_id(prefixed_id);
370        let client = self.connect_remote().await?;
371        client
372            .invoke_action(&id, action, args)
373            .await
374            .map_err(|e| format!("invokeAction failed: {e}"))
375    }
376
377    /// Spawn a background sync task for this connector.
378    ///
379    /// The task loops forever, syncing entities at the configured interval.
380    /// On error, the persistent client is cleared to force reconnection on
381    /// the next iteration.
382    pub fn spawn_sync_task(connector: Arc<Connector>) -> tokio::task::JoinHandle<()> {
383        let interval_secs = connector.config.effective_sync_interval_secs();
384        tokio::spawn(async move {
385            loop {
386                match connector.sync().await {
387                    Ok(count) => {
388                        log::debug!(
389                            "Synced {} entities from {}",
390                            count,
391                            connector.config.name
392                        );
393                    }
394                    Err(e) => {
395                        log::error!("Sync failed for {}: {}", connector.config.name, e);
396                        // Clear the client on error to force reconnection
397                        *connector.client.write().await = None;
398                        connector.connected.store(false, Ordering::Relaxed);
399                    }
400                }
401                tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
402            }
403        })
404    }
405}
406
407/// Build a `read` request grid with filter `"*"` (all entities).
408fn build_read_all_grid() -> HGrid {
409    use haystack_core::data::HCol;
410    let mut row = HDict::new();
411    row.set("filter", Kind::Str("*".to_string()));
412    HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row])
413}
414
415impl ConnectorConfig {
416    /// Returns the WebSocket URL. If `ws_url` is set, returns it directly.
417    /// Otherwise derives from `url` by replacing `http://` with `ws://`
418    /// or `https://` with `wss://` and appending `/ws`.
419    pub fn effective_ws_url(&self) -> String {
420        if let Some(ref ws) = self.ws_url {
421            return ws.clone();
422        }
423        let ws = if self.url.starts_with("https://") {
424            self.url.replacen("https://", "wss://", 1)
425        } else {
426            self.url.replacen("http://", "ws://", 1)
427        };
428        format!("{ws}/ws")
429    }
430
431    /// Returns the sync interval in seconds. Defaults to 60 if not set.
432    pub fn effective_sync_interval_secs(&self) -> u64 {
433        self.sync_interval_secs.unwrap_or(60)
434    }
435}
436
437/// Prefix all Ref values in an entity dict.
438///
439/// Prefixes the `id` tag and any tag whose name ends with `Ref`
440/// (e.g. `siteRef`, `equipRef`, `floorRef`, `spaceRef`).
441pub fn prefix_refs(entity: &mut HDict, prefix: &str) {
442    let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
443
444    for name in &tag_names {
445        let should_prefix = name == "id" || name.ends_with("Ref");
446        if !should_prefix {
447            continue;
448        }
449
450        if let Some(Kind::Ref(r)) = entity.get(name) {
451            let new_val = format!("{}{}", prefix, r.val);
452            let new_ref = HRef::new(new_val, r.dis.clone());
453            entity.set(name.as_str(), Kind::Ref(new_ref));
454        }
455    }
456}
457
458/// Strip a prefix from all Ref values in an entity dict.
459///
460/// The inverse of [`prefix_refs`]. Strips the given prefix from the `id` tag
461/// and any tag whose name ends with `Ref`, but only if the Ref value actually
462/// starts with the prefix. Preserves `dis` metadata on Refs.
463pub fn strip_prefix_refs(entity: &mut HDict, prefix: &str) {
464    let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
465
466    for name in &tag_names {
467        let should_strip = name == "id" || name.ends_with("Ref");
468        if !should_strip {
469            continue;
470        }
471
472        if let Some(Kind::Ref(r)) = entity.get(name)
473            && let Some(stripped) = r.val.strip_prefix(prefix)
474        {
475            let new_ref = HRef::new(stripped.to_string(), r.dis.clone());
476            entity.set(name.as_str(), Kind::Ref(new_ref));
477        }
478    }
479}
480
#[cfg(test)]
mod tests {
    use super::*;
    use haystack_core::kinds::HRef;

    /// Build a config with fixed credentials and every optional field unset
    /// apart from the ones passed in.
    fn config_for(url: &str, id_prefix: Option<&str>, ws_url: Option<&str>) -> ConnectorConfig {
        ConnectorConfig {
            name: "test".to_string(),
            url: url.to_string(),
            username: "user".to_string(),
            password: "pass".to_string(),
            id_prefix: id_prefix.map(str::to_string),
            ws_url: ws_url.map(str::to_string),
            sync_interval_secs: None,
            client_cert: None,
            client_key: None,
            ca_cert: None,
        }
    }

    /// Config pointing at the local test URL.
    fn local_config(id_prefix: Option<&str>) -> ConnectorConfig {
        config_for("http://localhost:8080/api", id_prefix, None)
    }

    /// Return the value of the Ref stored under `tag`, panicking if the tag
    /// is missing or holds something other than a Ref.
    fn ref_val<'a>(entity: &'a HDict, tag: &str) -> &'a str {
        match entity.get(tag) {
            Some(Kind::Ref(r)) => r.val.as_str(),
            other => panic!("expected Ref, got {other:?}"),
        }
    }

    #[test]
    fn connector_new_empty_cache() {
        let connector = Connector::new(local_config(None));
        assert_eq!(connector.entity_count(), 0);
        assert!(connector.cached_entities().is_empty());
    }

    #[test]
    fn connector_config_deserialization() {
        let json = r#"{
            "name": "Remote Server",
            "url": "http://remote:8080/api",
            "username": "admin",
            "password": "secret",
            "id_prefix": "r1-"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.name, "Remote Server");
        assert_eq!(config.url, "http://remote:8080/api");
        assert_eq!(config.username, "admin");
        assert_eq!(config.password, "secret");
        assert_eq!(config.id_prefix, Some("r1-".to_string()));
    }

    #[test]
    fn connector_config_deserialization_without_prefix() {
        let json = r#"{
            "name": "Remote",
            "url": "http://remote:8080/api",
            "username": "admin",
            "password": "secret"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.id_prefix, None);
    }

    #[test]
    fn id_prefix_application() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("site-1")));
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
        entity.set(
            "floorRef",
            Kind::Ref(HRef::new("floor-1", Some("Floor 1".to_string()))),
        );

        prefix_refs(&mut entity, "r1-");

        // id and the plain *Ref tags should be prefixed
        assert_eq!(ref_val(&entity, "id"), "r1-site-1");
        assert_eq!(ref_val(&entity, "siteRef"), "r1-site-1");
        assert_eq!(ref_val(&entity, "equipRef"), "r1-equip-1");

        // floorRef should be prefixed, preserving dis
        match entity.get("floorRef") {
            Some(Kind::Ref(r)) => {
                assert_eq!(r.val, "r1-floor-1");
                assert_eq!(r.dis, Some("Floor 1".to_string()));
            }
            other => panic!("expected Ref, got {other:?}"),
        }

        // Non-ref tags should not be changed
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
        assert_eq!(entity.get("site"), Some(&Kind::Marker));
    }

    #[test]
    fn id_prefix_skips_non_ref_values() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("point-1")));
        // A tag ending in "Ref" but whose value is not actually a Ref
        entity.set("customRef", Kind::Str("not-a-ref".to_string()));

        prefix_refs(&mut entity, "p-");

        // id should be prefixed
        assert_eq!(ref_val(&entity, "id"), "p-point-1");

        // customRef is a Str, not a Ref, so it should be unchanged
        assert_eq!(
            entity.get("customRef"),
            Some(&Kind::Str("not-a-ref".to_string()))
        );
    }

    #[test]
    fn connector_config_deserialization_full() {
        let json = r#"{
            "name": "Full Config",
            "url": "https://remote:8443/api",
            "username": "admin",
            "password": "secret",
            "id_prefix": "r1-",
            "ws_url": "wss://remote:8443/api/ws",
            "sync_interval_secs": 30,
            "client_cert": "/etc/certs/client.pem",
            "client_key": "/etc/certs/client-key.pem",
            "ca_cert": "/etc/certs/ca.pem"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.ws_url, Some("wss://remote:8443/api/ws".to_string()));
        assert_eq!(config.sync_interval_secs, Some(30));
        assert_eq!(config.client_cert, Some("/etc/certs/client.pem".to_string()));
        assert_eq!(config.client_key, Some("/etc/certs/client-key.pem".to_string()));
        assert_eq!(config.ca_cert, Some("/etc/certs/ca.pem".to_string()));
    }

    #[test]
    fn strip_prefix_refs_reverses_prefix() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("dis", Kind::Str("Main Site".to_string()));
        entity.set("site", Kind::Marker);
        entity.set("siteRef", Kind::Ref(HRef::from_val("r1-site-1")));
        entity.set("equipRef", Kind::Ref(HRef::from_val("r1-equip-1")));

        strip_prefix_refs(&mut entity, "r1-");

        assert_eq!(ref_val(&entity, "id"), "site-1");
        assert_eq!(ref_val(&entity, "siteRef"), "site-1");
        assert_eq!(ref_val(&entity, "equipRef"), "equip-1");
        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
    }

    #[test]
    fn strip_prefix_refs_ignores_non_matching() {
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("other-site-1")));

        strip_prefix_refs(&mut entity, "r1-");

        assert_eq!(ref_val(&entity, "id"), "other-site-1");
    }

    #[test]
    fn derive_ws_url_from_http() {
        let config = config_for("http://remote:8080/api", None, None);
        assert_eq!(config.effective_ws_url(), "ws://remote:8080/api/ws");
    }

    #[test]
    fn derive_ws_url_from_https() {
        let config = config_for("https://remote:8443/api", None, None);
        assert_eq!(config.effective_ws_url(), "wss://remote:8443/api/ws");
    }

    #[test]
    fn explicit_ws_url_overrides_derived() {
        let config = config_for("http://remote:8080/api", None, Some("ws://custom:9999/ws"));
        assert_eq!(config.effective_ws_url(), "ws://custom:9999/ws");
    }

    #[test]
    fn connector_tracks_entity_ids_in_ownership() {
        let connector = Connector::new(local_config(Some("t-")));
        assert!(!connector.owns("t-site-1"));

        // Simulate populating cache
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("t-site-1")));
        connector.update_cache(vec![entity]);

        assert!(connector.owns("t-site-1"));
        assert!(!connector.owns("other-1"));
    }

    #[test]
    fn connector_new_defaults_transport_and_connected() {
        let connector = Connector::new(local_config(None));
        assert_eq!(connector.transport_mode(), TransportMode::Http);
        assert!(!connector.is_connected());
        assert!(connector.last_sync_time().is_none());
    }

    #[test]
    fn connector_config_new_fields_default_to_none() {
        let json = r#"{
            "name": "Minimal",
            "url": "http://remote:8080/api",
            "username": "user",
            "password": "pass"
        }"#;
        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.ws_url, None);
        assert_eq!(config.sync_interval_secs, None);
        assert_eq!(config.client_cert, None);
        assert_eq!(config.client_key, None);
        assert_eq!(config.ca_cert, None);
    }

    #[test]
    fn remote_watch_add_and_remove() {
        let connector = Connector::new(local_config(Some("r-")));
        assert_eq!(connector.remote_watch_count(), 0);

        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.add_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 2);

        // Duplicate add is idempotent (HashSet).
        connector.add_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 2);

        connector.remove_remote_watch("r-site-1");
        assert_eq!(connector.remote_watch_count(), 1);

        // Removing a non-existent ID is a no-op.
        connector.remove_remote_watch("r-nonexistent");
        assert_eq!(connector.remote_watch_count(), 1);

        connector.remove_remote_watch("r-equip-2");
        assert_eq!(connector.remote_watch_count(), 0);
    }
}