Skip to main content

haystack_server/
connector.rs

1//! Connector for fetching entities from a remote Haystack server.
2
3use std::collections::HashSet;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
6
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9
10use haystack_client::HaystackClient;
11use haystack_client::transport::http::HttpTransport;
12use haystack_client::transport::ws::WsTransport;
13use haystack_core::data::{HDict, HGrid};
14use haystack_core::kinds::{HRef, Kind};
15
16/// Type-erased persistent connection to a remote Haystack server.
17///
18/// Wraps either an HTTP or WebSocket `HaystackClient`, allowing the connector
19/// to hold a single persistent connection regardless of transport.
20enum ConnectorClient {
21    Http(HaystackClient<HttpTransport>),
22    Ws(HaystackClient<WsTransport>),
23}
24
25impl ConnectorClient {
26    /// Call a Haystack op through the underlying client, returning the
27    /// response grid or an error string.
28    async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, String> {
29        match self {
30            ConnectorClient::Http(c) => c.call(op, req).await.map_err(|e| e.to_string()),
31            ConnectorClient::Ws(c) => c.call(op, req).await.map_err(|e| e.to_string()),
32        }
33    }
34
35    /// Read historical data for a point.
36    async fn his_read(&self, id: &str, range: &str) -> Result<HGrid, String> {
37        match self {
38            ConnectorClient::Http(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
39            ConnectorClient::Ws(c) => c.his_read(id, range).await.map_err(|e| e.to_string()),
40        }
41    }
42
43    /// Write historical data for a point.
44    async fn his_write(&self, id: &str, items: Vec<HDict>) -> Result<HGrid, String> {
45        match self {
46            ConnectorClient::Http(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
47            ConnectorClient::Ws(c) => c.his_write(id, items).await.map_err(|e| e.to_string()),
48        }
49    }
50
51    /// Write a value to a writable point.
52    async fn point_write(&self, id: &str, level: u8, val: Kind) -> Result<HGrid, String> {
53        match self {
54            ConnectorClient::Http(c) => c
55                .point_write(id, level, val)
56                .await
57                .map_err(|e| e.to_string()),
58            ConnectorClient::Ws(c) => c
59                .point_write(id, level, val)
60                .await
61                .map_err(|e| e.to_string()),
62        }
63    }
64
65    /// Invoke an action on an entity.
66    async fn invoke_action(&self, id: &str, action: &str, args: HDict) -> Result<HGrid, String> {
67        match self {
68            ConnectorClient::Http(c) => c
69                .invoke_action(id, action, args)
70                .await
71                .map_err(|e| e.to_string()),
72            ConnectorClient::Ws(c) => c
73                .invoke_action(id, action, args)
74                .await
75                .map_err(|e| e.to_string()),
76        }
77    }
78}
79
80/// Transport protocol used by a connector.
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82#[repr(u8)]
83pub enum TransportMode {
84    Http = 0,
85    WebSocket = 1,
86}
87
88/// Configuration for a remote Haystack server connection.
89#[derive(Debug, Clone, serde::Deserialize)]
90pub struct ConnectorConfig {
91    /// Display name for this connector.
92    pub name: String,
93    /// Base URL of the remote Haystack API (e.g. "http://remote:8080/api").
94    pub url: String,
95    /// Username for SCRAM authentication.
96    pub username: String,
97    /// Password for SCRAM authentication.
98    pub password: String,
99    /// Optional tag prefix to namespace remote entity IDs (e.g. "remote1-").
100    pub id_prefix: Option<String>,
101    /// WebSocket URL (e.g. "ws://remote:8080/api/ws"). Derived from url if omitted.
102    pub ws_url: Option<String>,
103    /// Fallback polling interval in seconds (default: 60).
104    pub sync_interval_secs: Option<u64>,
105    /// Path to PEM client certificate for mTLS.
106    pub client_cert: Option<String>,
107    /// Path to PEM client private key for mTLS.
108    pub client_key: Option<String>,
109    /// Path to PEM CA certificate for server verification.
110    pub ca_cert: Option<String>,
111}
112
113/// A connector that can fetch entities from a remote Haystack server.
114pub struct Connector {
115    pub config: ConnectorConfig,
116    /// Cached entities from last sync.
117    cache: RwLock<Vec<HDict>>,
118    /// Set of entity IDs owned by this connector (populated during sync/update_cache).
119    owned_ids: RwLock<HashSet<String>>,
120    /// Entity IDs being watched remotely (prefixed IDs).
121    remote_watch_ids: RwLock<HashSet<String>>,
122    /// Transport protocol (HTTP or WebSocket).
123    transport_mode: AtomicU8,
124    /// Whether the connector is currently connected (last sync succeeded).
125    connected: AtomicBool,
126    /// Timestamp of the last successful sync.
127    last_sync: RwLock<Option<DateTime<Utc>>>,
128    /// Persistent client connection used by sync and background tasks.
129    /// Lazily initialized on first sync. Cleared on connection error.
130    /// Uses `tokio::sync::RwLock` because the guard is held across `.await`.
131    client: tokio::sync::RwLock<Option<ConnectorClient>>,
132}
133
134impl Connector {
135    /// Create a new connector with an empty cache.
136    pub fn new(config: ConnectorConfig) -> Self {
137        Self {
138            config,
139            cache: RwLock::new(Vec::new()),
140            owned_ids: RwLock::new(HashSet::new()),
141            remote_watch_ids: RwLock::new(HashSet::new()),
142            transport_mode: AtomicU8::new(TransportMode::Http as u8),
143            connected: AtomicBool::new(false),
144            last_sync: RwLock::new(None),
145            client: tokio::sync::RwLock::new(None),
146        }
147    }
148
149    /// Attempt to connect to the remote server. Tries WebSocket first,
150    /// falls back to HTTP.
151    async fn connect_persistent(&self) -> Result<(), String> {
152        // Try WebSocket first
153        let ws_url = self.config.effective_ws_url();
154        match HaystackClient::connect_ws(
155            &self.config.url,
156            &ws_url,
157            &self.config.username,
158            &self.config.password,
159        )
160        .await
161        {
162            Ok(ws_client) => {
163                *self.client.write().await = Some(ConnectorClient::Ws(ws_client));
164                self.transport_mode
165                    .store(TransportMode::WebSocket as u8, Ordering::Relaxed);
166                self.connected.store(true, Ordering::Relaxed);
167                log::info!("Connected to {} via WebSocket", self.config.name);
168                return Ok(());
169            }
170            Err(e) => {
171                log::warn!(
172                    "WS connection to {} failed: {}, trying HTTP",
173                    self.config.name,
174                    e
175                );
176            }
177        }
178
179        // Fall back to HTTP
180        match HaystackClient::connect(
181            &self.config.url,
182            &self.config.username,
183            &self.config.password,
184        )
185        .await
186        {
187            Ok(http_client) => {
188                *self.client.write().await = Some(ConnectorClient::Http(http_client));
189                self.transport_mode
190                    .store(TransportMode::Http as u8, Ordering::Relaxed);
191                self.connected.store(true, Ordering::Relaxed);
192                log::info!("Connected to {} via HTTP", self.config.name);
193                Ok(())
194            }
195            Err(e) => {
196                self.connected.store(false, Ordering::Relaxed);
197                Err(format!("connection failed: {e}"))
198            }
199        }
200    }
201
202    /// Connect to the remote server, fetch all entities, apply id prefixing,
203    /// and store them in the cache. Returns the count of entities synced.
204    ///
205    /// Uses the persistent client if available, otherwise establishes a new
206    /// connection (WS-first, falling back to HTTP).
207    pub async fn sync(&self) -> Result<usize, String> {
208        // Ensure we have a connection
209        if self.client.read().await.is_none() {
210            self.connect_persistent().await?;
211        }
212
213        // Use the persistent client to read all entities
214        let grid = {
215            let client = self.client.read().await;
216            let client = client.as_ref().ok_or("not connected")?;
217            client.call("read", &build_read_all_grid()).await
218        };
219
220        let grid = grid.map_err(|e| {
221            self.connected.store(false, Ordering::Relaxed);
222            format!("read failed: {e}")
223        })?;
224
225        let mut entities: Vec<HDict> = grid.rows.into_iter().collect();
226
227        // Apply id prefix if configured.
228        if let Some(ref prefix) = self.config.id_prefix {
229            for entity in &mut entities {
230                prefix_refs(entity, prefix);
231            }
232        }
233
234        let count = entities.len();
235        self.update_cache(entities);
236        *self.last_sync.write() = Some(Utc::now());
237        self.connected.store(true, Ordering::Relaxed);
238        Ok(count)
239    }
240
241    /// Replace the cached entities and rebuild the owned-ID index.
242    ///
243    /// Extracts all `id` Ref values from the given entities into a `HashSet`,
244    /// then atomically replaces both the entity cache and the ownership set.
245    pub fn update_cache(&self, entities: Vec<HDict>) {
246        let ids: HashSet<String> = entities
247            .iter()
248            .filter_map(|e| match e.get("id") {
249                Some(Kind::Ref(r)) => Some(r.val.clone()),
250                _ => None,
251            })
252            .collect();
253        *self.cache.write() = entities;
254        *self.owned_ids.write() = ids;
255    }
256
257    /// Returns `true` if this connector owns an entity with the given ID.
258    pub fn owns(&self, id: &str) -> bool {
259        self.owned_ids.read().contains(id)
260    }
261
262    /// Returns a clone of all cached entities.
263    pub fn cached_entities(&self) -> Vec<HDict> {
264        self.cache.read().clone()
265    }
266
267    /// Returns the number of cached entities.
268    pub fn entity_count(&self) -> usize {
269        self.cache.read().len()
270    }
271
272    /// Add a federated entity ID to the remote watch set.
273    pub fn add_remote_watch(&self, prefixed_id: &str) {
274        self.remote_watch_ids
275            .write()
276            .insert(prefixed_id.to_string());
277    }
278
279    /// Remove a federated entity ID from the remote watch set.
280    pub fn remove_remote_watch(&self, prefixed_id: &str) {
281        self.remote_watch_ids.write().remove(prefixed_id);
282    }
283
284    /// Returns the number of entity IDs being watched remotely.
285    pub fn remote_watch_count(&self) -> usize {
286        self.remote_watch_ids.read().len()
287    }
288
289    /// Returns the current transport mode.
290    pub fn transport_mode(&self) -> TransportMode {
291        match self.transport_mode.load(Ordering::Relaxed) {
292            1 => TransportMode::WebSocket,
293            _ => TransportMode::Http,
294        }
295    }
296
297    /// Returns whether the connector is currently connected (last sync succeeded).
298    pub fn is_connected(&self) -> bool {
299        self.connected.load(Ordering::Relaxed)
300    }
301
302    /// Returns the timestamp of the last successful sync, if any.
303    pub fn last_sync_time(&self) -> Option<DateTime<Utc>> {
304        *self.last_sync.read()
305    }
306
307    /// Strip the id_prefix from an entity ID.
308    fn strip_id(&self, prefixed_id: &str) -> String {
309        if let Some(ref prefix) = self.config.id_prefix {
310            prefixed_id
311                .strip_prefix(prefix.as_str())
312                .unwrap_or(prefixed_id)
313                .to_string()
314        } else {
315            prefixed_id.to_string()
316        }
317    }
318
319    /// Ensure the persistent client is connected, establishing a new
320    /// connection if needed. Returns an error if connection fails.
321    async fn ensure_connected(&self) -> Result<(), String> {
322        if self.client.read().await.is_none() {
323            self.connect_persistent().await?;
324        }
325        Ok(())
326    }
327
328    /// Handle a failed proxy operation by clearing the client to force
329    /// reconnection on the next call.
330    async fn on_proxy_error(&self, op_name: &str, e: String) -> String {
331        *self.client.write().await = None;
332        self.connected.store(false, Ordering::Relaxed);
333        format!("{op_name} failed: {e}")
334    }
335
336    /// Proxy a hisRead request to the remote server.
337    pub async fn proxy_his_read(&self, prefixed_id: &str, range: &str) -> Result<HGrid, String> {
338        self.ensure_connected().await?;
339        let id = self.strip_id(prefixed_id);
340        let guard = self.client.read().await;
341        let client = guard.as_ref().ok_or("not connected")?;
342        match client.his_read(&id, range).await {
343            Ok(grid) => Ok(grid),
344            Err(e) => {
345                drop(guard);
346                Err(self.on_proxy_error("hisRead", e).await)
347            }
348        }
349    }
350
351    /// Proxy a pointWrite request to the remote server.
352    pub async fn proxy_point_write(
353        &self,
354        prefixed_id: &str,
355        level: u8,
356        val: &Kind,
357    ) -> Result<HGrid, String> {
358        self.ensure_connected().await?;
359        let id = self.strip_id(prefixed_id);
360        let val = val.clone();
361        let guard = self.client.read().await;
362        let client = guard.as_ref().ok_or("not connected")?;
363        match client.point_write(&id, level, val).await {
364            Ok(grid) => Ok(grid),
365            Err(e) => {
366                drop(guard);
367                Err(self.on_proxy_error("pointWrite", e).await)
368            }
369        }
370    }
371
372    /// Proxy a hisWrite request to the remote server.
373    pub async fn proxy_his_write(
374        &self,
375        prefixed_id: &str,
376        items: Vec<HDict>,
377    ) -> Result<HGrid, String> {
378        self.ensure_connected().await?;
379        let id = self.strip_id(prefixed_id);
380        let guard = self.client.read().await;
381        let client = guard.as_ref().ok_or("not connected")?;
382        match client.his_write(&id, items).await {
383            Ok(grid) => Ok(grid),
384            Err(e) => {
385                drop(guard);
386                Err(self.on_proxy_error("hisWrite", e).await)
387            }
388        }
389    }
390
391    /// Proxy an import request for a single entity to the remote server.
392    ///
393    /// Strips the id prefix from the entity, wraps it in a single-row grid,
394    /// and calls the remote `import` op.
395    pub async fn proxy_import(&self, entity: &HDict) -> Result<HGrid, String> {
396        self.ensure_connected().await?;
397
398        let mut stripped = entity.clone();
399        if let Some(ref prefix) = self.config.id_prefix {
400            strip_prefix_refs(&mut stripped, prefix);
401        }
402
403        let col_names: Vec<String> = stripped.tag_names().map(|s| s.to_string()).collect();
404        let cols: Vec<haystack_core::data::HCol> = col_names
405            .iter()
406            .map(|n| haystack_core::data::HCol::new(n.as_str()))
407            .collect();
408        let grid = HGrid::from_parts(HDict::new(), cols, vec![stripped]);
409
410        let guard = self.client.read().await;
411        let client = guard.as_ref().ok_or("not connected")?;
412        match client.call("import", &grid).await {
413            Ok(grid) => Ok(grid),
414            Err(e) => {
415                drop(guard);
416                Err(self.on_proxy_error("import", e).await)
417            }
418        }
419    }
420
421    /// Proxy an invokeAction request to the remote server.
422    pub async fn proxy_invoke_action(
423        &self,
424        prefixed_id: &str,
425        action: &str,
426        args: HDict,
427    ) -> Result<HGrid, String> {
428        self.ensure_connected().await?;
429        let id = self.strip_id(prefixed_id);
430        let action = action.to_string();
431        let guard = self.client.read().await;
432        let client = guard.as_ref().ok_or("not connected")?;
433        match client.invoke_action(&id, &action, args).await {
434            Ok(grid) => Ok(grid),
435            Err(e) => {
436                drop(guard);
437                Err(self.on_proxy_error("invokeAction", e).await)
438            }
439        }
440    }
441
442    /// Spawn a background sync task for this connector.
443    ///
444    /// The task loops forever, syncing entities at the configured interval.
445    /// On error, the persistent client is cleared to force reconnection on
446    /// the next iteration.
447    pub fn spawn_sync_task(connector: Arc<Connector>) -> tokio::task::JoinHandle<()> {
448        let interval_secs = connector.config.effective_sync_interval_secs();
449        tokio::spawn(async move {
450            loop {
451                match connector.sync().await {
452                    Ok(count) => {
453                        log::debug!("Synced {} entities from {}", count, connector.config.name);
454                    }
455                    Err(e) => {
456                        log::error!("Sync failed for {}: {}", connector.config.name, e);
457                        // Clear the client on error to force reconnection
458                        *connector.client.write().await = None;
459                        connector.connected.store(false, Ordering::Relaxed);
460                    }
461                }
462                tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
463            }
464        })
465    }
466}
467
468/// Build a `read` request grid with filter `"*"` (all entities).
469fn build_read_all_grid() -> HGrid {
470    use haystack_core::data::HCol;
471    let mut row = HDict::new();
472    row.set("filter", Kind::Str("*".to_string()));
473    HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row])
474}
475
476impl ConnectorConfig {
477    /// Returns the WebSocket URL. If `ws_url` is set, returns it directly.
478    /// Otherwise derives from `url` by replacing `http://` with `ws://`
479    /// or `https://` with `wss://` and appending `/ws`.
480    pub fn effective_ws_url(&self) -> String {
481        if let Some(ref ws) = self.ws_url {
482            return ws.clone();
483        }
484        let ws = if self.url.starts_with("https://") {
485            self.url.replacen("https://", "wss://", 1)
486        } else {
487            self.url.replacen("http://", "ws://", 1)
488        };
489        format!("{ws}/ws")
490    }
491
492    /// Returns the sync interval in seconds. Defaults to 60 if not set.
493    pub fn effective_sync_interval_secs(&self) -> u64 {
494        self.sync_interval_secs.unwrap_or(60)
495    }
496}
497
498/// Prefix all Ref values in an entity dict.
499///
500/// Prefixes the `id` tag and any tag whose name ends with `Ref`
501/// (e.g. `siteRef`, `equipRef`, `floorRef`, `spaceRef`).
502pub fn prefix_refs(entity: &mut HDict, prefix: &str) {
503    let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
504
505    for name in &tag_names {
506        let should_prefix = name == "id" || name.ends_with("Ref");
507        if !should_prefix {
508            continue;
509        }
510
511        if let Some(Kind::Ref(r)) = entity.get(name) {
512            let new_val = format!("{}{}", prefix, r.val);
513            let new_ref = HRef::new(new_val, r.dis.clone());
514            entity.set(name.as_str(), Kind::Ref(new_ref));
515        }
516    }
517}
518
519/// Strip a prefix from all Ref values in an entity dict.
520///
521/// The inverse of [`prefix_refs`]. Strips the given prefix from the `id` tag
522/// and any tag whose name ends with `Ref`, but only if the Ref value actually
523/// starts with the prefix. Preserves `dis` metadata on Refs.
524pub fn strip_prefix_refs(entity: &mut HDict, prefix: &str) {
525    let tag_names: Vec<String> = entity.tag_names().map(|s| s.to_string()).collect();
526
527    for name in &tag_names {
528        let should_strip = name == "id" || name.ends_with("Ref");
529        if !should_strip {
530            continue;
531        }
532
533        if let Some(Kind::Ref(r)) = entity.get(name)
534            && let Some(stripped) = r.val.strip_prefix(prefix)
535        {
536            let new_ref = HRef::new(stripped.to_string(), r.dis.clone());
537            entity.set(name.as_str(), Kind::Ref(new_ref));
538        }
539    }
540}
541
542#[cfg(test)]
543mod tests {
544    use super::*;
545    use haystack_core::kinds::HRef;
546
547    #[test]
548    fn connector_new_empty_cache() {
549        let config = ConnectorConfig {
550            name: "test".to_string(),
551            url: "http://localhost:8080/api".to_string(),
552            username: "user".to_string(),
553            password: "pass".to_string(),
554            id_prefix: None,
555            ws_url: None,
556            sync_interval_secs: None,
557            client_cert: None,
558            client_key: None,
559            ca_cert: None,
560        };
561        let connector = Connector::new(config);
562        assert_eq!(connector.entity_count(), 0);
563        assert!(connector.cached_entities().is_empty());
564    }
565
566    #[test]
567    fn connector_config_deserialization() {
568        let json = r#"{
569            "name": "Remote Server",
570            "url": "http://remote:8080/api",
571            "username": "admin",
572            "password": "secret",
573            "id_prefix": "r1-"
574        }"#;
575        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
576        assert_eq!(config.name, "Remote Server");
577        assert_eq!(config.url, "http://remote:8080/api");
578        assert_eq!(config.username, "admin");
579        assert_eq!(config.password, "secret");
580        assert_eq!(config.id_prefix, Some("r1-".to_string()));
581    }
582
583    #[test]
584    fn connector_config_deserialization_without_prefix() {
585        let json = r#"{
586            "name": "Remote",
587            "url": "http://remote:8080/api",
588            "username": "admin",
589            "password": "secret"
590        }"#;
591        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
592        assert_eq!(config.id_prefix, None);
593    }
594
595    #[test]
596    fn id_prefix_application() {
597        let mut entity = HDict::new();
598        entity.set("id", Kind::Ref(HRef::from_val("site-1")));
599        entity.set("dis", Kind::Str("Main Site".to_string()));
600        entity.set("site", Kind::Marker);
601        entity.set("siteRef", Kind::Ref(HRef::from_val("site-1")));
602        entity.set("equipRef", Kind::Ref(HRef::from_val("equip-1")));
603        entity.set(
604            "floorRef",
605            Kind::Ref(HRef::new("floor-1", Some("Floor 1".to_string()))),
606        );
607
608        prefix_refs(&mut entity, "r1-");
609
610        // id should be prefixed
611        match entity.get("id") {
612            Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-site-1"),
613            other => panic!("expected Ref, got {other:?}"),
614        }
615
616        // siteRef should be prefixed
617        match entity.get("siteRef") {
618            Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-site-1"),
619            other => panic!("expected Ref, got {other:?}"),
620        }
621
622        // equipRef should be prefixed
623        match entity.get("equipRef") {
624            Some(Kind::Ref(r)) => assert_eq!(r.val, "r1-equip-1"),
625            other => panic!("expected Ref, got {other:?}"),
626        }
627
628        // floorRef should be prefixed, preserving dis
629        match entity.get("floorRef") {
630            Some(Kind::Ref(r)) => {
631                assert_eq!(r.val, "r1-floor-1");
632                assert_eq!(r.dis, Some("Floor 1".to_string()));
633            }
634            other => panic!("expected Ref, got {other:?}"),
635        }
636
637        // Non-ref tags should not be changed
638        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
639        assert_eq!(entity.get("site"), Some(&Kind::Marker));
640    }
641
642    #[test]
643    fn id_prefix_skips_non_ref_values() {
644        let mut entity = HDict::new();
645        entity.set("id", Kind::Ref(HRef::from_val("point-1")));
646        // A tag ending in "Ref" but whose value is not actually a Ref
647        entity.set("customRef", Kind::Str("not-a-ref".to_string()));
648
649        prefix_refs(&mut entity, "p-");
650
651        // id should be prefixed
652        match entity.get("id") {
653            Some(Kind::Ref(r)) => assert_eq!(r.val, "p-point-1"),
654            other => panic!("expected Ref, got {other:?}"),
655        }
656
657        // customRef is a Str, not a Ref, so it should be unchanged
658        assert_eq!(
659            entity.get("customRef"),
660            Some(&Kind::Str("not-a-ref".to_string()))
661        );
662    }
663
664    #[test]
665    fn connector_config_deserialization_full() {
666        let json = r#"{
667            "name": "Full Config",
668            "url": "https://remote:8443/api",
669            "username": "admin",
670            "password": "secret",
671            "id_prefix": "r1-",
672            "ws_url": "wss://remote:8443/api/ws",
673            "sync_interval_secs": 30,
674            "client_cert": "/etc/certs/client.pem",
675            "client_key": "/etc/certs/client-key.pem",
676            "ca_cert": "/etc/certs/ca.pem"
677        }"#;
678        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
679        assert_eq!(config.ws_url, Some("wss://remote:8443/api/ws".to_string()));
680        assert_eq!(config.sync_interval_secs, Some(30));
681        assert_eq!(
682            config.client_cert,
683            Some("/etc/certs/client.pem".to_string())
684        );
685        assert_eq!(
686            config.client_key,
687            Some("/etc/certs/client-key.pem".to_string())
688        );
689        assert_eq!(config.ca_cert, Some("/etc/certs/ca.pem".to_string()));
690    }
691
692    #[test]
693    fn strip_prefix_refs_reverses_prefix() {
694        let mut entity = HDict::new();
695        entity.set("id", Kind::Ref(HRef::from_val("r1-site-1")));
696        entity.set("dis", Kind::Str("Main Site".to_string()));
697        entity.set("site", Kind::Marker);
698        entity.set("siteRef", Kind::Ref(HRef::from_val("r1-site-1")));
699        entity.set("equipRef", Kind::Ref(HRef::from_val("r1-equip-1")));
700
701        strip_prefix_refs(&mut entity, "r1-");
702
703        match entity.get("id") {
704            Some(Kind::Ref(r)) => assert_eq!(r.val, "site-1"),
705            other => panic!("expected Ref, got {other:?}"),
706        }
707        match entity.get("siteRef") {
708            Some(Kind::Ref(r)) => assert_eq!(r.val, "site-1"),
709            other => panic!("expected Ref, got {other:?}"),
710        }
711        match entity.get("equipRef") {
712            Some(Kind::Ref(r)) => assert_eq!(r.val, "equip-1"),
713            other => panic!("expected Ref, got {other:?}"),
714        }
715        assert_eq!(entity.get("dis"), Some(&Kind::Str("Main Site".to_string())));
716    }
717
718    #[test]
719    fn strip_prefix_refs_ignores_non_matching() {
720        let mut entity = HDict::new();
721        entity.set("id", Kind::Ref(HRef::from_val("other-site-1")));
722
723        strip_prefix_refs(&mut entity, "r1-");
724
725        match entity.get("id") {
726            Some(Kind::Ref(r)) => assert_eq!(r.val, "other-site-1"),
727            other => panic!("expected Ref, got {other:?}"),
728        }
729    }
730
731    #[test]
732    fn derive_ws_url_from_http() {
733        let config = ConnectorConfig {
734            name: "test".to_string(),
735            url: "http://remote:8080/api".to_string(),
736            username: "u".to_string(),
737            password: "p".to_string(),
738            id_prefix: None,
739            ws_url: None,
740            sync_interval_secs: None,
741            client_cert: None,
742            client_key: None,
743            ca_cert: None,
744        };
745        assert_eq!(config.effective_ws_url(), "ws://remote:8080/api/ws");
746    }
747
748    #[test]
749    fn derive_ws_url_from_https() {
750        let config = ConnectorConfig {
751            name: "test".to_string(),
752            url: "https://remote:8443/api".to_string(),
753            username: "u".to_string(),
754            password: "p".to_string(),
755            id_prefix: None,
756            ws_url: None,
757            sync_interval_secs: None,
758            client_cert: None,
759            client_key: None,
760            ca_cert: None,
761        };
762        assert_eq!(config.effective_ws_url(), "wss://remote:8443/api/ws");
763    }
764
765    #[test]
766    fn explicit_ws_url_overrides_derived() {
767        let config = ConnectorConfig {
768            name: "test".to_string(),
769            url: "http://remote:8080/api".to_string(),
770            username: "u".to_string(),
771            password: "p".to_string(),
772            id_prefix: None,
773            ws_url: Some("ws://custom:9999/ws".to_string()),
774            sync_interval_secs: None,
775            client_cert: None,
776            client_key: None,
777            ca_cert: None,
778        };
779        assert_eq!(config.effective_ws_url(), "ws://custom:9999/ws");
780    }
781
782    #[test]
783    fn connector_tracks_entity_ids_in_ownership() {
784        let config = ConnectorConfig {
785            name: "test".to_string(),
786            url: "http://localhost:8080/api".to_string(),
787            username: "user".to_string(),
788            password: "pass".to_string(),
789            id_prefix: Some("t-".to_string()),
790            ws_url: None,
791            sync_interval_secs: None,
792            client_cert: None,
793            client_key: None,
794            ca_cert: None,
795        };
796        let connector = Connector::new(config);
797        assert!(!connector.owns("t-site-1"));
798
799        // Simulate populating cache
800        {
801            let mut entity = HDict::new();
802            entity.set("id", Kind::Ref(HRef::from_val("t-site-1")));
803            connector.update_cache(vec![entity]);
804        }
805
806        assert!(connector.owns("t-site-1"));
807        assert!(!connector.owns("other-1"));
808    }
809
810    #[test]
811    fn connector_new_defaults_transport_and_connected() {
812        let config = ConnectorConfig {
813            name: "test".to_string(),
814            url: "http://localhost:8080/api".to_string(),
815            username: "user".to_string(),
816            password: "pass".to_string(),
817            id_prefix: None,
818            ws_url: None,
819            sync_interval_secs: None,
820            client_cert: None,
821            client_key: None,
822            ca_cert: None,
823        };
824        let connector = Connector::new(config);
825        assert_eq!(connector.transport_mode(), TransportMode::Http);
826        assert!(!connector.is_connected());
827        assert!(connector.last_sync_time().is_none());
828    }
829
830    #[test]
831    fn connector_config_new_fields_default_to_none() {
832        let json = r#"{
833            "name": "Minimal",
834            "url": "http://remote:8080/api",
835            "username": "user",
836            "password": "pass"
837        }"#;
838        let config: ConnectorConfig = serde_json::from_str(json).unwrap();
839        assert_eq!(config.ws_url, None);
840        assert_eq!(config.sync_interval_secs, None);
841        assert_eq!(config.client_cert, None);
842        assert_eq!(config.client_key, None);
843        assert_eq!(config.ca_cert, None);
844    }
845
846    #[test]
847    fn remote_watch_add_and_remove() {
848        let config = ConnectorConfig {
849            name: "test".to_string(),
850            url: "http://localhost:8080/api".to_string(),
851            username: "user".to_string(),
852            password: "pass".to_string(),
853            id_prefix: Some("r-".to_string()),
854            ws_url: None,
855            sync_interval_secs: None,
856            client_cert: None,
857            client_key: None,
858            ca_cert: None,
859        };
860        let connector = Connector::new(config);
861        assert_eq!(connector.remote_watch_count(), 0);
862
863        connector.add_remote_watch("r-site-1");
864        assert_eq!(connector.remote_watch_count(), 1);
865
866        connector.add_remote_watch("r-equip-2");
867        assert_eq!(connector.remote_watch_count(), 2);
868
869        // Duplicate add is idempotent (HashSet).
870        connector.add_remote_watch("r-site-1");
871        assert_eq!(connector.remote_watch_count(), 2);
872
873        connector.remove_remote_watch("r-site-1");
874        assert_eq!(connector.remote_watch_count(), 1);
875
876        // Removing a non-existent ID is a no-op.
877        connector.remove_remote_watch("r-nonexistent");
878        assert_eq!(connector.remote_watch_count(), 1);
879
880        connector.remove_remote_watch("r-equip-2");
881        assert_eq!(connector.remote_watch_count(), 0);
882    }
883}