use rusqlite::Connection;
/// Event types that `sync_from_project` copies out of a project database.
///
/// The values are bound as parameters to the `f.type IN (...)` filter of the
/// sync query, so only rows whose `search_fts.type` is one of these strings
/// reach the global store.
pub const HIGH_SIGNAL_TYPES: [&str; 3] = ["decision", "rejection", "constraint"];
/// DDL batch executed by `open` every time the global database is opened.
///
/// Every statement uses `IF NOT EXISTS`, so re-running it against an existing
/// database is a no-op. It creates:
/// - `global_memory`: one row per synced event, keyed by `event_id`, holding
///   the event text, its embedding (`model`, `dim`, `vec`) and a `superseded`
///   flag;
/// - indexes on `global_memory(type)` and `global_memory(model)`;
/// - `global_fts`: an FTS5 table over `text`, with `event_id` stored but not
///   indexed, used by `keyword_search`.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS global_memory (
event_id TEXT PRIMARY KEY,
project_hash TEXT NOT NULL,
task_id TEXT NOT NULL,
type TEXT NOT NULL,
tier TEXT NOT NULL DEFAULT 'episodic',
text TEXT NOT NULL,
model TEXT NOT NULL,
dim INTEGER NOT NULL,
vec BLOB NOT NULL,
created_at TEXT NOT NULL,
superseded INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_gm_type ON global_memory(type);
CREATE INDEX IF NOT EXISTS idx_gm_model ON global_memory(model);
CREATE VIRTUAL TABLE IF NOT EXISTS global_fts USING fts5(event_id UNINDEXED, text);
"#;
/// Opens (creating if necessary) the global memory database at `path`.
///
/// The parent directory of `path`, if it has one, is created first. The
/// schema batch is then applied to the connection before it is returned.
///
/// # Errors
///
/// Fails if the directory cannot be created, the database cannot be opened,
/// or the schema batch fails to execute.
pub fn open(path: impl AsRef<std::path::Path>) -> anyhow::Result<Connection> {
    let db_path = path.as_ref();
    // Make sure the containing directory exists before SQLite touches the file.
    db_path
        .parent()
        .map(|dir| std::fs::create_dir_all(dir))
        .transpose()?;
    let db = Connection::open(db_path)?;
    db.execute_batch(SCHEMA)?;
    Ok(db)
}
/// One result row returned by [`search`] or [`keyword_search`].
#[derive(Debug, Clone, PartialEq)]
pub struct GlobalHit {
    /// Primary key of the matched row in `global_memory`.
    pub event_id: String,
    /// Hash identifying the project the event was synced from.
    pub project_hash: String,
    /// Task identifier stored alongside the event.
    pub task_id: String,
    /// Value of the `type` column (e.g. one of [`HIGH_SIGNAL_TYPES`]).
    pub event_type: String,
    /// Value of the `tier` column.
    pub tier: String,
    /// The event text.
    pub text: String,
    /// Relevance score; higher is better.
    ///
    /// For [`search`] this is the cosine similarity minus `0.1` when the row
    /// is superseded; for [`keyword_search`] it is the negated bm25 rank
    /// minus `0.5` when the row is superseded. The two scales are not
    /// directly comparable.
    pub score: f32,
}
pub fn sync_from_project(
global: &Connection,
project: &Connection,
project_hash: &str,
) -> anyhow::Result<usize> {
let placeholders = HIGH_SIGNAL_TYPES
.iter()
.map(|_| "?")
.collect::<Vec<_>>()
.join(",");
let sql = format!(
"SELECT e.event_id, e.task_id, f.type, e.tier, f.text, e.model, e.dim, e.vec, e.created_at,
CASE WHEN d.superseded_by IS NOT NULL THEN 1 ELSE 0 END
FROM embeddings e
JOIN search_fts f ON f.event_id = e.event_id
LEFT JOIN decisions d ON d.decision_id = e.event_id
WHERE f.type IN ({placeholders})"
);
let mut stmt = project.prepare(&sql)?;
let rows = stmt.query_map(rusqlite::params_from_iter(HIGH_SIGNAL_TYPES.iter()), |r| {
Ok((
r.get::<_, String>(0)?, r.get::<_, String>(1)?, r.get::<_, String>(2)?, r.get::<_, String>(3)?, r.get::<_, String>(4)?, r.get::<_, String>(5)?, r.get::<_, i64>(6)?, r.get::<_, Vec<u8>>(7)?, r.get::<_, String>(8)?, r.get::<_, i64>(9)?, ))
})?;
let mut n = 0usize;
for row in rows {
let (event_id, task_id, ty, tier, text, model, dim, vec, created_at, superseded) = row?;
global.execute(
"INSERT OR REPLACE INTO global_memory
(event_id, project_hash, task_id, type, tier, text, model, dim, vec, created_at, superseded)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
rusqlite::params![
event_id, project_hash, task_id, ty, tier, text, model, dim, vec, created_at, superseded
],
)?;
global.execute(
"DELETE FROM global_fts WHERE event_id = ?1",
rusqlite::params![event_id],
)?;
global.execute(
"INSERT INTO global_fts(event_id, text) VALUES (?1, ?2)",
rusqlite::params![event_id, text],
)?;
n += 1;
}
Ok(n)
}
/// Full-text search over the global store using terms extracted from `prompt`.
///
/// The prompt is split on non-alphanumeric characters; pieces shorter than
/// four characters are dropped, the rest are lower-cased and de-duplicated,
/// then OR-ed together into an FTS5 `MATCH` query. If no term survives, an
/// empty result is returned without touching the database.
///
/// Each hit's `score` is the negated bm25 rank (so higher is better), minus
/// `0.5` when the row is superseded. Rows are ordered and limited by that
/// penalised rank, so a superseded row cannot displace a better live row from
/// the top `k` and the returned order agrees with the returned `score`s (up
/// to `f32` rounding of the score).
///
/// # Errors
///
/// Fails if the statement cannot be prepared or a row cannot be read.
pub fn keyword_search(conn: &Connection, prompt: &str, k: usize) -> anyhow::Result<Vec<GlobalHit>> {
    let mut seen = std::collections::HashSet::new();
    let terms: Vec<String> = prompt
        .split(|c: char| !c.is_alphanumeric())
        .filter(|t| t.chars().count() >= 4)
        .map(str::to_lowercase)
        // `insert` returns false for a term already seen, which drops duplicates
        // while keeping first-occurrence order.
        .filter(|t| seen.insert(t.clone()))
        .collect();
    if terms.is_empty() {
        return Ok(Vec::new());
    }
    let query = terms.join(" OR ");
    // Saturate rather than wrap: a wrapped negative LIMIT would mean "no limit".
    let limit = i64::try_from(k).unwrap_or(i64::MAX);
    let mut stmt = conn.prepare(
        "SELECT g.event_id, g.project_hash, g.task_id, g.type, g.tier, g.text, g.superseded,
                bm25(global_fts)
         FROM global_fts
         JOIN global_memory g ON g.event_id = global_fts.event_id
         WHERE global_fts MATCH ?1
         ORDER BY bm25(global_fts) + CASE WHEN g.superseded != 0 THEN 0.5 ELSE 0.0 END
         LIMIT ?2",
    )?;
    let hits = stmt
        .query_map(rusqlite::params![query, limit], |r| {
            let bm: f64 = r.get(7)?;
            let superseded: i64 = r.get(6)?;
            let penalty: f32 = if superseded != 0 { 0.5 } else { 0.0 };
            // bm25 is "lower is better"; negate it so a larger score is better.
            let score = (-bm) as f32 - penalty;
            Ok(GlobalHit {
                event_id: r.get(0)?,
                project_hash: r.get(1)?,
                task_id: r.get(2)?,
                event_type: r.get(3)?,
                tier: r.get(4)?,
                text: r.get(5)?,
                score,
            })
        })?
        .collect::<Result<Vec<_>, _>>()?;
    Ok(hits)
}
/// Brute-force vector search over every `global_memory` row embedded with `model`.
///
/// Each stored vector is decoded with `crate::embed::from_blob` and scored
/// against `query_vec` with `crate::embed::cosine`; superseded rows have
/// `0.1` subtracted from their score. The hits are sorted by descending score
/// and truncated to the best `k`.
///
/// Hits whose score is NaN sort after every real score. Without that rule the
/// comparator would not be a total order, which `sort_by` documents as giving
/// an unspecified order and possibly panicking.
/// NOTE(review): whether `cosine` can actually return NaN (e.g. for a
/// zero-length vector) depends on `crate::embed`, which is not visible here.
///
/// # Errors
///
/// Fails if the statement cannot be prepared or a row cannot be read.
pub fn search(
    conn: &Connection,
    query_vec: &[f32],
    model: &str,
    k: usize,
) -> anyhow::Result<Vec<GlobalHit>> {
    let mut stmt = conn.prepare(
        "SELECT event_id, project_hash, task_id, type, tier, text, vec, superseded
         FROM global_memory WHERE model = ?1",
    )?;
    let rows = stmt.query_map(rusqlite::params![model], |r| {
        Ok((
            r.get::<_, String>(0)?,
            r.get::<_, String>(1)?,
            r.get::<_, String>(2)?,
            r.get::<_, String>(3)?,
            r.get::<_, String>(4)?,
            r.get::<_, String>(5)?,
            r.get::<_, Vec<u8>>(6)?,
            r.get::<_, i64>(7)?,
        ))
    })?;
    let mut hits = Vec::new();
    for row in rows {
        let (event_id, project_hash, task_id, event_type, tier, text, blob, superseded) = row?;
        let mut score = crate::embed::cosine(query_vec, &crate::embed::from_blob(&blob));
        if superseded != 0 {
            // Down-rank, but do not hide, entries that were later replaced.
            score -= 0.1;
        }
        hits.push(GlobalHit {
            event_id,
            project_hash,
            task_id,
            event_type,
            tier,
            text,
            score,
        });
    }
    hits.sort_by(|a, b| match (a.score.is_nan(), b.score.is_nan()) {
        // Both scores are real numbers, so `partial_cmp` always yields an ordering.
        (false, false) => b
            .score
            .partial_cmp(&a.score)
            .unwrap_or(std::cmp::Ordering::Equal),
        // `false < true`, so a NaN score sorts after any real score.
        (a_nan, b_nan) => a_nan.cmp(&b_nan),
    });
    hits.truncate(k);
    Ok(hits)
}
/// Returns the number of rows currently stored in `global_memory`.
///
/// # Errors
///
/// Fails if the `COUNT(*)` query cannot be executed.
pub fn count(conn: &Connection) -> anyhow::Result<usize> {
    let total = conn.query_row("SELECT COUNT(*) FROM global_memory", [], |row| {
        row.get::<_, i64>(0)
    })?;
    Ok(total as usize)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::embed::Embedder;

    /// Builds a user-authored CLI decision event for task `tj-x` carrying `text`.
    fn finding(text: &str) -> crate::event::Event {
        use crate::event::{Author, Event, EventType, Source};
        Event::new(
            "tj-x",
            EventType::Decision,
            Author::User,
            Source::Cli,
            text.into(),
        )
    }

    /// Two decisions synced from a project must be searchable by vector in the
    /// global store, with the semantically closest one ranked first.
    #[test]
    fn sync_then_search_finds_cross_project_decision() {
        let tmp = tempfile::TempDir::new().unwrap();
        let project = crate::db::open(tmp.path().join("p.sqlite")).unwrap();
        let global = open(tmp.path().join("memory.sqlite")).unwrap();
        let embedder = crate::embed::HashEmbedder::new(256);

        let decisions = [
            "chose to route refunds through the idempotent payment ledger",
            "use postgres advisory locks for the cron job leader election",
        ];
        decisions.iter().copied().for_each(|text| {
            crate::db::index_event(&project, &finding(text)).unwrap();
        });
        crate::db::embed_pending(&project, "projhash", &embedder, "t", 100).unwrap();

        let synced = sync_from_project(&global, &project, "projhash").unwrap();
        assert_eq!(synced, 2);
        assert_eq!(count(&global).unwrap(), 2);

        let query = embedder.embed_one("refund ledger idempotent").unwrap();
        let hits = search(&global, &query, embedder.model_id(), 5).unwrap();
        assert!(!hits.is_empty());
        let best = &hits[0];
        assert!(
            best.text.contains("refund"),
            "the refund decision must rank first across the global index, got: {}",
            best.text
        );
        assert_eq!(best.project_hash, "projhash");
    }

    /// Keyword search must match on prompt terms of four or more characters
    /// and return nothing when no such term occurs in the index.
    #[test]
    fn keyword_search_matches_prompt_terms() {
        let tmp = tempfile::TempDir::new().unwrap();
        let project = crate::db::open(tmp.path().join("p.sqlite")).unwrap();
        let global = open(tmp.path().join("memory.sqlite")).unwrap();
        let embedder = crate::embed::HashEmbedder::new(64);

        for text in [
            "chose the idempotent payment ledger for refunds",
            "rejected kafka for the audit log; too heavy",
        ] {
            crate::db::index_event(&project, &finding(text)).unwrap();
        }
        crate::db::embed_pending(&project, "ph", &embedder, "t", 100).unwrap();
        sync_from_project(&global, &project, "ph").unwrap();

        let matched = keyword_search(&global, "should we add a refund ledger here?", 5).unwrap();
        assert!(
            !matched.is_empty(),
            "prompt terms must match the ledger decision"
        );
        assert!(matched[0].text.contains("ledger"));

        let unmatched = keyword_search(&global, "tiny ui css fix", 5).unwrap();
        assert!(unmatched.is_empty());
    }

    /// Vector search must only consider rows embedded with the requested model.
    #[test]
    fn search_filters_by_model() {
        let tmp = tempfile::TempDir::new().unwrap();
        let project = crate::db::open(tmp.path().join("p.sqlite")).unwrap();
        let global = open(tmp.path().join("memory.sqlite")).unwrap();
        let embedder = crate::embed::HashEmbedder::new(64);

        crate::db::index_event(&project, &finding("decided to adopt the outbox pattern")).unwrap();
        crate::db::embed_pending(&project, "ph", &embedder, "t", 100).unwrap();
        sync_from_project(&global, &project, "ph").unwrap();

        let query = embedder.embed_one("outbox").unwrap();
        let other_model_hits = search(&global, &query, "other-model", 5).unwrap();
        assert_eq!(other_model_hits.len(), 0);
        let same_model_hits = search(&global, &query, embedder.model_id(), 5).unwrap();
        assert_eq!(same_model_hits.len(), 1);
    }
}