walker_extras/visitors/send/
mod.rs

1use bytes::Bytes;
2use reqwest::{header, Body, Method, StatusCode, Url};
3use std::time::Duration;
4use walker_common::sender::{self, HttpSender};
5
6#[cfg(feature = "sbom-walker")]
7mod sbom;
8#[cfg(feature = "sbom-walker")]
9pub use sbom::*;
10
11#[cfg(feature = "csaf-walker")]
12mod csaf;
13#[cfg(feature = "csaf-walker")]
14pub use csaf::*;
15
16#[cfg(feature = "clap")]
17mod clap;
18#[cfg(feature = "clap")]
19pub use self::clap::*;
20
21#[derive(Debug, thiserror::Error)]
22pub enum SendError {
23    #[error(transparent)]
24    Sender(#[from] sender::Error),
25    #[error(transparent)]
26    Request(#[from] reqwest::Error),
27    #[error("client error: {0}")]
28    Client(StatusCode),
29    #[error("server error: {0}")]
30    Server(StatusCode),
31    #[error("unexpected status: {0}")]
32    UnexpectedStatus(StatusCode),
33}
34
35/// Send data to a remote sink.
36#[non_exhaustive]
37#[derive(Clone)]
38pub struct SendVisitor {
39    /// The target endpoint
40    pub url: Url,
41
42    /// The HTTP client to use
43    pub sender: HttpSender,
44
45    /// The number of retries in case of a server or transmission failure
46    pub retries: usize,
47
48    /// The delay between retries
49    pub retry_delay: Option<Duration>,
50}
51
52impl SendVisitor {
53    pub fn new(url: impl Into<Url>, sender: HttpSender) -> Self {
54        Self {
55            url: url.into(),
56            sender,
57            retries: 0,
58            retry_delay: None,
59        }
60    }
61
62    pub fn retries(mut self, retries: usize) -> Self {
63        self.retries = retries;
64        self
65    }
66
67    pub fn retry_delay(mut self, retry_delay: impl Into<Duration>) -> Self {
68        self.retry_delay = Some(retry_delay.into());
69        self
70    }
71}
72
73/// The default amount of time to wait before trying
74const DEFAULT_RETRY_DELAY: Duration = Duration::from_secs(5);
75
76pub enum SendOnceError {
77    Temporary(SendError),
78    Permanent(SendError),
79}
80
81impl SendVisitor {
82    /// Send request once
83    async fn send_once<F>(
84        &self,
85        name: &str,
86        data: Bytes,
87        customizer: F,
88    ) -> Result<(), SendOnceError>
89    where
90        F: FnOnce(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
91    {
92        let request = self
93            .sender
94            .request(Method::POST, self.url.clone())
95            .await
96            .map_err(|err| SendOnceError::Temporary(err.into()))?
97            .body(Body::from(data));
98        let request = customizer(request);
99        let response = request
100            .send()
101            .await
102            .map_err(|err| SendOnceError::Temporary(err.into()))?;
103
104        let status = response.status();
105
106        if status.is_success() {
107            log::debug!("Uploaded {} -> {}", name, response.status());
108            Ok(())
109        } else if status.is_client_error() {
110            log::warn!("Failed to upload, payload rejected {name} -> {status}",);
111            Err(SendOnceError::Permanent(SendError::Client(status)))
112        } else if status.is_server_error() {
113            log::warn!("Failed to upload, server error {name} -> {status}",);
114            Err(SendOnceError::Temporary(SendError::Server(status)))
115        } else {
116            Err(SendOnceError::Permanent(SendError::UnexpectedStatus(
117                status,
118            )))
119        }
120    }
121
122    /// Send request, retry in case of temporary errors
123    async fn send<F>(&self, name: &str, data: Bytes, customizer: F) -> Result<(), SendError>
124    where
125        F: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
126    {
127        let mut retries = self.retries;
128        loop {
129            match self.send_once(name, data.clone(), &customizer).await {
130                Ok(()) => break Ok(()),
131                Err(SendOnceError::Permanent(err)) => break Err(err),
132                Err(SendOnceError::Temporary(err)) if retries == 0 => break Err(err),
133                Err(SendOnceError::Temporary(_)) => {
134                    log::debug!("Failed with a temporary error, retrying ...");
135                }
136            }
137
138            // sleep, then try again
139
140            tokio::time::sleep(self.retry_delay.unwrap_or(DEFAULT_RETRY_DELAY)).await;
141            log::info!("Retrying ({retries} attempts left)");
142            retries -= 1;
143        }
144    }
145}