1use rusqlite::Connection;
15
/// Event types worth sharing across projects.
///
/// Only events whose type is listed here are copied into the global memory
/// by [`sync_from_project`]; everything else stays project-local.
pub const HIGH_SIGNAL_TYPES: [&str; 3] = [
    "decision",
    "rejection",
    "constraint",
];
20
/// DDL for the cross-project global memory store.
///
/// Executed by [`open`] on every connection; every statement uses
/// `IF NOT EXISTS`, so re-running it against an existing database is a no-op.
///
/// - `global_memory`: one row per synced event, keyed by `event_id`, carrying
///   the source project (`project_hash`), the embedding (`model`, `dim`,
///   `vec`) and a `superseded` flag (0 = live).
/// - `global_fts`: FTS5 index over each event's `text`. `event_id` is stored
///   `UNINDEXED`, i.e. it is carried along but not full-text indexed.
///   NOTE(review): filters on `event_id` alone (as the sync's delete does)
///   cannot use the FTS index and presumably scan the whole table.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS global_memory (
    event_id TEXT PRIMARY KEY,
    project_hash TEXT NOT NULL,
    task_id TEXT NOT NULL,
    type TEXT NOT NULL,
    tier TEXT NOT NULL DEFAULT 'episodic',
    text TEXT NOT NULL,
    model TEXT NOT NULL,
    dim INTEGER NOT NULL,
    vec BLOB NOT NULL,
    created_at TEXT NOT NULL,
    superseded INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_gm_type ON global_memory(type);
CREATE INDEX IF NOT EXISTS idx_gm_model ON global_memory(model);
CREATE VIRTUAL TABLE IF NOT EXISTS global_fts USING fts5(event_id UNINDEXED, text);
"#;
39
40pub fn open(path: impl AsRef<std::path::Path>) -> anyhow::Result<Connection> {
42 if let Some(parent) = path.as_ref().parent() {
43 std::fs::create_dir_all(parent)?;
44 }
45 let conn = Connection::open(path)?;
46 conn.execute_batch(SCHEMA)?;
47 Ok(conn)
48}
49
/// A single result returned by [`search`] or [`keyword_search`] over the
/// global memory.
#[derive(Debug, Clone, PartialEq)]
pub struct GlobalHit {
    /// Identifier of the event (the `global_memory` primary key).
    pub event_id: String,
    /// Hash of the project the event was synced from.
    pub project_hash: String,
    /// Task the event belongs to in its source project.
    pub task_id: String,
    /// Event type; for synced rows, one of [`HIGH_SIGNAL_TYPES`].
    pub event_type: String,
    /// Memory tier copied from the source project's embedding row.
    pub tier: String,
    /// The event's text.
    pub text: String,
    /// Relevance score; higher is better. This is cosine similarity for
    /// [`search`] and negated BM25 for [`keyword_search`]. Superseded entries
    /// are penalised in both.
    pub score: f32,
}
60
61pub fn sync_from_project(
67 global: &Connection,
68 project: &Connection,
69 project_hash: &str,
70) -> anyhow::Result<usize> {
71 let placeholders = HIGH_SIGNAL_TYPES
72 .iter()
73 .map(|_| "?")
74 .collect::<Vec<_>>()
75 .join(",");
76 let sql = format!(
77 "SELECT e.event_id, e.task_id, f.type, e.tier, f.text, e.model, e.dim, e.vec, e.created_at,
78 CASE WHEN d.superseded_by IS NOT NULL THEN 1 ELSE 0 END
79 FROM embeddings e
80 JOIN search_fts f ON f.event_id = e.event_id
81 LEFT JOIN decisions d ON d.decision_id = e.event_id
82 WHERE f.type IN ({placeholders})"
83 );
84 let mut stmt = project.prepare(&sql)?;
85 let rows = stmt.query_map(rusqlite::params_from_iter(HIGH_SIGNAL_TYPES.iter()), |r| {
86 Ok((
87 r.get::<_, String>(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?, r.get::<_, String>(3)?, r.get::<_, String>(4)?, r.get::<_, String>(5)?, r.get::<_, i64>(6)?, r.get::<_, Vec<u8>>(7)?, r.get::<_, String>(8)?, r.get::<_, i64>(9)?, ))
98 })?;
99
100 let mut n = 0usize;
101 for row in rows {
102 let (event_id, task_id, ty, tier, text, model, dim, vec, created_at, superseded) = row?;
103 global.execute(
104 "INSERT OR REPLACE INTO global_memory
105 (event_id, project_hash, task_id, type, tier, text, model, dim, vec, created_at, superseded)
106 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
107 rusqlite::params![
108 event_id, project_hash, task_id, ty, tier, text, model, dim, vec, created_at, superseded
109 ],
110 )?;
111 global.execute(
113 "DELETE FROM global_fts WHERE event_id = ?1",
114 rusqlite::params![event_id],
115 )?;
116 global.execute(
117 "INSERT INTO global_fts(event_id, text) VALUES (?1, ?2)",
118 rusqlite::params![event_id, text],
119 )?;
120 n += 1;
121 }
122 Ok(n)
123}
124
125pub fn keyword_search(conn: &Connection, prompt: &str, k: usize) -> anyhow::Result<Vec<GlobalHit>> {
130 let mut seen = std::collections::HashSet::new();
131 let terms: Vec<String> = prompt
132 .split(|c: char| !c.is_alphanumeric())
133 .filter(|t| t.chars().count() >= 4)
134 .map(|t| t.to_lowercase())
135 .filter(|t| seen.insert(t.clone()))
136 .collect();
137 if terms.is_empty() {
138 return Ok(Vec::new());
139 }
140 let query = terms.join(" OR ");
141 let mut stmt = conn.prepare(
142 "SELECT g.event_id, g.project_hash, g.task_id, g.type, g.tier, g.text, g.superseded,
143 bm25(global_fts)
144 FROM global_fts
145 JOIN global_memory g ON g.event_id = global_fts.event_id
146 WHERE global_fts MATCH ?1
147 ORDER BY bm25(global_fts)
148 LIMIT ?2",
149 )?;
150 let rows = stmt.query_map(rusqlite::params![query, k as i64], |r| {
151 let bm: f64 = r.get(7)?;
152 let superseded: i64 = r.get(6)?;
153 let score = (-bm) as f32 - if superseded != 0 { 0.5 } else { 0.0 };
156 Ok(GlobalHit {
157 event_id: r.get(0)?,
158 project_hash: r.get(1)?,
159 task_id: r.get(2)?,
160 event_type: r.get(3)?,
161 tier: r.get(4)?,
162 text: r.get(5)?,
163 score,
164 })
165 })?;
166 let mut out = Vec::new();
167 for row in rows {
168 out.push(row?);
169 }
170 Ok(out)
171}
172
173pub fn search(
177 conn: &Connection,
178 query_vec: &[f32],
179 model: &str,
180 k: usize,
181) -> anyhow::Result<Vec<GlobalHit>> {
182 let mut stmt = conn.prepare(
183 "SELECT event_id, project_hash, task_id, type, tier, text, vec, superseded
184 FROM global_memory WHERE model = ?1",
185 )?;
186 let rows = stmt.query_map(rusqlite::params![model], |r| {
187 Ok((
188 r.get::<_, String>(0)?,
189 r.get::<_, String>(1)?,
190 r.get::<_, String>(2)?,
191 r.get::<_, String>(3)?,
192 r.get::<_, String>(4)?,
193 r.get::<_, String>(5)?,
194 r.get::<_, Vec<u8>>(6)?,
195 r.get::<_, i64>(7)?,
196 ))
197 })?;
198
199 let mut hits = Vec::new();
200 for row in rows {
201 let (event_id, project_hash, task_id, event_type, tier, text, blob, superseded) = row?;
202 let mut score = crate::embed::cosine(query_vec, &crate::embed::from_blob(&blob));
203 if superseded != 0 {
204 score -= 0.1; }
206 hits.push(GlobalHit {
207 event_id,
208 project_hash,
209 task_id,
210 event_type,
211 tier,
212 text,
213 score,
214 });
215 }
216 hits.sort_by(|a, b| {
217 b.score
218 .partial_cmp(&a.score)
219 .unwrap_or(std::cmp::Ordering::Equal)
220 });
221 hits.truncate(k);
222 Ok(hits)
223}
224
225pub fn count(conn: &Connection) -> anyhow::Result<usize> {
227 let n: i64 = conn.query_row("SELECT COUNT(*) FROM global_memory", [], |r| r.get(0))?;
228 Ok(n as usize)
229}
230
// End-to-end tests. Each one:
// 1. creates a throwaway project DB and global DB in a temp dir;
// 2. indexes and embeds events through `crate::db`;
// 3. syncs them into the global DB;
// 4. queries the global DB.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::embed::Embedder;

    // Builds an `EventType::Decision` event for task "tj-x" with the given
    // text, authored by the user via the CLI.
    // NOTE(review): presumably indexed with type "decision" (a HIGH_SIGNAL_TYPES
    // member); the sync counts asserted below depend on that.
    fn finding(text: &str) -> crate::event::Event {
        crate::event::Event::new(
            "tj-x",
            crate::event::EventType::Decision,
            crate::event::Author::User,
            crate::event::Source::Cli,
            text.into(),
        )
    }

    // Both decisions must sync. A vector query about refunds must then rank
    // the refund decision first, tagged with the project it came from.
    #[test]
    fn sync_then_search_finds_cross_project_decision() {
        let d = tempfile::TempDir::new().unwrap();
        let proj = crate::db::open(d.path().join("p.sqlite")).unwrap();
        let global = open(d.path().join("memory.sqlite")).unwrap();
        let emb = crate::embed::HashEmbedder::new(256);

        for text in [
            "chose to route refunds through the idempotent payment ledger",
            "use postgres advisory locks for the cron job leader election",
        ] {
            crate::db::index_event(&proj, &finding(text)).unwrap();
        }
        crate::db::embed_pending(&proj, "projhash", &emb, "t", 100).unwrap();

        let synced = sync_from_project(&global, &proj, "projhash").unwrap();
        assert_eq!(synced, 2);
        assert_eq!(count(&global).unwrap(), 2);

        let q = emb.embed_one("refund ledger idempotent").unwrap();
        let hits = search(&global, &q, emb.model_id(), 5).unwrap();
        assert!(!hits.is_empty());
        assert!(
            hits[0].text.contains("refund"),
            "the refund decision must rank first across the global index, got: {}",
            hits[0].text
        );
        assert_eq!(hits[0].project_hash, "projhash");
    }

    // A prompt containing "ledger" must full-text match the ledger decision.
    // A prompt whose words are all shorter than four characters yields no
    // query terms and therefore no hits.
    #[test]
    fn keyword_search_matches_prompt_terms() {
        let d = tempfile::TempDir::new().unwrap();
        let proj = crate::db::open(d.path().join("p.sqlite")).unwrap();
        let global = open(d.path().join("memory.sqlite")).unwrap();
        let emb = crate::embed::HashEmbedder::new(64);
        crate::db::index_event(
            &proj,
            &finding("chose the idempotent payment ledger for refunds"),
        )
        .unwrap();
        crate::db::index_event(
            &proj,
            &finding("rejected kafka for the audit log; too heavy"),
        )
        .unwrap();
        crate::db::embed_pending(&proj, "ph", &emb, "t", 100).unwrap();
        sync_from_project(&global, &proj, "ph").unwrap();

        let hits = keyword_search(&global, "should we add a refund ledger here?", 5).unwrap();
        assert!(
            !hits.is_empty(),
            "prompt terms must match the ledger decision"
        );
        assert!(hits[0].text.contains("ledger"));

        // "tiny", "ui", "css", "fix": only "tiny" reaches four characters and
        // it occurs in no indexed text, so nothing matches.
        assert!(keyword_search(&global, "tiny ui css fix", 5)
            .unwrap()
            .is_empty());
    }

    // Vector search only considers rows embedded with the requested model:
    // an unknown model id returns nothing, while the embedder's own id returns
    // the single synced row.
    #[test]
    fn search_filters_by_model() {
        let d = tempfile::TempDir::new().unwrap();
        let proj = crate::db::open(d.path().join("p.sqlite")).unwrap();
        let global = open(d.path().join("memory.sqlite")).unwrap();
        let emb = crate::embed::HashEmbedder::new(64);
        crate::db::index_event(&proj, &finding("decided to adopt the outbox pattern")).unwrap();
        crate::db::embed_pending(&proj, "ph", &emb, "t", 100).unwrap();
        sync_from_project(&global, &proj, "ph").unwrap();

        let q = emb.embed_one("outbox").unwrap();
        assert_eq!(search(&global, &q, "other-model", 5).unwrap().len(), 0);
        assert_eq!(search(&global, &q, emb.model_id(), 5).unwrap().len(), 1);
    }
}