use bytes::Bytes;
use futures::future::BoxFuture;
use reqwest::{
Client, StatusCode,
header::{
AUTHORIZATION, CONTENT_TYPE, ETAG, HeaderName, HeaderValue, IF_MODIFIED_SINCE,
IF_NONE_MATCH, LAST_MODIFIED,
},
};
use tracing::debug;
use crate::{
error::{ExternalError, Result},
watcher::{ExternalEvent, ExternalSource},
};
#[derive(Debug, Clone)]
pub struct HttpResponse {
pub body: Bytes,
pub url: String,
pub status: u16,
pub etag: Option<String>,
pub last_modified: Option<String>,
pub content_type: Option<String>,
}
pub struct HttpPoller {
client: Client,
url: String,
name: String,
extra_headers: Vec<(String, String)>,
last_etag: Option<String>,
last_modified_value: Option<String>,
seen: bool,
}
impl HttpPoller {
pub fn new(url: impl Into<String>) -> Self {
let url = url.into();
Self {
name: url.clone(),
client: Client::new(),
url,
extra_headers: Vec::new(),
last_etag: None,
last_modified_value: None,
seen: false,
}
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = name.into();
self
}
pub fn with_client(mut self, client: Client) -> Self {
self.client = client;
self
}
pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.extra_headers.push((key.into(), value.into()));
self
}
pub fn with_bearer_token(self, token: impl AsRef<str>) -> Self {
self.with_header(AUTHORIZATION.as_str(), format!("Bearer {}", token.as_ref()))
}
}
impl ExternalSource for HttpPoller {
type Item = HttpResponse;
fn name(&self) -> &str {
&self.name
}
fn poll(&mut self) -> BoxFuture<'_, Result<Vec<ExternalEvent<HttpResponse>>>> {
let this = self;
Box::pin(async move {
let mut req = this.client.get(&this.url);
if let Some(etag) = &this.last_etag {
req = req.header(IF_NONE_MATCH, etag.as_str());
} else if let Some(lm) = &this.last_modified_value {
req = req.header(IF_MODIFIED_SINCE, lm.as_str());
}
for (key, value) in &this.extra_headers {
let name: HeaderName =
key.parse()
.map_err(|e: reqwest::header::InvalidHeaderName| {
ExternalError::Internal(e.to_string())
})?;
let val: HeaderValue =
value
.parse()
.map_err(|e: reqwest::header::InvalidHeaderValue| {
ExternalError::Internal(e.to_string())
})?;
req = req.header(name, val);
}
let response = req.send().await?;
match response.status() {
StatusCode::NOT_MODIFIED => {
debug!(url = %this.url, "Not modified (304)");
Ok(vec![])
}
StatusCode::NOT_FOUND => {
if this.seen {
debug!(url = %this.url, "Resource removed (404)");
this.seen = false;
this.last_etag = None;
this.last_modified_value = None;
Ok(vec![ExternalEvent::Removed(HttpResponse {
body: Bytes::new(),
url: this.url.clone(),
status: 404,
etag: None,
last_modified: None,
content_type: None,
})])
} else {
Ok(vec![])
}
}
s if s.is_success() => {
let etag = response
.headers()
.get(ETAG)
.and_then(|v| v.to_str().ok())
.map(String::from);
let last_modified = response
.headers()
.get(LAST_MODIFIED)
.and_then(|v| v.to_str().ok())
.map(String::from);
let content_type = response
.headers()
.get(CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map(String::from);
let status = response.status().as_u16();
let body = response.bytes().await?;
let http_response = HttpResponse {
body,
url: this.url.clone(),
status,
etag: etag.clone(),
last_modified: last_modified.clone(),
content_type,
};
let event = if this.seen {
ExternalEvent::Modified(http_response)
} else {
this.seen = true;
ExternalEvent::Added(http_response)
};
this.last_etag = etag;
this.last_modified_value = last_modified;
Ok(vec![event])
}
s => Err(ExternalError::Internal(format!(
"Unexpected HTTP status: {s}"
))),
}
})
}
}