jenkins_sdk/client/
async_client.rs

//! High-level asynchronous Jenkins client.
2
3use crate::{
4    Auth, BodySnippetConfig, Error, HttpError, RequestHookContext, api,
5    transport::{
6        TransportBody, TransportRequest,
7        async_transport::{DynAsyncTransport, ReqwestAsync},
8        middleware::{CrumbAsync, HookAsync, RetryAsync, RetryConfig},
9        request::{Request, Response},
10    },
11    util::{
12        diagnostics,
13        redact::redact_text,
14        url::{endpoint_url, normalize_base_url, sanitize_url_for_error},
15    },
16};
17use http::HeaderMap;
18use serde::de::DeserializeOwned;
19use std::{sync::Arc, time::Duration};
20use url::Url;
21
22#[cfg(feature = "tracing")]
23use tracing::field;
24
/// Settings recorded by [`ClientBuilder::with_crumb`].
#[derive(Debug, Clone, Copy)]
struct CrumbConfig {
    /// Duration handed to the crumb middleware as its `ttl` argument when the client is built.
    ttl: Duration,
}
29
30const DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
31
32/// Configures and constructs [`Client`].
33pub struct ClientBuilder {
34    base_url: Url,
35    auth: Option<Auth>,
36    insecure: bool,
37    user_agent: String,
38    timeout: Duration,
39    connect_timeout: Duration,
40    no_proxy: bool,
41    retry: Option<RetryConfig>,
42    crumb: Option<CrumbConfig>,
43    default_headers: HeaderMap,
44    body_snippet: BodySnippetConfig,
45    request_hook: Option<crate::RequestHook>,
46}
47
48impl ClientBuilder {
49    /// Create a builder with opinionated defaults.
50    fn try_new(base: impl AsRef<str>) -> Result<Self, Error> {
51        let base_url = normalize_base_url(base.as_ref())?;
52        Ok(Self {
53            base_url,
54            auth: None,
55            insecure: false,
56            user_agent: DEFAULT_USER_AGENT.to_owned(),
57            timeout: Duration::from_secs(30),
58            connect_timeout: Duration::from_secs(10),
59            no_proxy: false,
60            retry: None,
61            crumb: None,
62            default_headers: HeaderMap::new(),
63            body_snippet: BodySnippetConfig::default(),
64            request_hook: None,
65        })
66    }
67
68    /// Apply an authentication strategy.
69    pub fn auth(mut self, auth: Auth) -> Self {
70        self.auth = Some(auth);
71        self
72    }
73
74    /// Apply HTTP basic authentication credentials.
75    pub fn auth_basic(mut self, user: impl Into<String>, token: impl Into<String>) -> Self {
76        self.auth = Some(Auth::basic(user, token));
77        self
78    }
79
80    /// Ignore system proxy environment variables.
81    pub fn no_system_proxy(mut self) -> Self {
82        self.no_proxy = true;
83        self
84    }
85
86    /// Accept invalid TLS certificates (**dangerous**).
87    pub fn danger_accept_invalid_certs(mut self, yes: bool) -> Self {
88        self.insecure = yes;
89        self
90    }
91
92    /// Override the default `User-Agent` header.
93    pub fn user_agent(mut self, ua: impl Into<String>) -> Self {
94        self.user_agent = ua.into();
95        self
96    }
97
98    /// Adjust the per-request timeout.
99    pub fn timeout(mut self, value: Duration) -> Self {
100        self.timeout = value;
101        self
102    }
103
104    /// Adjust the connection establishment timeout.
105    pub fn connect_timeout(mut self, value: Duration) -> Self {
106        self.connect_timeout = value;
107        self
108    }
109
110    /// Add a default header applied to every request.
111    pub fn default_header(
112        mut self,
113        name: http::header::HeaderName,
114        value: http::HeaderValue,
115    ) -> Self {
116        self.default_headers.insert(name, value);
117        self
118    }
119
120    /// Add a set of default headers applied to every request.
121    pub fn default_headers(mut self, headers: HeaderMap) -> Self {
122        self.default_headers.extend(headers);
123        self
124    }
125
126    /// Enable/disable capturing `body_snippet` on errors and decode failures.
127    pub fn capture_body_snippet(mut self, enabled: bool) -> Self {
128        self.body_snippet.enabled = enabled;
129        self
130    }
131
132    /// Set max bytes to keep for `body_snippet`.
133    pub fn max_body_snippet_bytes(mut self, max_bytes: usize) -> Self {
134        self.body_snippet.max_bytes = max_bytes;
135        self
136    }
137
138    /// Wrap the transport with a conservative retry policy.
139    pub fn with_retry(mut self, max_retries: usize, base_delay: Duration) -> Self {
140        self.retry = Some(RetryConfig::new(max_retries, base_delay));
141        self
142    }
143
144    /// Use a custom retry configuration.
145    pub fn retry_config(mut self, config: RetryConfig) -> Self {
146        self.retry = Some(config);
147        self
148    }
149
150    /// Enable CSRF crumb fetching on the first non-GET request.
151    pub fn with_crumb(mut self, ttl: Duration) -> Self {
152        self.crumb = Some(CrumbConfig { ttl });
153        self
154    }
155
156    /// Add a hook invoked for every request attempt (including retries).
157    pub fn request_hook<F>(mut self, hook: F) -> Self
158    where
159        F: for<'a> Fn(RequestHookContext<'a>) -> Result<(), Error> + Send + Sync + 'static,
160    {
161        self.request_hook = Some(Arc::new(hook));
162        self
163    }
164
165    /// Finalise configuration and build the client.
166    pub fn build(self) -> Result<Client, Error> {
167        let base = self.base_url;
168
169        let mut transport: DynAsyncTransport = Arc::new(ReqwestAsync::try_new(
170            self.insecure,
171            &self.user_agent,
172            self.timeout,
173            self.connect_timeout,
174            self.no_proxy,
175        )?);
176
177        if let Some(hook) = self.request_hook {
178            transport = Arc::new(HookAsync::new(transport, hook));
179        }
180
181        if let Some(retry) = self.retry {
182            transport = Arc::new(RetryAsync::new(transport, retry));
183        }
184
185        if let Some(crumb) = self.crumb {
186            transport = Arc::new(CrumbAsync::new(
187                transport,
188                base.clone(),
189                self.auth.clone(),
190                self.default_headers.clone(),
191                crumb.ttl,
192                self.timeout,
193                self.body_snippet,
194            ));
195        }
196
197        Ok(Client {
198            inner: Arc::new(Inner {
199                base,
200                auth: self.auth,
201                timeout: self.timeout,
202                default_headers: self.default_headers,
203                body_snippet: self.body_snippet,
204                transport,
205            }),
206        })
207    }
208}
209
210#[derive(Clone)]
211pub struct Client {
212    inner: Arc<Inner>,
213}
214
215struct Inner {
216    base: Url,
217    auth: Option<Auth>,
218    timeout: Duration,
219    default_headers: HeaderMap,
220    body_snippet: BodySnippetConfig,
221    transport: DynAsyncTransport,
222}
223
224impl Client {
225    pub fn builder(base: impl AsRef<str>) -> Result<ClientBuilder, Error> {
226        ClientBuilder::try_new(base)
227    }
228
229    pub fn new(base: impl AsRef<str>) -> Result<Self, Error> {
230        Self::builder(base)?.build()
231    }
232
233    #[must_use]
234    pub fn system(&self) -> api::SystemService {
235        api::SystemService::new(self.clone())
236    }
237
238    #[must_use]
239    pub fn jobs(&self) -> api::JobsService {
240        api::JobsService::new(self.clone())
241    }
242
243    #[must_use]
244    pub fn queue(&self) -> api::QueueService {
245        api::QueueService::new(self.clone())
246    }
247
248    #[must_use]
249    pub fn computers(&self) -> api::ComputersService {
250        api::ComputersService::new(self.clone())
251    }
252
253    #[must_use]
254    pub fn views(&self) -> api::ViewsService {
255        api::ViewsService::new(self.clone())
256    }
257
258    #[must_use]
259    pub fn users(&self) -> api::UsersService {
260        api::UsersService::new(self.clone())
261    }
262
263    #[must_use]
264    pub fn people(&self) -> api::PeopleService {
265        api::PeopleService::new(self.clone())
266    }
267
268    pub(crate) async fn send_json<T: DeserializeOwned + Send + 'static>(
269        &self,
270        req: Request,
271    ) -> Result<T, Error> {
272        let url = endpoint_url(&self.inner.base, req.segments.iter().map(|s| s.as_str()))?;
273        let resp = self.execute_request(&req).await?;
274        resp.json().map_err(|source| Error::Decode {
275            status: resp.status,
276            method: req.method,
277            path: url.path().to_string().into_boxed_str(),
278            request_id: diagnostics::request_id(&resp.headers),
279            body_snippet: diagnostics::body_snippet(
280                &resp.body,
281                self.inner.body_snippet,
282                self.inner.auth.as_ref(),
283            ),
284            source: Box::new(source),
285        })
286    }
287
288    pub(crate) async fn send_text(&self, req: Request) -> Result<String, Error> {
289        let resp = self.execute_request(&req).await?;
290        Ok(String::from_utf8_lossy(&resp.body).into_owned())
291    }
292
293    pub(crate) async fn send_bytes(&self, req: Request) -> Result<Vec<u8>, Error> {
294        let resp = self.execute_request(&req).await?;
295        Ok(resp.body)
296    }
297
298    pub(crate) async fn send_unit(&self, req: Request) -> Result<(), Error> {
299        let _ = self.execute_request(&req).await?;
300        Ok(())
301    }
302
303    pub(crate) async fn send_response(&self, req: Request) -> Result<Response, Error> {
304        self.execute_request(&req).await
305    }
306
307    #[cfg(feature = "unstable-raw")]
308    pub async fn execute(&self, req: &Request) -> Result<Response, Error> {
309        self.execute_request(req).await
310    }
311
312    pub(crate) async fn execute_request(&self, req: &Request) -> Result<Response, Error> {
313        #[cfg(feature = "metrics")]
314        let _inflight = crate::transport::metrics::InFlightGuard::new();
315
316        if req.body.is_some() && !req.form.is_empty() {
317            return Err(Error::InvalidConfig {
318                message: "request.body and request.form are mutually exclusive".into(),
319                source: None,
320            });
321        }
322
323        let url = endpoint_url(&self.inner.base, req.segments.iter().map(|s| s.as_str()))?;
324
325        let mut headers = self.inner.default_headers.clone();
326        if let Some(auth) = &self.inner.auth {
327            auth.apply(&mut headers)?;
328        }
329        headers.extend(req.headers.clone());
330
331        let body = req.body.clone().map(|body| TransportBody {
332            bytes: body.bytes,
333            content_type: body.content_type,
334        });
335
336        #[cfg(any(feature = "tracing", feature = "metrics"))]
337        let start = std::time::Instant::now();
338        #[cfg(feature = "tracing")]
339        let span = tracing::info_span!(
340            "jenkins.request",
341            http.method = %req.method,
342            http.host = %self.inner.base.host_str().unwrap_or_default(),
343            http.path = %url.path(),
344            http.status = field::Empty,
345            request_id = field::Empty,
346            retries = field::Empty,
347            latency_ms = field::Empty,
348            error_kind = field::Empty,
349        );
350        #[cfg(feature = "tracing")]
351        let _enter = span.enter();
352
353        let timeout = req.timeout_override.unwrap_or(self.inner.timeout);
354        let resp = match self
355            .inner
356            .transport
357            .send(TransportRequest {
358                method: req.method.clone(),
359                url: url.clone(),
360                headers,
361                query: req.query.clone(),
362                form: req.form.clone(),
363                body,
364                timeout,
365            })
366            .await
367        {
368            Ok(resp) => resp,
369            Err(err) => {
370                #[cfg(feature = "metrics")]
371                crate::transport::metrics::record_outcome(
372                    &req.method,
373                    err.status(),
374                    start.elapsed(),
375                    0,
376                    Some(err.kind()),
377                );
378                #[cfg(feature = "tracing")]
379                {
380                    span.record("error_kind", field::debug(err.kind()));
381                    span.record("latency_ms", start.elapsed().as_millis() as i64);
382                }
383                return Err(err);
384            }
385        };
386
387        let request_id = diagnostics::request_id(&resp.headers);
388
389        #[cfg(feature = "tracing")]
390        {
391            span.record("http.status", resp.status.as_u16() as i64);
392            span.record("retries", resp.meta.retries as i64);
393            span.record("latency_ms", start.elapsed().as_millis() as i64);
394            if let Some(rid) = request_id.as_deref() {
395                span.record("request_id", field::display(rid));
396            }
397        }
398
399        if resp.status.is_client_error() || resp.status.is_server_error() {
400            let safe_url = sanitize_url_for_error(&url);
401            let message = diagnostics::extract_message(&resp.body)
402                .map(|msg| redact_text(msg.into(), self.inner.auth.as_ref()).into_boxed_str());
403            let http_error = HttpError {
404                status: resp.status,
405                method: req.method.clone(),
406                url: Box::new(safe_url),
407                message,
408                request_id,
409                body_snippet: diagnostics::body_snippet(
410                    &resp.body,
411                    self.inner.body_snippet,
412                    self.inner.auth.as_ref(),
413                ),
414            };
415
416            let retry_after = crate::transport::middleware::retry::parse_retry_after(
417                &resp.headers,
418                std::time::SystemTime::now(),
419            );
420            let err = Error::from_http(http_error, retry_after);
421
422            #[cfg(feature = "metrics")]
423            crate::transport::metrics::record_outcome(
424                &req.method,
425                err.status(),
426                start.elapsed(),
427                resp.meta.retries,
428                Some(err.kind()),
429            );
430            #[cfg(feature = "tracing")]
431            span.record("error_kind", field::debug(err.kind()));
432
433            return Err(err);
434        }
435
436        let _retries = resp.meta.retries;
437        let response = Response {
438            status: resp.status,
439            headers: resp.headers,
440            body: resp.body,
441            #[cfg(feature = "unstable-raw")]
442            retries: _retries,
443        };
444
445        #[cfg(feature = "metrics")]
446        crate::transport::metrics::record_outcome(
447            &req.method,
448            Some(response.status),
449            start.elapsed(),
450            _retries,
451            None,
452        );
453
454        Ok(response)
455    }
456}