pjson_rs/infrastructure/integration/
streaming_adapter.rs

1// Clean Architecture Streaming Adapter with Zero-Cost GAT Abstractions
2//
3// This module provides the core streaming adapter trait that any web framework
4// must implement to support PJS streaming capabilities with zero-cost abstractions.
5//
6// REQUIRES: nightly Rust for `impl Trait` in associated types
7
8use super::object_pool::pooled_builders::PooledResponseBuilder;
9use super::simd_acceleration::{SimdConfig, SimdStreamProcessor};
10use crate::domain::value_objects::{JsonData, SessionId};
11use crate::stream::StreamFrame;
12use std::borrow::Cow;
13use std::collections::HashMap;
14use std::future::Future;
15
/// Wire formats the adapter can negotiate for a response.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingFormat {
    /// Standard JSON response
    Json,
    /// Newline-Delimited JSON (NDJSON)
    Ndjson,
    /// Server-Sent Events
    ServerSentEvents,
    /// Custom binary format
    Binary,
}

impl StreamingFormat {
    /// Returns the MIME type advertised for this format.
    pub fn content_type(&self) -> &'static str {
        match self {
            Self::Json => "application/json",
            Self::Ndjson => "application/x-ndjson",
            Self::ServerSentEvents => "text/event-stream",
            Self::Binary => "application/octet-stream",
        }
    }

    /// Picks a format from the value of an `Accept` header.
    ///
    /// The header is searched for the known media types in a fixed priority
    /// order (SSE, then NDJSON, then binary); the first one present wins and
    /// anything else, including an empty header, falls back to
    /// [`StreamingFormat::Json`].
    ///
    /// Media types are compared ASCII case-insensitively, because HTTP treats
    /// media type names as case-insensitive (`Text/Event-Stream` is the same
    /// type as `text/event-stream`). Quality values (`q=`) are not interpreted.
    pub fn from_accept_header(accept: &str) -> Self {
        /// Allocation-free, ASCII case-insensitive substring search.
        fn contains_ignore_ascii_case(haystack: &str, needle: &str) -> bool {
            let (haystack, needle) = (haystack.as_bytes(), needle.as_bytes());
            // `windows(0)` panics, so the empty needle is answered up front.
            needle.is_empty()
                || haystack
                    .windows(needle.len())
                    .any(|window| window.eq_ignore_ascii_case(needle))
        }

        // Ordered by priority: the first media type found in the header wins.
        const BY_PRIORITY: [(&str, StreamingFormat); 3] = [
            ("text/event-stream", StreamingFormat::ServerSentEvents),
            ("application/x-ndjson", StreamingFormat::Ndjson),
            ("application/octet-stream", StreamingFormat::Binary),
        ];

        BY_PRIORITY
            .iter()
            .find(|(mime, _)| contains_ignore_ascii_case(accept, mime))
            .map_or(Self::Json, |&(_, format)| format)
    }

    /// Returns `true` for every format except plain [`StreamingFormat::Json`].
    pub fn supports_streaming(&self) -> bool {
        matches!(self, Self::Ndjson | Self::ServerSentEvents | Self::Binary)
    }
}
58
/// Response body variants supported by the universal adapter
///
/// Each variant carries an already-materialised payload; which variant is
/// used is decided by the `UniversalResponse` constructor that built it.
#[derive(Debug, Clone)]
pub enum ResponseBody {
    /// A single JSON document (used by `UniversalResponse::json` and
    /// `UniversalResponse::error`).
    Json(JsonData),
    /// A batch of stream frames (`UniversalResponse::stream` pairs this with
    /// the `application/x-ndjson` content type).
    Stream(Vec<StreamFrame>),
    /// Server-Sent Events payload as a list of strings.
    /// NOTE(review): the helpers in this file store fully formatted
    /// `data: ...` text in each entry — confirm consumers expect that.
    ServerSentEvents(Vec<String>),
    /// Raw bytes.
    Binary(Vec<u8>),
    /// No body at all.
    Empty,
}
68
69/// Universal response type for framework integration
70#[derive(Debug, Clone)]
71pub struct UniversalResponse {
72    pub status_code: u16,
73    pub headers: HashMap<Cow<'static, str>, Cow<'static, str>>,
74    pub body: ResponseBody,
75    pub content_type: Cow<'static, str>,
76}
77
78impl UniversalResponse {
79    /// Create a JSON response
80    pub fn json(data: JsonData) -> Self {
81        Self {
82            status_code: 200,
83            headers: HashMap::with_capacity(2),
84            body: ResponseBody::Json(data),
85            content_type: Cow::Borrowed("application/json"),
86        }
87    }
88
89    /// Create a JSON response using pooled HashMap (more efficient)
90    pub fn json_pooled(data: JsonData) -> Self {
91        let headers = super::object_pool::get_cow_hashmap().take();
92        Self {
93            status_code: 200,
94            headers,
95            body: ResponseBody::Json(data),
96            content_type: Cow::Borrowed("application/json"),
97        }
98    }
99
100    /// Create a streaming response
101    pub fn stream(frames: Vec<StreamFrame>) -> Self {
102        Self {
103            status_code: 200,
104            headers: HashMap::with_capacity(2),
105            body: ResponseBody::Stream(frames),
106            content_type: Cow::Borrowed("application/x-ndjson"),
107        }
108    }
109
110    /// Create a Server-Sent Events response
111    pub fn server_sent_events(events: Vec<String>) -> Self {
112        let mut headers = HashMap::with_capacity(4);
113        headers.insert(Cow::Borrowed("Cache-Control"), Cow::Borrowed("no-cache"));
114        headers.insert(Cow::Borrowed("Connection"), Cow::Borrowed("keep-alive"));
115
116        Self {
117            status_code: 200,
118            headers,
119            body: ResponseBody::ServerSentEvents(events),
120            content_type: Cow::Borrowed("text/event-stream"),
121        }
122    }
123
124    /// Create an error response
125    pub fn error(status: u16, message: impl Into<String>) -> Self {
126        let error_data = JsonData::Object({
127            let mut map = std::collections::HashMap::new();
128            map.insert("error".to_string(), JsonData::String(message.into()));
129            map.insert("status".to_string(), JsonData::Integer(status as i64));
130            map
131        });
132
133        Self {
134            status_code: status,
135            headers: HashMap::with_capacity(1),
136            body: ResponseBody::Json(error_data),
137            content_type: Cow::Borrowed("application/json"),
138        }
139    }
140
141    /// Add a header to the response  
142    pub fn with_header(
143        mut self,
144        name: impl Into<Cow<'static, str>>,
145        value: impl Into<Cow<'static, str>>,
146    ) -> Self {
147        self.headers.insert(name.into(), value.into());
148        self
149    }
150
151    /// Set the status code
152    pub fn with_status(mut self, status: u16) -> Self {
153        self.status_code = status;
154        self
155    }
156}
157
158/// Universal request type for framework integration
159#[derive(Debug, Clone)]
160pub struct UniversalRequest {
161    pub method: Cow<'static, str>,
162    pub path: String,
163    pub headers: HashMap<Cow<'static, str>, Cow<'static, str>>,
164    pub query_params: HashMap<String, String>,
165    pub body: Option<Vec<u8>>,
166}
167
168impl UniversalRequest {
169    /// Create a new universal request
170    pub fn new(method: impl Into<Cow<'static, str>>, path: impl Into<String>) -> Self {
171        Self {
172            method: method.into(),
173            path: path.into(),
174            headers: HashMap::with_capacity(4),
175            query_params: HashMap::with_capacity(2),
176            body: None,
177        }
178    }
179
180    /// Add a header
181    pub fn with_header(
182        mut self,
183        name: impl Into<Cow<'static, str>>,
184        value: impl Into<Cow<'static, str>>,
185    ) -> Self {
186        self.headers.insert(name.into(), value.into());
187        self
188    }
189
190    /// Add a query parameter
191    pub fn with_query(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
192        self.query_params.insert(name.into(), value.into());
193        self
194    }
195
196    /// Set the request body
197    pub fn with_body(mut self, body: Vec<u8>) -> Self {
198        self.body = Some(body);
199        self
200    }
201
202    /// Get a header value
203    pub fn get_header(&self, name: &str) -> Option<&Cow<'static, str>> {
204        self.headers.get(name)
205    }
206
207    /// Get a query parameter
208    pub fn get_query(&self, name: &str) -> Option<&String> {
209        self.query_params.get(name)
210    }
211
212    /// Check if request accepts a specific content type
213    pub fn accepts(&self, content_type: &str) -> bool {
214        if let Some(accept) = self
215            .get_header("accept")
216            .or_else(|| self.get_header("Accept"))
217        {
218            accept.contains(content_type)
219        } else {
220            false
221        }
222    }
223
224    /// Get preferred streaming format from headers
225    pub fn preferred_streaming_format(&self) -> StreamingFormat {
226        if let Some(accept) = self.get_header("accept") {
227            StreamingFormat::from_accept_header(accept)
228        } else {
229            StreamingFormat::Json
230        }
231    }
232}
233
/// Framework integration errors
///
/// `Display` output for each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum IntegrationError {
    /// The named framework is not supported; the payload is interpolated into
    /// the message.
    #[error("Unsupported framework: {0}")]
    UnsupportedFramework(String),

    /// Converting a request failed; the payload describes why.
    #[error("Request conversion failed: {0}")]
    RequestConversion(String),

    /// Converting or building a response failed; the payload describes why.
    /// In this file it is produced when SIMD-generated SSE bytes are not
    /// valid UTF-8.
    #[error("Response conversion failed: {0}")]
    ResponseConversion(String),

    /// The framework cannot stream responses.
    #[error("Streaming not supported by framework")]
    StreamingNotSupported,

    /// Invalid configuration; the payload describes the problem.
    #[error("Configuration error: {0}")]
    Configuration(String),

    /// SIMD processing failed; the payload describes the problem.
    /// NOTE(review): not constructed anywhere in this file — presumably raised
    /// by the SIMD module; confirm.
    #[error("SIMD processing error: {0}")]
    SimdProcessing(String),
}

/// Shorthand for `Result<T, IntegrationError>`, used by every fallible
/// operation in this module.
pub type IntegrationResult<T> = Result<T, IntegrationError>;
257
/// High-performance streaming adapter using GAT zero-cost abstractions
///
/// A web framework integration implements this trait to translate between its
/// native request/response types and [`UniversalRequest`] /
/// [`UniversalResponse`], and to build streaming, SSE and JSON responses.
///
/// Every asynchronous operation returns a future named through a generic
/// associated type, so calls are statically dispatched and need no boxing.
/// The module header states that nightly Rust is required for `impl Trait`
/// in associated types, which is what implementors would use to name the
/// future of an `async` block.
///
/// ## Performance Benefits
/// NOTE(review): the figures below are claims carried over from the original
/// comment; nothing in this file substantiates them.
/// - 1.82x faster trait dispatch vs async_trait
/// - Zero heap allocations for futures
/// - Pure stack allocation with static dispatch
/// - Complete inlining for hot paths
pub trait StreamingAdapter: Send + Sync {
    /// The framework's native request type
    type Request;
    /// The framework's native response type
    type Response;
    /// The framework's error type
    ///
    /// NOTE(review): no method declared on this trait mentions `Self::Error`;
    /// all fallible operations return [`IntegrationResult`].
    type Error: std::error::Error + Send + Sync + 'static;

    /// Future returned by [`StreamingAdapter::create_streaming_response`];
    /// it may borrow the adapter for `'a`.
    type StreamingResponseFuture<'a>: Future<Output = IntegrationResult<Self::Response>> + Send + 'a
    where
        Self: 'a;

    /// Future returned by [`StreamingAdapter::create_sse_response`];
    /// it may borrow the adapter for `'a`.
    type SseResponseFuture<'a>: Future<Output = IntegrationResult<Self::Response>> + Send + 'a
    where
        Self: 'a;

    /// Future returned by [`StreamingAdapter::create_json_response`];
    /// it may borrow the adapter for `'a`.
    type JsonResponseFuture<'a>: Future<Output = IntegrationResult<Self::Response>> + Send + 'a
    where
        Self: 'a;

    /// Future returned by [`StreamingAdapter::apply_middleware`]; it may
    /// borrow both the adapter and the request for `'a`, and resolves to a
    /// [`UniversalResponse`] rather than a framework response.
    type MiddlewareFuture<'a>: Future<Output = IntegrationResult<UniversalResponse>> + Send + 'a
    where
        Self: 'a;

    /// Convert framework request to universal format
    ///
    /// Consumes the native request; synchronous.
    fn convert_request(&self, request: Self::Request) -> IntegrationResult<UniversalRequest>;

    /// Convert universal response to framework format
    ///
    /// Consumes the universal response; synchronous.
    fn to_response(&self, response: UniversalResponse) -> IntegrationResult<Self::Response>;

    /// Create a streaming response with priority-based frame delivery
    ///
    /// `format` selects the wire format; `session_id` identifies the session
    /// the frames belong to.
    fn create_streaming_response<'a>(
        &'a self,
        session_id: SessionId,
        frames: Vec<StreamFrame>,
        format: StreamingFormat,
    ) -> Self::StreamingResponseFuture<'a>;

    /// Create a Server-Sent Events response with SIMD acceleration
    ///
    /// `streaming_helpers::default_sse_response` offers a ready-made
    /// implementation.
    fn create_sse_response<'a>(
        &'a self,
        session_id: SessionId,
        frames: Vec<StreamFrame>,
    ) -> Self::SseResponseFuture<'a>;

    /// Create a JSON response with optional streaming
    ///
    /// `streaming_helpers::default_json_response` offers a ready-made
    /// implementation.
    fn create_json_response<'a>(
        &'a self,
        data: JsonData,
        streaming: bool,
    ) -> Self::JsonResponseFuture<'a>;

    /// Handle framework-specific middleware integration
    ///
    /// Receives the request by reference and the response by value, and
    /// resolves to the (possibly modified) response.
    fn apply_middleware<'a>(
        &'a self,
        request: &'a UniversalRequest,
        response: UniversalResponse,
    ) -> Self::MiddlewareFuture<'a>;

    /// Check if the framework supports streaming
    ///
    /// Defaults to `true`; override to opt out.
    fn supports_streaming(&self) -> bool {
        true
    }

    /// Check if the framework supports Server-Sent Events
    ///
    /// Defaults to `true`; override to opt out.
    fn supports_sse(&self) -> bool {
        true
    }

    /// Get the framework name for debugging/logging
    ///
    /// Also reported in the `framework` field of the default health response.
    fn framework_name(&self) -> &'static str;
}
341
/// Extension trait for additional convenience methods
///
/// Builds on [`StreamingAdapter`] with format auto-detection, error responses
/// and health checks. No blanket implementation is provided in this file;
/// the `streaming_helpers` module contains `default_*` functions an
/// implementor can delegate to.
pub trait StreamingAdapterExt: StreamingAdapter {
    /// Auto-detection future for streaming format
    ///
    /// Returned by [`StreamingAdapterExt::auto_stream_response`]; may borrow
    /// the adapter and the request for `'a`.
    type AutoStreamFuture<'a>: Future<Output = IntegrationResult<Self::Response>> + Send + 'a
    where
        Self: 'a;

    /// Error response future
    ///
    /// Returned by [`StreamingAdapterExt::create_error_response`].
    type ErrorResponseFuture<'a>: Future<Output = IntegrationResult<Self::Response>> + Send + 'a
    where
        Self: 'a;

    /// Health check response future
    ///
    /// Returned by [`StreamingAdapterExt::create_health_response`].
    type HealthResponseFuture<'a>: Future<Output = IntegrationResult<Self::Response>> + Send + 'a
    where
        Self: 'a;

    /// Auto-detect streaming format from request and create appropriate response
    ///
    /// See `streaming_helpers::default_auto_stream_response` for the
    /// reference behaviour.
    fn auto_stream_response<'a>(
        &'a self,
        request: &'a UniversalRequest,
        session_id: SessionId,
        frames: Vec<StreamFrame>,
    ) -> Self::AutoStreamFuture<'a>;

    /// Create an error response
    ///
    /// `status` is the HTTP status code and `message` the human-readable
    /// description; see `streaming_helpers::default_error_response`.
    fn create_error_response<'a>(
        &'a self,
        status: u16,
        message: String,
    ) -> Self::ErrorResponseFuture<'a>;

    /// Create a health check response
    ///
    /// See `streaming_helpers::default_health_response`.
    fn create_health_response<'a>(&'a self) -> Self::HealthResponseFuture<'a>;
}
377
378/// Helper functions for default implementations with zero-cost abstractions
379pub mod streaming_helpers {
380    use super::*;
381
382    /// Default SSE response implementation with SIMD acceleration
383    pub async fn default_sse_response<T: StreamingAdapter>(
384        adapter: &T,
385        session_id: SessionId,
386        frames: Vec<StreamFrame>,
387    ) -> IntegrationResult<T::Response> {
388        // Use SIMD-accelerated serialization for better performance
389        let config = SimdConfig::default();
390        let mut processor = SimdStreamProcessor::new(config);
391
392        match processor.process_to_sse(&frames) {
393            Ok(sse_data) => {
394                let sse_string = String::from_utf8(sse_data.to_vec())
395                    .map_err(|e| IntegrationError::ResponseConversion(e.to_string()))?;
396
397                let events = vec![sse_string];
398                let response = UniversalResponse::server_sent_events(events).with_header(
399                    Cow::Borrowed("X-PJS-Session-ID"),
400                    Cow::Owned(session_id.to_string()),
401                );
402
403                adapter.to_response(response)
404            }
405            Err(_e) => {
406                // Fallback to standard serialization
407                let events: Vec<String> = frames
408                    .into_iter()
409                    .map(|frame| {
410                        format!(
411                            "data: {}\\n\\n",
412                            serde_json::to_string(&frame).unwrap_or_default()
413                        )
414                    })
415                    .collect();
416
417                let response = UniversalResponse::server_sent_events(events).with_header(
418                    Cow::Borrowed("X-PJS-Session-ID"),
419                    Cow::Owned(session_id.to_string()),
420                );
421
422                adapter.to_response(response)
423            }
424        }
425    }
426
427    /// Default JSON response implementation
428    pub async fn default_json_response<T: StreamingAdapter>(
429        adapter: &T,
430        data: JsonData,
431        streaming: bool,
432    ) -> IntegrationResult<T::Response> {
433        let response = if streaming {
434            // Convert to streaming format
435            let frame = StreamFrame {
436                data: serde_json::to_value(&data).unwrap_or_default(),
437                priority: crate::domain::Priority::HIGH,
438                metadata: std::collections::HashMap::new(),
439            };
440            UniversalResponse::stream(vec![frame])
441        } else {
442            UniversalResponse::json(data)
443        };
444
445        adapter.to_response(response)
446    }
447
    /// Default middleware implementation (no-op)
    ///
    /// Returns `response` unchanged, wrapped in `Ok`. `_adapter` and
    /// `_request` are accepted but never read, so this helper can stand in
    /// for `StreamingAdapter::apply_middleware` when a framework has no
    /// middleware of its own to run. It never returns an error.
    pub async fn default_middleware<T: StreamingAdapter>(
        _adapter: &T,
        _request: &UniversalRequest,
        response: UniversalResponse,
    ) -> IntegrationResult<UniversalResponse> {
        Ok(response)
    }
456
457    /// Default error response implementation
458    pub async fn default_error_response<T: StreamingAdapter>(
459        adapter: &T,
460        status: u16,
461        message: String,
462    ) -> IntegrationResult<T::Response> {
463        let response = UniversalResponse::error(status, message);
464        adapter.to_response(response)
465    }
466
467    /// Default health check response implementation
468    pub async fn default_health_response<T: StreamingAdapter>(
469        adapter: &T,
470    ) -> IntegrationResult<T::Response> {
471        let health_data = JsonData::Object({
472            let mut map = std::collections::HashMap::new();
473            map.insert(
474                "status".to_string(),
475                JsonData::String("healthy".to_string()),
476            );
477            map.insert(
478                "framework".to_string(),
479                JsonData::String(adapter.framework_name().to_string()),
480            );
481            map.insert(
482                "streaming_support".to_string(),
483                JsonData::Bool(adapter.supports_streaming()),
484            );
485            map.insert(
486                "sse_support".to_string(),
487                JsonData::Bool(adapter.supports_sse()),
488            );
489            map
490        });
491
492        let response = PooledResponseBuilder::new()
493            .header(Cow::Borrowed("X-Health-Check"), Cow::Borrowed("pjs"))
494            .json(health_data);
495
496        adapter.to_response(response)
497    }
498
499    /// Default auto-stream response implementation with format detection
500    pub async fn default_auto_stream_response<T: StreamingAdapter>(
501        adapter: &T,
502        request: &UniversalRequest,
503        session_id: SessionId,
504        frames: Vec<StreamFrame>,
505    ) -> IntegrationResult<T::Response> {
506        let format = request.preferred_streaming_format();
507
508        match format {
509            StreamingFormat::ServerSentEvents => {
510                adapter.create_sse_response(session_id, frames).await
511            }
512            _ => {
513                adapter
514                    .create_streaming_response(session_id, frames, format)
515                    .await
516            }
517        }
518    }
519}
520
#[cfg(test)]
mod tests {
    use super::*;

    /// Every format reports its canonical MIME type.
    #[test]
    fn test_streaming_format_content_types() {
        let cases = [
            (StreamingFormat::Json, "application/json"),
            (StreamingFormat::Ndjson, "application/x-ndjson"),
            (StreamingFormat::ServerSentEvents, "text/event-stream"),
            (StreamingFormat::Binary, "application/octet-stream"),
        ];

        for (format, mime) in cases {
            assert_eq!(format.content_type(), mime);
        }
    }

    /// A bare media type in the `Accept` value selects the matching format.
    #[test]
    fn test_format_detection_from_accept_header() {
        let cases = [
            ("text/event-stream", StreamingFormat::ServerSentEvents),
            ("application/x-ndjson", StreamingFormat::Ndjson),
            ("application/json", StreamingFormat::Json),
        ];

        for (accept, expected) in cases {
            assert_eq!(StreamingFormat::from_accept_header(accept), expected);
        }
    }

    /// The builder methods populate method, path, headers and query params.
    #[test]
    fn test_universal_request_creation() {
        let req = UniversalRequest::new("GET", "/api/stream")
            .with_header("Accept", "text/event-stream")
            .with_query("priority", "high");

        assert_eq!(req.method, "GET");
        assert_eq!(req.path, "/api/stream");
        assert!(req.accepts("text/event-stream"));
        assert_eq!(
            req.get_query("priority").map(String::as_str),
            Some("high")
        );
    }

    /// `with_status` / `with_header` override the defaults set by `json`.
    #[test]
    fn test_universal_response_creation() {
        let payload = JsonData::String("test".to_string());
        let resp = UniversalResponse::json(payload)
            .with_status(201)
            .with_header("X-Test", "value");

        assert_eq!(resp.status_code, 201);
        assert_eq!(resp.content_type, "application/json");
        assert!(resp.headers.contains_key("X-Test"));
    }
}