1use super::object_pool::pooled_builders::PooledResponseBuilder;
9use super::simd_acceleration::{SimdConfig, SimdStreamProcessor};
10use crate::domain::value_objects::{JsonData, SessionId};
11use crate::stream::StreamFrame;
12use std::borrow::Cow;
13use std::collections::HashMap;
14use std::future::Future;
15
/// Wire formats in which a response can be delivered to a client.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingFormat {
    /// A single JSON document (`application/json`).
    Json,
    /// Newline-delimited JSON (`application/x-ndjson`).
    Ndjson,
    /// Server-Sent Events (`text/event-stream`).
    ServerSentEvents,
    /// Raw bytes (`application/octet-stream`).
    Binary,
}

impl StreamingFormat {
    /// Returns the MIME type associated with this format.
    pub fn content_type(&self) -> &'static str {
        match self {
            Self::Json => "application/json",
            Self::Ndjson => "application/x-ndjson",
            Self::ServerSentEvents => "text/event-stream",
            Self::Binary => "application/octet-stream",
        }
    }

    /// Chooses a format from the value of an HTTP `Accept` header.
    ///
    /// The header is searched for the MIME types of the streaming formats in
    /// precedence order: Server-Sent Events, then NDJSON, then binary. The
    /// first one mentioned anywhere in the header wins; if none is mentioned
    /// (including for an empty header or `*/*`) the result is
    /// [`StreamingFormat::Json`].
    ///
    /// Matching ignores ASCII case, because media type names are
    /// case-insensitive. Quality values (`;q=`) are not interpreted.
    pub fn from_accept_header(accept: &str) -> Self {
        // Using `content_type()` as the needle keeps detection and
        // advertisement of each MIME type in one place.
        [Self::ServerSentEvents, Self::Ndjson, Self::Binary]
            .iter()
            .copied()
            .find(|format| Self::mentions(accept, format.content_type()))
            .unwrap_or(Self::Json)
    }

    /// Returns `true` for every format except [`StreamingFormat::Json`].
    pub fn supports_streaming(&self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::Json => false,
            Self::Ndjson | Self::ServerSentEvents | Self::Binary => true,
        }
    }

    /// Allocation-free, ASCII case-insensitive substring test.
    fn mentions(accept: &str, media_type: &str) -> bool {
        let needle = media_type.as_bytes();
        // `windows(0)` panics, so an empty needle is rejected up front.
        !needle.is_empty()
            && accept
                .as_bytes()
                .windows(needle.len())
                .any(|window| window.eq_ignore_ascii_case(needle))
    }
}
58
59#[derive(Debug, Clone)]
61pub enum ResponseBody {
62 Json(JsonData),
63 Stream(Vec<StreamFrame>),
64 ServerSentEvents(Vec<String>),
65 Binary(Vec<u8>),
66 Empty,
67}
68
69#[derive(Debug, Clone)]
71pub struct UniversalResponse {
72 pub status_code: u16,
73 pub headers: HashMap<Cow<'static, str>, Cow<'static, str>>,
74 pub body: ResponseBody,
75 pub content_type: Cow<'static, str>,
76}
77
78impl UniversalResponse {
79 pub fn json(data: JsonData) -> Self {
81 Self {
82 status_code: 200,
83 headers: HashMap::with_capacity(2),
84 body: ResponseBody::Json(data),
85 content_type: Cow::Borrowed("application/json"),
86 }
87 }
88
89 pub fn json_pooled(data: JsonData) -> Self {
91 let headers = super::object_pool::get_cow_hashmap().take();
92 Self {
93 status_code: 200,
94 headers,
95 body: ResponseBody::Json(data),
96 content_type: Cow::Borrowed("application/json"),
97 }
98 }
99
100 pub fn stream(frames: Vec<StreamFrame>) -> Self {
102 Self {
103 status_code: 200,
104 headers: HashMap::with_capacity(2),
105 body: ResponseBody::Stream(frames),
106 content_type: Cow::Borrowed("application/x-ndjson"),
107 }
108 }
109
110 pub fn server_sent_events(events: Vec<String>) -> Self {
112 let mut headers = HashMap::with_capacity(4);
113 headers.insert(Cow::Borrowed("Cache-Control"), Cow::Borrowed("no-cache"));
114 headers.insert(Cow::Borrowed("Connection"), Cow::Borrowed("keep-alive"));
115
116 Self {
117 status_code: 200,
118 headers,
119 body: ResponseBody::ServerSentEvents(events),
120 content_type: Cow::Borrowed("text/event-stream"),
121 }
122 }
123
124 pub fn error(status: u16, message: impl Into<String>) -> Self {
126 let error_data = JsonData::Object({
127 let mut map = std::collections::HashMap::new();
128 map.insert("error".to_string(), JsonData::String(message.into()));
129 map.insert("status".to_string(), JsonData::Integer(status as i64));
130 map
131 });
132
133 Self {
134 status_code: status,
135 headers: HashMap::with_capacity(1),
136 body: ResponseBody::Json(error_data),
137 content_type: Cow::Borrowed("application/json"),
138 }
139 }
140
141 pub fn with_header(
143 mut self,
144 name: impl Into<Cow<'static, str>>,
145 value: impl Into<Cow<'static, str>>,
146 ) -> Self {
147 self.headers.insert(name.into(), value.into());
148 self
149 }
150
151 pub fn with_status(mut self, status: u16) -> Self {
153 self.status_code = status;
154 self
155 }
156}
157
158#[derive(Debug, Clone)]
160pub struct UniversalRequest {
161 pub method: Cow<'static, str>,
162 pub path: String,
163 pub headers: HashMap<Cow<'static, str>, Cow<'static, str>>,
164 pub query_params: HashMap<String, String>,
165 pub body: Option<Vec<u8>>,
166}
167
168impl UniversalRequest {
169 pub fn new(method: impl Into<Cow<'static, str>>, path: impl Into<String>) -> Self {
171 Self {
172 method: method.into(),
173 path: path.into(),
174 headers: HashMap::with_capacity(4),
175 query_params: HashMap::with_capacity(2),
176 body: None,
177 }
178 }
179
180 pub fn with_header(
182 mut self,
183 name: impl Into<Cow<'static, str>>,
184 value: impl Into<Cow<'static, str>>,
185 ) -> Self {
186 self.headers.insert(name.into(), value.into());
187 self
188 }
189
190 pub fn with_query(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
192 self.query_params.insert(name.into(), value.into());
193 self
194 }
195
196 pub fn with_body(mut self, body: Vec<u8>) -> Self {
198 self.body = Some(body);
199 self
200 }
201
202 pub fn get_header(&self, name: &str) -> Option<&Cow<'static, str>> {
204 self.headers.get(name)
205 }
206
207 pub fn get_query(&self, name: &str) -> Option<&String> {
209 self.query_params.get(name)
210 }
211
212 pub fn accepts(&self, content_type: &str) -> bool {
214 if let Some(accept) = self
215 .get_header("accept")
216 .or_else(|| self.get_header("Accept"))
217 {
218 accept.contains(content_type)
219 } else {
220 false
221 }
222 }
223
224 pub fn preferred_streaming_format(&self) -> StreamingFormat {
226 if let Some(accept) = self.get_header("accept") {
227 StreamingFormat::from_accept_header(accept)
228 } else {
229 StreamingFormat::Json
230 }
231 }
232}
233
234#[derive(Debug, thiserror::Error)]
236pub enum IntegrationError {
237 #[error("Unsupported framework: {0}")]
238 UnsupportedFramework(String),
239
240 #[error("Request conversion failed: {0}")]
241 RequestConversion(String),
242
243 #[error("Response conversion failed: {0}")]
244 ResponseConversion(String),
245
246 #[error("Streaming not supported by framework")]
247 StreamingNotSupported,
248
249 #[error("Configuration error: {0}")]
250 Configuration(String),
251
252 #[error("SIMD processing error: {0}")]
253 SimdProcessing(String),
254}
255
256pub type IntegrationResult<T> = Result<T, IntegrationError>;
257
258pub trait StreamingAdapter: Send + Sync {
269 type Request;
271 type Response;
273 type Error: std::error::Error + Send + Sync + 'static;
275
276 type StreamingResponseFuture<'a>: Future<Output = IntegrationResult<Self::Response>> + Send + 'a
278 where
279 Self: 'a;
280
281 type SseResponseFuture<'a>: Future<Output = IntegrationResult<Self::Response>> + Send + 'a
282 where
283 Self: 'a;
284
285 type JsonResponseFuture<'a>: Future<Output = IntegrationResult<Self::Response>> + Send + 'a
286 where
287 Self: 'a;
288
289 type MiddlewareFuture<'a>: Future<Output = IntegrationResult<UniversalResponse>> + Send + 'a
290 where
291 Self: 'a;
292
293 fn convert_request(&self, request: Self::Request) -> IntegrationResult<UniversalRequest>;
295
296 fn to_response(&self, response: UniversalResponse) -> IntegrationResult<Self::Response>;
298
299 fn create_streaming_response<'a>(
301 &'a self,
302 session_id: SessionId,
303 frames: Vec<StreamFrame>,
304 format: StreamingFormat,
305 ) -> Self::StreamingResponseFuture<'a>;
306
307 fn create_sse_response<'a>(
309 &'a self,
310 session_id: SessionId,
311 frames: Vec<StreamFrame>,
312 ) -> Self::SseResponseFuture<'a>;
313
314 fn create_json_response<'a>(
316 &'a self,
317 data: JsonData,
318 streaming: bool,
319 ) -> Self::JsonResponseFuture<'a>;
320
321 fn apply_middleware<'a>(
323 &'a self,
324 request: &'a UniversalRequest,
325 response: UniversalResponse,
326 ) -> Self::MiddlewareFuture<'a>;
327
328 fn supports_streaming(&self) -> bool {
330 true
331 }
332
333 fn supports_sse(&self) -> bool {
335 true
336 }
337
338 fn framework_name(&self) -> &'static str;
340}
341
342pub trait StreamingAdapterExt: StreamingAdapter {
344 type AutoStreamFuture<'a>: Future<Output = IntegrationResult<Self::Response>> + Send + 'a
346 where
347 Self: 'a;
348
349 type ErrorResponseFuture<'a>: Future<Output = IntegrationResult<Self::Response>> + Send + 'a
351 where
352 Self: 'a;
353
354 type HealthResponseFuture<'a>: Future<Output = IntegrationResult<Self::Response>> + Send + 'a
356 where
357 Self: 'a;
358
359 fn auto_stream_response<'a>(
361 &'a self,
362 request: &'a UniversalRequest,
363 session_id: SessionId,
364 frames: Vec<StreamFrame>,
365 ) -> Self::AutoStreamFuture<'a>;
366
367 fn create_error_response<'a>(
369 &'a self,
370 status: u16,
371 message: String,
372 ) -> Self::ErrorResponseFuture<'a>;
373
374 fn create_health_response<'a>(&'a self) -> Self::HealthResponseFuture<'a>;
376}
377
378pub mod streaming_helpers {
380 use super::*;
381
382 pub async fn default_sse_response<T: StreamingAdapter>(
384 adapter: &T,
385 session_id: SessionId,
386 frames: Vec<StreamFrame>,
387 ) -> IntegrationResult<T::Response> {
388 let config = SimdConfig::default();
390 let mut processor = SimdStreamProcessor::new(config);
391
392 match processor.process_to_sse(&frames) {
393 Ok(sse_data) => {
394 let sse_string = String::from_utf8(sse_data.to_vec())
395 .map_err(|e| IntegrationError::ResponseConversion(e.to_string()))?;
396
397 let events = vec![sse_string];
398 let response = UniversalResponse::server_sent_events(events).with_header(
399 Cow::Borrowed("X-PJS-Session-ID"),
400 Cow::Owned(session_id.to_string()),
401 );
402
403 adapter.to_response(response)
404 }
405 Err(_e) => {
406 let events: Vec<String> = frames
408 .into_iter()
409 .map(|frame| {
410 format!(
411 "data: {}\\n\\n",
412 serde_json::to_string(&frame).unwrap_or_default()
413 )
414 })
415 .collect();
416
417 let response = UniversalResponse::server_sent_events(events).with_header(
418 Cow::Borrowed("X-PJS-Session-ID"),
419 Cow::Owned(session_id.to_string()),
420 );
421
422 adapter.to_response(response)
423 }
424 }
425 }
426
427 pub async fn default_json_response<T: StreamingAdapter>(
429 adapter: &T,
430 data: JsonData,
431 streaming: bool,
432 ) -> IntegrationResult<T::Response> {
433 let response = if streaming {
434 let frame = StreamFrame {
436 data: serde_json::to_value(&data).unwrap_or_default(),
437 priority: crate::domain::Priority::HIGH,
438 metadata: std::collections::HashMap::new(),
439 };
440 UniversalResponse::stream(vec![frame])
441 } else {
442 UniversalResponse::json(data)
443 };
444
445 adapter.to_response(response)
446 }
447
448 pub async fn default_middleware<T: StreamingAdapter>(
450 _adapter: &T,
451 _request: &UniversalRequest,
452 response: UniversalResponse,
453 ) -> IntegrationResult<UniversalResponse> {
454 Ok(response)
455 }
456
457 pub async fn default_error_response<T: StreamingAdapter>(
459 adapter: &T,
460 status: u16,
461 message: String,
462 ) -> IntegrationResult<T::Response> {
463 let response = UniversalResponse::error(status, message);
464 adapter.to_response(response)
465 }
466
467 pub async fn default_health_response<T: StreamingAdapter>(
469 adapter: &T,
470 ) -> IntegrationResult<T::Response> {
471 let health_data = JsonData::Object({
472 let mut map = std::collections::HashMap::new();
473 map.insert(
474 "status".to_string(),
475 JsonData::String("healthy".to_string()),
476 );
477 map.insert(
478 "framework".to_string(),
479 JsonData::String(adapter.framework_name().to_string()),
480 );
481 map.insert(
482 "streaming_support".to_string(),
483 JsonData::Bool(adapter.supports_streaming()),
484 );
485 map.insert(
486 "sse_support".to_string(),
487 JsonData::Bool(adapter.supports_sse()),
488 );
489 map
490 });
491
492 let response = PooledResponseBuilder::new()
493 .header(Cow::Borrowed("X-Health-Check"), Cow::Borrowed("pjs"))
494 .json(health_data);
495
496 adapter.to_response(response)
497 }
498
499 pub async fn default_auto_stream_response<T: StreamingAdapter>(
501 adapter: &T,
502 request: &UniversalRequest,
503 session_id: SessionId,
504 frames: Vec<StreamFrame>,
505 ) -> IntegrationResult<T::Response> {
506 let format = request.preferred_streaming_format();
507
508 match format {
509 StreamingFormat::ServerSentEvents => {
510 adapter.create_sse_response(session_id, frames).await
511 }
512 _ => {
513 adapter
514 .create_streaming_response(session_id, frames, format)
515 .await
516 }
517 }
518 }
519}
520
#[cfg(test)]
mod tests {
    use super::*;

    /// Each format reports its MIME type.
    #[test]
    fn test_streaming_format_content_types() {
        let expected = [
            (StreamingFormat::Json, "application/json"),
            (StreamingFormat::Ndjson, "application/x-ndjson"),
            (StreamingFormat::ServerSentEvents, "text/event-stream"),
            (StreamingFormat::Binary, "application/octet-stream"),
        ];

        for (format, mime) in expected {
            assert_eq!(format.content_type(), mime);
        }
    }

    /// An `Accept` value naming a single MIME type maps to the matching format.
    #[test]
    fn test_format_detection_from_accept_header() {
        let cases = [
            ("text/event-stream", StreamingFormat::ServerSentEvents),
            ("application/x-ndjson", StreamingFormat::Ndjson),
            ("application/json", StreamingFormat::Json),
        ];

        for (accept, format) in cases {
            assert_eq!(StreamingFormat::from_accept_header(accept), format);
        }
    }

    /// The request builder records method, path, headers and query parameters.
    #[test]
    fn test_universal_request_creation() {
        let built = UniversalRequest::new("GET", "/api/stream")
            .with_header("Accept", "text/event-stream")
            .with_query("priority", "high");

        assert_eq!(built.method, "GET");
        assert_eq!(built.path, "/api/stream");
        assert!(built.accepts("text/event-stream"));
        assert_eq!(
            built.get_query("priority").map(String::as_str),
            Some("high")
        );
    }

    /// The response builder applies status and header overrides.
    #[test]
    fn test_universal_response_creation() {
        let built = UniversalResponse::json(JsonData::String("test".to_string()))
            .with_status(201)
            .with_header("X-Test", "value");

        assert_eq!(built.status_code, 201);
        assert_eq!(built.content_type, "application/json");
        assert!(built.headers.contains_key("X-Test"));
    }
}