1use rusqlite::Connection;
15
/// Event types important enough to be copied from a project into the global,
/// cross-project index by [`sync_from_project`].
pub const HIGH_SIGNAL_TYPES: [&str; 3] = [
    "decision",
    "rejection",
    "constraint",
];
20
/// DDL for the global (cross-project) memory database, applied by [`open`].
///
/// - `global_memory`: one row per synced event, keyed by `event_id`. It records
///   the originating `project_hash`, the event text, and its embedding (`model`,
///   `dim`, `vec`). `superseded` is 0 or 1.
/// - `global_fts`: FTS5 index over the same text. `event_id` is stored but not
///   indexed.
/// - `preferences`: de-duplicated (UNIQUE) free-text preferences; `id` gives
///   insertion order.
///
/// Every statement uses `IF NOT EXISTS`, so running this against an existing
/// database does not modify it.
///
/// NOTE(review): because `event_id` is UNINDEXED in `global_fts`, a lookup or
/// delete `WHERE event_id = ?` has no index to use and scans the FTS table.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS global_memory (
    event_id TEXT PRIMARY KEY,
    project_hash TEXT NOT NULL,
    task_id TEXT NOT NULL,
    type TEXT NOT NULL,
    tier TEXT NOT NULL DEFAULT 'episodic',
    text TEXT NOT NULL,
    model TEXT NOT NULL,
    dim INTEGER NOT NULL,
    vec BLOB NOT NULL,
    created_at TEXT NOT NULL,
    superseded INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_gm_type ON global_memory(type);
CREATE INDEX IF NOT EXISTS idx_gm_model ON global_memory(model);
CREATE VIRTUAL TABLE IF NOT EXISTS global_fts USING fts5(event_id UNINDEXED, text);
CREATE TABLE IF NOT EXISTS preferences (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    text TEXT NOT NULL UNIQUE,
    created_at TEXT NOT NULL
);
"#;
44
45pub fn open(path: impl AsRef<std::path::Path>) -> anyhow::Result<Connection> {
47 if let Some(parent) = path.as_ref().parent() {
48 std::fs::create_dir_all(parent)?;
49 }
50 let conn = Connection::open(path)?;
51 conn.execute_batch(SCHEMA)?;
52 Ok(conn)
53}
54
/// One result returned by [`search`] or [`keyword_search`] over the global index.
#[derive(Debug, Clone, PartialEq)]
pub struct GlobalHit {
    /// Event id; primary key of `global_memory`.
    pub event_id: String,
    /// The `project_hash` the memory was synced under by [`sync_from_project`].
    pub project_hash: String,
    /// Task id of the originating event.
    pub task_id: String,
    /// Event type (the `type` column), e.g. one of [`HIGH_SIGNAL_TYPES`].
    pub event_type: String,
    /// Memory tier (schema default `episodic`).
    pub tier: String,
    /// The memory's text.
    pub text: String,
    /// Relevance; higher is better.
    ///
    /// The scale depends on the search that produced the hit:
    /// - [`search`]: the cosine score minus 0.1 when superseded.
    /// - [`keyword_search`]: the negated BM25 rank minus 0.5 when superseded.
    pub score: f32,
}
65
66pub fn sync_from_project(
72 global: &Connection,
73 project: &Connection,
74 project_hash: &str,
75) -> anyhow::Result<usize> {
76 let placeholders = HIGH_SIGNAL_TYPES
77 .iter()
78 .map(|_| "?")
79 .collect::<Vec<_>>()
80 .join(",");
81 let sql = format!(
82 "SELECT e.event_id, e.task_id, f.type, e.tier, f.text, e.model, e.dim, e.vec, e.created_at,
83 CASE WHEN d.superseded_by IS NOT NULL THEN 1 ELSE 0 END
84 FROM embeddings e
85 JOIN search_fts f ON f.event_id = e.event_id
86 LEFT JOIN decisions d ON d.decision_id = e.event_id
87 WHERE f.type IN ({placeholders})"
88 );
89 let mut stmt = project.prepare(&sql)?;
90 let rows = stmt.query_map(rusqlite::params_from_iter(HIGH_SIGNAL_TYPES.iter()), |r| {
91 Ok((
92 r.get::<_, String>(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?, r.get::<_, String>(3)?, r.get::<_, String>(4)?, r.get::<_, String>(5)?, r.get::<_, i64>(6)?, r.get::<_, Vec<u8>>(7)?, r.get::<_, String>(8)?, r.get::<_, i64>(9)?, ))
103 })?;
104
105 let mut n = 0usize;
106 for row in rows {
107 let (event_id, task_id, ty, tier, text, model, dim, vec, created_at, superseded) = row?;
108 global.execute(
109 "INSERT OR REPLACE INTO global_memory
110 (event_id, project_hash, task_id, type, tier, text, model, dim, vec, created_at, superseded)
111 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
112 rusqlite::params![
113 event_id, project_hash, task_id, ty, tier, text, model, dim, vec, created_at, superseded
114 ],
115 )?;
116 global.execute(
118 "DELETE FROM global_fts WHERE event_id = ?1",
119 rusqlite::params![event_id],
120 )?;
121 global.execute(
122 "INSERT INTO global_fts(event_id, text) VALUES (?1, ?2)",
123 rusqlite::params![event_id, text],
124 )?;
125 n += 1;
126 }
127 Ok(n)
128}
129
130pub fn keyword_search(conn: &Connection, prompt: &str, k: usize) -> anyhow::Result<Vec<GlobalHit>> {
135 let mut seen = std::collections::HashSet::new();
136 let terms: Vec<String> = prompt
137 .split(|c: char| !c.is_alphanumeric())
138 .filter(|t| t.chars().count() >= 4)
139 .map(|t| t.to_lowercase())
140 .filter(|t| seen.insert(t.clone()))
141 .collect();
142 if terms.is_empty() {
143 return Ok(Vec::new());
144 }
145 let query = terms.join(" OR ");
146 let mut stmt = conn.prepare(
147 "SELECT g.event_id, g.project_hash, g.task_id, g.type, g.tier, g.text, g.superseded,
148 bm25(global_fts)
149 FROM global_fts
150 JOIN global_memory g ON g.event_id = global_fts.event_id
151 WHERE global_fts MATCH ?1
152 ORDER BY bm25(global_fts)
153 LIMIT ?2",
154 )?;
155 let rows = stmt.query_map(rusqlite::params![query, k as i64], |r| {
156 let bm: f64 = r.get(7)?;
157 let superseded: i64 = r.get(6)?;
158 let score = (-bm) as f32 - if superseded != 0 { 0.5 } else { 0.0 };
161 Ok(GlobalHit {
162 event_id: r.get(0)?,
163 project_hash: r.get(1)?,
164 task_id: r.get(2)?,
165 event_type: r.get(3)?,
166 tier: r.get(4)?,
167 text: r.get(5)?,
168 score,
169 })
170 })?;
171 let mut out = Vec::new();
172 for row in rows {
173 out.push(row?);
174 }
175 Ok(out)
176}
177
178pub fn search(
182 conn: &Connection,
183 query_vec: &[f32],
184 model: &str,
185 k: usize,
186) -> anyhow::Result<Vec<GlobalHit>> {
187 let mut stmt = conn.prepare(
188 "SELECT event_id, project_hash, task_id, type, tier, text, vec, superseded
189 FROM global_memory WHERE model = ?1",
190 )?;
191 let rows = stmt.query_map(rusqlite::params![model], |r| {
192 Ok((
193 r.get::<_, String>(0)?,
194 r.get::<_, String>(1)?,
195 r.get::<_, String>(2)?,
196 r.get::<_, String>(3)?,
197 r.get::<_, String>(4)?,
198 r.get::<_, String>(5)?,
199 r.get::<_, Vec<u8>>(6)?,
200 r.get::<_, i64>(7)?,
201 ))
202 })?;
203
204 let mut hits = Vec::new();
205 for row in rows {
206 let (event_id, project_hash, task_id, event_type, tier, text, blob, superseded) = row?;
207 let mut score = crate::embed::cosine(query_vec, &crate::embed::from_blob(&blob));
208 if superseded != 0 {
209 score -= 0.1; }
211 hits.push(GlobalHit {
212 event_id,
213 project_hash,
214 task_id,
215 event_type,
216 tier,
217 text,
218 score,
219 });
220 }
221 hits.sort_by(|a, b| {
222 b.score
223 .partial_cmp(&a.score)
224 .unwrap_or(std::cmp::Ordering::Equal)
225 });
226 hits.truncate(k);
227 Ok(hits)
228}
229
230pub fn count(conn: &Connection) -> anyhow::Result<usize> {
232 let n: i64 = conn.query_row("SELECT COUNT(*) FROM global_memory", [], |r| r.get(0))?;
233 Ok(n as usize)
234}
235
236pub fn add_preference(conn: &Connection, text: &str, created_at: &str) -> anyhow::Result<bool> {
244 let trimmed = text.trim();
245 if trimmed.is_empty() {
246 anyhow::bail!("preference text is empty");
247 }
248 let changed = conn.execute(
249 "INSERT OR IGNORE INTO preferences(text, created_at) VALUES (?1, ?2)",
250 rusqlite::params![trimmed, created_at],
251 )?;
252 Ok(changed > 0)
253}
254
255pub fn list_preferences(conn: &Connection) -> anyhow::Result<Vec<String>> {
257 let mut stmt = conn.prepare("SELECT text FROM preferences ORDER BY id")?;
258 let rows = stmt.query_map([], |r| r.get::<_, String>(0))?;
259 let mut out = Vec::new();
260 for r in rows {
261 out.push(r?);
262 }
263 Ok(out)
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    // Presumably the trait that provides `embed_one` / `model_id` on `HashEmbedder`.
    use crate::embed::Embedder;

    /// Build a `Decision` event (author `User`, source `Cli`) whose body is `text`.
    /// `"tj-x"` is presumably the task id — confirm against `Event::new`.
    fn finding(text: &str) -> crate::event::Event {
        crate::event::Event::new(
            "tj-x",
            crate::event::EventType::Decision,
            crate::event::Author::User,
            crate::event::Source::Cli,
            text.into(),
        )
    }

    /// Two decisions are indexed, embedded and synced from one project. A vector
    /// query about refunds must rank the refund decision first and report the
    /// project hash it was synced under.
    #[test]
    fn sync_then_search_finds_cross_project_decision() {
        let d = tempfile::TempDir::new().unwrap();
        let proj = crate::db::open(d.path().join("p.sqlite")).unwrap();
        let global = open(d.path().join("memory.sqlite")).unwrap();
        let emb = crate::embed::HashEmbedder::new(256);

        for text in [
            "chose to route refunds through the idempotent payment ledger",
            "use postgres advisory locks for the cron job leader election",
        ] {
            crate::db::index_event(&proj, &finding(text)).unwrap();
        }
        crate::db::embed_pending(&proj, "projhash", &emb, "t", 100).unwrap();

        let synced = sync_from_project(&global, &proj, "projhash").unwrap();
        assert_eq!(synced, 2);
        assert_eq!(count(&global).unwrap(), 2);

        let q = emb.embed_one("refund ledger idempotent").unwrap();
        let hits = search(&global, &q, emb.model_id(), 5).unwrap();
        assert!(!hits.is_empty());
        assert!(
            hits[0].text.contains("refund"),
            "the refund decision must rank first across the global index, got: {}",
            hits[0].text
        );
        assert_eq!(hits[0].project_hash, "projhash");
    }

    /// Keyword search must hit on a word shared between the prompt and a synced
    /// decision ("ledger"). It must return nothing when no qualifying (4+ char)
    /// prompt word appears in the index.
    #[test]
    fn keyword_search_matches_prompt_terms() {
        let d = tempfile::TempDir::new().unwrap();
        let proj = crate::db::open(d.path().join("p.sqlite")).unwrap();
        let global = open(d.path().join("memory.sqlite")).unwrap();
        let emb = crate::embed::HashEmbedder::new(64);
        crate::db::index_event(
            &proj,
            &finding("chose the idempotent payment ledger for refunds"),
        )
        .unwrap();
        crate::db::index_event(
            &proj,
            &finding("rejected kafka for the audit log; too heavy"),
        )
        .unwrap();
        crate::db::embed_pending(&proj, "ph", &emb, "t", 100).unwrap();
        sync_from_project(&global, &proj, "ph").unwrap();

        let hits = keyword_search(&global, "should we add a refund ledger here?", 5).unwrap();
        assert!(
            !hits.is_empty(),
            "prompt terms must match the ledger decision"
        );
        assert!(hits[0].text.contains("ledger"));

        // Only "tiny" passes the four-character filter, and it matches neither decision.
        assert!(keyword_search(&global, "tiny ui css fix", 5)
            .unwrap()
            .is_empty());
    }

    /// Re-adding an existing preference reports `false`, and listing returns
    /// preferences in insertion order.
    #[test]
    fn preferences_store_dedup_and_list_in_order() {
        let d = tempfile::TempDir::new().unwrap();
        let g = open(d.path().join("memory.sqlite")).unwrap();
        assert!(add_preference(&g, "prefer terse output", "t1").unwrap());
        assert!(add_preference(&g, "respond in Russian", "t2").unwrap());
        assert!(!add_preference(&g, "prefer terse output", "t3").unwrap());
        assert_eq!(
            list_preferences(&g).unwrap(),
            vec![
                "prefer terse output".to_string(),
                "respond in Russian".to_string()
            ]
        );
    }

    /// Vector search only considers rows embedded with the requested model id.
    #[test]
    fn search_filters_by_model() {
        let d = tempfile::TempDir::new().unwrap();
        let proj = crate::db::open(d.path().join("p.sqlite")).unwrap();
        let global = open(d.path().join("memory.sqlite")).unwrap();
        let emb = crate::embed::HashEmbedder::new(64);
        crate::db::index_event(&proj, &finding("decided to adopt the outbox pattern")).unwrap();
        crate::db::embed_pending(&proj, "ph", &emb, "t", 100).unwrap();
        sync_from_project(&global, &proj, "ph").unwrap();

        let q = emb.embed_one("outbox").unwrap();
        assert_eq!(search(&global, &q, "other-model", 5).unwrap().len(), 0);
        assert_eq!(search(&global, &q, emb.model_id(), 5).unwrap().len(), 1);
    }
}