Skip to main content

bkb_store/
sqlite.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use rusqlite::Connection;
7use tokio::sync::Mutex;
8use tracing::debug;
9
10use bkb_core::model::{
11	Document, DocumentContext, RefType, Reference, SearchParams, SearchResult, SearchResults,
12	SourceType, SyncState, SyncStatus,
13};
14use bkb_core::schema::SCHEMA_SQL;
15use bkb_core::store::KnowledgeStore;
16
/// SQLite-backed storage for the Bitcoin Knowledge Base.
///
/// Uses `rusqlite` with bundled FTS5 for full-text search.
/// All database access is serialized through a `Mutex<Connection>`.
pub struct SqliteStore {
	// Async (tokio) mutex so the guard can be held across `.await` points;
	// `Arc` lets multiple handles share the single connection.
	conn: Arc<Mutex<Connection>>,
}
24
25impl SqliteStore {
26	/// Open (or create) a database at the given path and run migrations.
27	pub fn open(path: &Path) -> Result<Self> {
28		let conn = Connection::open(path)?;
29		Self::init(conn)
30	}
31
32	/// Create an in-memory database (for tests).
33	pub fn open_in_memory() -> Result<Self> {
34		let conn = Connection::open_in_memory()?;
35		Self::init(conn)
36	}
37
38	fn init(conn: Connection) -> Result<Self> {
39		conn.execute_batch("PRAGMA journal_mode=WAL;")?;
40		conn.execute_batch("PRAGMA foreign_keys=ON;")?;
41		conn.execute_batch(SCHEMA_SQL)?;
42		Self::seed_concepts(&conn)?;
43		Ok(Self { conn: Arc::new(Mutex::new(conn)) })
44	}
45
46	/// Seed the `concepts` table with all entries from the curated vocabulary.
47	///
48	/// Uses `INSERT OR IGNORE` so existing rows are not overwritten.
49	fn seed_concepts(conn: &Connection) -> Result<()> {
50		use bkb_core::bitcoin::CONCEPTS;
51		let mut stmt = conn.prepare(
52			"INSERT OR IGNORE INTO concepts (slug, name, category, aliases)
53			 VALUES (?1, ?2, ?3, ?4)",
54		)?;
55		for concept in CONCEPTS {
56			let aliases_json = serde_json::to_string(concept.aliases)?;
57			stmt.execute(rusqlite::params![
58				concept.slug,
59				concept.name,
60				concept.category,
61				&aliases_json,
62			])?;
63		}
64		Ok(())
65	}
66
67	/// Insert or update a document, appending to the change log.
68	pub async fn upsert_document(&self, doc: &Document) -> Result<()> {
69		let conn = self.conn.lock().await;
70		let metadata_json = doc.metadata.as_ref().map(|m| serde_json::to_string(m)).transpose()?;
71
72		conn.execute_batch("BEGIN IMMEDIATE")?;
73
74		// Check if document exists for change_log type
75		let exists: bool = conn
76			.query_row("SELECT COUNT(*) FROM documents WHERE id = ?1", [&doc.id], |row| {
77				row.get::<_, i64>(0)
78			})
79			.map(|c| c > 0)?;
80
81		let change_type = if exists { "update" } else { "insert" };
82
83		// Insert into change_log first to get seq
84		conn.execute(
85			"INSERT INTO change_log (doc_id, change_type) VALUES (?1, ?2)",
86			rusqlite::params![&doc.id, change_type],
87		)?;
88
89		let seq: i64 = conn.last_insert_rowid();
90
91		// Upsert document
92		conn.execute(
93			r#"INSERT INTO documents (id, source_type, source_repo, source_id,
94				title, body, author, author_id, created_at, updated_at,
95				parent_id, metadata, seq)
96			VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
97			ON CONFLICT(id) DO UPDATE SET
98				title = excluded.title,
99				body = excluded.body,
100				author = excluded.author,
101				author_id = excluded.author_id,
102				updated_at = excluded.updated_at,
103				parent_id = excluded.parent_id,
104				metadata = excluded.metadata,
105				seq = excluded.seq"#,
106			rusqlite::params![
107				&doc.id,
108				doc.source_type.as_str(),
109				&doc.source_repo,
110				&doc.source_id,
111				&doc.title,
112				&doc.body,
113				&doc.author,
114				&doc.author_id,
115				doc.created_at.to_rfc3339(),
116				doc.updated_at.map(|t| t.to_rfc3339()),
117				&doc.parent_id,
118				&metadata_json,
119				seq,
120			],
121		)?;
122
123		conn.execute_batch("COMMIT")?;
124
125		debug!(doc_id = %doc.id, change_type, seq, "upserted document");
126		Ok(())
127	}
128
129	/// Insert a cross-reference.
130	pub async fn insert_reference(&self, reference: &Reference) -> Result<()> {
131		let conn = self.conn.lock().await;
132		conn.execute(
133			"INSERT INTO refs (from_doc_id, to_doc_id, ref_type, to_external, context)
134			 VALUES (?1, ?2, ?3, ?4, ?5)",
135			rusqlite::params![
136				&reference.from_doc_id,
137				&reference.to_doc_id,
138				reference.ref_type.as_str(),
139				&reference.to_external,
140				&reference.context,
141			],
142		)?;
143		Ok(())
144	}
145
146	/// Delete all references originating from a document (for re-enrichment).
147	pub async fn delete_refs_from(&self, doc_id: &str) -> Result<()> {
148		let conn = self.conn.lock().await;
149		conn.execute("DELETE FROM refs WHERE from_doc_id = ?1", [doc_id])?;
150		Ok(())
151	}
152
153	/// Insert or replace a concept mention for a document.
154	pub async fn upsert_concept_mention(
155		&self, doc_id: &str, concept_slug: &str, confidence: f32,
156	) -> Result<()> {
157		let conn = self.conn.lock().await;
158		conn.execute(
159			"INSERT OR REPLACE INTO concept_mentions (doc_id, concept_slug, confidence)
160			 VALUES (?1, ?2, ?3)",
161			rusqlite::params![doc_id, concept_slug, confidence],
162		)?;
163		Ok(())
164	}
165
166	/// Delete all concept mentions for a document (for re-enrichment).
167	pub async fn delete_concept_mentions(&self, doc_id: &str) -> Result<()> {
168		let conn = self.conn.lock().await;
169		conn.execute("DELETE FROM concept_mentions WHERE doc_id = ?1", [doc_id])?;
170		Ok(())
171	}
172
173	/// Delete all documents, refs, concept mentions, and sync state for a given
174	/// source type.  Returns the number of documents deleted.
175	pub async fn reset_source_type(
176		&self, source_type: &str, sync_id_patterns: &[String],
177	) -> Result<u64> {
178		let conn = self.conn.lock().await;
179
180		conn.execute_batch("BEGIN IMMEDIATE")?;
181
182		// Delete concept mentions for matching documents.
183		conn.execute(
184			"DELETE FROM concept_mentions WHERE doc_id IN \
185			 (SELECT id FROM documents WHERE source_type = ?1)",
186			[source_type],
187		)?;
188
189		// Delete outgoing refs from matching documents.
190		conn.execute(
191			"DELETE FROM refs WHERE from_doc_id IN \
192			 (SELECT id FROM documents WHERE source_type = ?1)",
193			[source_type],
194		)?;
195
196		// Delete incoming refs pointing at matching documents.
197		conn.execute(
198			"DELETE FROM refs WHERE to_doc_id IN \
199			 (SELECT id FROM documents WHERE source_type = ?1)",
200			[source_type],
201		)?;
202
203		// Delete change log entries for matching documents.
204		conn.execute(
205			"DELETE FROM change_log WHERE doc_id IN \
206			 (SELECT id FROM documents WHERE source_type = ?1)",
207			[source_type],
208		)?;
209
210		// Delete the documents themselves.
211		let deleted =
212			conn.execute("DELETE FROM documents WHERE source_type = ?1", [source_type])? as u64;
213
214		// Reset matching sync state entries.
215		for pattern in sync_id_patterns {
216			if pattern.contains('%') {
217				conn.execute("DELETE FROM sync_state WHERE source_id LIKE ?1", [pattern])?;
218			} else {
219				conn.execute("DELETE FROM sync_state WHERE source_id = ?1", [pattern])?;
220			}
221		}
222
223		conn.execute_batch("COMMIT")?;
224
225		Ok(deleted)
226	}
227
	/// Return all document IDs, bodies, and source repos for a given source
	/// type, for re-enrichment.  Tuples are `(id, body, source_repo)`.
	///
	/// NOTE(review): the previous doc claimed this "streams in batches", but
	/// it collects every matching row into memory at once.
	pub async fn docs_for_reenrich(
		&self, source_type: &str,
	) -> Result<Vec<(String, Option<String>, Option<String>)>> {
		let conn = self.conn.lock().await;
		let mut stmt =
			conn.prepare("SELECT id, body, source_repo FROM documents WHERE source_type = ?1")?;
		let rows = stmt
			.query_map([source_type], |row| {
				Ok((
					row.get::<_, String>(0)?,
					row.get::<_, Option<String>>(1)?,
					row.get::<_, Option<String>>(2)?,
				))
			})?
			.collect::<rusqlite::Result<Vec<_>>>()?;
		Ok(rows)
	}
248
249	/// Get document counts grouped by source type.
250	pub async fn get_stats(&self) -> Result<Vec<(String, i64)>> {
251		let conn = self.conn.lock().await;
252		let mut stmt = conn.prepare(
253			"SELECT source_type, COUNT(*) FROM documents GROUP BY source_type ORDER BY COUNT(*) DESC",
254		)?;
255		let stats = stmt
256			.query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)))?
257			.collect::<rusqlite::Result<Vec<_>>>()?;
258		Ok(stats)
259	}
260
	/// Get all sync states, ordered by `source_id`.
	pub async fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
		let conn = self.conn.lock().await;
		let mut stmt = conn.prepare(
			"SELECT source_id, source_type, source_repo, last_cursor,
				last_synced_at, next_run_at, status, error_message,
				retry_count, items_found
			 FROM sync_state ORDER BY source_id",
		)?;
		let states = stmt
			.query_map([], |row| {
				Ok(SyncState {
					source_id: row.get(0)?,
					source_type: row.get(1)?,
					source_repo: row.get(2)?,
					last_cursor: row.get(3)?,
					// Timestamps are stored as RFC 3339 text; unparseable
					// values silently degrade to `None`.
					last_synced_at: row
						.get::<_, Option<String>>(4)?
						.and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
						.map(|dt| dt.with_timezone(&chrono::Utc)),
					next_run_at: row
						.get::<_, Option<String>>(5)?
						.and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
						.map(|dt| dt.with_timezone(&chrono::Utc)),
					status: SyncStatus::from_str(row.get::<_, String>(6)?.as_str()),
					error_message: row.get(7)?,
					retry_count: row.get(8)?,
					items_found: row.get(9)?,
				})
			})?
			.collect::<rusqlite::Result<Vec<_>>>()?;
		Ok(states)
	}
294
295	/// Compact the change log by deleting entries older than the given duration.
296	pub async fn compact_change_log(&self, max_age: std::time::Duration) -> Result<u64> {
297		let conn = self.conn.lock().await;
298		let cutoff = chrono::Utc::now() - chrono::Duration::from_std(max_age)?;
299		let deleted =
300			conn.execute("DELETE FROM change_log WHERE changed_at < ?1", [cutoff.to_rfc3339()])?;
301		Ok(deleted as u64)
302	}
303
	/// Fetch the sync state for a source, or `None` if none has been recorded.
	///
	/// NOTE(review): the previous "get or create" wording was inaccurate --
	/// this never inserts; rows are created by `update_sync_state`'s upsert.
	pub async fn get_sync_state(&self, source_id: &str) -> Result<Option<SyncState>> {
		let conn = self.conn.lock().await;
		let mut stmt = conn.prepare(
			"SELECT source_id, source_type, source_repo, last_cursor,
				last_synced_at, next_run_at, status, error_message,
				retry_count, items_found
			 FROM sync_state WHERE source_id = ?1",
		)?;

		let result = stmt
			.query_row([source_id], |row| {
				Ok(SyncState {
					source_id: row.get(0)?,
					source_type: row.get(1)?,
					source_repo: row.get(2)?,
					last_cursor: row.get(3)?,
					// Timestamps are stored as RFC 3339 text; unparseable
					// values silently degrade to `None`.
					last_synced_at: row
						.get::<_, Option<String>>(4)?
						.and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
						.map(|dt| dt.with_timezone(&chrono::Utc)),
					next_run_at: row
						.get::<_, Option<String>>(5)?
						.and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
						.map(|dt| dt.with_timezone(&chrono::Utc)),
					status: SyncStatus::from_str(row.get::<_, String>(6)?.as_str()),
					error_message: row.get(7)?,
					retry_count: row.get(8)?,
					items_found: row.get(9)?,
				})
			})
			// QueryReturnedNoRows -> Ok(None) via the local OptionalRow helper.
			.optional()?;

		Ok(result)
	}
339
	/// Update sync state after a sync cycle.
	///
	/// Upsert keyed on `source_id`: a new row is inserted for a source seen
	/// for the first time, otherwise every mutable column is overwritten
	/// from `state` (source_type/source_repo are left as first inserted).
	pub async fn update_sync_state(&self, state: &SyncState) -> Result<()> {
		let conn = self.conn.lock().await;
		conn.execute(
			"INSERT INTO sync_state (source_id, source_type, source_repo, last_cursor,
				last_synced_at, next_run_at, status, error_message, retry_count, items_found)
			 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
			 ON CONFLICT(source_id) DO UPDATE SET
				last_cursor = excluded.last_cursor,
				last_synced_at = excluded.last_synced_at,
				next_run_at = excluded.next_run_at,
				status = excluded.status,
				error_message = excluded.error_message,
				retry_count = excluded.retry_count,
				items_found = excluded.items_found",
			rusqlite::params![
				&state.source_id,
				&state.source_type,
				&state.source_repo,
				&state.last_cursor,
				// Timestamps serialized as RFC 3339 text, matching the readers.
				state.last_synced_at.map(|t| t.to_rfc3339()),
				state.next_run_at.map(|t| t.to_rfc3339()),
				state.status.as_str(),
				&state.error_message,
				state.retry_count,
				state.items_found,
			],
		)?;
		Ok(())
	}
370}
371
372impl SqliteStore {
	/// Shared implementation for the spec lookups (`lookup_bip`, `lookup_bolt`,
	/// `lookup_blip`, `lookup_lud`, `lookup_nut`).
	///
	/// Finds the spec document by `source_type` and `source_id`, then collects
	/// incoming references via exact `to_external = "{PREFIX}-{number}"` match
	/// (plus any refs already resolved to the document's id).
	async fn lookup_spec(
		&self, source_type: SourceType, number: u32,
	) -> Result<Option<DocumentContext>> {
		let conn = self.conn.lock().await;
		let source_id = number.to_string();
		// Also try zero-padded form (e.g., "06") for LUDs/NUTs whose filenames
		// use two-digit padding.  This ensures lookups work regardless of
		// whether the data was ingested before or after the padding fix.
		let source_id_padded = format!("{:02}", number);

		// Find the spec document.
		let doc = conn
			.query_row(
				"SELECT id, source_type, source_repo, source_id, title, body,
				 author, author_id, created_at, updated_at, parent_id, metadata, seq
				 FROM documents WHERE source_type = ?1 AND source_id IN (?2, ?3)",
				rusqlite::params![source_type.as_str(), &source_id, &source_id_padded],
				|row| {
					let source_type_str: String = row.get(1)?;
					let created_at_str: String = row.get(8)?;
					let updated_at_str: Option<String> = row.get(9)?;
					let metadata_str: Option<String> = row.get(11)?;
					Ok(Document {
						id: row.get(0)?,
						// Unknown source_type strings fall back to GithubIssue
						// rather than failing the row.
						source_type: SourceType::from_str(&source_type_str)
							.unwrap_or(SourceType::GithubIssue),
						source_repo: row.get(2)?,
						source_id: row.get(3)?,
						title: row.get(4)?,
						body: row.get(5)?,
						author: row.get(6)?,
						author_id: row.get(7)?,
						// Unparseable timestamps degrade to "now" instead of
						// failing the whole lookup.
						created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
							.map(|dt| dt.with_timezone(&chrono::Utc))
							.unwrap_or_else(|_| chrono::Utc::now()),
						updated_at: updated_at_str.and_then(|s| {
							chrono::DateTime::parse_from_rfc3339(&s)
								.ok()
								.map(|dt| dt.with_timezone(&chrono::Utc))
						}),
						parent_id: row.get(10)?,
						metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
						seq: row.get(12)?,
					})
				},
			)
			.optional()?;

		let doc = match doc {
			Some(d) => d,
			None => return Ok(None),
		};

		let url = doc.url();
		let doc_id = &doc.id;

		// Fetch outgoing refs from this document.
		let mut stmt = conn.prepare(
			"SELECT id, from_doc_id, to_doc_id, ref_type, to_external, context
			 FROM refs WHERE from_doc_id = ?1",
		)?;
		let outgoing_refs = stmt
			.query_map([doc_id], |row| {
				let ref_type_str: String = row.get(3)?;
				Ok(Reference {
					id: row.get(0)?,
					from_doc_id: row.get(1)?,
					to_doc_id: row.get(2)?,
					ref_type: RefType::from_str(&ref_type_str).unwrap_or(RefType::MentionsIssue),
					to_external: row.get(4)?,
					context: row.get(5)?,
				})
			})?
			.collect::<rusqlite::Result<Vec<_>>>()?;

		// Fetch incoming refs: both via to_doc_id (resolved) and to_external (unresolved).
		// The wrappers (`lookup_bip` .. `lookup_nut`) only ever pass these five
		// spec source types, so the catch-all arm is unreachable by construction.
		let prefix = match source_type {
			SourceType::Bip => "BIP",
			SourceType::Bolt => "BOLT",
			SourceType::Blip => "bLIP",
			SourceType::Lud => "LUD",
			SourceType::Nut => "NUT",
			_ => unreachable!(),
		};
		let external_pattern = format!("{}-{}", prefix, number);

		let mut stmt = conn.prepare(
			"SELECT id, from_doc_id, to_doc_id, ref_type, to_external, context
			 FROM refs WHERE to_doc_id = ?1 OR to_external = ?2",
		)?;
		let incoming_refs = stmt
			.query_map(rusqlite::params![doc_id, &external_pattern], |row| {
				let ref_type_str: String = row.get(3)?;
				Ok(Reference {
					id: row.get(0)?,
					from_doc_id: row.get(1)?,
					to_doc_id: row.get(2)?,
					ref_type: RefType::from_str(&ref_type_str).unwrap_or(RefType::MentionsIssue),
					to_external: row.get(4)?,
					context: row.get(5)?,
				})
			})?
			.collect::<rusqlite::Result<Vec<_>>>()?;

		// Fetch concept tags.
		let mut stmt =
			conn.prepare("SELECT concept_slug FROM concept_mentions WHERE doc_id = ?1")?;
		let concepts: Vec<String> =
			stmt.query_map([doc_id], |row| row.get(0))?.collect::<rusqlite::Result<Vec<_>>>()?;

		Ok(Some(DocumentContext { document: doc, url, outgoing_refs, incoming_refs, concepts }))
	}
489}
490
/// Helper trait for `rusqlite::OptionalExtension`-like behavior.
///
/// Converts `Err(QueryReturnedNoRows)` into `Ok(None)` so "no matching row"
/// can be distinguished from a genuine database error.
trait OptionalRow<T> {
	fn optional(self) -> rusqlite::Result<Option<T>>;
}
495
496impl<T> OptionalRow<T> for rusqlite::Result<T> {
497	fn optional(self) -> rusqlite::Result<Option<T>> {
498		match self {
499			Ok(v) => Ok(Some(v)),
500			Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
501			Err(e) => Err(e),
502		}
503	}
504}
505
506#[async_trait]
507impl KnowledgeStore for SqliteStore {
	/// Full-text (FTS5) or filtered "wildcard" search over documents.
	///
	/// A query of `""` or `"*"` is treated as a wildcard and must be combined
	/// with at least one filter; any other query goes through the
	/// `documents_fts` index ranked by bm25.
	async fn search(&self, params: SearchParams) -> Result<SearchResults> {
		let conn = self.conn.lock().await;
		// Default page size 20, hard cap 100 regardless of requested limit.
		let limit = params.limit.unwrap_or(20).min(100);

		let query_trimmed = params.query.trim();
		let is_wildcard = query_trimmed.is_empty() || query_trimmed == "*";

		// Wildcard queries require at least one filter to avoid full table scans.
		if is_wildcard {
			let has_filter = params.source_type.as_ref().is_some_and(|v| !v.is_empty())
				|| params.source_repo.as_ref().is_some_and(|v| !v.is_empty())
				|| params.author.is_some()
				|| params.after.is_some()
				|| params.before.is_some();
			if !has_filter {
				anyhow::bail!(
					"wildcard queries require at least one filter \
					 (source_type, source_repo, author, after, or before)"
				);
			}
		}

		// Positional parameters are accumulated here and bound at the end;
		// `param_idx` tracks the next free `?N` placeholder index.
		let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
		let mut param_idx = 1;

		let mut sql = if is_wildcard {
			// No FTS -- query directly from documents table
			String::from(
				"SELECT d.id, d.source_type, d.source_repo, d.title,
					NULL as snippet,
					d.author, d.created_at,
					0.0 as score,
					d.source_id, d.parent_id, d.metadata
				 FROM documents d
				 WHERE 1=1",
			)
		} else {
			// Full-text search via FTS5
			let fts_query = build_fts_query(query_trimmed);
			param_values.push(Box::new(fts_query));
			param_idx = 2;
			String::from(
				"SELECT d.id, d.source_type, d.source_repo, d.title,
					snippet(documents_fts, 1, '<mark>', '</mark>', '...', 64) as snippet,
					d.author, d.created_at,
					bm25(documents_fts, 5.0, 1.0) as score,
					d.source_id, d.parent_id, d.metadata
				 FROM documents_fts
				 JOIN documents d ON d.rowid = documents_fts.rowid
				 WHERE documents_fts MATCH ?1",
			)
		};

		// Source type filter
		if let Some(ref source_types) = params.source_type {
			if !source_types.is_empty() {
				let placeholders: Vec<String> = source_types
					.iter()
					.enumerate()
					.map(|(i, _)| format!("?{}", param_idx + i))
					.collect();
				sql.push_str(&format!(" AND d.source_type IN ({})", placeholders.join(",")));
				for st in source_types {
					param_values.push(Box::new(st.as_str().to_string()));
					param_idx += 1;
				}
			}
		}

		// Source repo filter
		if let Some(ref repos) = params.source_repo {
			if !repos.is_empty() {
				let placeholders: Vec<String> =
					repos.iter().enumerate().map(|(i, _)| format!("?{}", param_idx + i)).collect();
				sql.push_str(&format!(" AND d.source_repo IN ({})", placeholders.join(",")));
				for repo in repos {
					param_values.push(Box::new(repo.clone()));
					param_idx += 1;
				}
			}
		}

		// Author filter
		if let Some(ref author) = params.author {
			sql.push_str(&format!(" AND d.author = ?{}", param_idx));
			param_values.push(Box::new(author.clone()));
			param_idx += 1;
		}

		// Date filters (created_at is stored as RFC 3339 text by
		// upsert_document, so lexicographic comparison is chronological)
		if let Some(ref after) = params.after {
			sql.push_str(&format!(" AND d.created_at >= ?{}", param_idx));
			param_values.push(Box::new(after.to_rfc3339()));
			param_idx += 1;
		}

		if let Some(ref before) = params.before {
			sql.push_str(&format!(" AND d.created_at <= ?{}", param_idx));
			param_values.push(Box::new(before.to_rfc3339()));
			// Last use of the counter; keeps the increment pattern uniform
			// without an unused-assignment warning.
			let _ = param_idx;
		}

		// The trailing bare `?` binds at largest-used-index + 1 (SQLite rule),
		// so it always lands after the numbered placeholders above.
		if is_wildcard {
			sql.push_str(" ORDER BY d.created_at DESC LIMIT ?");
		} else {
			// bm25() returns negative scores with better matches more
			// negative, so ascending order puts the best match first.
			sql.push_str(" ORDER BY score LIMIT ?");
		}
		param_values.push(Box::new(limit as i64));

		let param_refs: Vec<&dyn rusqlite::types::ToSql> =
			param_values.iter().map(|b| b.as_ref()).collect();

		let mut stmt = conn.prepare(&sql)?;

		let results: Vec<SearchResult> = stmt
			.query_map(param_refs.as_slice(), |row| {
				let source_type_str: String = row.get(1)?;
				let created_at_str: String = row.get(6)?;
				let source_type =
					SourceType::from_str(&source_type_str).unwrap_or(SourceType::GithubIssue);
				let source_repo: Option<String> = row.get(2)?;
				let source_id: String = row.get(8)?;
				let parent_id: Option<String> = row.get(9)?;
				let metadata_str: Option<String> = row.get(10)?;
				// Skeleton Document constructed only so `doc.url()` can be
				// called below; the placeholder fields are never surfaced.
				let doc = Document {
					id: String::new(),
					source_type: source_type.clone(),
					source_repo: source_repo.clone(),
					source_id,
					title: None,
					body: None,
					author: None,
					author_id: None,
					created_at: chrono::Utc::now(),
					updated_at: None,
					parent_id,
					metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
					seq: None,
				};
				Ok(SearchResult {
					id: row.get(0)?,
					source_type,
					source_repo,
					title: row.get(3)?,
					snippet: row.get(4)?,
					author: row.get(5)?,
					created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
						.map(|dt| dt.with_timezone(&chrono::Utc))
						.unwrap_or_else(|_| chrono::Utc::now()),
					// abs() flips bm25's negative scores positive (0.0 for
					// wildcard queries).
					score: row.get::<_, f64>(7)?.abs(),
					url: doc.url(),
					concepts: Vec::new(),
				})
			})?
			.collect::<rusqlite::Result<Vec<_>>>()?;

		// Populate concept tags for each result
		let mut concept_stmt =
			conn.prepare("SELECT concept_slug FROM concept_mentions WHERE doc_id = ?1")?;
		let results: Vec<SearchResult> = results
			.into_iter()
			.map(|mut r| {
				// Best-effort: a failed concept lookup leaves the list empty
				// rather than failing the whole search.
				if let Ok(concepts) = concept_stmt
					.query_map([&r.id], |row| row.get::<_, String>(0))
					.and_then(|rows| rows.collect::<rusqlite::Result<Vec<_>>>())
				{
					r.concepts = concepts;
				}
				r
			})
			.collect();

		// NOTE(review): total_count is the size of this (limited) page, not
		// the total number of matches in the database.
		let total_count = results.len() as u32;
		Ok(SearchResults { results, total_count })
	}
683
	/// Fetch a document by id together with its outgoing/incoming references
	/// and concept tags; returns `None` when the id is unknown.
	async fn get_document(&self, id: &str) -> Result<Option<DocumentContext>> {
		let conn = self.conn.lock().await;

		// Fetch document
		let doc = conn
			.query_row(
				"SELECT id, source_type, source_repo, source_id, title, body,
				 author, author_id, created_at, updated_at, parent_id, metadata, seq
				 FROM documents WHERE id = ?1",
				[id],
				|row| {
					let source_type_str: String = row.get(1)?;
					let created_at_str: String = row.get(8)?;
					let updated_at_str: Option<String> = row.get(9)?;
					let metadata_str: Option<String> = row.get(11)?;
					Ok(Document {
						id: row.get(0)?,
						// Unknown source_type strings fall back to GithubIssue
						// rather than failing the row.
						source_type: SourceType::from_str(&source_type_str)
							.unwrap_or(SourceType::GithubIssue),
						source_repo: row.get(2)?,
						source_id: row.get(3)?,
						title: row.get(4)?,
						body: row.get(5)?,
						author: row.get(6)?,
						author_id: row.get(7)?,
						// Unparseable timestamps degrade to "now" instead of
						// erroring out the whole fetch.
						created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
							.map(|dt| dt.with_timezone(&chrono::Utc))
							.unwrap_or_else(|_| chrono::Utc::now()),
						updated_at: updated_at_str.and_then(|s| {
							chrono::DateTime::parse_from_rfc3339(&s)
								.ok()
								.map(|dt| dt.with_timezone(&chrono::Utc))
						}),
						parent_id: row.get(10)?,
						metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
						seq: row.get(12)?,
					})
				},
			)
			.optional()?;

		let doc = match doc {
			Some(d) => d,
			None => return Ok(None),
		};

		let url = doc.url();

		// Fetch outgoing refs
		let mut stmt = conn.prepare(
			"SELECT id, from_doc_id, to_doc_id, ref_type, to_external, context
			 FROM refs WHERE from_doc_id = ?1",
		)?;
		let outgoing_refs = stmt
			.query_map([id], |row| {
				let ref_type_str: String = row.get(3)?;
				Ok(Reference {
					id: row.get(0)?,
					from_doc_id: row.get(1)?,
					to_doc_id: row.get(2)?,
					ref_type: RefType::from_str(&ref_type_str).unwrap_or(RefType::MentionsIssue),
					to_external: row.get(4)?,
					context: row.get(5)?,
				})
			})?
			.collect::<rusqlite::Result<Vec<_>>>()?;

		// Fetch incoming refs (resolved refs only, i.e. matching to_doc_id;
		// unresolved to_external refs are handled by lookup_spec).
		let mut stmt = conn.prepare(
			"SELECT id, from_doc_id, to_doc_id, ref_type, to_external, context
			 FROM refs WHERE to_doc_id = ?1",
		)?;
		let incoming_refs = stmt
			.query_map([id], |row| {
				let ref_type_str: String = row.get(3)?;
				Ok(Reference {
					id: row.get(0)?,
					from_doc_id: row.get(1)?,
					to_doc_id: row.get(2)?,
					ref_type: RefType::from_str(&ref_type_str).unwrap_or(RefType::MentionsIssue),
					to_external: row.get(4)?,
					context: row.get(5)?,
				})
			})?
			.collect::<rusqlite::Result<Vec<_>>>()?;

		// Fetch concept tags
		let mut stmt =
			conn.prepare("SELECT concept_slug FROM concept_mentions WHERE doc_id = ?1")?;
		let concepts: Vec<String> =
			stmt.query_map([id], |row| row.get(0))?.collect::<rusqlite::Result<Vec<_>>>()?;

		Ok(Some(DocumentContext { document: doc, url, outgoing_refs, incoming_refs, concepts }))
	}
778
779	async fn get_references(
780		&self, entity: &str, ref_type: Option<&str>, limit: u32,
781	) -> Result<Vec<Reference>> {
782		let conn = self.conn.lock().await;
783
784		let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match ref_type {
785			Some(rt) => (
786				"SELECT r.id, r.from_doc_id, r.to_doc_id, r.ref_type, r.to_external, r.context \
787				 FROM refs r \
788				 WHERE r.to_external = ?1 AND r.ref_type = ?2 \
789				 ORDER BY r.id \
790				 LIMIT ?3"
791					.to_string(),
792				vec![
793					Box::new(entity.to_string()),
794					Box::new(rt.to_string()),
795					Box::new(limit as i64),
796				],
797			),
798			None => (
799				"SELECT r.id, r.from_doc_id, r.to_doc_id, r.ref_type, r.to_external, r.context \
800				 FROM refs r \
801				 WHERE r.to_external = ?1 \
802				 ORDER BY r.id \
803				 LIMIT ?2"
804					.to_string(),
805				vec![Box::new(entity.to_string()), Box::new(limit as i64)],
806			),
807		};
808
809		let param_refs: Vec<&dyn rusqlite::types::ToSql> =
810			params.iter().map(|b| b.as_ref()).collect();
811
812		let mut stmt = conn.prepare(&sql)?;
813		let refs = stmt
814			.query_map(param_refs.as_slice(), |row| {
815				let ref_type_str: String = row.get(3)?;
816				Ok(Reference {
817					id: row.get(0)?,
818					from_doc_id: row.get(1)?,
819					to_doc_id: row.get(2)?,
820					ref_type: RefType::from_str(&ref_type_str).unwrap_or(RefType::MentionsIssue),
821					to_external: row.get(4)?,
822					context: row.get(5)?,
823				})
824			})?
825			.collect::<rusqlite::Result<Vec<_>>>()?;
826
827		Ok(refs)
828	}
829
	/// Look up a BIP (Bitcoin Improvement Proposal) by number.
	async fn lookup_bip(&self, number: u32) -> Result<Option<DocumentContext>> {
		self.lookup_spec(SourceType::Bip, number).await
	}

	/// Look up a BOLT (Lightning Network spec) by number.
	async fn lookup_bolt(&self, number: u32) -> Result<Option<DocumentContext>> {
		self.lookup_spec(SourceType::Bolt, number).await
	}

	/// Look up a bLIP (Lightning improvement proposal) by number.
	async fn lookup_blip(&self, number: u32) -> Result<Option<DocumentContext>> {
		self.lookup_spec(SourceType::Blip, number).await
	}

	/// Look up a LUD (LNURL document) by number.
	async fn lookup_lud(&self, number: u32) -> Result<Option<DocumentContext>> {
		self.lookup_spec(SourceType::Lud, number).await
	}

	/// Look up a NUT (Cashu spec) by number.
	async fn lookup_nut(&self, number: u32) -> Result<Option<DocumentContext>> {
		self.lookup_spec(SourceType::Nut, number).await
	}
849
	/// Chronological timeline of documents tagged with `concept`, optionally
	/// bounded by `after`/`before`; capped at 200 events, oldest first.
	async fn timeline(
		&self, concept: &str, after: Option<chrono::DateTime<chrono::Utc>>,
		before: Option<chrono::DateTime<chrono::Utc>>,
	) -> Result<bkb_core::model::Timeline> {
		let conn = self.conn.lock().await;

		let mut sql = String::from(
			"SELECT d.id, d.source_type, d.title, d.created_at, d.source_repo, d.source_id,
				d.parent_id, d.metadata
			 FROM concept_mentions cm
			 JOIN documents d ON d.id = cm.doc_id
			 WHERE cm.concept_slug = ?1",
		);

		// `param_idx` tracks the next free `?N` placeholder for the optional
		// date bounds appended below.
		let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
			vec![Box::new(concept.to_string())];
		let mut param_idx = 2;

		if let Some(ref after) = after {
			sql.push_str(&format!(" AND d.created_at >= ?{}", param_idx));
			param_values.push(Box::new(after.to_rfc3339()));
			param_idx += 1;
		}

		if let Some(ref before) = before {
			sql.push_str(&format!(" AND d.created_at <= ?{}", param_idx));
			param_values.push(Box::new(before.to_rfc3339()));
			// Last use of the counter; keeps the increment pattern uniform
			// without an unused-assignment warning.
			let _ = param_idx;
		}

		sql.push_str(" ORDER BY d.created_at ASC LIMIT 200");

		let param_refs: Vec<&dyn rusqlite::types::ToSql> =
			param_values.iter().map(|b| b.as_ref()).collect();

		let mut stmt = conn.prepare(&sql)?;

		let events: Vec<bkb_core::model::TimelineEvent> = stmt
			.query_map(param_refs.as_slice(), |row| {
				let source_type_str: String = row.get(1)?;
				let created_at_str: String = row.get(3)?;
				let source_type =
					SourceType::from_str(&source_type_str).unwrap_or(SourceType::GithubIssue);
				let source_repo: Option<String> = row.get(4)?;
				let source_id: String = row.get(5)?;
				let parent_id: Option<String> = row.get(6)?;
				let metadata_str: Option<String> = row.get(7)?;
				// Skeleton Document constructed only so `doc.url()` can be
				// called; the placeholder fields are never surfaced.
				let doc = Document {
					id: String::new(),
					source_type: source_type.clone(),
					source_repo: source_repo.clone(),
					source_id,
					title: None,
					body: None,
					author: None,
					author_id: None,
					created_at: chrono::Utc::now(),
					updated_at: None,
					parent_id,
					metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
					seq: None,
				};
				// First 10 chars of an RFC 3339 timestamp are "YYYY-MM-DD".
				let date = created_at_str.get(..10).unwrap_or(&created_at_str).to_string();
				Ok(bkb_core::model::TimelineEvent {
					date,
					source_type,
					title: row.get(2)?,
					id: row.get(0)?,
					url: doc.url(),
				})
			})?
			.collect::<rusqlite::Result<Vec<_>>>()?;

		Ok(bkb_core::model::Timeline { concept: concept.to_string(), events })
	}
925
926	async fn find_commit(
927		&self, query: &str, repo: Option<&str>,
928	) -> Result<Vec<bkb_core::model::CommitContext>> {
929		// Search commits first, then also search PRs as fallback context.
930		let mut source_types = vec![SourceType::Commit];
931		source_types.push(SourceType::GithubPr);
932
933		let params = SearchParams {
934			query: query.to_string(),
935			source_type: Some(source_types),
936			source_repo: repo.map(|r| vec![r.to_string()]),
937			limit: Some(10),
938			..Default::default()
939		};
940
941		let results = self.search(params).await?;
942
943		let mut contexts = Vec::new();
944		for result in results.results {
945			let doc_ctx = self.get_document(&result.id).await?;
946			if let Some(ctx) = doc_ctx {
947				// For commit documents, look up associated PRs that mention
948				// this commit SHA in their body or via references.
949				let associated_prs = if ctx.document.source_type == SourceType::Commit {
950					self.find_associated_prs(
951						&ctx.document.source_id,
952						ctx.document.source_repo.as_deref(),
953					)
954					.await
955					.unwrap_or_default()
956				} else {
957					Vec::new()
958				};
959
960				contexts.push(bkb_core::model::CommitContext {
961					url: ctx.url.clone(),
962					document: ctx.document,
963					associated_prs,
964				});
965			}
966		}
967
968		Ok(contexts)
969	}
970}
971
972impl SqliteStore {
973	/// Find PRs that reference a given commit SHA by searching PR bodies for
974	/// the SHA prefix.
975	async fn find_associated_prs(
976		&self, commit_sha: &str, source_repo: Option<&str>,
977	) -> Result<Vec<SearchResult>> {
978		let conn = self.conn.lock().await;
979		let sha = commit_sha.to_string();
980		let repo = source_repo.map(String::from);
981
982		// Search for PRs that mention this commit SHA (use 12-char prefix)
983		let sha_prefix = &sha[..sha.len().min(12)];
984
985		let mut sql = String::from(
986			"SELECT d.id, d.source_type, d.source_repo, d.title, d.author, d.created_at \
987			 FROM documents d \
988			 JOIN documents_fts ON documents_fts.rowid = d.rowid \
989			 WHERE d.source_type = 'github_pr' \
990			 AND documents_fts MATCH ?1",
991		);
992
993		let fts_sha = build_fts_query(sha_prefix);
994		let params: Vec<Box<dyn rusqlite::types::ToSql>> = if let Some(ref repo) = repo {
995			sql.push_str(" AND d.source_repo = ?2");
996			vec![Box::new(fts_sha.clone()), Box::new(repo.clone())]
997		} else {
998			vec![Box::new(fts_sha.clone())]
999		};
1000
1001		sql.push_str(" LIMIT 5");
1002
1003		let mut stmt = conn.prepare(&sql)?;
1004		let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1005			params.iter().map(|p| p.as_ref()).collect();
1006
1007		let results = stmt
1008			.query_map(param_refs.as_slice(), |row| {
1009				Ok(SearchResult {
1010					id: row.get(0)?,
1011					source_type: SourceType::from_str(&row.get::<_, String>(1)?)
1012						.unwrap_or(SourceType::GithubPr),
1013					source_repo: row.get(2)?,
1014					title: row.get(3)?,
1015					snippet: None,
1016					author: row.get(4)?,
1017					created_at: row
1018						.get::<_, String>(5)
1019						.ok()
1020						.and_then(|s| s.parse().ok())
1021						.unwrap_or_else(chrono::Utc::now),
1022					score: 0.0,
1023					url: None,
1024					concepts: Vec::new(),
1025				})
1026			})?
1027			.filter_map(|r| r.ok())
1028			.collect();
1029
1030		Ok(results)
1031	}
1032}
1033
1034/// Build an FTS5 query string from user input.
1035///
1036/// Escapes special characters and wraps terms for prefix matching.
/// Build an FTS5 query string from user input.
///
/// FTS5 treats many characters (`:` column filter, `-` NOT, `.`, `(`, `)`,
/// `+`, …) as query syntax, so any term that is not a plain FTS5 "bareword"
/// (ASCII alphanumerics, `_`, or non-ASCII characters) is wrapped in
/// double-quotes. Whole-word boolean keywords (`AND`/`OR`/`NOT`/`NEAR`)
/// and trailing `*` prefix globs on plain terms are preserved. A single
/// plain term gets `*` appended for prefix matching.
fn build_fts_query(input: &str) -> String {
	let trimmed = input.trim();
	if trimmed.is_empty() {
		return "\"\"".to_string();
	}

	// Strip all double-quotes so that user input like `lightning:"channel close"`
	// cannot bypass per-term quoting and re-expose the `:` column-filter bug.
	let trimmed = &trimmed.replace('"', "");
	let trimmed = trimmed.trim();
	if trimmed.is_empty() {
		return "\"\"".to_string();
	}

	// Split into whitespace-delimited terms.
	let terms: Vec<&str> = trimmed.split_whitespace().collect();

	// A term is a whole-word FTS5 boolean keyword only when it exactly
	// matches one of the reserved words. Substring-based checks caused
	// false positives for words like "MONITOR" (contains "OR"), "HANDLER"
	// (contains "AND"), or "CANNOT" (contains "NOT").
	let is_fts_keyword = |t: &str| matches!(t, "AND" | "OR" | "NOT" | "NEAR");

	// FTS5 barewords may contain ASCII alphanumerics, `_`, and any
	// non-ASCII character; anything else is query syntax and must be
	// quoted. (Previously only `:`, `/`, `#`, `-` triggered quoting, so
	// terms like `v0.1` or `foo(bar)` still produced FTS5 syntax errors.)
	let is_bareword =
		|t: &str| t.chars().all(|c| c.is_ascii_alphanumeric() || c == '_' || !c.is_ascii());

	let quoted: Vec<String> = terms
		.iter()
		.map(|t| {
			if is_fts_keyword(t) {
				// Preserve intentional FTS5 boolean operators.
				t.to_string()
			} else if t.ends_with('*') && is_bareword(t.trim_end_matches('*')) {
				// Preserve intentional prefix globs on plain terms; globs on
				// terms with syntax characters fall through to quoting so a
				// term like `foo:bar*` cannot re-expose the `:` operator.
				t.to_string()
			} else if is_bareword(t) {
				t.to_string()
			} else {
				format!("\"{}\"", t)
			}
		})
		.collect();

	if quoted.len() == 1 {
		let term = &quoted[0];
		// Single term: use prefix matching unless already quoted/globbed.
		if term.starts_with('"') || term.ends_with('*') {
			term.clone()
		} else {
			format!("{}*", term)
		}
	} else {
		// Multiple terms: implicit AND (space-separated).
		quoted.join(" ")
	}
}
1092
#[cfg(test)]
mod tests {
	use chrono::Utc;

	use super::*;
	use bkb_core::model::{Document, SourceType};

	/// Build a minimal GitHub-issue `Document` fixture in `bitcoin/bitcoin`
	/// with the given id, title, and body; all optional fields left empty.
	fn test_doc(id: &str, title: &str, body: &str) -> Document {
		Document {
			id: id.to_string(),
			source_type: SourceType::GithubIssue,
			source_repo: Some("bitcoin/bitcoin".to_string()),
			source_id: "1".to_string(),
			title: Some(title.to_string()),
			body: Some(body.to_string()),
			author: Some("satoshi".to_string()),
			author_id: None,
			created_at: Utc::now(),
			updated_at: None,
			parent_id: None,
			metadata: None,
			seq: None,
		}
	}

	/// Upserting a new document and fetching it back round-trips title/body.
	#[tokio::test]
	async fn test_upsert_and_get_document() {
		let store = SqliteStore::open_in_memory().unwrap();
		let doc =
			test_doc("github_issue:bitcoin/bitcoin:1", "Test issue", "This is a test issue body");

		store.upsert_document(&doc).await.unwrap();

		let ctx = store.get_document("github_issue:bitcoin/bitcoin:1").await.unwrap();
		assert!(ctx.is_some());
		let ctx = ctx.unwrap();
		assert_eq!(ctx.document.title.as_deref(), Some("Test issue"));
		assert_eq!(ctx.document.body.as_deref(), Some("This is a test issue body"));
	}

	/// Upserting the same id twice replaces the stored fields (update path).
	#[tokio::test]
	async fn test_upsert_updates_existing() {
		let store = SqliteStore::open_in_memory().unwrap();
		let doc = test_doc("github_issue:bitcoin/bitcoin:1", "Original title", "Original body");
		store.upsert_document(&doc).await.unwrap();

		let mut updated = doc.clone();
		updated.title = Some("Updated title".to_string());
		store.upsert_document(&updated).await.unwrap();

		let ctx = store.get_document("github_issue:bitcoin/bitcoin:1").await.unwrap().unwrap();
		assert_eq!(ctx.document.title.as_deref(), Some("Updated title"));
	}

	/// Full-text search returns only the document matching the query term.
	#[tokio::test]
	async fn test_search_fts() {
		let store = SqliteStore::open_in_memory().unwrap();

		let doc1 = test_doc(
			"github_issue:bitcoin/bitcoin:1",
			"Add taproot support",
			"Implementing BIP-340 and BIP-341 for schnorr signatures",
		);
		let mut doc2 = test_doc(
			"github_issue:bitcoin/bitcoin:2",
			"Fix mempool bug",
			"There is a bug in the mempool validation logic",
		);
		doc2.source_id = "2".to_string();

		store.upsert_document(&doc1).await.unwrap();
		store.upsert_document(&doc2).await.unwrap();

		let results = store
			.search(SearchParams { query: "taproot".to_string(), ..Default::default() })
			.await
			.unwrap();

		assert_eq!(results.results.len(), 1);
		assert!(results.results[0].title.as_deref().unwrap().contains("taproot"));
	}

	/// Regression test: queries containing `:` must not trigger FTS5
	/// "no such column" errors (`:` is the column-filter operator).
	#[tokio::test]
	async fn test_search_fts_with_colon_in_query() {
		let store = SqliteStore::open_in_memory().unwrap();

		let mut doc = test_doc(
			"github_pr:lightningdevkit/ldk-node:4463",
			"Fix lightning channel close",
			"Resolves an issue with force-close in ldk-node",
		);
		doc.source_type = SourceType::GithubPr;
		doc.source_repo = Some("lightningdevkit/ldk-node".to_string());
		doc.source_id = "4463".to_string();
		store.upsert_document(&doc).await.unwrap();

		// Query containing `:` — previously caused "no such column: lightning".
		let results = store
			.search(SearchParams {
				query: "lightningdevkit/ldk-node:4463".to_string(),
				..Default::default()
			})
			.await;
		assert!(results.is_ok(), "search with colon must not fail: {:?}", results.err());

		// Also verify the document can be retrieved by its ID (which contains `/`).
		let ctx = store.get_document("github_pr:lightningdevkit/ldk-node:4463").await.unwrap();
		assert!(ctx.is_some());
		assert_eq!(ctx.unwrap().document.source_id, "4463");
	}

	/// Regression test: words containing "OR"/"AND"/"NOT" as substrings
	/// (e.g. "ChannelMonitor", "ChannelManager") must not trigger the
	/// pass-through path that skips quoting.
	#[tokio::test]
	async fn test_search_fts_operator_substring_false_positive() {
		let store = SqliteStore::open_in_memory().unwrap();

		let doc = test_doc(
			"github_issue:lightningdevkit/rust-lightning:1",
			"ChannelMonitor persistence ordering",
			"The ChannelManager and ChannelMonitor must be persisted in the right order",
		);
		store.upsert_document(&doc).await.unwrap();

		// "ChannelMonitor" contains "OR", "ChannelManager" contains "AND" —
		// these must NOT cause the query to be passed through raw.
		let results = store
			.search(SearchParams {
				query: "ChannelMonitor ChannelManager ordering".to_string(),
				..Default::default()
			})
			.await;
		assert!(
			results.is_ok(),
			"search with OR/AND substrings must not fail: {:?}",
			results.err()
		);
		assert!(!results.unwrap().results.is_empty());
	}

	/// Regression test: hyphenated terms like `rust-lightning` must not be
	/// interpreted as `rust NOT lightning` by FTS5.
	#[tokio::test]
	async fn test_search_fts_hyphenated_term() {
		let store = SqliteStore::open_in_memory().unwrap();

		let doc = test_doc(
			"github_issue:lightningdevkit/rust-lightning:42",
			"rust-lightning bug fix",
			"Fix a bug in rust-lightning channel handling",
		);
		store.upsert_document(&doc).await.unwrap();

		let results = store
			.search(SearchParams { query: "rust-lightning".to_string(), ..Default::default() })
			.await;
		assert!(results.is_ok(), "hyphenated search must not fail: {:?}", results.err());
	}

	#[tokio::test]
	async fn test_search_fts_quotes_with_colon() {
		// Regression: `lightning:"channel close"` previously bypassed per-term
		// quoting, exposing the `:` column-filter operator to FTS5.
		let store = SqliteStore::open_in_memory().unwrap();

		let doc = test_doc(
			"github_issue:lightningdevkit/rust-lightning:99",
			"lightning channel close",
			"Details about closing a lightning channel",
		);
		store.upsert_document(&doc).await.unwrap();

		let results = store
			.search(SearchParams {
				query: "lightning:\"channel close\"".to_string(),
				..Default::default()
			})
			.await;
		assert!(results.is_ok(), "quoted+colon search must not crash: {:?}", results.err());
	}

	/// `build_fts_query` strips embedded double-quotes before per-term quoting.
	#[test]
	fn test_build_fts_query_strips_quotes() {
		// Quotes are stripped so `:` still gets properly quoted.
		assert_eq!(build_fts_query("lightning:\"channel close\""), "\"lightning:channel\" close");

		// Pure quotes collapse to empty.
		assert_eq!(build_fts_query("\"\""), "\"\"");

		// Normal input unchanged.
		assert_eq!(build_fts_query("hello world"), "hello world");
	}

	/// Fetching an unknown id yields `None` rather than an error.
	#[tokio::test]
	async fn test_get_nonexistent_document() {
		let store = SqliteStore::open_in_memory().unwrap();
		let result = store.get_document("nonexistent:id").await.unwrap();
		assert!(result.is_none());
	}

	/// Inserted references show up as outgoing refs on the source document.
	#[tokio::test]
	async fn test_references() {
		let store = SqliteStore::open_in_memory().unwrap();

		let doc = test_doc("github_issue:bitcoin/bitcoin:1", "Test", "Body");
		store.upsert_document(&doc).await.unwrap();

		let reference = Reference {
			id: None,
			from_doc_id: "github_issue:bitcoin/bitcoin:1".to_string(),
			to_doc_id: None,
			ref_type: RefType::ReferencesBip,
			to_external: Some("BIP-340".to_string()),
			context: Some("Implementing BIP-340 schnorr".to_string()),
		};
		store.insert_reference(&reference).await.unwrap();

		let ctx = store.get_document("github_issue:bitcoin/bitcoin:1").await.unwrap().unwrap();
		assert_eq!(ctx.outgoing_refs.len(), 1);
		assert_eq!(ctx.outgoing_refs[0].ref_type, RefType::ReferencesBip);
	}

	/// Sync state round-trips through `update_sync_state`/`get_sync_state`.
	#[tokio::test]
	async fn test_sync_state() {
		let store = SqliteStore::open_in_memory().unwrap();

		let state = SyncState {
			source_id: "github:bitcoin/bitcoin:issues".to_string(),
			source_type: "github_issue".to_string(),
			source_repo: Some("bitcoin/bitcoin".to_string()),
			last_cursor: Some("2024-01-01T00:00:00Z".to_string()),
			last_synced_at: Some(Utc::now()),
			next_run_at: None,
			status: SyncStatus::Ok,
			error_message: None,
			retry_count: 0,
			items_found: 42,
		};

		store.update_sync_state(&state).await.unwrap();

		let retrieved = store.get_sync_state("github:bitcoin/bitcoin:issues").await.unwrap();
		assert!(retrieved.is_some());
		let retrieved = retrieved.unwrap();
		assert_eq!(retrieved.items_found, 42);
		assert_eq!(retrieved.status, SyncStatus::Ok);
	}

	/// `lookup_lud` finds a LUD stored with an unpadded numeric source_id
	/// and surfaces the URL from its metadata.
	#[tokio::test]
	async fn test_lookup_lud() {
		let store = SqliteStore::open_in_memory().unwrap();

		// Store a LUD with unpadded source_id (as the sync adapter now does)
		let doc = Document {
			id: Document::make_id(&SourceType::Lud, None, "6"),
			source_type: SourceType::Lud,
			source_repo: None,
			source_id: "6".to_string(),
			title: Some("LUD-06: lnurl-pay".to_string()),
			body: Some("# lnurl-pay\n\nSpec content here.".to_string()),
			author: None,
			author_id: None,
			created_at: Utc::now(),
			updated_at: None,
			parent_id: None,
			metadata: Some(
				serde_json::json!({ "url": "https://github.com/lnurl/luds/blob/luds/06.md" }),
			),
			seq: None,
		};
		store.upsert_document(&doc).await.unwrap();

		let ctx = store.lookup_lud(6).await.unwrap();
		assert!(ctx.is_some(), "lookup_lud(6) should find the document");
		let ctx = ctx.unwrap();
		assert_eq!(ctx.document.title.as_deref(), Some("LUD-06: lnurl-pay"));
		assert_eq!(ctx.url.as_deref(), Some("https://github.com/lnurl/luds/blob/luds/06.md"));
	}

	/// `lookup_lud` is tolerant of legacy zero-padded source_ids ("06").
	#[tokio::test]
	async fn test_lookup_lud_padded_source_id() {
		let store = SqliteStore::open_in_memory().unwrap();

		// Simulate old data with zero-padded source_id
		let doc = Document {
			id: Document::make_id(&SourceType::Lud, None, "06"),
			source_type: SourceType::Lud,
			source_repo: None,
			source_id: "06".to_string(),
			title: Some("LUD-06: lnurl-pay".to_string()),
			body: Some("# lnurl-pay\n\nSpec content here.".to_string()),
			author: None,
			author_id: None,
			created_at: Utc::now(),
			updated_at: None,
			parent_id: None,
			metadata: None,
			seq: None,
		};
		store.upsert_document(&doc).await.unwrap();

		// lookup_lud(6) should still find it even though source_id is "06"
		let ctx = store.lookup_lud(6).await.unwrap();
		assert!(ctx.is_some(), "lookup_lud(6) should find padded source_id '06'");
		assert_eq!(ctx.unwrap().document.title.as_deref(), Some("LUD-06: lnurl-pay"));
	}

	/// `lookup_nut(0)` works for the zero-numbered NUT-00 spec.
	#[tokio::test]
	async fn test_lookup_nut_zero() {
		let store = SqliteStore::open_in_memory().unwrap();

		// NUT-00 is a real spec — source_id is "0"
		let doc = Document {
			id: Document::make_id(&SourceType::Nut, None, "0"),
			source_type: SourceType::Nut,
			source_repo: None,
			source_id: "0".to_string(),
			title: Some("NUT-00: Notation, Usage, and Terminology".to_string()),
			body: Some("# NUT-00\n\nSpec content here.".to_string()),
			author: None,
			author_id: None,
			created_at: Utc::now(),
			updated_at: None,
			parent_id: None,
			metadata: Some(
				serde_json::json!({ "url": "https://github.com/cashubtc/nuts/blob/main/00.md" }),
			),
			seq: None,
		};
		store.upsert_document(&doc).await.unwrap();

		let ctx = store.lookup_nut(0).await.unwrap();
		assert!(ctx.is_some(), "lookup_nut(0) should find NUT-00");
		let ctx = ctx.unwrap();
		assert_eq!(ctx.document.title.as_deref(), Some("NUT-00: Notation, Usage, and Terminology"));
		assert_eq!(ctx.url.as_deref(), Some("https://github.com/cashubtc/nuts/blob/main/00.md"));
	}

	/// `reset_source_type` deletes only documents/sync-state of the targeted
	/// source type, leaving other documents and their references intact.
	#[tokio::test]
	async fn test_reset_source_type_only_deletes_targeted() {
		let store = SqliteStore::open_in_memory().unwrap();

		// Insert a LUD document
		let lud_doc = Document {
			id: Document::make_id(&SourceType::Lud, None, "6"),
			source_type: SourceType::Lud,
			source_repo: None,
			source_id: "6".to_string(),
			title: Some("LUD-06".to_string()),
			body: Some("lnurl-pay spec".to_string()),
			author: None,
			author_id: None,
			created_at: Utc::now(),
			updated_at: None,
			parent_id: None,
			metadata: None,
			seq: None,
		};
		store.upsert_document(&lud_doc).await.unwrap();

		// Insert a BIP document
		let bip_doc = Document {
			id: Document::make_id(&SourceType::Bip, None, "340"),
			source_type: SourceType::Bip,
			source_repo: None,
			source_id: "340".to_string(),
			title: Some("BIP-340".to_string()),
			body: Some("Schnorr signatures".to_string()),
			author: None,
			author_id: None,
			created_at: Utc::now(),
			updated_at: None,
			parent_id: None,
			metadata: None,
			seq: None,
		};
		store.upsert_document(&bip_doc).await.unwrap();

		// Insert a GitHub issue
		let issue_doc = Document {
			id: "github_issue:bitcoin/bitcoin:1".to_string(),
			source_type: SourceType::GithubIssue,
			source_repo: Some("bitcoin/bitcoin".to_string()),
			source_id: "1".to_string(),
			title: Some("Test issue".to_string()),
			body: Some("Mentions LUD-6 and BIP-340".to_string()),
			author: Some("alice".to_string()),
			author_id: None,
			created_at: Utc::now(),
			updated_at: None,
			parent_id: None,
			metadata: None,
			seq: None,
		};
		store.upsert_document(&issue_doc).await.unwrap();

		// Add a cross-reference from the issue to the LUD
		let lud_ref = Reference {
			id: None,
			from_doc_id: issue_doc.id.clone(),
			to_doc_id: None,
			ref_type: RefType::ReferencesLud,
			to_external: Some("LUD-6".to_string()),
			context: None,
		};
		store.insert_reference(&lud_ref).await.unwrap();

		// Add a cross-reference from the issue to the BIP
		let bip_ref = Reference {
			id: None,
			from_doc_id: issue_doc.id.clone(),
			to_doc_id: None,
			ref_type: RefType::ReferencesBip,
			to_external: Some("BIP-340".to_string()),
			context: None,
		};
		store.insert_reference(&bip_ref).await.unwrap();

		// Add a concept mention on the LUD doc (use a seeded concept)
		store.upsert_concept_mention(&lud_doc.id, "taproot", 1.0).await.unwrap();

		// Add sync state entries
		let lud_state = SyncState {
			source_id: "specs:luds".to_string(),
			source_type: "specs".to_string(),
			source_repo: None,
			last_cursor: None,
			last_synced_at: Some(Utc::now()),
			next_run_at: None,
			status: SyncStatus::Ok,
			error_message: None,
			retry_count: 0,
			items_found: 21,
		};
		store.update_sync_state(&lud_state).await.unwrap();

		let bip_state = SyncState {
			source_id: "specs:bips".to_string(),
			source_type: "specs".to_string(),
			source_repo: None,
			last_cursor: None,
			last_synced_at: Some(Utc::now()),
			next_run_at: None,
			status: SyncStatus::Ok,
			error_message: None,
			retry_count: 0,
			items_found: 300,
		};
		store.update_sync_state(&bip_state).await.unwrap();

		// Verify all 3 docs exist
		assert!(store.get_document(&lud_doc.id).await.unwrap().is_some());
		assert!(store.get_document(&bip_doc.id).await.unwrap().is_some());
		assert!(store.get_document(&issue_doc.id).await.unwrap().is_some());

		// Reset LUD source type
		let deleted = store.reset_source_type("lud", &["specs:luds".into()]).await.unwrap();
		assert_eq!(deleted, 1, "should delete exactly 1 LUD document");

		// LUD doc should be gone
		assert!(store.get_document(&lud_doc.id).await.unwrap().is_none());

		// BIP and issue docs should still exist
		assert!(store.get_document(&bip_doc.id).await.unwrap().is_some());
		let issue_ctx = store.get_document(&issue_doc.id).await.unwrap().unwrap();
		assert_eq!(issue_ctx.document.title.as_deref(), Some("Test issue"));

		// The issue's outgoing refs should still be intact (they originate
		// from the issue, not from the deleted LUD)
		assert_eq!(issue_ctx.outgoing_refs.len(), 2);

		// LUD sync state should be gone, BIP sync state should remain
		assert!(store.get_sync_state("specs:luds").await.unwrap().is_none());
		assert!(store.get_sync_state("specs:bips").await.unwrap().is_some());
	}

	/// `reset_source_type` accepts SQL LIKE patterns for sync-state ids and
	/// clears every matching entry while sparing other source types.
	#[tokio::test]
	async fn test_reset_source_type_with_like_pattern() {
		let store = SqliteStore::open_in_memory().unwrap();

		// Insert two GitHub issues from different repos
		let doc1 = Document {
			id: "github_issue:bitcoin/bitcoin:1".to_string(),
			source_type: SourceType::GithubIssue,
			source_repo: Some("bitcoin/bitcoin".to_string()),
			source_id: "1".to_string(),
			title: Some("Bitcoin issue".to_string()),
			body: Some("body".to_string()),
			author: None,
			author_id: None,
			created_at: Utc::now(),
			updated_at: None,
			parent_id: None,
			metadata: None,
			seq: None,
		};
		let doc2 = Document {
			id: "github_issue:lightningdevkit/ldk-node:1".to_string(),
			source_type: SourceType::GithubIssue,
			source_repo: Some("lightningdevkit/ldk-node".to_string()),
			source_id: "1".to_string(),
			title: Some("LDK issue".to_string()),
			body: Some("body".to_string()),
			author: None,
			author_id: None,
			created_at: Utc::now(),
			updated_at: None,
			parent_id: None,
			metadata: None,
			seq: None,
		};
		let bolt_doc = Document {
			id: Document::make_id(&SourceType::Bolt, None, "2"),
			source_type: SourceType::Bolt,
			source_repo: None,
			source_id: "2".to_string(),
			title: Some("BOLT-2".to_string()),
			body: Some("Peer protocol".to_string()),
			author: None,
			author_id: None,
			created_at: Utc::now(),
			updated_at: None,
			parent_id: None,
			metadata: None,
			seq: None,
		};
		store.upsert_document(&doc1).await.unwrap();
		store.upsert_document(&doc2).await.unwrap();
		store.upsert_document(&bolt_doc).await.unwrap();

		// Add sync states
		for sid in &["github:bitcoin/bitcoin:issues", "github:lightningdevkit/ldk-node:issues"] {
			let state = SyncState {
				source_id: sid.to_string(),
				source_type: "github".to_string(),
				source_repo: None,
				last_cursor: None,
				last_synced_at: Some(Utc::now()),
				next_run_at: None,
				status: SyncStatus::Ok,
				error_message: None,
				retry_count: 0,
				items_found: 10,
			};
			store.update_sync_state(&state).await.unwrap();
		}

		// Reset github_issue — should delete both issues but not the BOLT
		let deleted =
			store.reset_source_type("github_issue", &["github:%:issues".into()]).await.unwrap();
		assert_eq!(deleted, 2, "should delete both GitHub issues");

		// Both issues gone
		assert!(store.get_document(&doc1.id).await.unwrap().is_none());
		assert!(store.get_document(&doc2.id).await.unwrap().is_none());

		// BOLT still exists
		assert!(store.get_document(&bolt_doc.id).await.unwrap().is_some());

		// Both issue sync states cleared
		assert!(store.get_sync_state("github:bitcoin/bitcoin:issues").await.unwrap().is_none());
		assert!(store
			.get_sync_state("github:lightningdevkit/ldk-node:issues")
			.await
			.unwrap()
			.is_none());
	}

	/// `lookup_nut` is tolerant of legacy zero-padded source_ids ("00").
	#[tokio::test]
	async fn test_lookup_nut_padded_source_id() {
		let store = SqliteStore::open_in_memory().unwrap();

		// Simulate old data with zero-padded source_id "00"
		let doc = Document {
			id: Document::make_id(&SourceType::Nut, None, "00"),
			source_type: SourceType::Nut,
			source_repo: None,
			source_id: "00".to_string(),
			title: Some("NUT-00: Notation, Usage, and Terminology".to_string()),
			body: Some("# NUT-00\n\nSpec content here.".to_string()),
			author: None,
			author_id: None,
			created_at: Utc::now(),
			updated_at: None,
			parent_id: None,
			metadata: None,
			seq: None,
		};
		store.upsert_document(&doc).await.unwrap();

		// lookup_nut(0) should still find it even though source_id is "00"
		let ctx = store.lookup_nut(0).await.unwrap();
		assert!(ctx.is_some(), "lookup_nut(0) should find padded source_id '00'");
		assert_eq!(
			ctx.unwrap().document.title.as_deref(),
			Some("NUT-00: Notation, Usage, and Terminology")
		);
	}
}