use crate::datapipe_types::{OutputWriter, good_url};
use log::{error, trace};
use std::io::{Error, ErrorKind};
use std::time::{Duration, Instant};
#[derive(Debug)]
pub struct HttpWriter {
client: reqwest::Client,
url: url::Url,
delimiter: Vec<u8>,
include_delimiter: bool,
output_rate: std::time::Duration,
last_output: Instant,
payload: Vec<u8>, buffer: Vec<u8>, buffer_index: usize, }
impl HttpWriter {
    /// Default record delimiter: a single newline byte.
    pub const DEFAULT_DELIMITER: [u8; 1] = [b'\n'];
    /// Default minimum interval between two HTTP requests.
    pub const DEFAULT_WRITE_RATE: Duration = Duration::from_secs(5);

    /// Creates a writer that sends delimited records to `http_output_url`.
    ///
    /// * `http_output_url` - destination URL, validated through `good_url`
    ///   (the `"http://"` argument is presumably the scheme applied when the
    ///   URL has none - confirm against `good_url`).
    /// * `http_output_delimiter` - byte sequence terminating a record. An empty
    ///   delimiter disables framing: all buffered bytes are forwarded as-is.
    /// * `http_output_include_delimiter` - keep the delimiter bytes in the
    ///   payload when `true`, strip them when `false`.
    /// * `http_output_rate` - minimum interval between two requests.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `good_url` when the URL is rejected, or
    /// an [`ErrorKind::Other`] error when the HTTP client cannot be built.
    pub fn new(
        http_output_url: &str,
        http_output_delimiter: Vec<u8>,
        http_output_include_delimiter: bool,
        http_output_rate: Duration,
    ) -> Result<Self, Error> {
        let url = good_url(http_output_url, "http://")?;

        // Building the client can fail; report that through the `Result`
        // this constructor already returns instead of panicking.
        let client = reqwest::Client::builder()
            .user_agent("datapipe")
            .build()
            .map_err(|error| {
                Error::new(
                    ErrorKind::Other,
                    format!("HttpOutput: Unable to build HTTP client: {}", error),
                )
            })?;

        // Back-date the reference instant so the first payload is not delayed.
        // `Instant - Duration` panics when the result is not representable, so
        // fall back to "now" in that case (the first send then waits one
        // interval).
        let now = Instant::now();
        let last_output = now.checked_sub(http_output_rate).unwrap_or(now);

        Ok(Self {
            client,
            url,
            delimiter: http_output_delimiter,
            include_delimiter: http_output_include_delimiter,
            output_rate: http_output_rate,
            last_output,
            payload: Vec::new(),
            buffer: Vec::new(),
            buffer_index: 0,
        })
    }

    /// Searches `buffer`, starting at `buffer_index`, for the delimiter.
    ///
    /// Returns `true` with `buffer_index` at the first byte of the delimiter
    /// when one is found. Otherwise returns `false` with `buffer_index`
    /// advanced past every position already ruled out, so the next call does
    /// not rescan them (the last `delimiter.len() - 1` bytes stay in range
    /// because a delimiter may straddle two writes).
    fn scan_for_delimiter(&mut self) -> bool {
        if self.delimiter.is_empty() {
            // `windows(0)` panics. With no delimiter there is no framing, so
            // everything currently buffered counts as one complete record.
            self.buffer_index = self.buffer.len();
            return !self.buffer.is_empty();
        }

        let delimiter = self.delimiter.as_slice();
        let unscanned = &self.buffer[self.buffer_index..];
        let hit = unscanned
            .windows(delimiter.len())
            .position(|candidate| candidate == delimiter);
        // Number of windows the slice yields; zero when it is shorter than
        // the delimiter.
        let windows_scanned = (unscanned.len() + 1).saturating_sub(delimiter.len());

        match hit {
            Some(offset) => {
                self.buffer_index += offset;
                true
            }
            None => {
                self.buffer_index += windows_scanned;
                false
            }
        }
    }

    /// Moves every complete record at the front of `buffer` into `payload`.
    ///
    /// Must only be called right after `scan_for_delimiter` returned `true`,
    /// i.e. while `buffer_index` points at the first byte of a delimiter.
    /// The delimiter bytes are copied along with the record only when
    /// `include_delimiter` is set. On return `buffer` holds the trailing
    /// incomplete data and `buffer_index` is where the next scan resumes.
    fn extract_payload(&mut self) {
        loop {
            let record_end = self.buffer_index + self.delimiter.len();
            let remainder = self.buffer.split_off(record_end);
            if !self.include_delimiter {
                // `buffer_index` is where the delimiter starts.
                self.buffer.truncate(self.buffer_index);
            }
            self.payload.append(&mut self.buffer);
            self.buffer = remainder;
            // The buffer now starts right after the extracted record, so the
            // scan offset must restart at zero; a stale offset would skip
            // data or index past the end of the buffer.
            self.buffer_index = 0;

            // A single write may carry several records; take all of them.
            if !self.scan_for_delimiter() {
                break;
            }
        }
    }

    /// Sends the pending payload with an HTTP `PUT` and clears it.
    ///
    /// `last_output` is refreshed only when the request succeeds. The response
    /// status is not inspected: any response counts as success.
    ///
    /// # Errors
    ///
    /// Returns [`ErrorKind::NotConnected`] when the request fails. The payload
    /// has already been handed to the request at that point and is not kept
    /// for a retry.
    async fn send_payload(&mut self) -> Result<(), Error> {
        let payload = std::mem::take(&mut self.payload);
        match self
            .client
            .put(self.url.as_str())
            .body(payload)
            .send()
            .await
        {
            Ok(response) => {
                trace!("HttpOutput: Web server response is: {:?}", response);
                self.last_output = Instant::now();
                Ok(())
            }
            Err(error) => {
                let error_message = format!("HttpOutput: Error sending HTTP output: {}", error);
                error!("{}", error_message);
                Err(Error::new(ErrorKind::NotConnected, error_message))
            }
        }
    }
}
impl OutputWriter for HttpWriter {
    /// Buffers `bytes` and sends pending data once the rate limit allows it.
    ///
    /// The bytes are appended to the internal buffer. If the buffer then
    /// contains a delimiter, the delimited data is moved into the pending
    /// payload. The payload is sent only when it is non-empty and at least
    /// `output_rate` has elapsed since `last_output`; otherwise it stays
    /// queued until a later call.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `send_payload`.
    async fn write(&mut self, bytes: &[u8]) -> Result<(), Error> {
        self.buffer.extend_from_slice(bytes);

        if self.scan_for_delimiter() {
            self.extract_payload();
        }

        let interval_elapsed = self.last_output.elapsed() >= self.output_rate;
        if !interval_elapsed || self.payload.is_empty() {
            return Ok(());
        }

        self.send_payload().await
    }
}