use super::streaming_adapter::{
IntegrationError, IntegrationResult, ResponseBody, StreamingAdapter, StreamingFormat,
UniversalRequest, UniversalResponse, streaming_helpers,
};
use crate::domain::value_objects::{JsonData, SessionId};
use crate::stream::StreamFrame;
use std::borrow::Cow;
use std::collections::HashMap;
use std::future::Future;
use std::marker::PhantomData;
#[derive(Debug, Clone)]
pub struct AdapterConfig {
pub framework_name: Cow<'static, str>,
pub supports_streaming: bool,
pub supports_sse: bool,
pub default_content_type: Cow<'static, str>,
pub default_headers: HashMap<Cow<'static, str>, Cow<'static, str>>,
}
impl Default for AdapterConfig {
fn default() -> Self {
Self {
framework_name: Cow::Borrowed("universal"),
supports_streaming: true,
supports_sse: true,
default_content_type: Cow::Borrowed("application/json"),
default_headers: HashMap::with_capacity(2), }
}
}
pub struct UniversalAdapter<Req, Res, Err> {
config: AdapterConfig,
_phantom: PhantomData<(Req, Res, Err)>,
}
impl<Req, Res, Err> UniversalAdapter<Req, Res, Err>
where
Err: std::error::Error + Send + Sync + 'static,
{
pub fn new() -> Self {
Self {
config: AdapterConfig::default(),
_phantom: PhantomData,
}
}
pub fn with_config(config: AdapterConfig) -> Self {
Self {
config,
_phantom: PhantomData,
}
}
pub fn set_config(&mut self, config: AdapterConfig) {
self.config = config;
}
pub fn add_default_header(
&mut self,
name: impl Into<Cow<'static, str>>,
value: impl Into<Cow<'static, str>>,
) {
self.config
.default_headers
.insert(name.into(), value.into());
}
}
impl<Req, Res, Err> Default for UniversalAdapter<Req, Res, Err>
where
Err: std::error::Error + Send + Sync + 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<Req, Res, Err> StreamingAdapter for UniversalAdapter<Req, Res, Err>
where
Req: Send + Sync + 'static,
Res: Send + Sync + 'static,
Err: std::error::Error + Send + Sync + 'static,
{
type Request = Req;
type Response = Res;
type Error = Err;
type StreamingResponseFuture<'a>
= impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
where
Self: 'a;
type SseResponseFuture<'a>
= impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
where
Self: 'a;
type JsonResponseFuture<'a>
= impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
where
Self: 'a;
type MiddlewareFuture<'a>
= impl Future<Output = IntegrationResult<UniversalResponse>> + Send + 'a
where
Self: 'a;
fn convert_request(&self, _request: Self::Request) -> IntegrationResult<UniversalRequest> {
Err(IntegrationError::UnsupportedFramework(
"Generic UniversalAdapter cannot convert requests - use concrete adapter implementation".to_string()
))
}
fn to_response(&self, _response: UniversalResponse) -> IntegrationResult<Self::Response> {
Err(IntegrationError::UnsupportedFramework(
"Generic UniversalAdapter cannot convert responses - use concrete adapter implementation".to_string()
))
}
fn create_streaming_response<'a>(
&'a self,
session_id: SessionId,
frames: Vec<StreamFrame>,
format: StreamingFormat,
) -> Self::StreamingResponseFuture<'a> {
async move {
let response = match format {
StreamingFormat::Json => {
let json_frames: Vec<_> = frames
.into_iter()
.map(|frame| serde_json::to_value(&frame).unwrap_or_default())
.collect();
let data =
JsonData::Array(json_frames.into_iter().map(JsonData::from).collect());
UniversalResponse::json_pooled(data) }
StreamingFormat::Ndjson => {
let ndjson_lines: Vec<String> = frames
.into_iter()
.map(|frame| serde_json::to_string(&frame).unwrap_or_default())
.collect();
UniversalResponse {
status_code: 200,
headers: super::object_pool::get_cow_hashmap().take(), body: ResponseBody::ServerSentEvents(ndjson_lines),
content_type: Cow::Borrowed(format.content_type()),
}
}
StreamingFormat::ServerSentEvents => {
return self.create_sse_response(session_id, frames).await;
}
StreamingFormat::Binary => {
let binary_data = frames
.into_iter()
.flat_map(|frame| serde_json::to_vec(&frame).unwrap_or_default())
.collect();
UniversalResponse {
status_code: 200,
headers: super::object_pool::get_cow_hashmap().take(), body: ResponseBody::Binary(binary_data),
content_type: Cow::Borrowed(format.content_type()),
}
}
};
self.to_response(response)
}
}
fn create_sse_response<'a>(
&'a self,
session_id: SessionId,
frames: Vec<StreamFrame>,
) -> Self::SseResponseFuture<'a> {
async move { streaming_helpers::default_sse_response(self, session_id, frames).await }
}
fn create_json_response<'a>(
&'a self,
data: JsonData,
streaming: bool,
) -> Self::JsonResponseFuture<'a> {
async move { streaming_helpers::default_json_response(self, data, streaming).await }
}
fn apply_middleware<'a>(
&'a self,
request: &'a UniversalRequest,
response: UniversalResponse,
) -> Self::MiddlewareFuture<'a> {
async move { streaming_helpers::default_middleware(self, request, response).await }
}
fn supports_streaming(&self) -> bool {
self.config.supports_streaming
}
fn supports_sse(&self) -> bool {
self.config.supports_sse
}
fn framework_name(&self) -> &'static str {
match &self.config.framework_name {
Cow::Borrowed(s) => s,
Cow::Owned(_) => "universal", }
}
}
#[derive(Default)]
pub struct UniversalAdapterBuilder {
config: AdapterConfig,
}
impl UniversalAdapterBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn framework_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
self.config.framework_name = name.into();
self
}
pub fn streaming_support(mut self, enabled: bool) -> Self {
self.config.supports_streaming = enabled;
self
}
pub fn sse_support(mut self, enabled: bool) -> Self {
self.config.supports_sse = enabled;
self
}
pub fn default_content_type(mut self, content_type: impl Into<Cow<'static, str>>) -> Self {
self.config.default_content_type = content_type.into();
self
}
pub fn default_header(
mut self,
name: impl Into<Cow<'static, str>>,
value: impl Into<Cow<'static, str>>,
) -> Self {
self.config
.default_headers
.insert(name.into(), value.into());
self
}
pub fn build<Req, Res, Err>(self) -> UniversalAdapter<Req, Res, Err>
where
Err: std::error::Error + Send + Sync + 'static,
{
UniversalAdapter::with_config(self.config)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_adapter_config_creation() {
let config = AdapterConfig::default();
assert_eq!(config.framework_name, "universal");
assert!(config.supports_streaming);
assert!(config.supports_sse);
}
#[test]
fn test_adapter_builder() {
let adapter: UniversalAdapter<(), (), std::io::Error> = UniversalAdapterBuilder::new()
.framework_name("test")
.streaming_support(false)
.sse_support(true)
.default_header("X-Test", "test")
.build();
assert_eq!(adapter.config.framework_name, "test");
assert!(!adapter.config.supports_streaming);
assert!(adapter.config.supports_sse);
assert_eq!(
adapter.config.default_headers.get("X-Test"),
Some(&Cow::Borrowed("test"))
);
}
#[test]
fn test_universal_adapter_capabilities() {
let adapter: UniversalAdapter<(), (), std::io::Error> = UniversalAdapter::new();
assert!(adapter.supports_streaming());
assert!(adapter.supports_sse());
assert_eq!(adapter.framework_name(), "universal");
}
}