walker_extras/visitors/send/
mod.rs1use backon::{ExponentialBuilder, Retryable};
2use bytes::Bytes;
3use reqwest::{Body, Method, StatusCode, Url, header};
4use std::time::Duration;
5use walker_common::{
6 http::calculate_retry_after_from_response_header,
7 sender::{self, HttpSender},
8};
9
10#[cfg(feature = "sbom-walker")]
11mod sbom;
12#[cfg(feature = "sbom-walker")]
13pub use sbom::*;
14
15#[cfg(feature = "csaf-walker")]
16mod csaf;
17#[cfg(feature = "csaf-walker")]
18pub use csaf::*;
19
20#[cfg(feature = "clap")]
21mod clap;
22#[cfg(feature = "clap")]
23pub use self::clap::*;
24
25#[derive(Debug, thiserror::Error)]
26pub enum SendError {
27 #[error(transparent)]
28 Sender(#[from] sender::Error),
29 #[error(transparent)]
30 Request(#[from] reqwest::Error),
31 #[error("client error: {0}")]
32 Client(StatusCode),
33 #[error("server error: {0}")]
34 Server(StatusCode),
35 #[error("unexpected status: {0}")]
36 UnexpectedStatus(StatusCode),
37 #[error("Rate limited (HTTP 429), retry after {0:?}")]
38 RateLimited(Duration),
39}
40
41#[non_exhaustive]
43#[derive(Clone)]
44pub struct SendVisitor {
45 pub url: Url,
47
48 pub sender: HttpSender,
50
51 retries: usize,
53
54 min_delay: Option<Duration>,
56
57 max_delay: Option<Duration>,
59
60 default_retry_after: Duration,
62}
63
64impl SendVisitor {
65 pub fn new(url: impl Into<Url>, sender: HttpSender) -> Self {
66 Self {
67 url: url.into(),
68 sender,
69 retries: 0,
70 min_delay: None,
71 max_delay: None,
72 default_retry_after: Duration::from_secs(10),
73 }
74 }
75
76 pub fn retries(mut self, retries: usize) -> Self {
77 self.retries = retries;
78 self
79 }
80
81 pub fn min_delay(mut self, retry_delay: impl Into<Duration>) -> Self {
82 self.min_delay = Some(retry_delay.into());
83 self
84 }
85
86 pub fn max_delay(mut self, retry_delay: impl Into<Duration>) -> Self {
87 self.max_delay = Some(retry_delay.into());
88 self
89 }
90}
91
92#[derive(Debug, thiserror::Error)]
93pub enum SendOnceError {
94 #[error(transparent)]
95 Temporary(SendError),
96 #[error(transparent)]
97 Permanent(SendError),
98}
99
100impl From<SendOnceError> for SendError {
101 fn from(value: SendOnceError) -> Self {
102 match value {
103 SendOnceError::Temporary(e) => e,
104 SendOnceError::Permanent(e) => e,
105 }
106 }
107}
108
109impl SendVisitor {
110 async fn send_once<F>(
112 &self,
113 name: &str,
114 data: Bytes,
115 customizer: F,
116 ) -> Result<(), SendOnceError>
117 where
118 F: FnOnce(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
119 {
120 let request = self
121 .sender
122 .request(Method::POST, self.url.clone())
123 .await
124 .map_err(|err| SendOnceError::Temporary(err.into()))?
125 .body(Body::from(data));
126 let request = customizer(request);
127 let response = request
128 .send()
129 .await
130 .map_err(|err| SendOnceError::Temporary(err.into()))?;
131
132 if let Some(retry_after) =
133 calculate_retry_after_from_response_header(&response, self.default_retry_after)
134 {
135 log::info!(
136 "Rate limited (429) when uploading {name}, retry after: {:?}",
137 retry_after
138 );
139 return Err(SendOnceError::Temporary(SendError::RateLimited(
140 retry_after,
141 )));
142 }
143
144 let status = response.status();
145
146 if status.is_success() {
147 log::debug!("Uploaded {} -> {}", name, response.status());
148 Ok(())
149 } else if status.is_client_error() {
150 log::warn!("Failed to upload, payload rejected {name} -> {status}",);
151 Err(SendOnceError::Permanent(SendError::Client(status)))
152 } else if status.is_server_error() {
153 log::warn!("Failed to upload, server error {name} -> {status}",);
154 Err(SendOnceError::Temporary(SendError::Server(status)))
155 } else {
156 Err(SendOnceError::Permanent(SendError::UnexpectedStatus(
157 status,
158 )))
159 }
160 }
161
162 async fn send<F>(&self, name: &str, data: Bytes, customizer: F) -> Result<(), SendError>
164 where
165 F: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
166 {
167 let mut retry = ExponentialBuilder::default();
168 if self.retries > 0 {
169 retry = retry.with_max_times(self.retries);
170 }
171 if let Some(min_delay) = self.min_delay {
172 retry = retry.with_min_delay(min_delay);
173 }
174 if let Some(max_delay) = self.max_delay {
175 retry = retry.with_max_delay(max_delay);
176 }
177
178 Ok(
179 (|| async { self.send_once(name, data.clone(), &customizer).await })
180 .retry(retry)
181 .when(|e| matches!(e, SendOnceError::Temporary(_)))
182 .adjust(|e, dur| {
183 if let SendOnceError::Temporary(SendError::RateLimited(retry_after)) = e {
184 if let Some(dur_value) = dur
185 && dur_value > *retry_after
186 {
187 return dur;
188 }
189 Some(*retry_after) } else {
191 dur }
193 })
194 .await?,
195 )
196 }
197}