1use anyhow::Result;
22use std::collections::HashSet;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25use std::time::{Duration, Instant};
26use tokio::sync::{Mutex, mpsc};
27use tracing::{debug, error, info, warn};
28
29use crate::embeddings::EmbeddingClient;
30use crate::preprocessing::{PreprocessingConfig, Preprocessor};
31use crate::rag::{ChunkOpts, ChunkerKind, OuterSynthesis, SliceMode, detect_default_chunker};
32use crate::storage::{ChromaDocument, StorageManager};
33
/// Default capacity for the bounded channels that link pipeline stages
/// (used by `PipelineConfig::default` for reader/chunker/embedder buffers).
const CHANNEL_BUFFER_SIZE: usize = 100;

/// Documents per storage write batch. NOTE(review): the consumer is not
/// visible in this chunk — presumably the storage stage; confirm there.
const STORAGE_BATCH_SIZE: usize = 100;
/// Raw text of a single source file, flowing from the reader stage to the
/// chunker stage.
#[derive(Debug, Clone)]
pub struct FileContent {
    /// Filesystem path the text was read from.
    pub path: PathBuf,
    /// File text (after optional preprocessing in the reader stage).
    pub text: String,
    /// Namespace the file is being indexed into.
    pub namespace: String,
    /// Hash of the original (pre-preprocessing) file text; used for dedup.
    pub content_hash: String,
}
53
/// A single chunk produced by a chunker, before embedding.
#[derive(Debug, Clone)]
pub struct Chunk {
    /// Unique identifier for this chunk.
    pub id: String,
    /// Chunk text that gets embedded.
    pub content: String,
    /// Path of the file this chunk was sliced from.
    pub source_path: PathBuf,
    /// Namespace the chunk belongs to.
    pub namespace: String,
    /// Hash of this chunk's own content.
    pub chunk_hash: String,
    /// Hash of the whole source file the chunk came from.
    pub source_hash: String,
    // Hierarchy layer; exact numbering scheme is defined by the chunkers —
    // TODO confirm (not visible in this chunk).
    pub layer: u8,
    /// Parent chunk id, if this chunk sits under another in the hierarchy.
    pub parent_id: Option<String>,
    /// Ids of child chunks in the hierarchy.
    pub children_ids: Vec<String>,
    /// Keywords associated with this chunk.
    pub keywords: Vec<String>,
    /// Free-form extra metadata.
    pub metadata: serde_json::Value,
}
83
/// A chunk paired with its embedding vector, produced by the embedder stage.
#[derive(Debug, Clone)]
pub struct EmbeddedChunk {
    /// The original chunk.
    pub chunk: Chunk,
    /// Embedding vector returned by the embedding client.
    pub embedding: Vec<f32>,
}
92
/// All chunks for one source file, passed from the chunker stage to the
/// embedder stage.
#[derive(Debug, Clone)]
struct ChunkBatch {
    path: PathBuf,
    content_hash: String,
    chunks: Vec<Chunk>,
}
99
/// All embedded chunks for one source file, passed from the embedder stage to
/// the storage stage.
#[derive(Debug, Clone)]
struct EmbeddedFile {
    path: PathBuf,
    content_hash: String,
    chunks: Vec<EmbeddedChunk>,
}
106
/// Embedding limits the governor tunes at runtime: per-request batch caps
/// (characters and items) plus the number of concurrent embed requests.
#[derive(Debug, Clone, Copy)]
struct EmbedRuntimeSettings {
    max_batch_chars: usize,
    max_batch_items: usize,
    concurrency: usize,
}

impl EmbedRuntimeSettings {
    /// Builds settings with every limit clamped to at least 1, so downstream
    /// batching logic never has to handle a zero cap.
    fn new(max_batch_chars: usize, max_batch_items: usize, concurrency: usize) -> Self {
        let at_least_one = |limit: usize| limit.max(1);
        Self {
            max_batch_chars: at_least_one(max_batch_chars),
            max_batch_items: at_least_one(max_batch_items),
            concurrency: at_least_one(concurrency),
        }
    }
}
123
/// Tuning envelope for the adaptive governor: floors and ceilings for each
/// runtime knob, plus the latency thresholds, cooldowns, and queue watermarks
/// that drive grow/shrink decisions.
#[derive(Debug, Clone)]
pub struct PipelineGovernorConfig {
    /// Floor for the per-request character budget.
    pub min_batch_chars: usize,
    /// Ceiling for the per-request character budget.
    pub max_batch_chars: usize,
    /// Floor for the per-request item count.
    pub min_batch_items: usize,
    /// Ceiling for the per-request item count.
    pub max_batch_items: usize,
    /// Floor for concurrent embed requests.
    pub min_concurrency: usize,
    /// Ceiling for concurrent embed requests.
    pub max_concurrency: usize,
    /// Batches slower than this block growth.
    pub target_latency: Duration,
    /// Batches at or above this trigger the pressure (shrink) path.
    pub pressure_latency: Duration,
    /// Minimum time between consecutive growth adjustments.
    pub growth_cooldown: Duration,
    /// Minimum time between consecutive pressure adjustments.
    pub pressure_cooldown: Duration,
    /// Reader+chunker backlog must reach this before growth is considered.
    pub backlog_low_watermark: usize,
    /// Storage queue depth at or above this counts as storage pressure.
    pub storage_backlog_high_watermark: usize,
}
140
141impl PipelineGovernorConfig {
142 pub fn adaptive(
143 max_batch_chars: usize,
144 max_batch_items: usize,
145 max_concurrency: usize,
146 ) -> Self {
147 let max_batch_chars = max_batch_chars.max(1);
148 let max_batch_items = max_batch_items.max(1);
149 let max_concurrency = max_concurrency.max(1);
150
151 Self {
152 min_batch_chars: (max_batch_chars / 4).max(4_096).min(max_batch_chars),
153 max_batch_chars,
154 min_batch_items: (max_batch_items / 4).max(1).min(max_batch_items),
155 max_batch_items,
156 min_concurrency: 1,
157 max_concurrency,
158 target_latency: Duration::from_millis(900),
159 pressure_latency: Duration::from_millis(2_200),
160 growth_cooldown: Duration::from_secs(3),
161 pressure_cooldown: Duration::from_millis(750),
162 backlog_low_watermark: 2,
163 storage_backlog_high_watermark: 3,
164 }
165 }
166
167 fn initial_settings(&self) -> EmbedRuntimeSettings {
168 EmbedRuntimeSettings::new(
169 self.min_batch_chars,
170 self.min_batch_items,
171 self.min_concurrency,
172 )
173 }
174}
175
/// One governor decision: the new settings plus a mode label and a
/// human-readable reason, forwarded as a `GovernorAdjusted` event.
#[derive(Debug, Clone)]
struct PipelineGovernorAdjustment {
    settings: EmbedRuntimeSettings,
    mode: String,
    reason: String,
}
182
/// Adaptive controller for embed batch sizes and concurrency. Tracks the
/// current settings and timestamps of the last growth/shrink so adjustments
/// respect the configured cooldowns.
#[derive(Debug, Clone)]
struct PipelineGovernor {
    config: PipelineGovernorConfig,
    current: EmbedRuntimeSettings,
    // When the governor last grew a limit (gates `growth_ready`).
    last_growth_at: Option<Instant>,
    // When the governor last shrank under pressure (gates `pressure_ready`).
    last_pressure_at: Option<Instant>,
}
190
impl PipelineGovernor {
    /// Starts at the configuration's conservative floor settings.
    fn new(config: PipelineGovernorConfig) -> Self {
        let current = config.initial_settings();
        Self {
            config,
            current,
            last_growth_at: None,
            last_pressure_at: None,
        }
    }

    /// Settings currently in force for embed workers.
    fn current_settings(&self) -> EmbedRuntimeSettings {
        self.current
    }

    /// Describes the starting state so the pipeline can emit an initial
    /// `GovernorAdjusted` event before any batch completes.
    fn initial_adjustment(&self) -> PipelineGovernorAdjustment {
        PipelineGovernorAdjustment {
            settings: self.current,
            mode: "adaptive".to_string(),
            reason: "warming up from conservative limits".to_string(),
        }
    }

    /// Reacts to a successfully completed embed batch. Under pressure
    /// (storage backlog at/above its watermark, or a batch at/above
    /// `pressure_latency`) it shrinks limits toward the floors; otherwise,
    /// given a fast batch, sufficient backlog, and an expired growth
    /// cooldown, it grows one knob per call (items, then chars, then
    /// concurrency). Returns `Some` only when a setting actually changed.
    fn on_success(
        &mut self,
        elapsed: Duration,
        snapshot: &PipelineSnapshot,
    ) -> Option<PipelineGovernorAdjustment> {
        let backlog = snapshot.chunker_queue_depth + snapshot.reader_queue_depth;
        let storage_pressure =
            snapshot.storage_queue_depth >= self.config.storage_backlog_high_watermark;
        let slow_batch = elapsed >= self.config.pressure_latency;

        if (storage_pressure || slow_batch) && self.pressure_ready() {
            let mut changed = false;
            let mut reasons = Vec::new();

            // Back each batch limit off by roughly one third, never below its floor.
            if self.current.max_batch_items > self.config.min_batch_items {
                let next_items =
                    ((self.current.max_batch_items * 2) / 3).max(self.config.min_batch_items);
                if next_items != self.current.max_batch_items {
                    self.current.max_batch_items = next_items;
                    changed = true;
                    reasons.push(format!("items {}", next_items));
                }
            }

            if self.current.max_batch_chars > self.config.min_batch_chars {
                let next_chars =
                    ((self.current.max_batch_chars * 2) / 3).max(self.config.min_batch_chars);
                if next_chars != self.current.max_batch_chars {
                    self.current.max_batch_chars = next_chars;
                    changed = true;
                    reasons.push(format!("chars {}", next_chars));
                }
            }

            // Concurrency only drops when storage is the bottleneck; a slow
            // batch alone does not reduce the worker count.
            if storage_pressure && self.current.concurrency > self.config.min_concurrency {
                self.current.concurrency -= 1;
                changed = true;
                reasons.push(format!("concurrency {}", self.current.concurrency));
            }

            if changed {
                self.last_pressure_at = Some(Instant::now());
                let reason = if storage_pressure {
                    format!(
                        "pressure: storage backlog {} -> {}",
                        snapshot.storage_queue_depth,
                        reasons.join(", ")
                    )
                } else {
                    format!(
                        "pressure: embed {:.0}ms -> {}",
                        elapsed.as_secs_f64() * 1_000.0,
                        reasons.join(", ")
                    )
                };
                return Some(PipelineGovernorAdjustment {
                    settings: self.current,
                    mode: "adaptive".to_string(),
                    reason,
                });
            }
        }

        // Growth gate: only grow on fast batches, with enough queued work, a
        // near-empty storage queue, and an expired growth cooldown.
        if elapsed > self.config.target_latency
            || backlog < self.config.backlog_low_watermark
            || snapshot.storage_queue_depth > 1
            || !self.growth_ready()
        {
            return None;
        }

        // Grow items first, by a quarter step (at least 1), capped at the ceiling.
        if self.current.max_batch_items < self.config.max_batch_items {
            let step = (self.current.max_batch_items / 4).max(1);
            let next_items = (self.current.max_batch_items + step).min(self.config.max_batch_items);
            if next_items != self.current.max_batch_items {
                self.current.max_batch_items = next_items;
                self.last_growth_at = Some(Instant::now());
                return Some(PipelineGovernorAdjustment {
                    settings: self.current,
                    mode: "adaptive".to_string(),
                    reason: format!(
                        "backlog {} with fast embed {:.0}ms -> items {}",
                        backlog,
                        elapsed.as_secs_f64() * 1_000.0,
                        next_items
                    ),
                });
            }
        }

        // Then chars, by a quarter step (at least 4 KiB), capped at the ceiling.
        if self.current.max_batch_chars < self.config.max_batch_chars {
            let step = (self.current.max_batch_chars / 4).max(4_096);
            let next_chars = (self.current.max_batch_chars + step).min(self.config.max_batch_chars);
            if next_chars != self.current.max_batch_chars {
                self.current.max_batch_chars = next_chars;
                self.last_growth_at = Some(Instant::now());
                return Some(PipelineGovernorAdjustment {
                    settings: self.current,
                    mode: "adaptive".to_string(),
                    reason: format!(
                        "backlog {} with fast embed {:.0}ms -> chars {}",
                        backlog,
                        elapsed.as_secs_f64() * 1_000.0,
                        next_chars
                    ),
                });
            }
        }

        // Finally concurrency, only once backlog sustains twice the current
        // worker count (or the low watermark, whichever is larger).
        let concurrency_backlog = self
            .current
            .concurrency
            .saturating_mul(2)
            .max(self.config.backlog_low_watermark);
        if backlog >= concurrency_backlog && self.current.concurrency < self.config.max_concurrency
        {
            self.current.concurrency += 1;
            self.last_growth_at = Some(Instant::now());
            return Some(PipelineGovernorAdjustment {
                settings: self.current,
                mode: "adaptive".to_string(),
                reason: format!(
                    "backlog {} sustained with fast embed {:.0}ms -> concurrency {}",
                    backlog,
                    elapsed.as_secs_f64() * 1_000.0,
                    self.current.concurrency
                ),
            });
        }

        None
    }

    /// Reacts to an embed failure by resetting every knob straight to its
    /// floor (not a gradual backoff). Returns `None` when already at the
    /// floors or while the pressure cooldown is still running.
    fn on_error(
        &mut self,
        snapshot: &PipelineSnapshot,
        message: &str,
    ) -> Option<PipelineGovernorAdjustment> {
        if !self.pressure_ready() {
            return None;
        }

        let mut changed = false;

        if self.current.concurrency > self.config.min_concurrency {
            self.current.concurrency = self.config.min_concurrency;
            changed = true;
        }
        if self.current.max_batch_items > self.config.min_batch_items {
            self.current.max_batch_items = self.config.min_batch_items;
            changed = true;
        }
        if self.current.max_batch_chars > self.config.min_batch_chars {
            self.current.max_batch_chars = self.config.min_batch_chars;
            changed = true;
        }

        if !changed {
            return None;
        }

        self.last_pressure_at = Some(Instant::now());
        Some(PipelineGovernorAdjustment {
            settings: self.current,
            mode: "adaptive".to_string(),
            reason: format!(
                "error with backlog {} and storage {}: {}",
                snapshot.chunker_queue_depth + snapshot.reader_queue_depth,
                snapshot.storage_queue_depth,
                message
            ),
        })
    }

    /// True when the growth cooldown has elapsed (or never grew yet).
    fn growth_ready(&self) -> bool {
        self.last_growth_at
            .map(|instant| instant.elapsed() >= self.config.growth_cooldown)
            .unwrap_or(true)
    }

    /// True when the pressure cooldown has elapsed (or never shrank yet).
    fn pressure_ready(&self) -> bool {
        self.last_pressure_at
            .map(|instant| instant.elapsed() >= self.config.pressure_cooldown)
            .unwrap_or(true)
    }
}
400
/// Outcome of one embed worker task. Exactly one of `chunks`/`error` is
/// `Some`: `chunks` on success, `error` (stringified) on failure.
#[derive(Debug)]
struct EmbedWorkerResult {
    path: PathBuf,
    content_hash: String,
    // On success: embeddings returned; on failure: chunks attempted.
    count: usize,
    // Total characters across the batch's chunk contents.
    chars: usize,
    // Wall time of the embed_batch call.
    elapsed: Duration,
    chunks: Option<Vec<EmbeddedChunk>>,
    error: Option<String>,
}
411
/// Configuration for one pipeline run.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Capacity of the reader -> chunker channel.
    pub reader_buffer: usize,
    /// Capacity of the chunker -> embedder channel.
    pub chunker_buffer: usize,
    /// Capacity of the embedder -> storage channel.
    pub embedder_buffer: usize,
    /// Requested slicing strategy (may be overridden per chunker).
    pub slice_mode: SliceMode,
    /// Explicit chunker choice; `None` lets the pipeline auto-detect per file.
    pub chunker: Option<ChunkerKind>,
    /// Outer-synthesis options passed through to the chunkers.
    pub outer_synthesis: OuterSynthesis,
    /// When true, files whose hash is already stored are skipped.
    pub dedup_enabled: bool,
    /// Optional preprocessing applied to file text before chunking.
    pub preprocess_config: Option<PreprocessingConfig>,
    /// Fixed embed concurrency (used when no governor is configured).
    pub embed_concurrency: usize,
    /// Adaptive governor configuration; `None` keeps fixed limits.
    pub governor: Option<PipelineGovernorConfig>,
    /// Optional sink for progress events and snapshots.
    pub event_sender: Option<mpsc::UnboundedSender<PipelineEvent>>,
    /// Files discovered this run (reported in snapshots).
    pub discovered_files: usize,
    /// Files carried over from a resumed run (reported in snapshots).
    pub resumed_files: usize,
}
451
452impl Default for PipelineConfig {
453 fn default() -> Self {
454 Self {
455 reader_buffer: CHANNEL_BUFFER_SIZE,
456 chunker_buffer: CHANNEL_BUFFER_SIZE,
457 embedder_buffer: CHANNEL_BUFFER_SIZE,
458 slice_mode: SliceMode::default(),
459 chunker: None,
460 outer_synthesis: OuterSynthesis::default(),
461 dedup_enabled: true,
462 preprocess_config: None,
463 embed_concurrency: 1,
464 governor: None,
465 event_sender: None,
466 discovered_files: 0,
467 resumed_files: 0,
468 }
469 }
470}
471
/// Final counters for a pipeline run (the counter subset of
/// `PipelineSnapshot`, without throughput/queue telemetry).
#[derive(Debug, Default, Clone)]
pub struct PipelineStats {
    /// Total files the run set out to process.
    pub total_files: usize,
    /// Files discovered this run.
    pub discovered_files: usize,
    /// Files carried over from a resumed run.
    pub resumed_files: usize,
    /// Files successfully read.
    pub files_read: usize,
    /// Files skipped (duplicates or below-minimum content).
    pub files_skipped: usize,
    /// Files fully committed to storage.
    pub files_committed: usize,
    /// Distinct files that hit at least one error.
    pub files_failed: usize,
    /// Chunks produced by the chunker stage.
    pub chunks_created: usize,
    /// Chunks successfully embedded.
    pub chunks_embedded: usize,
    /// Chunks written to storage.
    pub chunks_stored: usize,
    /// Total error events observed.
    pub errors: usize,
}
498
/// Point-in-time view of pipeline progress, emitted after every event.
/// Combines run counters with live queue depths, embed limits, throughput
/// rates, and the governor's current mode/reason.
#[derive(Debug, Clone, Default)]
pub struct PipelineSnapshot {
    // --- run counters (mirrored into PipelineStats) ---
    pub total_files: usize,
    pub discovered_files: usize,
    pub resumed_files: usize,
    pub files_read: usize,
    pub files_skipped: usize,
    pub files_committed: usize,
    pub files_failed: usize,
    pub chunks_created: usize,
    pub chunks_embedded: usize,
    pub chunks_stored: usize,
    pub errors: usize,
    // --- live queue depths (files awaiting the next stage) ---
    pub reader_queue_depth: usize,
    pub chunker_queue_depth: usize,
    pub storage_queue_depth: usize,
    // --- embedding telemetry ---
    // Size of the most recently completed embed batch.
    pub current_embed_batch_items: usize,
    pub current_embed_batch_chars: usize,
    // Limits currently in force (governor- or operator-set).
    pub embed_batch_items_limit: usize,
    pub embed_batch_chars_limit: usize,
    pub embed_active_requests: usize,
    pub embed_concurrency_limit: usize,
    // Exponentially smoothed embed latency (ms); None until the first batch.
    pub avg_embed_batch_ms: Option<f64>,
    // --- throughput and timing ---
    pub files_per_sec: f64,
    pub chunks_per_sec: f64,
    // Estimated time remaining; None when rate is zero or nothing remains.
    pub eta: Option<Duration>,
    pub elapsed: Duration,
    // Stage name currently judged to limit throughput (see determine_bottleneck).
    pub bottleneck: String,
    // --- governor state ---
    pub governor_mode: String,
    pub governor_reason: String,
}
531
532impl PipelineSnapshot {
533 pub fn to_stats(&self) -> PipelineStats {
534 PipelineStats {
535 total_files: self.total_files,
536 discovered_files: self.discovered_files,
537 resumed_files: self.resumed_files,
538 files_read: self.files_read,
539 files_skipped: self.files_skipped,
540 files_committed: self.files_committed,
541 files_failed: self.files_failed,
542 chunks_created: self.chunks_created,
543 chunks_embedded: self.chunks_embedded,
544 chunks_stored: self.chunks_stored,
545 errors: self.errors,
546 }
547 }
548}
549
/// Progress events emitted by the pipeline stages. Each event is forwarded to
/// the optional listener, followed by a refreshed `Snapshot`.
#[derive(Debug, Clone)]
pub enum PipelineEvent {
    /// A file was read and handed to the chunker stage.
    FileRead {
        path: PathBuf,
        content_hash: String,
        bytes: usize,
    },
    /// A file was dropped before chunking (duplicate or too short).
    FileSkipped {
        path: PathBuf,
        content_hash: String,
        reason: String,
    },
    /// Chunking finished for a file (`count` may be 0 on chunker failure).
    ChunksCreated {
        path: PathBuf,
        content_hash: String,
        count: usize,
    },
    /// An embed request was dispatched, with the limits in force at the time.
    EmbedStarted {
        path: PathBuf,
        content_hash: String,
        count: usize,
        chars: usize,
        batch_items_limit: usize,
        batch_chars_limit: usize,
        concurrency_limit: usize,
    },
    /// An embed request completed successfully.
    ChunksEmbedded {
        path: PathBuf,
        content_hash: String,
        count: usize,
        chars: usize,
        elapsed: Duration,
    },
    /// The governor changed the runtime limits (or announced its initial ones).
    GovernorAdjusted {
        batch_items_limit: usize,
        batch_chars_limit: usize,
        concurrency_limit: usize,
        mode: String,
        reason: String,
    },
    /// A file's chunks were fully written to storage.
    FileCommitted {
        path: PathBuf,
        content_hash: String,
        chunk_count: usize,
    },
    /// An error in some stage; `path` is `None` for file-independent errors.
    Error {
        path: Option<PathBuf>,
        stage: &'static str,
        message: String,
    },
    /// Full progress snapshot (boxed to keep the enum small).
    Snapshot(Box<PipelineSnapshot>),
}
603
/// Final outcome of a pipeline run: aggregate counters plus the accumulated
/// error messages.
#[derive(Debug)]
pub struct PipelineResult {
    /// Aggregate counters for the run.
    pub stats: PipelineStats,
    /// One formatted message per error event, in occurrence order.
    pub errors: Vec<String>,
}
612
/// Mutable progress bookkeeping behind the observer's mutex: the rolling
/// snapshot, run start time, the set of paths already counted as failed, and
/// the collected error messages.
#[derive(Debug)]
struct PipelineProgressState {
    snapshot: PipelineSnapshot,
    started_at: Instant,
    // Lossy string forms of paths already counted in files_failed, so a path
    // with multiple errors is only counted once.
    failed_paths: HashSet<String>,
    error_messages: Vec<String>,
}
620
impl PipelineProgressState {
    /// Initializes the state with the run's file totals, the embed limits in
    /// force at startup, and the governor's initial mode/reason. The
    /// bottleneck starts as "reader" since no work has flowed yet.
    fn new(
        total_files: usize,
        discovered_files: usize,
        resumed_files: usize,
        initial_runtime: EmbedRuntimeSettings,
        governor_mode: String,
        governor_reason: String,
    ) -> Self {
        let snapshot = PipelineSnapshot {
            total_files,
            discovered_files,
            resumed_files,
            embed_batch_items_limit: initial_runtime.max_batch_items,
            embed_batch_chars_limit: initial_runtime.max_batch_chars,
            embed_concurrency_limit: initial_runtime.concurrency,
            bottleneck: "reader".to_string(),
            governor_mode,
            governor_reason,
            ..Default::default()
        };

        Self {
            snapshot,
            started_at: Instant::now(),
            failed_paths: HashSet::new(),
            error_messages: Vec::new(),
        }
    }

    /// Counts a failing path exactly once, keyed by its lossy string form.
    fn record_failure(&mut self, path: &Path) {
        let key = path.to_string_lossy().to_string();
        if self.failed_paths.insert(key) {
            self.snapshot.files_failed += 1;
        }
    }

    /// Folds one event into the snapshot and returns a refreshed copy.
    ///
    /// Queue-depth accounting: FileRead pushes onto the reader queue;
    /// ChunksCreated moves the file reader -> chunker; ChunksEmbedded moves it
    /// chunker -> storage; FileCommitted pops it from storage. Decrements use
    /// saturating_sub because events from concurrent stages can arrive out of
    /// the nominal order.
    fn apply(&mut self, event: &PipelineEvent) -> PipelineSnapshot {
        match event {
            PipelineEvent::FileRead { .. } => {
                self.snapshot.files_read += 1;
                self.snapshot.reader_queue_depth += 1;
            }
            PipelineEvent::FileSkipped { .. } => {
                self.snapshot.files_skipped += 1;
            }
            PipelineEvent::ChunksCreated { count, .. } => {
                self.snapshot.chunks_created += count;
                self.snapshot.reader_queue_depth =
                    self.snapshot.reader_queue_depth.saturating_sub(1);
                self.snapshot.chunker_queue_depth += 1;
            }
            PipelineEvent::EmbedStarted {
                batch_items_limit,
                batch_chars_limit,
                concurrency_limit,
                ..
            } => {
                self.snapshot.embed_batch_items_limit = *batch_items_limit;
                self.snapshot.embed_batch_chars_limit = *batch_chars_limit;
                self.snapshot.embed_concurrency_limit = *concurrency_limit;
                self.snapshot.embed_active_requests += 1;
            }
            PipelineEvent::ChunksEmbedded {
                count,
                chars,
                elapsed,
                ..
            } => {
                self.snapshot.chunks_embedded += count;
                self.snapshot.chunker_queue_depth =
                    self.snapshot.chunker_queue_depth.saturating_sub(1);
                self.snapshot.storage_queue_depth += 1;
                self.snapshot.embed_active_requests =
                    self.snapshot.embed_active_requests.saturating_sub(1);
                self.snapshot.current_embed_batch_items = *count;
                self.snapshot.current_embed_batch_chars = *chars;
                // Exponential moving average, weighted 70% history / 30% new.
                let latency_ms = elapsed.as_secs_f64() * 1_000.0;
                self.snapshot.avg_embed_batch_ms = Some(
                    self.snapshot
                        .avg_embed_batch_ms
                        .map(|existing| (existing * 0.7) + (latency_ms * 0.3))
                        .unwrap_or(latency_ms),
                );
            }
            PipelineEvent::GovernorAdjusted {
                batch_items_limit,
                batch_chars_limit,
                concurrency_limit,
                mode,
                reason,
            } => {
                self.snapshot.embed_batch_items_limit = *batch_items_limit;
                self.snapshot.embed_batch_chars_limit = *batch_chars_limit;
                self.snapshot.embed_concurrency_limit = *concurrency_limit;
                self.snapshot.governor_mode = mode.clone();
                self.snapshot.governor_reason = reason.clone();
            }
            PipelineEvent::FileCommitted { chunk_count, .. } => {
                self.snapshot.files_committed += 1;
                self.snapshot.chunks_stored += chunk_count;
                self.snapshot.storage_queue_depth =
                    self.snapshot.storage_queue_depth.saturating_sub(1);
            }
            PipelineEvent::Error {
                path,
                stage,
                message,
            } => {
                self.snapshot.errors += 1;
                // Release the queue slot the failed file was occupying, based
                // on which stage reported the error.
                match *stage {
                    "embedder" => {
                        self.snapshot.chunker_queue_depth =
                            self.snapshot.chunker_queue_depth.saturating_sub(1);
                        self.snapshot.embed_active_requests =
                            self.snapshot.embed_active_requests.saturating_sub(1);
                    }
                    "storage" => {
                        self.snapshot.storage_queue_depth =
                            self.snapshot.storage_queue_depth.saturating_sub(1);
                    }
                    _ => {}
                }
                if let Some(path) = path {
                    self.record_failure(path);
                }
                self.error_messages.push(match path {
                    Some(path) => format!("{} [{}]: {}", stage, path.display(), message),
                    None => format!("{}: {}", stage, message),
                });
            }
            PipelineEvent::Snapshot(snapshot) => {
                // External snapshot replaces local state wholesale.
                self.snapshot = snapshot.as_ref().clone();
            }
        }

        self.refresh_snapshot()
    }

    /// Recomputes derived fields (elapsed, rates, ETA, bottleneck) and
    /// returns a clone of the snapshot.
    fn refresh_snapshot(&mut self) -> PipelineSnapshot {
        let elapsed = self.started_at.elapsed();
        // Files in a terminal state: committed, skipped, or failed.
        let terminal_files = self.snapshot.files_committed
            + self.snapshot.files_skipped
            + self.snapshot.files_failed;
        let elapsed_secs = elapsed.as_secs_f64();

        self.snapshot.elapsed = elapsed;
        self.snapshot.files_per_sec = if elapsed_secs > 0.0 {
            terminal_files as f64 / elapsed_secs
        } else {
            0.0
        };
        self.snapshot.chunks_per_sec = if elapsed_secs > 0.0 {
            self.snapshot.chunks_stored as f64 / elapsed_secs
        } else {
            0.0
        };

        let remaining_files = self.snapshot.total_files.saturating_sub(terminal_files);
        self.snapshot.eta = if self.snapshot.files_per_sec > 0.0 && remaining_files > 0 {
            Some(Duration::from_secs_f64(
                remaining_files as f64 / self.snapshot.files_per_sec,
            ))
        } else {
            None
        };

        self.snapshot.bottleneck = determine_bottleneck(&self.snapshot);
        self.snapshot.clone()
    }
}
792
793fn determine_bottleneck(snapshot: &PipelineSnapshot) -> String {
794 let terminal_files = snapshot.files_committed + snapshot.files_skipped + snapshot.files_failed;
795 if terminal_files >= snapshot.total_files {
796 return "complete".to_string();
797 }
798
799 let mut stage = "idle";
800 let mut depth = 0usize;
801
802 if snapshot.reader_queue_depth > depth {
803 stage = "chunker";
804 depth = snapshot.reader_queue_depth;
805 }
806 if snapshot.chunker_queue_depth > depth {
807 stage = "embedder";
808 depth = snapshot.chunker_queue_depth;
809 }
810 if snapshot.storage_queue_depth > depth {
811 stage = "storage";
812 depth = snapshot.storage_queue_depth;
813 }
814 if snapshot.embed_active_requests > 0
815 && snapshot.embed_active_requests >= snapshot.embed_concurrency_limit.max(1)
816 && depth <= snapshot.chunker_queue_depth
817 {
818 stage = "embedder";
819 depth = snapshot.embed_active_requests;
820 }
821 if depth == 0 && snapshot.files_read < snapshot.total_files {
822 stage = "reader";
823 }
824
825 stage.to_string()
826}
827
/// Cheaply cloneable handle shared by all pipeline stages: applies events to
/// the shared progress state and optionally forwards them to a listener.
#[derive(Clone)]
struct PipelineObserver {
    // Optional listener; when absent, events only update local state.
    sender: Option<mpsc::UnboundedSender<PipelineEvent>>,
    state: Arc<Mutex<PipelineProgressState>>,
}
833
834impl PipelineObserver {
835 fn new(
836 total_files: usize,
837 discovered_files: usize,
838 resumed_files: usize,
839 sender: Option<mpsc::UnboundedSender<PipelineEvent>>,
840 initial_runtime: EmbedRuntimeSettings,
841 governor_mode: String,
842 governor_reason: String,
843 ) -> Self {
844 Self {
845 sender,
846 state: Arc::new(Mutex::new(PipelineProgressState::new(
847 total_files,
848 discovered_files,
849 resumed_files,
850 initial_runtime,
851 governor_mode,
852 governor_reason,
853 ))),
854 }
855 }
856
857 async fn emit(&self, event: PipelineEvent) {
858 let snapshot = {
859 let mut state = self.state.lock().await;
860 state.apply(&event)
861 };
862
863 if let Some(sender) = &self.sender {
864 let _ = sender.send(event);
865 let _ = sender.send(PipelineEvent::Snapshot(Box::new(snapshot)));
866 }
867 }
868
869 async fn emit_initial_snapshot(&self) {
870 let snapshot = {
871 let mut state = self.state.lock().await;
872 state.refresh_snapshot()
873 };
874
875 if let Some(sender) = &self.sender {
876 let _ = sender.send(PipelineEvent::Snapshot(Box::new(snapshot)));
877 }
878 }
879
880 async fn result(&self) -> PipelineResult {
881 let state = self.state.lock().await;
882 PipelineResult {
883 stats: state.snapshot.to_stats(),
884 errors: state.error_messages.clone(),
885 }
886 }
887
888 async fn snapshot(&self) -> PipelineSnapshot {
889 let state = self.state.lock().await;
890 state.snapshot.clone()
891 }
892}
893
/// Reader stage: reads each file, optionally deduplicates against storage by
/// content hash, optionally preprocesses the text, and forwards surviving
/// files to the chunker channel. Never fails the whole stage — per-file
/// problems become Error/FileSkipped events and the loop continues.
async fn stage_read_files(
    files: Vec<PathBuf>,
    namespace: String,
    storage: Arc<StorageManager>,
    dedup_enabled: bool,
    preprocess_config: Option<PreprocessingConfig>,
    tx: mpsc::Sender<FileContent>,
    observer: PipelineObserver,
) {
    // Capture min_content_length alongside the preprocessor since the config
    // is consumed by Preprocessor::new.
    let preprocessor = preprocess_config.map(|config| {
        let min_content_length = config.min_content_length;
        (min_content_length, Preprocessor::new(config))
    });

    for path in files {
        let text = match extract_file_text(&path).await {
            Ok(text) => text,
            Err(err) => {
                warn!("Failed to read file {:?}: {}", path, err);
                observer
                    .emit(PipelineEvent::Error {
                        path: Some(path.clone()),
                        stage: "reader",
                        message: err.to_string(),
                    })
                    .await;
                continue;
            }
        };

        // Hash the raw text, before any preprocessing, so dedup matches what
        // was originally indexed.
        let content_hash = crate::rag::compute_content_hash(&text);

        if dedup_enabled {
            // Primary check is source_hash; fall back to content_hash. Dedup
            // lookup errors are deliberately treated as "not indexed" so a
            // storage hiccup never drops a file.
            let already_indexed = match storage.has_source_hash(&namespace, &content_hash).await {
                Ok(true) => true,
                Ok(false) => match storage.has_content_hash(&namespace, &content_hash).await {
                    Ok(true) => true,
                    Ok(false) => false,
                    Err(err) => {
                        warn!("content_hash dedup fallback failed for {:?}: {}", path, err);
                        false
                    }
                },
                Err(err) => {
                    warn!("source_hash dedup check failed for {:?}: {}", path, err);
                    false
                }
            };

            if already_indexed {
                info!(
                    "Skip duplicate source: {} (source_hash {})",
                    path.display(),
                    &content_hash[..16]
                );
                observer
                    .emit(PipelineEvent::FileSkipped {
                        path: path.clone(),
                        content_hash,
                        reason: "exact duplicate".to_string(),
                    })
                    .await;
                continue;
            }
        }

        // Preprocess (if configured) and drop files whose cleaned content is
        // too short to be worth indexing.
        let text = if let Some((min_content_length, preprocessor)) = &preprocessor {
            let cleaned = preprocessor.extract_semantic_content(&text);
            if cleaned.trim().len() < *min_content_length {
                observer
                    .emit(PipelineEvent::FileSkipped {
                        path: path.clone(),
                        content_hash,
                        reason: "preprocessed content below min length".to_string(),
                    })
                    .await;
                continue;
            }
            cleaned
        } else {
            text
        };

        let bytes = text.len();
        let content = FileContent {
            path: path.clone(),
            text,
            namespace: namespace.clone(),
            content_hash: content_hash.clone(),
        };

        if tx.send(content).await.is_err() {
            debug!("Reader: channel closed, stopping");
            break;
        }

        // Emit FileRead only after a successful send, so the reader queue
        // depth reflects work actually delivered downstream.
        observer
            .emit(PipelineEvent::FileRead {
                path,
                content_hash,
                bytes,
            })
            .await;
    }

    info!("Reader stage complete");
}
1019
1020async fn extract_file_text(path: &Path) -> Result<String> {
1022 let ext = path
1023 .extension()
1024 .and_then(|e| e.to_str())
1025 .unwrap_or("")
1026 .to_lowercase();
1027
1028 if ext == "pdf" {
1029 let path = path.to_path_buf();
1030 let pdf_text =
1031 tokio::task::spawn_blocking(move || pdf_extract::extract_text(&path)).await??;
1032 return Ok(pdf_text);
1033 }
1034
1035 let (_path, content) = crate::path_utils::safe_read_to_string_async(path).await?;
1036 Ok(content)
1037}
1038
/// Chunker stage: for each file, selects a chunker, slices the text into
/// chunks, and forwards the batch to the embedder channel. A chunker failure
/// is logged and produces an empty batch (count 0) rather than aborting the
/// stage.
async fn stage_chunk_content(
    mut rx: mpsc::Receiver<FileContent>,
    tx: mpsc::Sender<ChunkBatch>,
    slice_mode: SliceMode,
    chunker: Option<ChunkerKind>,
    outer_synthesis: OuterSynthesis,
    observer: PipelineObserver,
) {
    while let Some(file_content) = rx.recv().await {
        let path = file_content.path.clone();
        let content_hash = file_content.content_hash.clone();
        // Explicit config wins; otherwise detect per file/namespace.
        let selected_chunker = select_pipeline_chunker(
            chunker,
            slice_mode,
            &file_content.path,
            &file_content.namespace,
        );
        let opts = ChunkOpts::new(
            selected_chunker,
            // Let the chunker reconcile the requested slice mode with its own.
            selected_chunker.slice_mode(slice_mode),
            outer_synthesis.clone(),
        );
        let provider = selected_chunker.into_provider();
        let chunks = match provider.chunk(&file_content, &opts).await {
            Ok(chunks) => chunks,
            Err(err) => {
                warn!(
                    "Chunker {} failed for {}: {}",
                    provider.name(),
                    file_content.path.display(),
                    err
                );
                // Forward an empty batch so downstream still sees the file.
                Vec::new()
            }
        };
        let count = chunks.len();

        if tx
            .send(ChunkBatch {
                path: path.clone(),
                content_hash: content_hash.clone(),
                chunks,
            })
            .await
            .is_err()
        {
            debug!("Chunker: channel closed, stopping");
            break;
        }

        // Emit only after a successful send, mirroring the reader stage.
        observer
            .emit(PipelineEvent::ChunksCreated {
                path,
                content_hash,
                count,
            })
            .await;
    }

    info!("Chunker stage complete");
}
1105
1106fn select_pipeline_chunker(
1107 explicit: Option<ChunkerKind>,
1108 requested_slice_mode: SliceMode,
1109 source_path: &Path,
1110 namespace: &str,
1111) -> ChunkerKind {
1112 if let Some(chunker) = explicit {
1113 return chunker;
1114 }
1115
1116 if requested_slice_mode == SliceMode::Flat {
1117 return ChunkerKind::Flat;
1118 }
1119
1120 detect_default_chunker(source_path, namespace)
1121}
1122
/// Embedder stage: pulls chunk batches, dispatches up to `concurrency`
/// embed workers at a time, and forwards results to the storage channel.
/// When a governor is configured, batch limits and concurrency are re-read
/// on every loop iteration so adjustments take effect immediately; otherwise
/// the client's own limits and `embed_concurrency` are used as-is.
async fn stage_embed_chunks(
    mut rx: mpsc::Receiver<ChunkBatch>,
    tx: mpsc::Sender<EmbeddedFile>,
    base_client: EmbeddingClient,
    embed_concurrency: usize,
    governor_config: Option<PipelineGovernorConfig>,
    observer: PipelineObserver,
) {
    // Limits used when no governor is configured.
    let fixed_settings = {
        let (max_batch_chars, max_batch_items) = base_client.batch_limits();
        EmbedRuntimeSettings::new(max_batch_chars, max_batch_items, embed_concurrency)
    };
    let mut governor = governor_config.map(PipelineGovernor::new);
    // Announce the starting limits ("adaptive" warm-up or "fixed") before any work.
    let initial_adjustment = governor
        .as_ref()
        .map(PipelineGovernor::initial_adjustment)
        .unwrap_or_else(|| PipelineGovernorAdjustment {
            settings: fixed_settings,
            mode: "fixed".to_string(),
            reason: "operator-configured limits".to_string(),
        });
    observer
        .emit(PipelineEvent::GovernorAdjusted {
            batch_items_limit: initial_adjustment.settings.max_batch_items,
            batch_chars_limit: initial_adjustment.settings.max_batch_chars,
            concurrency_limit: initial_adjustment.settings.concurrency,
            mode: initial_adjustment.mode,
            reason: initial_adjustment.reason,
        })
        .await;

    // Result channel sized to twice the configured concurrency so workers
    // rarely block on the send.
    let result_capacity = embed_concurrency.max(1).saturating_mul(2);
    let (result_tx, mut result_rx) = mpsc::channel::<EmbedWorkerResult>(result_capacity.max(2));
    let mut input_closed = false;
    let mut in_flight = 0usize;

    loop {
        // Done once the input is exhausted and all workers have reported.
        if input_closed && in_flight == 0 {
            break;
        }

        // Re-read settings each iteration so governor changes apply promptly.
        let settings = governor
            .as_ref()
            .map(PipelineGovernor::current_settings)
            .unwrap_or(fixed_settings);

        if !input_closed && in_flight < settings.concurrency {
            // Capacity available: race accepting new input against worker results.
            tokio::select! {
                maybe_batch = rx.recv() => {
                    match maybe_batch {
                        Some(chunk_batch) => {
                            // NOTE(review): an empty batch is dropped here with
                            // no event, so the ChunksCreated accounting (+1
                            // chunker queue depth) is never balanced and the
                            // file never reaches a terminal state. Verify
                            // whether empty batches occur in practice (the
                            // chunker forwards Vec::new() on failure).
                            if chunk_batch.chunks.is_empty() {
                                continue;
                            }

                            let batch_chars: usize = chunk_batch
                                .chunks
                                .iter()
                                .map(|chunk| chunk.content.chars().count())
                                .sum();
                            observer
                                .emit(PipelineEvent::EmbedStarted {
                                    path: chunk_batch.path.clone(),
                                    content_hash: chunk_batch.content_hash.clone(),
                                    count: chunk_batch.chunks.len(),
                                    chars: batch_chars,
                                    batch_items_limit: settings.max_batch_items,
                                    batch_chars_limit: settings.max_batch_chars,
                                    concurrency_limit: settings.concurrency,
                                })
                                .await;

                            // Spawn a detached worker with the current limits
                            // baked into a cloned client.
                            in_flight += 1;
                            let worker_tx = result_tx.clone();
                            let worker_client = base_client
                                .clone_with_batch_limits(settings.max_batch_chars, settings.max_batch_items);
                            tokio::spawn(async move {
                                let result = embed_chunk_batch(worker_client, chunk_batch).await;
                                let _ = worker_tx.send(result).await;
                            });
                        }
                        None => input_closed = true,
                    }
                }
                // Guarded so this branch only arms while workers are outstanding.
                Some(result) = result_rx.recv(), if in_flight > 0 => {
                    in_flight = in_flight.saturating_sub(1);
                    if !handle_embed_result(result, &tx, &observer, governor.as_mut()).await {
                        break;
                    }
                }
            }
        } else if let Some(result) = result_rx.recv().await {
            // At capacity (or input closed): just drain worker results.
            in_flight = in_flight.saturating_sub(1);
            if !handle_embed_result(result, &tx, &observer, governor.as_mut()).await {
                break;
            }
        } else {
            // Result channel closed: no more work can complete.
            break;
        }
    }

    info!("Embedder stage complete");
}
1231
1232async fn embed_chunk_batch(
1233 mut client: EmbeddingClient,
1234 chunk_batch: ChunkBatch,
1235) -> EmbedWorkerResult {
1236 let path = chunk_batch.path;
1237 let content_hash = chunk_batch.content_hash;
1238 let chars: usize = chunk_batch
1239 .chunks
1240 .iter()
1241 .map(|chunk| chunk.content.chars().count())
1242 .sum();
1243 let texts: Vec<String> = chunk_batch
1244 .chunks
1245 .iter()
1246 .map(|chunk| chunk.content.clone())
1247 .collect();
1248
1249 let start = Instant::now();
1250 match client.embed_batch(&texts).await {
1251 Ok(embeddings) => {
1252 let count = embeddings.len();
1253 let chunks = chunk_batch
1254 .chunks
1255 .into_iter()
1256 .zip(embeddings)
1257 .map(|(chunk, embedding)| EmbeddedChunk { chunk, embedding })
1258 .collect();
1259
1260 EmbedWorkerResult {
1261 path,
1262 content_hash,
1263 count,
1264 chars,
1265 elapsed: start.elapsed(),
1266 chunks: Some(chunks),
1267 error: None,
1268 }
1269 }
1270 Err(err) => EmbedWorkerResult {
1271 path,
1272 content_hash,
1273 count: chunk_batch.chunks.len(),
1274 chars,
1275 elapsed: start.elapsed(),
1276 chunks: None,
1277 error: Some(err.to_string()),
1278 },
1279 }
1280}
1281
/// Processes one worker result in the dispatch loop: forwards successful
/// embeddings to storage, emits progress/error events, and feeds latency and
/// backlog information to the governor. Returns `false` only when the storage
/// channel is closed, which tells the dispatch loop to stop.
async fn handle_embed_result(
    result: EmbedWorkerResult,
    tx: &mpsc::Sender<EmbeddedFile>,
    observer: &PipelineObserver,
    governor: Option<&mut PipelineGovernor>,
) -> bool {
    if let Some(error_message) = result.error {
        error!(
            "Embedding batch failed for {:?}: {}",
            result.path, error_message
        );
        observer
            .emit(PipelineEvent::Error {
                path: Some(result.path.clone()),
                stage: "embedder",
                message: error_message.clone(),
            })
            .await;

        // Let the governor back off; snapshot is read after the error event
        // so it reflects the updated queue depths.
        if let Some(governor) = governor {
            let snapshot = observer.snapshot().await;
            if let Some(adjustment) = governor.on_error(&snapshot, &error_message) {
                observer
                    .emit(PipelineEvent::GovernorAdjusted {
                        batch_items_limit: adjustment.settings.max_batch_items,
                        batch_chars_limit: adjustment.settings.max_batch_chars,
                        concurrency_limit: adjustment.settings.concurrency,
                        mode: adjustment.mode,
                        reason: adjustment.reason,
                    })
                    .await;
            }
        }
        // A failed batch is not fatal to the stage.
        return true;
    }

    let Some(chunks) = result.chunks else {
        return true;
    };

    if tx
        .send(EmbeddedFile {
            path: result.path.clone(),
            content_hash: result.content_hash.clone(),
            chunks,
        })
        .await
        .is_err()
    {
        debug!("Embedder: channel closed, stopping");
        observer
            .emit(PipelineEvent::Error {
                path: Some(result.path),
                stage: "embedder",
                message: "storage channel closed".to_string(),
            })
            .await;
        // Storage is gone; signal the dispatch loop to stop.
        return false;
    }

    // Emit after the send so queue accounting matches actual delivery.
    observer
        .emit(PipelineEvent::ChunksEmbedded {
            path: result.path.clone(),
            content_hash: result.content_hash.clone(),
            count: result.count,
            chars: result.chars,
            elapsed: result.elapsed,
        })
        .await;

    if let Some(governor) = governor {
        let snapshot = observer.snapshot().await;
        if let Some(adjustment) = governor.on_success(result.elapsed, &snapshot) {
            observer
                .emit(PipelineEvent::GovernorAdjusted {
                    batch_items_limit: adjustment.settings.max_batch_items,
                    batch_chars_limit: adjustment.settings.max_batch_chars,
                    concurrency_limit: adjustment.settings.concurrency,
                    mode: adjustment.mode,
                    reason: adjustment.reason,
                })
                .await;
        }
    }

    true
}
1369
1370async fn stage_store_chunks(
1376 mut rx: mpsc::Receiver<EmbeddedFile>,
1377 storage: Arc<StorageManager>,
1378 observer: PipelineObserver,
1379) {
1380 while let Some(mut embedded_file) = rx.recv().await {
1381 let path = embedded_file.path.clone();
1382 let content_hash = embedded_file.content_hash.clone();
1383 let mut stored_for_file = 0usize;
1384 let mut storage_failed = false;
1385 let mut stored_doc_refs: Vec<(String, String)> = Vec::new();
1386
1387 while !embedded_file.chunks.is_empty() {
1388 let take = embedded_file.chunks.len().min(STORAGE_BATCH_SIZE);
1389 let batch: Vec<EmbeddedChunk> = embedded_file.chunks.drain(..take).collect();
1390 let batch_refs: Vec<(String, String)> = batch
1391 .iter()
1392 .map(|embedded| (embedded.chunk.namespace.clone(), embedded.chunk.id.clone()))
1393 .collect();
1394
1395 match store_batch(&storage, batch).await {
1396 Ok(count) => {
1397 stored_for_file += count;
1398 stored_doc_refs.extend(batch_refs);
1399 }
1400 Err(err) => {
1401 let (rolled_back, rollback_failures) =
1402 rollback_stored_file_chunks(&storage, &stored_doc_refs).await;
1403 let rollback_suffix = if rollback_failures == 0 {
1404 format!(
1405 "rolled back {} previously stored chunks for file",
1406 rolled_back
1407 )
1408 } else {
1409 format!(
1410 "rolled back {} previously stored chunks for file, {} rollback deletes failed",
1411 rolled_back, rollback_failures
1412 )
1413 };
1414 error!("Storage batch failed for {:?}: {}", path, err);
1415 observer
1416 .emit(PipelineEvent::Error {
1417 path: Some(path.clone()),
1418 stage: "storage",
1419 message: format!("{err}; {rollback_suffix}"),
1420 })
1421 .await;
1422 storage_failed = true;
1423 break;
1424 }
1425 }
1426 }
1427
1428 if !storage_failed {
1429 observer
1430 .emit(PipelineEvent::FileCommitted {
1431 path,
1432 content_hash,
1433 chunk_count: stored_for_file,
1434 })
1435 .await;
1436 }
1437 }
1438
1439 info!("Storage stage complete");
1440}
1441
1442async fn rollback_stored_file_chunks(
1443 storage: &StorageManager,
1444 stored_doc_refs: &[(String, String)],
1445) -> (usize, usize) {
1446 let mut deleted = 0usize;
1447 let mut failures = 0usize;
1448
1449 for (namespace, id) in stored_doc_refs.iter().rev() {
1450 match storage.delete_document(namespace, id).await {
1451 Ok(count) => deleted += count,
1452 Err(err) => {
1453 failures += 1;
1454 warn!(
1455 "Failed to roll back partially stored chunk {}/{}: {}",
1456 namespace, id, err
1457 );
1458 }
1459 }
1460 }
1461
1462 (deleted, failures)
1463}
1464
1465async fn store_batch(storage: &StorageManager, batch: Vec<EmbeddedChunk>) -> Result<usize> {
1471 let count = batch.len();
1472
1473 let documents: Vec<ChromaDocument> = batch
1474 .into_iter()
1475 .map(|embedded| {
1476 let source_hash = Some(embedded.chunk.source_hash);
1477 if embedded.chunk.layer > 0 {
1478 ChromaDocument {
1479 id: embedded.chunk.id,
1480 namespace: embedded.chunk.namespace,
1481 embedding: embedded.embedding,
1482 metadata: embedded.chunk.metadata,
1483 document: embedded.chunk.content,
1484 layer: embedded.chunk.layer,
1485 parent_id: embedded.chunk.parent_id,
1486 children_ids: embedded.chunk.children_ids,
1487 keywords: embedded.chunk.keywords,
1488 content_hash: Some(embedded.chunk.chunk_hash),
1489 source_hash,
1490 }
1491 } else {
1492 ChromaDocument::new_flat_with_hashes(
1493 embedded.chunk.id,
1494 embedded.chunk.namespace,
1495 embedded.embedding,
1496 embedded.chunk.metadata,
1497 embedded.chunk.content,
1498 embedded.chunk.chunk_hash,
1499 source_hash,
1500 )
1501 }
1502 })
1503 .collect();
1504
1505 storage.add_to_store(documents).await?;
1506 debug!("Stored batch of {} chunks", count);
1507 Ok(count)
1508}
1509
/// Run the four-stage indexing pipeline (read -> chunk -> embed -> store)
/// over `files`, wiring the stages together with bounded mpsc channels.
///
/// Each stage runs as its own spawned task; senders are *moved* into their
/// stage so that when a stage finishes, its channel closes and the
/// downstream stage drains and exits — this is what makes the pipeline
/// terminate. Returns the aggregated `PipelineResult` from the shared
/// observer. `Err` is returned only if a stage task fails to join
/// (`try_join!`); per-file errors are reported via observer events instead.
pub async fn run_pipeline(
    files: Vec<PathBuf>,
    namespace: String,
    storage: Arc<StorageManager>,
    client: Arc<Mutex<EmbeddingClient>>,
    config: PipelineConfig,
) -> Result<PipelineResult> {
    let total_files = files.len();
    // Clamp discovered_files upward so it is never smaller than the files
    // being scheduled now plus those already resumed.
    let discovered_files = config
        .discovered_files
        .max(total_files.saturating_add(config.resumed_files))
        .max(total_files);
    info!(
        "Starting pipeline: {} scheduled files ({} discovered, {} resumed), mode: {:?}",
        total_files, discovered_files, config.resumed_files, config.slice_mode
    );

    // Clone the client out of the mutex once; the embedder stage works with
    // its own copy instead of contending on the shared lock.
    let base_client = {
        let guard = client.lock().await;
        guard.clone()
    };
    // With a governor configured, start from its conservative initial
    // settings; otherwise use the client's fixed batch limits.
    let initial_runtime = config
        .governor
        .as_ref()
        .map(PipelineGovernorConfig::initial_settings)
        .unwrap_or_else(|| {
            let (max_batch_chars, max_batch_items) = base_client.batch_limits();
            EmbedRuntimeSettings::new(max_batch_chars, max_batch_items, config.embed_concurrency)
        });
    let (governor_mode, governor_reason) = if config.governor.is_some() {
        (
            "adaptive".to_string(),
            "warming up from conservative limits".to_string(),
        )
    } else {
        (
            "fixed".to_string(),
            "operator-configured limits".to_string(),
        )
    };
    let observer = PipelineObserver::new(
        total_files,
        discovered_files,
        config.resumed_files,
        config.event_sender.clone(),
        initial_runtime,
        governor_mode,
        governor_reason,
    );
    observer.emit_initial_snapshot().await;

    // Stage-to-stage channels: reader -> chunker -> embedder -> storage.
    let (tx1, rx1) = mpsc::channel::<FileContent>(config.reader_buffer);
    let (tx2, rx2) = mpsc::channel::<ChunkBatch>(config.chunker_buffer);
    let (tx3, rx3) = mpsc::channel::<EmbeddedFile>(config.embedder_buffer);

    // Pull config fields out before `config`/`storage` are partially moved
    // into the spawned stage futures.
    let storage_for_reader = storage.clone();
    let storage_for_storage = storage;
    let ns_for_reader = namespace.clone();
    let slice_mode = config.slice_mode;
    let chunker = config.chunker;
    let outer_synthesis = config.outer_synthesis.clone();
    let dedup_enabled = config.dedup_enabled;
    let preprocess_config = config.preprocess_config.clone();

    let reader_handle = tokio::spawn(stage_read_files(
        files,
        ns_for_reader,
        storage_for_reader,
        dedup_enabled,
        preprocess_config,
        tx1,
        observer.clone(),
    ));

    let chunker_handle = tokio::spawn(stage_chunk_content(
        rx1,
        tx2,
        slice_mode,
        chunker,
        outer_synthesis,
        observer.clone(),
    ));
    let embedder_handle = tokio::spawn(stage_embed_chunks(
        rx2,
        tx3,
        base_client,
        config.embed_concurrency.max(1),
        config.governor.clone(),
        observer.clone(),
    ));
    let storage_handle = tokio::spawn(stage_store_chunks(
        rx3,
        storage_for_storage,
        observer.clone(),
    ));

    // Wait for all four stages; `?` surfaces a JoinError (task panic/abort).
    let (_reader_result, _chunker_result, _embedder_result, _storage_result) = tokio::try_join!(
        reader_handle,
        chunker_handle,
        embedder_handle,
        storage_handle
    )?;

    let result = observer.result().await;

    info!(
        "Pipeline complete: {} files -> {} chunks -> {} stored",
        result.stats.files_committed, result.stats.chunks_created, result.stats.chunks_stored
    );

    Ok(result)
}
1629
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use tempfile::TempDir;

    // Text shorter than the chunk size yields exactly one chunk, unchanged.
    #[test]
    fn test_split_into_chunks_short_text() {
        let text = "Hello world";
        let chunks = crate::rag::provider::split_into_chunks(text, 100, 20);
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0], "Hello world");
    }

    // Adjacent chunks share their overlap: the last 3 chars of chunk 0 must
    // equal the first 3 chars of chunk 1.
    #[test]
    fn test_split_into_chunks_with_overlap() {
        let text = "abcdefghijklmnopqrstuvwxyz";
        let chunks = crate::rag::provider::split_into_chunks(text, 10, 3);
        assert!(chunks.len() > 1);
        assert_eq!(chunks[0].len(), 10);
        assert!(chunks[0].ends_with(&chunks[1][..3]));
    }

    // Pins the defaults of PipelineConfig so accidental changes are caught.
    #[test]
    fn test_pipeline_config_default() {
        let config = PipelineConfig::default();
        assert_eq!(config.reader_buffer, CHANNEL_BUFFER_SIZE);
        assert_eq!(config.slice_mode, SliceMode::default());
        assert!(matches!(
            config.outer_synthesis,
            crate::rag::OuterSynthesis::Keyword
        ));
        assert!(config.dedup_enabled);
        assert!(config.preprocess_config.is_none());
        assert_eq!(config.embed_concurrency, 1);
        assert!(config.governor.is_none());
        assert!(config.event_sender.is_none());
    }

    // The reader must hash the *raw* file bytes before preprocessing, so that
    // dedup/resume keys are stable even when preprocessing strips metadata.
    #[tokio::test]
    async fn pipeline_reader_applies_preprocess_after_raw_source_hashing() {
        let tmp = TempDir::new().expect("temp dir");
        let source = tmp.path().join("conversation.md");
        let raw = "Operator decision: ship flat memory.\n\nsession_id: 123e4567-e89b-12d3-a456-426614174000\n";
        std::fs::write(&source, raw).expect("write source");

        let storage = Arc::new(
            StorageManager::new_lance_only(tmp.path().join("lancedb").to_str().unwrap())
                .await
                .expect("storage"),
        );
        storage.ensure_collection().await.expect("collection");
        let observer = PipelineObserver::new(
            1,
            1,
            0,
            None,
            EmbedRuntimeSettings::new(8_192, 16, 1),
            "fixed".to_string(),
            "operator-configured limits".to_string(),
        );
        let (tx, mut rx) = mpsc::channel(1);

        stage_read_files(
            vec![source],
            "kb:test".to_string(),
            storage,
            false,
            Some(PreprocessingConfig {
                remove_metadata: true,
                min_content_length: 1,
                ..Default::default()
            }),
            tx,
            observer,
        )
        .await;

        let content = rx.recv().await.expect("preprocessed file content");
        // Hash is computed from the raw text...
        assert_eq!(content.content_hash, crate::rag::compute_content_hash(raw));
        // ...but the emitted text has had the session_id metadata stripped.
        assert!(content.text.contains("Operator decision"));
        assert!(!content.text.contains("123e4567"));
    }

    // An explicit Flat slice mode must override transcript auto-detection.
    #[test]
    fn pipeline_auto_chunker_honors_explicit_flat_slice_mode() {
        let selected = select_pipeline_chunker(
            None,
            SliceMode::Flat,
            Path::new("/Users/silver/memex-index-staging/miksa-clean/conversation.md"),
            "kb:mikserka",
        );

        assert_eq!(selected, ChunkerKind::Flat);
    }

    // In Onion mode, transcript-looking paths still route to the Aicx chunker.
    #[test]
    fn pipeline_auto_chunker_keeps_transcript_routing_for_onion_modes() {
        let selected = select_pipeline_chunker(
            None,
            SliceMode::Onion,
            Path::new("/Users/polyversai/.aicx/store/Loctree/session.md"),
            "aicx",
        );

        assert_eq!(selected, ChunkerKind::Aicx);
    }

    // Drives the observer through a full success path for one file plus a
    // skip and an error for another, then checks both the aggregated result
    // and the live snapshot.
    #[tokio::test]
    async fn test_pipeline_observer_tracks_snapshot_and_failures() {
        let observer = PipelineObserver::new(
            3,
            3,
            0,
            None,
            EmbedRuntimeSettings::new(8_192, 16, 2),
            "fixed".to_string(),
            "operator-configured limits".to_string(),
        );
        observer.emit_initial_snapshot().await;

        let path_a = PathBuf::from("a.md");
        let path_b = PathBuf::from("b.md");

        // File A: read -> chunked -> embedded -> committed.
        observer
            .emit(PipelineEvent::FileRead {
                path: path_a.clone(),
                content_hash: "hash-a".to_string(),
                bytes: 10,
            })
            .await;
        observer
            .emit(PipelineEvent::ChunksCreated {
                path: path_a.clone(),
                content_hash: "hash-a".to_string(),
                count: 4,
            })
            .await;
        observer
            .emit(PipelineEvent::EmbedStarted {
                path: path_a.clone(),
                content_hash: "hash-a".to_string(),
                count: 4,
                chars: 1200,
                batch_items_limit: 16,
                batch_chars_limit: 8_192,
                concurrency_limit: 2,
            })
            .await;
        observer
            .emit(PipelineEvent::ChunksEmbedded {
                path: path_a.clone(),
                content_hash: "hash-a".to_string(),
                count: 4,
                chars: 1200,
                elapsed: Duration::from_millis(400),
            })
            .await;
        observer
            .emit(PipelineEvent::FileCommitted {
                path: path_a,
                content_hash: "hash-a".to_string(),
                chunk_count: 4,
            })
            .await;
        // File B: skipped as a duplicate, then an embedder error.
        observer
            .emit(PipelineEvent::FileSkipped {
                path: path_b.clone(),
                content_hash: "hash-b".to_string(),
                reason: "duplicate".to_string(),
            })
            .await;
        observer
            .emit(PipelineEvent::Error {
                path: Some(path_b),
                stage: "embedder",
                message: "boom".to_string(),
            })
            .await;

        let result = observer.result().await;
        assert_eq!(result.stats.total_files, 3);
        assert_eq!(result.stats.discovered_files, 3);
        assert_eq!(result.stats.resumed_files, 0);
        assert_eq!(result.stats.files_read, 1);
        assert_eq!(result.stats.files_skipped, 1);
        assert_eq!(result.stats.files_committed, 1);
        // NOTE: a per-file Error event counts the file as failed.
        assert_eq!(result.stats.files_failed, 1);
        assert_eq!(result.stats.chunks_created, 4);
        assert_eq!(result.stats.chunks_embedded, 4);
        assert_eq!(result.stats.chunks_stored, 4);
        assert_eq!(result.stats.errors, 1);
        assert_eq!(result.errors.len(), 1);

        let snapshot = observer.snapshot().await;
        assert_eq!(snapshot.embed_concurrency_limit, 2);
        assert_eq!(snapshot.embed_batch_items_limit, 16);
        assert_eq!(snapshot.embed_batch_chars_limit, 8_192);
        assert!(snapshot.avg_embed_batch_ms.is_some());
        assert_eq!(snapshot.governor_mode, "fixed");
    }

    // Growth requires a fast batch plus backlog and an elapsed cooldown; the
    // governor raises item limits before character limits.
    #[test]
    fn test_pipeline_governor_scales_up_only_when_backlog_stays_fast() {
        let config = PipelineGovernorConfig::adaptive(64_000, 32, 4);
        let initial_items = config.min_batch_items;
        let initial_chars = config.min_batch_chars;
        let mut governor = PipelineGovernor::new(config);

        // Pretend the cooldown already expired so growth is allowed.
        governor.last_growth_at = Some(Instant::now() - governor.config.growth_cooldown);
        let snapshot = PipelineSnapshot {
            reader_queue_depth: 1,
            chunker_queue_depth: 4,
            storage_queue_depth: 0,
            ..Default::default()
        };

        let first = governor
            .on_success(Duration::from_millis(320), &snapshot)
            .expect("first growth adjustment");
        assert!(first.settings.max_batch_items > initial_items);

        // With items already at the ceiling, the next growth raises chars.
        governor.last_growth_at = Some(Instant::now() - governor.config.growth_cooldown);
        governor.current.max_batch_items = governor.config.max_batch_items;
        let second = governor
            .on_success(Duration::from_millis(320), &snapshot)
            .expect("second growth adjustment");
        assert!(second.settings.max_batch_chars > initial_chars);
    }

    // A slow batch under queue pressure must shrink limits immediately (no
    // cooldown), cutting items, chars, and concurrency in one adjustment.
    #[test]
    fn test_pipeline_governor_throttles_quickly_on_pressure() {
        let config = PipelineGovernorConfig::adaptive(96_000, 48, 3);
        let mut governor = PipelineGovernor::new(config.clone());
        // Start from the maximum settings so every limit has room to shrink.
        governor.current =
            EmbedRuntimeSettings::new(config.max_batch_chars, config.max_batch_items, 3);

        let snapshot = PipelineSnapshot {
            reader_queue_depth: 2,
            chunker_queue_depth: 5,
            storage_queue_depth: 4,
            ..Default::default()
        };

        let adjustment = governor
            .on_success(
                config.pressure_latency + Duration::from_millis(10),
                &snapshot,
            )
            .expect("pressure adjustment");
        assert!(adjustment.settings.max_batch_items < config.max_batch_items);
        assert!(adjustment.settings.max_batch_chars < config.max_batch_chars);
        assert!(adjustment.settings.concurrency < 3);
        assert!(adjustment.reason.contains("pressure"));
    }

    // to_stats() must copy every counter from the snapshot verbatim.
    #[test]
    fn test_snapshot_to_stats_carries_runtime_truth() {
        let snapshot = PipelineSnapshot {
            total_files: 5,
            discovered_files: 7,
            resumed_files: 2,
            files_read: 4,
            files_skipped: 1,
            files_committed: 2,
            files_failed: 1,
            chunks_created: 12,
            chunks_embedded: 10,
            chunks_stored: 8,
            errors: 3,
            ..Default::default()
        };

        let stats = snapshot.to_stats();
        assert_eq!(stats.total_files, 5);
        assert_eq!(stats.discovered_files, 7);
        assert_eq!(stats.resumed_files, 2);
        assert_eq!(stats.files_read, 4);
        assert_eq!(stats.files_skipped, 1);
        assert_eq!(stats.files_committed, 2);
        assert_eq!(stats.files_failed, 1);
        assert_eq!(stats.chunks_created, 12);
        assert_eq!(stats.chunks_embedded, 10);
        assert_eq!(stats.chunks_stored, 8);
        assert_eq!(stats.errors, 3);
    }

    // Rollback must physically delete every previously stored chunk and
    // report zero delete failures against real storage.
    #[tokio::test]
    async fn test_rollback_stored_file_chunks_removes_partial_file_writes() {
        let tmp = TempDir::new().expect("temp dir");
        let db_path = tmp.path().join("lancedb");
        let storage = StorageManager::new_lance_only(db_path.to_str().unwrap())
            .await
            .expect("storage");
        storage.ensure_collection().await.expect("collection");

        let namespace = "rollback-ns".to_string();
        let doc_a = ChromaDocument::new_flat_with_hash(
            "chunk-a".to_string(),
            namespace.clone(),
            vec![0.1_f32; 8],
            serde_json::json!({"path": "doc-a.md"}),
            "alpha".to_string(),
            "file-hash".to_string(),
        );
        let doc_b = ChromaDocument::new_flat_with_hash(
            "chunk-b".to_string(),
            namespace.clone(),
            vec![0.2_f32; 8],
            serde_json::json!({"path": "doc-a.md"}),
            "beta".to_string(),
            "file-hash".to_string(),
        );

        storage
            .add_to_store(vec![doc_a, doc_b])
            .await
            .expect("seed partial writes");

        let (deleted, failures) = rollback_stored_file_chunks(
            &storage,
            &[
                (namespace.clone(), "chunk-a".to_string()),
                (namespace.clone(), "chunk-b".to_string()),
            ],
        )
        .await;

        assert_eq!(deleted, 2);
        assert_eq!(failures, 0);
        assert!(
            storage
                .get_document(&namespace, "chunk-a")
                .await
                .expect("lookup chunk-a")
                .is_none()
        );
        assert!(
            storage
                .get_document(&namespace, "chunk-b")
                .await
                .expect("lookup chunk-b")
                .is_none()
        );
    }
}