// haystack_server/federation.rs

//! Federation manager — coordinates multiple remote connectors for federated queries.

use std::collections::HashMap;
use std::sync::Arc;

use crate::connector::{Connector, ConnectorConfig};
use haystack_core::data::HDict;

9/// TOML file structure for federation configuration.
10///
11/// Example:
12/// ```toml
13/// [connectors.building-a]
14/// name = "Building A"
15/// url = "http://building-a:8080/api"
16/// username = "federation"
17/// password = "s3cret"
18/// ```
19#[derive(serde::Deserialize)]
20struct FederationToml {
21    connectors: HashMap<String, ConnectorConfig>,
22}
23
24/// Manages multiple remote connectors for federated queries.
25pub struct Federation {
26    pub connectors: Vec<Arc<Connector>>,
27}
28
29impl Federation {
30    /// Create a new federation with no connectors.
31    pub fn new() -> Self {
32        Self {
33            connectors: Vec::new(),
34        }
35    }
36
37    /// Add a connector for a remote Haystack server.
38    pub fn add(&mut self, config: ConnectorConfig) {
39        self.connectors.push(Arc::new(Connector::new(config)));
40    }
41
42    /// Sync a single connector by name, returning the entity count on success.
43    pub async fn sync_one(&self, name: &str) -> Result<usize, String> {
44        for connector in &self.connectors {
45            if connector.config.name == name {
46                return connector.sync().await;
47            }
48        }
49        Err(format!("connector not found: {name}"))
50    }
51
52    /// Sync all connectors, returning a vec of (name, result) pairs.
53    ///
54    /// Each result is either `Ok(count)` with the number of entities synced,
55    /// or `Err(message)` with the error description.
56    pub async fn sync_all(&self) -> Vec<(String, Result<usize, String>)> {
57        let mut results = Vec::new();
58        for connector in &self.connectors {
59            let name = connector.config.name.clone();
60            let result = connector.sync().await;
61            results.push((name, result));
62        }
63        results
64    }
65
66    /// Returns all cached entities from all connectors, merged into a single vec.
67    pub fn all_cached_entities(&self) -> Vec<HDict> {
68        let mut all = Vec::new();
69        for connector in &self.connectors {
70            all.extend(connector.cached_entities());
71        }
72        all
73    }
74
75    /// Returns the number of connectors.
76    pub fn connector_count(&self) -> usize {
77        self.connectors.len()
78    }
79
80    /// Returns the connector that owns the entity with the given ID, if any.
81    pub fn owner_of(&self, id: &str) -> Option<&Arc<Connector>> {
82        self.connectors.iter().find(|c| c.owns(id))
83    }
84
85    /// Returns `(name, entity_count)` for each connector.
86    pub fn status(&self) -> Vec<(String, usize)> {
87        self.connectors
88            .iter()
89            .map(|c| (c.config.name.clone(), c.entity_count()))
90            .collect()
91    }
92
93    /// Parse a TOML string into a `Federation`, adding each connector defined
94    /// under `[connectors.<key>]`.
95    pub fn from_toml_str(toml_str: &str) -> Result<Self, String> {
96        let parsed: FederationToml =
97            toml::from_str(toml_str).map_err(|e| format!("invalid federation TOML: {e}"))?;
98        let mut fed = Self::new();
99        for (_key, config) in parsed.connectors {
100            fed.add(config);
101        }
102        Ok(fed)
103    }
104
105    /// Read a TOML file from disk and parse it into a `Federation`.
106    pub fn from_toml_file(path: &str) -> Result<Self, String> {
107        let contents =
108            std::fs::read_to_string(path).map_err(|e| format!("failed to read {path}: {e}"))?;
109        Self::from_toml_str(&contents)
110    }
111
112    /// Start background sync tasks for all connectors.
113    ///
114    /// Each connector gets its own tokio task that loops at its configured
115    /// sync interval, reconnecting automatically on failure.
116    /// Returns the join handles (they run until the server shuts down).
117    pub fn start_background_sync(&self) -> Vec<tokio::task::JoinHandle<()>> {
118        self.connectors
119            .iter()
120            .map(|c| Connector::spawn_sync_task(Arc::clone(c)))
121            .collect()
122    }
123}
124
125impl Default for Federation {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use haystack_core::kinds::{HRef, Kind};

    /// Build a connector config with fixed test credentials and every optional
    /// setting left unset except the ID prefix.
    fn config(name: &str, url: &str, id_prefix: Option<&str>) -> ConnectorConfig {
        ConnectorConfig {
            name: name.to_string(),
            url: url.to_string(),
            username: "user".to_string(),
            password: "pass".to_string(),
            id_prefix: id_prefix.map(|p| p.to_string()),
            ws_url: None,
            sync_interval_secs: None,
            client_cert: None,
            client_key: None,
            ca_cert: None,
        }
    }

    #[test]
    fn federation_new_empty() {
        let fed = Federation::new();
        assert_eq!(fed.connector_count(), 0);
        assert!(fed.all_cached_entities().is_empty());
        assert!(fed.status().is_empty());
    }

    #[test]
    fn federation_add_connector() {
        let mut fed = Federation::new();
        assert_eq!(fed.connector_count(), 0);

        fed.add(config("server-1", "http://localhost:8080/api", None));
        assert_eq!(fed.connector_count(), 1);

        fed.add(config("server-2", "http://localhost:8081/api", Some("s2-")));
        assert_eq!(fed.connector_count(), 2);
    }

    #[test]
    fn federation_status_empty() {
        let fed = Federation::new();
        assert!(fed.status().is_empty());
    }

    #[test]
    fn federation_status_with_connectors() {
        let mut fed = Federation::new();
        fed.add(config("alpha", "http://alpha:8080/api", None));
        fed.add(config("beta", "http://beta:8080/api", Some("b-")));

        let status = fed.status();
        assert_eq!(status.len(), 2);

        // No sync has happened yet, so both connectors report zero entities.
        let (first_name, first_count) = &status[0];
        assert_eq!(first_name, "alpha");
        assert_eq!(*first_count, 0);

        let (second_name, second_count) = &status[1];
        assert_eq!(second_name, "beta");
        assert_eq!(*second_count, 0);
    }

    #[test]
    fn federation_owner_of_returns_correct_connector() {
        let mut fed = Federation::new();
        fed.add(config("alpha", "http://alpha:8080/api", Some("a-")));

        // Simulate cache population for alpha.
        let mut entity = HDict::new();
        entity.set("id", Kind::Ref(HRef::from_val("a-site-1")));
        fed.connectors[0].update_cache(vec![entity]);

        assert!(fed.owner_of("a-site-1").is_some());
        assert_eq!(fed.owner_of("a-site-1").unwrap().config.name, "alpha");
        assert!(fed.owner_of("unknown-1").is_none());
    }

    #[test]
    fn federation_from_toml_str() {
        let toml = r#"
[connectors.building-a]
name = "Building A"
url = "http://building-a:8080/api"
username = "federation"
password = "s3cret"
id_prefix = "bldg-a-"
sync_interval_secs = 30

[connectors.building-b]
name = "Building B"
url = "https://building-b:8443/api"
username = "federation"
password = "s3cret"
id_prefix = "bldg-b-"
client_cert = "/etc/certs/federation.pem"
client_key = "/etc/certs/federation-key.pem"
ca_cert = "/etc/certs/ca.pem"
"#;
        let fed = Federation::from_toml_str(toml).unwrap();
        assert_eq!(fed.connector_count(), 2);

        let status = fed.status();
        let has_name = |wanted: &str| status.iter().any(|(name, _)| name == wanted);
        assert!(has_name("Building A"));
        assert!(has_name("Building B"));
    }

    #[test]
    fn federation_from_toml_str_empty() {
        let fed = Federation::from_toml_str("[connectors]\n").unwrap();
        assert_eq!(fed.connector_count(), 0);
    }

    #[test]
    fn federation_from_toml_str_invalid() {
        let parsed = Federation::from_toml_str("not valid toml {{{}");
        assert!(parsed.is_err());
    }

    #[test]
    fn federation_all_cached_entities_empty() {
        let mut fed = Federation::new();
        fed.add(config("server", "http://localhost:8080/api", None));

        // No sync performed, so entities are empty.
        assert!(fed.all_cached_entities().is_empty());
    }
}