Skip to main content

ctx_graph/
lib.rs

1use anyhow::{Context, Result};
2use rusqlite::{Connection, OptionalExtension, params};
3use serde::{Deserialize, Serialize};
4use std::path::Path;
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct SymbolHit {
8    pub id: i64,
9    pub file_path: String,
10    pub name: String,
11    pub kind: String,
12    pub signature: String,
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct SnippetHit {
17    pub snippet_id: i64,
18    pub file_path: String,
19    pub symbol_name: Option<String>,
20    pub content: String,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct FailureRecord {
25    pub message: String,
26    pub root_cause: Option<String>,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct MemoryDirective {
31    pub id: i64,
32    pub key: String,
33    pub body: String,
34    pub scope: String,
35    pub source: String,
36    pub created_at: String,
37    pub updated_at: String,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct RunInsert {
42    pub command: String,
43    pub status: String,
44    pub agent: Option<String>,
45    pub exit_code: Option<i32>,
46    pub duration_ms: Option<u64>,
47    pub original_tokens: Option<usize>,
48    pub packed_tokens: Option<usize>,
49    pub reduction_pct: Option<f64>,
50    pub fallback_used: bool,
51    pub pack_path: Option<String>,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct RunRecord {
56    pub id: i64,
57    pub command: String,
58    pub status: String,
59    pub agent: Option<String>,
60    pub exit_code: Option<i32>,
61    pub duration_ms: Option<u64>,
62    pub original_tokens: Option<usize>,
63    pub packed_tokens: Option<usize>,
64    pub reduction_pct: Option<f64>,
65    pub fallback_used: bool,
66    pub pack_path: Option<String>,
67    pub created_at: String,
68}
69
70pub struct GraphStore {
71    conn: Connection,
72}
73
74impl GraphStore {
75    pub fn open(path: &Path) -> Result<Self> {
76        if let Some(parent) = path.parent() {
77            std::fs::create_dir_all(parent).with_context(|| {
78                format!("failed to create graph parent dir {}", parent.display())
79            })?;
80        }
81
82        let conn = Connection::open(path)
83            .with_context(|| format!("failed to open sqlite db at {}", path.display()))?;
84
85        Ok(Self { conn })
86    }
87
88    pub fn init_schema(&self) -> Result<()> {
89        self.conn
90            .execute_batch(include_str!("schema.sql"))
91            .context("failed to initialize sqlite schema")?;
92        self.migrate_runs_table()
93    }
94
95    pub fn index_file(&self, path: &str) -> Result<()> {
96        self.conn
97            .execute(
98                "INSERT INTO files(path, updated_at) VALUES (?1, CURRENT_TIMESTAMP)
99                 ON CONFLICT(path) DO UPDATE SET updated_at = CURRENT_TIMESTAMP",
100                params![path],
101            )
102            .context("failed to index file")?;
103        Ok(())
104    }
105
106    pub fn query_files(&self, term: &str) -> Result<Vec<String>> {
107        let pattern = format!("%{}%", term);
108        let mut stmt = self
109            .conn
110            .prepare("SELECT path FROM files WHERE path LIKE ?1 ORDER BY path ASC")
111            .context("failed to prepare query")?;
112
113        let mut rows = stmt
114            .query(params![pattern])
115            .context("failed to query files")?;
116        let mut out = Vec::new();
117
118        while let Some(row) = rows.next().context("failed to read row")? {
119            out.push(row.get::<_, String>(0).context("failed to decode path")?);
120        }
121
122        Ok(out)
123    }
124
125    pub fn upsert_symbol(
126        &self,
127        file_path: &str,
128        name: &str,
129        kind: &str,
130        signature: &str,
131    ) -> Result<i64> {
132        self.index_file(file_path)?;
133        let file_id = self
134            .file_id(file_path)?
135            .context("file id should exist after index_file")?;
136
137        self.conn
138            .execute(
139                "INSERT INTO symbols(file_id, name, kind, signature, updated_at)
140                 VALUES (?1, ?2, ?3, ?4, CURRENT_TIMESTAMP)
141                 ON CONFLICT(file_id, name, kind) DO UPDATE SET
142                   signature = excluded.signature,
143                   updated_at = CURRENT_TIMESTAMP",
144                params![file_id, name, kind, signature],
145            )
146            .context("failed to upsert symbol")?;
147
148        self.conn
149            .query_row(
150                "SELECT id FROM symbols WHERE file_id = ?1 AND name = ?2 AND kind = ?3",
151                params![file_id, name, kind],
152                |row| row.get::<_, i64>(0),
153            )
154            .context("failed to fetch upserted symbol id")
155    }
156
157    pub fn search_symbols(&self, term: &str) -> Result<Vec<SymbolHit>> {
158        let pattern = format!("%{}%", term);
159        let mut stmt = self
160            .conn
161            .prepare(
162                "SELECT s.id, f.path, s.name, s.kind, COALESCE(s.signature, '')
163                 FROM symbols s
164                 JOIN files f ON f.id = s.file_id
165                 WHERE s.name LIKE ?1 OR s.signature LIKE ?1 OR f.path LIKE ?1
166                 ORDER BY s.updated_at DESC, s.id DESC",
167            )
168            .context("failed to prepare search_symbols")?;
169
170        let rows = stmt
171            .query_map(params![pattern], |row| {
172                Ok(SymbolHit {
173                    id: row.get(0)?,
174                    file_path: row.get(1)?,
175                    name: row.get(2)?,
176                    kind: row.get(3)?,
177                    signature: row.get(4)?,
178                })
179            })
180            .context("failed to run search_symbols")?;
181
182        let mut out = Vec::new();
183        for row in rows {
184            out.push(row.context("failed to decode symbol row")?);
185        }
186        Ok(out)
187    }
188
189    pub fn find_symbols_by_exact_name(&self, name: &str, limit: usize) -> Result<Vec<SymbolHit>> {
190        let mut stmt = self
191            .conn
192            .prepare(
193                "SELECT s.id, f.path, s.name, s.kind, COALESCE(s.signature, '')
194                 FROM symbols s
195                 JOIN files f ON f.id = s.file_id
196                 WHERE s.name = ?1
197                 ORDER BY s.updated_at DESC, s.id DESC
198                 LIMIT ?2",
199            )
200            .context("failed to prepare find_symbols_by_exact_name")?;
201
202        let rows = stmt
203            .query_map(params![name, limit as i64], |row| {
204                Ok(SymbolHit {
205                    id: row.get(0)?,
206                    file_path: row.get(1)?,
207                    name: row.get(2)?,
208                    kind: row.get(3)?,
209                    signature: row.get(4)?,
210                })
211            })
212            .context("failed to run find_symbols_by_exact_name")?;
213
214        let mut out = Vec::new();
215        for row in rows {
216            out.push(row.context("failed to decode exact symbol row")?);
217        }
218        Ok(out)
219    }
220
221    pub fn link_symbols(
222        &self,
223        src_symbol_id: i64,
224        dst_symbol_id: i64,
225        edge_type: &str,
226        metadata_json: Option<&str>,
227    ) -> Result<()> {
228        self.conn
229            .execute(
230                "INSERT INTO edges(src_symbol_id, dst_symbol_id, type, metadata_json)
231                 VALUES (?1, ?2, ?3, ?4)
232                 ON CONFLICT(src_symbol_id, dst_symbol_id, type) DO UPDATE SET
233                   metadata_json = excluded.metadata_json",
234                params![src_symbol_id, dst_symbol_id, edge_type, metadata_json],
235            )
236            .context("failed to link symbols")?;
237        Ok(())
238    }
239
240    pub fn related_symbols(&self, symbol_name: &str, limit: usize) -> Result<Vec<SymbolHit>> {
241        let mut stmt = self
242            .conn
243            .prepare(
244                "SELECT DISTINCT dst.id, f.path, dst.name, dst.kind, COALESCE(dst.signature, '')
245                 FROM symbols src
246                 JOIN edges e ON e.src_symbol_id = src.id
247                 JOIN symbols dst ON dst.id = e.dst_symbol_id
248                 JOIN files f ON f.id = dst.file_id
249                 WHERE src.name = ?1
250                 LIMIT ?2",
251            )
252            .context("failed to prepare related_symbols")?;
253
254        let rows = stmt
255            .query_map(params![symbol_name, limit as i64], |row| {
256                Ok(SymbolHit {
257                    id: row.get(0)?,
258                    file_path: row.get(1)?,
259                    name: row.get(2)?,
260                    kind: row.get(3)?,
261                    signature: row.get(4)?,
262                })
263            })
264            .context("failed to execute related_symbols")?;
265
266        let mut out = Vec::new();
267        for row in rows {
268            out.push(row.context("failed to decode related symbol row")?);
269        }
270
271        Ok(out)
272    }
273
274    pub fn add_snippet(
275        &self,
276        file_path: &str,
277        symbol_name: Option<&str>,
278        content: &str,
279    ) -> Result<i64> {
280        self.index_file(file_path)?;
281        let file_id = self
282            .file_id(file_path)?
283            .context("file id should exist after index_file")?;
284        let symbol_id = if let Some(name) = symbol_name {
285            self.conn
286                .query_row(
287                    "SELECT id FROM symbols WHERE file_id = ?1 AND name = ?2 LIMIT 1",
288                    params![file_id, name],
289                    |row| row.get::<_, i64>(0),
290                )
291                .optional()
292                .context("failed to fetch symbol id for snippet")?
293        } else {
294            None
295        };
296
297        self.conn
298            .execute(
299                "INSERT INTO snippets(file_id, symbol_id, content, created_at)
300                 VALUES (?1, ?2, ?3, CURRENT_TIMESTAMP)",
301                params![file_id, symbol_id, content],
302            )
303            .context("failed to insert snippet")?;
304
305        Ok(self.conn.last_insert_rowid())
306    }
307
308    pub fn search_snippets(&self, term: &str, limit: usize) -> Result<Vec<SnippetHit>> {
309        let escaped = term.replace('"', "\"");
310        let query = if escaped.trim().is_empty() {
311            "*".to_string()
312        } else {
313            escaped
314        };
315
316        let mut stmt = self
317            .conn
318            .prepare(
319                "SELECT s.id, f.path, sym.name, s.content
320                 FROM snippets_fts fts
321                 JOIN snippets s ON s.id = fts.rowid
322                 JOIN files f ON f.id = s.file_id
323                 LEFT JOIN symbols sym ON sym.id = s.symbol_id
324                 WHERE snippets_fts MATCH ?1
325                 LIMIT ?2",
326            )
327            .context("failed to prepare search_snippets")?;
328
329        let rows = stmt
330            .query_map(params![query, limit as i64], |row| {
331                Ok(SnippetHit {
332                    snippet_id: row.get(0)?,
333                    file_path: row.get(1)?,
334                    symbol_name: row.get(2)?,
335                    content: row.get(3)?,
336                })
337            })
338            .context("failed to query snippets fts")?;
339
340        let mut out = Vec::new();
341        for row in rows {
342            out.push(row.context("failed to decode snippet row")?);
343        }
344        Ok(out)
345    }
346
347    pub fn record_run(&self, command: &str, status: &str) -> Result<i64> {
348        self.record_invocation_run(&RunInsert {
349            command: command.to_string(),
350            status: status.to_string(),
351            agent: None,
352            exit_code: None,
353            duration_ms: None,
354            original_tokens: None,
355            packed_tokens: None,
356            reduction_pct: None,
357            fallback_used: false,
358            pack_path: None,
359        })
360    }
361
362    pub fn record_invocation_run(&self, run: &RunInsert) -> Result<i64> {
363        self.conn
364            .execute(
365                "INSERT INTO runs(
366                   command, status, agent, exit_code, duration_ms, original_tokens,
367                   packed_tokens, reduction_pct, fallback_used, pack_path, created_at
368                 )
369                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, CURRENT_TIMESTAMP)",
370                params![
371                    run.command,
372                    run.status,
373                    run.agent,
374                    run.exit_code,
375                    run.duration_ms.map(|value| value as i64),
376                    run.original_tokens.map(|value| value as i64),
377                    run.packed_tokens.map(|value| value as i64),
378                    run.reduction_pct,
379                    if run.fallback_used { 1 } else { 0 },
380                    run.pack_path,
381                ],
382            )
383            .context("failed to insert run")?;
384        Ok(self.conn.last_insert_rowid())
385    }
386
387    pub fn recent_runs(&self, limit: usize) -> Result<Vec<RunRecord>> {
388        let mut stmt = self
389            .conn
390            .prepare(
391                "SELECT id, command, status, agent, exit_code, duration_ms, original_tokens,
392                        packed_tokens, reduction_pct, fallback_used, pack_path, created_at
393                 FROM runs
394                 ORDER BY id DESC
395                 LIMIT ?1",
396            )
397            .context("failed to prepare recent_runs")?;
398
399        let rows = stmt
400            .query_map(params![limit as i64], |row| {
401                let duration_ms = row.get::<_, Option<i64>>(5)?.map(|value| value as u64);
402                let original_tokens = row.get::<_, Option<i64>>(6)?.map(|value| value as usize);
403                let packed_tokens = row.get::<_, Option<i64>>(7)?.map(|value| value as usize);
404                let fallback_used = row.get::<_, i64>(9)? != 0;
405
406                Ok(RunRecord {
407                    id: row.get(0)?,
408                    command: row.get(1)?,
409                    status: row.get(2)?,
410                    agent: row.get(3)?,
411                    exit_code: row.get(4)?,
412                    duration_ms,
413                    original_tokens,
414                    packed_tokens,
415                    reduction_pct: row.get(8)?,
416                    fallback_used,
417                    pack_path: row.get(10)?,
418                    created_at: row.get(11)?,
419                })
420            })
421            .context("failed to query recent_runs")?;
422
423        let mut out = Vec::new();
424        for row in rows {
425            out.push(row.context("failed to decode run row")?);
426        }
427        Ok(out)
428    }
429
430    pub fn record_failure(
431        &self,
432        run_id: i64,
433        message: &str,
434        root_cause: Option<&str>,
435    ) -> Result<i64> {
436        self.conn
437            .execute(
438                "INSERT INTO failures(run_id, message, root_cause, created_at)
439                 VALUES (?1, ?2, ?3, CURRENT_TIMESTAMP)",
440                params![run_id, message, root_cause],
441            )
442            .context("failed to insert failure")?;
443        Ok(self.conn.last_insert_rowid())
444    }
445
446    pub fn recent_failures(&self, limit: usize) -> Result<Vec<FailureRecord>> {
447        let mut stmt = self
448            .conn
449            .prepare(
450                "SELECT message, root_cause
451                 FROM failures
452                 ORDER BY id DESC
453                 LIMIT ?1",
454            )
455            .context("failed to prepare recent_failures")?;
456
457        let rows = stmt
458            .query_map(params![limit as i64], |row| {
459                Ok(FailureRecord {
460                    message: row.get(0)?,
461                    root_cause: row.get(1)?,
462                })
463            })
464            .context("failed to query recent_failures")?;
465
466        let mut out = Vec::new();
467        for row in rows {
468            out.push(row.context("failed to decode failure row")?);
469        }
470        Ok(out)
471    }
472
473    pub fn record_decision(&self, title: &str, summary: &str) -> Result<i64> {
474        self.conn
475            .execute(
476                "INSERT INTO tasks(title, summary, created_at) VALUES (?1, ?2, CURRENT_TIMESTAMP)",
477                params![title, summary],
478            )
479            .context("failed to insert task decision")?;
480        let task_id = self.conn.last_insert_rowid();
481
482        self.conn
483            .execute(
484                "INSERT INTO notes(task_id, body, created_at) VALUES (?1, ?2, CURRENT_TIMESTAMP)",
485                params![task_id, summary],
486            )
487            .context("failed to insert decision note")?;
488
489        Ok(task_id)
490    }
491
492    pub fn recent_decisions(&self, limit: usize) -> Result<Vec<String>> {
493        let mut stmt = self
494            .conn
495            .prepare(
496                "SELECT t.title, n.body
497                 FROM notes n
498                 JOIN tasks t ON t.id = n.task_id
499                 ORDER BY n.id DESC
500                 LIMIT ?1",
501            )
502            .context("failed to prepare recent_decisions")?;
503
504        let rows = stmt
505            .query_map(params![limit as i64], |row| {
506                let title: String = row.get(0)?;
507                let body: String = row.get(1)?;
508                Ok(format!("{title}: {body}"))
509            })
510            .context("failed to query recent_decisions")?;
511
512        let mut out = Vec::new();
513        for row in rows {
514            out.push(row.context("failed to decode decision row")?);
515        }
516        Ok(out)
517    }
518
519    pub fn upsert_memory_directive(
520        &self,
521        key: &str,
522        body: &str,
523        scope: &str,
524        source: &str,
525    ) -> Result<i64> {
526        self.conn
527            .execute(
528                "INSERT INTO memory_directives(key, body, scope, source, created_at, updated_at)
529                 VALUES (?1, ?2, ?3, ?4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
530                 ON CONFLICT(key) DO UPDATE SET
531                   body = excluded.body,
532                   scope = excluded.scope,
533                   source = excluded.source,
534                   updated_at = CURRENT_TIMESTAMP",
535                params![key, body, scope, source],
536            )
537            .context("failed to upsert memory directive")?;
538
539        self.conn
540            .query_row(
541                "SELECT id FROM memory_directives WHERE key = ?1",
542                params![key],
543                |row| row.get::<_, i64>(0),
544            )
545            .context("failed to fetch memory directive id")
546    }
547
548    pub fn get_memory_directive(&self, key: &str) -> Result<Option<MemoryDirective>> {
549        self.conn
550            .query_row(
551                "SELECT id, key, body, scope, source, created_at, updated_at
552                 FROM memory_directives
553                 WHERE key = ?1",
554                params![key],
555                |row| {
556                    Ok(MemoryDirective {
557                        id: row.get(0)?,
558                        key: row.get(1)?,
559                        body: row.get(2)?,
560                        scope: row.get(3)?,
561                        source: row.get(4)?,
562                        created_at: row.get(5)?,
563                        updated_at: row.get(6)?,
564                    })
565                },
566            )
567            .optional()
568            .context("failed to fetch memory directive by key")
569    }
570
571    pub fn list_memory_directives(
572        &self,
573        scope: Option<&str>,
574        limit: usize,
575    ) -> Result<Vec<MemoryDirective>> {
576        let mut out = Vec::new();
577        if let Some(scope_filter) = scope {
578            let mut stmt = self
579                .conn
580                .prepare(
581                    "SELECT id, key, body, scope, source, created_at, updated_at
582                     FROM memory_directives
583                     WHERE scope = ?1
584                     ORDER BY updated_at DESC, id DESC
585                     LIMIT ?2",
586                )
587                .context("failed to prepare scoped memory directives query")?;
588
589            let rows = stmt
590                .query_map(params![scope_filter, limit as i64], |row| {
591                    Ok(MemoryDirective {
592                        id: row.get(0)?,
593                        key: row.get(1)?,
594                        body: row.get(2)?,
595                        scope: row.get(3)?,
596                        source: row.get(4)?,
597                        created_at: row.get(5)?,
598                        updated_at: row.get(6)?,
599                    })
600                })
601                .context("failed to query scoped memory directives")?;
602
603            for row in rows {
604                out.push(row.context("failed to decode scoped memory directive row")?);
605            }
606            return Ok(out);
607        }
608
609        let mut stmt = self
610            .conn
611            .prepare(
612                "SELECT id, key, body, scope, source, created_at, updated_at
613                 FROM memory_directives
614                 ORDER BY updated_at DESC, id DESC
615                 LIMIT ?1",
616            )
617            .context("failed to prepare memory directives query")?;
618
619        let rows = stmt
620            .query_map(params![limit as i64], |row| {
621                Ok(MemoryDirective {
622                    id: row.get(0)?,
623                    key: row.get(1)?,
624                    body: row.get(2)?,
625                    scope: row.get(3)?,
626                    source: row.get(4)?,
627                    created_at: row.get(5)?,
628                    updated_at: row.get(6)?,
629                })
630            })
631            .context("failed to query memory directives")?;
632
633        for row in rows {
634            out.push(row.context("failed to decode memory directive row")?);
635        }
636
637        Ok(out)
638    }
639
640    pub fn search_memory_directives(
641        &self,
642        query: &str,
643        limit: usize,
644    ) -> Result<Vec<MemoryDirective>> {
645        let query = query.trim();
646        if query.is_empty() {
647            return self.list_memory_directives(None, limit);
648        }
649
650        let terms = query
651            .split_whitespace()
652            .filter(|t| t.len() >= 2)
653            .map(|t| t.to_lowercase())
654            .collect::<Vec<_>>();
655
656        if terms.is_empty() {
657            return self.list_memory_directives(None, limit);
658        }
659
660        let mut weighted = Vec::new();
661        for directive in self.list_memory_directives(None, 500)? {
662            let hay = format!(
663                "{} {} {} {}",
664                directive.key, directive.body, directive.scope, directive.source
665            )
666            .to_lowercase();
667            let score = terms.iter().filter(|t| hay.contains(t.as_str())).count();
668            if score > 0 {
669                weighted.push((score, directive));
670            }
671        }
672
673        weighted.sort_by(|a, b| {
674            b.0.cmp(&a.0)
675                .then_with(|| b.1.updated_at.cmp(&a.1.updated_at))
676                .then_with(|| b.1.id.cmp(&a.1.id))
677        });
678
679        Ok(weighted
680            .into_iter()
681            .take(limit)
682            .map(|(_, directive)| directive)
683            .collect())
684    }
685
686    pub fn delete_memory_directive(&self, key: &str) -> Result<bool> {
687        let affected = self
688            .conn
689            .execute("DELETE FROM memory_directives WHERE key = ?1", params![key])
690            .context("failed to delete memory directive")?;
691        Ok(affected > 0)
692    }
693
694    pub fn delete_memory_directives_by_prefix(&self, prefix: &str) -> Result<usize> {
695        let pattern = format!("{prefix}.%");
696        let affected = self
697            .conn
698            .execute(
699                "DELETE FROM memory_directives WHERE key = ?1 OR key LIKE ?2",
700                params![prefix, pattern],
701            )
702            .context("failed to delete memory directives by prefix")?;
703        Ok(affected)
704    }
705
706    fn migrate_runs_table(&self) -> Result<()> {
707        self.ensure_column("runs", "agent", "ALTER TABLE runs ADD COLUMN agent TEXT")?;
708        self.ensure_column(
709            "runs",
710            "exit_code",
711            "ALTER TABLE runs ADD COLUMN exit_code INTEGER",
712        )?;
713        self.ensure_column(
714            "runs",
715            "duration_ms",
716            "ALTER TABLE runs ADD COLUMN duration_ms INTEGER",
717        )?;
718        self.ensure_column(
719            "runs",
720            "original_tokens",
721            "ALTER TABLE runs ADD COLUMN original_tokens INTEGER",
722        )?;
723        self.ensure_column(
724            "runs",
725            "packed_tokens",
726            "ALTER TABLE runs ADD COLUMN packed_tokens INTEGER",
727        )?;
728        self.ensure_column(
729            "runs",
730            "reduction_pct",
731            "ALTER TABLE runs ADD COLUMN reduction_pct REAL",
732        )?;
733        self.ensure_column(
734            "runs",
735            "fallback_used",
736            "ALTER TABLE runs ADD COLUMN fallback_used INTEGER NOT NULL DEFAULT 0",
737        )?;
738        self.ensure_column(
739            "runs",
740            "pack_path",
741            "ALTER TABLE runs ADD COLUMN pack_path TEXT",
742        )?;
743        self.conn
744            .execute_batch(
745                "CREATE INDEX IF NOT EXISTS idx_runs_agent_created_at ON runs(agent, created_at);",
746            )
747            .context("failed to create runs agent index")?;
748        Ok(())
749    }
750
751    fn ensure_column(&self, table: &str, column: &str, ddl: &str) -> Result<()> {
752        let mut stmt = self
753            .conn
754            .prepare(&format!("PRAGMA table_info({table})"))
755            .context("failed to inspect table columns")?;
756        let columns = stmt
757            .query_map([], |row| row.get::<_, String>(1))
758            .context("failed to query table columns")?
759            .collect::<std::result::Result<Vec<_>, _>>()?;
760
761        if !columns.iter().any(|existing| existing == column) {
762            self.conn
763                .execute_batch(ddl)
764                .with_context(|| format!("failed to add column {table}.{column}"))?;
765        }
766        Ok(())
767    }
768
769    fn file_id(&self, file_path: &str) -> Result<Option<i64>> {
770        self.conn
771            .query_row(
772                "SELECT id FROM files WHERE path = ?1",
773                params![file_path],
774                |row| row.get::<_, i64>(0),
775            )
776            .optional()
777            .context("failed to fetch file id")
778    }
779}