// tuitbot_core/automation/watchtower/chunker.rs
//! Fragment extraction and indexing for ingested content nodes.
//!
//! Splits markdown notes into heading-delimited fragments so retrieval can
//! cite specific sections instead of whole notes. Plain-text files (or notes
//! without headings) produce a single root fragment.

7use std::sync::OnceLock;
8
9use regex::Regex;
10use sha2::{Digest, Sha256};
11
12use crate::error::StorageError;
13use crate::storage::watchtower as store;
14use crate::storage::DbPool;
15
16// ---------------------------------------------------------------------------
17// Error type
18// ---------------------------------------------------------------------------
19
/// Errors specific to the chunking pipeline.
#[derive(Debug, thiserror::Error)]
pub enum ChunkerError {
    /// A database read or write failed while marking stale chunks,
    /// upserting new ones, or updating the node status.
    #[error("storage error: {0}")]
    Storage(#[from] StorageError),
}
26
27// ---------------------------------------------------------------------------
28// Fragment type
29// ---------------------------------------------------------------------------
30
/// A parsed fragment from a note body.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Fragment {
    /// Slash-delimited heading hierarchy, e.g. `"## Section/### Sub"`.
    /// Each component keeps its original `#` markers.
    /// Empty string for text before any heading (root fragment).
    pub heading_path: String,
    /// Body text under this heading (trimmed of leading/trailing blank lines).
    pub text: String,
    /// 0-based position within the note, assigned after empty fragments
    /// are skipped, so indices are always contiguous.
    pub index: usize,
}
42
43// ---------------------------------------------------------------------------
44// Heading regex
45// ---------------------------------------------------------------------------
46
47fn heading_re() -> &'static Regex {
48    static RE: OnceLock<Regex> = OnceLock::new();
49    RE.get_or_init(|| Regex::new(r"^(#{1,6})\s+(.+)$").expect("heading regex"))
50}
51
52// ---------------------------------------------------------------------------
53// Fragment extraction
54// ---------------------------------------------------------------------------
55
56/// Extract fragments from a markdown or plain-text body.
57///
58/// The caller is responsible for stripping front-matter before calling this.
59/// If the body contains no headings, a single root fragment is returned.
60/// Empty or whitespace-only fragments are skipped.
61pub fn extract_fragments(body: &str) -> Vec<Fragment> {
62    let mut fragments: Vec<Fragment> = Vec::new();
63    // heading_stack: Vec<(level, heading_text_with_hashes)>
64    let mut heading_stack: Vec<(usize, String)> = Vec::new();
65    let mut current_text = String::new();
66    let mut in_code_block = false;
67
68    for line in body.lines() {
69        // Track fenced code blocks to avoid treating `# ` inside them as headings.
70        if line.trim_start().starts_with("```") {
71            in_code_block = !in_code_block;
72            current_text.push_str(line);
73            current_text.push('\n');
74            continue;
75        }
76
77        if in_code_block {
78            current_text.push_str(line);
79            current_text.push('\n');
80            continue;
81        }
82
83        if let Some(caps) = heading_re().captures(line) {
84            // Flush accumulated text as a fragment.
85            flush_fragment(&heading_stack, &mut current_text, &mut fragments);
86
87            let level = caps[1].len();
88            let heading_text = caps[2].trim().to_string();
89            let heading_label = format!("{} {heading_text}", &caps[1]);
90
91            // Pop headings at the same level or deeper.
92            while heading_stack.last().is_some_and(|(lvl, _)| *lvl >= level) {
93                heading_stack.pop();
94            }
95            heading_stack.push((level, heading_label));
96        } else {
97            current_text.push_str(line);
98            current_text.push('\n');
99        }
100    }
101
102    // Flush final accumulated text.
103    flush_fragment(&heading_stack, &mut current_text, &mut fragments);
104
105    // Re-index after filtering.
106    for (i, frag) in fragments.iter_mut().enumerate() {
107        frag.index = i;
108    }
109
110    fragments
111}
112
113/// Flush accumulated text into a fragment if non-empty.
114fn flush_fragment(
115    heading_stack: &[(usize, String)],
116    current_text: &mut String,
117    fragments: &mut Vec<Fragment>,
118) {
119    let trimmed = current_text.trim();
120    if !trimmed.is_empty() {
121        let heading_path = heading_stack
122            .iter()
123            .map(|(_, h)| h.as_str())
124            .collect::<Vec<_>>()
125            .join("/");
126
127        fragments.push(Fragment {
128            heading_path,
129            text: trimmed.to_string(),
130            index: 0, // Will be re-indexed later.
131        });
132    }
133    current_text.clear();
134}
135
136// ---------------------------------------------------------------------------
137// Chunk a content node
138// ---------------------------------------------------------------------------
139
140/// Extract fragments from a content node and persist them as chunks.
141///
142/// 1. Extracts fragments from `node.body_text`
143/// 2. Marks existing chunks stale (idempotent on first chunk)
144/// 3. Upserts new chunks (deduplicates by hash)
145/// 4. Marks the node as `chunked`
146///
147/// Returns the IDs of all active chunks for the node.
148pub async fn chunk_node(
149    pool: &DbPool,
150    account_id: &str,
151    node_id: i64,
152    body_text: &str,
153) -> Result<Vec<i64>, ChunkerError> {
154    let fragments = extract_fragments(body_text);
155
156    // Mark existing chunks stale before upserting new ones.
157    store::mark_chunks_stale(pool, account_id, node_id).await?;
158
159    // Build NewChunk structs with SHA-256 hashes.
160    let new_chunks: Vec<store::NewChunk> = fragments
161        .iter()
162        .map(|f| {
163            let mut hasher = Sha256::new();
164            hasher.update(f.text.as_bytes());
165            let hash = format!("{:x}", hasher.finalize());
166
167            store::NewChunk {
168                heading_path: f.heading_path.clone(),
169                chunk_text: f.text.clone(),
170                chunk_hash: hash,
171                chunk_index: f.index as i64,
172            }
173        })
174        .collect();
175
176    let ids = store::upsert_chunks_for_node(pool, account_id, node_id, &new_chunks).await?;
177
178    // Transition node status: pending → chunked.
179    store::mark_node_chunked(pool, account_id, node_id).await?;
180
181    Ok(ids)
182}
183
184/// Process all pending content nodes for an account: extract and persist fragments.
185///
186/// Returns the total number of nodes chunked.
187pub async fn chunk_pending_nodes(pool: &DbPool, account_id: &str, limit: u32) -> u32 {
188    let nodes = match store::get_pending_content_nodes_for(pool, account_id, limit).await {
189        Ok(n) => n,
190        Err(e) => {
191            tracing::warn!(error = %e, "Failed to get pending nodes for chunking");
192            return 0;
193        }
194    };
195
196    let mut chunked = 0u32;
197    for node in &nodes {
198        match chunk_node(pool, account_id, node.id, &node.body_text).await {
199            Ok(_ids) => {
200                chunked += 1;
201                tracing::debug!(
202                    node_id = node.id,
203                    path = %node.relative_path,
204                    "Chunked content node"
205                );
206            }
207            Err(e) => {
208                tracing::warn!(
209                    node_id = node.id,
210                    path = %node.relative_path,
211                    error = %e,
212                    "Failed to chunk node"
213                );
214            }
215        }
216    }
217
218    chunked
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    // ── Simple markdown document ────────────────────────────────────

    // No headings: everything collapses into one root fragment.
    #[test]
    fn chunk_simple_markdown() {
        let body = "First paragraph.\n\nSecond paragraph.\n";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 1);
        assert_eq!(frags[0].heading_path, "");
        assert!(frags[0].text.contains("First paragraph"));
        assert!(frags[0].text.contains("Second paragraph"));
    }

    // ── Splits on headings ──────────────────────────────────────────

    #[test]
    fn chunk_splits_on_headings() {
        let body = "\
## Introduction

Welcome to the guide.

## Getting Started

Install the CLI tool.

## Advanced Usage

Use the `--verbose` flag.
";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 3);
        assert_eq!(frags[0].heading_path, "## Introduction");
        assert!(frags[0].text.contains("Welcome"));
        assert_eq!(frags[1].heading_path, "## Getting Started");
        assert!(frags[1].text.contains("Install"));
        assert_eq!(frags[2].heading_path, "## Advanced Usage");
        assert!(frags[2].text.contains("--verbose"));
    }

    // ── Empty input ─────────────────────────────────────────────────

    #[test]
    fn chunk_empty_input_returns_empty() {
        assert!(extract_fragments("").is_empty());
    }

    #[test]
    fn chunk_whitespace_only_returns_empty() {
        assert!(extract_fragments("   \n\n\t\n  ").is_empty());
    }

    // ── Long text without headings stays as single fragment ─────────

    // Chunking is heading-driven only; there is no length-based splitting.
    #[test]
    fn chunk_long_text_single_fragment() {
        let long = "word ".repeat(500);
        let frags = extract_fragments(&long);
        assert_eq!(frags.len(), 1);
        assert_eq!(frags[0].heading_path, "");
        assert_eq!(frags[0].index, 0);
    }

    // ── Heading hierarchy preserved ─────────────────────────────────

    // Each fragment's path includes every open ancestor heading.
    #[test]
    fn chunk_heading_hierarchy_preserved() {
        let body = "\
# Top Level

Intro text.

## Section One

Content one.

### Subsection A

Deep content A.

### Subsection B

Deep content B.

## Section Two

Content two.
";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 5);
        assert_eq!(frags[0].heading_path, "# Top Level");
        assert_eq!(frags[1].heading_path, "# Top Level/## Section One");
        assert_eq!(
            frags[2].heading_path,
            "# Top Level/## Section One/### Subsection A"
        );
        // Subsection B replaces its same-level sibling A on the stack.
        assert_eq!(
            frags[3].heading_path,
            "# Top Level/## Section One/### Subsection B"
        );
        // Section Two pops everything down to the H1.
        assert_eq!(frags[4].heading_path, "# Top Level/## Section Two");
    }

    // ── Root text before any heading ────────────────────────────────

    #[test]
    fn chunk_root_text_before_heading() {
        let body = "Preamble text.\n\n## Heading\n\nBody.\n";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 2);
        // Preamble gets the empty heading path (root fragment).
        assert_eq!(frags[0].heading_path, "");
        assert!(frags[0].text.contains("Preamble"));
        assert_eq!(frags[1].heading_path, "## Heading");
    }

    // ── Code blocks do not create headings ──────────────────────────

    // `#`-prefixed lines inside a fenced block stay in the fragment body.
    #[test]
    fn chunk_code_block_headings_ignored() {
        let body = "\
## Real Heading

Some text.

```markdown
# This is inside a code block
## Also inside
```

More text after code.
";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 1);
        assert_eq!(frags[0].heading_path, "## Real Heading");
        assert!(frags[0].text.contains("# This is inside a code block"));
        assert!(frags[0].text.contains("More text after code"));
    }

    // ── Consecutive headings with no body skip empty fragments ──────

    #[test]
    fn chunk_consecutive_headings_skip_empty() {
        let body = "## A\n## B\n## C\nFinal content.\n";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 1);
        assert_eq!(frags[0].heading_path, "## C");
        assert!(frags[0].text.contains("Final content"));
    }

    // ── Index values are sequential ─────────────────────────────────

    #[test]
    fn chunk_indices_sequential() {
        let body = "Intro.\n\n## A\n\nA text.\n\n## B\n\nB text.\n\n## C\n\nC text.\n";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 4);
        for (i, frag) in frags.iter().enumerate() {
            assert_eq!(frag.index, i, "fragment {i} should have index {i}");
        }
    }

    // ── H6 heading level works ──────────────────────────────────────

    // The regex allows up to six `#` marks; verify the deepest level.
    #[test]
    fn chunk_h6_heading() {
        let body = "###### Deep Heading\n\nDeep content.\n";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 1);
        assert_eq!(frags[0].heading_path, "###### Deep Heading");
    }

    // ── Heading level reset pops stack correctly ────────────────────

    // A shallower heading must pop the deeper one rather than nest under it.
    #[test]
    fn chunk_heading_level_reset() {
        let body = "\
### Level 3

Content at 3.

## Level 2

Content at 2.
";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 2);
        assert_eq!(frags[0].heading_path, "### Level 3");
        assert_eq!(frags[1].heading_path, "## Level 2");
    }

    // ── Fragment text is trimmed ────────────────────────────────────

    #[test]
    fn chunk_fragment_text_trimmed() {
        let body = "## Heading\n\n\n   Some text.   \n\n\n";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 1);
        assert_eq!(frags[0].text, "Some text.");
    }
}