1use crate::config::Config;
7use crate::reader::MicroscopeReader;
8use crate::{content_coords_blended, read_append_log, LAYER_NAMES};
9use std::path::Path;
10
/// A single hit returned by a federated query, tagged with the index it came from.
#[derive(Clone, Debug)]
pub struct FederatedResult {
    /// Text of the matched block or append-log entry.
    pub text: String,
    /// Depth value copied from the block header or append-log entry.
    pub depth: u8,
    /// Layer name looked up in `LAYER_NAMES`, or `"?"` when the layer id is out of range.
    pub layer: String,
    /// Weight-adjusted score; merged result lists are sorted ascending on this value.
    pub score: f32,
    /// Name of the federated index that produced this hit.
    pub source_index: String,
    /// `true` when the hit came from the append log rather than the main index.
    pub is_append: bool,
}
21
/// Runs queries against several independently configured indices and merges the results.
pub struct FederatedSearch {
    /// One `(name, loaded config, weight)` tuple per federated index.
    indices: Vec<(String, Config, f32)>,
}
27
28impl FederatedSearch {
29 pub fn from_config(config: &Config) -> Result<Self, String> {
31 let mut indices = Vec::new();
32
33 for entry in &config.federation.indices {
34 let idx_config = Config::load(&entry.config_path).map_err(|e| {
35 format!(
36 "Failed to load federated index '{}' from '{}': {}",
37 entry.name, entry.config_path, e
38 )
39 })?;
40 indices.push((entry.name.clone(), idx_config, entry.weight));
41 }
42
43 if indices.is_empty() {
44 return Err("No federated indices configured".to_string());
45 }
46
47 Ok(Self { indices })
48 }
49
50 pub fn recall(&self, query: &str, k: usize) -> Vec<FederatedResult> {
52 let results: Vec<Vec<FederatedResult>> = std::thread::scope(|s| {
53 let handles: Vec<_> = self
54 .indices
55 .iter()
56 .map(|(name, config, weight)| {
57 let name = name.clone();
58 let weight = *weight;
59 s.spawn(move || recall_single(&name, config, query, k, weight))
60 })
61 .collect();
62
63 handles.into_iter().filter_map(|h| h.join().ok()).collect()
64 });
65
66 merge_results(results, k)
67 }
68
69 pub fn find_text(&self, query: &str, k: usize) -> Vec<FederatedResult> {
71 let results: Vec<Vec<FederatedResult>> = std::thread::scope(|s| {
72 let handles: Vec<_> = self
73 .indices
74 .iter()
75 .map(|(name, config, weight)| {
76 let name = name.clone();
77 let weight = *weight;
78 s.spawn(move || find_single(&name, config, query, k, weight))
79 })
80 .collect();
81
82 handles.into_iter().filter_map(|h| h.join().ok()).collect()
83 });
84
85 merge_results(results, k)
86 }
87
88 pub fn mql_query(&self, mql: &str, k: usize) -> Vec<FederatedResult> {
90 let results: Vec<Vec<FederatedResult>> = std::thread::scope(|s| {
91 let handles: Vec<_> = self
92 .indices
93 .iter()
94 .map(|(name, config, weight)| {
95 let name = name.clone();
96 let weight = *weight;
97 s.spawn(move || mql_single(&name, config, mql, k, weight))
98 })
99 .collect();
100
101 handles.into_iter().filter_map(|h| h.join().ok()).collect()
102 });
103
104 merge_results(results, k)
105 }
106
107 pub fn status(&self) -> Vec<(String, Result<usize, String>)> {
109 self.indices
110 .iter()
111 .map(|(name, config, _)| {
112 let result = MicroscopeReader::open(config).map(|r| r.block_count);
113 (name.clone(), result)
114 })
115 .collect()
116 }
117}
118
119fn recall_single(
121 name: &str,
122 config: &Config,
123 query: &str,
124 k: usize,
125 weight: f32,
126) -> Vec<FederatedResult> {
127 let reader = match MicroscopeReader::open(config) {
128 Ok(r) => r,
129 Err(_) => return Vec::new(),
130 };
131
132 let (qx, qy, qz) = content_coords_blended(query, "long_term", config.search.semantic_weight);
133 let q_lower = query.to_lowercase();
134 let keywords: Vec<&str> = q_lower.split_whitespace().filter(|w| w.len() > 2).collect();
135
136 let (zoom_lo, zoom_hi) = match query.len() {
137 0..=10 => (0u8, 3u8),
138 11..=40 => (3, 6),
139 _ => (6, 8),
140 };
141
142 let mut results: Vec<(f32, FederatedResult)> = Vec::new();
143
144 for zoom in zoom_lo..=zoom_hi {
145 let (start, count) = reader.depth_ranges[zoom as usize];
146 let (start, count) = (start as usize, count as usize);
147 for i in start..(start + count) {
148 let text = reader.text(i);
149 let text_lower = text.to_lowercase();
150 let hits = keywords
151 .iter()
152 .filter(|&&kw| text_lower.contains(kw))
153 .count();
154 if hits > 0 {
155 let h = reader.header(i);
156 let dx = h.x - qx;
157 let dy = h.y - qy;
158 let dz = h.z - qz;
159 let dist = dx * dx + dy * dy + dz * dz;
160 let boost = hits as f32 * 0.1;
161 let score = (dist - boost).max(0.0) / weight; results.push((
163 score,
164 FederatedResult {
165 text: text.to_string(),
166 depth: h.depth,
167 layer: LAYER_NAMES
168 .get(h.layer_id as usize)
169 .unwrap_or(&"?")
170 .to_string(),
171 score,
172 source_index: name.to_string(),
173 is_append: false,
174 },
175 ));
176 }
177 }
178 }
179
180 let append_path = Path::new(&config.paths.output_dir).join("append.bin");
182 let appended = read_append_log(&append_path);
183 for entry in &appended {
184 let dx = entry.x - qx;
185 let dy = entry.y - qy;
186 let dz = entry.z - qz;
187 let dist = dx * dx + dy * dy + dz * dz;
188 let text_lower = entry.text.to_lowercase();
189 let hits = keywords
190 .iter()
191 .filter(|&&kw| text_lower.contains(kw))
192 .count();
193 if dist < 0.1 || hits > 0 {
194 let boost = hits as f32 * 0.1;
195 let score = (dist - boost).max(0.0) / weight;
196 results.push((
197 score,
198 FederatedResult {
199 text: entry.text.clone(),
200 depth: entry.depth,
201 layer: LAYER_NAMES
202 .get(entry.layer_id as usize)
203 .unwrap_or(&"?")
204 .to_string(),
205 score,
206 source_index: name.to_string(),
207 is_append: true,
208 },
209 ));
210 }
211 }
212
213 results.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
214 results.truncate(k);
215 results.into_iter().map(|(_, r)| r).collect()
216}
217
218fn find_single(
220 name: &str,
221 config: &Config,
222 query: &str,
223 k: usize,
224 weight: f32,
225) -> Vec<FederatedResult> {
226 let reader = match MicroscopeReader::open(config) {
227 Ok(r) => r,
228 Err(_) => return Vec::new(),
229 };
230
231 let results = reader.find_text(query, k);
232 results
233 .iter()
234 .enumerate()
235 .map(|(rank, &(_, idx))| {
236 let h = reader.header(idx);
237 FederatedResult {
238 text: reader.text(idx).to_string(),
239 depth: h.depth,
240 layer: LAYER_NAMES
241 .get(h.layer_id as usize)
242 .unwrap_or(&"?")
243 .to_string(),
244 score: rank as f32 / weight,
245 source_index: name.to_string(),
246 is_append: false,
247 }
248 })
249 .collect()
250}
251
252fn mql_single(
254 name: &str,
255 config: &Config,
256 mql: &str,
257 k: usize,
258 weight: f32,
259) -> Vec<FederatedResult> {
260 let reader = match MicroscopeReader::open(config) {
261 Ok(r) => r,
262 Err(_) => return Vec::new(),
263 };
264
265 let append_path = Path::new(&config.paths.output_dir).join("append.bin");
266 let appended = read_append_log(&append_path);
267 let q = crate::query::parse(mql);
268 let mut results = crate::query::execute(&q, &reader, &appended);
269 results.truncate(k);
270
271 results
272 .iter()
273 .map(|r| {
274 if r.is_main {
275 let h = reader.header(r.block_idx);
276 FederatedResult {
277 text: reader.text(r.block_idx).to_string(),
278 depth: h.depth,
279 layer: LAYER_NAMES
280 .get(h.layer_id as usize)
281 .unwrap_or(&"?")
282 .to_string(),
283 score: r.score / weight,
284 source_index: name.to_string(),
285 is_append: false,
286 }
287 } else {
288 let entry = &appended[r.block_idx];
289 FederatedResult {
290 text: entry.text.clone(),
291 depth: entry.depth,
292 layer: LAYER_NAMES
293 .get(entry.layer_id as usize)
294 .unwrap_or(&"?")
295 .to_string(),
296 score: r.score / weight,
297 source_index: name.to_string(),
298 is_append: true,
299 }
300 }
301 })
302 .collect()
303}
304
305fn merge_results(all: Vec<Vec<FederatedResult>>, k: usize) -> Vec<FederatedResult> {
307 let mut merged: Vec<FederatedResult> = all.into_iter().flatten().collect();
308 merged.sort_by(|a, b| a.score.partial_cmp(&b.score).unwrap());
309 merged.truncate(k);
310 merged
311}
312
313pub fn exchange_pulses(config: &Config) -> Result<usize, String> {
319 use crate::resonance::ResonanceState;
320
321 let output_dir = Path::new(&config.paths.output_dir);
322 let mut local = ResonanceState::load_or_init(output_dir);
323
324 let our_pulses = local.export_pulses();
326 let mut total_exchanged = 0usize;
327
328 let reader = MicroscopeReader::open(config).map_err(|e| format!("open reader: {}", e))?;
330 let local_headers: Vec<(f32, f32, f32)> = (0..reader.block_count)
331 .map(|i| {
332 let h = reader.header(i);
333 (h.x, h.y, h.z)
334 })
335 .collect();
336
337 for idx_config in &config.federation.indices {
339 let idx_cfg =
340 Config::load(&idx_config.config_path).map_err(|e| format!("load config: {}", e))?;
341 let idx_dir = Path::new(&idx_cfg.paths.output_dir);
342
343 let mut other = ResonanceState::load_or_init(idx_dir);
345
346 let their_headers: Vec<(f32, f32, f32)> = {
348 if let Ok(r) = MicroscopeReader::open(&idx_cfg) {
349 (0..r.block_count)
350 .map(|i| {
351 let h = r.header(i);
352 (h.x, h.y, h.z)
353 })
354 .collect()
355 } else {
356 continue;
357 }
358 };
359
360 let our_decoded = ResonanceState::import_pulses(&our_pulses);
361 for pulse in our_decoded {
362 other.receive_pulse(pulse, &their_headers, 0.05);
363 total_exchanged += 1;
364 }
365
366 let their_pulses = other.export_pulses();
368 let their_decoded = ResonanceState::import_pulses(&their_pulses);
369 for pulse in their_decoded {
370 local.receive_pulse(pulse, &local_headers, 0.05);
371 total_exchanged += 1;
372 }
373
374 let _ = other.save(idx_dir);
376 }
377
378 local
380 .save(output_dir)
381 .map_err(|e| format!("save resonance: {}", e))?;
382
383 Ok(total_exchanged)
384}
385
386pub fn exchange_patterns(config: &Config) -> Result<usize, String> {
391 use crate::predictive_cache::PredictiveCache;
392 use crate::thought_graph::ThoughtGraphState;
393
394 let output_dir = Path::new(&config.paths.output_dir);
395 let mut local_tg = ThoughtGraphState::load_or_init(output_dir);
396 let mut local_pc = PredictiveCache::load_or_init(output_dir);
397
398 let our_patterns: Vec<_> = local_tg.export_patterns().into_iter().cloned().collect();
399 let (_our_preds, _our_hits, _our_misses, our_hit_rate) = local_pc.export_stats();
400
401 let mut total_exchanged = 0usize;
402
403 for idx_config in &config.federation.indices {
404 let idx_cfg =
405 Config::load(&idx_config.config_path).map_err(|e| format!("load config: {}", e))?;
406 let idx_dir = Path::new(&idx_cfg.paths.output_dir);
407
408 let mut other_tg = ThoughtGraphState::load_or_init(idx_dir);
409 let mut other_pc = PredictiveCache::load_or_init(idx_dir);
410
411 let their_patterns: Vec<_> = other_tg.export_patterns().into_iter().cloned().collect();
412 let (their_preds, their_hits, their_misses, their_hit_rate) = other_pc.export_stats();
413
414 let trust_for_them = our_hit_rate * idx_config.weight;
416 other_tg.import_patterns(&our_patterns, trust_for_them);
417 total_exchanged += our_patterns.len();
418
419 let trust_for_us = their_hit_rate * idx_config.weight;
421 local_tg.import_patterns(&their_patterns, trust_for_us);
422 total_exchanged += their_patterns.len();
423
424 local_pc.merge_stats(their_preds, their_hits, their_misses);
426 other_pc.merge_stats(_our_preds, _our_hits, _our_misses);
427
428 let _ = other_tg.save(idx_dir);
429 let _ = other_pc.save(idx_dir);
430 }
431
432 local_tg
433 .save(output_dir)
434 .map_err(|e| format!("save thought_graph: {}", e))?;
435 local_pc
436 .save(output_dir)
437 .map_err(|e| format!("save predictive_cache: {}", e))?;
438
439 Ok(total_exchanged)
440}