Skip to main content

orbok_db/repo/
chunks.rs

1//! Chunk and chunk-location repository (RFC-006 §12).
2//!
3//! The central operation is [`ChunkRepository::insert_bundle`]: a
4//! single transaction that replaces old chunks with new ones and
5//! simultaneously updates the FTS index. Old chunks survive if the
6//! transaction fails — the previous active index remains usable
7//! (RFC-006 §12 "rechunk failure preserves previous active chunks").
8
9use crate::catalog::{Catalog, db_err};
10use orbok_core::{ChunkId, ExtractionId, FileId, OrbokResult, now_iso8601};
11use rusqlite::params;
12use sha2::{Digest, Sha256};
13
/// Data for one chunk being inserted (RFC-006 §5 output).
///
/// Consumed by [`ChunkRepository::insert_bundle`], which writes one row per
/// spec into each of `chunks`, `chunk_fts`, `keyword_index_records` and
/// `chunk_locations`.
#[derive(Debug, Clone)]
pub struct ChunkSpec {
    /// Stored verbatim in the `chunks.chunk_kind` column.
    pub chunk_kind: &'static str,
    /// Stored in `chunks.chunk_ordinal`; `list_for_file` orders its results
    /// by this value.
    pub chunk_ordinal: u32,
    /// Stored in `chunks.heading_path` and indexed in `chunk_fts.heading_path`.
    pub heading_path: Option<String>,
    /// Stored in `chunks.title` and indexed in `chunk_fts.title`.
    pub title: Option<String>,
    /// Normalized text — used for FTS indexing and the content hash.
    /// NOT stored in the catalog (contentless design, RFC-007 §8.1).
    /// The `chunks.char_count` value is also derived from it (Unicode scalar
    /// count, not byte length).
    pub normalized_text: String,
    /// Stored in `chunk_locations.line_start`.
    /// NOTE(review): whether lines are 0- or 1-based is not visible here — confirm.
    pub line_start: u32,
    /// Stored in `chunk_locations.line_end`.
    /// NOTE(review): inclusive vs. exclusive end is not visible here — confirm.
    pub line_end: u32,
    /// Stored in `chunk_locations.byte_start`; `None` is written as NULL.
    pub byte_start: Option<u64>,
    /// Stored in `chunk_locations.byte_end`; `None` is written as NULL.
    pub byte_end: Option<u64>,
    /// Stored verbatim in `chunk_locations.location_quality`.
    pub location_quality: &'static str,
    /// Index of the parent chunk in the same specs slice, if any.
    /// It is resolved to the parent's generated chunk id and written to
    /// `chunks.parent_chunk_id`, so it must be a valid index into that slice.
    pub parent_idx: Option<usize>,
}
32
/// A chunk record returned after insertion.
///
/// Also produced by [`ChunkRepository::list_for_file`], which rebuilds it
/// from the `chunks` and `chunk_locations` tables.
#[derive(Debug, Clone)]
pub struct ChunkRecord {
    /// Identifier of the chunk row (`chunks.chunk_id`).
    pub chunk_id: ChunkId,
    /// Identifier of the file the chunk belongs to (`chunks.file_id`).
    pub file_id: FileId,
    /// Ordinal of the chunk (`chunks.chunk_ordinal`).
    pub chunk_ordinal: u32,
    /// Heading path of the chunk, if one was recorded.
    pub heading_path: Option<String>,
    /// Start line from `chunk_locations.line_start`.
    pub line_start: u32,
    /// End line from `chunk_locations.line_end`.
    pub line_end: u32,
    /// Start byte offset from `chunk_locations.byte_start`, if recorded.
    pub byte_start: Option<u64>,
    /// End byte offset from `chunk_locations.byte_end`, if recorded.
    pub byte_end: Option<u64>,
    /// Value of `chunk_locations.location_quality`.
    pub location_quality: String,
}
46
/// Repository for chunk and chunk-location rows.
///
/// Holds a borrowed [`Catalog`]; each method locks the catalog for the
/// duration of its own call.
pub struct ChunkRepository<'a> {
    /// Catalog whose connection every query in this repository runs on.
    catalog: &'a Catalog,
}
50
/// Version tag written by `insert_bundle` into
/// `keyword_index_records.tokenizer_version`.
///
/// NOTE(review): the constant is named after the chunker but is stored in a
/// tokenizer column — confirm that this is the intended value.
const CHUNKER_VERSION: &str = "chunker-v1";
52
53impl<'a> ChunkRepository<'a> {
54    pub fn new(catalog: &'a Catalog) -> Self {
55        Self { catalog }
56    }
57
58    /// Replace-on-success bundle insert (RFC-006 §12):
59    ///
60    /// 1. Insert new chunks + locations as active.
61    /// 2. Insert FTS rows and keyword_index_records.
62    /// 3. Mark old chunks (same file, different extraction) stale.
63    /// 4. Mark the file as indexed.
64    ///
65    /// All steps are inside one transaction. A failure leaves the
66    /// previous active chunks untouched.
67    pub fn insert_bundle(
68        &self,
69        file_id: &FileId,
70        extraction_id: &ExtractionId,
71        specs: &[ChunkSpec],
72    ) -> OrbokResult<Vec<ChunkRecord>> {
73        let now = now_iso8601();
74        // Assign IDs up front so parent references resolve.
75        let ids: Vec<ChunkId> = (0..specs.len()).map(|_| ChunkId::generate()).collect();
76
77        let mut conn = self.catalog.lock();
78        let tx = conn.transaction().map_err(db_err)?;
79
80        let mut records = Vec::with_capacity(specs.len());
81        for (i, spec) in specs.iter().enumerate() {
82            let chunk_id = &ids[i];
83            let parent_id = spec.parent_idx.map(|pi| ids[pi].as_str().to_string());
84            let content_hash = sha256_text(&spec.normalized_text);
85            let char_count = spec.normalized_text.chars().count() as i64;
86
87            tx.execute(
88                "INSERT INTO chunks \
89                 (chunk_id, file_id, extraction_id, parent_chunk_id, chunk_kind, \
90                  chunk_ordinal, heading_path, title, char_count, content_hash, \
91                  chunk_status, created_at, updated_at) \
92                 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,'active',?11,?11)",
93                params![
94                    chunk_id.as_str(),
95                    file_id.as_str(),
96                    extraction_id.as_str(),
97                    parent_id,
98                    spec.chunk_kind,
99                    spec.chunk_ordinal as i64,
100                    spec.heading_path,
101                    spec.title,
102                    char_count,
103                    content_hash,
104                    now,
105                ],
106            )
107            .map_err(db_err)?;
108
109            // Insert FTS row and record the rowid mapping.
110            tx.execute(
111                "INSERT INTO chunk_fts (title, heading_path, normalized_text) \
112                 VALUES (?1, ?2, ?3)",
113                params![spec.title, spec.heading_path, spec.normalized_text],
114            )
115            .map_err(db_err)?;
116            let fts_rowid = tx.last_insert_rowid();
117
118            tx.execute(
119                "INSERT INTO keyword_index_records \
120                 (chunk_id, fts_rowid, index_engine, tokenizer_name, tokenizer_version, \
121                  indexed_at, status) \
122                 VALUES (?1,?2,'sqlite-fts5','unicode61',?3,?4,'active')",
123                params![chunk_id.as_str(), fts_rowid, CHUNKER_VERSION, now],
124            )
125            .map_err(db_err)?;
126
127            // Chunk location.
128            tx.execute(
129                "INSERT INTO chunk_locations \
130                 (chunk_id, byte_start, byte_end, line_start, line_end, \
131                  location_quality, created_at, updated_at) \
132                 VALUES (?1,?2,?3,?4,?5,?6,?7,?7)",
133                params![
134                    chunk_id.as_str(),
135                    spec.byte_start.map(|v| v as i64),
136                    spec.byte_end.map(|v| v as i64),
137                    spec.line_start as i64,
138                    spec.line_end as i64,
139                    spec.location_quality,
140                    now,
141                ],
142            )
143            .map_err(db_err)?;
144
145            records.push(ChunkRecord {
146                chunk_id: chunk_id.clone(),
147                file_id: file_id.clone(),
148                chunk_ordinal: spec.chunk_ordinal,
149                heading_path: spec.heading_path.clone(),
150                line_start: spec.line_start,
151                line_end: spec.line_end,
152                byte_start: spec.byte_start,
153                byte_end: spec.byte_end,
154                location_quality: spec.location_quality.to_string(),
155            });
156        }
157
158        // Mark old chunks for this file that belong to a different extraction stale.
159        tx.execute(
160            "UPDATE chunks SET chunk_status = 'stale', updated_at = ?3 \
161             WHERE file_id = ?1 AND extraction_id != ?2 AND chunk_status = 'active'",
162            params![file_id.as_str(), extraction_id.as_str(), now],
163        )
164        .map_err(db_err)?;
165
166        // Mark file indexed.
167        tx.execute(
168            "UPDATE files SET file_status = 'indexed', last_indexed_at = ?2, updated_at = ?2 \
169             WHERE file_id = ?1",
170            params![file_id.as_str(), now],
171        )
172        .map_err(db_err)?;
173
174        tx.commit().map_err(db_err)?;
175        Ok(records)
176    }
177
178    /// Retrieve chunk records for a file (used by snippet loader and
179    /// tests).
180    pub fn list_for_file(&self, file_id: &FileId) -> OrbokResult<Vec<ChunkRecord>> {
181        let conn = self.catalog.lock();
182        let mut stmt = conn
183            .prepare(
184                "SELECT c.chunk_id, c.file_id, c.chunk_ordinal, c.heading_path, \
185                  l.line_start, l.line_end, l.byte_start, l.byte_end, l.location_quality \
186                 FROM chunks c \
187                 LEFT JOIN chunk_locations l ON l.chunk_id = c.chunk_id \
188                 WHERE c.file_id = ?1 AND c.chunk_status = 'active' \
189                 ORDER BY c.chunk_ordinal",
190            )
191            .map_err(db_err)?;
192        let rows = stmt
193            .query_map(params![file_id.as_str()], |row| {
194                Ok(ChunkRecord {
195                    chunk_id: ChunkId::from_string(row.get::<_, String>(0)?),
196                    file_id: FileId::from_string(row.get::<_, String>(1)?),
197                    chunk_ordinal: row.get::<_, i64>(2)? as u32,
198                    heading_path: row.get(3)?,
199                    line_start: row.get::<_, i64>(4)? as u32,
200                    line_end: row.get::<_, i64>(5)? as u32,
201                    byte_start: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
202                    byte_end: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
203                    location_quality: row.get(8).unwrap_or_else(|_| "unknown".to_string()),
204                })
205            })
206            .map_err(db_err)?;
207        let mut out = Vec::new();
208        for row in rows {
209            out.push(row.map_err(db_err)?);
210        }
211        Ok(out)
212    }
213}
214
215fn sha256_text(text: &str) -> String {
216    let mut h = Sha256::new();
217    h.update(text.as_bytes());
218    let d = h.finalize();
219    let mut s = String::with_capacity(d.len() * 2);
220    for b in d.iter() {
221        use std::fmt::Write;
222        let _ = write!(s, "{b:02x}");
223    }
224    s
225}