use std::sync::Arc;
use crate::codec::Codec;
use crate::error::{KnafehError, RpcStatusCode};
use crate::rpc::message::{Metadata, RpcRequest};
use crate::rpc::middleware::MiddlewareStack;
use crate::rpc::router::MethodRouter;
use crate::rpc::stream::RpcStreamReceiver;
pub struct RequestHandler {
pub(crate) router: Arc<MethodRouter>,
pub(crate) middleware: Arc<MiddlewareStack>,
pub(crate) codec: Arc<dyn Codec>,
}
impl RequestHandler {
pub fn new(
router: Arc<MethodRouter>,
middleware: Arc<MiddlewareStack>,
codec: Arc<dyn Codec>,
) -> Self {
Self {
router,
middleware,
codec,
}
}
pub async fn handle_unary(
&self,
method_path: String,
raw_body: Vec<u8>,
metadata: Metadata,
) -> (Vec<u8>, RpcStatusCode, String, Metadata) {
let body = match self.codec.decode(&raw_body) {
Ok(b) => b,
Err(e) => {
return (
Vec::new(),
RpcStatusCode::InvalidArgument,
format!("codec decode error: {e}"),
Metadata::new(),
);
}
};
let mut request = RpcRequest {
method: method_path,
metadata,
body,
};
if let Err(e) = self.middleware.apply_request(&mut request).await {
return (
Vec::new(),
RpcStatusCode::Internal,
format!("middleware error: {e}"),
Metadata::new(),
);
}
let mut response = match self.router.route_unary(request).await {
Ok(resp) => resp,
Err(KnafehError::Service { code, message }) => {
return (Vec::new(), code, message, Metadata::new());
}
Err(e) => {
return (
Vec::new(),
RpcStatusCode::Internal,
format!("handler error: {e}"),
Metadata::new(),
);
}
};
if let Err(e) = self.middleware.apply_response(&mut response).await {
return (
Vec::new(),
RpcStatusCode::Internal,
format!("middleware error: {e}"),
Metadata::new(),
);
}
let encoded_body = if response.body.is_empty() {
Vec::new()
} else {
match self.codec.encode(&response.body) {
Ok(b) => b,
Err(e) => {
return (
Vec::new(),
RpcStatusCode::Internal,
format!("codec encode error: {e}"),
Metadata::new(),
);
}
}
};
(
encoded_body,
response.status.code,
response.status.message,
response.metadata,
)
}
pub async fn handle_server_stream(
&self,
method_path: String,
raw_body: Vec<u8>,
metadata: Metadata,
) -> (RpcStatusCode, String, Metadata, Option<RpcStreamReceiver>) {
let body = match self.codec.decode(&raw_body) {
Ok(b) => b,
Err(e) => {
return (
RpcStatusCode::InvalidArgument,
format!("codec decode error: {e}"),
Metadata::new(),
None,
);
}
};
let mut request = RpcRequest {
method: method_path,
metadata,
body,
};
if let Err(e) = self.middleware.apply_request(&mut request).await {
return (
RpcStatusCode::Internal,
format!("middleware error: {e}"),
Metadata::new(),
None,
);
}
match self.router.route_server_stream(request).await {
Ok(stream_response) => (
RpcStatusCode::Ok,
String::new(),
stream_response.metadata,
Some(stream_response.receiver),
),
Err(KnafehError::Service { code, message }) => (code, message, Metadata::new(), None),
Err(e) => (
RpcStatusCode::Internal,
format!("handler error: {e}"),
Metadata::new(),
None,
),
}
}
}