walker_extras/visitors/send/
mod.rs

1use backon::{ExponentialBuilder, Retryable};
2use bytes::Bytes;
3use reqwest::{Body, Method, StatusCode, Url, header};
4use std::time::Duration;
5use walker_common::sender::{self, HttpSender};
6
7#[cfg(feature = "sbom-walker")]
8mod sbom;
9#[cfg(feature = "sbom-walker")]
10pub use sbom::*;
11
12#[cfg(feature = "csaf-walker")]
13mod csaf;
14#[cfg(feature = "csaf-walker")]
15pub use csaf::*;
16
17#[cfg(feature = "clap")]
18mod clap;
19#[cfg(feature = "clap")]
20pub use self::clap::*;
21
22#[derive(Debug, thiserror::Error)]
23pub enum SendError {
24    #[error(transparent)]
25    Sender(#[from] sender::Error),
26    #[error(transparent)]
27    Request(#[from] reqwest::Error),
28    #[error("client error: {0}")]
29    Client(StatusCode),
30    #[error("server error: {0}")]
31    Server(StatusCode),
32    #[error("unexpected status: {0}")]
33    UnexpectedStatus(StatusCode),
34}
35
36/// Send data to a remote sink.
37#[non_exhaustive]
38#[derive(Clone)]
39pub struct SendVisitor {
40    /// The target endpoint
41    pub url: Url,
42
43    /// The HTTP client to use
44    pub sender: HttpSender,
45
46    /// The number of retries in case of a server or transmission failure
47    pub retries: usize,
48
49    /// The minimum delay between retries
50    pub min_delay: Option<Duration>,
51
52    /// The maximum delay between retries
53    pub max_delay: Option<Duration>,
54}
55
56impl SendVisitor {
57    pub fn new(url: impl Into<Url>, sender: HttpSender) -> Self {
58        Self {
59            url: url.into(),
60            sender,
61            retries: 0,
62            min_delay: None,
63            max_delay: None,
64        }
65    }
66
67    pub fn retries(mut self, retries: usize) -> Self {
68        self.retries = retries;
69        self
70    }
71
72    pub fn min_delay(mut self, retry_delay: impl Into<Duration>) -> Self {
73        self.min_delay = Some(retry_delay.into());
74        self
75    }
76
77    pub fn max_delay(mut self, retry_delay: impl Into<Duration>) -> Self {
78        self.max_delay = Some(retry_delay.into());
79        self
80    }
81}
82
83#[derive(Debug, thiserror::Error)]
84pub enum SendOnceError {
85    #[error(transparent)]
86    Temporary(SendError),
87    #[error(transparent)]
88    Permanent(SendError),
89}
90
91impl From<SendOnceError> for SendError {
92    fn from(value: SendOnceError) -> Self {
93        match value {
94            SendOnceError::Temporary(e) => e,
95            SendOnceError::Permanent(e) => e,
96        }
97    }
98}
99
100impl SendVisitor {
101    /// Send request once
102    async fn send_once<F>(
103        &self,
104        name: &str,
105        data: Bytes,
106        customizer: F,
107    ) -> Result<(), SendOnceError>
108    where
109        F: FnOnce(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
110    {
111        let request = self
112            .sender
113            .request(Method::POST, self.url.clone())
114            .await
115            .map_err(|err| SendOnceError::Temporary(err.into()))?
116            .body(Body::from(data));
117        let request = customizer(request);
118        let response = request
119            .send()
120            .await
121            .map_err(|err| SendOnceError::Temporary(err.into()))?;
122
123        let status = response.status();
124
125        if status.is_success() {
126            log::debug!("Uploaded {} -> {}", name, response.status());
127            Ok(())
128        } else if status.is_client_error() {
129            log::warn!("Failed to upload, payload rejected {name} -> {status}",);
130            Err(SendOnceError::Permanent(SendError::Client(status)))
131        } else if status.is_server_error() {
132            log::warn!("Failed to upload, server error {name} -> {status}",);
133            Err(SendOnceError::Temporary(SendError::Server(status)))
134        } else {
135            Err(SendOnceError::Permanent(SendError::UnexpectedStatus(
136                status,
137            )))
138        }
139    }
140
141    /// Send request, retry in case of temporary errors
142    async fn send<F>(&self, name: &str, data: Bytes, customizer: F) -> Result<(), SendError>
143    where
144        F: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
145    {
146        let mut retry = ExponentialBuilder::default();
147        if self.retries > 0 {
148            retry = retry.with_max_times(self.retries);
149        }
150        if let Some(min_delay) = self.min_delay {
151            retry = retry.with_min_delay(min_delay);
152        }
153        if let Some(max_delay) = self.max_delay {
154            retry = retry.with_max_delay(max_delay);
155        }
156
157        Ok(
158            (|| async { self.send_once(name, data.clone(), &customizer).await })
159                .retry(retry)
160                .sleep(tokio::time::sleep)
161                .when(|e| matches!(e, SendOnceError::Temporary(_)))
162                .notify(|err, dur| {
163                    log::info!("retrying {err} after {dur:?}");
164                })
165                .await?,
166        )
167    }
168}