1use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::connector::{Connector, ConnectorConfig};
7use haystack_core::data::HDict;
8
9#[derive(serde::Deserialize)]
20struct FederationToml {
21 connectors: HashMap<String, ConnectorConfig>,
22}
23
24pub struct Federation {
26 pub connectors: Vec<Arc<Connector>>,
27}
28
29impl Federation {
30 pub fn new() -> Self {
32 Self {
33 connectors: Vec::new(),
34 }
35 }
36
37 pub fn add(&mut self, config: ConnectorConfig) {
39 self.connectors.push(Arc::new(Connector::new(config)));
40 }
41
42 pub async fn sync_one(&self, name: &str) -> Result<usize, String> {
44 for connector in &self.connectors {
45 if connector.config.name == name {
46 return connector.sync().await;
47 }
48 }
49 Err(format!("connector not found: {name}"))
50 }
51
52 pub async fn sync_all(&self) -> Vec<(String, Result<usize, String>)> {
57 let mut results = Vec::new();
58 for connector in &self.connectors {
59 let name = connector.config.name.clone();
60 let result = connector.sync().await;
61 results.push((name, result));
62 }
63 results
64 }
65
66 pub fn all_cached_entities(&self) -> Vec<HDict> {
68 let mut all = Vec::new();
69 for connector in &self.connectors {
70 all.extend(connector.cached_entities());
71 }
72 all
73 }
74
75 pub fn connector_count(&self) -> usize {
77 self.connectors.len()
78 }
79
80 pub fn owner_of(&self, id: &str) -> Option<&Arc<Connector>> {
82 self.connectors.iter().find(|c| c.owns(id))
83 }
84
85 pub fn status(&self) -> Vec<(String, usize)> {
87 self.connectors
88 .iter()
89 .map(|c| (c.config.name.clone(), c.entity_count()))
90 .collect()
91 }
92
93 pub fn from_toml_str(toml_str: &str) -> Result<Self, String> {
96 let parsed: FederationToml =
97 toml::from_str(toml_str).map_err(|e| format!("invalid federation TOML: {e}"))?;
98 let mut fed = Self::new();
99 for (_key, config) in parsed.connectors {
100 fed.add(config);
101 }
102 Ok(fed)
103 }
104
105 pub fn from_toml_file(path: &str) -> Result<Self, String> {
107 let contents =
108 std::fs::read_to_string(path).map_err(|e| format!("failed to read {path}: {e}"))?;
109 Self::from_toml_str(&contents)
110 }
111
112 pub fn start_background_sync(&self) -> Vec<tokio::task::JoinHandle<()>> {
118 self.connectors
119 .iter()
120 .map(|c| Connector::spawn_sync_task(Arc::clone(c)))
121 .collect()
122 }
123}
124
125impl Default for Federation {
126 fn default() -> Self {
127 Self::new()
128 }
129}
130
131#[cfg(test)]
132mod tests {
133 use super::*;
134 use haystack_core::kinds::{HRef, Kind};
135
136 #[test]
137 fn federation_new_empty() {
138 let fed = Federation::new();
139 assert_eq!(fed.connector_count(), 0);
140 assert!(fed.all_cached_entities().is_empty());
141 assert!(fed.status().is_empty());
142 }
143
144 #[test]
145 fn federation_add_connector() {
146 let mut fed = Federation::new();
147 assert_eq!(fed.connector_count(), 0);
148
149 fed.add(ConnectorConfig {
150 name: "server-1".to_string(),
151 url: "http://localhost:8080/api".to_string(),
152 username: "user".to_string(),
153 password: "pass".to_string(),
154 id_prefix: None,
155 ws_url: None,
156 sync_interval_secs: None,
157 client_cert: None,
158 client_key: None,
159 ca_cert: None,
160 });
161 assert_eq!(fed.connector_count(), 1);
162
163 fed.add(ConnectorConfig {
164 name: "server-2".to_string(),
165 url: "http://localhost:8081/api".to_string(),
166 username: "user".to_string(),
167 password: "pass".to_string(),
168 id_prefix: Some("s2-".to_string()),
169 ws_url: None,
170 sync_interval_secs: None,
171 client_cert: None,
172 client_key: None,
173 ca_cert: None,
174 });
175 assert_eq!(fed.connector_count(), 2);
176 }
177
178 #[test]
179 fn federation_status_empty() {
180 let fed = Federation::new();
181 let status = fed.status();
182 assert!(status.is_empty());
183 }
184
185 #[test]
186 fn federation_status_with_connectors() {
187 let mut fed = Federation::new();
188 fed.add(ConnectorConfig {
189 name: "alpha".to_string(),
190 url: "http://alpha:8080/api".to_string(),
191 username: "user".to_string(),
192 password: "pass".to_string(),
193 id_prefix: None,
194 ws_url: None,
195 sync_interval_secs: None,
196 client_cert: None,
197 client_key: None,
198 ca_cert: None,
199 });
200 fed.add(ConnectorConfig {
201 name: "beta".to_string(),
202 url: "http://beta:8080/api".to_string(),
203 username: "user".to_string(),
204 password: "pass".to_string(),
205 id_prefix: Some("b-".to_string()),
206 ws_url: None,
207 sync_interval_secs: None,
208 client_cert: None,
209 client_key: None,
210 ca_cert: None,
211 });
212
213 let status = fed.status();
214 assert_eq!(status.len(), 2);
215 assert_eq!(status[0].0, "alpha");
216 assert_eq!(status[0].1, 0); assert_eq!(status[1].0, "beta");
218 assert_eq!(status[1].1, 0);
219 }
220
221 #[test]
222 fn federation_owner_of_returns_correct_connector() {
223 let mut fed = Federation::new();
224 fed.add(ConnectorConfig {
225 name: "alpha".to_string(),
226 url: "http://alpha:8080/api".to_string(),
227 username: "user".to_string(),
228 password: "pass".to_string(),
229 id_prefix: Some("a-".to_string()),
230 ws_url: None,
231 sync_interval_secs: None,
232 client_cert: None,
233 client_key: None,
234 ca_cert: None,
235 });
236
237 fed.connectors[0].update_cache(vec![{
239 let mut d = HDict::new();
240 d.set("id", Kind::Ref(HRef::from_val("a-site-1")));
241 d
242 }]);
243
244 assert!(fed.owner_of("a-site-1").is_some());
245 assert_eq!(fed.owner_of("a-site-1").unwrap().config.name, "alpha");
246 assert!(fed.owner_of("unknown-1").is_none());
247 }
248
249 #[test]
250 fn federation_from_toml_str() {
251 let toml = r#"
252[connectors.building-a]
253name = "Building A"
254url = "http://building-a:8080/api"
255username = "federation"
256password = "s3cret"
257id_prefix = "bldg-a-"
258sync_interval_secs = 30
259
260[connectors.building-b]
261name = "Building B"
262url = "https://building-b:8443/api"
263username = "federation"
264password = "s3cret"
265id_prefix = "bldg-b-"
266client_cert = "/etc/certs/federation.pem"
267client_key = "/etc/certs/federation-key.pem"
268ca_cert = "/etc/certs/ca.pem"
269"#;
270 let fed = Federation::from_toml_str(toml).unwrap();
271 assert_eq!(fed.connector_count(), 2);
272 let status = fed.status();
273 let names: Vec<&str> = status.iter().map(|(n, _)| n.as_str()).collect();
274 assert!(names.contains(&"Building A"));
275 assert!(names.contains(&"Building B"));
276 }
277
278 #[test]
279 fn federation_from_toml_str_empty() {
280 let toml = "[connectors]\n";
281 let fed = Federation::from_toml_str(toml).unwrap();
282 assert_eq!(fed.connector_count(), 0);
283 }
284
285 #[test]
286 fn federation_from_toml_str_invalid() {
287 let toml = "not valid toml {{{}";
288 assert!(Federation::from_toml_str(toml).is_err());
289 }
290
291 #[test]
292 fn federation_all_cached_entities_empty() {
293 let mut fed = Federation::new();
294 fed.add(ConnectorConfig {
295 name: "server".to_string(),
296 url: "http://localhost:8080/api".to_string(),
297 username: "user".to_string(),
298 password: "pass".to_string(),
299 id_prefix: None,
300 ws_url: None,
301 sync_interval_secs: None,
302 client_cert: None,
303 client_key: None,
304 ca_cert: None,
305 });
306 assert!(fed.all_cached_entities().is_empty());
308 }
309}