use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use bytes::Bytes;
use http::{HeaderMap, Method, StatusCode};
use url::Url;
use crate::error::Error;
use crate::response::Response;
use crate::Result;
/// Description of a request as seen by hooks.
///
/// This is the value `on_request` hooks receive and return, and it is embedded in
/// every other hook context defined in this file.
#[derive(Debug, Clone)]
pub struct RequestContext {
/// Request URL.
pub url: Url,
/// HTTP method of the request.
pub method: Method,
/// Request headers.
pub headers: HeaderMap,
/// Request body bytes, or `None` when the request carries no body.
pub body: Option<Bytes>,
/// Retry counter for this request.
/// NOTE(review): whether the first attempt is numbered 0 or 1 is not visible
/// in this file — confirm against the code that constructs the context.
pub retry_attempt: u32,
}
/// A response paired with the request it belongs to.
///
/// Passed to `on_response` hooks (which return a `Response`) and to `on_retry`
/// hooks (which return nothing).
#[derive(Debug, Clone)]
pub struct ResponseContext {
/// The request associated with `response`.
pub request: RequestContext,
/// The response handed to the hook.
pub response: Response,
}
/// Input for `on_response_stream` hooks: the request plus response status and headers.
///
/// Unlike [`ResponseContext`] this carries no `Response` value, only metadata.
/// NOTE(review): presumably used when the body is streamed rather than buffered —
/// confirm against the caller.
#[derive(Debug, Clone)]
pub struct StreamingResponseContext {
/// The request associated with the response metadata.
pub request: RequestContext,
/// Response status code.
pub status: StatusCode,
/// Response headers.
pub headers: HeaderMap,
}
/// Status and headers returned by an `on_response_stream` hook.
///
/// `Hooks::run_on_response_stream` feeds these values into the context given to
/// the next hook and returns the last hook's result.
#[derive(Debug, Clone)]
pub struct StreamingResponseMeta {
/// Response status code.
pub status: StatusCode,
/// Response headers.
pub headers: HeaderMap,
}
/// Input for `on_success` hooks: a request and its response.
///
/// Each registered hook receives its own clone of this value.
#[derive(Debug, Clone)]
pub struct SuccessContext {
/// The request associated with `response`.
pub request: RequestContext,
/// The response handed to the hook.
pub response: Response,
}
/// Input for `on_success_stream` hooks: a request plus response status and headers.
///
/// Each registered hook receives its own clone of this value.
#[derive(Debug, Clone)]
pub struct StreamingSuccessContext {
/// The request associated with the response metadata.
pub request: RequestContext,
/// Response status code.
pub status: StatusCode,
/// Response headers.
pub headers: HeaderMap,
}
/// Input for `on_error` hooks: the request, the error, and a response when one exists.
///
/// Each registered hook receives its own clone of this value.
#[derive(Debug, Clone)]
pub struct ErrorContext {
/// The request associated with the error.
pub request: RequestContext,
/// The response associated with the error, or `None` when there is none.
pub response: Option<Response>,
/// The error being reported.
pub error: Error,
}
impl ErrorContext {
    /// Returns a short, human-readable preview of the response body.
    ///
    /// The body is decoded with [`String::from_utf8_lossy`], so invalid UTF-8
    /// sequences appear as U+FFFD replacement characters.
    ///
    /// * Returns `None` when there is no response or its body is empty.
    /// * Returns the whole decoded body when it is at most `max_bytes` bytes long.
    /// * Otherwise returns the longest prefix of the decoded body that is at most
    ///   `max_bytes` bytes and ends on a character boundary, followed by `…`.
    ///   The trailing ellipsis is not counted against `max_bytes`.
    pub fn response_body_preview(&self, max_bytes: usize) -> Option<String> {
        let body = self.response.as_ref()?.bytes();
        if body.is_empty() {
            return None;
        }

        let lossy = String::from_utf8_lossy(body);
        if lossy.len() <= max_bytes {
            return Some(lossy.into_owned());
        }

        // `max_bytes` is a byte budget, so cut by bytes rather than by character
        // count. Back off to the nearest char boundary so the slice below cannot
        // split a multi-byte character. Index 0 is always a boundary, which
        // guarantees the loop terminates without underflow.
        let mut end = max_bytes;
        while !lossy.is_char_boundary(end) {
            end -= 1;
        }
        Some(format!("{}…", &lossy[..end]))
    }
}
/// Type-erased `on_request` hook: takes a `RequestContext` and resolves to a
/// `Result<RequestContext>`. Shared through `Arc`; the callback is `Send + Sync`
/// and the returned boxed future is `Send`.
type RequestHookFn = Arc<
dyn Fn(RequestContext) -> Pin<Box<dyn Future<Output = Result<RequestContext>> + Send>>
+ Send
+ Sync,
>;
/// Type-erased `on_response` hook: takes a `ResponseContext` and resolves to a
/// `Result<Response>`.
type ResponseHookFn = Arc<
dyn Fn(ResponseContext) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>> + Send + Sync,
>;
/// Type-erased `on_response_stream` hook: takes a `StreamingResponseContext` and
/// resolves to a `Result<StreamingResponseMeta>`.
type StreamingResponseHookFn = Arc<
dyn Fn(
StreamingResponseContext,
) -> Pin<Box<dyn Future<Output = Result<StreamingResponseMeta>> + Send>>
+ Send
+ Sync,
>;
/// Type-erased `on_success` hook: takes a `SuccessContext` and resolves to `()`.
type SuccessHookFn =
Arc<dyn Fn(SuccessContext) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Type-erased `on_success_stream` hook: takes a `StreamingSuccessContext` and
/// resolves to `()`.
type StreamingSuccessHookFn =
Arc<dyn Fn(StreamingSuccessContext) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Type-erased `on_error` hook: takes an `ErrorContext` and resolves to `()`.
type ErrorHookFn =
Arc<dyn Fn(ErrorContext) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Type-erased `on_retry` hook: takes a `ResponseContext` and resolves to `()`.
type RetryHookFn =
Arc<dyn Fn(ResponseContext) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// A set of registered hook callbacks, one ordered list per hook kind.
///
/// The default value has every list empty. Hooks are stored behind `Arc`, so
/// cloning a `Hooks` clones the handles rather than the callbacks themselves.
#[derive(Clone, Default)]
pub struct Hooks {
/// Hooks that take and return a `RequestContext`.
pub(crate) on_request: Vec<RequestHookFn>,
/// Hooks that take a `ResponseContext` and return a `Response`.
pub(crate) on_response: Vec<ResponseHookFn>,
/// Hooks that take a `StreamingResponseContext` and return a `StreamingResponseMeta`.
pub(crate) on_response_stream: Vec<StreamingResponseHookFn>,
/// Observer hooks that take a `SuccessContext`.
pub(crate) on_success: Vec<SuccessHookFn>,
/// Observer hooks that take a `StreamingSuccessContext`.
pub(crate) on_success_stream: Vec<StreamingSuccessHookFn>,
/// Observer hooks that take an `ErrorContext`.
pub(crate) on_error: Vec<ErrorHookFn>,
/// Observer hooks that take a `ResponseContext`.
pub(crate) on_retry: Vec<RetryHookFn>,
}
impl Hooks {
pub fn new() -> Self {
Self::default()
}
pub fn on_request<F, Fut>(mut self, f: F) -> Self
where
F: Fn(RequestContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<RequestContext>> + Send + 'static,
{
self.on_request.push(Arc::new(move |ctx| Box::pin(f(ctx))));
self
}
pub fn on_response<F, Fut>(mut self, f: F) -> Self
where
F: Fn(ResponseContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<Response>> + Send + 'static,
{
self.on_response.push(Arc::new(move |ctx| Box::pin(f(ctx))));
self
}
pub fn on_response_stream<F, Fut>(mut self, f: F) -> Self
where
F: Fn(StreamingResponseContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<StreamingResponseMeta>> + Send + 'static,
{
self.on_response_stream
.push(Arc::new(move |ctx| Box::pin(f(ctx))));
self
}
pub fn on_success<F, Fut>(mut self, f: F) -> Self
where
F: Fn(SuccessContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
self.on_success.push(Arc::new(move |ctx| Box::pin(f(ctx))));
self
}
pub fn on_success_stream<F, Fut>(mut self, f: F) -> Self
where
F: Fn(StreamingSuccessContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
self.on_success_stream
.push(Arc::new(move |ctx| Box::pin(f(ctx))));
self
}
pub fn on_error<F, Fut>(mut self, f: F) -> Self
where
F: Fn(ErrorContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
self.on_error.push(Arc::new(move |ctx| Box::pin(f(ctx))));
self
}
pub fn on_retry<F, Fut>(mut self, f: F) -> Self
where
F: Fn(ResponseContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
self.on_retry.push(Arc::new(move |ctx| Box::pin(f(ctx))));
self
}
pub(crate) fn merge(mut self, other: Hooks) -> Self {
self.on_request.extend(other.on_request);
self.on_response.extend(other.on_response);
self.on_response_stream.extend(other.on_response_stream);
self.on_success.extend(other.on_success);
self.on_success_stream.extend(other.on_success_stream);
self.on_error.extend(other.on_error);
self.on_retry.extend(other.on_retry);
self
}
pub(crate) async fn run_on_request(&self, mut ctx: RequestContext) -> Result<RequestContext> {
for hook in &self.on_request {
ctx = hook(ctx).await?;
}
Ok(ctx)
}
pub(crate) async fn run_on_response(&self, ctx: ResponseContext) -> Result<Response> {
let request = ctx.request;
let mut response = ctx.response;
for hook in &self.on_response {
response = hook(ResponseContext {
request: request.clone(),
response,
})
.await?;
}
Ok(response)
}
pub(crate) async fn run_on_response_stream(
&self,
ctx: StreamingResponseContext,
) -> Result<StreamingResponseMeta> {
let request = ctx.request;
let mut meta = StreamingResponseMeta {
status: ctx.status,
headers: ctx.headers,
};
for hook in &self.on_response_stream {
meta = hook(StreamingResponseContext {
request: request.clone(),
status: meta.status,
headers: meta.headers,
})
.await?;
}
Ok(meta)
}
pub(crate) async fn run_on_success(&self, ctx: SuccessContext) {
for hook in &self.on_success {
hook(ctx.clone()).await;
}
}
pub(crate) async fn run_on_success_stream(&self, ctx: StreamingSuccessContext) {
for hook in &self.on_success_stream {
hook(ctx.clone()).await;
}
}
pub(crate) async fn run_on_error(&self, ctx: ErrorContext) {
for hook in &self.on_error {
hook(ctx.clone()).await;
}
}
pub(crate) async fn run_on_retry(&self, ctx: ResponseContext) {
for hook in &self.on_retry {
hook(ctx.clone()).await;
}
}
}