1use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::connector::{Connector, ConnectorConfig};
7use haystack_core::data::HDict;
8
9#[derive(serde::Deserialize)]
20struct FederationToml {
21 connectors: HashMap<String, ConnectorConfig>,
22}
23
24pub struct Federation {
26 pub connectors: Vec<Arc<Connector>>,
27}
28
29impl Federation {
30 pub fn new() -> Self {
32 Self {
33 connectors: Vec::new(),
34 }
35 }
36
37 pub fn add(&mut self, config: ConnectorConfig) {
39 self.connectors.push(Arc::new(Connector::new(config)));
40 }
41
42 pub async fn sync_one(&self, name: &str) -> Result<usize, String> {
44 for connector in &self.connectors {
45 if connector.config.name == name {
46 return connector.sync().await;
47 }
48 }
49 Err(format!("connector not found: {name}"))
50 }
51
52 pub async fn sync_all(&self) -> Vec<(String, Result<usize, String>)> {
57 let mut results = Vec::new();
58 for connector in &self.connectors {
59 let name = connector.config.name.clone();
60 let result = connector.sync().await;
61 results.push((name, result));
62 }
63 results
64 }
65
66 pub fn all_cached_entities(&self) -> Vec<HDict> {
68 let mut all = Vec::new();
69 for connector in &self.connectors {
70 all.extend(connector.cached_entities());
71 }
72 all
73 }
74
75 pub fn filter_cached_entities(
81 &self,
82 filter_expr: &str,
83 limit: usize,
84 ) -> Result<Vec<HDict>, String> {
85 let effective_limit = if limit == 0 { usize::MAX } else { limit };
86 let mut all = Vec::new();
87 for connector in &self.connectors {
88 if all.len() >= effective_limit {
89 break;
90 }
91 let remaining = effective_limit - all.len();
92 let results = connector.filter_cached(filter_expr, remaining)?;
93 all.extend(results);
94 }
95 Ok(all)
96 }
97
98 pub fn connector_count(&self) -> usize {
100 self.connectors.len()
101 }
102
103 pub fn owner_of(&self, id: &str) -> Option<&Arc<Connector>> {
105 self.connectors.iter().find(|c| c.owns(id))
106 }
107
108 pub fn batch_read_by_id<'a>(
114 &self,
115 ids: impl IntoIterator<Item = &'a str>,
116 ) -> (Vec<HDict>, Vec<String>) {
117 let mut groups: HashMap<usize, Vec<&str>> = HashMap::new();
119 let mut not_owned: Vec<String> = Vec::new();
120
121 for id in ids {
122 let mut found = false;
123 for (idx, connector) in self.connectors.iter().enumerate() {
124 if connector.owns(id) {
125 groups.entry(idx).or_default().push(id);
126 found = true;
127 break;
128 }
129 }
130 if !found {
131 not_owned.push(id.to_string());
132 }
133 }
134
135 let mut all_found = Vec::new();
137 for (idx, ids) in &groups {
138 let (found, mut missing) = self.connectors[*idx].batch_get_cached(ids);
139 all_found.extend(found);
140 not_owned.append(&mut missing);
141 }
142
143 (all_found, not_owned)
144 }
145
146 pub fn status(&self) -> Vec<(String, usize)> {
148 self.connectors
149 .iter()
150 .map(|c| (c.config.name.clone(), c.entity_count()))
151 .collect()
152 }
153
154 pub fn from_toml_str(toml_str: &str) -> Result<Self, String> {
157 let parsed: FederationToml =
158 toml::from_str(toml_str).map_err(|e| format!("invalid federation TOML: {e}"))?;
159 let mut fed = Self::new();
160 for (_key, config) in parsed.connectors {
161 fed.add(config);
162 }
163 Ok(fed)
164 }
165
166 pub fn from_toml_file(path: &str) -> Result<Self, String> {
168 let contents =
169 std::fs::read_to_string(path).map_err(|e| format!("failed to read {path}: {e}"))?;
170 Self::from_toml_str(&contents)
171 }
172
173 pub fn start_background_sync(&self) -> Vec<tokio::task::JoinHandle<()>> {
179 self.connectors
180 .iter()
181 .map(|c| Connector::spawn_sync_task(Arc::clone(c)))
182 .collect()
183 }
184}
185
186impl Default for Federation {
187 fn default() -> Self {
188 Self::new()
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use haystack_core::kinds::{HRef, Kind};

    /// Builds a connector config with the fixed test credentials
    /// (`user` / `pass`) and every optional field except `id_prefix` unset.
    fn test_config(name: &str, url: &str, id_prefix: Option<&str>) -> ConnectorConfig {
        ConnectorConfig {
            name: name.to_string(),
            url: url.to_string(),
            username: "user".to_string(),
            password: "pass".to_string(),
            id_prefix: id_prefix.map(String::from),
            ws_url: None,
            sync_interval_secs: None,
            client_cert: None,
            client_key: None,
            ca_cert: None,
        }
    }

    #[test]
    fn federation_new_empty() {
        let federation = Federation::new();
        assert_eq!(federation.connector_count(), 0);
        assert!(federation.all_cached_entities().is_empty());
        assert!(federation.status().is_empty());
    }

    #[test]
    fn federation_add_connector() {
        let mut federation = Federation::new();
        assert_eq!(federation.connector_count(), 0);

        federation.add(test_config("server-1", "http://localhost:8080/api", None));
        assert_eq!(federation.connector_count(), 1);

        federation.add(test_config(
            "server-2",
            "http://localhost:8081/api",
            Some("s2-"),
        ));
        assert_eq!(federation.connector_count(), 2);
    }

    #[test]
    fn federation_status_empty() {
        let federation = Federation::new();
        let status = federation.status();
        assert!(status.is_empty());
    }

    #[test]
    fn federation_status_with_connectors() {
        let mut federation = Federation::new();
        federation.add(test_config("alpha", "http://alpha:8080/api", None));
        federation.add(test_config("beta", "http://beta:8080/api", Some("b-")));

        // Nothing has been synced, so every connector reports zero entities,
        // and the entries follow insertion order.
        let expected = vec![("alpha".to_string(), 0), ("beta".to_string(), 0)];
        assert_eq!(federation.status(), expected);
    }

    #[test]
    fn federation_owner_of_returns_correct_connector() {
        let mut federation = Federation::new();
        federation.add(test_config("alpha", "http://alpha:8080/api", Some("a-")));

        // Seed the connector's cache directly with a single entity.
        let mut site = HDict::new();
        site.set("id", Kind::Ref(HRef::from_val("a-site-1")));
        federation.connectors[0].update_cache(vec![site]);

        let owner_name = federation
            .owner_of("a-site-1")
            .map(|connector| connector.config.name.as_str());
        assert_eq!(owner_name, Some("alpha"));
        assert!(federation.owner_of("unknown-1").is_none());
    }

    #[test]
    fn federation_from_toml_str() {
        let config_text = r#"
[connectors.building-a]
name = "Building A"
url = "http://building-a:8080/api"
username = "federation"
password = "s3cret"
id_prefix = "bldg-a-"
sync_interval_secs = 30

[connectors.building-b]
name = "Building B"
url = "https://building-b:8443/api"
username = "federation"
password = "s3cret"
id_prefix = "bldg-b-"
client_cert = "/etc/certs/federation.pem"
client_key = "/etc/certs/federation-key.pem"
ca_cert = "/etc/certs/ca.pem"
"#;
        let federation = Federation::from_toml_str(config_text).unwrap();
        assert_eq!(federation.connector_count(), 2);

        // Only membership is checked; the test does not depend on connector order.
        let status = federation.status();
        let names: Vec<&str> = status.iter().map(|(name, _)| name.as_str()).collect();
        for expected in ["Building A", "Building B"] {
            assert!(names.contains(&expected));
        }
    }

    #[test]
    fn federation_from_toml_str_empty() {
        let federation = Federation::from_toml_str("[connectors]\n").unwrap();
        assert_eq!(federation.connector_count(), 0);
    }

    #[test]
    fn federation_from_toml_str_invalid() {
        let outcome = Federation::from_toml_str("not valid toml {{{}");
        assert!(outcome.is_err());
    }

    #[test]
    fn federation_all_cached_entities_empty() {
        let mut federation = Federation::new();
        federation.add(test_config("server", "http://localhost:8080/api", None));

        // A freshly added connector has not synced, so its cache is empty.
        assert!(federation.all_cached_entities().is_empty());
    }
}