Skip to main content

tj_core/
memory.rs

1//! Global cross-project memory index (Pillar B).
2//!
3//! A single SQLite file (`data_dir/memory.sqlite`) mirrors the *high-signal*
4//! events — decisions, rejections, constraints (and, later, consolidated
5//! semantic/procedural/preference facts) — from every project, together with
6//! their embeddings. This is what lets the agent recall relevant prior
7//! reasoning across its whole history, not just the current repo — the thing
8//! single-project memory tools can't do.
9//!
10//! The index is a denormalised cache: the per-project JSONL logs remain the
//! source of truth. It is rebuilt idempotently by [`sync_from_project`] and
//! queried by [`search`] (semantic, by embedding) and [`keyword_search`]
//! (FTS5 keyword match).
13
14use rusqlite::Connection;
15
/// Event types worth surfacing proactively: a committed choice, a ruled-out
/// path, or an external limit. These are the reasoning the agent most wants
/// before repeating itself.
///
/// [`sync_from_project`] binds these values as the `IN (…)` filter on the
/// project-side event type, so only events of these types are copied into the
/// global index.
pub const HIGH_SIGNAL_TYPES: [&str; 3] = ["decision", "rejection", "constraint"];
20
/// DDL applied by [`open`] every time the global database is opened.
///
/// Every statement is `CREATE … IF NOT EXISTS`, so re-running the batch on an
/// existing database is a no-op; note that it therefore does not alter tables
/// that already exist with a different shape.
///
/// * `global_memory` — one row per mirrored event, keyed by `event_id`, holding
///   the event text, the embedding (`model`, `dim`, `vec` blob) and a
///   `superseded` flag. Indexed on `type` and `model`.
/// * `global_fts` — FTS5 table over `text`, with `event_id` stored but not
///   indexed; used by [`keyword_search`].
/// * `preferences` — user preferences, unique on `text`, with an
///   auto-incrementing `id`.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS global_memory (
  event_id     TEXT PRIMARY KEY,
  project_hash TEXT NOT NULL,
  task_id      TEXT NOT NULL,
  type         TEXT NOT NULL,
  tier         TEXT NOT NULL DEFAULT 'episodic',
  text         TEXT NOT NULL,
  model        TEXT NOT NULL,
  dim          INTEGER NOT NULL,
  vec          BLOB NOT NULL,
  created_at   TEXT NOT NULL,
  superseded   INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_gm_type ON global_memory(type);
CREATE INDEX IF NOT EXISTS idx_gm_model ON global_memory(model);
CREATE VIRTUAL TABLE IF NOT EXISTS global_fts USING fts5(event_id UNINDEXED, text);
CREATE TABLE IF NOT EXISTS preferences (
  id         INTEGER PRIMARY KEY AUTOINCREMENT,
  text       TEXT NOT NULL UNIQUE,
  created_at TEXT NOT NULL
);
"#;
44
45/// Open (creating + migrating) the global memory database at `path`.
46pub fn open(path: impl AsRef<std::path::Path>) -> anyhow::Result<Connection> {
47    if let Some(parent) = path.as_ref().parent() {
48        std::fs::create_dir_all(parent)?;
49    }
50    let conn = Connection::open(path)?;
51    conn.execute_batch(SCHEMA)?;
52    Ok(conn)
53}
54
/// A cross-project recall hit, as returned by the keyword and semantic search
/// functions of this module.
///
/// Derives `Debug`, `Clone` and `PartialEq` so hits can be logged, copied and
/// compared in assertions.
#[derive(Debug, Clone, PartialEq)]
pub struct GlobalHit {
    /// Primary key of the mirrored event in `global_memory`.
    pub event_id: String,
    /// Identifier of the project the event was synced from.
    pub project_hash: String,
    /// Task the event belongs to.
    pub task_id: String,
    /// Value of the `type` column (e.g. one of the high-signal event types).
    pub event_type: String,
    /// Value of the `tier` column (the schema default is `'episodic'`).
    pub tier: String,
    /// The event text that was indexed.
    pub text: String,
    /// Relevance score; higher means more relevant. The scale depends on the
    /// search that produced the hit (negated BM25 or cosine similarity), with
    /// a penalty already subtracted for superseded entries.
    pub score: f32,
}
65
66/// Copy this project's high-signal embedded events into the global index.
67/// Idempotent (`INSERT OR REPLACE` on `event_id`); call after embedding a
68/// project. Returns how many rows were synced. `superseded` is flagged from the
69/// `decisions.superseded_by` projection so contradicted decisions can be
70/// down-ranked at query time.
71pub fn sync_from_project(
72    global: &Connection,
73    project: &Connection,
74    project_hash: &str,
75) -> anyhow::Result<usize> {
76    let placeholders = HIGH_SIGNAL_TYPES
77        .iter()
78        .map(|_| "?")
79        .collect::<Vec<_>>()
80        .join(",");
81    let sql = format!(
82        "SELECT e.event_id, e.task_id, f.type, e.tier, f.text, e.model, e.dim, e.vec, e.created_at,
83                CASE WHEN d.superseded_by IS NOT NULL THEN 1 ELSE 0 END
84           FROM embeddings e
85           JOIN search_fts f ON f.event_id = e.event_id
86           LEFT JOIN decisions d ON d.decision_id = e.event_id
87          WHERE f.type IN ({placeholders})"
88    );
89    let mut stmt = project.prepare(&sql)?;
90    let rows = stmt.query_map(rusqlite::params_from_iter(HIGH_SIGNAL_TYPES.iter()), |r| {
91        Ok((
92            r.get::<_, String>(0)?,  // event_id
93            r.get::<_, String>(1)?,  // task_id
94            r.get::<_, String>(2)?,  // type
95            r.get::<_, String>(3)?,  // tier
96            r.get::<_, String>(4)?,  // text
97            r.get::<_, String>(5)?,  // model
98            r.get::<_, i64>(6)?,     // dim
99            r.get::<_, Vec<u8>>(7)?, // vec
100            r.get::<_, String>(8)?,  // created_at
101            r.get::<_, i64>(9)?,     // superseded
102        ))
103    })?;
104
105    let mut n = 0usize;
106    for row in rows {
107        let (event_id, task_id, ty, tier, text, model, dim, vec, created_at, superseded) = row?;
108        global.execute(
109            "INSERT OR REPLACE INTO global_memory
110               (event_id, project_hash, task_id, type, tier, text, model, dim, vec, created_at, superseded)
111             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
112            rusqlite::params![
113                event_id, project_hash, task_id, ty, tier, text, model, dim, vec, created_at, superseded
114            ],
115        )?;
116        // Mirror into FTS5 for the fast keyword path (proactive hook).
117        global.execute(
118            "DELETE FROM global_fts WHERE event_id = ?1",
119            rusqlite::params![event_id],
120        )?;
121        global.execute(
122            "INSERT INTO global_fts(event_id, text) VALUES (?1, ?2)",
123            rusqlite::params![event_id, text],
124        )?;
125        n += 1;
126    }
127    Ok(n)
128}
129
130/// Fast keyword (FTS5) search over the global index — no embedding, so it's
131/// cheap enough to run on every prompt in the proactive hook (loading a model
132/// per prompt would be too slow). Builds an OR query from the prompt's
133/// alphanumeric tokens (≥4 chars) and ranks by BM25.
134pub fn keyword_search(conn: &Connection, prompt: &str, k: usize) -> anyhow::Result<Vec<GlobalHit>> {
135    let mut seen = std::collections::HashSet::new();
136    let terms: Vec<String> = prompt
137        .split(|c: char| !c.is_alphanumeric())
138        .filter(|t| t.chars().count() >= 4)
139        .map(|t| t.to_lowercase())
140        .filter(|t| seen.insert(t.clone()))
141        .collect();
142    if terms.is_empty() {
143        return Ok(Vec::new());
144    }
145    let query = terms.join(" OR ");
146    let mut stmt = conn.prepare(
147        "SELECT g.event_id, g.project_hash, g.task_id, g.type, g.tier, g.text, g.superseded,
148                bm25(global_fts)
149           FROM global_fts
150           JOIN global_memory g ON g.event_id = global_fts.event_id
151          WHERE global_fts MATCH ?1
152          ORDER BY bm25(global_fts)
153          LIMIT ?2",
154    )?;
155    let rows = stmt.query_map(rusqlite::params![query, k as i64], |r| {
156        let bm: f64 = r.get(7)?;
157        let superseded: i64 = r.get(6)?;
158        // BM25 is lower-is-better; negate so higher == more relevant, then
159        // nudge contradicted reasoning down.
160        let score = (-bm) as f32 - if superseded != 0 { 0.5 } else { 0.0 };
161        Ok(GlobalHit {
162            event_id: r.get(0)?,
163            project_hash: r.get(1)?,
164            task_id: r.get(2)?,
165            event_type: r.get(3)?,
166            tier: r.get(4)?,
167            text: r.get(5)?,
168            score,
169        })
170    })?;
171    let mut out = Vec::new();
172    for row in rows {
173        out.push(row?);
174    }
175    Ok(out)
176}
177
178/// Semantic search across the whole global index for the embedder's `model`.
179/// Returns the top `k` hits by cosine, with a small penalty applied to
180/// superseded/contradicted entries so live reasoning ranks above stale.
181pub fn search(
182    conn: &Connection,
183    query_vec: &[f32],
184    model: &str,
185    k: usize,
186) -> anyhow::Result<Vec<GlobalHit>> {
187    let mut stmt = conn.prepare(
188        "SELECT event_id, project_hash, task_id, type, tier, text, vec, superseded
189           FROM global_memory WHERE model = ?1",
190    )?;
191    let rows = stmt.query_map(rusqlite::params![model], |r| {
192        Ok((
193            r.get::<_, String>(0)?,
194            r.get::<_, String>(1)?,
195            r.get::<_, String>(2)?,
196            r.get::<_, String>(3)?,
197            r.get::<_, String>(4)?,
198            r.get::<_, String>(5)?,
199            r.get::<_, Vec<u8>>(6)?,
200            r.get::<_, i64>(7)?,
201        ))
202    })?;
203
204    let mut hits = Vec::new();
205    for row in rows {
206        let (event_id, project_hash, task_id, event_type, tier, text, blob, superseded) = row?;
207        let mut score = crate::embed::cosine(query_vec, &crate::embed::from_blob(&blob));
208        if superseded != 0 {
209            score -= 0.1; // down-rank contradicted reasoning
210        }
211        hits.push(GlobalHit {
212            event_id,
213            project_hash,
214            task_id,
215            event_type,
216            tier,
217            text,
218            score,
219        });
220    }
221    hits.sort_by(|a, b| {
222        b.score
223            .partial_cmp(&a.score)
224            .unwrap_or(std::cmp::Ordering::Equal)
225    });
226    hits.truncate(k);
227    Ok(hits)
228}
229
230/// Count of indexed entries (test/stats helper).
231pub fn count(conn: &Connection) -> anyhow::Result<usize> {
232    let n: i64 = conn.query_row("SELECT COUNT(*) FROM global_memory", [], |r| r.get(0))?;
233    Ok(n as usize)
234}
235
236// ---------------------------------------------------------------------------
237// Preference tier (Pillar C): user-level, cross-project memory injected every
238// session — "I prefer terse output", "always use X here", etc.
239// ---------------------------------------------------------------------------
240
241/// Record a durable user preference. De-duplicated on text (a repeat is a
242/// no-op). Returns whether a new preference was stored.
243pub fn add_preference(conn: &Connection, text: &str, created_at: &str) -> anyhow::Result<bool> {
244    let trimmed = text.trim();
245    if trimmed.is_empty() {
246        anyhow::bail!("preference text is empty");
247    }
248    let changed = conn.execute(
249        "INSERT OR IGNORE INTO preferences(text, created_at) VALUES (?1, ?2)",
250        rusqlite::params![trimmed, created_at],
251    )?;
252    Ok(changed > 0)
253}
254
255/// All stored preferences, oldest first.
256pub fn list_preferences(conn: &Connection) -> anyhow::Result<Vec<String>> {
257    let mut stmt = conn.prepare("SELECT text FROM preferences ORDER BY id")?;
258    let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
259    let mut out = Vec::new();
260    for r in rows {
261        out.push(r?);
262    }
263    Ok(out)
264}
265
// Unit tests for the global memory index. Each test builds a throwaway
// project database and a throwaway global database inside a temp directory,
// so nothing touches real user data.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::embed::Embedder;

    // Builds a `Decision` event with the given text, authored by the user via
    // the CLI, for task "tj-x". Despite the name, the event is a decision, not
    // a finding.
    fn finding(text: &str) -> crate::event::Event {
        // A decision event so it passes the HIGH_SIGNAL_TYPES filter.
        crate::event::Event::new(
            "tj-x",
            crate::event::EventType::Decision,
            crate::event::Author::User,
            crate::event::Source::Cli,
            text.into(),
        )
    }

    // Index two decisions in a project, embed them, sync to the global index,
    // then check that a semantic query ranks the related decision first and
    // reports the project it came from.
    #[test]
    fn sync_then_search_finds_cross_project_decision() {
        let d = tempfile::TempDir::new().unwrap();
        let proj = crate::db::open(d.path().join("p.sqlite")).unwrap();
        let global = open(d.path().join("memory.sqlite")).unwrap();
        let emb = crate::embed::HashEmbedder::new(256);

        for text in [
            "chose to route refunds through the idempotent payment ledger",
            "use postgres advisory locks for the cron job leader election",
        ] {
            crate::db::index_event(&proj, &finding(text)).unwrap();
        }
        crate::db::embed_pending(&proj, "projhash", &emb, "t", 100).unwrap();

        // Both events are decisions, so both must be mirrored.
        let synced = sync_from_project(&global, &proj, "projhash").unwrap();
        assert_eq!(synced, 2);
        assert_eq!(count(&global).unwrap(), 2);

        let q = emb.embed_one("refund ledger idempotent").unwrap();
        let hits = search(&global, &q, emb.model_id(), 5).unwrap();
        assert!(!hits.is_empty());
        assert!(
            hits[0].text.contains("refund"),
            "the refund decision must rank first across the global index, got: {}",
            hits[0].text
        );
        assert_eq!(hits[0].project_hash, "projhash");
    }

    // The FTS5 keyword path must find a synced decision from plain prompt
    // text, and return nothing when no prompt term occurs in the index.
    #[test]
    fn keyword_search_matches_prompt_terms() {
        let d = tempfile::TempDir::new().unwrap();
        let proj = crate::db::open(d.path().join("p.sqlite")).unwrap();
        let global = open(d.path().join("memory.sqlite")).unwrap();
        let emb = crate::embed::HashEmbedder::new(64);
        crate::db::index_event(
            &proj,
            &finding("chose the idempotent payment ledger for refunds"),
        )
        .unwrap();
        crate::db::index_event(
            &proj,
            &finding("rejected kafka for the audit log; too heavy"),
        )
        .unwrap();
        crate::db::embed_pending(&proj, "ph", &emb, "t", 100).unwrap();
        sync_from_project(&global, &proj, "ph").unwrap();

        let hits = keyword_search(&global, "should we add a refund ledger here?", 5).unwrap();
        assert!(
            !hits.is_empty(),
            "prompt terms must match the ledger decision"
        );
        assert!(hits[0].text.contains("ledger"));

        // The only ≥4-char term here ("tiny") occurs in no indexed text => no hit.
        assert!(keyword_search(&global, "tiny ui css fix", 5)
            .unwrap()
            .is_empty());
    }

    // Preferences are de-duplicated on text and listed in insertion order.
    #[test]
    fn preferences_store_dedup_and_list_in_order() {
        let d = tempfile::TempDir::new().unwrap();
        let g = open(d.path().join("memory.sqlite")).unwrap();
        assert!(add_preference(&g, "prefer terse output", "t1").unwrap());
        assert!(add_preference(&g, "respond in Russian", "t2").unwrap());
        // Duplicate is a no-op.
        assert!(!add_preference(&g, "prefer terse output", "t3").unwrap());
        assert_eq!(
            list_preferences(&g).unwrap(),
            vec![
                "prefer terse output".to_string(),
                "respond in Russian".to_string()
            ]
        );
    }

    // Semantic search only considers rows stored under the requested model id.
    #[test]
    fn search_filters_by_model() {
        let d = tempfile::TempDir::new().unwrap();
        let proj = crate::db::open(d.path().join("p.sqlite")).unwrap();
        let global = open(d.path().join("memory.sqlite")).unwrap();
        let emb = crate::embed::HashEmbedder::new(64);
        crate::db::index_event(&proj, &finding("decided to adopt the outbox pattern")).unwrap();
        crate::db::embed_pending(&proj, "ph", &emb, "t", 100).unwrap();
        sync_from_project(&global, &proj, "ph").unwrap();

        let q = emb.embed_one("outbox").unwrap();
        // Unknown model id => nothing to compare against.
        assert_eq!(search(&global, &q, "other-model", 5).unwrap().len(), 0);
        assert_eq!(search(&global, &q, emb.model_id(), 5).unwrap().len(), 1);
    }
}
375}