1use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use http::{HeaderMap, Method, StatusCode};
18use url::Url;
19
20use crate::error::Error;
21use crate::response::Response;
22use crate::Result;
23
24#[derive(Debug, Clone)]
26pub struct RequestContext {
27 pub url: Url,
29 pub method: Method,
31 pub headers: HeaderMap,
33 pub body: Option<Bytes>,
37 pub retry_attempt: u32,
41}
42
43#[derive(Debug, Clone)]
45pub struct ResponseContext {
46 pub request: RequestContext,
48 pub response: Response,
50}
51
52#[derive(Debug, Clone)]
54pub struct StreamingResponseContext {
55 pub request: RequestContext,
57 pub status: StatusCode,
59 pub headers: HeaderMap,
61}
62
63#[derive(Debug, Clone)]
65pub struct StreamingResponseMeta {
66 pub status: StatusCode,
68 pub headers: HeaderMap,
70}
71
72#[derive(Debug, Clone)]
74pub struct SuccessContext {
75 pub request: RequestContext,
77 pub response: Response,
79}
80
81#[derive(Debug, Clone)]
83pub struct StreamingSuccessContext {
84 pub request: RequestContext,
86 pub status: StatusCode,
88 pub headers: HeaderMap,
90}
91
92#[derive(Debug, Clone)]
94pub struct ErrorContext {
95 pub request: RequestContext,
97 pub response: Option<Response>,
99 pub error: Error,
101}
102
103impl ErrorContext {
104 pub fn response_body_preview(&self, max_bytes: usize) -> Option<String> {
108 let body = self.response.as_ref()?.bytes();
109 if body.is_empty() {
110 return None;
111 }
112 let lossy = String::from_utf8_lossy(body);
113 if lossy.len() <= max_bytes {
114 Some(lossy.into_owned())
115 } else {
116 let end = lossy
117 .char_indices()
118 .map(|(i, _)| i)
119 .nth(max_bytes)
120 .unwrap_or(lossy.len());
121 Some(format!("{}…", &lossy[..end]))
122 }
123 }
124}
125
126type RequestHookFn = Arc<
127 dyn Fn(RequestContext) -> Pin<Box<dyn Future<Output = Result<RequestContext>> + Send>>
128 + Send
129 + Sync,
130>;
131
132type ResponseHookFn = Arc<
133 dyn Fn(ResponseContext) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>> + Send + Sync,
134>;
135
136type StreamingResponseHookFn = Arc<
137 dyn Fn(
138 StreamingResponseContext,
139 ) -> Pin<Box<dyn Future<Output = Result<StreamingResponseMeta>> + Send>>
140 + Send
141 + Sync,
142>;
143
144type SuccessHookFn =
145 Arc<dyn Fn(SuccessContext) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
146
147type StreamingSuccessHookFn =
148 Arc<dyn Fn(StreamingSuccessContext) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
149
150type ErrorHookFn =
151 Arc<dyn Fn(ErrorContext) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
152
153type RetryHookFn =
154 Arc<dyn Fn(ResponseContext) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
155
156#[derive(Clone, Default)]
158pub struct Hooks {
159 pub(crate) on_request: Vec<RequestHookFn>,
160 pub(crate) on_response: Vec<ResponseHookFn>,
161 pub(crate) on_response_stream: Vec<StreamingResponseHookFn>,
162 pub(crate) on_success: Vec<SuccessHookFn>,
163 pub(crate) on_success_stream: Vec<StreamingSuccessHookFn>,
164 pub(crate) on_error: Vec<ErrorHookFn>,
165 pub(crate) on_retry: Vec<RetryHookFn>,
166}
167
168impl Hooks {
169 pub fn new() -> Self {
171 Self::default()
172 }
173
174 pub fn on_request<F, Fut>(mut self, f: F) -> Self
176 where
177 F: Fn(RequestContext) -> Fut + Send + Sync + 'static,
178 Fut: Future<Output = Result<RequestContext>> + Send + 'static,
179 {
180 self.on_request.push(Arc::new(move |ctx| Box::pin(f(ctx))));
181 self
182 }
183
184 pub fn on_response<F, Fut>(mut self, f: F) -> Self
186 where
187 F: Fn(ResponseContext) -> Fut + Send + Sync + 'static,
188 Fut: Future<Output = Result<Response>> + Send + 'static,
189 {
190 self.on_response.push(Arc::new(move |ctx| Box::pin(f(ctx))));
191 self
192 }
193
194 pub fn on_response_stream<F, Fut>(mut self, f: F) -> Self
199 where
200 F: Fn(StreamingResponseContext) -> Fut + Send + Sync + 'static,
201 Fut: Future<Output = Result<StreamingResponseMeta>> + Send + 'static,
202 {
203 self.on_response_stream
204 .push(Arc::new(move |ctx| Box::pin(f(ctx))));
205 self
206 }
207
208 pub fn on_success<F, Fut>(mut self, f: F) -> Self
210 where
211 F: Fn(SuccessContext) -> Fut + Send + Sync + 'static,
212 Fut: Future<Output = ()> + Send + 'static,
213 {
214 self.on_success.push(Arc::new(move |ctx| Box::pin(f(ctx))));
215 self
216 }
217
218 pub fn on_success_stream<F, Fut>(mut self, f: F) -> Self
220 where
221 F: Fn(StreamingSuccessContext) -> Fut + Send + Sync + 'static,
222 Fut: Future<Output = ()> + Send + 'static,
223 {
224 self.on_success_stream
225 .push(Arc::new(move |ctx| Box::pin(f(ctx))));
226 self
227 }
228
229 pub fn on_error<F, Fut>(mut self, f: F) -> Self
231 where
232 F: Fn(ErrorContext) -> Fut + Send + Sync + 'static,
233 Fut: Future<Output = ()> + Send + 'static,
234 {
235 self.on_error.push(Arc::new(move |ctx| Box::pin(f(ctx))));
236 self
237 }
238
239 pub fn on_retry<F, Fut>(mut self, f: F) -> Self
241 where
242 F: Fn(ResponseContext) -> Fut + Send + Sync + 'static,
243 Fut: Future<Output = ()> + Send + 'static,
244 {
245 self.on_retry.push(Arc::new(move |ctx| Box::pin(f(ctx))));
246 self
247 }
248
249 pub(crate) fn merge(mut self, other: Hooks) -> Self {
250 self.on_request.extend(other.on_request);
251 self.on_response.extend(other.on_response);
252 self.on_response_stream.extend(other.on_response_stream);
253 self.on_success.extend(other.on_success);
254 self.on_success_stream.extend(other.on_success_stream);
255 self.on_error.extend(other.on_error);
256 self.on_retry.extend(other.on_retry);
257 self
258 }
259
260 pub(crate) async fn run_on_request(&self, mut ctx: RequestContext) -> Result<RequestContext> {
261 for hook in &self.on_request {
262 ctx = hook(ctx).await?;
263 }
264 Ok(ctx)
265 }
266
267 pub(crate) async fn run_on_response(&self, ctx: ResponseContext) -> Result<Response> {
268 let request = ctx.request;
269 let mut response = ctx.response;
270 for hook in &self.on_response {
271 response = hook(ResponseContext {
272 request: request.clone(),
273 response,
274 })
275 .await?;
276 }
277 Ok(response)
278 }
279
280 pub(crate) async fn run_on_response_stream(
281 &self,
282 ctx: StreamingResponseContext,
283 ) -> Result<StreamingResponseMeta> {
284 let request = ctx.request;
285 let mut meta = StreamingResponseMeta {
286 status: ctx.status,
287 headers: ctx.headers,
288 };
289 for hook in &self.on_response_stream {
290 meta = hook(StreamingResponseContext {
291 request: request.clone(),
292 status: meta.status,
293 headers: meta.headers,
294 })
295 .await?;
296 }
297 Ok(meta)
298 }
299
300 pub(crate) async fn run_on_success(&self, ctx: SuccessContext) {
301 for hook in &self.on_success {
302 hook(ctx.clone()).await;
303 }
304 }
305
306 pub(crate) async fn run_on_success_stream(&self, ctx: StreamingSuccessContext) {
307 for hook in &self.on_success_stream {
308 hook(ctx.clone()).await;
309 }
310 }
311
312 pub(crate) async fn run_on_error(&self, ctx: ErrorContext) {
313 for hook in &self.on_error {
314 hook(ctx.clone()).await;
315 }
316 }
317
318 pub(crate) async fn run_on_retry(&self, ctx: ResponseContext) {
319 for hook in &self.on_retry {
320 hook(ctx.clone()).await;
321 }
322 }
323}