midstream 0.2.0

Real-time LLM streaming with inflight analysis
Documentation
#[cfg(test)]
mod tests {
    use crate::midstream::{Midstream, StreamProcessor, Intent, LLMClient, HyprService, ToolIntegration, MetricRecord, TimeWindow, AggregateFunction};
    use std::time::Duration;
    use bytes::Bytes;
    use mockall::*;
    use futures::stream::{self, BoxStream};

    type BoxError = Box<dyn std::error::Error>;

    mock! {
        pub LLMClient {}
        impl LLMClient for LLMClient {
            fn stream(&self) -> BoxStream<'static, Bytes>;
        }
    }

    mock! {
        pub HyprService {}
        impl HyprService for HyprService {
            fn ingest_metric(&self, metric: MetricRecord) -> Result<(), BoxError>;
            fn query_aggregate(&self, window: TimeWindow, func: AggregateFunction) -> Result<f64, BoxError>;
        }
    }

    mock! {
        pub ToolClient {}
        impl ToolIntegration for ToolClient {
            fn handle_weather_intent(&self, content: &str) -> Result<String, BoxError>;
            fn handle_calendar_intent(&self, content: &str) -> Result<String, BoxError>;
        }
    }

    #[tokio::test]
    async fn test_stream_processing_with_metrics() {
        let mut mock_llm = MockLLMClient::new();
        let mut mock_hypr = MockHyprService::new();
        
        mock_llm.expect_stream()
            .times(1)
            .return_once(move || {
                Box::pin(stream::iter(vec![
                    Bytes::from_static(b"Process"),
                    Bytes::from_static(b"this"),
                    Bytes::from_static(b"stream"),
                ]))
            });

        mock_hypr.expect_ingest_metric()
            .returning(|_| Ok(()));

        let midstream = Midstream::new(
            Box::new(mock_llm),
            Box::new(mock_hypr),
        );

        let result = midstream.process_stream().await;
        assert!(result.is_ok());

        let metrics = midstream.get_metrics().await;
        assert!(!metrics.is_empty());
    }

    #[tokio::test]
    async fn test_real_time_aggregation() {
        let mut mock_llm = MockLLMClient::new();
        let mut mock_hypr = MockHyprService::new();

        mock_hypr.expect_query_aggregate()
            .times(1)
            .return_once(|_, _| Ok(0.75));

        let midstream = Midstream::new(
            Box::new(mock_llm),
            Box::new(mock_hypr),
        );

        let avg = midstream.get_average_sentiment(Duration::from_secs(300)).await;
        assert!(avg.is_ok());
        assert_eq!(avg.unwrap(), 0.75);
    }

    #[tokio::test]
    async fn test_error_handling() {
        let mut mock_llm = MockLLMClient::new();
        let mut mock_hypr = MockHyprService::new();

        mock_hypr.expect_ingest_metric()
            .times(1)
            .return_once(|_| Err("Ingestion error".into()));

        mock_llm.expect_stream()
            .times(1)
            .return_once(|| {
                Box::pin(stream::iter(vec![Bytes::from_static(b"test message")]))
            });

        let midstream = Midstream::new(
            Box::new(mock_llm),
            Box::new(mock_hypr),
        );

        let result = midstream.process_stream().await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Failed to ingest metric"));
    }

    #[tokio::test]
    async fn test_empty_stream() {
        let mut mock_llm = MockLLMClient::new();
        let mut mock_hypr = MockHyprService::new();

        mock_llm.expect_stream()
            .times(1)
            .return_once(|| {
                Box::pin(stream::iter(Vec::<Bytes>::new()))
            });

        let midstream = Midstream::new(
            Box::new(mock_llm),
            Box::new(mock_hypr),
        );

        let result = midstream.process_stream().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_large_message_processing() {
        let mut mock_llm = MockLLMClient::new();
        let mut mock_hypr = MockHyprService::new();
        
        let large_message = Bytes::from(vec![b'x'; 1_000_000]);
        mock_llm.expect_stream()
            .times(1)
            .return_once(move || {
                Box::pin(stream::iter(vec![large_message]))
            });

        mock_hypr.expect_ingest_metric()
            .returning(|_| Ok(()));

        let midstream = Midstream::new(
            Box::new(mock_llm),
            Box::new(mock_hypr),
        );

        let result = midstream.process_stream().await;
        assert!(result.is_ok());
        
        let messages = result.unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].content.len(), 1_000_000);
    }

    #[tokio::test]
    async fn test_inflight_decision_making() {
        let mut mock_llm = MockLLMClient::new();
        let mut mock_hypr = MockHyprService::new();
        let mut mock_tool = MockToolClient::new();

        mock_llm.expect_stream()
            .times(1)
            .return_once(|| {
                Box::pin(stream::iter(vec![
                    Bytes::from_static(b"URGENT: What's the weather"),
                ]))
            });

        mock_tool.expect_handle_weather_intent()
            .times(1)
            .return_once(|_| Ok("Weather info (urgent response)".to_string()));

        mock_hypr.expect_ingest_metric()
            .returning(|_| Ok(()));

        let midstream = Midstream::with_tool_integration(
            Box::new(mock_llm),
            Box::new(mock_hypr),
            Box::new(mock_tool),
        );

        let result = midstream.process_stream().await;
        assert!(result.is_ok());
        let messages = result.unwrap();

        assert_eq!(messages[0].intent, Some(Intent::Weather));
        assert!(messages[0].tool_response.as_ref().unwrap().contains("urgent response"));
    }

    #[tokio::test]
    async fn test_empty_message_handling() {
        let mut mock_llm = MockLLMClient::new();
        let mut mock_hypr = MockHyprService::new();

        mock_llm.expect_stream()
            .times(1)
            .return_once(|| {
                Box::pin(stream::iter(vec![Bytes::new()]))
            });

        let midstream = Midstream::new(
            Box::new(mock_llm),
            Box::new(mock_hypr),
        );

        let result = midstream.process_stream().await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Empty message content"));
    }
}