Skip to main content

tuitbot_core/automation/watchtower/
chunker.rs

//! Fragment extraction and indexing for ingested content nodes.
//!
//! Splits markdown notes into heading-delimited fragments so retrieval can
//! cite specific sections instead of whole notes. Plain-text files (or notes
//! without headings) produce a single root fragment.

7use std::sync::OnceLock;
8
9use regex::Regex;
10use sha2::{Digest, Sha256};
11
12use crate::error::StorageError;
13use crate::storage::watchtower as store;
14use crate::storage::DbPool;
15
16// ---------------------------------------------------------------------------
17// Error type
18// ---------------------------------------------------------------------------
19
/// Errors specific to the chunking pipeline.
///
/// Currently the only failure mode is a storage-layer error raised while
/// marking chunks stale, upserting new chunks, or updating node status.
#[derive(Debug, thiserror::Error)]
pub enum ChunkerError {
    /// Wraps a [`StorageError`]; `#[from]` lets `?` convert it automatically.
    #[error("storage error: {0}")]
    Storage(#[from] StorageError),
}
26
27// ---------------------------------------------------------------------------
28// Fragment type
29// ---------------------------------------------------------------------------
30
/// A parsed fragment from a note body.
///
/// Produced by [`extract_fragments`]; each fragment corresponds to the text
/// under one heading (or the text before any heading, for the root fragment).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Fragment {
    /// Slash-delimited heading hierarchy, e.g. `"## Section/### Sub"`.
    /// Empty string for text before any heading (root fragment).
    pub heading_path: String,
    /// Body text under this heading (trimmed of leading/trailing blank lines).
    pub text: String,
    /// 0-based position within the note.
    pub index: usize,
}
42
43// ---------------------------------------------------------------------------
44// Heading regex
45// ---------------------------------------------------------------------------
46
47fn heading_re() -> &'static Regex {
48    static RE: OnceLock<Regex> = OnceLock::new();
49    RE.get_or_init(|| Regex::new(r"^(#{1,6})\s+(.+)$").expect("heading regex"))
50}
51
52// ---------------------------------------------------------------------------
53// Fragment extraction
54// ---------------------------------------------------------------------------
55
56/// Extract fragments from a markdown or plain-text body.
57///
58/// The caller is responsible for stripping front-matter before calling this.
59/// If the body contains no headings, a single root fragment is returned.
60/// Empty or whitespace-only fragments are skipped.
61pub fn extract_fragments(body: &str) -> Vec<Fragment> {
62    let mut fragments: Vec<Fragment> = Vec::new();
63    // heading_stack: Vec<(level, heading_text_with_hashes)>
64    let mut heading_stack: Vec<(usize, String)> = Vec::new();
65    let mut current_text = String::new();
66    let mut in_code_block = false;
67
68    for line in body.lines() {
69        // Track fenced code blocks to avoid treating `# ` inside them as headings.
70        if line.trim_start().starts_with("```") {
71            in_code_block = !in_code_block;
72            current_text.push_str(line);
73            current_text.push('\n');
74            continue;
75        }
76
77        if in_code_block {
78            current_text.push_str(line);
79            current_text.push('\n');
80            continue;
81        }
82
83        if let Some(caps) = heading_re().captures(line) {
84            // Flush accumulated text as a fragment.
85            flush_fragment(&heading_stack, &mut current_text, &mut fragments);
86
87            let level = caps[1].len();
88            let heading_text = caps[2].trim().to_string();
89            let heading_label = format!("{} {heading_text}", &caps[1]);
90
91            // Pop headings at the same level or deeper.
92            while heading_stack.last().is_some_and(|(lvl, _)| *lvl >= level) {
93                heading_stack.pop();
94            }
95            heading_stack.push((level, heading_label));
96        } else {
97            current_text.push_str(line);
98            current_text.push('\n');
99        }
100    }
101
102    // Flush final accumulated text.
103    flush_fragment(&heading_stack, &mut current_text, &mut fragments);
104
105    // Re-index after filtering.
106    for (i, frag) in fragments.iter_mut().enumerate() {
107        frag.index = i;
108    }
109
110    fragments
111}
112
113/// Flush accumulated text into a fragment if non-empty.
114fn flush_fragment(
115    heading_stack: &[(usize, String)],
116    current_text: &mut String,
117    fragments: &mut Vec<Fragment>,
118) {
119    let trimmed = current_text.trim();
120    if !trimmed.is_empty() {
121        let heading_path = heading_stack
122            .iter()
123            .map(|(_, h)| h.as_str())
124            .collect::<Vec<_>>()
125            .join("/");
126
127        fragments.push(Fragment {
128            heading_path,
129            text: trimmed.to_string(),
130            index: 0, // Will be re-indexed later.
131        });
132    }
133    current_text.clear();
134}
135
136// ---------------------------------------------------------------------------
137// Chunk a content node
138// ---------------------------------------------------------------------------
139
140/// Extract fragments from a content node and persist them as chunks.
141///
142/// 1. Extracts fragments from `node.body_text`
143/// 2. Marks existing chunks stale (idempotent on first chunk)
144/// 3. Upserts new chunks (deduplicates by hash)
145/// 4. Marks the node as `chunked`
146///
147/// Returns the IDs of all active chunks for the node.
148pub async fn chunk_node(
149    pool: &DbPool,
150    account_id: &str,
151    node_id: i64,
152    body_text: &str,
153) -> Result<Vec<i64>, ChunkerError> {
154    let fragments = extract_fragments(body_text);
155
156    // Mark existing chunks stale before upserting new ones.
157    store::mark_chunks_stale(pool, account_id, node_id).await?;
158
159    // Build NewChunk structs with SHA-256 hashes.
160    let new_chunks: Vec<store::NewChunk> = fragments
161        .iter()
162        .map(|f| {
163            let mut hasher = Sha256::new();
164            hasher.update(f.text.as_bytes());
165            let hash = format!("{:x}", hasher.finalize());
166
167            store::NewChunk {
168                heading_path: f.heading_path.clone(),
169                chunk_text: f.text.clone(),
170                chunk_hash: hash,
171                chunk_index: f.index as i64,
172            }
173        })
174        .collect();
175
176    let ids = store::upsert_chunks_for_node(pool, account_id, node_id, &new_chunks).await?;
177
178    // Extract links/tags and persist graph edges (fail-open).
179    super::graph_ingest::extract_and_persist_graph(pool, account_id, node_id, body_text).await;
180
181    // Transition node status: pending → chunked.
182    store::mark_node_chunked(pool, account_id, node_id).await?;
183
184    Ok(ids)
185}
186
187/// Process all pending content nodes for an account: extract and persist fragments.
188///
189/// Returns the total number of nodes chunked.
190pub async fn chunk_pending_nodes(pool: &DbPool, account_id: &str, limit: u32) -> u32 {
191    let nodes = match store::get_pending_content_nodes_for(pool, account_id, limit).await {
192        Ok(n) => n,
193        Err(e) => {
194            tracing::warn!(error = %e, "Failed to get pending nodes for chunking");
195            return 0;
196        }
197    };
198
199    let mut chunked = 0u32;
200    for node in &nodes {
201        match chunk_node(pool, account_id, node.id, &node.body_text).await {
202            Ok(_ids) => {
203                chunked += 1;
204                tracing::debug!(
205                    node_id = node.id,
206                    path = %node.relative_path,
207                    "Chunked content node"
208                );
209            }
210            Err(e) => {
211                tracing::warn!(
212                    node_id = node.id,
213                    path = %node.relative_path,
214                    error = %e,
215                    "Failed to chunk node"
216                );
217            }
218        }
219    }
220
221    chunked
222}
223
#[cfg(test)]
mod tests {
    use super::*;

    // ── Simple markdown document ────────────────────────────────────

    // A body with no headings collapses into one root fragment.
    #[test]
    fn chunk_simple_markdown() {
        let body = "First paragraph.\n\nSecond paragraph.\n";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 1);
        assert_eq!(frags[0].heading_path, "");
        assert!(frags[0].text.contains("First paragraph"));
        assert!(frags[0].text.contains("Second paragraph"));
    }

    // ── Splits on headings ──────────────────────────────────────────

    // Sibling headings at the same level each start a new fragment.
    #[test]
    fn chunk_splits_on_headings() {
        let body = "\
## Introduction

Welcome to the guide.

## Getting Started

Install the CLI tool.

## Advanced Usage

Use the `--verbose` flag.
";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 3);
        assert_eq!(frags[0].heading_path, "## Introduction");
        assert!(frags[0].text.contains("Welcome"));
        assert_eq!(frags[1].heading_path, "## Getting Started");
        assert!(frags[1].text.contains("Install"));
        assert_eq!(frags[2].heading_path, "## Advanced Usage");
        assert!(frags[2].text.contains("--verbose"));
    }

    // ── Empty input ─────────────────────────────────────────────────

    #[test]
    fn chunk_empty_input_returns_empty() {
        assert!(extract_fragments("").is_empty());
    }

    #[test]
    fn chunk_whitespace_only_returns_empty() {
        assert!(extract_fragments("   \n\n\t\n  ").is_empty());
    }

    // ── Long text without headings stays as single fragment ─────────

    // Fragmentation is heading-driven only; length never forces a split.
    #[test]
    fn chunk_long_text_single_fragment() {
        let long = "word ".repeat(500);
        let frags = extract_fragments(&long);
        assert_eq!(frags.len(), 1);
        assert_eq!(frags[0].heading_path, "");
        assert_eq!(frags[0].index, 0);
    }

    // ── Heading hierarchy preserved ─────────────────────────────────

    // Deeper headings extend the path; returning to a shallower level pops
    // the deeper entries off the stack.
    #[test]
    fn chunk_heading_hierarchy_preserved() {
        let body = "\
# Top Level

Intro text.

## Section One

Content one.

### Subsection A

Deep content A.

### Subsection B

Deep content B.

## Section Two

Content two.
";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 5);
        assert_eq!(frags[0].heading_path, "# Top Level");
        assert_eq!(frags[1].heading_path, "# Top Level/## Section One");
        assert_eq!(
            frags[2].heading_path,
            "# Top Level/## Section One/### Subsection A"
        );
        assert_eq!(
            frags[3].heading_path,
            "# Top Level/## Section One/### Subsection B"
        );
        assert_eq!(frags[4].heading_path, "# Top Level/## Section Two");
    }

    // ── Root text before any heading ────────────────────────────────

    // Preamble text gets an empty heading path (the root fragment).
    #[test]
    fn chunk_root_text_before_heading() {
        let body = "Preamble text.\n\n## Heading\n\nBody.\n";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 2);
        assert_eq!(frags[0].heading_path, "");
        assert!(frags[0].text.contains("Preamble"));
        assert_eq!(frags[1].heading_path, "## Heading");
    }

    // ── Code blocks do not create headings ──────────────────────────

    // `#`-prefixed lines inside a fenced block stay in the fragment text
    // rather than opening new fragments.
    #[test]
    fn chunk_code_block_headings_ignored() {
        let body = "\
## Real Heading

Some text.

```markdown
# This is inside a code block
## Also inside
```

More text after code.
";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 1);
        assert_eq!(frags[0].heading_path, "## Real Heading");
        assert!(frags[0].text.contains("# This is inside a code block"));
        assert!(frags[0].text.contains("More text after code"));
    }

    // ── Consecutive headings with no body skip empty fragments ──────

    // Headings with no body text produce no fragments; only the last
    // heading (which has content under it) survives.
    #[test]
    fn chunk_consecutive_headings_skip_empty() {
        let body = "## A\n## B\n## C\nFinal content.\n";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 1);
        assert_eq!(frags[0].heading_path, "## C");
        assert!(frags[0].text.contains("Final content"));
    }

    // ── Index values are sequential ─────────────────────────────────

    // Indices are assigned after empty fragments are dropped, so they are
    // always contiguous starting from 0.
    #[test]
    fn chunk_indices_sequential() {
        let body = "Intro.\n\n## A\n\nA text.\n\n## B\n\nB text.\n\n## C\n\nC text.\n";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 4);
        for (i, frag) in frags.iter().enumerate() {
            assert_eq!(frag.index, i, "fragment {i} should have index {i}");
        }
    }

    // ── H6 heading level works ──────────────────────────────────────

    // The regex caps at six `#`s; level 6 is the deepest recognized heading.
    #[test]
    fn chunk_h6_heading() {
        let body = "###### Deep Heading\n\nDeep content.\n";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 1);
        assert_eq!(frags[0].heading_path, "###### Deep Heading");
    }

    // ── Heading level reset pops stack correctly ────────────────────

    // A shallower heading after a deeper one must not inherit the deeper
    // heading in its path.
    #[test]
    fn chunk_heading_level_reset() {
        let body = "\
### Level 3

Content at 3.

## Level 2

Content at 2.
";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 2);
        assert_eq!(frags[0].heading_path, "### Level 3");
        assert_eq!(frags[1].heading_path, "## Level 2");
    }

    // ── Fragment text is trimmed ────────────────────────────────────

    // Leading/trailing blank lines and surrounding whitespace are stripped
    // from the stored fragment text.
    #[test]
    fn chunk_fragment_text_trimmed() {
        let body = "## Heading\n\n\n   Some text.   \n\n\n";
        let frags = extract_fragments(body);
        assert_eq!(frags.len(), 1);
        assert_eq!(frags[0].text, "Some text.");
    }
}