Skip to main content

lean_ctx/core/
consolidation.rs

1//! Consolidation engine for provider data — hippocampal sleep replay.
2//!
3//! Converts provider results into long-term context artifacts:
4//!   1. BM25/embedding index chunks (for future searches)
5//!   2. Cross-source graph edges (for related-file discovery)
6//!   3. Knowledge facts (for semantic memory)
7//!   4. Session cache entries (for fast re-reads at ~13 tokens)
8//!
9//! This is the "sleep replay" mechanism: raw episodic data (provider API
10//! responses) is consolidated into durable semantic representations.
11//!
12//! Scientific basis: Hippocampal memory consolidation (Kitamura, Science 2017).
13//! Fast hippocampal (session cache) traces are replayed to build slow
14//! neocortical (knowledge + graph + index) representations.
15
16use crate::core::content_chunk::ContentChunk;
17use crate::core::cross_source_edges;
18use crate::core::graph_index::IndexEdge;
19use crate::core::knowledge_provider_extract::{self, ExtractedFact};
20
/// Result of a consolidation run — tells the caller what was created.
///
/// Every counter starts at zero via [`Default`]. The type implements
/// `PartialEq`/`Eq` so a whole result can be checked in a single comparison
/// instead of field by field.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ConsolidationResult {
    /// Number of chunks reported as indexed.
    pub chunks_indexed: usize,
    /// Number of graph edges reported as created.
    pub edges_created: usize,
    /// Number of knowledge facts reported as extracted.
    pub facts_extracted: usize,
    /// Number of entries reported as stored in the session cache.
    pub cache_entries_stored: usize,
}
29
30/// Consolidate a batch of ContentChunks into all long-term stores.
31///
32/// This is the main entry point. It does NOT perform I/O itself — it returns
33/// the artifacts that the caller should persist. This keeps the consolidation
34/// logic pure and testable.
35pub fn consolidate(chunks: &[ContentChunk]) -> ConsolidationArtifacts {
36    let external_chunks: Vec<&ContentChunk> = chunks.iter().filter(|c| c.is_external()).collect();
37
38    if external_chunks.is_empty() {
39        return ConsolidationArtifacts::default();
40    }
41
42    let edges = cross_source_edges::extract_cross_source_edges(chunks);
43
44    let facts = knowledge_provider_extract::extract_facts(chunks);
45
46    let cache_entries: Vec<CacheableProviderResult> = external_chunks
47        .iter()
48        .map(|c| CacheableProviderResult {
49            uri: c.file_path.clone(),
50            content: c.content.clone(),
51            token_count: c.token_count,
52        })
53        .collect();
54
55    ConsolidationArtifacts {
56        bm25_chunks: chunks.to_vec(),
57        edges,
58        facts,
59        cache_entries,
60    }
61}
62
63/// Pure artifacts produced by consolidation — no side effects yet.
64#[derive(Debug, Clone, Default)]
65pub struct ConsolidationArtifacts {
66    pub bm25_chunks: Vec<ContentChunk>,
67    pub edges: Vec<IndexEdge>,
68    pub facts: Vec<ExtractedFact>,
69    pub cache_entries: Vec<CacheableProviderResult>,
70}
71
72impl ConsolidationArtifacts {
73    pub fn is_empty(&self) -> bool {
74        self.bm25_chunks.is_empty()
75            && self.edges.is_empty()
76            && self.facts.is_empty()
77            && self.cache_entries.is_empty()
78    }
79
80    pub fn summary(&self) -> ConsolidationResult {
81        ConsolidationResult {
82            chunks_indexed: self.bm25_chunks.iter().filter(|c| c.is_external()).count(),
83            edges_created: self.edges.len(),
84            facts_extracted: self.facts.len(),
85            cache_entries_stored: self.cache_entries.len(),
86        }
87    }
88}
89
/// A provider result ready to be stored in the session cache.
///
/// Implements `PartialEq`/`Eq` so entries can be compared directly, e.g. in
/// assertions or when de-duplicating.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CacheableProviderResult {
    /// Key under which the content is stored in the cache.
    pub uri: String,
    /// The content to cache.
    pub content: String,
    /// Token count carried over from the originating chunk.
    pub token_count: usize,
}
97
98/// Apply consolidation artifacts to the live systems.
99///
100/// This function performs the actual side effects: writing to BM25, graph,
101/// knowledge, and session cache. Designed to be called from a background
102/// thread or after a provider query returns.
103pub fn apply_artifacts(
104    artifacts: &ConsolidationArtifacts,
105    bm25: Option<&mut crate::core::bm25_index::BM25Index>,
106    graph_edges: Option<&mut Vec<IndexEdge>>,
107    session_cache: Option<&mut crate::core::cache::SessionCache>,
108) -> ConsolidationResult {
109    apply_artifacts_with_pg(artifacts, bm25, graph_edges, session_cache, None)
110}
111
112pub fn apply_artifacts_with_pg(
113    artifacts: &ConsolidationArtifacts,
114    bm25: Option<&mut crate::core::bm25_index::BM25Index>,
115    graph_edges: Option<&mut Vec<IndexEdge>>,
116    session_cache: Option<&mut crate::core::cache::SessionCache>,
117    property_graph: Option<&crate::core::property_graph::CodeGraph>,
118) -> ConsolidationResult {
119    let mut result = ConsolidationResult::default();
120
121    if let Some(index) = bm25 {
122        result.chunks_indexed = index.ingest_content_chunks(artifacts.bm25_chunks.clone());
123    }
124
125    if let Some(edges) = graph_edges {
126        result.edges_created = cross_source_edges::merge_edges(edges, artifacts.edges.clone());
127    }
128
129    if let Some(pg) = property_graph {
130        write_edges_to_property_graph(pg, &artifacts.edges);
131    }
132
133    result.facts_extracted = artifacts.facts.len();
134
135    if let Some(cache) = session_cache {
136        for entry in &artifacts.cache_entries {
137            cache.store(&entry.uri, &entry.content);
138            result.cache_entries_stored += 1;
139        }
140    }
141
142    result
143}
144
145fn write_edges_to_property_graph(pg: &crate::core::property_graph::CodeGraph, edges: &[IndexEdge]) {
146    use crate::core::property_graph::{Edge, EdgeKind, Node};
147    for edge in edges {
148        let Ok(src_id) = pg.upsert_node(&Node::file(&edge.from)) else {
149            continue;
150        };
151        let Ok(tgt_id) = pg.upsert_node(&Node::file(&edge.to)) else {
152            continue;
153        };
154        let kind = EdgeKind::parse(&edge.kind);
155        let _ = pg.upsert_edge(&Edge::new(src_id, tgt_id, kind));
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::bm25_index::{BM25Index, ChunkKind};
    use crate::core::cache::SessionCache;
    use crate::core::content_chunk::ContentChunk;

    /// Two provider chunks (an issue and a pull request) that both reference
    /// `src/auth.rs`.
    fn sample_chunks() -> Vec<ContentChunk> {
        let issue = ContentChunk::from_provider(
            "github",
            "issues",
            "42",
            "Auth token bug",
            ChunkKind::Issue,
            "Token expires too early in src/auth.rs".into(),
            vec!["src/auth.rs".into()],
            Some(serde_json::json!({"state": "open", "labels": ["bug"]})),
        );
        let pull_request = ContentChunk::from_provider(
            "github",
            "pull_requests",
            "100",
            "Fix auth expiry",
            ChunkKind::PullRequest,
            "Fixes token lifetime calculation in src/auth.rs".into(),
            vec!["src/auth.rs".into()],
            Some(serde_json::json!({"state": "open"})),
        );
        vec![issue, pull_request]
    }

    /// A BM25 index with no documents in it.
    fn empty_index() -> BM25Index {
        BM25Index {
            chunks: Vec::new(),
            inverted: std::collections::HashMap::new(),
            avg_doc_len: 0.0,
            doc_count: 0,
            doc_freqs: std::collections::HashMap::new(),
            files: std::collections::HashMap::new(),
        }
    }

    #[test]
    fn consolidate_produces_all_artifact_types() {
        let artifacts = consolidate(&sample_chunks());

        assert!(!artifacts.is_empty());
        assert_eq!(artifacts.bm25_chunks.len(), 2);
        assert!(!artifacts.edges.is_empty());
        assert!(!artifacts.facts.is_empty());
        assert_eq!(artifacts.cache_entries.len(), 2);
    }

    #[test]
    fn consolidate_empty_input_produces_empty_artifacts() {
        assert!(consolidate(&[]).is_empty());
    }

    #[test]
    fn consolidate_code_only_produces_empty_external_artifacts() {
        let code_chunk = crate::core::bm25_index::CodeChunk {
            file_path: "src/main.rs".into(),
            symbol_name: "main".into(),
            kind: ChunkKind::Function,
            start_line: 1,
            end_line: 5,
            content: "fn main() {}".into(),
            tokens: vec![],
            token_count: 0,
        };
        let artifacts = consolidate(&[ContentChunk::from(code_chunk)]);

        assert!(artifacts.edges.is_empty());
        assert!(artifacts.facts.is_empty());
        assert!(artifacts.cache_entries.is_empty());
    }

    #[test]
    fn consolidation_summary_counts_correctly() {
        let summary = consolidate(&sample_chunks()).summary();

        assert_eq!(summary.chunks_indexed, 2);
        assert!(summary.edges_created > 0);
        assert!(summary.facts_extracted > 0);
        assert_eq!(summary.cache_entries_stored, 2);
    }

    #[test]
    fn apply_artifacts_to_bm25() {
        let artifacts = consolidate(&sample_chunks());
        let mut index = empty_index();

        let result = apply_artifacts(&artifacts, Some(&mut index), None, None);

        assert_eq!(result.chunks_indexed, 2);
        assert_eq!(index.doc_count, 2);
        assert_eq!(index.external_chunk_count(), 2);
    }

    #[test]
    fn apply_artifacts_to_graph() {
        let artifacts = consolidate(&sample_chunks());
        let mut edges: Vec<IndexEdge> = Vec::new();

        let result = apply_artifacts(&artifacts, None, Some(&mut edges), None);

        assert!(result.edges_created > 0);
        assert!(!edges.is_empty());
        assert!(edges.iter().any(|e| e.to == "src/auth.rs"));
    }

    #[test]
    fn apply_artifacts_to_session_cache() {
        let artifacts = consolidate(&sample_chunks());
        let mut cache = SessionCache::new();

        let result = apply_artifacts(&artifacts, None, None, Some(&mut cache));

        assert_eq!(result.cache_entries_stored, 2);
        assert!(cache.get("github://issues/42").is_some());
        assert!(cache.get("github://pull_requests/100").is_some());
    }

    #[test]
    fn apply_artifacts_to_all_systems() {
        let artifacts = consolidate(&sample_chunks());
        let mut index = empty_index();
        let mut edges: Vec<IndexEdge> = Vec::new();
        let mut cache = SessionCache::new();

        let result = apply_artifacts(
            &artifacts,
            Some(&mut index),
            Some(&mut edges),
            Some(&mut cache),
        );

        assert!(result.chunks_indexed > 0);
        assert!(result.edges_created > 0);
        assert!(result.facts_extracted > 0);
        assert!(result.cache_entries_stored > 0);
    }
}