Skip to main content

haystack_server/
federation.rs

1//! Federation manager — coordinates multiple remote connectors for federated queries.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::connector::{Connector, ConnectorConfig};
7use haystack_core::data::HDict;
8
9/// TOML file structure for federation configuration.
10///
11/// Example:
12/// ```toml
13/// [connectors.building-a]
14/// name = "Building A"
15/// url = "http://building-a:8080/api"
16/// username = "federation"
17/// password = "s3cret"
18/// ```
19#[derive(serde::Deserialize)]
20struct FederationToml {
21    connectors: HashMap<String, ConnectorConfig>,
22}
23
24/// Manages multiple remote connectors for federated queries.
25pub struct Federation {
26    pub connectors: Vec<Arc<Connector>>,
27}
28
29impl Federation {
30    /// Create a new federation with no connectors.
31    pub fn new() -> Self {
32        Self {
33            connectors: Vec::new(),
34        }
35    }
36
37    /// Add a connector for a remote Haystack server.
38    pub fn add(&mut self, config: ConnectorConfig) {
39        self.connectors.push(Arc::new(Connector::new(config)));
40    }
41
42    /// Sync a single connector by name, returning the entity count on success.
43    pub async fn sync_one(&self, name: &str) -> Result<usize, String> {
44        for connector in &self.connectors {
45            if connector.config.name == name {
46                return connector.sync().await;
47            }
48        }
49        Err(format!("connector not found: {name}"))
50    }
51
52    /// Sync all connectors, returning a vec of (name, result) pairs.
53    ///
54    /// Each result is either `Ok(count)` with the number of entities synced,
55    /// or `Err(message)` with the error description.
56    pub async fn sync_all(&self) -> Vec<(String, Result<usize, String>)> {
57        let mut results = Vec::new();
58        for connector in &self.connectors {
59            let name = connector.config.name.clone();
60            let result = connector.sync().await;
61            results.push((name, result));
62        }
63        results
64    }
65
66    /// Returns all cached entities from all connectors, merged into a single vec.
67    pub fn all_cached_entities(&self) -> Vec<HDict> {
68        let mut all = Vec::new();
69        for connector in &self.connectors {
70            all.extend(connector.cached_entities());
71        }
72        all
73    }
74
75    /// Filter cached entities across all connectors using bitmap-accelerated queries.
76    ///
77    /// Each connector uses its own bitmap tag index for fast filtering, then
78    /// results are merged up to the given limit. Much faster than linear scan
79    /// for tag-based queries over large federated entity sets.
80    pub fn filter_cached_entities(
81        &self,
82        filter_expr: &str,
83        limit: usize,
84    ) -> Result<Vec<HDict>, String> {
85        let effective_limit = if limit == 0 { usize::MAX } else { limit };
86        let mut all = Vec::new();
87        for connector in &self.connectors {
88            if all.len() >= effective_limit {
89                break;
90            }
91            let remaining = effective_limit - all.len();
92            let results = connector.filter_cached(filter_expr, remaining)?;
93            all.extend(results);
94        }
95        Ok(all)
96    }
97
98    /// Returns the number of connectors.
99    pub fn connector_count(&self) -> usize {
100        self.connectors.len()
101    }
102
103    /// Returns the connector that owns the entity with the given ID, if any.
104    pub fn owner_of(&self, id: &str) -> Option<&Arc<Connector>> {
105        self.connectors.iter().find(|c| c.owns(id))
106    }
107
108    /// Batch read entities by ID across federated connectors.
109    ///
110    /// Groups IDs by owning connector and fetches each group in a single
111    /// indexed lookup (O(1) per ID via `cache_id_map`), avoiding repeated
112    /// linear scans. Returns `(found_entities, missing_ids)`.
113    pub fn batch_read_by_id<'a>(
114        &self,
115        ids: impl IntoIterator<Item = &'a str>,
116    ) -> (Vec<HDict>, Vec<String>) {
117        // Group IDs by connector index.
118        let mut groups: HashMap<usize, Vec<&str>> = HashMap::new();
119        let mut not_owned: Vec<String> = Vec::new();
120
121        for id in ids {
122            let mut found = false;
123            for (idx, connector) in self.connectors.iter().enumerate() {
124                if connector.owns(id) {
125                    groups.entry(idx).or_default().push(id);
126                    found = true;
127                    break;
128                }
129            }
130            if !found {
131                not_owned.push(id.to_string());
132            }
133        }
134
135        // Fetch each group from its connector in a single pass.
136        let mut all_found = Vec::new();
137        for (idx, ids) in &groups {
138            let (found, mut missing) = self.connectors[*idx].batch_get_cached(ids);
139            all_found.extend(found);
140            not_owned.append(&mut missing);
141        }
142
143        (all_found, not_owned)
144    }
145
146    /// Returns `(name, entity_count)` for each connector.
147    pub fn status(&self) -> Vec<(String, usize)> {
148        self.connectors
149            .iter()
150            .map(|c| (c.config.name.clone(), c.entity_count()))
151            .collect()
152    }
153
154    /// Parse a TOML string into a `Federation`, adding each connector defined
155    /// under `[connectors.<key>]`.
156    pub fn from_toml_str(toml_str: &str) -> Result<Self, String> {
157        let parsed: FederationToml =
158            toml::from_str(toml_str).map_err(|e| format!("invalid federation TOML: {e}"))?;
159        let mut fed = Self::new();
160        for (_key, config) in parsed.connectors {
161            fed.add(config);
162        }
163        Ok(fed)
164    }
165
166    /// Read a TOML file from disk and parse it into a `Federation`.
167    pub fn from_toml_file(path: &str) -> Result<Self, String> {
168        let contents =
169            std::fs::read_to_string(path).map_err(|e| format!("failed to read {path}: {e}"))?;
170        Self::from_toml_str(&contents)
171    }
172
173    /// Start background sync tasks for all connectors.
174    ///
175    /// Each connector gets its own tokio task that loops at its configured
176    /// sync interval, reconnecting automatically on failure.
177    /// Returns the join handles (they run until the server shuts down).
178    pub fn start_background_sync(&self) -> Vec<tokio::task::JoinHandle<()>> {
179        self.connectors
180            .iter()
181            .map(|c| Connector::spawn_sync_task(Arc::clone(c)))
182            .collect()
183    }
184}
185
186impl Default for Federation {
187    fn default() -> Self {
188        Self::new()
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use haystack_core::kinds::{HRef, Kind};

    /// Build a connector config with the fixed test credentials
    /// (`user` / `pass`) and every optional field except `id_prefix` unset.
    fn test_config(name: &str, url: &str, id_prefix: Option<&str>) -> ConnectorConfig {
        ConnectorConfig {
            name: name.to_string(),
            url: url.to_string(),
            username: "user".to_string(),
            password: "pass".to_string(),
            id_prefix: id_prefix.map(String::from),
            ws_url: None,
            sync_interval_secs: None,
            client_cert: None,
            client_key: None,
            ca_cert: None,
        }
    }

    #[test]
    fn federation_new_empty() {
        let fed = Federation::new();
        assert_eq!(fed.connector_count(), 0);
        assert!(fed.all_cached_entities().is_empty());
        assert!(fed.status().is_empty());
    }

    #[test]
    fn federation_add_connector() {
        let mut fed = Federation::new();
        assert_eq!(fed.connector_count(), 0);

        fed.add(test_config("server-1", "http://localhost:8080/api", None));
        assert_eq!(fed.connector_count(), 1);

        fed.add(test_config(
            "server-2",
            "http://localhost:8081/api",
            Some("s2-"),
        ));
        assert_eq!(fed.connector_count(), 2);
    }

    #[test]
    fn federation_status_empty() {
        let fed = Federation::new();
        let status = fed.status();
        assert!(status.is_empty());
    }

    #[test]
    fn federation_status_with_connectors() {
        let mut fed = Federation::new();
        fed.add(test_config("alpha", "http://alpha:8080/api", None));
        fed.add(test_config("beta", "http://beta:8080/api", Some("b-")));

        let status = fed.status();
        assert_eq!(status.len(), 2);
        assert_eq!(status[0].0, "alpha");
        assert_eq!(status[0].1, 0); // no sync yet
        assert_eq!(status[1].0, "beta");
        assert_eq!(status[1].1, 0);
    }

    #[test]
    fn federation_owner_of_returns_correct_connector() {
        let mut fed = Federation::new();
        fed.add(test_config("alpha", "http://alpha:8080/api", Some("a-")));

        // Simulate cache population for alpha
        let mut site = HDict::new();
        site.set("id", Kind::Ref(HRef::from_val("a-site-1")));
        fed.connectors[0].update_cache(vec![site]);

        assert!(fed.owner_of("a-site-1").is_some());
        assert_eq!(fed.owner_of("a-site-1").unwrap().config.name, "alpha");
        assert!(fed.owner_of("unknown-1").is_none());
    }

    #[test]
    fn federation_from_toml_str() {
        let toml = r#"
[connectors.building-a]
name = "Building A"
url = "http://building-a:8080/api"
username = "federation"
password = "s3cret"
id_prefix = "bldg-a-"
sync_interval_secs = 30

[connectors.building-b]
name = "Building B"
url = "https://building-b:8443/api"
username = "federation"
password = "s3cret"
id_prefix = "bldg-b-"
client_cert = "/etc/certs/federation.pem"
client_key = "/etc/certs/federation-key.pem"
ca_cert = "/etc/certs/ca.pem"
"#;
        let fed = Federation::from_toml_str(toml).unwrap();
        assert_eq!(fed.connector_count(), 2);

        // Checked by membership only: the test does not depend on the order
        // in which connectors were added.
        let status = fed.status();
        let names: Vec<&str> = status.iter().map(|(n, _)| n.as_str()).collect();
        assert!(names.contains(&"Building A"));
        assert!(names.contains(&"Building B"));
    }

    #[test]
    fn federation_from_toml_str_empty() {
        let toml = "[connectors]\n";
        let fed = Federation::from_toml_str(toml).unwrap();
        assert_eq!(fed.connector_count(), 0);
    }

    #[test]
    fn federation_from_toml_str_invalid() {
        let toml = "not valid toml {{{}";
        assert!(Federation::from_toml_str(toml).is_err());
    }

    #[test]
    fn federation_all_cached_entities_empty() {
        let mut fed = Federation::new();
        fed.add(test_config("server", "http://localhost:8080/api", None));
        // No sync performed, so entities are empty.
        assert!(fed.all_cached_entities().is_empty());
    }
}