pjson_rs/infrastructure/integration/
universal_adapter.rs1use super::streaming_adapter::{
9 IntegrationError, IntegrationResult, ResponseBody, StreamingAdapter, StreamingFormat,
10 UniversalRequest, UniversalResponse, streaming_helpers,
11};
12use crate::domain::value_objects::{JsonData, SessionId};
13use crate::stream::StreamFrame;
14use std::borrow::Cow;
15use std::collections::HashMap;
16use std::future::Future;
17use std::marker::PhantomData;
18
19#[derive(Debug, Clone)]
21pub struct AdapterConfig {
22 pub framework_name: Cow<'static, str>,
24 pub supports_streaming: bool,
26 pub supports_sse: bool,
28 pub default_content_type: Cow<'static, str>,
30 pub default_headers: HashMap<Cow<'static, str>, Cow<'static, str>>,
32}
33
34impl Default for AdapterConfig {
35 fn default() -> Self {
36 Self {
37 framework_name: Cow::Borrowed("universal"),
38 supports_streaming: true,
39 supports_sse: true,
40 default_content_type: Cow::Borrowed("application/json"),
41 default_headers: HashMap::with_capacity(2), }
43 }
44}
45
46pub struct UniversalAdapter<Req, Res, Err> {
48 config: AdapterConfig,
49 _phantom: PhantomData<(Req, Res, Err)>,
50}
51
52impl<Req, Res, Err> UniversalAdapter<Req, Res, Err>
53where
54 Err: std::error::Error + Send + Sync + 'static,
55{
56 pub fn new() -> Self {
58 Self {
59 config: AdapterConfig::default(),
60 _phantom: PhantomData,
61 }
62 }
63
64 pub fn with_config(config: AdapterConfig) -> Self {
66 Self {
67 config,
68 _phantom: PhantomData,
69 }
70 }
71
72 pub fn set_config(&mut self, config: AdapterConfig) {
74 self.config = config;
75 }
76
77 pub fn add_default_header(
79 &mut self,
80 name: impl Into<Cow<'static, str>>,
81 value: impl Into<Cow<'static, str>>,
82 ) {
83 self.config
84 .default_headers
85 .insert(name.into(), value.into());
86 }
87}
88
89impl<Req, Res, Err> Default for UniversalAdapter<Req, Res, Err>
90where
91 Err: std::error::Error + Send + Sync + 'static,
92{
93 fn default() -> Self {
94 Self::new()
95 }
96}
97
98impl<Req, Res, Err> StreamingAdapter for UniversalAdapter<Req, Res, Err>
101where
102 Req: Send + Sync + 'static,
103 Res: Send + Sync + 'static,
104 Err: std::error::Error + Send + Sync + 'static,
105{
106 type Request = Req;
107 type Response = Res;
108 type Error = Err;
109
110 type StreamingResponseFuture<'a>
112 = impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
113 where
114 Self: 'a;
115
116 type SseResponseFuture<'a>
117 = impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
118 where
119 Self: 'a;
120
121 type JsonResponseFuture<'a>
122 = impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
123 where
124 Self: 'a;
125
126 type MiddlewareFuture<'a>
127 = impl Future<Output = IntegrationResult<UniversalResponse>> + Send + 'a
128 where
129 Self: 'a;
130
131 fn convert_request(&self, _request: Self::Request) -> IntegrationResult<UniversalRequest> {
132 Err(IntegrationError::UnsupportedFramework(
135 "Generic UniversalAdapter cannot convert requests - use concrete adapter implementation".to_string()
136 ))
137 }
138
139 fn to_response(&self, _response: UniversalResponse) -> IntegrationResult<Self::Response> {
140 Err(IntegrationError::UnsupportedFramework(
143 "Generic UniversalAdapter cannot convert responses - use concrete adapter implementation".to_string()
144 ))
145 }
146
147 fn create_streaming_response<'a>(
148 &'a self,
149 session_id: SessionId,
150 frames: Vec<StreamFrame>,
151 format: StreamingFormat,
152 ) -> Self::StreamingResponseFuture<'a> {
153 async move {
155 let response = match format {
156 StreamingFormat::Json => {
157 let json_frames: Vec<_> = frames
159 .into_iter()
160 .map(|frame| serde_json::to_value(&frame).unwrap_or_default())
161 .collect();
162
163 let data =
164 JsonData::Array(json_frames.into_iter().map(JsonData::from).collect());
165
166 UniversalResponse::json_pooled(data) }
168 StreamingFormat::Ndjson => {
169 let ndjson_lines: Vec<String> = frames
171 .into_iter()
172 .map(|frame| serde_json::to_string(&frame).unwrap_or_default())
173 .collect();
174
175 UniversalResponse {
176 status_code: 200,
177 headers: super::object_pool::get_cow_hashmap().take(), body: ResponseBody::ServerSentEvents(ndjson_lines),
179 content_type: Cow::Borrowed(format.content_type()),
180 }
181 }
182 StreamingFormat::ServerSentEvents => {
183 return self.create_sse_response(session_id, frames).await;
185 }
186 StreamingFormat::Binary => {
187 let binary_data = frames
189 .into_iter()
190 .flat_map(|frame| serde_json::to_vec(&frame).unwrap_or_default())
191 .collect();
192
193 UniversalResponse {
194 status_code: 200,
195 headers: super::object_pool::get_cow_hashmap().take(), body: ResponseBody::Binary(binary_data),
197 content_type: Cow::Borrowed(format.content_type()),
198 }
199 }
200 };
201
202 self.to_response(response)
203 }
204 }
205
206 fn create_sse_response<'a>(
207 &'a self,
208 session_id: SessionId,
209 frames: Vec<StreamFrame>,
210 ) -> Self::SseResponseFuture<'a> {
211 async move { streaming_helpers::default_sse_response(self, session_id, frames).await }
213 }
214
215 fn create_json_response<'a>(
216 &'a self,
217 data: JsonData,
218 streaming: bool,
219 ) -> Self::JsonResponseFuture<'a> {
220 async move { streaming_helpers::default_json_response(self, data, streaming).await }
222 }
223
224 fn apply_middleware<'a>(
225 &'a self,
226 request: &'a UniversalRequest,
227 response: UniversalResponse,
228 ) -> Self::MiddlewareFuture<'a> {
229 async move { streaming_helpers::default_middleware(self, request, response).await }
231 }
232
233 fn supports_streaming(&self) -> bool {
234 self.config.supports_streaming
235 }
236
237 fn supports_sse(&self) -> bool {
238 self.config.supports_sse
239 }
240
241 fn framework_name(&self) -> &'static str {
242 match &self.config.framework_name {
244 Cow::Borrowed(s) => s,
245 Cow::Owned(_) => "universal", }
247 }
248}
249
250#[derive(Default)]
252pub struct UniversalAdapterBuilder {
253 config: AdapterConfig,
254}
255
256impl UniversalAdapterBuilder {
257 pub fn new() -> Self {
259 Self::default()
260 }
261
262 pub fn framework_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
264 self.config.framework_name = name.into();
265 self
266 }
267
268 pub fn streaming_support(mut self, enabled: bool) -> Self {
270 self.config.supports_streaming = enabled;
271 self
272 }
273
274 pub fn sse_support(mut self, enabled: bool) -> Self {
276 self.config.supports_sse = enabled;
277 self
278 }
279
280 pub fn default_content_type(mut self, content_type: impl Into<Cow<'static, str>>) -> Self {
282 self.config.default_content_type = content_type.into();
283 self
284 }
285
286 pub fn default_header(
288 mut self,
289 name: impl Into<Cow<'static, str>>,
290 value: impl Into<Cow<'static, str>>,
291 ) -> Self {
292 self.config
293 .default_headers
294 .insert(name.into(), value.into());
295 self
296 }
297
298 pub fn build<Req, Res, Err>(self) -> UniversalAdapter<Req, Res, Err>
300 where
301 Err: std::error::Error + Send + Sync + 'static,
302 {
303 UniversalAdapter::with_config(self.config)
304 }
305}
306
307#[cfg(test)]
308mod tests {
309 use super::*;
310
311 #[test]
312 fn test_adapter_config_creation() {
313 let config = AdapterConfig::default();
314 assert_eq!(config.framework_name, "universal");
315 assert!(config.supports_streaming);
316 assert!(config.supports_sse);
317 }
318
319 #[test]
320 fn test_adapter_builder() {
321 let adapter: UniversalAdapter<(), (), std::io::Error> = UniversalAdapterBuilder::new()
322 .framework_name("test")
323 .streaming_support(false)
324 .sse_support(true)
325 .default_header("X-Test", "test")
326 .build();
327
328 assert_eq!(adapter.config.framework_name, "test");
329 assert!(!adapter.config.supports_streaming);
330 assert!(adapter.config.supports_sse);
331 assert_eq!(
332 adapter.config.default_headers.get("X-Test"),
333 Some(&Cow::Borrowed("test"))
334 );
335 }
336
337 #[test]
338 fn test_universal_adapter_capabilities() {
339 let adapter: UniversalAdapter<(), (), std::io::Error> = UniversalAdapter::new();
340
341 assert!(adapter.supports_streaming());
342 assert!(adapter.supports_sse());
343 assert_eq!(adapter.framework_name(), "universal");
344 }
345}