1use std::path::Path;
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use rusqlite::Connection;
7use tokio::sync::Mutex;
8use tracing::debug;
9
10use bkb_core::model::{
11 Document, DocumentContext, RefType, Reference, SearchParams, SearchResult, SearchResults,
12 SourceType, SyncState, SyncStatus,
13};
14use bkb_core::schema::SCHEMA_SQL;
15use bkb_core::store::KnowledgeStore;
16
17pub struct SqliteStore {
22 conn: Arc<Mutex<Connection>>,
23}
24
25impl SqliteStore {
26 pub fn open(path: &Path) -> Result<Self> {
28 let conn = Connection::open(path)?;
29 Self::init(conn)
30 }
31
32 pub fn open_in_memory() -> Result<Self> {
34 let conn = Connection::open_in_memory()?;
35 Self::init(conn)
36 }
37
38 fn init(conn: Connection) -> Result<Self> {
39 conn.execute_batch("PRAGMA journal_mode=WAL;")?;
40 conn.execute_batch("PRAGMA foreign_keys=ON;")?;
41 conn.execute_batch(SCHEMA_SQL)?;
42 Self::seed_concepts(&conn)?;
43 Ok(Self { conn: Arc::new(Mutex::new(conn)) })
44 }
45
46 fn seed_concepts(conn: &Connection) -> Result<()> {
50 use bkb_core::bitcoin::CONCEPTS;
51 let mut stmt = conn.prepare(
52 "INSERT OR IGNORE INTO concepts (slug, name, category, aliases)
53 VALUES (?1, ?2, ?3, ?4)",
54 )?;
55 for concept in CONCEPTS {
56 let aliases_json = serde_json::to_string(concept.aliases)?;
57 stmt.execute(rusqlite::params![
58 concept.slug,
59 concept.name,
60 concept.category,
61 &aliases_json,
62 ])?;
63 }
64 Ok(())
65 }
66
67 pub async fn upsert_document(&self, doc: &Document) -> Result<()> {
69 let conn = self.conn.lock().await;
70 let metadata_json = doc.metadata.as_ref().map(|m| serde_json::to_string(m)).transpose()?;
71
72 conn.execute_batch("BEGIN IMMEDIATE")?;
73
74 let exists: bool = conn
76 .query_row("SELECT COUNT(*) FROM documents WHERE id = ?1", [&doc.id], |row| {
77 row.get::<_, i64>(0)
78 })
79 .map(|c| c > 0)?;
80
81 let change_type = if exists { "update" } else { "insert" };
82
83 conn.execute(
85 "INSERT INTO change_log (doc_id, change_type) VALUES (?1, ?2)",
86 rusqlite::params![&doc.id, change_type],
87 )?;
88
89 let seq: i64 = conn.last_insert_rowid();
90
91 conn.execute(
93 r#"INSERT INTO documents (id, source_type, source_repo, source_id,
94 title, body, author, author_id, created_at, updated_at,
95 parent_id, metadata, seq)
96 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
97 ON CONFLICT(id) DO UPDATE SET
98 title = excluded.title,
99 body = excluded.body,
100 author = excluded.author,
101 author_id = excluded.author_id,
102 updated_at = excluded.updated_at,
103 parent_id = excluded.parent_id,
104 metadata = excluded.metadata,
105 seq = excluded.seq"#,
106 rusqlite::params![
107 &doc.id,
108 doc.source_type.as_str(),
109 &doc.source_repo,
110 &doc.source_id,
111 &doc.title,
112 &doc.body,
113 &doc.author,
114 &doc.author_id,
115 doc.created_at.to_rfc3339(),
116 doc.updated_at.map(|t| t.to_rfc3339()),
117 &doc.parent_id,
118 &metadata_json,
119 seq,
120 ],
121 )?;
122
123 conn.execute_batch("COMMIT")?;
124
125 debug!(doc_id = %doc.id, change_type, seq, "upserted document");
126 Ok(())
127 }
128
129 pub async fn insert_reference(&self, reference: &Reference) -> Result<()> {
131 let conn = self.conn.lock().await;
132 conn.execute(
133 "INSERT INTO refs (from_doc_id, to_doc_id, ref_type, to_external, context)
134 VALUES (?1, ?2, ?3, ?4, ?5)",
135 rusqlite::params![
136 &reference.from_doc_id,
137 &reference.to_doc_id,
138 reference.ref_type.as_str(),
139 &reference.to_external,
140 &reference.context,
141 ],
142 )?;
143 Ok(())
144 }
145
146 pub async fn delete_refs_from(&self, doc_id: &str) -> Result<()> {
148 let conn = self.conn.lock().await;
149 conn.execute("DELETE FROM refs WHERE from_doc_id = ?1", [doc_id])?;
150 Ok(())
151 }
152
153 pub async fn upsert_concept_mention(
155 &self, doc_id: &str, concept_slug: &str, confidence: f32,
156 ) -> Result<()> {
157 let conn = self.conn.lock().await;
158 conn.execute(
159 "INSERT OR REPLACE INTO concept_mentions (doc_id, concept_slug, confidence)
160 VALUES (?1, ?2, ?3)",
161 rusqlite::params![doc_id, concept_slug, confidence],
162 )?;
163 Ok(())
164 }
165
166 pub async fn delete_concept_mentions(&self, doc_id: &str) -> Result<()> {
168 let conn = self.conn.lock().await;
169 conn.execute("DELETE FROM concept_mentions WHERE doc_id = ?1", [doc_id])?;
170 Ok(())
171 }
172
173 pub async fn reset_source_type(
176 &self, source_type: &str, sync_id_patterns: &[String],
177 ) -> Result<u64> {
178 let conn = self.conn.lock().await;
179
180 conn.execute_batch("BEGIN IMMEDIATE")?;
181
182 conn.execute(
184 "DELETE FROM concept_mentions WHERE doc_id IN \
185 (SELECT id FROM documents WHERE source_type = ?1)",
186 [source_type],
187 )?;
188
189 conn.execute(
191 "DELETE FROM refs WHERE from_doc_id IN \
192 (SELECT id FROM documents WHERE source_type = ?1)",
193 [source_type],
194 )?;
195
196 conn.execute(
198 "DELETE FROM refs WHERE to_doc_id IN \
199 (SELECT id FROM documents WHERE source_type = ?1)",
200 [source_type],
201 )?;
202
203 conn.execute(
205 "DELETE FROM change_log WHERE doc_id IN \
206 (SELECT id FROM documents WHERE source_type = ?1)",
207 [source_type],
208 )?;
209
210 let deleted =
212 conn.execute("DELETE FROM documents WHERE source_type = ?1", [source_type])? as u64;
213
214 for pattern in sync_id_patterns {
216 if pattern.contains('%') {
217 conn.execute("DELETE FROM sync_state WHERE source_id LIKE ?1", [pattern])?;
218 } else {
219 conn.execute("DELETE FROM sync_state WHERE source_id = ?1", [pattern])?;
220 }
221 }
222
223 conn.execute_batch("COMMIT")?;
224
225 Ok(deleted)
226 }
227
228 pub async fn docs_for_reenrich(
232 &self, source_type: &str,
233 ) -> Result<Vec<(String, Option<String>, Option<String>)>> {
234 let conn = self.conn.lock().await;
235 let mut stmt =
236 conn.prepare("SELECT id, body, source_repo FROM documents WHERE source_type = ?1")?;
237 let rows = stmt
238 .query_map([source_type], |row| {
239 Ok((
240 row.get::<_, String>(0)?,
241 row.get::<_, Option<String>>(1)?,
242 row.get::<_, Option<String>>(2)?,
243 ))
244 })?
245 .collect::<rusqlite::Result<Vec<_>>>()?;
246 Ok(rows)
247 }
248
249 pub async fn get_stats(&self) -> Result<Vec<(String, i64)>> {
251 let conn = self.conn.lock().await;
252 let mut stmt = conn.prepare(
253 "SELECT source_type, COUNT(*) FROM documents GROUP BY source_type ORDER BY COUNT(*) DESC",
254 )?;
255 let stats = stmt
256 .query_map([], |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)))?
257 .collect::<rusqlite::Result<Vec<_>>>()?;
258 Ok(stats)
259 }
260
261 pub async fn get_all_sync_states(&self) -> Result<Vec<SyncState>> {
263 let conn = self.conn.lock().await;
264 let mut stmt = conn.prepare(
265 "SELECT source_id, source_type, source_repo, last_cursor,
266 last_synced_at, next_run_at, status, error_message,
267 retry_count, items_found
268 FROM sync_state ORDER BY source_id",
269 )?;
270 let states = stmt
271 .query_map([], |row| {
272 Ok(SyncState {
273 source_id: row.get(0)?,
274 source_type: row.get(1)?,
275 source_repo: row.get(2)?,
276 last_cursor: row.get(3)?,
277 last_synced_at: row
278 .get::<_, Option<String>>(4)?
279 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
280 .map(|dt| dt.with_timezone(&chrono::Utc)),
281 next_run_at: row
282 .get::<_, Option<String>>(5)?
283 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
284 .map(|dt| dt.with_timezone(&chrono::Utc)),
285 status: SyncStatus::from_str(row.get::<_, String>(6)?.as_str()),
286 error_message: row.get(7)?,
287 retry_count: row.get(8)?,
288 items_found: row.get(9)?,
289 })
290 })?
291 .collect::<rusqlite::Result<Vec<_>>>()?;
292 Ok(states)
293 }
294
295 pub async fn compact_change_log(&self, max_age: std::time::Duration) -> Result<u64> {
297 let conn = self.conn.lock().await;
298 let cutoff = chrono::Utc::now() - chrono::Duration::from_std(max_age)?;
299 let deleted =
300 conn.execute("DELETE FROM change_log WHERE changed_at < ?1", [cutoff.to_rfc3339()])?;
301 Ok(deleted as u64)
302 }
303
304 pub async fn get_sync_state(&self, source_id: &str) -> Result<Option<SyncState>> {
306 let conn = self.conn.lock().await;
307 let mut stmt = conn.prepare(
308 "SELECT source_id, source_type, source_repo, last_cursor,
309 last_synced_at, next_run_at, status, error_message,
310 retry_count, items_found
311 FROM sync_state WHERE source_id = ?1",
312 )?;
313
314 let result = stmt
315 .query_row([source_id], |row| {
316 Ok(SyncState {
317 source_id: row.get(0)?,
318 source_type: row.get(1)?,
319 source_repo: row.get(2)?,
320 last_cursor: row.get(3)?,
321 last_synced_at: row
322 .get::<_, Option<String>>(4)?
323 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
324 .map(|dt| dt.with_timezone(&chrono::Utc)),
325 next_run_at: row
326 .get::<_, Option<String>>(5)?
327 .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
328 .map(|dt| dt.with_timezone(&chrono::Utc)),
329 status: SyncStatus::from_str(row.get::<_, String>(6)?.as_str()),
330 error_message: row.get(7)?,
331 retry_count: row.get(8)?,
332 items_found: row.get(9)?,
333 })
334 })
335 .optional()?;
336
337 Ok(result)
338 }
339
340 pub async fn update_sync_state(&self, state: &SyncState) -> Result<()> {
342 let conn = self.conn.lock().await;
343 conn.execute(
344 "INSERT INTO sync_state (source_id, source_type, source_repo, last_cursor,
345 last_synced_at, next_run_at, status, error_message, retry_count, items_found)
346 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
347 ON CONFLICT(source_id) DO UPDATE SET
348 last_cursor = excluded.last_cursor,
349 last_synced_at = excluded.last_synced_at,
350 next_run_at = excluded.next_run_at,
351 status = excluded.status,
352 error_message = excluded.error_message,
353 retry_count = excluded.retry_count,
354 items_found = excluded.items_found",
355 rusqlite::params![
356 &state.source_id,
357 &state.source_type,
358 &state.source_repo,
359 &state.last_cursor,
360 state.last_synced_at.map(|t| t.to_rfc3339()),
361 state.next_run_at.map(|t| t.to_rfc3339()),
362 state.status.as_str(),
363 &state.error_message,
364 state.retry_count,
365 state.items_found,
366 ],
367 )?;
368 Ok(())
369 }
370}
371
372impl SqliteStore {
373 async fn lookup_spec(
378 &self, source_type: SourceType, number: u32,
379 ) -> Result<Option<DocumentContext>> {
380 let conn = self.conn.lock().await;
381 let source_id = number.to_string();
382 let source_id_padded = format!("{:02}", number);
386
387 let doc = conn
389 .query_row(
390 "SELECT id, source_type, source_repo, source_id, title, body,
391 author, author_id, created_at, updated_at, parent_id, metadata, seq
392 FROM documents WHERE source_type = ?1 AND source_id IN (?2, ?3)",
393 rusqlite::params![source_type.as_str(), &source_id, &source_id_padded],
394 |row| {
395 let source_type_str: String = row.get(1)?;
396 let created_at_str: String = row.get(8)?;
397 let updated_at_str: Option<String> = row.get(9)?;
398 let metadata_str: Option<String> = row.get(11)?;
399 Ok(Document {
400 id: row.get(0)?,
401 source_type: SourceType::from_str(&source_type_str)
402 .unwrap_or(SourceType::GithubIssue),
403 source_repo: row.get(2)?,
404 source_id: row.get(3)?,
405 title: row.get(4)?,
406 body: row.get(5)?,
407 author: row.get(6)?,
408 author_id: row.get(7)?,
409 created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
410 .map(|dt| dt.with_timezone(&chrono::Utc))
411 .unwrap_or_else(|_| chrono::Utc::now()),
412 updated_at: updated_at_str.and_then(|s| {
413 chrono::DateTime::parse_from_rfc3339(&s)
414 .ok()
415 .map(|dt| dt.with_timezone(&chrono::Utc))
416 }),
417 parent_id: row.get(10)?,
418 metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
419 seq: row.get(12)?,
420 })
421 },
422 )
423 .optional()?;
424
425 let doc = match doc {
426 Some(d) => d,
427 None => return Ok(None),
428 };
429
430 let url = doc.url();
431 let doc_id = &doc.id;
432
433 let mut stmt = conn.prepare(
435 "SELECT id, from_doc_id, to_doc_id, ref_type, to_external, context
436 FROM refs WHERE from_doc_id = ?1",
437 )?;
438 let outgoing_refs = stmt
439 .query_map([doc_id], |row| {
440 let ref_type_str: String = row.get(3)?;
441 Ok(Reference {
442 id: row.get(0)?,
443 from_doc_id: row.get(1)?,
444 to_doc_id: row.get(2)?,
445 ref_type: RefType::from_str(&ref_type_str).unwrap_or(RefType::MentionsIssue),
446 to_external: row.get(4)?,
447 context: row.get(5)?,
448 })
449 })?
450 .collect::<rusqlite::Result<Vec<_>>>()?;
451
452 let prefix = match source_type {
454 SourceType::Bip => "BIP",
455 SourceType::Bolt => "BOLT",
456 SourceType::Blip => "bLIP",
457 SourceType::Lud => "LUD",
458 SourceType::Nut => "NUT",
459 _ => unreachable!(),
460 };
461 let external_pattern = format!("{}-{}", prefix, number);
462
463 let mut stmt = conn.prepare(
464 "SELECT id, from_doc_id, to_doc_id, ref_type, to_external, context
465 FROM refs WHERE to_doc_id = ?1 OR to_external = ?2",
466 )?;
467 let incoming_refs = stmt
468 .query_map(rusqlite::params![doc_id, &external_pattern], |row| {
469 let ref_type_str: String = row.get(3)?;
470 Ok(Reference {
471 id: row.get(0)?,
472 from_doc_id: row.get(1)?,
473 to_doc_id: row.get(2)?,
474 ref_type: RefType::from_str(&ref_type_str).unwrap_or(RefType::MentionsIssue),
475 to_external: row.get(4)?,
476 context: row.get(5)?,
477 })
478 })?
479 .collect::<rusqlite::Result<Vec<_>>>()?;
480
481 let mut stmt =
483 conn.prepare("SELECT concept_slug FROM concept_mentions WHERE doc_id = ?1")?;
484 let concepts: Vec<String> =
485 stmt.query_map([doc_id], |row| row.get(0))?.collect::<rusqlite::Result<Vec<_>>>()?;
486
487 Ok(Some(DocumentContext { document: doc, url, outgoing_refs, incoming_refs, concepts }))
488 }
489}
490
491trait OptionalRow<T> {
493 fn optional(self) -> rusqlite::Result<Option<T>>;
494}
495
496impl<T> OptionalRow<T> for rusqlite::Result<T> {
497 fn optional(self) -> rusqlite::Result<Option<T>> {
498 match self {
499 Ok(v) => Ok(Some(v)),
500 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
501 Err(e) => Err(e),
502 }
503 }
504}
505
506#[async_trait]
507impl KnowledgeStore for SqliteStore {
508 async fn search(&self, params: SearchParams) -> Result<SearchResults> {
509 let conn = self.conn.lock().await;
510 let limit = params.limit.unwrap_or(20).min(100);
511
512 let query_trimmed = params.query.trim();
513 let is_wildcard = query_trimmed.is_empty() || query_trimmed == "*";
514
515 if is_wildcard {
517 let has_filter = params.source_type.as_ref().is_some_and(|v| !v.is_empty())
518 || params.source_repo.as_ref().is_some_and(|v| !v.is_empty())
519 || params.author.is_some()
520 || params.after.is_some()
521 || params.before.is_some();
522 if !has_filter {
523 anyhow::bail!(
524 "wildcard queries require at least one filter \
525 (source_type, source_repo, author, after, or before)"
526 );
527 }
528 }
529
530 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
531 let mut param_idx = 1;
532
533 let mut sql = if is_wildcard {
534 String::from(
536 "SELECT d.id, d.source_type, d.source_repo, d.title,
537 NULL as snippet,
538 d.author, d.created_at,
539 0.0 as score,
540 d.source_id, d.parent_id, d.metadata
541 FROM documents d
542 WHERE 1=1",
543 )
544 } else {
545 let fts_query = build_fts_query(query_trimmed);
547 param_values.push(Box::new(fts_query));
548 param_idx = 2;
549 String::from(
550 "SELECT d.id, d.source_type, d.source_repo, d.title,
551 snippet(documents_fts, 1, '<mark>', '</mark>', '...', 64) as snippet,
552 d.author, d.created_at,
553 bm25(documents_fts, 5.0, 1.0) as score,
554 d.source_id, d.parent_id, d.metadata
555 FROM documents_fts
556 JOIN documents d ON d.rowid = documents_fts.rowid
557 WHERE documents_fts MATCH ?1",
558 )
559 };
560
561 if let Some(ref source_types) = params.source_type {
563 if !source_types.is_empty() {
564 let placeholders: Vec<String> = source_types
565 .iter()
566 .enumerate()
567 .map(|(i, _)| format!("?{}", param_idx + i))
568 .collect();
569 sql.push_str(&format!(" AND d.source_type IN ({})", placeholders.join(",")));
570 for st in source_types {
571 param_values.push(Box::new(st.as_str().to_string()));
572 param_idx += 1;
573 }
574 }
575 }
576
577 if let Some(ref repos) = params.source_repo {
579 if !repos.is_empty() {
580 let placeholders: Vec<String> =
581 repos.iter().enumerate().map(|(i, _)| format!("?{}", param_idx + i)).collect();
582 sql.push_str(&format!(" AND d.source_repo IN ({})", placeholders.join(",")));
583 for repo in repos {
584 param_values.push(Box::new(repo.clone()));
585 param_idx += 1;
586 }
587 }
588 }
589
590 if let Some(ref author) = params.author {
592 sql.push_str(&format!(" AND d.author = ?{}", param_idx));
593 param_values.push(Box::new(author.clone()));
594 param_idx += 1;
595 }
596
597 if let Some(ref after) = params.after {
599 sql.push_str(&format!(" AND d.created_at >= ?{}", param_idx));
600 param_values.push(Box::new(after.to_rfc3339()));
601 param_idx += 1;
602 }
603
604 if let Some(ref before) = params.before {
605 sql.push_str(&format!(" AND d.created_at <= ?{}", param_idx));
606 param_values.push(Box::new(before.to_rfc3339()));
607 let _ = param_idx;
608 }
609
610 if is_wildcard {
611 sql.push_str(" ORDER BY d.created_at DESC LIMIT ?");
612 } else {
613 sql.push_str(" ORDER BY score LIMIT ?");
614 }
615 param_values.push(Box::new(limit as i64));
616
617 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
618 param_values.iter().map(|b| b.as_ref()).collect();
619
620 let mut stmt = conn.prepare(&sql)?;
621
622 let results: Vec<SearchResult> = stmt
623 .query_map(param_refs.as_slice(), |row| {
624 let source_type_str: String = row.get(1)?;
625 let created_at_str: String = row.get(6)?;
626 let source_type =
627 SourceType::from_str(&source_type_str).unwrap_or(SourceType::GithubIssue);
628 let source_repo: Option<String> = row.get(2)?;
629 let source_id: String = row.get(8)?;
630 let parent_id: Option<String> = row.get(9)?;
631 let metadata_str: Option<String> = row.get(10)?;
632 let doc = Document {
633 id: String::new(),
634 source_type: source_type.clone(),
635 source_repo: source_repo.clone(),
636 source_id,
637 title: None,
638 body: None,
639 author: None,
640 author_id: None,
641 created_at: chrono::Utc::now(),
642 updated_at: None,
643 parent_id,
644 metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
645 seq: None,
646 };
647 Ok(SearchResult {
648 id: row.get(0)?,
649 source_type,
650 source_repo,
651 title: row.get(3)?,
652 snippet: row.get(4)?,
653 author: row.get(5)?,
654 created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
655 .map(|dt| dt.with_timezone(&chrono::Utc))
656 .unwrap_or_else(|_| chrono::Utc::now()),
657 score: row.get::<_, f64>(7)?.abs(),
658 url: doc.url(),
659 concepts: Vec::new(),
660 })
661 })?
662 .collect::<rusqlite::Result<Vec<_>>>()?;
663
664 let mut concept_stmt =
666 conn.prepare("SELECT concept_slug FROM concept_mentions WHERE doc_id = ?1")?;
667 let results: Vec<SearchResult> = results
668 .into_iter()
669 .map(|mut r| {
670 if let Ok(concepts) = concept_stmt
671 .query_map([&r.id], |row| row.get::<_, String>(0))
672 .and_then(|rows| rows.collect::<rusqlite::Result<Vec<_>>>())
673 {
674 r.concepts = concepts;
675 }
676 r
677 })
678 .collect();
679
680 let total_count = results.len() as u32;
681 Ok(SearchResults { results, total_count })
682 }
683
684 async fn get_document(&self, id: &str) -> Result<Option<DocumentContext>> {
685 let conn = self.conn.lock().await;
686
687 let doc = conn
689 .query_row(
690 "SELECT id, source_type, source_repo, source_id, title, body,
691 author, author_id, created_at, updated_at, parent_id, metadata, seq
692 FROM documents WHERE id = ?1",
693 [id],
694 |row| {
695 let source_type_str: String = row.get(1)?;
696 let created_at_str: String = row.get(8)?;
697 let updated_at_str: Option<String> = row.get(9)?;
698 let metadata_str: Option<String> = row.get(11)?;
699 Ok(Document {
700 id: row.get(0)?,
701 source_type: SourceType::from_str(&source_type_str)
702 .unwrap_or(SourceType::GithubIssue),
703 source_repo: row.get(2)?,
704 source_id: row.get(3)?,
705 title: row.get(4)?,
706 body: row.get(5)?,
707 author: row.get(6)?,
708 author_id: row.get(7)?,
709 created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
710 .map(|dt| dt.with_timezone(&chrono::Utc))
711 .unwrap_or_else(|_| chrono::Utc::now()),
712 updated_at: updated_at_str.and_then(|s| {
713 chrono::DateTime::parse_from_rfc3339(&s)
714 .ok()
715 .map(|dt| dt.with_timezone(&chrono::Utc))
716 }),
717 parent_id: row.get(10)?,
718 metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
719 seq: row.get(12)?,
720 })
721 },
722 )
723 .optional()?;
724
725 let doc = match doc {
726 Some(d) => d,
727 None => return Ok(None),
728 };
729
730 let url = doc.url();
731
732 let mut stmt = conn.prepare(
734 "SELECT id, from_doc_id, to_doc_id, ref_type, to_external, context
735 FROM refs WHERE from_doc_id = ?1",
736 )?;
737 let outgoing_refs = stmt
738 .query_map([id], |row| {
739 let ref_type_str: String = row.get(3)?;
740 Ok(Reference {
741 id: row.get(0)?,
742 from_doc_id: row.get(1)?,
743 to_doc_id: row.get(2)?,
744 ref_type: RefType::from_str(&ref_type_str).unwrap_or(RefType::MentionsIssue),
745 to_external: row.get(4)?,
746 context: row.get(5)?,
747 })
748 })?
749 .collect::<rusqlite::Result<Vec<_>>>()?;
750
751 let mut stmt = conn.prepare(
753 "SELECT id, from_doc_id, to_doc_id, ref_type, to_external, context
754 FROM refs WHERE to_doc_id = ?1",
755 )?;
756 let incoming_refs = stmt
757 .query_map([id], |row| {
758 let ref_type_str: String = row.get(3)?;
759 Ok(Reference {
760 id: row.get(0)?,
761 from_doc_id: row.get(1)?,
762 to_doc_id: row.get(2)?,
763 ref_type: RefType::from_str(&ref_type_str).unwrap_or(RefType::MentionsIssue),
764 to_external: row.get(4)?,
765 context: row.get(5)?,
766 })
767 })?
768 .collect::<rusqlite::Result<Vec<_>>>()?;
769
770 let mut stmt =
772 conn.prepare("SELECT concept_slug FROM concept_mentions WHERE doc_id = ?1")?;
773 let concepts: Vec<String> =
774 stmt.query_map([id], |row| row.get(0))?.collect::<rusqlite::Result<Vec<_>>>()?;
775
776 Ok(Some(DocumentContext { document: doc, url, outgoing_refs, incoming_refs, concepts }))
777 }
778
779 async fn get_references(
780 &self, entity: &str, ref_type: Option<&str>, limit: u32,
781 ) -> Result<Vec<Reference>> {
782 let conn = self.conn.lock().await;
783
784 let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = match ref_type {
785 Some(rt) => (
786 "SELECT r.id, r.from_doc_id, r.to_doc_id, r.ref_type, r.to_external, r.context \
787 FROM refs r \
788 WHERE r.to_external = ?1 AND r.ref_type = ?2 \
789 ORDER BY r.id \
790 LIMIT ?3"
791 .to_string(),
792 vec![
793 Box::new(entity.to_string()),
794 Box::new(rt.to_string()),
795 Box::new(limit as i64),
796 ],
797 ),
798 None => (
799 "SELECT r.id, r.from_doc_id, r.to_doc_id, r.ref_type, r.to_external, r.context \
800 FROM refs r \
801 WHERE r.to_external = ?1 \
802 ORDER BY r.id \
803 LIMIT ?2"
804 .to_string(),
805 vec![Box::new(entity.to_string()), Box::new(limit as i64)],
806 ),
807 };
808
809 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
810 params.iter().map(|b| b.as_ref()).collect();
811
812 let mut stmt = conn.prepare(&sql)?;
813 let refs = stmt
814 .query_map(param_refs.as_slice(), |row| {
815 let ref_type_str: String = row.get(3)?;
816 Ok(Reference {
817 id: row.get(0)?,
818 from_doc_id: row.get(1)?,
819 to_doc_id: row.get(2)?,
820 ref_type: RefType::from_str(&ref_type_str).unwrap_or(RefType::MentionsIssue),
821 to_external: row.get(4)?,
822 context: row.get(5)?,
823 })
824 })?
825 .collect::<rusqlite::Result<Vec<_>>>()?;
826
827 Ok(refs)
828 }
829
830 async fn lookup_bip(&self, number: u32) -> Result<Option<DocumentContext>> {
831 self.lookup_spec(SourceType::Bip, number).await
832 }
833
834 async fn lookup_bolt(&self, number: u32) -> Result<Option<DocumentContext>> {
835 self.lookup_spec(SourceType::Bolt, number).await
836 }
837
838 async fn lookup_blip(&self, number: u32) -> Result<Option<DocumentContext>> {
839 self.lookup_spec(SourceType::Blip, number).await
840 }
841
842 async fn lookup_lud(&self, number: u32) -> Result<Option<DocumentContext>> {
843 self.lookup_spec(SourceType::Lud, number).await
844 }
845
846 async fn lookup_nut(&self, number: u32) -> Result<Option<DocumentContext>> {
847 self.lookup_spec(SourceType::Nut, number).await
848 }
849
850 async fn timeline(
851 &self, concept: &str, after: Option<chrono::DateTime<chrono::Utc>>,
852 before: Option<chrono::DateTime<chrono::Utc>>,
853 ) -> Result<bkb_core::model::Timeline> {
854 let conn = self.conn.lock().await;
855
856 let mut sql = String::from(
857 "SELECT d.id, d.source_type, d.title, d.created_at, d.source_repo, d.source_id,
858 d.parent_id, d.metadata
859 FROM concept_mentions cm
860 JOIN documents d ON d.id = cm.doc_id
861 WHERE cm.concept_slug = ?1",
862 );
863
864 let mut param_values: Vec<Box<dyn rusqlite::types::ToSql>> =
865 vec![Box::new(concept.to_string())];
866 let mut param_idx = 2;
867
868 if let Some(ref after) = after {
869 sql.push_str(&format!(" AND d.created_at >= ?{}", param_idx));
870 param_values.push(Box::new(after.to_rfc3339()));
871 param_idx += 1;
872 }
873
874 if let Some(ref before) = before {
875 sql.push_str(&format!(" AND d.created_at <= ?{}", param_idx));
876 param_values.push(Box::new(before.to_rfc3339()));
877 let _ = param_idx;
878 }
879
880 sql.push_str(" ORDER BY d.created_at ASC LIMIT 200");
881
882 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
883 param_values.iter().map(|b| b.as_ref()).collect();
884
885 let mut stmt = conn.prepare(&sql)?;
886
887 let events: Vec<bkb_core::model::TimelineEvent> = stmt
888 .query_map(param_refs.as_slice(), |row| {
889 let source_type_str: String = row.get(1)?;
890 let created_at_str: String = row.get(3)?;
891 let source_type =
892 SourceType::from_str(&source_type_str).unwrap_or(SourceType::GithubIssue);
893 let source_repo: Option<String> = row.get(4)?;
894 let source_id: String = row.get(5)?;
895 let parent_id: Option<String> = row.get(6)?;
896 let metadata_str: Option<String> = row.get(7)?;
897 let doc = Document {
898 id: String::new(),
899 source_type: source_type.clone(),
900 source_repo: source_repo.clone(),
901 source_id,
902 title: None,
903 body: None,
904 author: None,
905 author_id: None,
906 created_at: chrono::Utc::now(),
907 updated_at: None,
908 parent_id,
909 metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
910 seq: None,
911 };
912 let date = created_at_str.get(..10).unwrap_or(&created_at_str).to_string();
913 Ok(bkb_core::model::TimelineEvent {
914 date,
915 source_type,
916 title: row.get(2)?,
917 id: row.get(0)?,
918 url: doc.url(),
919 })
920 })?
921 .collect::<rusqlite::Result<Vec<_>>>()?;
922
923 Ok(bkb_core::model::Timeline { concept: concept.to_string(), events })
924 }
925
926 async fn find_commit(
927 &self, query: &str, repo: Option<&str>,
928 ) -> Result<Vec<bkb_core::model::CommitContext>> {
929 let mut source_types = vec![SourceType::Commit];
931 source_types.push(SourceType::GithubPr);
932
933 let params = SearchParams {
934 query: query.to_string(),
935 source_type: Some(source_types),
936 source_repo: repo.map(|r| vec![r.to_string()]),
937 limit: Some(10),
938 ..Default::default()
939 };
940
941 let results = self.search(params).await?;
942
943 let mut contexts = Vec::new();
944 for result in results.results {
945 let doc_ctx = self.get_document(&result.id).await?;
946 if let Some(ctx) = doc_ctx {
947 let associated_prs = if ctx.document.source_type == SourceType::Commit {
950 self.find_associated_prs(
951 &ctx.document.source_id,
952 ctx.document.source_repo.as_deref(),
953 )
954 .await
955 .unwrap_or_default()
956 } else {
957 Vec::new()
958 };
959
960 contexts.push(bkb_core::model::CommitContext {
961 url: ctx.url.clone(),
962 document: ctx.document,
963 associated_prs,
964 });
965 }
966 }
967
968 Ok(contexts)
969 }
970}
971
972impl SqliteStore {
973 async fn find_associated_prs(
976 &self, commit_sha: &str, source_repo: Option<&str>,
977 ) -> Result<Vec<SearchResult>> {
978 let conn = self.conn.lock().await;
979 let sha = commit_sha.to_string();
980 let repo = source_repo.map(String::from);
981
982 let sha_prefix = &sha[..sha.len().min(12)];
984
985 let mut sql = String::from(
986 "SELECT d.id, d.source_type, d.source_repo, d.title, d.author, d.created_at \
987 FROM documents d \
988 JOIN documents_fts ON documents_fts.rowid = d.rowid \
989 WHERE d.source_type = 'github_pr' \
990 AND documents_fts MATCH ?1",
991 );
992
993 let fts_sha = build_fts_query(sha_prefix);
994 let params: Vec<Box<dyn rusqlite::types::ToSql>> = if let Some(ref repo) = repo {
995 sql.push_str(" AND d.source_repo = ?2");
996 vec![Box::new(fts_sha.clone()), Box::new(repo.clone())]
997 } else {
998 vec![Box::new(fts_sha.clone())]
999 };
1000
1001 sql.push_str(" LIMIT 5");
1002
1003 let mut stmt = conn.prepare(&sql)?;
1004 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1005 params.iter().map(|p| p.as_ref()).collect();
1006
1007 let results = stmt
1008 .query_map(param_refs.as_slice(), |row| {
1009 Ok(SearchResult {
1010 id: row.get(0)?,
1011 source_type: SourceType::from_str(&row.get::<_, String>(1)?)
1012 .unwrap_or(SourceType::GithubPr),
1013 source_repo: row.get(2)?,
1014 title: row.get(3)?,
1015 snippet: None,
1016 author: row.get(4)?,
1017 created_at: row
1018 .get::<_, String>(5)
1019 .ok()
1020 .and_then(|s| s.parse().ok())
1021 .unwrap_or_else(chrono::Utc::now),
1022 score: 0.0,
1023 url: None,
1024 concepts: Vec::new(),
1025 })
1026 })?
1027 .filter_map(|r| r.ok())
1028 .collect();
1029
1030 Ok(results)
1031 }
1032}
1033
/// Converts free-form user input into an FTS5 `MATCH` expression.
///
/// Rules, applied per whitespace-separated term after all `"` characters
/// have been removed from the input:
///
/// * `AND`, `OR`, `NOT` and `NEAR` pass through as FTS5 operators.
/// * A term containing `:`, `/`, `#` or `-` is wrapped in double quotes so
///   FTS5 treats it as a phrase instead of query syntax.
/// * A term ending in `*` stays a prefix query. If the part before the `*`
///   needs quoting, only that part is quoted (`ldk-node*` becomes
///   `"ldk-node"*`).
/// * A single plain term gets a trailing `*` so it matches as a prefix.
/// * A query consisting of one operator keyword is quoted so it is searched
///   as a literal word rather than producing a dangling operator.
///
/// Input that is empty (or only whitespace and quotes) yields `""`.
fn build_fts_query(input: &str) -> String {
    const EMPTY_QUERY: &str = "\"\"";

    /// FTS5 operator keywords (case-sensitive).
    fn is_fts_keyword(term: &str) -> bool {
        matches!(term, "AND" | "OR" | "NOT" | "NEAR")
    }

    /// Characters that must be quoted to stay out of FTS5 query syntax.
    fn needs_quoting(term: &str) -> bool {
        term.contains([':', '/', '#', '-'])
    }

    /// Renders one term according to the rules above (without the
    /// single-term prefix expansion).
    fn render(term: &str) -> String {
        if is_fts_keyword(term) {
            return term.to_string();
        }
        match term.strip_suffix('*') {
            // Keep the prefix marker outside the quotes so it still applies.
            Some(stem) if needs_quoting(stem) => format!("\"{stem}\"*"),
            Some(_) => term.to_string(),
            None if needs_quoting(term) => format!("\"{term}\""),
            None => term.to_string(),
        }
    }

    // Quotes are stripped up front so the quoting added below is always balanced.
    let cleaned = input.replace('"', "");
    let terms: Vec<&str> = cleaned.split_whitespace().collect();

    match terms.as_slice() {
        [] => EMPTY_QUERY.to_string(),
        [only] if is_fts_keyword(only) => format!("\"{only}\""),
        [only] => {
            let rendered = render(only);
            if rendered.starts_with('"') || rendered.ends_with('*') {
                rendered
            } else {
                format!("{rendered}*")
            }
        },
        many => many.iter().map(|term| render(term)).collect::<Vec<_>>().join(" "),
    }
}
1092
1093#[cfg(test)]
1094mod tests {
1095 use chrono::Utc;
1096
1097 use super::*;
1098 use bkb_core::model::{Document, SourceType};
1099
1100 fn test_doc(id: &str, title: &str, body: &str) -> Document {
1101 Document {
1102 id: id.to_string(),
1103 source_type: SourceType::GithubIssue,
1104 source_repo: Some("bitcoin/bitcoin".to_string()),
1105 source_id: "1".to_string(),
1106 title: Some(title.to_string()),
1107 body: Some(body.to_string()),
1108 author: Some("satoshi".to_string()),
1109 author_id: None,
1110 created_at: Utc::now(),
1111 updated_at: None,
1112 parent_id: None,
1113 metadata: None,
1114 seq: None,
1115 }
1116 }
1117
1118 #[tokio::test]
1119 async fn test_upsert_and_get_document() {
1120 let store = SqliteStore::open_in_memory().unwrap();
1121 let doc =
1122 test_doc("github_issue:bitcoin/bitcoin:1", "Test issue", "This is a test issue body");
1123
1124 store.upsert_document(&doc).await.unwrap();
1125
1126 let ctx = store.get_document("github_issue:bitcoin/bitcoin:1").await.unwrap();
1127 assert!(ctx.is_some());
1128 let ctx = ctx.unwrap();
1129 assert_eq!(ctx.document.title.as_deref(), Some("Test issue"));
1130 assert_eq!(ctx.document.body.as_deref(), Some("This is a test issue body"));
1131 }
1132
1133 #[tokio::test]
1134 async fn test_upsert_updates_existing() {
1135 let store = SqliteStore::open_in_memory().unwrap();
1136 let doc = test_doc("github_issue:bitcoin/bitcoin:1", "Original title", "Original body");
1137 store.upsert_document(&doc).await.unwrap();
1138
1139 let mut updated = doc.clone();
1140 updated.title = Some("Updated title".to_string());
1141 store.upsert_document(&updated).await.unwrap();
1142
1143 let ctx = store.get_document("github_issue:bitcoin/bitcoin:1").await.unwrap().unwrap();
1144 assert_eq!(ctx.document.title.as_deref(), Some("Updated title"));
1145 }
1146
1147 #[tokio::test]
1148 async fn test_search_fts() {
1149 let store = SqliteStore::open_in_memory().unwrap();
1150
1151 let doc1 = test_doc(
1152 "github_issue:bitcoin/bitcoin:1",
1153 "Add taproot support",
1154 "Implementing BIP-340 and BIP-341 for schnorr signatures",
1155 );
1156 let mut doc2 = test_doc(
1157 "github_issue:bitcoin/bitcoin:2",
1158 "Fix mempool bug",
1159 "There is a bug in the mempool validation logic",
1160 );
1161 doc2.source_id = "2".to_string();
1162
1163 store.upsert_document(&doc1).await.unwrap();
1164 store.upsert_document(&doc2).await.unwrap();
1165
1166 let results = store
1167 .search(SearchParams { query: "taproot".to_string(), ..Default::default() })
1168 .await
1169 .unwrap();
1170
1171 assert_eq!(results.results.len(), 1);
1172 assert!(results.results[0].title.as_deref().unwrap().contains("taproot"));
1173 }
1174
1175 #[tokio::test]
1178 async fn test_search_fts_with_colon_in_query() {
1179 let store = SqliteStore::open_in_memory().unwrap();
1180
1181 let mut doc = test_doc(
1182 "github_pr:lightningdevkit/ldk-node:4463",
1183 "Fix lightning channel close",
1184 "Resolves an issue with force-close in ldk-node",
1185 );
1186 doc.source_type = SourceType::GithubPr;
1187 doc.source_repo = Some("lightningdevkit/ldk-node".to_string());
1188 doc.source_id = "4463".to_string();
1189 store.upsert_document(&doc).await.unwrap();
1190
1191 let results = store
1193 .search(SearchParams {
1194 query: "lightningdevkit/ldk-node:4463".to_string(),
1195 ..Default::default()
1196 })
1197 .await;
1198 assert!(results.is_ok(), "search with colon must not fail: {:?}", results.err());
1199
1200 let ctx = store.get_document("github_pr:lightningdevkit/ldk-node:4463").await.unwrap();
1202 assert!(ctx.is_some());
1203 assert_eq!(ctx.unwrap().document.source_id, "4463");
1204 }
1205
1206 #[tokio::test]
1210 async fn test_search_fts_operator_substring_false_positive() {
1211 let store = SqliteStore::open_in_memory().unwrap();
1212
1213 let doc = test_doc(
1214 "github_issue:lightningdevkit/rust-lightning:1",
1215 "ChannelMonitor persistence ordering",
1216 "The ChannelManager and ChannelMonitor must be persisted in the right order",
1217 );
1218 store.upsert_document(&doc).await.unwrap();
1219
1220 let results = store
1223 .search(SearchParams {
1224 query: "ChannelMonitor ChannelManager ordering".to_string(),
1225 ..Default::default()
1226 })
1227 .await;
1228 assert!(
1229 results.is_ok(),
1230 "search with OR/AND substrings must not fail: {:?}",
1231 results.err()
1232 );
1233 assert!(!results.unwrap().results.is_empty());
1234 }
1235
1236 #[tokio::test]
1239 async fn test_search_fts_hyphenated_term() {
1240 let store = SqliteStore::open_in_memory().unwrap();
1241
1242 let doc = test_doc(
1243 "github_issue:lightningdevkit/rust-lightning:42",
1244 "rust-lightning bug fix",
1245 "Fix a bug in rust-lightning channel handling",
1246 );
1247 store.upsert_document(&doc).await.unwrap();
1248
1249 let results = store
1250 .search(SearchParams { query: "rust-lightning".to_string(), ..Default::default() })
1251 .await;
1252 assert!(results.is_ok(), "hyphenated search must not fail: {:?}", results.err());
1253 }
1254
1255 #[tokio::test]
1256 async fn test_search_fts_quotes_with_colon() {
1257 let store = SqliteStore::open_in_memory().unwrap();
1260
1261 let doc = test_doc(
1262 "github_issue:lightningdevkit/rust-lightning:99",
1263 "lightning channel close",
1264 "Details about closing a lightning channel",
1265 );
1266 store.upsert_document(&doc).await.unwrap();
1267
1268 let results = store
1269 .search(SearchParams {
1270 query: "lightning:\"channel close\"".to_string(),
1271 ..Default::default()
1272 })
1273 .await;
1274 assert!(results.is_ok(), "quoted+colon search must not crash: {:?}", results.err());
1275 }
1276
1277 #[test]
1278 fn test_build_fts_query_strips_quotes() {
1279 assert_eq!(build_fts_query("lightning:\"channel close\""), "\"lightning:channel\" close");
1281
1282 assert_eq!(build_fts_query("\"\""), "\"\"");
1284
1285 assert_eq!(build_fts_query("hello world"), "hello world");
1287 }
1288
1289 #[tokio::test]
1290 async fn test_get_nonexistent_document() {
1291 let store = SqliteStore::open_in_memory().unwrap();
1292 let result = store.get_document("nonexistent:id").await.unwrap();
1293 assert!(result.is_none());
1294 }
1295
1296 #[tokio::test]
1297 async fn test_references() {
1298 let store = SqliteStore::open_in_memory().unwrap();
1299
1300 let doc = test_doc("github_issue:bitcoin/bitcoin:1", "Test", "Body");
1301 store.upsert_document(&doc).await.unwrap();
1302
1303 let reference = Reference {
1304 id: None,
1305 from_doc_id: "github_issue:bitcoin/bitcoin:1".to_string(),
1306 to_doc_id: None,
1307 ref_type: RefType::ReferencesBip,
1308 to_external: Some("BIP-340".to_string()),
1309 context: Some("Implementing BIP-340 schnorr".to_string()),
1310 };
1311 store.insert_reference(&reference).await.unwrap();
1312
1313 let ctx = store.get_document("github_issue:bitcoin/bitcoin:1").await.unwrap().unwrap();
1314 assert_eq!(ctx.outgoing_refs.len(), 1);
1315 assert_eq!(ctx.outgoing_refs[0].ref_type, RefType::ReferencesBip);
1316 }
1317
1318 #[tokio::test]
1319 async fn test_sync_state() {
1320 let store = SqliteStore::open_in_memory().unwrap();
1321
1322 let state = SyncState {
1323 source_id: "github:bitcoin/bitcoin:issues".to_string(),
1324 source_type: "github_issue".to_string(),
1325 source_repo: Some("bitcoin/bitcoin".to_string()),
1326 last_cursor: Some("2024-01-01T00:00:00Z".to_string()),
1327 last_synced_at: Some(Utc::now()),
1328 next_run_at: None,
1329 status: SyncStatus::Ok,
1330 error_message: None,
1331 retry_count: 0,
1332 items_found: 42,
1333 };
1334
1335 store.update_sync_state(&state).await.unwrap();
1336
1337 let retrieved = store.get_sync_state("github:bitcoin/bitcoin:issues").await.unwrap();
1338 assert!(retrieved.is_some());
1339 let retrieved = retrieved.unwrap();
1340 assert_eq!(retrieved.items_found, 42);
1341 assert_eq!(retrieved.status, SyncStatus::Ok);
1342 }
1343
1344 #[tokio::test]
1345 async fn test_lookup_lud() {
1346 let store = SqliteStore::open_in_memory().unwrap();
1347
1348 let doc = Document {
1350 id: Document::make_id(&SourceType::Lud, None, "6"),
1351 source_type: SourceType::Lud,
1352 source_repo: None,
1353 source_id: "6".to_string(),
1354 title: Some("LUD-06: lnurl-pay".to_string()),
1355 body: Some("# lnurl-pay\n\nSpec content here.".to_string()),
1356 author: None,
1357 author_id: None,
1358 created_at: Utc::now(),
1359 updated_at: None,
1360 parent_id: None,
1361 metadata: Some(
1362 serde_json::json!({ "url": "https://github.com/lnurl/luds/blob/luds/06.md" }),
1363 ),
1364 seq: None,
1365 };
1366 store.upsert_document(&doc).await.unwrap();
1367
1368 let ctx = store.lookup_lud(6).await.unwrap();
1369 assert!(ctx.is_some(), "lookup_lud(6) should find the document");
1370 let ctx = ctx.unwrap();
1371 assert_eq!(ctx.document.title.as_deref(), Some("LUD-06: lnurl-pay"));
1372 assert_eq!(ctx.url.as_deref(), Some("https://github.com/lnurl/luds/blob/luds/06.md"));
1373 }
1374
1375 #[tokio::test]
1376 async fn test_lookup_lud_padded_source_id() {
1377 let store = SqliteStore::open_in_memory().unwrap();
1378
1379 let doc = Document {
1381 id: Document::make_id(&SourceType::Lud, None, "06"),
1382 source_type: SourceType::Lud,
1383 source_repo: None,
1384 source_id: "06".to_string(),
1385 title: Some("LUD-06: lnurl-pay".to_string()),
1386 body: Some("# lnurl-pay\n\nSpec content here.".to_string()),
1387 author: None,
1388 author_id: None,
1389 created_at: Utc::now(),
1390 updated_at: None,
1391 parent_id: None,
1392 metadata: None,
1393 seq: None,
1394 };
1395 store.upsert_document(&doc).await.unwrap();
1396
1397 let ctx = store.lookup_lud(6).await.unwrap();
1399 assert!(ctx.is_some(), "lookup_lud(6) should find padded source_id '06'");
1400 assert_eq!(ctx.unwrap().document.title.as_deref(), Some("LUD-06: lnurl-pay"));
1401 }
1402
1403 #[tokio::test]
1404 async fn test_lookup_nut_zero() {
1405 let store = SqliteStore::open_in_memory().unwrap();
1406
1407 let doc = Document {
1409 id: Document::make_id(&SourceType::Nut, None, "0"),
1410 source_type: SourceType::Nut,
1411 source_repo: None,
1412 source_id: "0".to_string(),
1413 title: Some("NUT-00: Notation, Usage, and Terminology".to_string()),
1414 body: Some("# NUT-00\n\nSpec content here.".to_string()),
1415 author: None,
1416 author_id: None,
1417 created_at: Utc::now(),
1418 updated_at: None,
1419 parent_id: None,
1420 metadata: Some(
1421 serde_json::json!({ "url": "https://github.com/cashubtc/nuts/blob/main/00.md" }),
1422 ),
1423 seq: None,
1424 };
1425 store.upsert_document(&doc).await.unwrap();
1426
1427 let ctx = store.lookup_nut(0).await.unwrap();
1428 assert!(ctx.is_some(), "lookup_nut(0) should find NUT-00");
1429 let ctx = ctx.unwrap();
1430 assert_eq!(ctx.document.title.as_deref(), Some("NUT-00: Notation, Usage, and Terminology"));
1431 assert_eq!(ctx.url.as_deref(), Some("https://github.com/cashubtc/nuts/blob/main/00.md"));
1432 }
1433
1434 #[tokio::test]
1435 async fn test_reset_source_type_only_deletes_targeted() {
1436 let store = SqliteStore::open_in_memory().unwrap();
1437
1438 let lud_doc = Document {
1440 id: Document::make_id(&SourceType::Lud, None, "6"),
1441 source_type: SourceType::Lud,
1442 source_repo: None,
1443 source_id: "6".to_string(),
1444 title: Some("LUD-06".to_string()),
1445 body: Some("lnurl-pay spec".to_string()),
1446 author: None,
1447 author_id: None,
1448 created_at: Utc::now(),
1449 updated_at: None,
1450 parent_id: None,
1451 metadata: None,
1452 seq: None,
1453 };
1454 store.upsert_document(&lud_doc).await.unwrap();
1455
1456 let bip_doc = Document {
1458 id: Document::make_id(&SourceType::Bip, None, "340"),
1459 source_type: SourceType::Bip,
1460 source_repo: None,
1461 source_id: "340".to_string(),
1462 title: Some("BIP-340".to_string()),
1463 body: Some("Schnorr signatures".to_string()),
1464 author: None,
1465 author_id: None,
1466 created_at: Utc::now(),
1467 updated_at: None,
1468 parent_id: None,
1469 metadata: None,
1470 seq: None,
1471 };
1472 store.upsert_document(&bip_doc).await.unwrap();
1473
1474 let issue_doc = Document {
1476 id: "github_issue:bitcoin/bitcoin:1".to_string(),
1477 source_type: SourceType::GithubIssue,
1478 source_repo: Some("bitcoin/bitcoin".to_string()),
1479 source_id: "1".to_string(),
1480 title: Some("Test issue".to_string()),
1481 body: Some("Mentions LUD-6 and BIP-340".to_string()),
1482 author: Some("alice".to_string()),
1483 author_id: None,
1484 created_at: Utc::now(),
1485 updated_at: None,
1486 parent_id: None,
1487 metadata: None,
1488 seq: None,
1489 };
1490 store.upsert_document(&issue_doc).await.unwrap();
1491
1492 let lud_ref = Reference {
1494 id: None,
1495 from_doc_id: issue_doc.id.clone(),
1496 to_doc_id: None,
1497 ref_type: RefType::ReferencesLud,
1498 to_external: Some("LUD-6".to_string()),
1499 context: None,
1500 };
1501 store.insert_reference(&lud_ref).await.unwrap();
1502
1503 let bip_ref = Reference {
1505 id: None,
1506 from_doc_id: issue_doc.id.clone(),
1507 to_doc_id: None,
1508 ref_type: RefType::ReferencesBip,
1509 to_external: Some("BIP-340".to_string()),
1510 context: None,
1511 };
1512 store.insert_reference(&bip_ref).await.unwrap();
1513
1514 store.upsert_concept_mention(&lud_doc.id, "taproot", 1.0).await.unwrap();
1516
1517 let lud_state = SyncState {
1519 source_id: "specs:luds".to_string(),
1520 source_type: "specs".to_string(),
1521 source_repo: None,
1522 last_cursor: None,
1523 last_synced_at: Some(Utc::now()),
1524 next_run_at: None,
1525 status: SyncStatus::Ok,
1526 error_message: None,
1527 retry_count: 0,
1528 items_found: 21,
1529 };
1530 store.update_sync_state(&lud_state).await.unwrap();
1531
1532 let bip_state = SyncState {
1533 source_id: "specs:bips".to_string(),
1534 source_type: "specs".to_string(),
1535 source_repo: None,
1536 last_cursor: None,
1537 last_synced_at: Some(Utc::now()),
1538 next_run_at: None,
1539 status: SyncStatus::Ok,
1540 error_message: None,
1541 retry_count: 0,
1542 items_found: 300,
1543 };
1544 store.update_sync_state(&bip_state).await.unwrap();
1545
1546 assert!(store.get_document(&lud_doc.id).await.unwrap().is_some());
1548 assert!(store.get_document(&bip_doc.id).await.unwrap().is_some());
1549 assert!(store.get_document(&issue_doc.id).await.unwrap().is_some());
1550
1551 let deleted = store.reset_source_type("lud", &["specs:luds".into()]).await.unwrap();
1553 assert_eq!(deleted, 1, "should delete exactly 1 LUD document");
1554
1555 assert!(store.get_document(&lud_doc.id).await.unwrap().is_none());
1557
1558 assert!(store.get_document(&bip_doc.id).await.unwrap().is_some());
1560 let issue_ctx = store.get_document(&issue_doc.id).await.unwrap().unwrap();
1561 assert_eq!(issue_ctx.document.title.as_deref(), Some("Test issue"));
1562
1563 assert_eq!(issue_ctx.outgoing_refs.len(), 2);
1566
1567 assert!(store.get_sync_state("specs:luds").await.unwrap().is_none());
1569 assert!(store.get_sync_state("specs:bips").await.unwrap().is_some());
1570 }
1571
1572 #[tokio::test]
1573 async fn test_reset_source_type_with_like_pattern() {
1574 let store = SqliteStore::open_in_memory().unwrap();
1575
1576 let doc1 = Document {
1578 id: "github_issue:bitcoin/bitcoin:1".to_string(),
1579 source_type: SourceType::GithubIssue,
1580 source_repo: Some("bitcoin/bitcoin".to_string()),
1581 source_id: "1".to_string(),
1582 title: Some("Bitcoin issue".to_string()),
1583 body: Some("body".to_string()),
1584 author: None,
1585 author_id: None,
1586 created_at: Utc::now(),
1587 updated_at: None,
1588 parent_id: None,
1589 metadata: None,
1590 seq: None,
1591 };
1592 let doc2 = Document {
1593 id: "github_issue:lightningdevkit/ldk-node:1".to_string(),
1594 source_type: SourceType::GithubIssue,
1595 source_repo: Some("lightningdevkit/ldk-node".to_string()),
1596 source_id: "1".to_string(),
1597 title: Some("LDK issue".to_string()),
1598 body: Some("body".to_string()),
1599 author: None,
1600 author_id: None,
1601 created_at: Utc::now(),
1602 updated_at: None,
1603 parent_id: None,
1604 metadata: None,
1605 seq: None,
1606 };
1607 let bolt_doc = Document {
1608 id: Document::make_id(&SourceType::Bolt, None, "2"),
1609 source_type: SourceType::Bolt,
1610 source_repo: None,
1611 source_id: "2".to_string(),
1612 title: Some("BOLT-2".to_string()),
1613 body: Some("Peer protocol".to_string()),
1614 author: None,
1615 author_id: None,
1616 created_at: Utc::now(),
1617 updated_at: None,
1618 parent_id: None,
1619 metadata: None,
1620 seq: None,
1621 };
1622 store.upsert_document(&doc1).await.unwrap();
1623 store.upsert_document(&doc2).await.unwrap();
1624 store.upsert_document(&bolt_doc).await.unwrap();
1625
1626 for sid in &["github:bitcoin/bitcoin:issues", "github:lightningdevkit/ldk-node:issues"] {
1628 let state = SyncState {
1629 source_id: sid.to_string(),
1630 source_type: "github".to_string(),
1631 source_repo: None,
1632 last_cursor: None,
1633 last_synced_at: Some(Utc::now()),
1634 next_run_at: None,
1635 status: SyncStatus::Ok,
1636 error_message: None,
1637 retry_count: 0,
1638 items_found: 10,
1639 };
1640 store.update_sync_state(&state).await.unwrap();
1641 }
1642
1643 let deleted =
1645 store.reset_source_type("github_issue", &["github:%:issues".into()]).await.unwrap();
1646 assert_eq!(deleted, 2, "should delete both GitHub issues");
1647
1648 assert!(store.get_document(&doc1.id).await.unwrap().is_none());
1650 assert!(store.get_document(&doc2.id).await.unwrap().is_none());
1651
1652 assert!(store.get_document(&bolt_doc.id).await.unwrap().is_some());
1654
1655 assert!(store.get_sync_state("github:bitcoin/bitcoin:issues").await.unwrap().is_none());
1657 assert!(store
1658 .get_sync_state("github:lightningdevkit/ldk-node:issues")
1659 .await
1660 .unwrap()
1661 .is_none());
1662 }
1663
1664 #[tokio::test]
1665 async fn test_lookup_nut_padded_source_id() {
1666 let store = SqliteStore::open_in_memory().unwrap();
1667
1668 let doc = Document {
1670 id: Document::make_id(&SourceType::Nut, None, "00"),
1671 source_type: SourceType::Nut,
1672 source_repo: None,
1673 source_id: "00".to_string(),
1674 title: Some("NUT-00: Notation, Usage, and Terminology".to_string()),
1675 body: Some("# NUT-00\n\nSpec content here.".to_string()),
1676 author: None,
1677 author_id: None,
1678 created_at: Utc::now(),
1679 updated_at: None,
1680 parent_id: None,
1681 metadata: None,
1682 seq: None,
1683 };
1684 store.upsert_document(&doc).await.unwrap();
1685
1686 let ctx = store.lookup_nut(0).await.unwrap();
1688 assert!(ctx.is_some(), "lookup_nut(0) should find padded source_id '00'");
1689 assert_eq!(
1690 ctx.unwrap().document.title.as_deref(),
1691 Some("NUT-00: Notation, Usage, and Terminology")
1692 );
1693 }
1694}