1use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use bytes::Bytes;
17use http::{HeaderMap, Method, StatusCode};
18use url::Url;
19
20use crate::error::Error;
21use crate::response::Response;
22use crate::Result;
23
24#[derive(Debug, Clone)]
26pub struct RequestContext {
27 pub url: Url,
29 pub method: Method,
31 pub headers: HeaderMap,
33 pub body: Option<Bytes>,
35 pub retry_attempt: u32,
39}
40
41#[derive(Debug, Clone)]
43pub struct ResponseContext {
44 pub request: RequestContext,
46 pub response: Response,
48}
49
50#[derive(Debug, Clone)]
52pub struct StreamingResponseContext {
53 pub request: RequestContext,
55 pub status: StatusCode,
57 pub headers: HeaderMap,
59}
60
61#[derive(Debug, Clone)]
63pub struct StreamingResponseMeta {
64 pub status: StatusCode,
66 pub headers: HeaderMap,
68}
69
70#[derive(Debug, Clone)]
72pub struct SuccessContext {
73 pub request: RequestContext,
75 pub response: Response,
77}
78
79#[derive(Debug, Clone)]
81pub struct StreamingSuccessContext {
82 pub request: RequestContext,
84 pub status: StatusCode,
86 pub headers: HeaderMap,
88}
89
90#[derive(Debug, Clone)]
92pub struct ErrorContext {
93 pub request: RequestContext,
95 pub response: Option<Response>,
97 pub error: Error,
99}
100
101type RequestHookFn = Arc<
102 dyn Fn(RequestContext) -> Pin<Box<dyn Future<Output = Result<RequestContext>> + Send>>
103 + Send
104 + Sync,
105>;
106
107type ResponseHookFn = Arc<
108 dyn Fn(ResponseContext) -> Pin<Box<dyn Future<Output = Result<Response>> + Send>> + Send + Sync,
109>;
110
111type StreamingResponseHookFn = Arc<
112 dyn Fn(
113 StreamingResponseContext,
114 ) -> Pin<Box<dyn Future<Output = Result<StreamingResponseMeta>> + Send>>
115 + Send
116 + Sync,
117>;
118
119type SuccessHookFn =
120 Arc<dyn Fn(SuccessContext) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
121
122type StreamingSuccessHookFn =
123 Arc<dyn Fn(StreamingSuccessContext) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
124
125type ErrorHookFn =
126 Arc<dyn Fn(ErrorContext) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
127
128type RetryHookFn =
129 Arc<dyn Fn(ResponseContext) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
130
131#[derive(Clone, Default)]
133pub struct Hooks {
134 pub(crate) on_request: Vec<RequestHookFn>,
135 pub(crate) on_response: Vec<ResponseHookFn>,
136 pub(crate) on_response_stream: Vec<StreamingResponseHookFn>,
137 pub(crate) on_success: Vec<SuccessHookFn>,
138 pub(crate) on_success_stream: Vec<StreamingSuccessHookFn>,
139 pub(crate) on_error: Vec<ErrorHookFn>,
140 pub(crate) on_retry: Vec<RetryHookFn>,
141}
142
143impl Hooks {
144 pub fn new() -> Self {
146 Self::default()
147 }
148
149 pub fn on_request<F, Fut>(mut self, f: F) -> Self
151 where
152 F: Fn(RequestContext) -> Fut + Send + Sync + 'static,
153 Fut: Future<Output = Result<RequestContext>> + Send + 'static,
154 {
155 self.on_request.push(Arc::new(move |ctx| Box::pin(f(ctx))));
156 self
157 }
158
159 pub fn on_response<F, Fut>(mut self, f: F) -> Self
161 where
162 F: Fn(ResponseContext) -> Fut + Send + Sync + 'static,
163 Fut: Future<Output = Result<Response>> + Send + 'static,
164 {
165 self.on_response.push(Arc::new(move |ctx| Box::pin(f(ctx))));
166 self
167 }
168
169 pub fn on_response_stream<F, Fut>(mut self, f: F) -> Self
174 where
175 F: Fn(StreamingResponseContext) -> Fut + Send + Sync + 'static,
176 Fut: Future<Output = Result<StreamingResponseMeta>> + Send + 'static,
177 {
178 self.on_response_stream
179 .push(Arc::new(move |ctx| Box::pin(f(ctx))));
180 self
181 }
182
183 pub fn on_success<F, Fut>(mut self, f: F) -> Self
185 where
186 F: Fn(SuccessContext) -> Fut + Send + Sync + 'static,
187 Fut: Future<Output = ()> + Send + 'static,
188 {
189 self.on_success.push(Arc::new(move |ctx| Box::pin(f(ctx))));
190 self
191 }
192
193 pub fn on_success_stream<F, Fut>(mut self, f: F) -> Self
195 where
196 F: Fn(StreamingSuccessContext) -> Fut + Send + Sync + 'static,
197 Fut: Future<Output = ()> + Send + 'static,
198 {
199 self.on_success_stream
200 .push(Arc::new(move |ctx| Box::pin(f(ctx))));
201 self
202 }
203
204 pub fn on_error<F, Fut>(mut self, f: F) -> Self
206 where
207 F: Fn(ErrorContext) -> Fut + Send + Sync + 'static,
208 Fut: Future<Output = ()> + Send + 'static,
209 {
210 self.on_error.push(Arc::new(move |ctx| Box::pin(f(ctx))));
211 self
212 }
213
214 pub fn on_retry<F, Fut>(mut self, f: F) -> Self
216 where
217 F: Fn(ResponseContext) -> Fut + Send + Sync + 'static,
218 Fut: Future<Output = ()> + Send + 'static,
219 {
220 self.on_retry.push(Arc::new(move |ctx| Box::pin(f(ctx))));
221 self
222 }
223
224 pub(crate) fn merge(mut self, other: Hooks) -> Self {
225 self.on_request.extend(other.on_request);
226 self.on_response.extend(other.on_response);
227 self.on_response_stream.extend(other.on_response_stream);
228 self.on_success.extend(other.on_success);
229 self.on_success_stream.extend(other.on_success_stream);
230 self.on_error.extend(other.on_error);
231 self.on_retry.extend(other.on_retry);
232 self
233 }
234
235 pub(crate) async fn run_on_request(&self, mut ctx: RequestContext) -> Result<RequestContext> {
236 for hook in &self.on_request {
237 ctx = hook(ctx).await?;
238 }
239 Ok(ctx)
240 }
241
242 pub(crate) async fn run_on_response(&self, ctx: ResponseContext) -> Result<Response> {
243 let request = ctx.request;
244 let mut response = ctx.response;
245 for hook in &self.on_response {
246 response = hook(ResponseContext {
247 request: request.clone(),
248 response,
249 })
250 .await?;
251 }
252 Ok(response)
253 }
254
255 pub(crate) async fn run_on_response_stream(
256 &self,
257 ctx: StreamingResponseContext,
258 ) -> Result<StreamingResponseMeta> {
259 let request = ctx.request;
260 let mut meta = StreamingResponseMeta {
261 status: ctx.status,
262 headers: ctx.headers,
263 };
264 for hook in &self.on_response_stream {
265 meta = hook(StreamingResponseContext {
266 request: request.clone(),
267 status: meta.status,
268 headers: meta.headers,
269 })
270 .await?;
271 }
272 Ok(meta)
273 }
274
275 pub(crate) async fn run_on_success(&self, ctx: SuccessContext) {
276 for hook in &self.on_success {
277 hook(ctx.clone()).await;
278 }
279 }
280
281 pub(crate) async fn run_on_success_stream(&self, ctx: StreamingSuccessContext) {
282 for hook in &self.on_success_stream {
283 hook(ctx.clone()).await;
284 }
285 }
286
287 pub(crate) async fn run_on_error(&self, ctx: ErrorContext) {
288 for hook in &self.on_error {
289 hook(ctx.clone()).await;
290 }
291 }
292
293 pub(crate) async fn run_on_retry(&self, ctx: ResponseContext) {
294 for hook in &self.on_retry {
295 hook(ctx.clone()).await;
296 }
297 }
298}