Skip to main content

haystack_server/
federation.rs

//! Federation manager — coordinates multiple remote connectors for federated queries.
2
3use crate::connector::{Connector, ConnectorConfig};
4use haystack_core::data::HDict;
5
6/// Manages multiple remote connectors for federated queries.
7pub struct Federation {
8    connectors: Vec<Connector>,
9}
10
11impl Federation {
12    /// Create a new federation with no connectors.
13    pub fn new() -> Self {
14        Self {
15            connectors: Vec::new(),
16        }
17    }
18
19    /// Add a connector for a remote Haystack server.
20    pub fn add(&mut self, config: ConnectorConfig) {
21        self.connectors.push(Connector::new(config));
22    }
23
24    /// Sync all connectors, returning a vec of (name, result) pairs.
25    ///
26    /// Each result is either `Ok(count)` with the number of entities synced,
27    /// or `Err(message)` with the error description.
28    pub async fn sync_all(&self) -> Vec<(String, Result<usize, String>)> {
29        let mut results = Vec::new();
30        for connector in &self.connectors {
31            let name = connector.config.name.clone();
32            let result = connector.sync().await;
33            results.push((name, result));
34        }
35        results
36    }
37
38    /// Returns all cached entities from all connectors, merged into a single vec.
39    pub fn all_cached_entities(&self) -> Vec<HDict> {
40        let mut all = Vec::new();
41        for connector in &self.connectors {
42            all.extend(connector.cached_entities());
43        }
44        all
45    }
46
47    /// Returns the number of connectors.
48    pub fn connector_count(&self) -> usize {
49        self.connectors.len()
50    }
51
52    /// Returns `(name, entity_count)` for each connector.
53    pub fn status(&self) -> Vec<(String, usize)> {
54        self.connectors
55            .iter()
56            .map(|c| (c.config.name.clone(), c.entity_count()))
57            .collect()
58    }
59}
60
61impl Default for Federation {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a connector config with the fixed test credentials
    /// (`user` / `pass`) and the given name, URL and optional id prefix.
    fn config(name: &str, url: &str, id_prefix: Option<&str>) -> ConnectorConfig {
        ConnectorConfig {
            name: name.to_string(),
            url: url.to_string(),
            username: "user".to_string(),
            password: "pass".to_string(),
            id_prefix: id_prefix.map(String::from),
        }
    }

    #[test]
    fn federation_new_empty() {
        let federation = Federation::new();
        assert_eq!(federation.connector_count(), 0);
        assert!(federation.all_cached_entities().is_empty());
        assert!(federation.status().is_empty());
    }

    #[test]
    fn federation_add_connector() {
        let mut federation = Federation::new();
        assert_eq!(federation.connector_count(), 0);

        federation.add(config("server-1", "http://localhost:8080/api", None));
        assert_eq!(federation.connector_count(), 1);

        federation.add(config(
            "server-2",
            "http://localhost:8081/api",
            Some("s2-"),
        ));
        assert_eq!(federation.connector_count(), 2);
    }

    #[test]
    fn federation_status_empty() {
        let federation = Federation::new();
        assert!(federation.status().is_empty());
    }

    #[test]
    fn federation_status_with_connectors() {
        let mut federation = Federation::new();
        federation.add(config("alpha", "http://alpha:8080/api", None));
        federation.add(config("beta", "http://beta:8080/api", Some("b-")));

        let status = federation.status();
        assert_eq!(status.len(), 2);

        // Names come back in insertion order.
        let names: Vec<&str> = status.iter().map(|(name, _)| name.as_str()).collect();
        assert_eq!(names, ["alpha", "beta"]);

        // No sync has run yet, so every connector reports zero entities.
        assert!(status.iter().all(|(_, count)| *count == 0));
    }

    #[test]
    fn federation_all_cached_entities_empty() {
        let mut federation = Federation::new();
        federation.add(config("server", "http://localhost:8080/api", None));
        // No sync performed, so entities are empty.
        assert!(federation.all_cached_entities().is_empty());
    }
}