Skip to main content

rust_memex/rag/
pipeline.rs

1//! Async pipeline for concurrent RAG indexing.
2//!
3//! This module provides an optional pipeline mode where file reading, chunking,
4//! embedding, and storage run concurrently using tokio channels.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────┐     ┌──────────────┐     ┌───────────────┐     ┌─────────────┐
10//! │ File Reader │ ──► │   Chunker    │ ──► │   Embedder    │ ──► │   Storage   │
11//! └─────────────┘     └──────────────┘     └───────────────┘     └─────────────┘
12//!       tx1                 rx1/tx2              rx2/tx3              rx3
13//! ```
14//!
15//! Each stage runs in its own tokio::spawn, connected via bounded mpsc channels
16//! for backpressure. This allows overlapping I/O, CPU, and GPU work.
17//!
18//! Pipeline mode is opt-in via the `--pipeline` CLI flag or by calling
19//! `run_pipeline()` directly.
20
21use anyhow::Result;
22use std::collections::HashSet;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use tokio::sync::{Mutex, mpsc};
27use tracing::{debug, error, info, warn};
28
29use crate::embeddings::EmbeddingClient;
30use crate::preprocessing::{PreprocessingConfig, Preprocessor};
31use crate::rag::{ChunkOpts, ChunkerKind, OuterSynthesis, SliceMode, detect_default_chunker};
32use crate::storage::{ChromaDocument, StorageManager};
33
34/// Channel buffer size for backpressure.
35/// 100 items provides good throughput while limiting memory usage.
36const CHANNEL_BUFFER_SIZE: usize = 100;
37
38/// Batch size for storage writes to avoid RAM explosion.
39const STORAGE_BATCH_SIZE: usize = 100;
40
41/// File content with metadata for pipeline processing.
42#[derive(Debug, Clone)]
43pub struct FileContent {
44    /// Path to the source file.
45    pub path: PathBuf,
46    /// Extracted text content.
47    pub text: String,
48    /// Target namespace for storage.
49    pub namespace: String,
50    /// SHA256 content hash for deduplication.
51    pub content_hash: String,
52}
53
54/// A chunk ready for embedding.
55#[derive(Debug, Clone)]
56pub struct Chunk {
57    /// Chunk ID (generated from content).
58    pub id: String,
59    /// Chunk content text.
60    pub content: String,
61    /// Source file path.
62    pub source_path: PathBuf,
63    /// Target namespace.
64    pub namespace: String,
65    /// SHA256 of THIS chunk's text. Drives chunk-level deduplication.
66    /// Differs across the four onion layers from one source.
67    pub chunk_hash: String,
68    /// SHA256 of the source document (same value across all four onion
69    /// layers from one source). Drives pre-index source-level dedup so we
70    /// never re-embed an already-ingested file.
71    pub source_hash: String,
72    /// Onion slice layer (if using onion mode).
73    pub layer: u8,
74    /// Parent slice ID (for onion hierarchy).
75    pub parent_id: Option<String>,
76    /// Children slice IDs (for onion hierarchy).
77    pub children_ids: Vec<String>,
78    /// Extracted keywords.
79    pub keywords: Vec<String>,
80    /// Additional metadata.
81    pub metadata: serde_json::Value,
82}
83
84/// An embedded chunk ready for storage.
85#[derive(Debug, Clone)]
86pub struct EmbeddedChunk {
87    /// Original chunk data.
88    pub chunk: Chunk,
89    /// Embedding vector.
90    pub embedding: Vec<f32>,
91}
92
93#[derive(Debug, Clone)]
94struct ChunkBatch {
95    path: PathBuf,
96    content_hash: String,
97    chunks: Vec<Chunk>,
98}
99
100#[derive(Debug, Clone)]
101struct EmbeddedFile {
102    path: PathBuf,
103    content_hash: String,
104    chunks: Vec<EmbeddedChunk>,
105}
106
107#[derive(Debug, Clone, Copy)]
108struct EmbedRuntimeSettings {
109    max_batch_chars: usize,
110    max_batch_items: usize,
111    concurrency: usize,
112}
113
114impl EmbedRuntimeSettings {
115    fn new(max_batch_chars: usize, max_batch_items: usize, concurrency: usize) -> Self {
116        Self {
117            max_batch_chars: max_batch_chars.max(1),
118            max_batch_items: max_batch_items.max(1),
119            concurrency: concurrency.max(1),
120        }
121    }
122}
123
124/// Adaptive governor envelope for pipeline embedding throughput.
125#[derive(Debug, Clone)]
126pub struct PipelineGovernorConfig {
127    pub min_batch_chars: usize,
128    pub max_batch_chars: usize,
129    pub min_batch_items: usize,
130    pub max_batch_items: usize,
131    pub min_concurrency: usize,
132    pub max_concurrency: usize,
133    pub target_latency: Duration,
134    pub pressure_latency: Duration,
135    pub growth_cooldown: Duration,
136    pub pressure_cooldown: Duration,
137    pub backlog_low_watermark: usize,
138    pub storage_backlog_high_watermark: usize,
139}
140
141impl PipelineGovernorConfig {
142    pub fn adaptive(
143        max_batch_chars: usize,
144        max_batch_items: usize,
145        max_concurrency: usize,
146    ) -> Self {
147        let max_batch_chars = max_batch_chars.max(1);
148        let max_batch_items = max_batch_items.max(1);
149        let max_concurrency = max_concurrency.max(1);
150
151        Self {
152            min_batch_chars: (max_batch_chars / 4).max(4_096).min(max_batch_chars),
153            max_batch_chars,
154            min_batch_items: (max_batch_items / 4).max(1).min(max_batch_items),
155            max_batch_items,
156            min_concurrency: 1,
157            max_concurrency,
158            target_latency: Duration::from_millis(900),
159            pressure_latency: Duration::from_millis(2_200),
160            growth_cooldown: Duration::from_secs(3),
161            pressure_cooldown: Duration::from_millis(750),
162            backlog_low_watermark: 2,
163            storage_backlog_high_watermark: 3,
164        }
165    }
166
167    fn initial_settings(&self) -> EmbedRuntimeSettings {
168        EmbedRuntimeSettings::new(
169            self.min_batch_chars,
170            self.min_batch_items,
171            self.min_concurrency,
172        )
173    }
174}
175
176#[derive(Debug, Clone)]
177struct PipelineGovernorAdjustment {
178    settings: EmbedRuntimeSettings,
179    mode: String,
180    reason: String,
181}
182
183#[derive(Debug, Clone)]
184struct PipelineGovernor {
185    config: PipelineGovernorConfig,
186    current: EmbedRuntimeSettings,
187    last_growth_at: Option<Instant>,
188    last_pressure_at: Option<Instant>,
189}
190
191impl PipelineGovernor {
192    fn new(config: PipelineGovernorConfig) -> Self {
193        let current = config.initial_settings();
194        Self {
195            config,
196            current,
197            last_growth_at: None,
198            last_pressure_at: None,
199        }
200    }
201
202    fn current_settings(&self) -> EmbedRuntimeSettings {
203        self.current
204    }
205
206    fn initial_adjustment(&self) -> PipelineGovernorAdjustment {
207        PipelineGovernorAdjustment {
208            settings: self.current,
209            mode: "adaptive".to_string(),
210            reason: "warming up from conservative limits".to_string(),
211        }
212    }
213
214    fn on_success(
215        &mut self,
216        elapsed: Duration,
217        snapshot: &PipelineSnapshot,
218    ) -> Option<PipelineGovernorAdjustment> {
219        let backlog = snapshot.chunker_queue_depth + snapshot.reader_queue_depth;
220        let storage_pressure =
221            snapshot.storage_queue_depth >= self.config.storage_backlog_high_watermark;
222        let slow_batch = elapsed >= self.config.pressure_latency;
223
224        if (storage_pressure || slow_batch) && self.pressure_ready() {
225            let mut changed = false;
226            let mut reasons = Vec::new();
227
228            if self.current.max_batch_items > self.config.min_batch_items {
229                let next_items =
230                    ((self.current.max_batch_items * 2) / 3).max(self.config.min_batch_items);
231                if next_items != self.current.max_batch_items {
232                    self.current.max_batch_items = next_items;
233                    changed = true;
234                    reasons.push(format!("items {}", next_items));
235                }
236            }
237
238            if self.current.max_batch_chars > self.config.min_batch_chars {
239                let next_chars =
240                    ((self.current.max_batch_chars * 2) / 3).max(self.config.min_batch_chars);
241                if next_chars != self.current.max_batch_chars {
242                    self.current.max_batch_chars = next_chars;
243                    changed = true;
244                    reasons.push(format!("chars {}", next_chars));
245                }
246            }
247
248            if storage_pressure && self.current.concurrency > self.config.min_concurrency {
249                self.current.concurrency -= 1;
250                changed = true;
251                reasons.push(format!("concurrency {}", self.current.concurrency));
252            }
253
254            if changed {
255                self.last_pressure_at = Some(Instant::now());
256                let reason = if storage_pressure {
257                    format!(
258                        "pressure: storage backlog {} -> {}",
259                        snapshot.storage_queue_depth,
260                        reasons.join(", ")
261                    )
262                } else {
263                    format!(
264                        "pressure: embed {:.0}ms -> {}",
265                        elapsed.as_secs_f64() * 1_000.0,
266                        reasons.join(", ")
267                    )
268                };
269                return Some(PipelineGovernorAdjustment {
270                    settings: self.current,
271                    mode: "adaptive".to_string(),
272                    reason,
273                });
274            }
275        }
276
277        if elapsed > self.config.target_latency
278            || backlog < self.config.backlog_low_watermark
279            || snapshot.storage_queue_depth > 1
280            || !self.growth_ready()
281        {
282            return None;
283        }
284
285        if self.current.max_batch_items < self.config.max_batch_items {
286            let step = (self.current.max_batch_items / 4).max(1);
287            let next_items = (self.current.max_batch_items + step).min(self.config.max_batch_items);
288            if next_items != self.current.max_batch_items {
289                self.current.max_batch_items = next_items;
290                self.last_growth_at = Some(Instant::now());
291                return Some(PipelineGovernorAdjustment {
292                    settings: self.current,
293                    mode: "adaptive".to_string(),
294                    reason: format!(
295                        "backlog {} with fast embed {:.0}ms -> items {}",
296                        backlog,
297                        elapsed.as_secs_f64() * 1_000.0,
298                        next_items
299                    ),
300                });
301            }
302        }
303
304        if self.current.max_batch_chars < self.config.max_batch_chars {
305            let step = (self.current.max_batch_chars / 4).max(4_096);
306            let next_chars = (self.current.max_batch_chars + step).min(self.config.max_batch_chars);
307            if next_chars != self.current.max_batch_chars {
308                self.current.max_batch_chars = next_chars;
309                self.last_growth_at = Some(Instant::now());
310                return Some(PipelineGovernorAdjustment {
311                    settings: self.current,
312                    mode: "adaptive".to_string(),
313                    reason: format!(
314                        "backlog {} with fast embed {:.0}ms -> chars {}",
315                        backlog,
316                        elapsed.as_secs_f64() * 1_000.0,
317                        next_chars
318                    ),
319                });
320            }
321        }
322
323        let concurrency_backlog = self
324            .current
325            .concurrency
326            .saturating_mul(2)
327            .max(self.config.backlog_low_watermark);
328        if backlog >= concurrency_backlog && self.current.concurrency < self.config.max_concurrency
329        {
330            self.current.concurrency += 1;
331            self.last_growth_at = Some(Instant::now());
332            return Some(PipelineGovernorAdjustment {
333                settings: self.current,
334                mode: "adaptive".to_string(),
335                reason: format!(
336                    "backlog {} sustained with fast embed {:.0}ms -> concurrency {}",
337                    backlog,
338                    elapsed.as_secs_f64() * 1_000.0,
339                    self.current.concurrency
340                ),
341            });
342        }
343
344        None
345    }
346
347    fn on_error(
348        &mut self,
349        snapshot: &PipelineSnapshot,
350        message: &str,
351    ) -> Option<PipelineGovernorAdjustment> {
352        if !self.pressure_ready() {
353            return None;
354        }
355
356        let mut changed = false;
357
358        if self.current.concurrency > self.config.min_concurrency {
359            self.current.concurrency = self.config.min_concurrency;
360            changed = true;
361        }
362        if self.current.max_batch_items > self.config.min_batch_items {
363            self.current.max_batch_items = self.config.min_batch_items;
364            changed = true;
365        }
366        if self.current.max_batch_chars > self.config.min_batch_chars {
367            self.current.max_batch_chars = self.config.min_batch_chars;
368            changed = true;
369        }
370
371        if !changed {
372            return None;
373        }
374
375        self.last_pressure_at = Some(Instant::now());
376        Some(PipelineGovernorAdjustment {
377            settings: self.current,
378            mode: "adaptive".to_string(),
379            reason: format!(
380                "error with backlog {} and storage {}: {}",
381                snapshot.chunker_queue_depth + snapshot.reader_queue_depth,
382                snapshot.storage_queue_depth,
383                message
384            ),
385        })
386    }
387
388    fn growth_ready(&self) -> bool {
389        self.last_growth_at
390            .map(|instant| instant.elapsed() >= self.config.growth_cooldown)
391            .unwrap_or(true)
392    }
393
394    fn pressure_ready(&self) -> bool {
395        self.last_pressure_at
396            .map(|instant| instant.elapsed() >= self.config.pressure_cooldown)
397            .unwrap_or(true)
398    }
399}
400
401#[derive(Debug)]
402struct EmbedWorkerResult {
403    path: PathBuf,
404    content_hash: String,
405    count: usize,
406    chars: usize,
407    elapsed: Duration,
408    chunks: Option<Vec<EmbeddedChunk>>,
409    error: Option<String>,
410}
411
412/// Pipeline configuration.
413#[derive(Debug, Clone)]
414pub struct PipelineConfig {
415    /// Number of files to buffer between reader and chunker.
416    pub reader_buffer: usize,
417    /// Number of chunk batches to buffer between chunker and embedder.
418    pub chunker_buffer: usize,
419    /// Number of embedded batches to buffer between embedder and storage.
420    pub embedder_buffer: usize,
421    /// Slicing mode for chunking.
422    pub slice_mode: SliceMode,
423    /// Optional provider override. When absent, the chunker stage routes per file.
424    pub chunker: Option<ChunkerKind>,
425    /// Outer-layer synthesis strategy for onion modes (spec P3). Defaults to
426    /// the legacy `Keyword` (TF-based) path; set to `Llm { model, endpoint }`
427    /// to route the outer layer through a local Ollama model. Failures are
428    /// logged and silently fall back to keyword outer so the pipeline never
429    /// stalls on Ollama unavailability.
430    pub outer_synthesis: OuterSynthesis,
431    /// Enable storage-backed deduplication.
432    pub dedup_enabled: bool,
433    /// Optional semantic cleanup applied after raw source hashing/dedup and
434    /// before chunking. The source hash intentionally remains the raw file hash
435    /// so pipeline outputs stay replayable from the original corpus.
436    pub preprocess_config: Option<PreprocessingConfig>,
437    /// Maximum number of embedding requests allowed in flight.
438    pub embed_concurrency: usize,
439    /// Optional adaptive governor for runtime batch/concurrency tuning.
440    pub governor: Option<PipelineGovernorConfig>,
441    /// Optional event stream for progress/reporting consumers.
442    pub event_sender: Option<mpsc::UnboundedSender<PipelineEvent>>,
443    /// Total files discovered for this operator-visible run.
444    ///
445    /// This can be larger than the scheduled pipeline file count when resume
446    /// filtered out already committed files before entering the pipeline.
447    pub discovered_files: usize,
448    /// Files already satisfied before this pipeline run started.
449    pub resumed_files: usize,
450}
451
452impl Default for PipelineConfig {
453    fn default() -> Self {
454        Self {
455            reader_buffer: CHANNEL_BUFFER_SIZE,
456            chunker_buffer: CHANNEL_BUFFER_SIZE,
457            embedder_buffer: CHANNEL_BUFFER_SIZE,
458            slice_mode: SliceMode::default(),
459            chunker: None,
460            outer_synthesis: OuterSynthesis::default(),
461            dedup_enabled: true,
462            preprocess_config: None,
463            embed_concurrency: 1,
464            governor: None,
465            event_sender: None,
466            discovered_files: 0,
467            resumed_files: 0,
468        }
469    }
470}
471
472/// Pipeline statistics for progress reporting.
473#[derive(Debug, Default, Clone)]
474pub struct PipelineStats {
475    /// Total files scheduled for this pipeline run.
476    pub total_files: usize,
477    /// Total files discovered for this operator-visible run.
478    pub discovered_files: usize,
479    /// Files already satisfied before this pipeline run started.
480    pub resumed_files: usize,
481    /// Files successfully read and handed to the chunker.
482    pub files_read: usize,
483    /// Files skipped before chunking (for example exact duplicates).
484    pub files_skipped: usize,
485    /// Files durably committed to storage.
486    pub files_committed: usize,
487    /// Files that failed in any stage.
488    pub files_failed: usize,
489    /// Chunks created.
490    pub chunks_created: usize,
491    /// Chunks embedded.
492    pub chunks_embedded: usize,
493    /// Chunks durably stored.
494    pub chunks_stored: usize,
495    /// Stage-local errors encountered.
496    pub errors: usize,
497}
498
499/// Runtime snapshot for live progress consumers.
500#[derive(Debug, Clone, Default)]
501pub struct PipelineSnapshot {
502    pub total_files: usize,
503    pub discovered_files: usize,
504    pub resumed_files: usize,
505    pub files_read: usize,
506    pub files_skipped: usize,
507    pub files_committed: usize,
508    pub files_failed: usize,
509    pub chunks_created: usize,
510    pub chunks_embedded: usize,
511    pub chunks_stored: usize,
512    pub errors: usize,
513    pub reader_queue_depth: usize,
514    pub chunker_queue_depth: usize,
515    pub storage_queue_depth: usize,
516    pub current_embed_batch_items: usize,
517    pub current_embed_batch_chars: usize,
518    pub embed_batch_items_limit: usize,
519    pub embed_batch_chars_limit: usize,
520    pub embed_active_requests: usize,
521    pub embed_concurrency_limit: usize,
522    pub avg_embed_batch_ms: Option<f64>,
523    pub files_per_sec: f64,
524    pub chunks_per_sec: f64,
525    pub eta: Option<Duration>,
526    pub elapsed: Duration,
527    pub bottleneck: String,
528    pub governor_mode: String,
529    pub governor_reason: String,
530}
531
532impl PipelineSnapshot {
533    pub fn to_stats(&self) -> PipelineStats {
534        PipelineStats {
535            total_files: self.total_files,
536            discovered_files: self.discovered_files,
537            resumed_files: self.resumed_files,
538            files_read: self.files_read,
539            files_skipped: self.files_skipped,
540            files_committed: self.files_committed,
541            files_failed: self.files_failed,
542            chunks_created: self.chunks_created,
543            chunks_embedded: self.chunks_embedded,
544            chunks_stored: self.chunks_stored,
545            errors: self.errors,
546        }
547    }
548}
549
550/// Progress and lifecycle events emitted by the pipeline.
551#[derive(Debug, Clone)]
552pub enum PipelineEvent {
553    FileRead {
554        path: PathBuf,
555        content_hash: String,
556        bytes: usize,
557    },
558    FileSkipped {
559        path: PathBuf,
560        content_hash: String,
561        reason: String,
562    },
563    ChunksCreated {
564        path: PathBuf,
565        content_hash: String,
566        count: usize,
567    },
568    EmbedStarted {
569        path: PathBuf,
570        content_hash: String,
571        count: usize,
572        chars: usize,
573        batch_items_limit: usize,
574        batch_chars_limit: usize,
575        concurrency_limit: usize,
576    },
577    ChunksEmbedded {
578        path: PathBuf,
579        content_hash: String,
580        count: usize,
581        chars: usize,
582        elapsed: Duration,
583    },
584    GovernorAdjusted {
585        batch_items_limit: usize,
586        batch_chars_limit: usize,
587        concurrency_limit: usize,
588        mode: String,
589        reason: String,
590    },
591    FileCommitted {
592        path: PathBuf,
593        content_hash: String,
594        chunk_count: usize,
595    },
596    Error {
597        path: Option<PathBuf>,
598        stage: &'static str,
599        message: String,
600    },
601    Snapshot(Box<PipelineSnapshot>),
602}
603
604/// Result of pipeline execution.
605#[derive(Debug)]
606pub struct PipelineResult {
607    /// Pipeline statistics.
608    pub stats: PipelineStats,
609    /// Error messages if any.
610    pub errors: Vec<String>,
611}
612
613#[derive(Debug)]
614struct PipelineProgressState {
615    snapshot: PipelineSnapshot,
616    started_at: Instant,
617    failed_paths: HashSet<String>,
618    error_messages: Vec<String>,
619}
620
621impl PipelineProgressState {
622    fn new(
623        total_files: usize,
624        discovered_files: usize,
625        resumed_files: usize,
626        initial_runtime: EmbedRuntimeSettings,
627        governor_mode: String,
628        governor_reason: String,
629    ) -> Self {
630        let snapshot = PipelineSnapshot {
631            total_files,
632            discovered_files,
633            resumed_files,
634            embed_batch_items_limit: initial_runtime.max_batch_items,
635            embed_batch_chars_limit: initial_runtime.max_batch_chars,
636            embed_concurrency_limit: initial_runtime.concurrency,
637            bottleneck: "reader".to_string(),
638            governor_mode,
639            governor_reason,
640            ..Default::default()
641        };
642
643        Self {
644            snapshot,
645            started_at: Instant::now(),
646            failed_paths: HashSet::new(),
647            error_messages: Vec::new(),
648        }
649    }
650
651    fn record_failure(&mut self, path: &Path) {
652        let key = path.to_string_lossy().to_string();
653        if self.failed_paths.insert(key) {
654            self.snapshot.files_failed += 1;
655        }
656    }
657
658    fn apply(&mut self, event: &PipelineEvent) -> PipelineSnapshot {
659        match event {
660            PipelineEvent::FileRead { .. } => {
661                self.snapshot.files_read += 1;
662                self.snapshot.reader_queue_depth += 1;
663            }
664            PipelineEvent::FileSkipped { .. } => {
665                self.snapshot.files_skipped += 1;
666            }
667            PipelineEvent::ChunksCreated { count, .. } => {
668                self.snapshot.chunks_created += count;
669                self.snapshot.reader_queue_depth =
670                    self.snapshot.reader_queue_depth.saturating_sub(1);
671                self.snapshot.chunker_queue_depth += 1;
672            }
673            PipelineEvent::EmbedStarted {
674                batch_items_limit,
675                batch_chars_limit,
676                concurrency_limit,
677                ..
678            } => {
679                self.snapshot.embed_batch_items_limit = *batch_items_limit;
680                self.snapshot.embed_batch_chars_limit = *batch_chars_limit;
681                self.snapshot.embed_concurrency_limit = *concurrency_limit;
682                self.snapshot.embed_active_requests += 1;
683            }
684            PipelineEvent::ChunksEmbedded {
685                count,
686                chars,
687                elapsed,
688                ..
689            } => {
690                self.snapshot.chunks_embedded += count;
691                self.snapshot.chunker_queue_depth =
692                    self.snapshot.chunker_queue_depth.saturating_sub(1);
693                self.snapshot.storage_queue_depth += 1;
694                self.snapshot.embed_active_requests =
695                    self.snapshot.embed_active_requests.saturating_sub(1);
696                self.snapshot.current_embed_batch_items = *count;
697                self.snapshot.current_embed_batch_chars = *chars;
698                let latency_ms = elapsed.as_secs_f64() * 1_000.0;
699                self.snapshot.avg_embed_batch_ms = Some(
700                    self.snapshot
701                        .avg_embed_batch_ms
702                        .map(|existing| (existing * 0.7) + (latency_ms * 0.3))
703                        .unwrap_or(latency_ms),
704                );
705            }
706            PipelineEvent::GovernorAdjusted {
707                batch_items_limit,
708                batch_chars_limit,
709                concurrency_limit,
710                mode,
711                reason,
712            } => {
713                self.snapshot.embed_batch_items_limit = *batch_items_limit;
714                self.snapshot.embed_batch_chars_limit = *batch_chars_limit;
715                self.snapshot.embed_concurrency_limit = *concurrency_limit;
716                self.snapshot.governor_mode = mode.clone();
717                self.snapshot.governor_reason = reason.clone();
718            }
719            PipelineEvent::FileCommitted { chunk_count, .. } => {
720                self.snapshot.files_committed += 1;
721                self.snapshot.chunks_stored += chunk_count;
722                self.snapshot.storage_queue_depth =
723                    self.snapshot.storage_queue_depth.saturating_sub(1);
724            }
725            PipelineEvent::Error {
726                path,
727                stage,
728                message,
729            } => {
730                self.snapshot.errors += 1;
731                match *stage {
732                    "embedder" => {
733                        self.snapshot.chunker_queue_depth =
734                            self.snapshot.chunker_queue_depth.saturating_sub(1);
735                        self.snapshot.embed_active_requests =
736                            self.snapshot.embed_active_requests.saturating_sub(1);
737                    }
738                    "storage" => {
739                        self.snapshot.storage_queue_depth =
740                            self.snapshot.storage_queue_depth.saturating_sub(1);
741                    }
742                    _ => {}
743                }
744                if let Some(path) = path {
745                    self.record_failure(path);
746                }
747                self.error_messages.push(match path {
748                    Some(path) => format!("{} [{}]: {}", stage, path.display(), message),
749                    None => format!("{}: {}", stage, message),
750                });
751            }
752            PipelineEvent::Snapshot(snapshot) => {
753                self.snapshot = snapshot.as_ref().clone();
754            }
755        }
756
757        self.refresh_snapshot()
758    }
759
760    fn refresh_snapshot(&mut self) -> PipelineSnapshot {
761        let elapsed = self.started_at.elapsed();
762        let terminal_files = self.snapshot.files_committed
763            + self.snapshot.files_skipped
764            + self.snapshot.files_failed;
765        let elapsed_secs = elapsed.as_secs_f64();
766
767        self.snapshot.elapsed = elapsed;
768        self.snapshot.files_per_sec = if elapsed_secs > 0.0 {
769            terminal_files as f64 / elapsed_secs
770        } else {
771            0.0
772        };
773        self.snapshot.chunks_per_sec = if elapsed_secs > 0.0 {
774            self.snapshot.chunks_stored as f64 / elapsed_secs
775        } else {
776            0.0
777        };
778
779        let remaining_files = self.snapshot.total_files.saturating_sub(terminal_files);
780        self.snapshot.eta = if self.snapshot.files_per_sec > 0.0 && remaining_files > 0 {
781            Some(Duration::from_secs_f64(
782                remaining_files as f64 / self.snapshot.files_per_sec,
783            ))
784        } else {
785            None
786        };
787
788        self.snapshot.bottleneck = determine_bottleneck(&self.snapshot);
789        self.snapshot.clone()
790    }
791}
792
793fn determine_bottleneck(snapshot: &PipelineSnapshot) -> String {
794    let terminal_files = snapshot.files_committed + snapshot.files_skipped + snapshot.files_failed;
795    if terminal_files >= snapshot.total_files {
796        return "complete".to_string();
797    }
798
799    let mut stage = "idle";
800    let mut depth = 0usize;
801
802    if snapshot.reader_queue_depth > depth {
803        stage = "chunker";
804        depth = snapshot.reader_queue_depth;
805    }
806    if snapshot.chunker_queue_depth > depth {
807        stage = "embedder";
808        depth = snapshot.chunker_queue_depth;
809    }
810    if snapshot.storage_queue_depth > depth {
811        stage = "storage";
812        depth = snapshot.storage_queue_depth;
813    }
814    if snapshot.embed_active_requests > 0
815        && snapshot.embed_active_requests >= snapshot.embed_concurrency_limit.max(1)
816        && depth <= snapshot.chunker_queue_depth
817    {
818        stage = "embedder";
819        depth = snapshot.embed_active_requests;
820    }
821    if depth == 0 && snapshot.files_read < snapshot.total_files {
822        stage = "reader";
823    }
824
825    stage.to_string()
826}
827
828#[derive(Clone)]
829struct PipelineObserver {
830    sender: Option<mpsc::UnboundedSender<PipelineEvent>>,
831    state: Arc<Mutex<PipelineProgressState>>,
832}
833
834impl PipelineObserver {
835    fn new(
836        total_files: usize,
837        discovered_files: usize,
838        resumed_files: usize,
839        sender: Option<mpsc::UnboundedSender<PipelineEvent>>,
840        initial_runtime: EmbedRuntimeSettings,
841        governor_mode: String,
842        governor_reason: String,
843    ) -> Self {
844        Self {
845            sender,
846            state: Arc::new(Mutex::new(PipelineProgressState::new(
847                total_files,
848                discovered_files,
849                resumed_files,
850                initial_runtime,
851                governor_mode,
852                governor_reason,
853            ))),
854        }
855    }
856
857    async fn emit(&self, event: PipelineEvent) {
858        let snapshot = {
859            let mut state = self.state.lock().await;
860            state.apply(&event)
861        };
862
863        if let Some(sender) = &self.sender {
864            let _ = sender.send(event);
865            let _ = sender.send(PipelineEvent::Snapshot(Box::new(snapshot)));
866        }
867    }
868
869    async fn emit_initial_snapshot(&self) {
870        let snapshot = {
871            let mut state = self.state.lock().await;
872            state.refresh_snapshot()
873        };
874
875        if let Some(sender) = &self.sender {
876            let _ = sender.send(PipelineEvent::Snapshot(Box::new(snapshot)));
877        }
878    }
879
880    async fn result(&self) -> PipelineResult {
881        let state = self.state.lock().await;
882        PipelineResult {
883            stats: state.snapshot.to_stats(),
884            errors: state.error_messages.clone(),
885        }
886    }
887
888    async fn snapshot(&self) -> PipelineSnapshot {
889        let state = self.state.lock().await;
890        state.snapshot.clone()
891    }
892}
893
894// =============================================================================
895// STAGE 1: FILE READER
896// =============================================================================
897
898/// Stage 1: Read files and extract text content.
899///
900/// Reads each file, extracts text (supports PDF, text files), and sends
901/// to the chunker stage. Handles deduplication check at the storage level.
902async fn stage_read_files(
903    files: Vec<PathBuf>,
904    namespace: String,
905    storage: Arc<StorageManager>,
906    dedup_enabled: bool,
907    preprocess_config: Option<PreprocessingConfig>,
908    tx: mpsc::Sender<FileContent>,
909    observer: PipelineObserver,
910) {
911    let preprocessor = preprocess_config.map(|config| {
912        let min_content_length = config.min_content_length;
913        (min_content_length, Preprocessor::new(config))
914    });
915
916    for path in files {
917        let text = match extract_file_text(&path).await {
918            Ok(text) => text,
919            Err(err) => {
920                warn!("Failed to read file {:?}: {}", path, err);
921                observer
922                    .emit(PipelineEvent::Error {
923                        path: Some(path.clone()),
924                        stage: "reader",
925                        message: err.to_string(),
926                    })
927                    .await;
928                continue;
929            }
930        };
931
932        let content_hash = crate::rag::compute_content_hash(&text);
933
934        if dedup_enabled {
935            // Source-level dedup: skip if any chunk in this namespace already
936            // came from a document with this exact text (P4). Falls back to
937            // per-chunk content_hash check for pre-v4 namespaces where the
938            // source_hash column may not yet exist.
939            let already_indexed = match storage.has_source_hash(&namespace, &content_hash).await {
940                Ok(true) => true,
941                Ok(false) => match storage.has_content_hash(&namespace, &content_hash).await {
942                    Ok(true) => true,
943                    Ok(false) => false,
944                    Err(err) => {
945                        warn!("content_hash dedup fallback failed for {:?}: {}", path, err);
946                        false
947                    }
948                },
949                Err(err) => {
950                    warn!("source_hash dedup check failed for {:?}: {}", path, err);
951                    false
952                }
953            };
954
955            if already_indexed {
956                // Spec P4 acceptance criterion: "Pipeline log: każdy skipped
957                // duplicate source jedna linia z source path + source_hash."
958                // Promoted from `debug!` to `info!` so the line shows in the
959                // default operator run log without needing RUST_LOG=debug; the
960                // `--allow-duplicates` flag on `index` is the documented escape
961                // hatch when an operator actually wants to re-embed.
962                info!(
963                    "Skip duplicate source: {} (source_hash {})",
964                    path.display(),
965                    &content_hash[..16]
966                );
967                observer
968                    .emit(PipelineEvent::FileSkipped {
969                        path: path.clone(),
970                        content_hash,
971                        reason: "exact duplicate".to_string(),
972                    })
973                    .await;
974                continue;
975            }
976        }
977
978        let text = if let Some((min_content_length, preprocessor)) = &preprocessor {
979            let cleaned = preprocessor.extract_semantic_content(&text);
980            if cleaned.trim().len() < *min_content_length {
981                observer
982                    .emit(PipelineEvent::FileSkipped {
983                        path: path.clone(),
984                        content_hash,
985                        reason: "preprocessed content below min length".to_string(),
986                    })
987                    .await;
988                continue;
989            }
990            cleaned
991        } else {
992            text
993        };
994
995        let bytes = text.len();
996        let content = FileContent {
997            path: path.clone(),
998            text,
999            namespace: namespace.clone(),
1000            content_hash: content_hash.clone(),
1001        };
1002
1003        if tx.send(content).await.is_err() {
1004            debug!("Reader: channel closed, stopping");
1005            break;
1006        }
1007
1008        observer
1009            .emit(PipelineEvent::FileRead {
1010                path,
1011                content_hash,
1012                bytes,
1013            })
1014            .await;
1015    }
1016
1017    info!("Reader stage complete");
1018}
1019
1020/// Extract text content from a file (PDF or text).
1021async fn extract_file_text(path: &Path) -> Result<String> {
1022    let ext = path
1023        .extension()
1024        .and_then(|e| e.to_str())
1025        .unwrap_or("")
1026        .to_lowercase();
1027
1028    if ext == "pdf" {
1029        let path = path.to_path_buf();
1030        let pdf_text =
1031            tokio::task::spawn_blocking(move || pdf_extract::extract_text(&path)).await??;
1032        return Ok(pdf_text);
1033    }
1034
1035    let (_path, content) = crate::path_utils::safe_read_to_string_async(path).await?;
1036    Ok(content)
1037}
1038
1039// =============================================================================
1040// STAGE 2: CHUNKER
1041// =============================================================================
1042
1043/// Stage 2: Create chunks/slices from file content.
1044async fn stage_chunk_content(
1045    mut rx: mpsc::Receiver<FileContent>,
1046    tx: mpsc::Sender<ChunkBatch>,
1047    slice_mode: SliceMode,
1048    chunker: Option<ChunkerKind>,
1049    outer_synthesis: OuterSynthesis,
1050    observer: PipelineObserver,
1051) {
1052    while let Some(file_content) = rx.recv().await {
1053        let path = file_content.path.clone();
1054        let content_hash = file_content.content_hash.clone();
1055        let selected_chunker = select_pipeline_chunker(
1056            chunker,
1057            slice_mode,
1058            &file_content.path,
1059            &file_content.namespace,
1060        );
1061        let opts = ChunkOpts::new(
1062            selected_chunker,
1063            selected_chunker.slice_mode(slice_mode),
1064            outer_synthesis.clone(),
1065        );
1066        let provider = selected_chunker.into_provider();
1067        let chunks = match provider.chunk(&file_content, &opts).await {
1068            Ok(chunks) => chunks,
1069            Err(err) => {
1070                warn!(
1071                    "Chunker {} failed for {}: {}",
1072                    provider.name(),
1073                    file_content.path.display(),
1074                    err
1075                );
1076                Vec::new()
1077            }
1078        };
1079        let count = chunks.len();
1080
1081        if tx
1082            .send(ChunkBatch {
1083                path: path.clone(),
1084                content_hash: content_hash.clone(),
1085                chunks,
1086            })
1087            .await
1088            .is_err()
1089        {
1090            debug!("Chunker: channel closed, stopping");
1091            break;
1092        }
1093
1094        observer
1095            .emit(PipelineEvent::ChunksCreated {
1096                path,
1097                content_hash,
1098                count,
1099            })
1100            .await;
1101    }
1102
1103    info!("Chunker stage complete");
1104}
1105
1106fn select_pipeline_chunker(
1107    explicit: Option<ChunkerKind>,
1108    requested_slice_mode: SliceMode,
1109    source_path: &Path,
1110    namespace: &str,
1111) -> ChunkerKind {
1112    if let Some(chunker) = explicit {
1113        return chunker;
1114    }
1115
1116    if requested_slice_mode == SliceMode::Flat {
1117        return ChunkerKind::Flat;
1118    }
1119
1120    detect_default_chunker(source_path, namespace)
1121}
1122
1123// =============================================================================
1124// STAGE 3: EMBEDDER
1125// =============================================================================
1126
1127/// Stage 3: Embed chunks using the embedding client.
1128async fn stage_embed_chunks(
1129    mut rx: mpsc::Receiver<ChunkBatch>,
1130    tx: mpsc::Sender<EmbeddedFile>,
1131    base_client: EmbeddingClient,
1132    embed_concurrency: usize,
1133    governor_config: Option<PipelineGovernorConfig>,
1134    observer: PipelineObserver,
1135) {
1136    let fixed_settings = {
1137        let (max_batch_chars, max_batch_items) = base_client.batch_limits();
1138        EmbedRuntimeSettings::new(max_batch_chars, max_batch_items, embed_concurrency)
1139    };
1140    let mut governor = governor_config.map(PipelineGovernor::new);
1141    let initial_adjustment = governor
1142        .as_ref()
1143        .map(PipelineGovernor::initial_adjustment)
1144        .unwrap_or_else(|| PipelineGovernorAdjustment {
1145            settings: fixed_settings,
1146            mode: "fixed".to_string(),
1147            reason: "operator-configured limits".to_string(),
1148        });
1149    observer
1150        .emit(PipelineEvent::GovernorAdjusted {
1151            batch_items_limit: initial_adjustment.settings.max_batch_items,
1152            batch_chars_limit: initial_adjustment.settings.max_batch_chars,
1153            concurrency_limit: initial_adjustment.settings.concurrency,
1154            mode: initial_adjustment.mode,
1155            reason: initial_adjustment.reason,
1156        })
1157        .await;
1158
1159    let result_capacity = embed_concurrency.max(1).saturating_mul(2);
1160    let (result_tx, mut result_rx) = mpsc::channel::<EmbedWorkerResult>(result_capacity.max(2));
1161    let mut input_closed = false;
1162    let mut in_flight = 0usize;
1163
1164    loop {
1165        if input_closed && in_flight == 0 {
1166            break;
1167        }
1168
1169        let settings = governor
1170            .as_ref()
1171            .map(PipelineGovernor::current_settings)
1172            .unwrap_or(fixed_settings);
1173
1174        if !input_closed && in_flight < settings.concurrency {
1175            tokio::select! {
1176                maybe_batch = rx.recv() => {
1177                    match maybe_batch {
1178                        Some(chunk_batch) => {
1179                            if chunk_batch.chunks.is_empty() {
1180                                continue;
1181                            }
1182
1183                            let batch_chars: usize = chunk_batch
1184                                .chunks
1185                                .iter()
1186                                .map(|chunk| chunk.content.chars().count())
1187                                .sum();
1188                            observer
1189                                .emit(PipelineEvent::EmbedStarted {
1190                                    path: chunk_batch.path.clone(),
1191                                    content_hash: chunk_batch.content_hash.clone(),
1192                                    count: chunk_batch.chunks.len(),
1193                                    chars: batch_chars,
1194                                    batch_items_limit: settings.max_batch_items,
1195                                    batch_chars_limit: settings.max_batch_chars,
1196                                    concurrency_limit: settings.concurrency,
1197                                })
1198                                .await;
1199
1200                            in_flight += 1;
1201                            let worker_tx = result_tx.clone();
1202                            let worker_client = base_client
1203                                .clone_with_batch_limits(settings.max_batch_chars, settings.max_batch_items);
1204                            tokio::spawn(async move {
1205                                let result = embed_chunk_batch(worker_client, chunk_batch).await;
1206                                let _ = worker_tx.send(result).await;
1207                            });
1208                        }
1209                        None => input_closed = true,
1210                    }
1211                }
1212                Some(result) = result_rx.recv(), if in_flight > 0 => {
1213                    in_flight = in_flight.saturating_sub(1);
1214                    if !handle_embed_result(result, &tx, &observer, governor.as_mut()).await {
1215                        break;
1216                    }
1217                }
1218            }
1219        } else if let Some(result) = result_rx.recv().await {
1220            in_flight = in_flight.saturating_sub(1);
1221            if !handle_embed_result(result, &tx, &observer, governor.as_mut()).await {
1222                break;
1223            }
1224        } else {
1225            break;
1226        }
1227    }
1228
1229    info!("Embedder stage complete");
1230}
1231
1232async fn embed_chunk_batch(
1233    mut client: EmbeddingClient,
1234    chunk_batch: ChunkBatch,
1235) -> EmbedWorkerResult {
1236    let path = chunk_batch.path;
1237    let content_hash = chunk_batch.content_hash;
1238    let chars: usize = chunk_batch
1239        .chunks
1240        .iter()
1241        .map(|chunk| chunk.content.chars().count())
1242        .sum();
1243    let texts: Vec<String> = chunk_batch
1244        .chunks
1245        .iter()
1246        .map(|chunk| chunk.content.clone())
1247        .collect();
1248
1249    let start = Instant::now();
1250    match client.embed_batch(&texts).await {
1251        Ok(embeddings) => {
1252            let count = embeddings.len();
1253            let chunks = chunk_batch
1254                .chunks
1255                .into_iter()
1256                .zip(embeddings)
1257                .map(|(chunk, embedding)| EmbeddedChunk { chunk, embedding })
1258                .collect();
1259
1260            EmbedWorkerResult {
1261                path,
1262                content_hash,
1263                count,
1264                chars,
1265                elapsed: start.elapsed(),
1266                chunks: Some(chunks),
1267                error: None,
1268            }
1269        }
1270        Err(err) => EmbedWorkerResult {
1271            path,
1272            content_hash,
1273            count: chunk_batch.chunks.len(),
1274            chars,
1275            elapsed: start.elapsed(),
1276            chunks: None,
1277            error: Some(err.to_string()),
1278        },
1279    }
1280}
1281
1282async fn handle_embed_result(
1283    result: EmbedWorkerResult,
1284    tx: &mpsc::Sender<EmbeddedFile>,
1285    observer: &PipelineObserver,
1286    governor: Option<&mut PipelineGovernor>,
1287) -> bool {
1288    if let Some(error_message) = result.error {
1289        error!(
1290            "Embedding batch failed for {:?}: {}",
1291            result.path, error_message
1292        );
1293        observer
1294            .emit(PipelineEvent::Error {
1295                path: Some(result.path.clone()),
1296                stage: "embedder",
1297                message: error_message.clone(),
1298            })
1299            .await;
1300
1301        if let Some(governor) = governor {
1302            let snapshot = observer.snapshot().await;
1303            if let Some(adjustment) = governor.on_error(&snapshot, &error_message) {
1304                observer
1305                    .emit(PipelineEvent::GovernorAdjusted {
1306                        batch_items_limit: adjustment.settings.max_batch_items,
1307                        batch_chars_limit: adjustment.settings.max_batch_chars,
1308                        concurrency_limit: adjustment.settings.concurrency,
1309                        mode: adjustment.mode,
1310                        reason: adjustment.reason,
1311                    })
1312                    .await;
1313            }
1314        }
1315        return true;
1316    }
1317
1318    let Some(chunks) = result.chunks else {
1319        return true;
1320    };
1321
1322    if tx
1323        .send(EmbeddedFile {
1324            path: result.path.clone(),
1325            content_hash: result.content_hash.clone(),
1326            chunks,
1327        })
1328        .await
1329        .is_err()
1330    {
1331        debug!("Embedder: channel closed, stopping");
1332        observer
1333            .emit(PipelineEvent::Error {
1334                path: Some(result.path),
1335                stage: "embedder",
1336                message: "storage channel closed".to_string(),
1337            })
1338            .await;
1339        return false;
1340    }
1341
1342    observer
1343        .emit(PipelineEvent::ChunksEmbedded {
1344            path: result.path.clone(),
1345            content_hash: result.content_hash.clone(),
1346            count: result.count,
1347            chars: result.chars,
1348            elapsed: result.elapsed,
1349        })
1350        .await;
1351
1352    if let Some(governor) = governor {
1353        let snapshot = observer.snapshot().await;
1354        if let Some(adjustment) = governor.on_success(result.elapsed, &snapshot) {
1355            observer
1356                .emit(PipelineEvent::GovernorAdjusted {
1357                    batch_items_limit: adjustment.settings.max_batch_items,
1358                    batch_chars_limit: adjustment.settings.max_batch_chars,
1359                    concurrency_limit: adjustment.settings.concurrency,
1360                    mode: adjustment.mode,
1361                    reason: adjustment.reason,
1362                })
1363                .await;
1364        }
1365    }
1366
1367    true
1368}
1369
1370// =============================================================================
1371// STAGE 4: STORAGE
1372// =============================================================================
1373
1374/// Stage 4: Store embedded chunks to the database.
1375async fn stage_store_chunks(
1376    mut rx: mpsc::Receiver<EmbeddedFile>,
1377    storage: Arc<StorageManager>,
1378    observer: PipelineObserver,
1379) {
1380    while let Some(mut embedded_file) = rx.recv().await {
1381        let path = embedded_file.path.clone();
1382        let content_hash = embedded_file.content_hash.clone();
1383        let mut stored_for_file = 0usize;
1384        let mut storage_failed = false;
1385        let mut stored_doc_refs: Vec<(String, String)> = Vec::new();
1386
1387        while !embedded_file.chunks.is_empty() {
1388            let take = embedded_file.chunks.len().min(STORAGE_BATCH_SIZE);
1389            let batch: Vec<EmbeddedChunk> = embedded_file.chunks.drain(..take).collect();
1390            let batch_refs: Vec<(String, String)> = batch
1391                .iter()
1392                .map(|embedded| (embedded.chunk.namespace.clone(), embedded.chunk.id.clone()))
1393                .collect();
1394
1395            match store_batch(&storage, batch).await {
1396                Ok(count) => {
1397                    stored_for_file += count;
1398                    stored_doc_refs.extend(batch_refs);
1399                }
1400                Err(err) => {
1401                    let (rolled_back, rollback_failures) =
1402                        rollback_stored_file_chunks(&storage, &stored_doc_refs).await;
1403                    let rollback_suffix = if rollback_failures == 0 {
1404                        format!(
1405                            "rolled back {} previously stored chunks for file",
1406                            rolled_back
1407                        )
1408                    } else {
1409                        format!(
1410                            "rolled back {} previously stored chunks for file, {} rollback deletes failed",
1411                            rolled_back, rollback_failures
1412                        )
1413                    };
1414                    error!("Storage batch failed for {:?}: {}", path, err);
1415                    observer
1416                        .emit(PipelineEvent::Error {
1417                            path: Some(path.clone()),
1418                            stage: "storage",
1419                            message: format!("{err}; {rollback_suffix}"),
1420                        })
1421                        .await;
1422                    storage_failed = true;
1423                    break;
1424                }
1425            }
1426        }
1427
1428        if !storage_failed {
1429            observer
1430                .emit(PipelineEvent::FileCommitted {
1431                    path,
1432                    content_hash,
1433                    chunk_count: stored_for_file,
1434                })
1435                .await;
1436        }
1437    }
1438
1439    info!("Storage stage complete");
1440}
1441
1442async fn rollback_stored_file_chunks(
1443    storage: &StorageManager,
1444    stored_doc_refs: &[(String, String)],
1445) -> (usize, usize) {
1446    let mut deleted = 0usize;
1447    let mut failures = 0usize;
1448
1449    for (namespace, id) in stored_doc_refs.iter().rev() {
1450        match storage.delete_document(namespace, id).await {
1451            Ok(count) => deleted += count,
1452            Err(err) => {
1453                failures += 1;
1454                warn!(
1455                    "Failed to roll back partially stored chunk {}/{}: {}",
1456                    namespace, id, err
1457                );
1458            }
1459        }
1460    }
1461
1462    (deleted, failures)
1463}
1464
1465/// Store a batch of embedded chunks.
1466///
1467/// Writes BOTH per-chunk `content_hash` (for chunk-level dedup, distinguishes
1468/// outer/middle/inner/core slices of one source) AND `source_hash` (same value
1469/// across all four onion layers, for pre-index source dedup).
1470async fn store_batch(storage: &StorageManager, batch: Vec<EmbeddedChunk>) -> Result<usize> {
1471    let count = batch.len();
1472
1473    let documents: Vec<ChromaDocument> = batch
1474        .into_iter()
1475        .map(|embedded| {
1476            let source_hash = Some(embedded.chunk.source_hash);
1477            if embedded.chunk.layer > 0 {
1478                ChromaDocument {
1479                    id: embedded.chunk.id,
1480                    namespace: embedded.chunk.namespace,
1481                    embedding: embedded.embedding,
1482                    metadata: embedded.chunk.metadata,
1483                    document: embedded.chunk.content,
1484                    layer: embedded.chunk.layer,
1485                    parent_id: embedded.chunk.parent_id,
1486                    children_ids: embedded.chunk.children_ids,
1487                    keywords: embedded.chunk.keywords,
1488                    content_hash: Some(embedded.chunk.chunk_hash),
1489                    source_hash,
1490                }
1491            } else {
1492                ChromaDocument::new_flat_with_hashes(
1493                    embedded.chunk.id,
1494                    embedded.chunk.namespace,
1495                    embedded.embedding,
1496                    embedded.chunk.metadata,
1497                    embedded.chunk.content,
1498                    embedded.chunk.chunk_hash,
1499                    source_hash,
1500                )
1501            }
1502        })
1503        .collect();
1504
1505    storage.add_to_store(documents).await?;
1506    debug!("Stored batch of {} chunks", count);
1507    Ok(count)
1508}
1509
1510// =============================================================================
1511// PIPELINE COORDINATOR
1512// =============================================================================
1513
1514/// Run the async pipeline for document indexing.
1515///
1516/// Spawns all stages concurrently and waits for completion.
1517pub async fn run_pipeline(
1518    files: Vec<PathBuf>,
1519    namespace: String,
1520    storage: Arc<StorageManager>,
1521    client: Arc<Mutex<EmbeddingClient>>,
1522    config: PipelineConfig,
1523) -> Result<PipelineResult> {
1524    let total_files = files.len();
1525    let discovered_files = config
1526        .discovered_files
1527        .max(total_files.saturating_add(config.resumed_files))
1528        .max(total_files);
1529    info!(
1530        "Starting pipeline: {} scheduled files ({} discovered, {} resumed), mode: {:?}",
1531        total_files, discovered_files, config.resumed_files, config.slice_mode
1532    );
1533
1534    let base_client = {
1535        let guard = client.lock().await;
1536        guard.clone()
1537    };
1538    let initial_runtime = config
1539        .governor
1540        .as_ref()
1541        .map(PipelineGovernorConfig::initial_settings)
1542        .unwrap_or_else(|| {
1543            let (max_batch_chars, max_batch_items) = base_client.batch_limits();
1544            EmbedRuntimeSettings::new(max_batch_chars, max_batch_items, config.embed_concurrency)
1545        });
1546    let (governor_mode, governor_reason) = if config.governor.is_some() {
1547        (
1548            "adaptive".to_string(),
1549            "warming up from conservative limits".to_string(),
1550        )
1551    } else {
1552        (
1553            "fixed".to_string(),
1554            "operator-configured limits".to_string(),
1555        )
1556    };
1557    let observer = PipelineObserver::new(
1558        total_files,
1559        discovered_files,
1560        config.resumed_files,
1561        config.event_sender.clone(),
1562        initial_runtime,
1563        governor_mode,
1564        governor_reason,
1565    );
1566    observer.emit_initial_snapshot().await;
1567
1568    let (tx1, rx1) = mpsc::channel::<FileContent>(config.reader_buffer);
1569    let (tx2, rx2) = mpsc::channel::<ChunkBatch>(config.chunker_buffer);
1570    let (tx3, rx3) = mpsc::channel::<EmbeddedFile>(config.embedder_buffer);
1571
1572    let storage_for_reader = storage.clone();
1573    let storage_for_storage = storage;
1574    let ns_for_reader = namespace.clone();
1575    let slice_mode = config.slice_mode;
1576    let chunker = config.chunker;
1577    let outer_synthesis = config.outer_synthesis.clone();
1578    let dedup_enabled = config.dedup_enabled;
1579    let preprocess_config = config.preprocess_config.clone();
1580
1581    let reader_handle = tokio::spawn(stage_read_files(
1582        files,
1583        ns_for_reader,
1584        storage_for_reader,
1585        dedup_enabled,
1586        preprocess_config,
1587        tx1,
1588        observer.clone(),
1589    ));
1590
1591    let chunker_handle = tokio::spawn(stage_chunk_content(
1592        rx1,
1593        tx2,
1594        slice_mode,
1595        chunker,
1596        outer_synthesis,
1597        observer.clone(),
1598    ));
1599    let embedder_handle = tokio::spawn(stage_embed_chunks(
1600        rx2,
1601        tx3,
1602        base_client,
1603        config.embed_concurrency.max(1),
1604        config.governor.clone(),
1605        observer.clone(),
1606    ));
1607    let storage_handle = tokio::spawn(stage_store_chunks(
1608        rx3,
1609        storage_for_storage,
1610        observer.clone(),
1611    ));
1612
1613    let (_reader_result, _chunker_result, _embedder_result, _storage_result) = tokio::try_join!(
1614        reader_handle,
1615        chunker_handle,
1616        embedder_handle,
1617        storage_handle
1618    )?;
1619
1620    let result = observer.result().await;
1621
1622    info!(
1623        "Pipeline complete: {} files -> {} chunks -> {} stored",
1624        result.stats.files_committed, result.stats.chunks_created, result.stats.chunks_stored
1625    );
1626
1627    Ok(result)
1628}
1629
1630#[cfg(test)]
1631mod tests {
1632    use super::*;
1633    use std::path::PathBuf;
1634    use tempfile::TempDir;
1635
1636    #[test]
1637    fn test_split_into_chunks_short_text() {
1638        let text = "Hello world";
1639        let chunks = crate::rag::provider::split_into_chunks(text, 100, 20);
1640        assert_eq!(chunks.len(), 1);
1641        assert_eq!(chunks[0], "Hello world");
1642    }
1643
1644    #[test]
1645    fn test_split_into_chunks_with_overlap() {
1646        let text = "abcdefghijklmnopqrstuvwxyz";
1647        let chunks = crate::rag::provider::split_into_chunks(text, 10, 3);
1648        assert!(chunks.len() > 1);
1649        assert_eq!(chunks[0].len(), 10);
1650        assert!(chunks[0].ends_with(&chunks[1][..3]));
1651    }
1652
1653    #[test]
1654    fn test_pipeline_config_default() {
1655        let config = PipelineConfig::default();
1656        assert_eq!(config.reader_buffer, CHANNEL_BUFFER_SIZE);
1657        assert_eq!(config.slice_mode, SliceMode::default());
1658        assert!(matches!(
1659            config.outer_synthesis,
1660            crate::rag::OuterSynthesis::Keyword
1661        ));
1662        assert!(config.dedup_enabled);
1663        assert!(config.preprocess_config.is_none());
1664        assert_eq!(config.embed_concurrency, 1);
1665        assert!(config.governor.is_none());
1666        assert!(config.event_sender.is_none());
1667    }
1668
1669    #[tokio::test]
1670    async fn pipeline_reader_applies_preprocess_after_raw_source_hashing() {
1671        let tmp = TempDir::new().expect("temp dir");
1672        let source = tmp.path().join("conversation.md");
1673        let raw = "Operator decision: ship flat memory.\n\nsession_id: 123e4567-e89b-12d3-a456-426614174000\n";
1674        std::fs::write(&source, raw).expect("write source");
1675
1676        let storage = Arc::new(
1677            StorageManager::new_lance_only(tmp.path().join("lancedb").to_str().unwrap())
1678                .await
1679                .expect("storage"),
1680        );
1681        storage.ensure_collection().await.expect("collection");
1682        let observer = PipelineObserver::new(
1683            1,
1684            1,
1685            0,
1686            None,
1687            EmbedRuntimeSettings::new(8_192, 16, 1),
1688            "fixed".to_string(),
1689            "operator-configured limits".to_string(),
1690        );
1691        let (tx, mut rx) = mpsc::channel(1);
1692
1693        stage_read_files(
1694            vec![source],
1695            "kb:test".to_string(),
1696            storage,
1697            false,
1698            Some(PreprocessingConfig {
1699                remove_metadata: true,
1700                min_content_length: 1,
1701                ..Default::default()
1702            }),
1703            tx,
1704            observer,
1705        )
1706        .await;
1707
1708        let content = rx.recv().await.expect("preprocessed file content");
1709        assert_eq!(content.content_hash, crate::rag::compute_content_hash(raw));
1710        assert!(content.text.contains("Operator decision"));
1711        assert!(!content.text.contains("123e4567"));
1712    }
1713
1714    #[test]
1715    fn pipeline_auto_chunker_honors_explicit_flat_slice_mode() {
1716        let selected = select_pipeline_chunker(
1717            None,
1718            SliceMode::Flat,
1719            Path::new("/Users/silver/memex-index-staging/miksa-clean/conversation.md"),
1720            "kb:mikserka",
1721        );
1722
1723        assert_eq!(selected, ChunkerKind::Flat);
1724    }
1725
1726    #[test]
1727    fn pipeline_auto_chunker_keeps_transcript_routing_for_onion_modes() {
1728        let selected = select_pipeline_chunker(
1729            None,
1730            SliceMode::Onion,
1731            Path::new("/Users/polyversai/.aicx/store/Loctree/session.md"),
1732            "aicx",
1733        );
1734
1735        assert_eq!(selected, ChunkerKind::Aicx);
1736    }
1737
1738    #[tokio::test]
1739    async fn test_pipeline_observer_tracks_snapshot_and_failures() {
1740        let observer = PipelineObserver::new(
1741            3,
1742            3,
1743            0,
1744            None,
1745            EmbedRuntimeSettings::new(8_192, 16, 2),
1746            "fixed".to_string(),
1747            "operator-configured limits".to_string(),
1748        );
1749        observer.emit_initial_snapshot().await;
1750
1751        let path_a = PathBuf::from("a.md");
1752        let path_b = PathBuf::from("b.md");
1753
1754        observer
1755            .emit(PipelineEvent::FileRead {
1756                path: path_a.clone(),
1757                content_hash: "hash-a".to_string(),
1758                bytes: 10,
1759            })
1760            .await;
1761        observer
1762            .emit(PipelineEvent::ChunksCreated {
1763                path: path_a.clone(),
1764                content_hash: "hash-a".to_string(),
1765                count: 4,
1766            })
1767            .await;
1768        observer
1769            .emit(PipelineEvent::EmbedStarted {
1770                path: path_a.clone(),
1771                content_hash: "hash-a".to_string(),
1772                count: 4,
1773                chars: 1200,
1774                batch_items_limit: 16,
1775                batch_chars_limit: 8_192,
1776                concurrency_limit: 2,
1777            })
1778            .await;
1779        observer
1780            .emit(PipelineEvent::ChunksEmbedded {
1781                path: path_a.clone(),
1782                content_hash: "hash-a".to_string(),
1783                count: 4,
1784                chars: 1200,
1785                elapsed: Duration::from_millis(400),
1786            })
1787            .await;
1788        observer
1789            .emit(PipelineEvent::FileCommitted {
1790                path: path_a,
1791                content_hash: "hash-a".to_string(),
1792                chunk_count: 4,
1793            })
1794            .await;
1795        observer
1796            .emit(PipelineEvent::FileSkipped {
1797                path: path_b.clone(),
1798                content_hash: "hash-b".to_string(),
1799                reason: "duplicate".to_string(),
1800            })
1801            .await;
1802        observer
1803            .emit(PipelineEvent::Error {
1804                path: Some(path_b),
1805                stage: "embedder",
1806                message: "boom".to_string(),
1807            })
1808            .await;
1809
1810        let result = observer.result().await;
1811        assert_eq!(result.stats.total_files, 3);
1812        assert_eq!(result.stats.discovered_files, 3);
1813        assert_eq!(result.stats.resumed_files, 0);
1814        assert_eq!(result.stats.files_read, 1);
1815        assert_eq!(result.stats.files_skipped, 1);
1816        assert_eq!(result.stats.files_committed, 1);
1817        assert_eq!(result.stats.files_failed, 1);
1818        assert_eq!(result.stats.chunks_created, 4);
1819        assert_eq!(result.stats.chunks_embedded, 4);
1820        assert_eq!(result.stats.chunks_stored, 4);
1821        assert_eq!(result.stats.errors, 1);
1822        assert_eq!(result.errors.len(), 1);
1823
1824        let snapshot = observer.snapshot().await;
1825        assert_eq!(snapshot.embed_concurrency_limit, 2);
1826        assert_eq!(snapshot.embed_batch_items_limit, 16);
1827        assert_eq!(snapshot.embed_batch_chars_limit, 8_192);
1828        assert!(snapshot.avg_embed_batch_ms.is_some());
1829        assert_eq!(snapshot.governor_mode, "fixed");
1830    }
1831
1832    #[test]
1833    fn test_pipeline_governor_scales_up_only_when_backlog_stays_fast() {
1834        let config = PipelineGovernorConfig::adaptive(64_000, 32, 4);
1835        let initial_items = config.min_batch_items;
1836        let initial_chars = config.min_batch_chars;
1837        let mut governor = PipelineGovernor::new(config);
1838
1839        governor.last_growth_at = Some(Instant::now() - governor.config.growth_cooldown);
1840        let snapshot = PipelineSnapshot {
1841            reader_queue_depth: 1,
1842            chunker_queue_depth: 4,
1843            storage_queue_depth: 0,
1844            ..Default::default()
1845        };
1846
1847        let first = governor
1848            .on_success(Duration::from_millis(320), &snapshot)
1849            .expect("first growth adjustment");
1850        assert!(first.settings.max_batch_items > initial_items);
1851
1852        governor.last_growth_at = Some(Instant::now() - governor.config.growth_cooldown);
1853        governor.current.max_batch_items = governor.config.max_batch_items;
1854        let second = governor
1855            .on_success(Duration::from_millis(320), &snapshot)
1856            .expect("second growth adjustment");
1857        assert!(second.settings.max_batch_chars > initial_chars);
1858    }
1859
1860    #[test]
1861    fn test_pipeline_governor_throttles_quickly_on_pressure() {
1862        let config = PipelineGovernorConfig::adaptive(96_000, 48, 3);
1863        let mut governor = PipelineGovernor::new(config.clone());
1864        governor.current =
1865            EmbedRuntimeSettings::new(config.max_batch_chars, config.max_batch_items, 3);
1866
1867        let snapshot = PipelineSnapshot {
1868            reader_queue_depth: 2,
1869            chunker_queue_depth: 5,
1870            storage_queue_depth: 4,
1871            ..Default::default()
1872        };
1873
1874        let adjustment = governor
1875            .on_success(
1876                config.pressure_latency + Duration::from_millis(10),
1877                &snapshot,
1878            )
1879            .expect("pressure adjustment");
1880        assert!(adjustment.settings.max_batch_items < config.max_batch_items);
1881        assert!(adjustment.settings.max_batch_chars < config.max_batch_chars);
1882        assert!(adjustment.settings.concurrency < 3);
1883        assert!(adjustment.reason.contains("pressure"));
1884    }
1885
1886    #[test]
1887    fn test_snapshot_to_stats_carries_runtime_truth() {
1888        let snapshot = PipelineSnapshot {
1889            total_files: 5,
1890            discovered_files: 7,
1891            resumed_files: 2,
1892            files_read: 4,
1893            files_skipped: 1,
1894            files_committed: 2,
1895            files_failed: 1,
1896            chunks_created: 12,
1897            chunks_embedded: 10,
1898            chunks_stored: 8,
1899            errors: 3,
1900            ..Default::default()
1901        };
1902
1903        let stats = snapshot.to_stats();
1904        assert_eq!(stats.total_files, 5);
1905        assert_eq!(stats.discovered_files, 7);
1906        assert_eq!(stats.resumed_files, 2);
1907        assert_eq!(stats.files_read, 4);
1908        assert_eq!(stats.files_skipped, 1);
1909        assert_eq!(stats.files_committed, 2);
1910        assert_eq!(stats.files_failed, 1);
1911        assert_eq!(stats.chunks_created, 12);
1912        assert_eq!(stats.chunks_embedded, 10);
1913        assert_eq!(stats.chunks_stored, 8);
1914        assert_eq!(stats.errors, 3);
1915    }
1916
1917    #[tokio::test]
1918    async fn test_rollback_stored_file_chunks_removes_partial_file_writes() {
1919        let tmp = TempDir::new().expect("temp dir");
1920        let db_path = tmp.path().join("lancedb");
1921        let storage = StorageManager::new_lance_only(db_path.to_str().unwrap())
1922            .await
1923            .expect("storage");
1924        storage.ensure_collection().await.expect("collection");
1925
1926        let namespace = "rollback-ns".to_string();
1927        let doc_a = ChromaDocument::new_flat_with_hash(
1928            "chunk-a".to_string(),
1929            namespace.clone(),
1930            vec![0.1_f32; 8],
1931            serde_json::json!({"path": "doc-a.md"}),
1932            "alpha".to_string(),
1933            "file-hash".to_string(),
1934        );
1935        let doc_b = ChromaDocument::new_flat_with_hash(
1936            "chunk-b".to_string(),
1937            namespace.clone(),
1938            vec![0.2_f32; 8],
1939            serde_json::json!({"path": "doc-a.md"}),
1940            "beta".to_string(),
1941            "file-hash".to_string(),
1942        );
1943
1944        storage
1945            .add_to_store(vec![doc_a, doc_b])
1946            .await
1947            .expect("seed partial writes");
1948
1949        let (deleted, failures) = rollback_stored_file_chunks(
1950            &storage,
1951            &[
1952                (namespace.clone(), "chunk-a".to_string()),
1953                (namespace.clone(), "chunk-b".to_string()),
1954            ],
1955        )
1956        .await;
1957
1958        assert_eq!(deleted, 2);
1959        assert_eq!(failures, 0);
1960        assert!(
1961            storage
1962                .get_document(&namespace, "chunk-a")
1963                .await
1964                .expect("lookup chunk-a")
1965                .is_none()
1966        );
1967        assert!(
1968            storage
1969                .get_document(&namespace, "chunk-b")
1970                .await
1971                .expect("lookup chunk-b")
1972                .is_none()
1973        );
1974    }
1975}