Skip to main content

walker_extras/visitors/send/
mod.rs

1use backon::{ExponentialBuilder, Retryable};
2use bytes::Bytes;
3use reqwest::{Body, Method, StatusCode, Url, header};
4use std::time::Duration;
5use walker_common::{
6    http::calculate_retry_after_from_response_header,
7    sender::{self, HttpSender},
8};
9
10#[cfg(feature = "sbom-walker")]
11mod sbom;
12#[cfg(feature = "sbom-walker")]
13pub use sbom::*;
14
15#[cfg(feature = "csaf-walker")]
16mod csaf;
17#[cfg(feature = "csaf-walker")]
18pub use csaf::*;
19
20#[cfg(feature = "clap")]
21mod clap;
22#[cfg(feature = "clap")]
23pub use self::clap::*;
24
25#[derive(Debug, thiserror::Error)]
26pub enum SendError {
27    #[error(transparent)]
28    Sender(#[from] sender::Error),
29    #[error(transparent)]
30    Request(#[from] reqwest::Error),
31    #[error("client error: {0}")]
32    Client(StatusCode),
33    #[error("server error: {0}")]
34    Server(StatusCode),
35    #[error("unexpected status: {0}")]
36    UnexpectedStatus(StatusCode),
37    #[error("Rate limited (HTTP 429), retry after {0:?}")]
38    RateLimited(Duration),
39}
40
41/// Send data to a remote sink.
42#[non_exhaustive]
43#[derive(Clone)]
44pub struct SendVisitor {
45    /// The target endpoint
46    pub url: Url,
47
48    /// The HTTP client to use
49    pub sender: HttpSender,
50
51    /// The number of retries in case of a server or transmission failure
52    retries: usize,
53
54    /// The minimum delay between retries, will be overruled by the retry-after header if present.
55    min_delay: Option<Duration>,
56
57    /// The maximum delay between retries, will be overruled by the retry-after header if present.
58    max_delay: Option<Duration>,
59
60    /// The default retry-after duration when a 429 response doesn't include a Retry-After header
61    default_retry_after: Duration,
62}
63
64impl SendVisitor {
65    pub fn new(url: impl Into<Url>, sender: HttpSender) -> Self {
66        Self {
67            url: url.into(),
68            sender,
69            retries: 0,
70            min_delay: None,
71            max_delay: None,
72            default_retry_after: Duration::from_secs(10),
73        }
74    }
75
76    pub fn retries(mut self, retries: usize) -> Self {
77        self.retries = retries;
78        self
79    }
80
81    pub fn min_delay(mut self, retry_delay: impl Into<Duration>) -> Self {
82        self.min_delay = Some(retry_delay.into());
83        self
84    }
85
86    pub fn max_delay(mut self, retry_delay: impl Into<Duration>) -> Self {
87        self.max_delay = Some(retry_delay.into());
88        self
89    }
90}
91
92#[derive(Debug, thiserror::Error)]
93pub enum SendOnceError {
94    #[error(transparent)]
95    Temporary(SendError),
96    #[error(transparent)]
97    Permanent(SendError),
98}
99
100impl From<SendOnceError> for SendError {
101    fn from(value: SendOnceError) -> Self {
102        match value {
103            SendOnceError::Temporary(e) => e,
104            SendOnceError::Permanent(e) => e,
105        }
106    }
107}
108
109impl SendVisitor {
110    /// Send request once
111    async fn send_once<F>(
112        &self,
113        name: &str,
114        data: Bytes,
115        customizer: F,
116    ) -> Result<(), SendOnceError>
117    where
118        F: FnOnce(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
119    {
120        let request = self
121            .sender
122            .request(Method::POST, self.url.clone())
123            .await
124            .map_err(|err| SendOnceError::Temporary(err.into()))?
125            .body(Body::from(data));
126        let request = customizer(request);
127        let response = request
128            .send()
129            .await
130            .map_err(|err| SendOnceError::Temporary(err.into()))?;
131
132        if let Some(retry_after) =
133            calculate_retry_after_from_response_header(&response, self.default_retry_after)
134        {
135            log::info!(
136                "Rate limited (429) when uploading {name}, retry after: {:?}",
137                retry_after
138            );
139            return Err(SendOnceError::Temporary(SendError::RateLimited(
140                retry_after,
141            )));
142        }
143
144        let status = response.status();
145
146        if status.is_success() {
147            log::debug!("Uploaded {} -> {}", name, response.status());
148            Ok(())
149        } else if status.is_client_error() {
150            log::warn!("Failed to upload, payload rejected {name} -> {status}",);
151            Err(SendOnceError::Permanent(SendError::Client(status)))
152        } else if status.is_server_error() {
153            log::warn!("Failed to upload, server error {name} -> {status}",);
154            Err(SendOnceError::Temporary(SendError::Server(status)))
155        } else {
156            Err(SendOnceError::Permanent(SendError::UnexpectedStatus(
157                status,
158            )))
159        }
160    }
161
162    /// Send request, retry in case of temporary errors
163    async fn send<F>(&self, name: &str, data: Bytes, customizer: F) -> Result<(), SendError>
164    where
165        F: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
166    {
167        let mut retry = ExponentialBuilder::default();
168        if self.retries > 0 {
169            retry = retry.with_max_times(self.retries);
170        }
171        if let Some(min_delay) = self.min_delay {
172            retry = retry.with_min_delay(min_delay);
173        }
174        if let Some(max_delay) = self.max_delay {
175            retry = retry.with_max_delay(max_delay);
176        }
177
178        Ok(
179            (|| async { self.send_once(name, data.clone(), &customizer).await })
180                .retry(retry)
181                .when(|e| matches!(e, SendOnceError::Temporary(_)))
182                .adjust(|e, dur| {
183                    if let SendOnceError::Temporary(SendError::RateLimited(retry_after)) = e {
184                        if let Some(dur_value) = dur
185                            && dur_value > *retry_after
186                        {
187                            return dur;
188                        }
189                        Some(*retry_after) // only use server-provided delay if it's longer
190                    } else {
191                        dur // minimum delay as per backoff strategy
192                    }
193                })
194                .await?,
195        )
196    }
197}