Skip to main content

orbok_db/repo/
chunks.rs

//! Chunk and chunk-location repository (RFC-006 §12).
//!
//! The central operation is [`ChunkRepository::insert_bundle`]: a
//! single transaction that replaces old chunks with new ones and
//! simultaneously updates the FTS index. Old chunks survive if the
//! transaction fails — the previous active index remains usable
//! (RFC-006 §12 "rechunk failure preserves previous active chunks").
8
9use crate::catalog::{Catalog, db_err};
10use orbok_core::{ChunkId, ExtractionId, FileId, OrbokResult, now_iso8601};
11use rusqlite::params;
12use sha2::{Digest, Sha256};
13
/// Data for one chunk being inserted (RFC-006 §5 output).
///
/// A slice of these is handed to [`ChunkRepository::insert_bundle`], which
/// writes one `chunks` row, one `chunk_locations` row and the matching FTS
/// rows per spec. The type is plain data, so it also derives `PartialEq`/`Eq`
/// to let callers and tests compare specs directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkSpec {
    /// Value written to the `chunks.chunk_kind` column.
    pub chunk_kind: &'static str,
    /// Value written to `chunks.chunk_ordinal`; listings are ordered by it.
    pub chunk_ordinal: u32,
    /// Optional heading path; stored on the chunk row and indexed in FTS.
    pub heading_path: Option<String>,
    /// Optional title; stored on the chunk row and indexed in FTS.
    pub title: Option<String>,
    /// Normalized text — used for FTS indexing, the character count and the
    /// content hash.
    /// NOT stored in the catalog (contentless design, RFC-007 §8.1).
    pub normalized_text: String,
    /// First line covered by the chunk, written to `chunk_locations`.
    pub line_start: u32,
    /// Last line covered by the chunk, written to `chunk_locations`.
    pub line_end: u32,
    /// Optional starting byte offset, written to `chunk_locations`.
    pub byte_start: Option<u64>,
    /// Optional ending byte offset, written to `chunk_locations`.
    pub byte_end: Option<u64>,
    /// Value written to the `chunk_locations.location_quality` column.
    pub location_quality: &'static str,
    /// Index of the parent chunk in the same specs slice, if any.
    pub parent_idx: Option<usize>,
}
32
33/// A chunk record returned after insertion.
34#[derive(Debug, Clone)]
35pub struct ChunkRecord {
36    pub chunk_id: ChunkId,
37    pub file_id: FileId,
38    pub chunk_ordinal: u32,
39    pub heading_path: Option<String>,
40    pub line_start: u32,
41    pub line_end: u32,
42    pub byte_start: Option<u64>,
43    pub byte_end: Option<u64>,
44    pub location_quality: String,
45}
46
47pub struct ChunkRepository<'a> {
48    catalog: &'a Catalog,
49}
50
/// Version tag bound to `keyword_index_records.tokenizer_version` on insert.
const CHUNKER_VERSION: &str = "chunker-v1";
52
53impl<'a> ChunkRepository<'a> {
54    pub fn new(catalog: &'a Catalog) -> Self {
55        Self { catalog }
56    }
57
58    /// Replace-on-success bundle insert (RFC-006 §12):
59    ///
60    /// 1. Insert new chunks + locations as active.
61    /// 2. Insert FTS rows and keyword_index_records.
62    /// 3. Mark old chunks (same file, different extraction) stale.
63    /// 4. Mark the file as indexed.
64    ///
65    /// All steps are inside one transaction. A failure leaves the
66    /// previous active chunks untouched.
67    pub fn insert_bundle(
68        &self,
69        file_id: &FileId,
70        extraction_id: &ExtractionId,
71        specs: &[ChunkSpec],
72    ) -> OrbokResult<Vec<ChunkRecord>> {
73        let now = now_iso8601();
74        // Assign IDs up front so parent references resolve.
75        let ids: Vec<ChunkId> = (0..specs.len()).map(|_| ChunkId::generate()).collect();
76
77        let mut conn = self.catalog.lock();
78        let tx = conn.transaction().map_err(db_err)?;
79
80        let mut records = Vec::with_capacity(specs.len());
81        for (i, spec) in specs.iter().enumerate() {
82            let chunk_id = &ids[i];
83            let parent_id = spec.parent_idx.map(|pi| ids[pi].as_str().to_string());
84            let content_hash = sha256_text(&spec.normalized_text);
85            let char_count = spec.normalized_text.chars().count() as i64;
86
87            tx.execute(
88                "INSERT INTO chunks \
89                 (chunk_id, file_id, extraction_id, parent_chunk_id, chunk_kind, \
90                  chunk_ordinal, heading_path, title, char_count, content_hash, \
91                  chunk_status, created_at, updated_at) \
92                 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,'active',?11,?11)",
93                params![
94                    chunk_id.as_str(),
95                    file_id.as_str(),
96                    extraction_id.as_str(),
97                    parent_id,
98                    spec.chunk_kind,
99                    spec.chunk_ordinal as i64,
100                    spec.heading_path,
101                    spec.title,
102                    char_count,
103                    content_hash,
104                    now,
105                ],
106            )
107            .map_err(db_err)?;
108
109            // Insert into unicode61 FTS and record rowid.
110            tx.execute(
111                "INSERT INTO chunk_fts (title, heading_path, normalized_text) \
112                 VALUES (?1, ?2, ?3)",
113                params![spec.title, spec.heading_path, spec.normalized_text],
114            )
115            .map_err(db_err)?;
116            let fts_rowid = tx.last_insert_rowid();
117
118            // Insert into trigram FTS (RFC-014 §12) for Japanese/CJK recall.
119            tx.execute(
120                "INSERT INTO chunk_fts_trigram (title, heading_path, normalized_text) \
121                 VALUES (?1, ?2, ?3)",
122                params![spec.title, spec.heading_path, spec.normalized_text],
123            )
124            .map_err(db_err)?;
125            let trigram_fts_rowid = tx.last_insert_rowid();
126
127            tx.execute(
128                "INSERT INTO keyword_index_records \
129                 (chunk_id, fts_rowid, trigram_fts_rowid, index_engine, tokenizer_name, \
130                  tokenizer_version, indexed_at, status) \
131                 VALUES (?1, ?2, ?3, 'sqlite-fts5', 'unicode61', ?4, ?5, 'active') \
132                 ON CONFLICT(chunk_id) DO UPDATE SET fts_rowid = ?2, trigram_fts_rowid = ?3, \
133                  index_engine = 'sqlite-fts5', tokenizer_name = 'unicode61', \
134                  tokenizer_version = ?4, indexed_at = ?5, status = 'active'",
135                params![
136                    chunk_id.as_str(),
137                    fts_rowid,
138                    trigram_fts_rowid,
139                    CHUNKER_VERSION,
140                    now,
141                ],
142            )
143            .map_err(db_err)?;
144
145            // Chunk location.
146            tx.execute(
147                "INSERT INTO chunk_locations \
148                 (chunk_id, byte_start, byte_end, line_start, line_end, \
149                  location_quality, created_at, updated_at) \
150                 VALUES (?1,?2,?3,?4,?5,?6,?7,?7)",
151                params![
152                    chunk_id.as_str(),
153                    spec.byte_start.map(|v| v as i64),
154                    spec.byte_end.map(|v| v as i64),
155                    spec.line_start as i64,
156                    spec.line_end as i64,
157                    spec.location_quality,
158                    now,
159                ],
160            )
161            .map_err(db_err)?;
162
163            records.push(ChunkRecord {
164                chunk_id: chunk_id.clone(),
165                file_id: file_id.clone(),
166                chunk_ordinal: spec.chunk_ordinal,
167                heading_path: spec.heading_path.clone(),
168                line_start: spec.line_start,
169                line_end: spec.line_end,
170                byte_start: spec.byte_start,
171                byte_end: spec.byte_end,
172                location_quality: spec.location_quality.to_string(),
173            });
174        }
175
176        // Mark old chunks for this file that belong to a different extraction stale.
177        tx.execute(
178            "UPDATE chunks SET chunk_status = 'stale', updated_at = ?3 \
179             WHERE file_id = ?1 AND extraction_id != ?2 AND chunk_status = 'active'",
180            params![file_id.as_str(), extraction_id.as_str(), now],
181        )
182        .map_err(db_err)?;
183
184        // Mark file indexed.
185        tx.execute(
186            "UPDATE files SET file_status = 'indexed', last_indexed_at = ?2, updated_at = ?2 \
187             WHERE file_id = ?1",
188            params![file_id.as_str(), now],
189        )
190        .map_err(db_err)?;
191
192        tx.commit().map_err(db_err)?;
193        Ok(records)
194    }
195
196    /// Retrieve chunk records for a file (used by snippet loader and
197    /// tests).
198    pub fn list_for_file(&self, file_id: &FileId) -> OrbokResult<Vec<ChunkRecord>> {
199        let conn = self.catalog.lock();
200        let mut stmt = conn
201            .prepare(
202                "SELECT c.chunk_id, c.file_id, c.chunk_ordinal, c.heading_path, \
203                  l.line_start, l.line_end, l.byte_start, l.byte_end, l.location_quality \
204                 FROM chunks c \
205                 LEFT JOIN chunk_locations l ON l.chunk_id = c.chunk_id \
206                 WHERE c.file_id = ?1 AND c.chunk_status = 'active' \
207                 ORDER BY c.chunk_ordinal",
208            )
209            .map_err(db_err)?;
210        let rows = stmt
211            .query_map(params![file_id.as_str()], |row| {
212                Ok(ChunkRecord {
213                    chunk_id: ChunkId::from_string(row.get::<_, String>(0)?),
214                    file_id: FileId::from_string(row.get::<_, String>(1)?),
215                    chunk_ordinal: row.get::<_, i64>(2)? as u32,
216                    heading_path: row.get(3)?,
217                    line_start: row.get::<_, i64>(4)? as u32,
218                    line_end: row.get::<_, i64>(5)? as u32,
219                    byte_start: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
220                    byte_end: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
221                    location_quality: row.get(8).unwrap_or_else(|_| "unknown".to_string()),
222                })
223            })
224            .map_err(db_err)?;
225        let mut out = Vec::new();
226        for row in rows {
227            out.push(row.map_err(db_err)?);
228        }
229        Ok(out)
230    }
231}
232
233fn sha256_text(text: &str) -> String {
234    let mut h = Sha256::new();
235    h.update(text.as_bytes());
236    let d = h.finalize();
237    let mut s = String::with_capacity(d.len() * 2);
238    for b in d.iter() {
239        use std::fmt::Write;
240        let _ = write!(s, "{b:02x}");
241    }
242    s
243}