use crate::datapipe_types::{InputReader, good_url};
use bytes::Bytes;
use log::{error, trace};
use reqwest::StatusCode;
use std::io::{Error, ErrorKind};
#[derive(Debug)]
pub struct HttpReader {
client: reqwest::Client,
url: url::Url,
update_interval: tokio::time::Interval,
}
impl HttpReader {
pub const DEFAULT_UPDATE_RATE: u64 = 5000;
pub fn new(http_input_url: &str, update_rate: u64) -> Result<Self, Error> {
let url = good_url(http_input_url, "http://")?;
Ok(Self {
client: reqwest::Client::builder()
.user_agent("datapipe")
.build()
.unwrap(),
url,
update_interval: tokio::time::interval(std::time::Duration::from_millis(update_rate)),
})
}
}
impl InputReader for HttpReader {
async fn read(&mut self) -> Result<Bytes, Error> {
self.update_interval.tick().await; match self.client.get(self.url.as_str()).send().await {
Ok(response) => {
trace!("HttpInput: Web server response is: {:?}", response);
match response.status() {
StatusCode::OK => {
trace!("HttpInput: Status is OK. Getting response body bytes");
match response.bytes().await {
Ok(bytes) => Ok(bytes),
Err(error) => {
let error_message = format!(
"HttpInput: Error converting response body to bytes: {}",
error
);
error!("{}", error_message);
Err(Error::other(error_message))
}
}
}
_ => {
error!("HttpInput: non-Ok status from web server: {:?}", response);
match response.error_for_status() {
Ok(res) => {
let error_message = format!(
"HttpInput: Failed converting web server response to error: {:?}",
res
);
error!("{}", error_message);
Err(Error::other(error_message))
}
Err(error) => {
let error_message =
format!("HttpInput: decoded web server status: {}", error);
error!("{}", error_message);
Err(Error::other(error_message))
}
}
}
}
}
Err(error) => {
let error_message = format!("HttpInput: Error getting HTTP input: {}", error);
error!("{}", error_message);
Err(Error::new(ErrorKind::NotConnected, error_message))
}
}
}
}