// microscope_memory/federation.rs

//! Multi-index federation for Microscope Memory.
//!
//! Query multiple microscope indices in parallel and merge results.
//! Enables cross-project memory search.

use crate::config::Config;
use crate::reader::MicroscopeReader;
use crate::{content_coords_blended, read_append_log, LAYER_NAMES};
use std::path::Path;
/// A single result from a federated search, tagged with its source index.
#[derive(Debug, Clone)]
pub struct FederatedResult {
    /// Block text returned by the underlying index.
    pub text: String,
    /// Zoom depth of the block within its index.
    pub depth: u8,
    /// Human-readable layer name (resolved through `LAYER_NAMES`).
    pub layer: String,
    /// Weighted score; lower is better when results are merged.
    pub score: f32,
    /// Name of the federated index this result came from.
    pub source_index: String,
    /// True when the result came from the append log rather than the main index.
    pub is_append: bool,
}
21
/// Federated search across multiple microscope indices.
///
/// Queries fan out to every configured index in parallel and the per-index
/// results are merged by ascending score.
pub struct FederatedSearch {
    /// (name, config, weight) for each index. Raw scores are divided by
    /// `weight`, so a higher weight makes an index's results rank better
    /// (lower score).
    indices: Vec<(String, Config, f32)>,
}
27
28impl FederatedSearch {
29    /// Create from the main config's federation section.
30    pub fn from_config(config: &Config) -> Result<Self, String> {
31        let mut indices = Vec::new();
32
33        for entry in &config.federation.indices {
34            let idx_config = Config::load(&entry.config_path).map_err(|e| {
35                format!(
36                    "Failed to load federated index '{}' from '{}': {}",
37                    entry.name, entry.config_path, e
38                )
39            })?;
40            indices.push((entry.name.clone(), idx_config, entry.weight));
41        }
42
43        if indices.is_empty() {
44            return Err("No federated indices configured".to_string());
45        }
46
47        Ok(Self { indices })
48    }
49
50    /// Recall query across all federated indices in parallel.
51    pub fn recall(&self, query: &str, k: usize) -> Vec<FederatedResult> {
52        let results: Vec<Vec<FederatedResult>> = std::thread::scope(|s| {
53            let handles: Vec<_> = self
54                .indices
55                .iter()
56                .map(|(name, config, weight)| {
57                    let name = name.clone();
58                    let weight = *weight;
59                    s.spawn(move || recall_single(&name, config, query, k, weight))
60                })
61                .collect();
62
63            handles.into_iter().filter_map(|h| h.join().ok()).collect()
64        });
65
66        merge_results(results, k)
67    }
68
69    /// Text search across all federated indices in parallel.
70    pub fn find_text(&self, query: &str, k: usize) -> Vec<FederatedResult> {
71        let results: Vec<Vec<FederatedResult>> = std::thread::scope(|s| {
72            let handles: Vec<_> = self
73                .indices
74                .iter()
75                .map(|(name, config, weight)| {
76                    let name = name.clone();
77                    let weight = *weight;
78                    s.spawn(move || find_single(&name, config, query, k, weight))
79                })
80                .collect();
81
82            handles.into_iter().filter_map(|h| h.join().ok()).collect()
83        });
84
85        merge_results(results, k)
86    }
87
88    /// MQL query across all federated indices in parallel.
89    pub fn mql_query(&self, mql: &str, k: usize) -> Vec<FederatedResult> {
90        let results: Vec<Vec<FederatedResult>> = std::thread::scope(|s| {
91            let handles: Vec<_> = self
92                .indices
93                .iter()
94                .map(|(name, config, weight)| {
95                    let name = name.clone();
96                    let weight = *weight;
97                    s.spawn(move || mql_single(&name, config, mql, k, weight))
98                })
99                .collect();
100
101            handles.into_iter().filter_map(|h| h.join().ok()).collect()
102        });
103
104        merge_results(results, k)
105    }
106
107    /// Get names and status of all federated indices.
108    pub fn status(&self) -> Vec<(String, Result<usize, String>)> {
109        self.indices
110            .iter()
111            .map(|(name, config, _)| {
112                let result = MicroscopeReader::open(config).map(|r| r.block_count);
113                (name.clone(), result)
114            })
115            .collect()
116    }
117}
118
119/// Recall from a single index.
120fn recall_single(
121    name: &str,
122    config: &Config,
123    query: &str,
124    k: usize,
125    weight: f32,
126) -> Vec<FederatedResult> {
127    let reader = match MicroscopeReader::open(config) {
128        Ok(r) => r,
129        Err(_) => return Vec::new(),
130    };
131
132    let (qx, qy, qz) = content_coords_blended(query, "long_term", config.search.semantic_weight);
133    let q_lower = query.to_lowercase();
134    let keywords: Vec<&str> = q_lower.split_whitespace().filter(|w| w.len() > 2).collect();
135
136    let (zoom_lo, zoom_hi) = match query.len() {
137        0..=10 => (0u8, 3u8),
138        11..=40 => (3, 6),
139        _ => (6, 8),
140    };
141
142    let mut results: Vec<(f32, FederatedResult)> = Vec::new();
143
144    for zoom in zoom_lo..=zoom_hi {
145        let (start, count) = reader.depth_ranges[zoom as usize];
146        let (start, count) = (start as usize, count as usize);
147        for i in start..(start + count) {
148            let text = reader.text(i);
149            let text_lower = text.to_lowercase();
150            let hits = keywords
151                .iter()
152                .filter(|&&kw| text_lower.contains(kw))
153                .count();
154            if hits > 0 {
155                let h = reader.header(i);
156                let dx = h.x - qx;
157                let dy = h.y - qy;
158                let dz = h.z - qz;
159                let dist = dx * dx + dy * dy + dz * dz;
160                let boost = hits as f32 * 0.1;
161                let score = (dist - boost).max(0.0) / weight; // lower weight = better score
162                results.push((
163                    score,
164                    FederatedResult {
165                        text: text.to_string(),
166                        depth: h.depth,
167                        layer: LAYER_NAMES
168                            .get(h.layer_id as usize)
169                            .unwrap_or(&"?")
170                            .to_string(),
171                        score,
172                        source_index: name.to_string(),
173                        is_append: false,
174                    },
175                ));
176            }
177        }
178    }
179
180    // Also search append log
181    let append_path = Path::new(&config.paths.output_dir).join("append.bin");
182    let appended = read_append_log(&append_path);
183    for entry in &appended {
184        let dx = entry.x - qx;
185        let dy = entry.y - qy;
186        let dz = entry.z - qz;
187        let dist = dx * dx + dy * dy + dz * dz;
188        let text_lower = entry.text.to_lowercase();
189        let hits = keywords
190            .iter()
191            .filter(|&&kw| text_lower.contains(kw))
192            .count();
193        if dist < 0.1 || hits > 0 {
194            let boost = hits as f32 * 0.1;
195            let score = (dist - boost).max(0.0) / weight;
196            results.push((
197                score,
198                FederatedResult {
199                    text: entry.text.clone(),
200                    depth: entry.depth,
201                    layer: LAYER_NAMES
202                        .get(entry.layer_id as usize)
203                        .unwrap_or(&"?")
204                        .to_string(),
205                    score,
206                    source_index: name.to_string(),
207                    is_append: true,
208                },
209            ));
210        }
211    }
212
213    results.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
214    results.truncate(k);
215    results.into_iter().map(|(_, r)| r).collect()
216}
217
218/// Text search from a single index.
219fn find_single(
220    name: &str,
221    config: &Config,
222    query: &str,
223    k: usize,
224    weight: f32,
225) -> Vec<FederatedResult> {
226    let reader = match MicroscopeReader::open(config) {
227        Ok(r) => r,
228        Err(_) => return Vec::new(),
229    };
230
231    let results = reader.find_text(query, k);
232    results
233        .iter()
234        .enumerate()
235        .map(|(rank, &(_, idx))| {
236            let h = reader.header(idx);
237            FederatedResult {
238                text: reader.text(idx).to_string(),
239                depth: h.depth,
240                layer: LAYER_NAMES
241                    .get(h.layer_id as usize)
242                    .unwrap_or(&"?")
243                    .to_string(),
244                score: rank as f32 / weight,
245                source_index: name.to_string(),
246                is_append: false,
247            }
248        })
249        .collect()
250}
251
252/// MQL query from a single index.
253fn mql_single(
254    name: &str,
255    config: &Config,
256    mql: &str,
257    k: usize,
258    weight: f32,
259) -> Vec<FederatedResult> {
260    let reader = match MicroscopeReader::open(config) {
261        Ok(r) => r,
262        Err(_) => return Vec::new(),
263    };
264
265    let append_path = Path::new(&config.paths.output_dir).join("append.bin");
266    let appended = read_append_log(&append_path);
267    let q = crate::query::parse(mql);
268    let mut results = crate::query::execute(&q, &reader, &appended);
269    results.truncate(k);
270
271    results
272        .iter()
273        .map(|r| {
274            if r.is_main {
275                let h = reader.header(r.block_idx);
276                FederatedResult {
277                    text: reader.text(r.block_idx).to_string(),
278                    depth: h.depth,
279                    layer: LAYER_NAMES
280                        .get(h.layer_id as usize)
281                        .unwrap_or(&"?")
282                        .to_string(),
283                    score: r.score / weight,
284                    source_index: name.to_string(),
285                    is_append: false,
286                }
287            } else {
288                let entry = &appended[r.block_idx];
289                FederatedResult {
290                    text: entry.text.clone(),
291                    depth: entry.depth,
292                    layer: LAYER_NAMES
293                        .get(entry.layer_id as usize)
294                        .unwrap_or(&"?")
295                        .to_string(),
296                    score: r.score / weight,
297                    source_index: name.to_string(),
298                    is_append: true,
299                }
300            }
301        })
302        .collect()
303}
304
305/// Merge results from multiple indices, sort by score, truncate to k.
306fn merge_results(all: Vec<Vec<FederatedResult>>, k: usize) -> Vec<FederatedResult> {
307    let mut merged: Vec<FederatedResult> = all.into_iter().flatten().collect();
308    merged.sort_by(|a, b| a.score.partial_cmp(&b.score).unwrap());
309    merged.truncate(k);
310    merged
311}
312
// ─── Mirror Neuron Pulse Exchange ───────────────────
314
/// Exchange resonance pulses across federated indices.
/// Each index exports its outgoing pulses, and imports others' pulses.
/// Returns total pulses exchanged.
///
/// # Errors
/// Fails if the local reader cannot open, a federated config cannot load,
/// or the local resonance state cannot be saved. A federated index whose
/// reader fails to open is skipped entirely; federated-side saves are
/// best-effort.
pub fn exchange_pulses(config: &Config) -> Result<usize, String> {
    use crate::resonance::ResonanceState;

    let output_dir = Path::new(&config.paths.output_dir);
    let mut local = ResonanceState::load_or_init(output_dir);

    // Export our outgoing pulses once, up front; pulses received inside the
    // loop below do not feed back into this snapshot.
    let our_pulses = local.export_pulses();
    let mut total_exchanged = 0usize;

    // Read local headers for proximity matching
    let reader = MicroscopeReader::open(config).map_err(|e| format!("open reader: {}", e))?;
    let local_headers: Vec<(f32, f32, f32)> = (0..reader.block_count)
        .map(|i| {
            let h = reader.header(i);
            (h.x, h.y, h.z)
        })
        .collect();

    // For each federated index, exchange pulses
    for idx_config in &config.federation.indices {
        let idx_cfg =
            Config::load(&idx_config.config_path).map_err(|e| format!("load config: {}", e))?;
        let idx_dir = Path::new(&idx_cfg.paths.output_dir);

        // Load the other index's resonance state
        let mut other = ResonanceState::load_or_init(idx_dir);

        // Send our pulses to them
        let their_headers: Vec<(f32, f32, f32)> = {
            if let Ok(r) = MicroscopeReader::open(&idx_cfg) {
                (0..r.block_count)
                    .map(|i| {
                        let h = r.header(i);
                        (h.x, h.y, h.z)
                    })
                    .collect()
            } else {
                // Unreachable index: skip it entirely (no pulses either way).
                continue;
            }
        };

        let our_decoded = ResonanceState::import_pulses(&our_pulses);
        for pulse in our_decoded {
            // NOTE(review): 0.05 is the third argument to receive_pulse —
            // presumably a proximity threshold; confirm in crate::resonance.
            other.receive_pulse(pulse, &their_headers, 0.05);
            total_exchanged += 1;
        }

        // Receive their pulses
        let their_pulses = other.export_pulses();
        let their_decoded = ResonanceState::import_pulses(&their_pulses);
        for pulse in their_decoded {
            local.receive_pulse(pulse, &local_headers, 0.05);
            total_exchanged += 1;
        }

        // Save the other index's updated state (best-effort: errors ignored)
        let _ = other.save(idx_dir);
    }

    // Save our updated state
    local
        .save(output_dir)
        .map_err(|e| format!("save resonance: {}", e))?;

    Ok(total_exchanged)
}
385
// ─── Cross-Instance Learning ────────────────────────
387
388/// Exchange thought patterns and predictive cache stats across federated indices.
389/// Returns total patterns exchanged.
390pub fn exchange_patterns(config: &Config) -> Result<usize, String> {
391    use crate::predictive_cache::PredictiveCache;
392    use crate::thought_graph::ThoughtGraphState;
393
394    let output_dir = Path::new(&config.paths.output_dir);
395    let mut local_tg = ThoughtGraphState::load_or_init(output_dir);
396    let mut local_pc = PredictiveCache::load_or_init(output_dir);
397
398    let our_patterns: Vec<_> = local_tg.export_patterns().into_iter().cloned().collect();
399    let (_our_preds, _our_hits, _our_misses, our_hit_rate) = local_pc.export_stats();
400
401    let mut total_exchanged = 0usize;
402
403    for idx_config in &config.federation.indices {
404        let idx_cfg =
405            Config::load(&idx_config.config_path).map_err(|e| format!("load config: {}", e))?;
406        let idx_dir = Path::new(&idx_cfg.paths.output_dir);
407
408        let mut other_tg = ThoughtGraphState::load_or_init(idx_dir);
409        let mut other_pc = PredictiveCache::load_or_init(idx_dir);
410
411        let their_patterns: Vec<_> = other_tg.export_patterns().into_iter().cloned().collect();
412        let (their_preds, their_hits, their_misses, their_hit_rate) = other_pc.export_stats();
413
414        // Send our patterns to them (trust = our hit rate * their federation weight)
415        let trust_for_them = our_hit_rate * idx_config.weight;
416        other_tg.import_patterns(&our_patterns, trust_for_them);
417        total_exchanged += our_patterns.len();
418
419        // Receive their patterns (trust = their hit rate * their federation weight)
420        let trust_for_us = their_hit_rate * idx_config.weight;
421        local_tg.import_patterns(&their_patterns, trust_for_us);
422        total_exchanged += their_patterns.len();
423
424        // Merge stats
425        local_pc.merge_stats(their_preds, their_hits, their_misses);
426        other_pc.merge_stats(_our_preds, _our_hits, _our_misses);
427
428        let _ = other_tg.save(idx_dir);
429        let _ = other_pc.save(idx_dir);
430    }
431
432    local_tg
433        .save(output_dir)
434        .map_err(|e| format!("save thought_graph: {}", e))?;
435    local_pc
436        .save(output_dir)
437        .map_err(|e| format!("save predictive_cache: {}", e))?;
438
439    Ok(total_exchanged)
440}