//! Main streaming pipeline implementation for conversational AI.
//!
//! This module provides the central orchestrator for streaming conversational responses,
//! integrating all specialized streaming components including chunking, typing simulation,
//! quality analysis, backpressure control, and state management for natural, real-time
//! conversational experiences.
//!
//! # Architecture
//!
//! The `ConversationalStreamingPipeline` serves as the main coordinator that:
//! - Orchestrates the complete streaming workflow from input to output
//! - Integrates specialized components (chunking, typing simulation, quality analysis, etc.)
//! - Manages pipeline-wide configuration and settings
//! - Handles stream lifecycle management and resource cleanup
//! - Provides comprehensive error handling and recovery mechanisms
//! - Coordinates component interactions for optimal streaming performance
//!
//! # Key Features
//!
//! - **End-to-end streaming orchestration**: Complete workflow management from user input to streaming output
//! - **Component integration**: Seamless coordination of all streaming subsystems
//! - **Adaptive streaming**: Dynamic adjustment based on content, quality, and performance metrics
//! - **Quality-driven delivery**: Continuous quality monitoring and optimization
//! - **Graceful resource management**: Proper lifecycle management and cleanup
//! - **Comprehensive error handling**: Pipeline-level error propagation and recovery
//!
//! # Usage
//!
//! ```rust,ignore
//! use trustformers::pipeline::conversational::streaming::pipeline::ConversationalStreamingPipeline;
//! use trustformers::pipeline::conversational::streaming::types::AdvancedStreamingConfig;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! # let model = todo!();
//! # let tokenizer = todo!();
//! # let input = todo!();
//! # let conversation_state = todo!();
//! // Create pipeline with model and tokenizer
//! let pipeline = ConversationalStreamingPipeline::new(
//!     model,
//!     tokenizer,
//!     AdvancedStreamingConfig::default()
//! );
//!
//! // Generate streaming response
//! let stream = pipeline.generate_streaming_response(input, &conversation_state).await?;
//! # Ok(())
//! # }
//! ```

use super::backpressure::{BackpressureController, FlowAction as BackpressureFlowAction};
use super::chunking::ResponseChunker;
use super::coordinator::{GlobalStreamingMetrics, StreamingCoordinator};
use super::quality_analyzer::{QualityAnalyzer, StreamingQuality};
use super::state_management::StreamStateManager;
use super::types::*;
use super::typing_simulation::TypingSimulator;
use crate::core::error::Result;
use crate::core::traits::{Model, Tokenizer};
use crate::pipeline::conversational::types::*;

use async_stream::stream;
use futures::Stream;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::sleep;
use trustformers_models::common_patterns::{
    GenerationConfig as ModelsGenerationConfig, GenerativeModel,
};
use uuid::Uuid;

/// Main streaming pipeline for conversational responses.
///
/// This is the central orchestrator that brings together all streaming components
/// to provide a comprehensive, high-quality streaming conversational experience.
/// It manages the complete lifecycle of streaming responses from input processing
/// to final delivery, ensuring optimal performance, quality, and user experience.
///
/// # Architecture
///
/// The pipeline coordinates the following components:
/// - **StreamingCoordinator**: Manages streaming sessions and global coordination
/// - **ResponseChunker**: Handles intelligent response chunking strategies
/// - **TypingSimulator**: Provides natural typing speed and pause simulation
/// - **StreamStateManager**: Manages streaming state transitions and monitoring
/// - **BackpressureController**: Handles flow control and buffer management
/// - **QualityAnalyzer**: Continuous quality monitoring and optimization
///
/// # Type Parameters
///
/// - `M`: Model type that implements `Model + Send + Sync + GenerativeModel`
/// - `T`: Tokenizer type that implements `Tokenizer + Send + Sync`
///
/// # Thread Safety
///
/// This pipeline is fully thread-safe and designed for concurrent use across
/// multiple streaming sessions. All internal components use appropriate
/// synchronization mechanisms.
pub struct ConversationalStreamingPipeline<M, T>
where
    M: Model + Send + Sync + GenerativeModel,
    T: Tokenizer + Send + Sync,
{
    /// Model reference for text generation
    model: Arc<M>,
    /// Tokenizer reference for text processing
    tokenizer: Arc<T>,
    /// Streaming coordinator for session management
    coordinator: StreamingCoordinator,
    /// Response chunker for intelligent content segmentation
    chunker: ResponseChunker,
    /// Typing simulator for natural delivery timing
    typing_simulator: TypingSimulator,
    /// State manager for streaming state coordination
    state_manager: StreamStateManager,
    /// Backpressure controller for flow management
    backpressure_controller: BackpressureController,
    /// Quality analyzer for continuous quality monitoring
    quality_analyzer: QualityAnalyzer,
    /// Pipeline configuration
    config: AdvancedStreamingConfig,
}

impl<M, T> ConversationalStreamingPipeline<M, T>
where
    M: Model + Send + Sync + GenerativeModel,
    T: Tokenizer + Send + Sync,
{
    /// Creates a new streaming pipeline with the specified model, tokenizer, and configuration.
    ///
    /// # Arguments
    ///
    /// * `model` - The generative model for text generation
    /// * `tokenizer` - The tokenizer for text processing
    /// * `config` - Advanced streaming configuration
    ///
    /// # Returns
    ///
    /// A new `ConversationalStreamingPipeline` instance ready for streaming operations.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// # use trustformers::pipeline::conversational::streaming::pipeline::ConversationalStreamingPipeline;
    /// # use trustformers::pipeline::conversational::streaming::types::AdvancedStreamingConfig;
    /// # let my_model = todo!();
    /// # let my_tokenizer = todo!();
    /// let pipeline = ConversationalStreamingPipeline::new(
    ///     my_model,
    ///     my_tokenizer,
    ///     AdvancedStreamingConfig::default()
    /// );
    /// ```
    pub fn new(model: M, tokenizer: T, config: AdvancedStreamingConfig) -> Self {
        let coordinator = StreamingCoordinator::new(config.clone());
        let chunker = ResponseChunker::new(ChunkingStrategy::Adaptive, config.clone());
        let typing_simulator = TypingSimulator::new(config.clone());
        let state_manager = StreamStateManager::new(config.clone());
        let backpressure_controller = BackpressureController::new(config.clone());
        let quality_analyzer = QualityAnalyzer::new();

        Self {
            model: Arc::new(model),
            tokenizer: Arc::new(tokenizer),
            coordinator,
            chunker,
            typing_simulator,
            state_manager,
            backpressure_controller,
            quality_analyzer,
            config,
        }
    }

    /// Generates a streaming response for the given conversational input.
    ///
    /// This is the main entry point for streaming response generation. It orchestrates
    /// the complete streaming workflow:
    /// 1. Creates a new streaming session
    /// 2. Updates streaming state
    /// 3. Builds conversational context
    /// 4. Generates the full response using the model
    /// 5. Analyzes response metadata
    /// 6. Chunks the response intelligently
    /// 7. Creates and returns the streaming implementation
    ///
    /// # Arguments
    ///
    /// * `input` - The conversational input containing user message and context
    /// * `conversation_state` - Current conversation state and history
    ///
    /// # Returns
    ///
    /// A pinned boxed stream of `ExtendedStreamingResponse` items that can be
    /// consumed by the client for real-time response delivery.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Session creation fails
    /// - State management fails
    /// - Context building fails
    /// - Model generation fails
    /// - Stream creation fails
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// # use futures::stream::StreamExt;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let pipeline = todo!();
    /// # let conversational_input = todo!();
    /// # let conversation_state = todo!();
    /// let mut stream = pipeline.generate_streaming_response(
    ///     conversational_input,
    ///     &conversation_state
    /// ).await?;
    ///
    /// // Consume the stream
    /// while let Some(response) = stream.next().await {
    ///     match response {
    ///         Ok(extended_response) => {
    ///             // Process streaming response
    ///             println!("Chunk: {}", extended_response.base_response.chunk);
    ///         }
    ///         Err(e) => {
    ///             // Handle streaming error
    ///             eprintln!("Streaming error: {}", e);
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn generate_streaming_response(
        &self,
        input: ConversationalInput,
        conversation_state: &ConversationState,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<ExtendedStreamingResponse>> + Send + '_>>> {
        // Create streaming session
        let session_id = self
            .coordinator
            .create_session(
                input.conversation_id.clone().unwrap_or_else(|| Uuid::new_v4().to_string()),
            )
            .await?;

        // Update state to streaming
        self.state_manager
            .update_state(
                StreamConnection::Streaming,
                "Starting streaming response".to_string(),
            )
            .await?;

        // Build context
        let context = self.build_streaming_context(&input, conversation_state)?;

        // Generate full response first (in practice, this would be token-by-token)
        let full_response = self.generate_full_response(&context).await?;

        // Analyze response metadata
        let metadata = self.analyze_response_metadata(&full_response, &input);

        // Chunk the response
        let chunks = self.chunker.chunk_response(&full_response, &metadata);

        // Create the streaming implementation
        let stream = self.create_chunk_stream(chunks, session_id, metadata).await?;

        Ok(Box::pin(stream))
    }

    /// Generates the full response using the model.
    ///
    /// This method handles the actual text generation using the configured model.
    /// It tokenizes the input context, configures generation parameters, and
    /// produces the complete response text that will later be chunked for streaming.
    ///
    /// # Arguments
    ///
    /// * `context` - The complete conversational context string
    ///
    /// # Returns
    ///
    /// The generated response text, cleaned and formatted.
    ///
    /// # Errors
    ///
    /// Returns an error if tokenization or generation fails.
    ///
    /// # Implementation Note
    ///
    /// In a production implementation, this would typically generate tokens
    /// incrementally for true streaming. This current implementation generates
    /// the full response first for simplicity.
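    ///
    /// A minimal sketch of what the incremental variant might look like, assuming
    /// a hypothetical `next_token` method (not part of the current
    /// `GenerativeModel` trait):
    ///
    /// ```rust,ignore
    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let model = todo!();
    /// # let gen_config = todo!();
    /// let context = "User: Hi\nAssistant:";
    /// let mut output = String::new();
    /// // Hypothetical incremental API: yields one decoded token at a time,
    /// // returning None once an end-of-sequence token is produced.
    /// while let Some(token) = model.next_token(context, &output, &gen_config)? {
    ///     output.push_str(&token);
    /// }
    /// # Ok(())
    /// # }
    /// ```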
    async fn generate_full_response(&self, context: &str) -> Result<String> {
        // Tokenize the context to validate it; generation below consumes the
        // raw string, so the token ids themselves are currently unused.
        let _tokenized = self.tokenizer.encode(context)?;

        // Create generation config
        let gen_config = ModelsGenerationConfig {
            max_new_tokens: self.config.base_config.chunk_size * 20, // Rough estimate
            temperature: 0.7,
            top_p: 0.9,
            top_k: Some(50),
            do_sample: true,
            early_stopping: true,
            repetition_penalty: 1.1,
            length_penalty: 1.0,
            ..ModelsGenerationConfig::default()
        };

        // Generate response
        let response = self.model.generate(context, &gen_config)?;

        Ok(self.clean_response(&response))
    }

    /// Cleans the generated response by removing artifacts and ensuring proper formatting.
    ///
    /// # Arguments
    ///
    /// * `response` - Raw response text from the model
    ///
    /// # Returns
    ///
    /// Cleaned and formatted response text ready for chunking.
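    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// // Illustrative only: `clean_response` is a private method.
    /// let cleaned = pipeline.clean_response("Sure, I can help<|endoftext|>");
    /// assert_eq!(cleaned, "Sure, I can help.");
    /// ```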
    fn clean_response(&self, response: &str) -> String {
        let mut cleaned = response.trim().to_string();

        // Remove generation artifacts
        cleaned = cleaned.replace("<|endoftext|>", "");
        cleaned = cleaned.replace("<|end|>", "");

        // Ensure proper ending
        if !cleaned.ends_with(['.', '!', '?']) && !cleaned.is_empty() {
            cleaned.push('.');
        }

        cleaned
    }

    /// Builds the conversational context from input and conversation state.
    ///
    /// This method constructs the complete context string that will be sent to
    /// the model for generation. It includes recent conversation history and
    /// the current user input, formatted appropriately for the model.
    ///
    /// # Arguments
    ///
    /// * `input` - Current conversational input
    /// * `conversation_state` - Complete conversation state and history
    ///
    /// # Returns
    ///
    /// The formatted context string ready for model generation.
    ///
    /// # Context Structure
    ///
    /// The context includes:
    /// - Recent conversation turns (up to 2000 characters)
    /// - Current user input
    /// - Proper role formatting (User/Assistant/System)
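    ///
    /// # Example Output
    ///
    /// For a short history followed by the message "What's the weather?",
    /// the produced context looks like:
    ///
    /// ```text
    /// User: Hello
    /// Assistant: Hi there! How can I help?
    /// User: What's the weather?
    /// Assistant:
    /// ```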
    fn build_streaming_context(
        &self,
        input: &ConversationalInput,
        conversation_state: &ConversationState,
    ) -> Result<String> {
        let mut context = String::new();

        // Add recent conversation context
        let recent_turns = conversation_state.get_recent_context(2000);
        for turn in recent_turns {
            let role_str = match turn.role {
                ConversationRole::User => "User",
                ConversationRole::Assistant => "Assistant",
                ConversationRole::System => "System",
            };
            context.push_str(&format!("{}: {}\n", role_str, turn.content));
        }

        // Add current input
        context.push_str(&format!("User: {}\nAssistant:", input.message));

        Ok(context)
    }

    /// Analyzes the response to generate metadata.
    ///
    /// This method performs analysis on the generated response to extract
    /// metadata such as sentiment, intent, confidence, topics, and quality scores.
    /// This metadata is used by downstream components for quality assessment
    /// and streaming optimization.
    ///
    /// # Arguments
    ///
    /// * `_response` - The generated response text (unused by the current simplified analysis)
    /// * `_input` - The original input (for future enhancement)
    ///
    /// # Returns
    ///
    /// Comprehensive metadata about the response including sentiment, topics,
    /// quality metrics, and engagement indicators.
    ///
    /// # Future Enhancement
    ///
    /// This is a simplified implementation. In production, this would include:
    /// - Advanced sentiment analysis
    /// - Intent classification
    /// - Entity extraction
    /// - Topic modeling
    /// - Safety analysis
    /// - Quality assessment
    fn analyze_response_metadata(
        &self,
        _response: &str,
        _input: &ConversationalInput,
    ) -> ConversationMetadata {
        // Simple metadata analysis
        ConversationMetadata {
            sentiment: Some("neutral".to_string()),
            intent: Some("response".to_string()),
            confidence: 0.8,
            topics: vec!["conversation".to_string()],
            safety_flags: Vec::new(),
            entities: Vec::new(),
            quality_score: 0.8,
            engagement_level: EngagementLevel::Medium,
            reasoning_type: None,
        }
    }

    /// Creates the actual streaming implementation from response chunks.
    ///
    /// This is the core streaming implementation that orchestrates the delivery
    /// of response chunks with proper timing, quality monitoring, backpressure
    /// handling, and comprehensive metrics collection.
    ///
    /// # Arguments
    ///
    /// * `chunks` - Vector of response chunks ready for streaming
    /// * `session_id` - Unique session identifier for coordination
    /// * `metadata` - Response metadata for quality assessment
    ///
    /// # Returns
    ///
    /// An async stream that yields `ExtendedStreamingResponse` items with
    /// comprehensive streaming information.
    ///
    /// # Streaming Features
    ///
    /// - **Natural timing**: Typing speed simulation and natural pauses
    /// - **Quality monitoring**: Continuous quality assessment and reporting
    /// - **Backpressure handling**: Adaptive flow control based on buffer state
    /// - **Comprehensive metrics**: Real-time performance and quality metrics
    /// - **Error resilience**: Graceful handling of streaming errors
    /// - **Resource cleanup**: Proper session management and cleanup
    ///
    /// # Implementation Details
    ///
    /// The stream implementation:
    /// 1. Iterates through response chunks
    /// 2. Applies typing delay simulation
    /// 3. Monitors backpressure and adjusts flow
    /// 4. Analyzes chunk quality in real-time
    /// 5. Collects comprehensive metrics
    /// 6. Yields extended streaming responses
    /// 7. Applies natural pauses between chunks
    /// 8. Closes session upon completion
    async fn create_chunk_stream(
        &self,
        chunks: Vec<StreamChunk>,
        session_id: String,
        metadata: ConversationMetadata,
    ) -> Result<impl Stream<Item = Result<ExtendedStreamingResponse>> + Send + '_> {
        let total_chunks = chunks.len();

        let stream = stream! {
            let mut chunk_index = 0;
            // Running total of bytes delivered, so per-chunk metrics reflect
            // the cumulative stream rather than the current chunk alone.
            let mut bytes_streamed: usize = 0;
            let start_time = Instant::now();

            for chunk in chunks {
                // Calculate typing delay for natural delivery
                let typing_delay = self.typing_simulator.calculate_typing_delay(&chunk);

                // Apply natural pause for punctuation and content flow
                let natural_pause = self.typing_simulator.calculate_natural_pause(&chunk);

                // Wait for typing simulation to create natural delivery rhythm
                sleep(typing_delay).await;

                // Check backpressure and adjust flow accordingly
                let buffer_state = BufferState {
                    current_size: chunk_index * 50, // Rough estimate of buffer usage
                    max_size: self.config.max_buffer_size,
                    utilization: (chunk_index * 50) as f32 / self.config.max_buffer_size as f32,
                    pending_chunks: total_chunks - chunk_index,
                };

                // Apply backpressure adjustments if necessary
                let enhanced_buffer_state = super::backpressure::EnhancedBufferState::from(buffer_state.clone());
                if let Ok(actions) = self.backpressure_controller.monitor_and_adjust(&enhanced_buffer_state).await {
                    for action in actions {
                        match action {
                            BackpressureFlowAction::PauseFlow => {
                                sleep(Duration::from_millis(100)).await;
                            },
                            BackpressureFlowAction::DecreaseRate(_) => {
                                sleep(Duration::from_millis(50)).await;
                            },
                            _ => {},
                        }
                    }
                }

                // Analyze chunk quality for continuous optimization
                let quality_measurement = self.quality_analyzer.analyze_chunk_quality(&chunk, typing_delay).await;

                // Create comprehensive streaming metrics
                bytes_streamed += chunk.content.len();
                let elapsed = start_time.elapsed();
                let elapsed_secs = elapsed.as_secs_f32();
                let metrics = StreamingMetrics {
                    // Use fractional seconds so the rate is meaningful within
                    // the first second of streaming as well.
                    chunks_per_second: if elapsed_secs > 0.0 {
                        (chunk_index + 1) as f32 / elapsed_secs
                    } else {
                        0.0
                    },
                    avg_chunk_size: bytes_streamed as f32 / (chunk_index + 1) as f32,
                    total_chunks: chunk_index + 1,
                    bytes_streamed,
                    duration_ms: elapsed.as_millis() as u64,
                    buffer_utilization: buffer_state.utilization,
                    error_count: 0,
                    retry_count: 0,
                };

                // Create comprehensive quality assessment (using types::StreamingQuality)
                let quality = super::types::StreamingQuality {
                    smoothness: quality_measurement.smoothness,
                    naturalness: quality_measurement.naturalness,
                    responsiveness: quality_measurement.responsiveness,
                    coherence: quality_measurement.coherence,
                    overall_quality: (quality_measurement.smoothness +
                                    quality_measurement.naturalness +
                                    quality_measurement.responsiveness +
                                    quality_measurement.coherence) / 4.0,
                };

                // Create extended streaming response with comprehensive information
                let extended_response = ExtendedStreamingResponse {
                    base_response: StreamingResponse {
                        chunk: chunk.content.clone(),
                        is_final: chunk_index == total_chunks - 1,
                        chunk_index,
                        total_chunks: Some(total_chunks),
                        metadata: Some(metadata.clone()),
                    },
                    state: if chunk_index == total_chunks - 1 {
                        StreamingState::Completed
                    } else {
                        StreamingState::Streaming
                    },
                    timestamp: chrono::Utc::now(),
                    estimated_completion: if chunk_index < total_chunks - 1 {
                        let remaining_chunks = total_chunks - chunk_index - 1;
                        let estimated_remaining_ms = remaining_chunks as u64 * typing_delay.as_millis() as u64;
                        Some(chrono::Utc::now() + chrono::Duration::milliseconds(estimated_remaining_ms as i64))
                    } else {
                        None
                    },
                    metrics,
                    quality,
                };

                yield Ok(extended_response);

                // Apply natural pause for improved delivery rhythm
                if !natural_pause.is_zero() {
                    sleep(natural_pause).await;
                }

                chunk_index += 1;
            }

            // Close session and cleanup resources
            let _ = self.coordinator.close_session(&session_id).await;
        };

        Ok(stream)
    }

    /// Retrieves comprehensive streaming statistics.
    ///
    /// This method provides access to global streaming metrics including
    /// performance indicators, resource utilization, and quality trends
    /// across all active streaming sessions.
    ///
    /// # Returns
    ///
    /// Global streaming metrics including session counts, throughput,
    /// resource utilization, and performance indicators.
    ///
    /// # Use Cases
    ///
    /// - Performance monitoring
    /// - Resource optimization
    /// - Quality assessment
    /// - Debugging and diagnostics
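    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let pipeline = todo!();
    /// let stats = pipeline.get_streaming_stats().await?;
    /// println!("Active streams: {}", stats.active_streams);
    /// # Ok(())
    /// # }
    /// ```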
    pub async fn get_streaming_stats(&self) -> Result<GlobalStreamingMetrics> {
        Ok(self.coordinator.get_global_metrics().await)
    }

    /// Retrieves current quality metrics.
    ///
    /// This method provides access to the current overall quality assessment
    /// including smoothness, naturalness, responsiveness, and coherence metrics.
    ///
    /// # Returns
    ///
    /// Current streaming quality metrics with detailed quality indicators.
    ///
    /// # Quality Metrics
    ///
    /// - **Smoothness**: Consistency of chunk delivery timing
    /// - **Naturalness**: Human-like typing and pause patterns
    /// - **Responsiveness**: Speed of response delivery
    /// - **Coherence**: Logical flow and chunk boundaries
    /// - **Overall Quality**: Composite quality score
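    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let pipeline = todo!();
    /// let quality = pipeline.get_quality_metrics().await?;
    /// println!("Overall quality: {:.2}", quality.overall_quality);
    /// # Ok(())
    /// # }
    /// ```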
    pub async fn get_quality_metrics(&self) -> Result<StreamingQuality> {
        Ok(self.quality_analyzer.calculate_overall_quality().await)
    }

    /// Cleans up expired streaming sessions.
    ///
    /// This method performs maintenance by removing expired sessions and
    /// freeing associated resources. It should be called periodically to
    /// prevent resource leaks in long-running applications.
    ///
    /// # Arguments
    ///
    /// * `max_age_minutes` - Maximum age in minutes for session retention
    ///
    /// # Returns
    ///
    /// The number of sessions that were cleaned up.
    ///
    /// # Resource Management
    ///
    /// This method ensures:
    /// - Memory cleanup for expired sessions
    /// - Resource deallocation
    /// - State consistency
    /// - Performance optimization
    ///
    /// # Recommended Usage
    ///
    /// Call this method periodically (e.g., every 30 minutes) to maintain
    /// optimal resource utilization:
    ///
    /// ```rust,ignore
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let pipeline = todo!();
    /// // Cleanup sessions older than 60 minutes
    /// let cleaned_count = pipeline.cleanup_sessions(60).await?;
    /// println!("Cleaned up {} expired sessions", cleaned_count);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn cleanup_sessions(&self, max_age_minutes: u64) -> Result<usize> {
        Ok(self.coordinator.cleanup_expired_sessions(max_age_minutes).await)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline::conversational::streaming::chunking::ResponseChunker;
    use crate::pipeline::conversational::streaming::types::{
        AdvancedStreamingConfig, ChunkingStrategy, StreamingQuality,
    };
    use crate::pipeline::conversational::types::{
        ConversationMetadata, ConversationRole, ConversationTurn, EngagementLevel,
    };

    fn default_adv_config() -> AdvancedStreamingConfig {
        AdvancedStreamingConfig::default()
    }

    fn empty_metadata() -> ConversationMetadata {
        ConversationMetadata {
            sentiment: None,
            intent: None,
            confidence: 0.0,
            topics: vec![],
            safety_flags: vec![],
            entities: vec![],
            quality_score: 0.0,
            engagement_level: EngagementLevel::Medium,
            reasoning_type: None,
        }
    }

    // ---- AdvancedStreamingConfig tests ----

    #[test]
    fn test_advanced_config_default_values() {
        let config = default_adv_config();
        assert!(
            config.adaptive_chunking,
            "adaptive_chunking should be true by default"
        );
        assert!(
            config.natural_pausing,
            "natural_pausing should be true by default"
        );
        assert!(
            config.enable_backpressure,
            "enable_backpressure should be true by default"
        );
        assert!(
            config.enable_error_recovery,
            "enable_error_recovery should be true by default"
        );
    }

    #[test]
    fn test_advanced_config_max_buffer_size_positive() {
        let config = default_adv_config();
        assert!(
            config.max_buffer_size > 0,
            "max_buffer_size must be positive"
        );
    }

    #[test]
    fn test_advanced_config_typing_speed_positive() {
        let config = default_adv_config();
        assert!(
            config.base_typing_speed > 0.0,
            "base_typing_speed must be positive"
        );
    }

    // ---- ResponseChunker tests ----

    #[test]
    fn test_chunker_fixed_size_produces_chunks() {
        let chunker = ResponseChunker::new(ChunkingStrategy::FixedSize(5), default_adv_config());
        let chunks = chunker.chunk_response("Hello World!", &empty_metadata());
        assert!(
            !chunks.is_empty(),
            "chunker must produce at least one chunk"
        );
    }

    #[test]
    fn test_chunker_word_boundary_preserves_content() {
        let chunker = ResponseChunker::new(ChunkingStrategy::WordBoundary, default_adv_config());
        let text = "The quick brown fox";
        let chunks = chunker.chunk_response(text, &empty_metadata());
        let reassembled: String = chunks
            .iter()
            .map(|c| c.content.clone())
            .collect::<Vec<_>>()
            .join(" ")
            .trim()
            .to_string();
        // Word boundary split may add/remove whitespace but words must be present
        for word in ["The", "quick", "brown", "fox"] {
            assert!(
                reassembled.contains(word),
                "reassembled text must contain original word '{}'",
                word
            );
        }
    }

    #[test]
    fn test_chunker_sentence_boundary_splits_on_period() {
        let chunker =
            ResponseChunker::new(ChunkingStrategy::SentenceBoundary, default_adv_config());
        let text = "First sentence. Second sentence. Third sentence.";
        let chunks = chunker.chunk_response(text, &empty_metadata());
        assert!(
            !chunks.is_empty(),
            "sentence boundary chunker must produce chunks"
        );
    }

    #[test]
    fn test_chunker_adaptive_produces_nonempty_chunks_for_nonempty_text() {
        let chunker = ResponseChunker::new(ChunkingStrategy::Adaptive, default_adv_config());
        let text = "This is a test of the adaptive chunking strategy.";
        let chunks = chunker.chunk_response(text, &empty_metadata());
        assert!(
            !chunks.is_empty(),
            "adaptive chunker must produce chunks for nonempty text"
        );
    }

    #[test]
    fn test_chunker_fixed_size_chunks_have_sequential_indices() {
        let chunker = ResponseChunker::new(ChunkingStrategy::FixedSize(3), default_adv_config());
        let chunks = chunker.chunk_response("abcdefghij", &empty_metadata());
        for (i, chunk) in chunks.iter().enumerate() {
            assert_eq!(
                chunk.index, i,
                "chunk index must be sequential starting from 0"
            );
        }
    }

    #[test]
    fn test_chunker_empty_text_produces_no_chunks() {
        let chunker = ResponseChunker::new(ChunkingStrategy::FixedSize(10), default_adv_config());
        let chunks = chunker.chunk_response("", &empty_metadata());
        assert!(chunks.is_empty(), "empty text should yield no chunks");
    }

    // ---- StreamingQuality tests ----

    #[test]
    fn test_streaming_quality_default_all_ones() {
        let quality = StreamingQuality::default();
        assert!(
            (quality.smoothness - 1.0).abs() < f32::EPSILON,
            "default smoothness must be 1.0"
        );
        assert!(
            (quality.naturalness - 1.0).abs() < f32::EPSILON,
            "default naturalness must be 1.0"
        );
        assert!(
            (quality.responsiveness - 1.0).abs() < f32::EPSILON,
            "default responsiveness must be 1.0"
        );
        assert!(
            (quality.coherence - 1.0).abs() < f32::EPSILON,
            "default coherence must be 1.0"
        );
        assert!(
            (quality.overall_quality - 1.0).abs() < f32::EPSILON,
            "default overall_quality must be 1.0"
        );
    }

    // ---- Streaming pipeline coordinator session lifecycle ----

    #[tokio::test]
    async fn test_pipeline_coordinator_session_lifecycle() {
        let coord = StreamingCoordinator::new(default_adv_config());
        assert_eq!(
            coord.get_active_session_count().await,
            0,
            "coordinator should start with 0 sessions"
        );
        let id = coord
            .create_session("conv-test".to_string())
            .await
            .expect("create_session must succeed");
        assert_eq!(
            coord.get_active_session_count().await,
            1,
            "one session must be active after creation"
        );
        coord.close_session(&id).await.expect("close_session must succeed");
        assert_eq!(
            coord.get_active_session_count().await,
            0,
            "count must be 0 after close"
        );
    }

    #[tokio::test]
    async fn test_pipeline_coordinator_cleanup_empty_returns_zero() {
        let coord = StreamingCoordinator::new(default_adv_config());
        let removed = coord.cleanup_expired_sessions(1).await;
        assert_eq!(removed, 0, "cleanup on empty coordinator must return 0");
    }

    #[tokio::test]
    async fn test_pipeline_get_global_metrics_initial() {
        let coord = StreamingCoordinator::new(default_adv_config());
        let metrics = coord.get_global_metrics().await;
        assert_eq!(
            metrics.active_streams, 0,
            "initial active_streams must be 0"
        );
        assert_eq!(
            metrics.total_streams_created, 0,
            "initial total_streams_created must be 0"
        );
    }

    #[tokio::test]
    async fn test_pipeline_metrics_update_after_session_create() {
        let coord = StreamingCoordinator::new(default_adv_config());
        let _id = coord
            .create_session("c1".to_string())
            .await
            .expect("create_session must succeed");
        let metrics = coord.get_global_metrics().await;
        assert_eq!(
            metrics.total_streams_created, 1,
            "total_streams_created must be 1 after one session"
        );
    }

    #[tokio::test]
    async fn test_pipeline_sessions_by_conversation() {
        let coord = StreamingCoordinator::new(default_adv_config());
        let _id1 = coord
            .create_session("conv-a".to_string())
            .await
            .expect("session 1 must succeed");
        let _id2 = coord
            .create_session("conv-a".to_string())
            .await
            .expect("session 2 must succeed");
        let sessions = coord.get_sessions_by_conversation("conv-a").await;
        assert_eq!(sessions.len(), 2, "two sessions for conv-a must be found");
    }
}