walker_extras/visitors/send/
mod.rs1use bytes::Bytes;
2use reqwest::{header, Body, Method, StatusCode, Url};
3use std::time::Duration;
4use walker_common::sender::{self, HttpSender};
5
6#[cfg(feature = "sbom-walker")]
7mod sbom;
8#[cfg(feature = "sbom-walker")]
9pub use sbom::*;
10
11#[cfg(feature = "csaf-walker")]
12mod csaf;
13#[cfg(feature = "csaf-walker")]
14pub use csaf::*;
15
16#[cfg(feature = "clap")]
17mod clap;
18#[cfg(feature = "clap")]
19pub use self::clap::*;
20
21#[derive(Debug, thiserror::Error)]
22pub enum SendError {
23 #[error(transparent)]
24 Sender(#[from] sender::Error),
25 #[error(transparent)]
26 Request(#[from] reqwest::Error),
27 #[error("client error: {0}")]
28 Client(StatusCode),
29 #[error("server error: {0}")]
30 Server(StatusCode),
31 #[error("unexpected status: {0}")]
32 UnexpectedStatus(StatusCode),
33}
34
35#[non_exhaustive]
37#[derive(Clone)]
38pub struct SendVisitor {
39 pub url: Url,
41
42 pub sender: HttpSender,
44
45 pub retries: usize,
47
48 pub retry_delay: Option<Duration>,
50}
51
52impl SendVisitor {
53 pub fn new(url: impl Into<Url>, sender: HttpSender) -> Self {
54 Self {
55 url: url.into(),
56 sender,
57 retries: 0,
58 retry_delay: None,
59 }
60 }
61
62 pub fn retries(mut self, retries: usize) -> Self {
63 self.retries = retries;
64 self
65 }
66
67 pub fn retry_delay(mut self, retry_delay: impl Into<Duration>) -> Self {
68 self.retry_delay = Some(retry_delay.into());
69 self
70 }
71}
72
73const DEFAULT_RETRY_DELAY: Duration = Duration::from_secs(5);
75
76pub enum SendOnceError {
77 Temporary(SendError),
78 Permanent(SendError),
79}
80
81impl SendVisitor {
82 async fn send_once<F>(
84 &self,
85 name: &str,
86 data: Bytes,
87 customizer: F,
88 ) -> Result<(), SendOnceError>
89 where
90 F: FnOnce(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
91 {
92 let request = self
93 .sender
94 .request(Method::POST, self.url.clone())
95 .await
96 .map_err(|err| SendOnceError::Temporary(err.into()))?
97 .body(Body::from(data));
98 let request = customizer(request);
99 let response = request
100 .send()
101 .await
102 .map_err(|err| SendOnceError::Temporary(err.into()))?;
103
104 let status = response.status();
105
106 if status.is_success() {
107 log::debug!("Uploaded {} -> {}", name, response.status());
108 Ok(())
109 } else if status.is_client_error() {
110 log::warn!("Failed to upload, payload rejected {name} -> {status}",);
111 Err(SendOnceError::Permanent(SendError::Client(status)))
112 } else if status.is_server_error() {
113 log::warn!("Failed to upload, server error {name} -> {status}",);
114 Err(SendOnceError::Temporary(SendError::Server(status)))
115 } else {
116 Err(SendOnceError::Permanent(SendError::UnexpectedStatus(
117 status,
118 )))
119 }
120 }
121
122 async fn send<F>(&self, name: &str, data: Bytes, customizer: F) -> Result<(), SendError>
124 where
125 F: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
126 {
127 let mut retries = self.retries;
128 loop {
129 match self.send_once(name, data.clone(), &customizer).await {
130 Ok(()) => break Ok(()),
131 Err(SendOnceError::Permanent(err)) => break Err(err),
132 Err(SendOnceError::Temporary(err)) if retries == 0 => break Err(err),
133 Err(SendOnceError::Temporary(_)) => {
134 log::debug!("Failed with a temporary error, retrying ...");
135 }
136 }
137
138 tokio::time::sleep(self.retry_delay.unwrap_or(DEFAULT_RETRY_DELAY)).await;
141 log::info!("Retrying ({retries} attempts left)");
142 retries -= 1;
143 }
144 }
145}