walker_extras/visitors/send/
mod.rs1use backon::{ExponentialBuilder, Retryable};
2use bytes::Bytes;
3use reqwest::{Body, Method, StatusCode, Url, header};
4use std::time::Duration;
5use walker_common::sender::{self, HttpSender};
6
7#[cfg(feature = "sbom-walker")]
8mod sbom;
9#[cfg(feature = "sbom-walker")]
10pub use sbom::*;
11
12#[cfg(feature = "csaf-walker")]
13mod csaf;
14#[cfg(feature = "csaf-walker")]
15pub use csaf::*;
16
17#[cfg(feature = "clap")]
18mod clap;
19#[cfg(feature = "clap")]
20pub use self::clap::*;
21
22#[derive(Debug, thiserror::Error)]
23pub enum SendError {
24 #[error(transparent)]
25 Sender(#[from] sender::Error),
26 #[error(transparent)]
27 Request(#[from] reqwest::Error),
28 #[error("client error: {0}")]
29 Client(StatusCode),
30 #[error("server error: {0}")]
31 Server(StatusCode),
32 #[error("unexpected status: {0}")]
33 UnexpectedStatus(StatusCode),
34}
35
36#[non_exhaustive]
38#[derive(Clone)]
39pub struct SendVisitor {
40 pub url: Url,
42
43 pub sender: HttpSender,
45
46 pub retries: usize,
48
49 pub min_delay: Option<Duration>,
51
52 pub max_delay: Option<Duration>,
54}
55
56impl SendVisitor {
57 pub fn new(url: impl Into<Url>, sender: HttpSender) -> Self {
58 Self {
59 url: url.into(),
60 sender,
61 retries: 0,
62 min_delay: None,
63 max_delay: None,
64 }
65 }
66
67 pub fn retries(mut self, retries: usize) -> Self {
68 self.retries = retries;
69 self
70 }
71
72 pub fn min_delay(mut self, retry_delay: impl Into<Duration>) -> Self {
73 self.min_delay = Some(retry_delay.into());
74 self
75 }
76
77 pub fn max_delay(mut self, retry_delay: impl Into<Duration>) -> Self {
78 self.max_delay = Some(retry_delay.into());
79 self
80 }
81}
82
83#[derive(Debug, thiserror::Error)]
84pub enum SendOnceError {
85 #[error(transparent)]
86 Temporary(SendError),
87 #[error(transparent)]
88 Permanent(SendError),
89}
90
91impl From<SendOnceError> for SendError {
92 fn from(value: SendOnceError) -> Self {
93 match value {
94 SendOnceError::Temporary(e) => e,
95 SendOnceError::Permanent(e) => e,
96 }
97 }
98}
99
100impl SendVisitor {
101 async fn send_once<F>(
103 &self,
104 name: &str,
105 data: Bytes,
106 customizer: F,
107 ) -> Result<(), SendOnceError>
108 where
109 F: FnOnce(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
110 {
111 let request = self
112 .sender
113 .request(Method::POST, self.url.clone())
114 .await
115 .map_err(|err| SendOnceError::Temporary(err.into()))?
116 .body(Body::from(data));
117 let request = customizer(request);
118 let response = request
119 .send()
120 .await
121 .map_err(|err| SendOnceError::Temporary(err.into()))?;
122
123 let status = response.status();
124
125 if status.is_success() {
126 log::debug!("Uploaded {} -> {}", name, response.status());
127 Ok(())
128 } else if status.is_client_error() {
129 log::warn!("Failed to upload, payload rejected {name} -> {status}",);
130 Err(SendOnceError::Permanent(SendError::Client(status)))
131 } else if status.is_server_error() {
132 log::warn!("Failed to upload, server error {name} -> {status}",);
133 Err(SendOnceError::Temporary(SendError::Server(status)))
134 } else {
135 Err(SendOnceError::Permanent(SendError::UnexpectedStatus(
136 status,
137 )))
138 }
139 }
140
141 async fn send<F>(&self, name: &str, data: Bytes, customizer: F) -> Result<(), SendError>
143 where
144 F: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
145 {
146 let mut retry = ExponentialBuilder::default();
147 if self.retries > 0 {
148 retry = retry.with_max_times(self.retries);
149 }
150 if let Some(min_delay) = self.min_delay {
151 retry = retry.with_min_delay(min_delay);
152 }
153 if let Some(max_delay) = self.max_delay {
154 retry = retry.with_max_delay(max_delay);
155 }
156
157 Ok(
158 (|| async { self.send_once(name, data.clone(), &customizer).await })
159 .retry(retry)
160 .sleep(tokio::time::sleep)
161 .when(|e| matches!(e, SendOnceError::Temporary(_)))
162 .notify(|err, dur| {
163 log::info!("retrying {err} after {dur:?}");
164 })
165 .await?,
166 )
167 }
168}