rexis_rag/pipeline.rs
1//! # RRAG Pipeline System
2//!
3//! Composable, async-first processing pipelines for building complex RAG workflows
4//! from simple, reusable components. Features parallel execution, error handling,
5//! comprehensive monitoring, and type-safe data flow.
6//!
7//! ## Features
8//!
9//! - **Composable Steps**: Build complex workflows from simple, reusable components
10//! - **Type-Safe Data Flow**: Compile-time validation of pipeline data types
11//! - **Async Execution**: Full async/await support with parallel step execution
12//! - **Error Handling**: Robust error handling with optional error recovery
13//! - **Monitoring**: Built-in execution tracking and performance metrics
14//! - **Flexible Configuration**: Extensive configuration options for behavior tuning
15//! - **Caching Support**: Optional step result caching for performance
16//!
17//! ## Quick Start
18//!
19//! ### Basic Pipeline
20//!
21//! ```rust
22//! use rrag::prelude::*;
23//!
24//! # #[tokio::main]
25//! # async fn main() -> RragResult<()> {
26//! // Create a simple text processing pipeline
27//! let pipeline = RagPipelineBuilder::new()
28//! .add_step(TextPreprocessingStep::new(vec![
29//! TextOperation::NormalizeWhitespace,
30//! TextOperation::RemoveSpecialChars,
31//! ]))
32//! .add_step(DocumentChunkingStep::new(
33//! ChunkingStrategy::FixedSize { size: 512, overlap: 64 }
34//! ))
35//! .build();
36//!
37//! // Execute pipeline
38//! let context = PipelineContext::new(PipelineData::Text(
39//! "This is some text to process through the pipeline.".to_string()
40//! ));
41//!
42//! let result = pipeline.execute(context).await?;
43//! tracing::debug!("Pipeline completed in {}ms", result.total_execution_time());
44//! # Ok(())
45//! # }
46//! ```
47//!
48//! ### Advanced RAG Pipeline
49//!
50//! ```rust
51//! use rrag::prelude::*;
52//! use std::sync::Arc;
53//!
54//! # #[tokio::main]
55//! # async fn main() -> RragResult<()> {
56//! // Create a comprehensive RAG processing pipeline
57//! let embedding_provider = Arc::new(OpenAIEmbeddingProvider::new("api-key"));
58//! let embedding_service = Arc::new(EmbeddingService::new(embedding_provider));
59//!
60//! let pipeline = RagPipelineBuilder::new()
61//! .with_config(PipelineConfig {
62//! enable_parallelism: true,
63//! max_parallel_steps: 4,
64//! enable_caching: true,
65//! ..Default::default()
66//! })
67//! .add_step(TextPreprocessingStep::new(vec![
68//! TextOperation::NormalizeWhitespace,
69//! TextOperation::RemoveExtraWhitespace,
70//! ]))
71//! .add_step(DocumentChunkingStep::new(
72//! ChunkingStrategy::Semantic { similarity_threshold: 0.8 }
73//! ))
74//! .add_step(EmbeddingStep::new(embedding_service))
75//! .add_step(RetrievalStep::new())
76//! .build();
77//!
78//! // Process documents
79//! let documents = vec![
80//! Document::new("First document content"),
81//! Document::new("Second document content"),
82//! ];
83//!
84//! let context = PipelineContext::new(PipelineData::Documents(documents))
85//! .with_metadata("batch_id", "batch-123".into())
86//! .with_metadata("priority", "high".into());
87//!
88//! let result = pipeline.execute(context).await?;
89//! tracing::debug!("Processed {} documents", result.execution_history.len());
90//! # Ok(())
91//! # }
92//! ```
93//!
94//! ### Custom Pipeline Steps
95//!
96//! ```rust
97//! use rrag::prelude::*;
98//! use async_trait::async_trait;
99//!
100//! // Define a custom pipeline step
101//! struct CustomValidationStep {
102//! min_length: usize,
103//! }
104//!
105//! #[async_trait]
106//! impl PipelineStep for CustomValidationStep {
107//! fn name(&self) -> &str { "custom_validation" }
108//! fn description(&self) -> &str { "Validates document content length" }
109//! fn input_types(&self) -> Vec<&'static str> { vec!["Document", "Documents"] }
110//! fn output_type(&self) -> &'static str { "Document|Documents" }
111//!
112//! async fn execute(&self, mut context: PipelineContext) -> RragResult<PipelineContext> {
113//! // Custom validation logic here
114//! match &context.data {
115//! PipelineData::Document(doc) => {
116//! if doc.content_length() < self.min_length {
117//! return Err(RragError::validation(
118//! "document_length",
119//! format!("minimum {}", self.min_length),
120//! doc.content_length().to_string()
121//! ));
122//! }
123//! }
124//! _ => return Err(RragError::document_processing("Invalid input type"))
125//! }
126//! Ok(context)
127//! }
128//! }
129//!
130//! # #[tokio::main]
131//! # async fn main() -> RragResult<()> {
132//! // Use the custom step in a pipeline
133//! let pipeline = RagPipelineBuilder::new()
134//! .add_step(CustomValidationStep { min_length: 100 })
135//! .add_step(TextPreprocessingStep::new(vec![TextOperation::NormalizeWhitespace]))
136//! .build();
137//! # Ok(())
138//! # }
139//! ```
140//!
141//! ## Pipeline Configuration
142//!
143//! ```rust
144//! use rrag::prelude::*;
145//!
146//! let config = PipelineConfig {
147//! max_execution_time: 600, // 10 minutes
148//! continue_on_error: true, // Continue processing on step failures
149//! enable_parallelism: true,
150//! max_parallel_steps: 8,
151//! enable_caching: true,
152//! custom_config: [
153//! ("batch_size".to_string(), 100.into()),
154//! ("retry_attempts".to_string(), 3.into()),
155//! ].into_iter().collect(),
156//! };
157//! ```
158//!
159//! ## Error Handling
160//!
//! ```rust,ignore
162//! use rrag::prelude::*;
163//!
164//! # #[tokio::main]
165//! # async fn main() {
166//! match pipeline.execute(context).await {
167//! Ok(result) => {
168//! tracing::debug!("Pipeline completed successfully");
169//! tracing::debug!("Total time: {}ms", result.total_execution_time());
170//!
171//! if result.has_failures() {
172//! tracing::debug!("Some steps failed but pipeline continued");
173//! for step in &result.execution_history {
174//! if !step.success {
175//! tracing::debug!("Step '{}' failed: {:?}", step.step_id, step.error_message);
176//! }
177//! }
178//! }
179//! }
180//! Err(RragError::Timeout { operation, duration_ms }) => {
181//! tracing::debug!("Pipeline timed out in {}: {}ms", operation, duration_ms);
182//! }
183//! Err(e) => {
184//! tracing::debug!("Pipeline failed: {}", e);
185//! }
186//! }
187//! # }
188//! ```
189//!
190//! ## Performance Optimization
191//!
192//! - **Parallel Execution**: Steps that don't depend on each other run concurrently
193//! - **Caching**: Enable result caching for expensive operations
194//! - **Batch Processing**: Process multiple items together when possible
195//! - **Memory Management**: Efficient data structures and minimal copying
196//! - **Async Operations**: Non-blocking I/O and CPU-intensive operations
197//!
198//! ## Built-in Steps
199//!
200//! RRAG provides several built-in pipeline steps:
201//!
202//! - [`TextPreprocessingStep`]: Text normalization and cleaning
203//! - [`DocumentChunkingStep`]: Document chunking with various strategies
204//! - [`EmbeddingStep`]: Embedding generation with provider abstraction
205//! - [`RetrievalStep`]: Vector similarity search and retrieval
206//! - Custom steps via the [`PipelineStep`] trait
207
208use crate::{
209 Document, DocumentChunk, DocumentChunker, Embedding, EmbeddingService, RetrievalService,
210 RragError, RragResult, SearchResult, StorageService,
211};
212use async_trait::async_trait;
213use serde::{Deserialize, Serialize};
214use std::collections::HashMap;
215use std::sync::Arc;
216use std::time::Instant;
217
218/// Execution context for pipeline processing
219///
220/// Carries data, metadata, configuration, and execution history through
221/// a pipeline. Each pipeline execution gets its own context that tracks
222/// all steps, timing, errors, and intermediate results.
223///
224/// # Example
225///
226/// ```rust
227/// use rrag::prelude::*;
228///
229/// let context = PipelineContext::new(PipelineData::Text(
230/// "Document content to process".to_string()
231/// ))
232/// .with_metadata("source", "api".into())
233/// .with_metadata("priority", "high".into());
234///
235/// tracing::debug!("Processing execution: {}", context.execution_id);
236/// ```
#[derive(Debug, Clone)]
pub struct PipelineContext {
    /// Unique execution ID (UUID v4) for tracking and correlating logs
    pub execution_id: String,

    /// Current data flowing through the pipeline; each step may replace it
    pub data: PipelineData,

    /// Free-form execution metadata (merged with pipeline-level metadata)
    pub metadata: HashMap<String, serde_json::Value>,

    /// Ordered record of every step executed so far, including failures
    pub execution_history: Vec<StepExecution>,

    /// Configuration governing this execution (timeout, error policy, ...)
    pub config: PipelineConfig,
}
254
255/// Data types that can flow through pipeline steps
256///
257/// Represents the various types of data that can be processed by pipeline steps.
258/// Each step declares which input types it accepts and which output type it produces,
259/// enabling compile-time validation of pipeline composition.
260///
261/// # Type Safety
262///
263/// The pipeline system uses these variants to ensure type safety:
264/// - Steps declare compatible input/output types
265/// - Runtime validation ensures data type correctness
266/// - Clear error messages for type mismatches
267///
268/// # Example
269///
270/// ```rust
271/// use rrag::prelude::*;
272///
273/// // Different data types that can flow through pipelines
274/// let text_data = PipelineData::Text("Raw text content".to_string());
275/// let doc_data = PipelineData::Document(Document::new("Document content"));
276/// let docs_data = PipelineData::Documents(vec![
277/// Document::new("First doc"),
278/// Document::new("Second doc"),
279/// ]);
280/// ```
#[derive(Debug, Clone)]
pub enum PipelineData {
    /// Raw text input
    Text(String),

    /// A single document
    Document(Document),

    /// Multiple documents processed as a batch
    Documents(Vec<Document>),

    /// Document chunks produced by a chunking step
    Chunks(Vec<DocumentChunk>),

    /// Embedding vectors produced by an embedding step
    Embeddings(Vec<Embedding>),

    /// Search results produced by a retrieval step
    SearchResults(Vec<SearchResult>),

    /// Arbitrary JSON data for custom steps
    Json(serde_json::Value),
}
304
/// Pipeline configuration
///
/// Controls timeout, error handling, and optimization behavior for a pipeline
/// execution. See [`PipelineConfig::default`] for the default values.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Maximum wall-clock execution time in seconds for the whole pipeline
    pub max_execution_time: u64,

    /// When true, failing steps are recorded and execution continues;
    /// when false, the first error aborts the pipeline
    pub continue_on_error: bool,

    /// Parallel execution where possible
    /// NOTE(review): not consulted by the sequential `Pipeline::execute` in
    /// this file — confirm where parallelism is implemented.
    pub enable_parallelism: bool,

    /// Maximum parallel steps (see note on `enable_parallelism`)
    pub max_parallel_steps: usize,

    /// Enable step result caching (see note on `enable_parallelism`)
    pub enable_caching: bool,

    /// Custom, step-specific configuration values
    pub custom_config: HashMap<String, serde_json::Value>,
}
326
327impl Default for PipelineConfig {
328 fn default() -> Self {
329 Self {
330 max_execution_time: 300, // 5 minutes
331 continue_on_error: false,
332 enable_parallelism: true,
333 max_parallel_steps: 4,
334 enable_caching: false,
335 custom_config: HashMap::new(),
336 }
337 }
338}
339
/// Step execution record
///
/// One entry per executed (or attempted) step, appended to
/// `PipelineContext::execution_history` for monitoring and debugging.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepExecution {
    /// Step name as reported by `PipelineStep::name`
    pub step_id: String,

    /// UTC timestamp at which the step started
    pub start_time: chrono::DateTime<chrono::Utc>,

    /// Wall-clock execution duration in milliseconds
    pub duration_ms: u64,

    /// Whether the step completed successfully
    pub success: bool,

    /// Error message when `success` is false
    pub error_message: Option<String>,

    /// Step-specific metadata
    pub metadata: HashMap<String, serde_json::Value>,
}
361
362impl PipelineContext {
363 /// Create new pipeline context
364 pub fn new(data: PipelineData) -> Self {
365 Self {
366 execution_id: uuid::Uuid::new_v4().to_string(),
367 data,
368 metadata: HashMap::new(),
369 execution_history: Vec::new(),
370 config: PipelineConfig::default(),
371 }
372 }
373
374 /// Create with configuration
375 pub fn with_config(data: PipelineData, config: PipelineConfig) -> Self {
376 Self {
377 execution_id: uuid::Uuid::new_v4().to_string(),
378 data,
379 metadata: HashMap::new(),
380 execution_history: Vec::new(),
381 config,
382 }
383 }
384
385 /// Add metadata
386 pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
387 self.metadata.insert(key.into(), value);
388 self
389 }
390
391 /// Record step execution
392 pub fn record_step(&mut self, step_execution: StepExecution) {
393 self.execution_history.push(step_execution);
394 }
395
396 /// Get total execution time
397 pub fn total_execution_time(&self) -> u64 {
398 self.execution_history
399 .iter()
400 .map(|step| step.duration_ms)
401 .sum()
402 }
403
404 /// Check if any step failed
405 pub fn has_failures(&self) -> bool {
406 self.execution_history.iter().any(|step| !step.success)
407 }
408}
409
410/// Core trait for implementing pipeline steps
411///
412/// Each pipeline step implements this trait to define its behavior, input/output types,
413/// dependencies, and execution logic. Steps are composable building blocks that can
414/// be combined to create complex processing workflows.
415///
416/// # Design Principles
417///
418/// - **Single Responsibility**: Each step should do one thing well
419/// - **Type Safety**: Declare input/output types for validation
420/// - **Async First**: All execution is async for better concurrency
421/// - **Error Handling**: Comprehensive error reporting with context
422/// - **Monitoring**: Built-in execution tracking and metrics
423///
424/// # Example Implementation
425///
426/// ```rust
427/// use rrag::prelude::*;
428/// use async_trait::async_trait;
429///
430/// struct UppercaseStep;
431///
432/// #[async_trait]
433/// impl PipelineStep for UppercaseStep {
434/// fn name(&self) -> &str { "uppercase_text" }
435/// fn description(&self) -> &str { "Converts text to uppercase" }
436/// fn input_types(&self) -> Vec<&'static str> { vec!["Text"] }
437/// fn output_type(&self) -> &'static str { "Text" }
438///
439/// async fn execute(&self, mut context: PipelineContext) -> RragResult<PipelineContext> {
440/// match &context.data {
441/// PipelineData::Text(text) => {
442/// context.data = PipelineData::Text(text.to_uppercase());
443/// Ok(context)
444/// }
445/// _ => Err(RragError::document_processing("Expected Text input"))
446/// }
447/// }
448/// }
449/// ```
450///
451/// # Parallel Execution
452///
453/// Steps can declare whether they support parallel execution:
454///
455/// ```rust
456/// # use rrag::prelude::*;
457/// # use async_trait::async_trait;
458/// # struct MyStep;
459/// # #[async_trait]
460/// # impl PipelineStep for MyStep {
461/// # fn name(&self) -> &str { "my_step" }
462/// # fn description(&self) -> &str { "description" }
463/// # fn input_types(&self) -> Vec<&'static str> { vec!["Text"] }
464/// # fn output_type(&self) -> &'static str { "Text" }
465/// # async fn execute(&self, context: PipelineContext) -> RragResult<PipelineContext> { Ok(context) }
466/// // Override to disable parallelization for stateful operations
467/// fn is_parallelizable(&self) -> bool {
468/// false // This step cannot run in parallel
469/// }
470///
471/// // Declare dependencies on other steps
472/// fn dependencies(&self) -> Vec<&str> {
473/// vec!["preprocessing", "validation"]
474/// }
475/// # }
476/// ```
#[async_trait]
pub trait PipelineStep: Send + Sync {
    /// Unique step name/identifier (used in execution records and dependencies)
    fn name(&self) -> &str;

    /// Human-readable description of what the step does
    fn description(&self) -> &str;

    /// Input data types this step accepts (`PipelineData` variant names, e.g. "Text")
    fn input_types(&self) -> Vec<&'static str>;

    /// Output data type this step produces (may be a union like "Text|Document")
    fn output_type(&self) -> &'static str;

    /// Execute the step, consuming the context and returning the updated one.
    /// Implementations are expected to record a `StepExecution` in the context.
    async fn execute(&self, context: PipelineContext) -> RragResult<PipelineContext>;

    /// Validate input data before execution
    fn validate_input(&self, _data: &PipelineData) -> RragResult<()> {
        // Default implementation accepts anything - override for custom validation
        Ok(())
    }

    /// Whether this step can run in parallel with others.
    /// Override to return false for stateful or order-dependent steps.
    fn is_parallelizable(&self) -> bool {
        true
    }

    /// Names of steps this step depends on (default: none)
    fn dependencies(&self) -> Vec<&str> {
        Vec::new()
    }
}
510
511/// Built-in text preprocessing step for content normalization
512///
513/// Applies a sequence of text transformations to clean and normalize content
514/// before further processing. Supports common operations like whitespace
515/// normalization, case conversion, and special character handling.
516///
517/// # Supported Operations
518///
519/// - **Whitespace Normalization**: Collapse multiple spaces into single spaces
520/// - **Case Conversion**: Convert text to lowercase for consistency
521/// - **Special Character Removal**: Remove non-alphanumeric characters
522/// - **Regex Replacement**: Custom pattern-based text replacement
523///
524/// # Example
525///
526/// ```rust
527/// use rrag::prelude::*;
528///
529/// let step = TextPreprocessingStep::new(vec![
530/// TextOperation::NormalizeWhitespace,
531/// TextOperation::RemoveSpecialChars,
532/// TextOperation::ToLowercase,
533/// ]);
534///
535/// // Can also be built fluently
536/// let step = TextPreprocessingStep::new(vec![])
537/// .with_operation(TextOperation::NormalizeWhitespace)
538/// .with_operation(TextOperation::RegexReplace {
539/// pattern: r"\d+".to_string(),
540/// replacement: "[NUMBER]".to_string(),
541/// });
542/// ```
543///
544/// # Performance
545///
546/// - Operations are applied in sequence for predictable results
547/// - String allocations are minimized where possible
548/// - Regex operations are compiled once and reused
549/// - Supports batch processing for multiple documents
pub struct TextPreprocessingStep {
    /// Preprocessing operations, applied in order
    operations: Vec<TextOperation>,
}

/// Text preprocessing operations for document processing
#[derive(Debug, Clone)]
pub enum TextOperation {
    /// Convert all characters to lowercase
    ToLowercase,

    /// Collapse runs of whitespace into single spaces (also trims the ends)
    NormalizeWhitespace,

    /// Drop every character that is neither alphanumeric nor whitespace
    RemoveSpecialChars,

    /// Substring replacement (currently literal, not a real regex)
    RegexReplace {
        /// Pattern to match — treated as a literal substring here
        pattern: String,
        /// Replacement string for matched patterns
        replacement: String,
    },
}

impl TextPreprocessingStep {
    /// Create a new text preprocessing step with the given operation sequence.
    pub fn new(operations: Vec<TextOperation>) -> Self {
        Self { operations }
    }

    /// Apply every configured operation to `text`, in order.
    fn process_text(&self, text: &str) -> String {
        self.operations
            .iter()
            .fold(text.to_string(), |current, op| match op {
                TextOperation::ToLowercase => current.to_lowercase(),
                TextOperation::NormalizeWhitespace => {
                    current.split_whitespace().collect::<Vec<_>>().join(" ")
                }
                TextOperation::RemoveSpecialChars => current
                    .chars()
                    .filter(|ch| ch.is_alphanumeric() || ch.is_whitespace())
                    .collect(),
                // NOTE: plain substring replacement — a true regex would
                // require the `regex` crate.
                TextOperation::RegexReplace {
                    pattern,
                    replacement,
                } => current.replace(pattern, replacement),
            })
    }
}
608
609#[async_trait]
610impl PipelineStep for TextPreprocessingStep {
611 fn name(&self) -> &str {
612 "text_preprocessing"
613 }
614
615 fn description(&self) -> &str {
616 "Preprocesses text data with various normalization operations"
617 }
618
619 fn input_types(&self) -> Vec<&'static str> {
620 vec!["Text", "Document", "Documents"]
621 }
622
623 fn output_type(&self) -> &'static str {
624 "Text|Document|Documents"
625 }
626
627 async fn execute(&self, mut context: PipelineContext) -> RragResult<PipelineContext> {
628 let start_time = Instant::now();
629 let step_start = chrono::Utc::now();
630
631 let processed_data = match &context.data {
632 PipelineData::Text(text) => PipelineData::Text(self.process_text(text)),
633 PipelineData::Document(doc) => {
634 // Create a new document with processed content
635 let processed_content = self.process_text(doc.content_str());
636 let mut new_doc = Document::new(processed_content);
637 new_doc.id = doc.id.clone();
638 new_doc.metadata = doc.metadata.clone();
639 new_doc.content_hash = doc.content_hash.clone();
640 new_doc.created_at = doc.created_at;
641 PipelineData::Document(new_doc)
642 }
643 PipelineData::Documents(docs) => {
644 let processed_docs: Vec<Document> = docs
645 .iter()
646 .map(|doc| {
647 let processed_content = self.process_text(doc.content_str());
648 let mut new_doc = Document::new(processed_content);
649 new_doc.id = doc.id.clone();
650 new_doc.metadata = doc.metadata.clone();
651 new_doc.content_hash = doc.content_hash.clone();
652 new_doc.created_at = doc.created_at;
653 new_doc
654 })
655 .collect();
656 PipelineData::Documents(processed_docs)
657 }
658 _ => {
659 let error = "Input must be Text, Document, or Documents";
660 context.record_step(StepExecution {
661 step_id: self.name().to_string(),
662 start_time: step_start,
663 duration_ms: start_time.elapsed().as_millis() as u64,
664 success: false,
665 error_message: Some(error.to_string()),
666 metadata: HashMap::new(),
667 });
668 return Err(RragError::document_processing(error));
669 }
670 };
671
672 context.data = processed_data;
673
674 context.record_step(StepExecution {
675 step_id: self.name().to_string(),
676 start_time: step_start,
677 duration_ms: start_time.elapsed().as_millis() as u64,
678 success: true,
679 error_message: None,
680 metadata: HashMap::new(),
681 });
682
683 Ok(context)
684 }
685}
686
/// Document chunking step
///
/// Wraps a [`DocumentChunker`] to split documents into chunks suitable for
/// embedding and retrieval.
pub struct DocumentChunkingStep {
    /// Chunker that implements the actual splitting strategy
    chunker: DocumentChunker,
}

impl DocumentChunkingStep {
    /// Create a new document chunking step with the specified chunker
    pub fn new(chunker: DocumentChunker) -> Self {
        Self { chunker }
    }
}
699
700#[async_trait]
701impl PipelineStep for DocumentChunkingStep {
702 fn name(&self) -> &str {
703 "document_chunking"
704 }
705
706 fn description(&self) -> &str {
707 "Splits documents into smaller chunks for processing"
708 }
709
710 fn input_types(&self) -> Vec<&'static str> {
711 vec!["Document", "Documents"]
712 }
713
714 fn output_type(&self) -> &'static str {
715 "Chunks"
716 }
717
718 async fn execute(&self, mut context: PipelineContext) -> RragResult<PipelineContext> {
719 let start_time = Instant::now();
720 let step_start = chrono::Utc::now();
721
722 let chunks = match &context.data {
723 PipelineData::Document(doc) => self.chunker.chunk_document(doc)?,
724 PipelineData::Documents(docs) => {
725 let mut all_chunks = Vec::new();
726 for doc in docs {
727 all_chunks.extend(self.chunker.chunk_document(doc)?);
728 }
729 all_chunks
730 }
731 _ => {
732 let error = "Input must be Document or Documents";
733 context.record_step(StepExecution {
734 step_id: self.name().to_string(),
735 start_time: step_start,
736 duration_ms: start_time.elapsed().as_millis() as u64,
737 success: false,
738 error_message: Some(error.to_string()),
739 metadata: HashMap::new(),
740 });
741 return Err(RragError::document_processing(error));
742 }
743 };
744
745 context.data = PipelineData::Chunks(chunks);
746
747 context.record_step(StepExecution {
748 step_id: self.name().to_string(),
749 start_time: step_start,
750 duration_ms: start_time.elapsed().as_millis() as u64,
751 success: true,
752 error_message: None,
753 metadata: HashMap::new(),
754 });
755
756 Ok(context)
757 }
758}
759
/// Embedding generation step
///
/// Wraps an [`EmbeddingService`] to produce embeddings for documents or chunks.
pub struct EmbeddingStep {
    /// Shared embedding service used to generate embeddings
    embedding_service: Arc<EmbeddingService>,
}

impl EmbeddingStep {
    /// Create a new embedding generation step with the specified service
    pub fn new(embedding_service: Arc<EmbeddingService>) -> Self {
        Self { embedding_service }
    }
}
772
773#[async_trait]
774impl PipelineStep for EmbeddingStep {
775 fn name(&self) -> &str {
776 "embedding_generation"
777 }
778
779 fn description(&self) -> &str {
780 "Generates embeddings for documents or chunks"
781 }
782
783 fn input_types(&self) -> Vec<&'static str> {
784 vec!["Document", "Documents", "Chunks"]
785 }
786
787 fn output_type(&self) -> &'static str {
788 "Embeddings"
789 }
790
791 async fn execute(&self, mut context: PipelineContext) -> RragResult<PipelineContext> {
792 let start_time = Instant::now();
793 let step_start = chrono::Utc::now();
794
795 let embeddings = match &context.data {
796 PipelineData::Document(doc) => {
797 vec![self.embedding_service.embed_document(doc).await?]
798 }
799 PipelineData::Documents(docs) => self.embedding_service.embed_documents(docs).await?,
800 PipelineData::Chunks(chunks) => self.embedding_service.embed_chunks(chunks).await?,
801 _ => {
802 let error = "Input must be Document, Documents, or Chunks";
803 context.record_step(StepExecution {
804 step_id: self.name().to_string(),
805 start_time: step_start,
806 duration_ms: start_time.elapsed().as_millis() as u64,
807 success: false,
808 error_message: Some(error.to_string()),
809 metadata: HashMap::new(),
810 });
811 return Err(RragError::embedding("pipeline", error));
812 }
813 };
814
815 context.data = PipelineData::Embeddings(embeddings);
816
817 context.record_step(StepExecution {
818 step_id: self.name().to_string(),
819 start_time: step_start,
820 duration_ms: start_time.elapsed().as_millis() as u64,
821 success: true,
822 error_message: None,
823 metadata: HashMap::new(),
824 });
825
826 Ok(context)
827 }
828}
829
/// Retrieval step for similarity search
pub struct RetrievalStep {
    /// Service that performs the actual similarity search
    retrieval_service: Arc<RetrievalService>,

    /// Search configuration (result limit, score floor, optional query text)
    search_config: SearchStepConfig,
}
838
/// Configuration for the search/retrieval step
#[derive(Debug, Clone)]
pub struct SearchStepConfig {
    /// Maximum number of results to retrieve
    pub limit: usize,

    /// Minimum similarity score for results
    pub min_score: f32,

    /// Optional textual query (instead of an embedding query)
    pub query_text: Option<String>,
}

impl Default for SearchStepConfig {
    /// Defaults: top 10 results, no score floor, no text query.
    fn default() -> Self {
        SearchStepConfig {
            query_text: None,
            min_score: 0.0,
            limit: 10,
        }
    }
}
861
862impl RetrievalStep {
863 /// Create a new retrieval step with default configuration
864 pub fn new(retrieval_service: Arc<RetrievalService>) -> Self {
865 Self {
866 retrieval_service,
867 search_config: SearchStepConfig::default(),
868 }
869 }
870
871 /// Create a new retrieval step with custom configuration
872 pub fn with_config(retrieval_service: Arc<RetrievalService>, config: SearchStepConfig) -> Self {
873 Self {
874 retrieval_service,
875 search_config: config,
876 }
877 }
878}
879
#[async_trait]
impl PipelineStep for RetrievalStep {
    fn name(&self) -> &str {
        "similarity_retrieval"
    }

    fn description(&self) -> &str {
        "Performs similarity search using embeddings"
    }

    fn input_types(&self) -> Vec<&'static str> {
        vec!["Embeddings"]
    }

    fn output_type(&self) -> &'static str {
        "SearchResults"
    }

    /// Run a similarity search using the first input embedding as the query.
    ///
    /// Empty input embeddings yield empty results (no error). A non-Embeddings
    /// input records a failed step and returns a retrieval error.
    ///
    /// NOTE(review): only `search_config.limit` is honored here; `min_score`
    /// and `query_text` are currently unused — confirm whether score filtering
    /// belongs in this step or in the retrieval service.
    async fn execute(&self, mut context: PipelineContext) -> RragResult<PipelineContext> {
        let start_time = Instant::now();
        let step_start = chrono::Utc::now();

        let search_results = match &context.data {
            PipelineData::Embeddings(embeddings) => {
                if embeddings.is_empty() {
                    Vec::new()
                } else {
                    // Use the first embedding as query (could be enhanced to
                    // aggregate or search per-embedding)
                    let query_embedding = embeddings[0].clone();
                    self.retrieval_service
                        .search_embedding(query_embedding, Some(self.search_config.limit))
                        .await?
                }
            }
            _ => {
                let error = "Input must be Embeddings";
                context.record_step(StepExecution {
                    step_id: self.name().to_string(),
                    start_time: step_start,
                    duration_ms: start_time.elapsed().as_millis() as u64,
                    success: false,
                    error_message: Some(error.to_string()),
                    metadata: HashMap::new(),
                });
                return Err(RragError::retrieval(error));
            }
        };

        context.data = PipelineData::SearchResults(search_results);

        context.record_step(StepExecution {
            step_id: self.name().to_string(),
            start_time: step_start,
            duration_ms: start_time.elapsed().as_millis() as u64,
            success: true,
            error_message: None,
            metadata: HashMap::new(),
        });

        Ok(context)
    }
}
942
/// Pipeline for composing multiple steps
///
/// Steps run sequentially in insertion order; see [`Pipeline::execute`].
pub struct Pipeline {
    /// Pipeline steps in execution order
    steps: Vec<Arc<dyn PipelineStep>>,

    /// Configuration (timeout, error policy) applied to every execution
    config: PipelineConfig,

    /// Pipeline-level metadata merged into each execution context
    metadata: HashMap<String, serde_json::Value>,
}
954
955impl Pipeline {
956 /// Create new pipeline
957 pub fn new() -> Self {
958 Self {
959 steps: Vec::new(),
960 config: PipelineConfig::default(),
961 metadata: HashMap::new(),
962 }
963 }
964
965 /// Create with configuration
966 pub fn with_config(config: PipelineConfig) -> Self {
967 Self {
968 steps: Vec::new(),
969 config,
970 metadata: HashMap::new(),
971 }
972 }
973
974 /// Add a step to the pipeline
975 pub fn add_step(mut self, step: Arc<dyn PipelineStep>) -> Self {
976 self.steps.push(step);
977 self
978 }
979
980 /// Add metadata
981 pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
982 self.metadata.insert(key.into(), value);
983 self
984 }
985
986 /// Execute the pipeline
987 pub async fn execute(&self, initial_data: PipelineData) -> RragResult<PipelineContext> {
988 let mut context = PipelineContext::with_config(initial_data, self.config.clone());
989
990 // Add pipeline metadata to context
991 context.metadata.extend(self.metadata.clone());
992
993 let start_time = Instant::now();
994
995 for step in &self.steps {
996 // Check timeout
997 if start_time.elapsed().as_secs() > self.config.max_execution_time {
998 return Err(RragError::timeout(
999 "pipeline_execution",
1000 self.config.max_execution_time * 1000,
1001 ));
1002 }
1003
1004 // Validate input
1005 if let Err(e) = step.validate_input(&context.data) {
1006 if !self.config.continue_on_error {
1007 return Err(e);
1008 }
1009 // Record error and continue
1010 context.record_step(StepExecution {
1011 step_id: step.name().to_string(),
1012 start_time: chrono::Utc::now(),
1013 duration_ms: 0,
1014 success: false,
1015 error_message: Some(e.to_string()),
1016 metadata: HashMap::new(),
1017 });
1018 continue;
1019 }
1020
1021 // Execute step (clone context to satisfy borrow checker)
1022 let context_clone = PipelineContext {
1023 execution_id: context.execution_id.clone(),
1024 data: context.data.clone(),
1025 metadata: context.metadata.clone(),
1026 execution_history: context.execution_history.clone(),
1027 config: context.config.clone(),
1028 };
1029
1030 match step.execute(context_clone).await {
1031 Ok(new_context) => {
1032 context = new_context;
1033 }
1034 Err(e) => {
1035 if !self.config.continue_on_error {
1036 return Err(e);
1037 }
1038 // Record error and continue with unchanged context
1039 context.record_step(StepExecution {
1040 step_id: step.name().to_string(),
1041 start_time: chrono::Utc::now(),
1042 duration_ms: 0,
1043 success: false,
1044 error_message: Some(e.to_string()),
1045 metadata: HashMap::new(),
1046 });
1047 }
1048 }
1049 }
1050
1051 Ok(context)
1052 }
1053
1054 /// Get pipeline step information
1055 pub fn get_step_info(&self) -> Vec<PipelineStepInfo> {
1056 self.steps
1057 .iter()
1058 .map(|step| PipelineStepInfo {
1059 name: step.name().to_string(),
1060 description: step.description().to_string(),
1061 input_types: step.input_types().iter().map(|s| s.to_string()).collect(),
1062 output_type: step.output_type().to_string(),
1063 is_parallelizable: step.is_parallelizable(),
1064 dependencies: step.dependencies().iter().map(|s| s.to_string()).collect(),
1065 })
1066 .collect()
1067 }
1068}
1069
impl Default for Pipeline {
    /// Equivalent to [`Pipeline::new`]: empty step list, default configuration.
    fn default() -> Self {
        Self::new()
    }
}
1075
/// Pipeline step information for introspection
///
/// Serializable snapshot of a step's declared contract, produced by
/// `Pipeline::get_step_info` for monitoring and debugging UIs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStepInfo {
    /// Name of the pipeline step
    pub name: String,
    /// Description of what the step does
    pub description: String,
    /// Types of input data this step accepts
    pub input_types: Vec<String>,
    /// Type of output data this step produces
    pub output_type: String,
    /// Whether this step can run in parallel with others
    pub is_parallelizable: bool,
    /// Names of steps this step depends on
    pub dependencies: Vec<String>,
}
1092
/// Pre-built pipeline builder for common RAG workflows
///
/// Collects optional service handles, then assembles ready-made pipelines via
/// `build_ingestion_pipeline` / `build_query_pipeline`. Missing required
/// services are reported as configuration errors at build time.
pub struct RagPipelineBuilder {
    /// Embedding service (required by both ingestion and query pipelines)
    embedding_service: Option<Arc<EmbeddingService>>,

    /// Retrieval service (required by the query pipeline)
    retrieval_service: Option<Arc<RetrievalService>>,

    /// Storage service
    /// NOTE(review): not consumed by any build method visible here —
    /// presumably reserved for a future persistence pipeline; confirm.
    storage_service: Option<Arc<StorageService>>,

    /// Pipeline configuration applied to every built pipeline
    config: PipelineConfig,
}
1107
1108impl RagPipelineBuilder {
1109 /// Create a new RAG pipeline builder
1110 pub fn new() -> Self {
1111 Self {
1112 embedding_service: None,
1113 retrieval_service: None,
1114 storage_service: None,
1115 config: PipelineConfig::default(),
1116 }
1117 }
1118
1119 /// Set the embedding service for the pipeline
1120 pub fn with_embedding_service(mut self, service: Arc<EmbeddingService>) -> Self {
1121 self.embedding_service = Some(service);
1122 self
1123 }
1124
1125 /// Set the retrieval service for the pipeline
1126 pub fn with_retrieval_service(mut self, service: Arc<RetrievalService>) -> Self {
1127 self.retrieval_service = Some(service);
1128 self
1129 }
1130
1131 /// Set the storage service for the pipeline
1132 pub fn with_storage_service(mut self, service: Arc<StorageService>) -> Self {
1133 self.storage_service = Some(service);
1134 self
1135 }
1136
1137 /// Set custom configuration for the pipeline
1138 pub fn with_config(mut self, config: PipelineConfig) -> Self {
1139 self.config = config;
1140 self
1141 }
1142
1143 /// Build document ingestion pipeline
1144 pub fn build_ingestion_pipeline(&self) -> RragResult<Pipeline> {
1145 let embedding_service = self
1146 .embedding_service
1147 .as_ref()
1148 .ok_or_else(|| RragError::config("embedding_service", "required", "missing"))?;
1149
1150 let pipeline = Pipeline::with_config(self.config.clone())
1151 .add_step(Arc::new(TextPreprocessingStep::new(vec![
1152 TextOperation::NormalizeWhitespace,
1153 TextOperation::ToLowercase,
1154 ])))
1155 .add_step(Arc::new(DocumentChunkingStep::new(DocumentChunker::new())))
1156 .add_step(Arc::new(EmbeddingStep::new(embedding_service.clone())));
1157
1158 Ok(pipeline)
1159 }
1160
1161 /// Build query pipeline for search
1162 pub fn build_query_pipeline(&self) -> RragResult<Pipeline> {
1163 let embedding_service = self
1164 .embedding_service
1165 .as_ref()
1166 .ok_or_else(|| RragError::config("embedding_service", "required", "missing"))?;
1167
1168 let retrieval_service = self
1169 .retrieval_service
1170 .as_ref()
1171 .ok_or_else(|| RragError::config("retrieval_service", "required", "missing"))?;
1172
1173 let pipeline = Pipeline::with_config(self.config.clone())
1174 .add_step(Arc::new(TextPreprocessingStep::new(vec![
1175 TextOperation::NormalizeWhitespace,
1176 ])))
1177 .add_step(Arc::new(EmbeddingStep::new(embedding_service.clone())))
1178 .add_step(Arc::new(RetrievalStep::new(retrieval_service.clone())));
1179
1180 Ok(pipeline)
1181 }
1182}
1183
1184impl Default for RagPipelineBuilder {
1185 fn default() -> Self {
1186 Self::new()
1187 }
1188}
1189
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Document, EmbeddingService, InMemoryRetriever, LocalEmbeddingProvider};

    #[tokio::test]
    async fn test_text_preprocessing_step() {
        let step = TextPreprocessingStep::new(vec![
            TextOperation::ToLowercase,
            TextOperation::NormalizeWhitespace,
        ]);

        let input = PipelineContext::new(PipelineData::Text("  HELLO WORLD  ".to_string()));
        let output = step.execute(input).await.unwrap();

        // Lowercasing plus whitespace normalization should yield a clean string.
        match output.data {
            PipelineData::Text(processed) => assert_eq!(processed, "hello world"),
            _ => panic!("Expected Text output"),
        }

        assert!(output.execution_history[0].success);
    }

    #[tokio::test]
    async fn test_document_chunking_step() {
        let step = DocumentChunkingStep::new(DocumentChunker::new());

        let context = PipelineContext::new(PipelineData::Document(Document::new(
            "This is a test document with some content that should be chunked appropriately.",
        )));

        let outcome = step.execute(context).await.unwrap();

        match outcome.data {
            PipelineData::Chunks(chunks) => assert!(!chunks.is_empty()),
            _ => panic!("Expected Chunks output"),
        }
    }

    #[tokio::test]
    async fn test_embedding_step() {
        let service = Arc::new(EmbeddingService::new(Arc::new(
            LocalEmbeddingProvider::new("test-model", 128),
        )));
        let step = EmbeddingStep::new(service);

        let context = PipelineContext::new(PipelineData::Document(Document::new(
            "Test document for embedding",
        )));

        let outcome = step.execute(context).await.unwrap();

        // One document in, one 128-dimensional embedding out.
        match outcome.data {
            PipelineData::Embeddings(embeddings) => {
                assert_eq!(embeddings.len(), 1);
                assert_eq!(embeddings[0].dimensions, 128);
            }
            _ => panic!("Expected Embeddings output"),
        }
    }

    #[tokio::test]
    async fn test_pipeline_execution() {
        let embedding_service = Arc::new(EmbeddingService::new(Arc::new(
            LocalEmbeddingProvider::new("test-model", 128),
        )));

        let pipeline = Pipeline::new()
            .add_step(Arc::new(TextPreprocessingStep::new(vec![
                TextOperation::ToLowercase,
            ])))
            .add_step(Arc::new(EmbeddingStep::new(embedding_service)));

        let outcome = pipeline
            .execute(PipelineData::Document(Document::new("TEST DOCUMENT")))
            .await
            .unwrap();

        // Both steps ran and both succeeded.
        assert_eq!(outcome.execution_history.len(), 2);
        assert!(outcome.execution_history.iter().all(|s| s.success));

        // The terminal step produces embeddings.
        match outcome.data {
            PipelineData::Embeddings(embeddings) => assert_eq!(embeddings.len(), 1),
            _ => panic!("Expected Embeddings output"),
        }
    }

    #[tokio::test]
    async fn test_rag_pipeline_builder() {
        let embedding_service = Arc::new(EmbeddingService::new(Arc::new(
            LocalEmbeddingProvider::new("test-model", 128),
        )));

        let pipeline = RagPipelineBuilder::new()
            .with_embedding_service(embedding_service)
            .build_ingestion_pipeline()
            .unwrap();

        // Expect preprocessing, chunking, embedding — in that order.
        let step_info = pipeline.get_step_info();
        let names: Vec<&str> = step_info.iter().map(|s| s.name.as_str()).collect();
        assert_eq!(
            names,
            ["text_preprocessing", "document_chunking", "embedding_generation"]
        );
    }

    #[test]
    fn test_pipeline_context() {
        let mut context = PipelineContext::new(PipelineData::Text("test".to_string()))
            .with_metadata(
                "test_key",
                serde_json::Value::String("test_value".to_string()),
            );

        let stored = context.metadata.get("test_key").unwrap();
        assert_eq!(stored.as_str().unwrap(), "test_value");

        context.record_step(StepExecution {
            step_id: "test_step".to_string(),
            start_time: chrono::Utc::now(),
            duration_ms: 100,
            success: true,
            error_message: None,
            metadata: HashMap::new(),
        });

        assert_eq!(context.execution_history.len(), 1);
        assert_eq!(context.total_execution_time(), 100);
        assert!(!context.has_failures());
    }
}