rexis_rag/
pipeline.rs

1//! # RRAG Pipeline System
2//!
3//! Composable, async-first processing pipelines for building complex RAG workflows
4//! from simple, reusable components. Features parallel execution, error handling,
5//! comprehensive monitoring, and type-safe data flow.
6//!
7//! ## Features
8//!
9//! - **Composable Steps**: Build complex workflows from simple, reusable components
10//! - **Type-Safe Data Flow**: Compile-time validation of pipeline data types
11//! - **Async Execution**: Full async/await support with parallel step execution
12//! - **Error Handling**: Robust error handling with optional error recovery
13//! - **Monitoring**: Built-in execution tracking and performance metrics
14//! - **Flexible Configuration**: Extensive configuration options for behavior tuning
15//! - **Caching Support**: Optional step result caching for performance
16//!
17//! ## Quick Start
18//!
19//! ### Basic Pipeline
20//!
21//! ```rust
22//! use rrag::prelude::*;
23//!
24//! # #[tokio::main]
25//! # async fn main() -> RragResult<()> {
26//! // Create a simple text processing pipeline
27//! let pipeline = RagPipelineBuilder::new()
28//!     .add_step(TextPreprocessingStep::new(vec![
29//!         TextOperation::NormalizeWhitespace,
30//!         TextOperation::RemoveSpecialChars,
31//!     ]))
32//!     .add_step(DocumentChunkingStep::new(
33//!         ChunkingStrategy::FixedSize { size: 512, overlap: 64 }
34//!     ))
35//!     .build();
36//!
37//! // Execute pipeline
38//! let context = PipelineContext::new(PipelineData::Text(
39//!     "This is some text to process through the pipeline.".to_string()
40//! ));
41//!
42//! let result = pipeline.execute(context).await?;
43//! tracing::debug!("Pipeline completed in {}ms", result.total_execution_time());
44//! # Ok(())
45//! # }
46//! ```
47//!
48//! ### Advanced RAG Pipeline
49//!
50//! ```rust
51//! use rrag::prelude::*;
52//! use std::sync::Arc;
53//!
54//! # #[tokio::main]
55//! # async fn main() -> RragResult<()> {
56//! // Create a comprehensive RAG processing pipeline
57//! let embedding_provider = Arc::new(OpenAIEmbeddingProvider::new("api-key"));
58//! let embedding_service = Arc::new(EmbeddingService::new(embedding_provider));
59//!
60//! let pipeline = RagPipelineBuilder::new()
61//!     .with_config(PipelineConfig {
62//!         enable_parallelism: true,
63//!         max_parallel_steps: 4,
64//!         enable_caching: true,
65//!         ..Default::default()
66//!     })
67//!     .add_step(TextPreprocessingStep::new(vec![
68//!         TextOperation::NormalizeWhitespace,
69//!         TextOperation::RemoveExtraWhitespace,
70//!     ]))
71//!     .add_step(DocumentChunkingStep::new(
72//!         ChunkingStrategy::Semantic { similarity_threshold: 0.8 }
73//!     ))
74//!     .add_step(EmbeddingStep::new(embedding_service))
75//!     .add_step(RetrievalStep::new())
76//!     .build();
77//!
78//! // Process documents
79//! let documents = vec![
80//!     Document::new("First document content"),
81//!     Document::new("Second document content"),
82//! ];
83//!
84//! let context = PipelineContext::new(PipelineData::Documents(documents))
85//!     .with_metadata("batch_id", "batch-123".into())
86//!     .with_metadata("priority", "high".into());
87//!
88//! let result = pipeline.execute(context).await?;
89//! tracing::debug!("Processed {} documents", result.execution_history.len());
90//! # Ok(())
91//! # }
92//! ```
93//!
94//! ### Custom Pipeline Steps
95//!
96//! ```rust
97//! use rrag::prelude::*;
98//! use async_trait::async_trait;
99//!
100//! // Define a custom pipeline step
101//! struct CustomValidationStep {
102//!     min_length: usize,
103//! }
104//!
105//! #[async_trait]
106//! impl PipelineStep for CustomValidationStep {
107//!     fn name(&self) -> &str { "custom_validation" }
108//!     fn description(&self) -> &str { "Validates document content length" }
109//!     fn input_types(&self) -> Vec<&'static str> { vec!["Document", "Documents"] }
110//!     fn output_type(&self) -> &'static str { "Document|Documents" }
111//!
112//!     async fn execute(&self, mut context: PipelineContext) -> RragResult<PipelineContext> {
113//!         // Custom validation logic here
114//!         match &context.data {
115//!             PipelineData::Document(doc) => {
116//!                 if doc.content_length() < self.min_length {
117//!                     return Err(RragError::validation(
118//!                         "document_length",
119//!                         format!("minimum {}", self.min_length),
120//!                         doc.content_length().to_string()
121//!                     ));
122//!                 }
123//!             }
124//!             _ => return Err(RragError::document_processing("Invalid input type"))
125//!         }
126//!         Ok(context)
127//!     }
128//! }
129//!
130//! # #[tokio::main]
131//! # async fn main() -> RragResult<()> {
132//! // Use the custom step in a pipeline
133//! let pipeline = RagPipelineBuilder::new()
134//!     .add_step(CustomValidationStep { min_length: 100 })
135//!     .add_step(TextPreprocessingStep::new(vec![TextOperation::NormalizeWhitespace]))
136//!     .build();
137//! # Ok(())
138//! # }
139//! ```
140//!
141//! ## Pipeline Configuration
142//!
143//! ```rust
144//! use rrag::prelude::*;
145//!
146//! let config = PipelineConfig {
147//!     max_execution_time: 600, // 10 minutes
148//!     continue_on_error: true, // Continue processing on step failures
149//!     enable_parallelism: true,
150//!     max_parallel_steps: 8,
151//!     enable_caching: true,
152//!     custom_config: [
153//!         ("batch_size".to_string(), 100.into()),
154//!         ("retry_attempts".to_string(), 3.into()),
155//!     ].into_iter().collect(),
156//! };
157//! ```
158//!
159//! ## Error Handling
160//!
161//! ```rust
162//! use rrag::prelude::*;
163//!
164//! # #[tokio::main]
165//! # async fn main() {
166//! match pipeline.execute(context).await {
167//!     Ok(result) => {
168//!         tracing::debug!("Pipeline completed successfully");
169//!         tracing::debug!("Total time: {}ms", result.total_execution_time());
170//!         
171//!         if result.has_failures() {
172//!             tracing::debug!("Some steps failed but pipeline continued");
173//!             for step in &result.execution_history {
174//!                 if !step.success {
175//!                     tracing::debug!("Step '{}' failed: {:?}", step.step_id, step.error_message);
176//!                 }
177//!             }
178//!         }
179//!     }
180//!     Err(RragError::Timeout { operation, duration_ms }) => {
181//!         tracing::debug!("Pipeline timed out in {}: {}ms", operation, duration_ms);
182//!     }
183//!     Err(e) => {
184//!         tracing::debug!("Pipeline failed: {}", e);
185//!     }
186//! }
187//! # }
188//! ```
189//!
190//! ## Performance Optimization
191//!
192//! - **Parallel Execution**: Steps that don't depend on each other run concurrently
193//! - **Caching**: Enable result caching for expensive operations
194//! - **Batch Processing**: Process multiple items together when possible
195//! - **Memory Management**: Efficient data structures and minimal copying
196//! - **Async Operations**: Non-blocking I/O and CPU-intensive operations
197//!
198//! ## Built-in Steps
199//!
200//! RRAG provides several built-in pipeline steps:
201//!
202//! - [`TextPreprocessingStep`]: Text normalization and cleaning
203//! - [`DocumentChunkingStep`]: Document chunking with various strategies  
204//! - [`EmbeddingStep`]: Embedding generation with provider abstraction
205//! - [`RetrievalStep`]: Vector similarity search and retrieval
206//! - Custom steps via the [`PipelineStep`] trait
207
208use crate::{
209    Document, DocumentChunk, DocumentChunker, Embedding, EmbeddingService, RetrievalService,
210    RragError, RragResult, SearchResult, StorageService,
211};
212use async_trait::async_trait;
213use serde::{Deserialize, Serialize};
214use std::collections::HashMap;
215use std::sync::Arc;
216use std::time::Instant;
217
218/// Execution context for pipeline processing
219///
220/// Carries data, metadata, configuration, and execution history through
221/// a pipeline. Each pipeline execution gets its own context that tracks
222/// all steps, timing, errors, and intermediate results.
223///
224/// # Example
225///
226/// ```rust
227/// use rrag::prelude::*;
228///
229/// let context = PipelineContext::new(PipelineData::Text(
230///     "Document content to process".to_string()
231/// ))
232/// .with_metadata("source", "api".into())
233/// .with_metadata("priority", "high".into());
234///
235/// tracing::debug!("Processing execution: {}", context.execution_id);
236/// ```
237#[derive(Debug, Clone)]
238pub struct PipelineContext {
239    /// Execution ID for tracking
240    pub execution_id: String,
241
242    /// Input data for the pipeline
243    pub data: PipelineData,
244
245    /// Execution metadata
246    pub metadata: HashMap<String, serde_json::Value>,
247
248    /// Step execution history
249    pub execution_history: Vec<StepExecution>,
250
251    /// Pipeline configuration
252    pub config: PipelineConfig,
253}
254
255/// Data types that can flow through pipeline steps
256///
257/// Represents the various types of data that can be processed by pipeline steps.
258/// Each step declares which input types it accepts and which output type it produces,
259/// enabling compile-time validation of pipeline composition.
260///
261/// # Type Safety
262///
263/// The pipeline system uses these variants to ensure type safety:
264/// - Steps declare compatible input/output types
265/// - Runtime validation ensures data type correctness
266/// - Clear error messages for type mismatches
267///
268/// # Example
269///
270/// ```rust
271/// use rrag::prelude::*;
272///
273/// // Different data types that can flow through pipelines
274/// let text_data = PipelineData::Text("Raw text content".to_string());
275/// let doc_data = PipelineData::Document(Document::new("Document content"));
276/// let docs_data = PipelineData::Documents(vec![
277///     Document::new("First doc"),
278///     Document::new("Second doc"),
279/// ]);
280/// ```
281#[derive(Debug, Clone)]
282pub enum PipelineData {
283    /// Raw text input
284    Text(String),
285
286    /// Document input
287    Document(Document),
288
289    /// Multiple documents
290    Documents(Vec<Document>),
291
292    /// Document chunks
293    Chunks(Vec<DocumentChunk>),
294
295    /// Embeddings
296    Embeddings(Vec<Embedding>),
297
298    /// Search results
299    SearchResults(Vec<SearchResult>),
300
301    /// JSON data
302    Json(serde_json::Value),
303}
304
305/// Pipeline configuration
306#[derive(Debug, Clone)]
307pub struct PipelineConfig {
308    /// Maximum execution time in seconds
309    pub max_execution_time: u64,
310
311    /// Whether to continue on step errors
312    pub continue_on_error: bool,
313
314    /// Parallel execution where possible
315    pub enable_parallelism: bool,
316
317    /// Maximum parallel steps
318    pub max_parallel_steps: usize,
319
320    /// Enable step caching
321    pub enable_caching: bool,
322
323    /// Custom configuration
324    pub custom_config: HashMap<String, serde_json::Value>,
325}
326
327impl Default for PipelineConfig {
328    fn default() -> Self {
329        Self {
330            max_execution_time: 300, // 5 minutes
331            continue_on_error: false,
332            enable_parallelism: true,
333            max_parallel_steps: 4,
334            enable_caching: false,
335            custom_config: HashMap::new(),
336        }
337    }
338}
339
340/// Step execution record
341#[derive(Debug, Clone, Serialize, Deserialize)]
342pub struct StepExecution {
343    /// Step name/ID
344    pub step_id: String,
345
346    /// Execution start time
347    pub start_time: chrono::DateTime<chrono::Utc>,
348
349    /// Execution duration in milliseconds
350    pub duration_ms: u64,
351
352    /// Whether step succeeded
353    pub success: bool,
354
355    /// Error message if failed
356    pub error_message: Option<String>,
357
358    /// Step metadata
359    pub metadata: HashMap<String, serde_json::Value>,
360}
361
362impl PipelineContext {
363    /// Create new pipeline context
364    pub fn new(data: PipelineData) -> Self {
365        Self {
366            execution_id: uuid::Uuid::new_v4().to_string(),
367            data,
368            metadata: HashMap::new(),
369            execution_history: Vec::new(),
370            config: PipelineConfig::default(),
371        }
372    }
373
374    /// Create with configuration
375    pub fn with_config(data: PipelineData, config: PipelineConfig) -> Self {
376        Self {
377            execution_id: uuid::Uuid::new_v4().to_string(),
378            data,
379            metadata: HashMap::new(),
380            execution_history: Vec::new(),
381            config,
382        }
383    }
384
385    /// Add metadata
386    pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
387        self.metadata.insert(key.into(), value);
388        self
389    }
390
391    /// Record step execution
392    pub fn record_step(&mut self, step_execution: StepExecution) {
393        self.execution_history.push(step_execution);
394    }
395
396    /// Get total execution time
397    pub fn total_execution_time(&self) -> u64 {
398        self.execution_history
399            .iter()
400            .map(|step| step.duration_ms)
401            .sum()
402    }
403
404    /// Check if any step failed
405    pub fn has_failures(&self) -> bool {
406        self.execution_history.iter().any(|step| !step.success)
407    }
408}
409
410/// Core trait for implementing pipeline steps
411///
412/// Each pipeline step implements this trait to define its behavior, input/output types,
413/// dependencies, and execution logic. Steps are composable building blocks that can
414/// be combined to create complex processing workflows.
415///
416/// # Design Principles
417///
418/// - **Single Responsibility**: Each step should do one thing well
419/// - **Type Safety**: Declare input/output types for validation
420/// - **Async First**: All execution is async for better concurrency
421/// - **Error Handling**: Comprehensive error reporting with context
422/// - **Monitoring**: Built-in execution tracking and metrics
423///
424/// # Example Implementation
425///
426/// ```rust
427/// use rrag::prelude::*;
428/// use async_trait::async_trait;
429///
430/// struct UppercaseStep;
431///
432/// #[async_trait]
433/// impl PipelineStep for UppercaseStep {
434///     fn name(&self) -> &str { "uppercase_text" }
435///     fn description(&self) -> &str { "Converts text to uppercase" }
436///     fn input_types(&self) -> Vec<&'static str> { vec!["Text"] }
437///     fn output_type(&self) -> &'static str { "Text" }
438///
439///     async fn execute(&self, mut context: PipelineContext) -> RragResult<PipelineContext> {
440///         match &context.data {
441///             PipelineData::Text(text) => {
442///                 context.data = PipelineData::Text(text.to_uppercase());
443///                 Ok(context)
444///             }
445///             _ => Err(RragError::document_processing("Expected Text input"))
446///         }
447///     }
448/// }
449/// ```
450///
451/// # Parallel Execution
452///
453/// Steps can declare whether they support parallel execution:
454///
455/// ```rust
456/// # use rrag::prelude::*;
457/// # use async_trait::async_trait;
458/// # struct MyStep;
459/// # #[async_trait]
460/// # impl PipelineStep for MyStep {
461/// #   fn name(&self) -> &str { "my_step" }
462/// #   fn description(&self) -> &str { "description" }
463/// #   fn input_types(&self) -> Vec<&'static str> { vec!["Text"] }
464/// #   fn output_type(&self) -> &'static str { "Text" }
465/// #   async fn execute(&self, context: PipelineContext) -> RragResult<PipelineContext> { Ok(context) }
466/// // Override to disable parallelization for stateful operations
467/// fn is_parallelizable(&self) -> bool {
468///     false // This step cannot run in parallel
469/// }
470///
471/// // Declare dependencies on other steps
472/// fn dependencies(&self) -> Vec<&str> {
473///     vec!["preprocessing", "validation"]
474/// }
475/// # }
476/// ```
477#[async_trait]
478pub trait PipelineStep: Send + Sync {
479    /// Step name/identifier
480    fn name(&self) -> &str;
481
482    /// Step description
483    fn description(&self) -> &str;
484
485    /// Input data types this step accepts
486    fn input_types(&self) -> Vec<&'static str>;
487
488    /// Output data type this step produces
489    fn output_type(&self) -> &'static str;
490
491    /// Execute the step
492    async fn execute(&self, context: PipelineContext) -> RragResult<PipelineContext>;
493
494    /// Validate input data
495    fn validate_input(&self, _data: &PipelineData) -> RragResult<()> {
496        // Default implementation - override for custom validation
497        Ok(())
498    }
499
500    /// Whether this step can run in parallel with others
501    fn is_parallelizable(&self) -> bool {
502        true
503    }
504
505    /// Dependencies on other steps (step names)
506    fn dependencies(&self) -> Vec<&str> {
507        Vec::new()
508    }
509}
510
511/// Built-in text preprocessing step for content normalization
512///
513/// Applies a sequence of text transformations to clean and normalize content
514/// before further processing. Supports common operations like whitespace
515/// normalization, case conversion, and special character handling.
516///
517/// # Supported Operations
518///
519/// - **Whitespace Normalization**: Collapse multiple spaces into single spaces
520/// - **Case Conversion**: Convert text to lowercase for consistency
521/// - **Special Character Removal**: Remove non-alphanumeric characters
522/// - **Regex Replacement**: Custom pattern-based text replacement
523///
524/// # Example
525///
526/// ```rust
527/// use rrag::prelude::*;
528///
529/// let step = TextPreprocessingStep::new(vec![
530///     TextOperation::NormalizeWhitespace,
531///     TextOperation::RemoveSpecialChars,
532///     TextOperation::ToLowercase,
533/// ]);
534///
535/// // Can also be built fluently
536/// let step = TextPreprocessingStep::new(vec![])
537///     .with_operation(TextOperation::NormalizeWhitespace)
538///     .with_operation(TextOperation::RegexReplace {
539///         pattern: r"\d+".to_string(),
540///         replacement: "[NUMBER]".to_string(),
541///     });
542/// ```
543///
544/// # Performance
545///
546/// - Operations are applied in sequence for predictable results
547/// - String allocations are minimized where possible
548/// - Regex operations are compiled once and reused
549/// - Supports batch processing for multiple documents
550pub struct TextPreprocessingStep {
551    /// Preprocessing operations to apply
552    operations: Vec<TextOperation>,
553}
554
555/// Text preprocessing operations for document processing
556#[derive(Debug, Clone)]
557pub enum TextOperation {
558    /// Convert to lowercase
559    ToLowercase,
560
561    /// Remove extra whitespace
562    NormalizeWhitespace,
563
564    /// Remove special characters
565    RemoveSpecialChars,
566
567    /// Custom regex replacement
568    RegexReplace {
569        /// Regular expression pattern to match
570        pattern: String,
571        /// Replacement string for matched patterns
572        replacement: String,
573    },
574}
575
576impl TextPreprocessingStep {
577    /// Create a new text preprocessing step with specified operations
578    pub fn new(operations: Vec<TextOperation>) -> Self {
579        Self { operations }
580    }
581
582    fn process_text(&self, text: &str) -> String {
583        let mut result = text.to_string();
584
585        for operation in &self.operations {
586            result = match operation {
587                TextOperation::ToLowercase => result.to_lowercase(),
588                TextOperation::NormalizeWhitespace => {
589                    result.split_whitespace().collect::<Vec<_>>().join(" ")
590                }
591                TextOperation::RemoveSpecialChars => result
592                    .chars()
593                    .filter(|c| c.is_alphanumeric() || c.is_whitespace())
594                    .collect(),
595                TextOperation::RegexReplace {
596                    pattern,
597                    replacement,
598                } => {
599                    // Simple implementation - in production would use regex crate
600                    result.replace(pattern, replacement)
601                }
602            };
603        }
604
605        result
606    }
607}
608
609#[async_trait]
610impl PipelineStep for TextPreprocessingStep {
611    fn name(&self) -> &str {
612        "text_preprocessing"
613    }
614
615    fn description(&self) -> &str {
616        "Preprocesses text data with various normalization operations"
617    }
618
619    fn input_types(&self) -> Vec<&'static str> {
620        vec!["Text", "Document", "Documents"]
621    }
622
623    fn output_type(&self) -> &'static str {
624        "Text|Document|Documents"
625    }
626
627    async fn execute(&self, mut context: PipelineContext) -> RragResult<PipelineContext> {
628        let start_time = Instant::now();
629        let step_start = chrono::Utc::now();
630
631        let processed_data = match &context.data {
632            PipelineData::Text(text) => PipelineData::Text(self.process_text(text)),
633            PipelineData::Document(doc) => {
634                // Create a new document with processed content
635                let processed_content = self.process_text(doc.content_str());
636                let mut new_doc = Document::new(processed_content);
637                new_doc.id = doc.id.clone();
638                new_doc.metadata = doc.metadata.clone();
639                new_doc.content_hash = doc.content_hash.clone();
640                new_doc.created_at = doc.created_at;
641                PipelineData::Document(new_doc)
642            }
643            PipelineData::Documents(docs) => {
644                let processed_docs: Vec<Document> = docs
645                    .iter()
646                    .map(|doc| {
647                        let processed_content = self.process_text(doc.content_str());
648                        let mut new_doc = Document::new(processed_content);
649                        new_doc.id = doc.id.clone();
650                        new_doc.metadata = doc.metadata.clone();
651                        new_doc.content_hash = doc.content_hash.clone();
652                        new_doc.created_at = doc.created_at;
653                        new_doc
654                    })
655                    .collect();
656                PipelineData::Documents(processed_docs)
657            }
658            _ => {
659                let error = "Input must be Text, Document, or Documents";
660                context.record_step(StepExecution {
661                    step_id: self.name().to_string(),
662                    start_time: step_start,
663                    duration_ms: start_time.elapsed().as_millis() as u64,
664                    success: false,
665                    error_message: Some(error.to_string()),
666                    metadata: HashMap::new(),
667                });
668                return Err(RragError::document_processing(error));
669            }
670        };
671
672        context.data = processed_data;
673
674        context.record_step(StepExecution {
675            step_id: self.name().to_string(),
676            start_time: step_start,
677            duration_ms: start_time.elapsed().as_millis() as u64,
678            success: true,
679            error_message: None,
680            metadata: HashMap::new(),
681        });
682
683        Ok(context)
684    }
685}
686
687/// Document chunking step
688pub struct DocumentChunkingStep {
689    /// Document chunker instance
690    chunker: DocumentChunker,
691}
692
693impl DocumentChunkingStep {
694    /// Create a new document chunking step with the specified chunker
695    pub fn new(chunker: DocumentChunker) -> Self {
696        Self { chunker }
697    }
698}
699
700#[async_trait]
701impl PipelineStep for DocumentChunkingStep {
702    fn name(&self) -> &str {
703        "document_chunking"
704    }
705
706    fn description(&self) -> &str {
707        "Splits documents into smaller chunks for processing"
708    }
709
710    fn input_types(&self) -> Vec<&'static str> {
711        vec!["Document", "Documents"]
712    }
713
714    fn output_type(&self) -> &'static str {
715        "Chunks"
716    }
717
718    async fn execute(&self, mut context: PipelineContext) -> RragResult<PipelineContext> {
719        let start_time = Instant::now();
720        let step_start = chrono::Utc::now();
721
722        let chunks = match &context.data {
723            PipelineData::Document(doc) => self.chunker.chunk_document(doc)?,
724            PipelineData::Documents(docs) => {
725                let mut all_chunks = Vec::new();
726                for doc in docs {
727                    all_chunks.extend(self.chunker.chunk_document(doc)?);
728                }
729                all_chunks
730            }
731            _ => {
732                let error = "Input must be Document or Documents";
733                context.record_step(StepExecution {
734                    step_id: self.name().to_string(),
735                    start_time: step_start,
736                    duration_ms: start_time.elapsed().as_millis() as u64,
737                    success: false,
738                    error_message: Some(error.to_string()),
739                    metadata: HashMap::new(),
740                });
741                return Err(RragError::document_processing(error));
742            }
743        };
744
745        context.data = PipelineData::Chunks(chunks);
746
747        context.record_step(StepExecution {
748            step_id: self.name().to_string(),
749            start_time: step_start,
750            duration_ms: start_time.elapsed().as_millis() as u64,
751            success: true,
752            error_message: None,
753            metadata: HashMap::new(),
754        });
755
756        Ok(context)
757    }
758}
759
760/// Embedding generation step
761pub struct EmbeddingStep {
762    /// Embedding service
763    embedding_service: Arc<EmbeddingService>,
764}
765
766impl EmbeddingStep {
767    /// Create a new embedding generation step with the specified service
768    pub fn new(embedding_service: Arc<EmbeddingService>) -> Self {
769        Self { embedding_service }
770    }
771}
772
773#[async_trait]
774impl PipelineStep for EmbeddingStep {
775    fn name(&self) -> &str {
776        "embedding_generation"
777    }
778
779    fn description(&self) -> &str {
780        "Generates embeddings for documents or chunks"
781    }
782
783    fn input_types(&self) -> Vec<&'static str> {
784        vec!["Document", "Documents", "Chunks"]
785    }
786
787    fn output_type(&self) -> &'static str {
788        "Embeddings"
789    }
790
791    async fn execute(&self, mut context: PipelineContext) -> RragResult<PipelineContext> {
792        let start_time = Instant::now();
793        let step_start = chrono::Utc::now();
794
795        let embeddings = match &context.data {
796            PipelineData::Document(doc) => {
797                vec![self.embedding_service.embed_document(doc).await?]
798            }
799            PipelineData::Documents(docs) => self.embedding_service.embed_documents(docs).await?,
800            PipelineData::Chunks(chunks) => self.embedding_service.embed_chunks(chunks).await?,
801            _ => {
802                let error = "Input must be Document, Documents, or Chunks";
803                context.record_step(StepExecution {
804                    step_id: self.name().to_string(),
805                    start_time: step_start,
806                    duration_ms: start_time.elapsed().as_millis() as u64,
807                    success: false,
808                    error_message: Some(error.to_string()),
809                    metadata: HashMap::new(),
810                });
811                return Err(RragError::embedding("pipeline", error));
812            }
813        };
814
815        context.data = PipelineData::Embeddings(embeddings);
816
817        context.record_step(StepExecution {
818            step_id: self.name().to_string(),
819            start_time: step_start,
820            duration_ms: start_time.elapsed().as_millis() as u64,
821            success: true,
822            error_message: None,
823            metadata: HashMap::new(),
824        });
825
826        Ok(context)
827    }
828}
829
830/// Retrieval step for similarity search
831pub struct RetrievalStep {
832    /// Retrieval service
833    retrieval_service: Arc<RetrievalService>,
834
835    /// Search configuration
836    search_config: SearchStepConfig,
837}
838
839/// Configuration for search/retrieval step
840#[derive(Debug, Clone)]
841pub struct SearchStepConfig {
842    /// Number of results to retrieve
843    pub limit: usize,
844
845    /// Minimum similarity threshold
846    pub min_score: f32,
847
848    /// Search query text (if not using embeddings)
849    pub query_text: Option<String>,
850}
851
852impl Default for SearchStepConfig {
853    fn default() -> Self {
854        Self {
855            limit: 10,
856            min_score: 0.0,
857            query_text: None,
858        }
859    }
860}
861
862impl RetrievalStep {
863    /// Create a new retrieval step with default configuration
864    pub fn new(retrieval_service: Arc<RetrievalService>) -> Self {
865        Self {
866            retrieval_service,
867            search_config: SearchStepConfig::default(),
868        }
869    }
870
871    /// Create a new retrieval step with custom configuration
872    pub fn with_config(retrieval_service: Arc<RetrievalService>, config: SearchStepConfig) -> Self {
873        Self {
874            retrieval_service,
875            search_config: config,
876        }
877    }
878}
879
880#[async_trait]
881impl PipelineStep for RetrievalStep {
882    fn name(&self) -> &str {
883        "similarity_retrieval"
884    }
885
886    fn description(&self) -> &str {
887        "Performs similarity search using embeddings"
888    }
889
890    fn input_types(&self) -> Vec<&'static str> {
891        vec!["Embeddings"]
892    }
893
894    fn output_type(&self) -> &'static str {
895        "SearchResults"
896    }
897
898    async fn execute(&self, mut context: PipelineContext) -> RragResult<PipelineContext> {
899        let start_time = Instant::now();
900        let step_start = chrono::Utc::now();
901
902        let search_results = match &context.data {
903            PipelineData::Embeddings(embeddings) => {
904                if embeddings.is_empty() {
905                    Vec::new()
906                } else {
907                    // Use the first embedding as query (could be enhanced)
908                    let query_embedding = embeddings[0].clone();
909                    self.retrieval_service
910                        .search_embedding(query_embedding, Some(self.search_config.limit))
911                        .await?
912                }
913            }
914            _ => {
915                let error = "Input must be Embeddings";
916                context.record_step(StepExecution {
917                    step_id: self.name().to_string(),
918                    start_time: step_start,
919                    duration_ms: start_time.elapsed().as_millis() as u64,
920                    success: false,
921                    error_message: Some(error.to_string()),
922                    metadata: HashMap::new(),
923                });
924                return Err(RragError::retrieval(error));
925            }
926        };
927
928        context.data = PipelineData::SearchResults(search_results);
929
930        context.record_step(StepExecution {
931            step_id: self.name().to_string(),
932            start_time: step_start,
933            duration_ms: start_time.elapsed().as_millis() as u64,
934            success: true,
935            error_message: None,
936            metadata: HashMap::new(),
937        });
938
939        Ok(context)
940    }
941}
942
943/// Pipeline for composing multiple steps
944pub struct Pipeline {
945    /// Pipeline steps in execution order
946    steps: Vec<Arc<dyn PipelineStep>>,
947
948    /// Pipeline configuration
949    config: PipelineConfig,
950
951    /// Pipeline metadata
952    metadata: HashMap<String, serde_json::Value>,
953}
954
955impl Pipeline {
956    /// Create new pipeline
957    pub fn new() -> Self {
958        Self {
959            steps: Vec::new(),
960            config: PipelineConfig::default(),
961            metadata: HashMap::new(),
962        }
963    }
964
965    /// Create with configuration
966    pub fn with_config(config: PipelineConfig) -> Self {
967        Self {
968            steps: Vec::new(),
969            config,
970            metadata: HashMap::new(),
971        }
972    }
973
974    /// Add a step to the pipeline
975    pub fn add_step(mut self, step: Arc<dyn PipelineStep>) -> Self {
976        self.steps.push(step);
977        self
978    }
979
980    /// Add metadata
981    pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
982        self.metadata.insert(key.into(), value);
983        self
984    }
985
986    /// Execute the pipeline
987    pub async fn execute(&self, initial_data: PipelineData) -> RragResult<PipelineContext> {
988        let mut context = PipelineContext::with_config(initial_data, self.config.clone());
989
990        // Add pipeline metadata to context
991        context.metadata.extend(self.metadata.clone());
992
993        let start_time = Instant::now();
994
995        for step in &self.steps {
996            // Check timeout
997            if start_time.elapsed().as_secs() > self.config.max_execution_time {
998                return Err(RragError::timeout(
999                    "pipeline_execution",
1000                    self.config.max_execution_time * 1000,
1001                ));
1002            }
1003
1004            // Validate input
1005            if let Err(e) = step.validate_input(&context.data) {
1006                if !self.config.continue_on_error {
1007                    return Err(e);
1008                }
1009                // Record error and continue
1010                context.record_step(StepExecution {
1011                    step_id: step.name().to_string(),
1012                    start_time: chrono::Utc::now(),
1013                    duration_ms: 0,
1014                    success: false,
1015                    error_message: Some(e.to_string()),
1016                    metadata: HashMap::new(),
1017                });
1018                continue;
1019            }
1020
1021            // Execute step (clone context to satisfy borrow checker)
1022            let context_clone = PipelineContext {
1023                execution_id: context.execution_id.clone(),
1024                data: context.data.clone(),
1025                metadata: context.metadata.clone(),
1026                execution_history: context.execution_history.clone(),
1027                config: context.config.clone(),
1028            };
1029
1030            match step.execute(context_clone).await {
1031                Ok(new_context) => {
1032                    context = new_context;
1033                }
1034                Err(e) => {
1035                    if !self.config.continue_on_error {
1036                        return Err(e);
1037                    }
1038                    // Record error and continue with unchanged context
1039                    context.record_step(StepExecution {
1040                        step_id: step.name().to_string(),
1041                        start_time: chrono::Utc::now(),
1042                        duration_ms: 0,
1043                        success: false,
1044                        error_message: Some(e.to_string()),
1045                        metadata: HashMap::new(),
1046                    });
1047                }
1048            }
1049        }
1050
1051        Ok(context)
1052    }
1053
1054    /// Get pipeline step information
1055    pub fn get_step_info(&self) -> Vec<PipelineStepInfo> {
1056        self.steps
1057            .iter()
1058            .map(|step| PipelineStepInfo {
1059                name: step.name().to_string(),
1060                description: step.description().to_string(),
1061                input_types: step.input_types().iter().map(|s| s.to_string()).collect(),
1062                output_type: step.output_type().to_string(),
1063                is_parallelizable: step.is_parallelizable(),
1064                dependencies: step.dependencies().iter().map(|s| s.to_string()).collect(),
1065            })
1066            .collect()
1067    }
1068}
1069
1070impl Default for Pipeline {
1071    fn default() -> Self {
1072        Self::new()
1073    }
1074}
1075
1076/// Pipeline step information for introspection
1077#[derive(Debug, Clone, Serialize, Deserialize)]
1078pub struct PipelineStepInfo {
1079    /// Name of the pipeline step
1080    pub name: String,
1081    /// Description of what the step does
1082    pub description: String,
1083    /// Types of input data this step accepts
1084    pub input_types: Vec<String>,
1085    /// Type of output data this step produces
1086    pub output_type: String,
1087    /// Whether this step can run in parallel with others
1088    pub is_parallelizable: bool,
1089    /// Names of steps this step depends on
1090    pub dependencies: Vec<String>,
1091}
1092
1093/// Pre-built pipeline builder for common RAG workflows
1094pub struct RagPipelineBuilder {
1095    /// Embedding service
1096    embedding_service: Option<Arc<EmbeddingService>>,
1097
1098    /// Retrieval service
1099    retrieval_service: Option<Arc<RetrievalService>>,
1100
1101    /// Storage service
1102    storage_service: Option<Arc<StorageService>>,
1103
1104    /// Pipeline configuration
1105    config: PipelineConfig,
1106}
1107
1108impl RagPipelineBuilder {
1109    /// Create a new RAG pipeline builder
1110    pub fn new() -> Self {
1111        Self {
1112            embedding_service: None,
1113            retrieval_service: None,
1114            storage_service: None,
1115            config: PipelineConfig::default(),
1116        }
1117    }
1118
1119    /// Set the embedding service for the pipeline
1120    pub fn with_embedding_service(mut self, service: Arc<EmbeddingService>) -> Self {
1121        self.embedding_service = Some(service);
1122        self
1123    }
1124
1125    /// Set the retrieval service for the pipeline
1126    pub fn with_retrieval_service(mut self, service: Arc<RetrievalService>) -> Self {
1127        self.retrieval_service = Some(service);
1128        self
1129    }
1130
1131    /// Set the storage service for the pipeline
1132    pub fn with_storage_service(mut self, service: Arc<StorageService>) -> Self {
1133        self.storage_service = Some(service);
1134        self
1135    }
1136
1137    /// Set custom configuration for the pipeline
1138    pub fn with_config(mut self, config: PipelineConfig) -> Self {
1139        self.config = config;
1140        self
1141    }
1142
1143    /// Build document ingestion pipeline
1144    pub fn build_ingestion_pipeline(&self) -> RragResult<Pipeline> {
1145        let embedding_service = self
1146            .embedding_service
1147            .as_ref()
1148            .ok_or_else(|| RragError::config("embedding_service", "required", "missing"))?;
1149
1150        let pipeline = Pipeline::with_config(self.config.clone())
1151            .add_step(Arc::new(TextPreprocessingStep::new(vec![
1152                TextOperation::NormalizeWhitespace,
1153                TextOperation::ToLowercase,
1154            ])))
1155            .add_step(Arc::new(DocumentChunkingStep::new(DocumentChunker::new())))
1156            .add_step(Arc::new(EmbeddingStep::new(embedding_service.clone())));
1157
1158        Ok(pipeline)
1159    }
1160
1161    /// Build query pipeline for search
1162    pub fn build_query_pipeline(&self) -> RragResult<Pipeline> {
1163        let embedding_service = self
1164            .embedding_service
1165            .as_ref()
1166            .ok_or_else(|| RragError::config("embedding_service", "required", "missing"))?;
1167
1168        let retrieval_service = self
1169            .retrieval_service
1170            .as_ref()
1171            .ok_or_else(|| RragError::config("retrieval_service", "required", "missing"))?;
1172
1173        let pipeline = Pipeline::with_config(self.config.clone())
1174            .add_step(Arc::new(TextPreprocessingStep::new(vec![
1175                TextOperation::NormalizeWhitespace,
1176            ])))
1177            .add_step(Arc::new(EmbeddingStep::new(embedding_service.clone())))
1178            .add_step(Arc::new(RetrievalStep::new(retrieval_service.clone())));
1179
1180        Ok(pipeline)
1181    }
1182}
1183
1184impl Default for RagPipelineBuilder {
1185    fn default() -> Self {
1186        Self::new()
1187    }
1188}
1189
1190#[cfg(test)]
1191mod tests {
1192    use super::*;
1193    use crate::{Document, EmbeddingService, InMemoryRetriever, LocalEmbeddingProvider};
1194
1195    #[tokio::test]
1196    async fn test_text_preprocessing_step() {
1197        let step = TextPreprocessingStep::new(vec![
1198            TextOperation::ToLowercase,
1199            TextOperation::NormalizeWhitespace,
1200        ]);
1201
1202        let context = PipelineContext::new(PipelineData::Text("  HELLO    WORLD  ".to_string()));
1203        let result = step.execute(context).await.unwrap();
1204
1205        if let PipelineData::Text(processed) = result.data {
1206            assert_eq!(processed, "hello world");
1207        } else {
1208            panic!("Expected Text output");
1209        }
1210
1211        assert!(result.execution_history[0].success);
1212    }
1213
1214    #[tokio::test]
1215    async fn test_document_chunking_step() {
1216        let step = DocumentChunkingStep::new(DocumentChunker::new());
1217
1218        let doc = Document::new(
1219            "This is a test document with some content that should be chunked appropriately.",
1220        );
1221        let context = PipelineContext::new(PipelineData::Document(doc));
1222
1223        let result = step.execute(context).await.unwrap();
1224
1225        if let PipelineData::Chunks(chunks) = result.data {
1226            assert!(!chunks.is_empty());
1227        } else {
1228            panic!("Expected Chunks output");
1229        }
1230    }
1231
1232    #[tokio::test]
1233    async fn test_embedding_step() {
1234        let provider = Arc::new(LocalEmbeddingProvider::new("test-model", 128));
1235        let embedding_service = Arc::new(EmbeddingService::new(provider));
1236        let step = EmbeddingStep::new(embedding_service);
1237
1238        let doc = Document::new("Test document for embedding");
1239        let context = PipelineContext::new(PipelineData::Document(doc));
1240
1241        let result = step.execute(context).await.unwrap();
1242
1243        if let PipelineData::Embeddings(embeddings) = result.data {
1244            assert_eq!(embeddings.len(), 1);
1245            assert_eq!(embeddings[0].dimensions, 128);
1246        } else {
1247            panic!("Expected Embeddings output");
1248        }
1249    }
1250
1251    #[tokio::test]
1252    async fn test_pipeline_execution() {
1253        let provider = Arc::new(LocalEmbeddingProvider::new("test-model", 128));
1254        let embedding_service = Arc::new(EmbeddingService::new(provider));
1255
1256        let pipeline = Pipeline::new()
1257            .add_step(Arc::new(TextPreprocessingStep::new(vec![
1258                TextOperation::ToLowercase,
1259            ])))
1260            .add_step(Arc::new(EmbeddingStep::new(embedding_service)));
1261
1262        let doc = Document::new("TEST DOCUMENT");
1263        let result = pipeline.execute(PipelineData::Document(doc)).await.unwrap();
1264
1265        // Should have executed 2 steps
1266        assert_eq!(result.execution_history.len(), 2);
1267        assert!(result.execution_history.iter().all(|step| step.success));
1268
1269        // Final output should be embeddings
1270        if let PipelineData::Embeddings(embeddings) = result.data {
1271            assert_eq!(embeddings.len(), 1);
1272        } else {
1273            panic!("Expected Embeddings output");
1274        }
1275    }
1276
1277    #[tokio::test]
1278    async fn test_rag_pipeline_builder() {
1279        let provider = Arc::new(LocalEmbeddingProvider::new("test-model", 128));
1280        let embedding_service = Arc::new(EmbeddingService::new(provider));
1281
1282        let builder = RagPipelineBuilder::new().with_embedding_service(embedding_service);
1283
1284        let pipeline = builder.build_ingestion_pipeline().unwrap();
1285        let step_info = pipeline.get_step_info();
1286
1287        assert_eq!(step_info.len(), 3); // preprocessing, chunking, embedding
1288        assert_eq!(step_info[0].name, "text_preprocessing");
1289        assert_eq!(step_info[1].name, "document_chunking");
1290        assert_eq!(step_info[2].name, "embedding_generation");
1291    }
1292
1293    #[test]
1294    fn test_pipeline_context() {
1295        let mut context = PipelineContext::new(PipelineData::Text("test".to_string()))
1296            .with_metadata(
1297                "test_key",
1298                serde_json::Value::String("test_value".to_string()),
1299            );
1300
1301        assert_eq!(
1302            context.metadata.get("test_key").unwrap().as_str().unwrap(),
1303            "test_value"
1304        );
1305
1306        let step_execution = StepExecution {
1307            step_id: "test_step".to_string(),
1308            start_time: chrono::Utc::now(),
1309            duration_ms: 100,
1310            success: true,
1311            error_message: None,
1312            metadata: HashMap::new(),
1313        };
1314
1315        context.record_step(step_execution);
1316
1317        assert_eq!(context.execution_history.len(), 1);
1318        assert_eq!(context.total_execution_time(), 100);
1319        assert!(!context.has_failures());
1320    }
1321}