pjson_rs/infrastructure/integration/
universal_adapter.rs

1// Universal adapter implementation for any web framework
2//
3// This provides a concrete implementation of StreamingAdapter that can work
4// with any framework through configuration and type mapping.
5//
6// REQUIRES: nightly Rust for `impl Trait` in associated types
7
8use super::streaming_adapter::{
9    IntegrationError, IntegrationResult, ResponseBody, StreamingAdapter, StreamingFormat,
10    UniversalRequest, UniversalResponse, streaming_helpers,
11};
12use crate::domain::value_objects::{JsonData, SessionId};
13use crate::stream::StreamFrame;
14use std::borrow::Cow;
15use std::collections::HashMap;
16use std::future::Future;
17use std::marker::PhantomData;
18
19/// Configuration for the universal adapter
20#[derive(Debug, Clone)]
21pub struct AdapterConfig {
22    /// Framework name for logging/debugging
23    pub framework_name: Cow<'static, str>,
24    /// Whether the framework supports streaming
25    pub supports_streaming: bool,
26    /// Whether the framework supports Server-Sent Events
27    pub supports_sse: bool,
28    /// Default content type for responses
29    pub default_content_type: Cow<'static, str>,
30    /// Custom headers to add to all responses
31    pub default_headers: HashMap<Cow<'static, str>, Cow<'static, str>>,
32}
33
34impl Default for AdapterConfig {
35    fn default() -> Self {
36        Self {
37            framework_name: Cow::Borrowed("universal"),
38            supports_streaming: true,
39            supports_sse: true,
40            default_content_type: Cow::Borrowed("application/json"),
41            default_headers: HashMap::with_capacity(2), // Pre-allocate for common headers
42        }
43    }
44}
45
46/// Universal adapter that can work with any framework
47pub struct UniversalAdapter<Req, Res, Err> {
48    config: AdapterConfig,
49    _phantom: PhantomData<(Req, Res, Err)>,
50}
51
52impl<Req, Res, Err> UniversalAdapter<Req, Res, Err>
53where
54    Err: std::error::Error + Send + Sync + 'static,
55{
56    /// Create a new universal adapter with default configuration
57    pub fn new() -> Self {
58        Self {
59            config: AdapterConfig::default(),
60            _phantom: PhantomData,
61        }
62    }
63
64    /// Create a new universal adapter with custom configuration
65    pub fn with_config(config: AdapterConfig) -> Self {
66        Self {
67            config,
68            _phantom: PhantomData,
69        }
70    }
71
72    /// Update the configuration
73    pub fn set_config(&mut self, config: AdapterConfig) {
74        self.config = config;
75    }
76
77    /// Add a default header
78    pub fn add_default_header(
79        &mut self,
80        name: impl Into<Cow<'static, str>>,
81        value: impl Into<Cow<'static, str>>,
82    ) {
83        self.config
84            .default_headers
85            .insert(name.into(), value.into());
86    }
87}
88
89impl<Req, Res, Err> Default for UniversalAdapter<Req, Res, Err>
90where
91    Err: std::error::Error + Send + Sync + 'static,
92{
93    fn default() -> Self {
94        Self::new()
95    }
96}
97
98// NOTE: This is a generic implementation that frameworks can specialize
99// Each framework will need to provide their own implementation of these methods
100impl<Req, Res, Err> StreamingAdapter for UniversalAdapter<Req, Res, Err>
101where
102    Req: Send + Sync + 'static,
103    Res: Send + Sync + 'static,
104    Err: std::error::Error + Send + Sync + 'static,
105{
106    type Request = Req;
107    type Response = Res;
108    type Error = Err;
109
110    // Zero-cost GAT futures with impl Trait - no Box allocation, true zero-cost abstractions
111    type StreamingResponseFuture<'a>
112        = impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
113    where
114        Self: 'a;
115
116    type SseResponseFuture<'a>
117        = impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
118    where
119        Self: 'a;
120
121    type JsonResponseFuture<'a>
122        = impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
123    where
124        Self: 'a;
125
126    type MiddlewareFuture<'a>
127        = impl Future<Output = IntegrationResult<UniversalResponse>> + Send + 'a
128    where
129        Self: 'a;
130
131    fn convert_request(&self, _request: Self::Request) -> IntegrationResult<UniversalRequest> {
132        // Universal adapter cannot convert framework-specific requests generically
133        // This should only be used with concrete adapter implementations
134        Err(IntegrationError::UnsupportedFramework(
135            "Generic UniversalAdapter cannot convert requests - use concrete adapter implementation".to_string()
136        ))
137    }
138
139    fn to_response(&self, _response: UniversalResponse) -> IntegrationResult<Self::Response> {
140        // Universal adapter cannot convert responses to framework-specific types generically
141        // This should only be used with concrete adapter implementations
142        Err(IntegrationError::UnsupportedFramework(
143            "Generic UniversalAdapter cannot convert responses - use concrete adapter implementation".to_string()
144        ))
145    }
146
147    fn create_streaming_response<'a>(
148        &'a self,
149        session_id: SessionId,
150        frames: Vec<StreamFrame>,
151        format: StreamingFormat,
152    ) -> Self::StreamingResponseFuture<'a> {
153        // Return async block directly - compiler creates optimal Future type
154        async move {
155            let response = match format {
156                StreamingFormat::Json => {
157                    // Use SIMD-accelerated serialization for better performance
158                    let json_frames: Vec<_> = frames
159                        .into_iter()
160                        .map(|frame| serde_json::to_value(&frame).unwrap_or_default())
161                        .collect();
162
163                    let data =
164                        JsonData::Array(json_frames.into_iter().map(JsonData::from).collect());
165
166                    UniversalResponse::json_pooled(data) // Use pooled version for efficiency
167                }
168                StreamingFormat::Ndjson => {
169                    // Convert frames to NDJSON format with pooled collections
170                    let ndjson_lines: Vec<String> = frames
171                        .into_iter()
172                        .map(|frame| serde_json::to_string(&frame).unwrap_or_default())
173                        .collect();
174
175                    UniversalResponse {
176                        status_code: 200,
177                        headers: super::object_pool::get_cow_hashmap().take(), // Use pooled HashMap
178                        body: ResponseBody::ServerSentEvents(ndjson_lines),
179                        content_type: Cow::Borrowed(format.content_type()),
180                    }
181                }
182                StreamingFormat::ServerSentEvents => {
183                    // Delegate to specialized SSE handler
184                    return self.create_sse_response(session_id, frames).await;
185                }
186                StreamingFormat::Binary => {
187                    // Convert frames to binary format with SIMD acceleration
188                    let binary_data = frames
189                        .into_iter()
190                        .flat_map(|frame| serde_json::to_vec(&frame).unwrap_or_default())
191                        .collect();
192
193                    UniversalResponse {
194                        status_code: 200,
195                        headers: super::object_pool::get_cow_hashmap().take(), // Use pooled HashMap
196                        body: ResponseBody::Binary(binary_data),
197                        content_type: Cow::Borrowed(format.content_type()),
198                    }
199                }
200            };
201
202            self.to_response(response)
203        }
204    }
205
206    fn create_sse_response<'a>(
207        &'a self,
208        session_id: SessionId,
209        frames: Vec<StreamFrame>,
210    ) -> Self::SseResponseFuture<'a> {
211        // Direct async block - zero-cost abstraction with compile-time optimization
212        async move { streaming_helpers::default_sse_response(self, session_id, frames).await }
213    }
214
215    fn create_json_response<'a>(
216        &'a self,
217        data: JsonData,
218        streaming: bool,
219    ) -> Self::JsonResponseFuture<'a> {
220        // Direct async block for optimal performance
221        async move { streaming_helpers::default_json_response(self, data, streaming).await }
222    }
223
224    fn apply_middleware<'a>(
225        &'a self,
226        request: &'a UniversalRequest,
227        response: UniversalResponse,
228    ) -> Self::MiddlewareFuture<'a> {
229        // Zero-cost middleware application
230        async move { streaming_helpers::default_middleware(self, request, response).await }
231    }
232
233    fn supports_streaming(&self) -> bool {
234        self.config.supports_streaming
235    }
236
237    fn supports_sse(&self) -> bool {
238        self.config.supports_sse
239    }
240
241    fn framework_name(&self) -> &'static str {
242        // Convert Cow to &'static str safely
243        match &self.config.framework_name {
244            Cow::Borrowed(s) => s,
245            Cow::Owned(_) => "universal", // fallback for owned strings
246        }
247    }
248}
249
250/// Builder for creating configured universal adapters
251#[derive(Default)]
252pub struct UniversalAdapterBuilder {
253    config: AdapterConfig,
254}
255
256impl UniversalAdapterBuilder {
257    /// Create a new builder
258    pub fn new() -> Self {
259        Self::default()
260    }
261
262    /// Set the framework name
263    pub fn framework_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
264        self.config.framework_name = name.into();
265        self
266    }
267
268    /// Enable or disable streaming support
269    pub fn streaming_support(mut self, enabled: bool) -> Self {
270        self.config.supports_streaming = enabled;
271        self
272    }
273
274    /// Enable or disable SSE support
275    pub fn sse_support(mut self, enabled: bool) -> Self {
276        self.config.supports_sse = enabled;
277        self
278    }
279
280    /// Set default content type
281    pub fn default_content_type(mut self, content_type: impl Into<Cow<'static, str>>) -> Self {
282        self.config.default_content_type = content_type.into();
283        self
284    }
285
286    /// Add a default header
287    pub fn default_header(
288        mut self,
289        name: impl Into<Cow<'static, str>>,
290        value: impl Into<Cow<'static, str>>,
291    ) -> Self {
292        self.config
293            .default_headers
294            .insert(name.into(), value.into());
295        self
296    }
297
298    /// Build the adapter
299    pub fn build<Req, Res, Err>(self) -> UniversalAdapter<Req, Res, Err>
300    where
301        Err: std::error::Error + Send + Sync + 'static,
302    {
303        UniversalAdapter::with_config(self.config)
304    }
305}
306
307#[cfg(test)]
308mod tests {
309    use super::*;
310
311    #[test]
312    fn test_adapter_config_creation() {
313        let config = AdapterConfig::default();
314        assert_eq!(config.framework_name, "universal");
315        assert!(config.supports_streaming);
316        assert!(config.supports_sse);
317    }
318
319    #[test]
320    fn test_adapter_builder() {
321        let adapter: UniversalAdapter<(), (), std::io::Error> = UniversalAdapterBuilder::new()
322            .framework_name("test")
323            .streaming_support(false)
324            .sse_support(true)
325            .default_header("X-Test", "test")
326            .build();
327
328        assert_eq!(adapter.config.framework_name, "test");
329        assert!(!adapter.config.supports_streaming);
330        assert!(adapter.config.supports_sse);
331        assert_eq!(
332            adapter.config.default_headers.get("X-Test"),
333            Some(&Cow::Borrowed("test"))
334        );
335    }
336
337    #[test]
338    fn test_universal_adapter_capabilities() {
339        let adapter: UniversalAdapter<(), (), std::io::Error> = UniversalAdapter::new();
340
341        assert!(adapter.supports_streaming());
342        assert!(adapter.supports_sse());
343        assert_eq!(adapter.framework_name(), "universal");
344    }
345}