mod errors {
use error_chain::*;
error_chain! {
foreign_links {
Reqwest(reqwest::Error);
Io(::std::io::Error);
}
errors {
Http(status: reqwest::StatusCode) {
description("HTTP request failed")
display("HTTP status code: {}", status)
}
InvalidContentType(mime_type: mime::Mime) {
description("unexpected Content-Type header")
display("unexpected Content-Type: {}", mime_type)
}
NoContentType {
description("no Content-Type header in response")
display("Content-Type missing")
}
}
}
}
pub use self::errors::*;
use super::event::{parse_event_line, Event, ParseResult};
use reqwest::blocking as reqw;
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE};
use std::io::{BufRead, BufReader};
use std::time::{Duration, Instant};
const DEFAULT_RETRY: u64 = 5000;
pub struct Client {
client: reqw::Client,
response: Option<BufReader<reqw::Response>>,
url: reqwest::Url,
last_event_id: Option<String>,
last_try: Option<Instant>,
headers: HeaderMap,
pub retry: Duration,
}
impl Client {
pub fn new(url: reqwest::Url, headers: HeaderMap) -> Client {
Client {
client: reqw::Client::new(),
response: None,
url: url,
last_event_id: None,
last_try: None,
retry: Duration::from_millis(DEFAULT_RETRY),
headers: headers,
}
}
fn next_request(&mut self) -> Result<()> {
let mut headers = self.headers.clone();
headers.insert(ACCEPT, HeaderValue::from_str("text/event-stream").unwrap());
if let Some(ref id) = self.last_event_id {
headers.insert("Last-Event-ID", HeaderValue::from_str(id).unwrap());
}
let res = self.client.get(self.url.clone()).headers(headers).send()?;
{
let status = res.status();
if !status.is_success() {
return Err(ErrorKind::Http(status.clone()).into());
}
if let Some(content_type_hv) = res.headers().get(CONTENT_TYPE) {
let content_type = content_type_hv
.to_str()
.unwrap()
.to_string()
.parse::<mime::Mime>()
.unwrap();
if (content_type.type_(), content_type.subtype())
!= (mime::TEXT, mime::EVENT_STREAM)
{
return Err(ErrorKind::InvalidContentType(content_type.clone()).into());
}
} else {
return Err(ErrorKind::NoContentType.into());
}
}
self.response = Some(BufReader::new(res));
Ok(())
}
}
macro_rules! try_option {
($e:expr) => {
match $e {
Ok(val) => val,
Err(err) => return Some(Err(::std::convert::From::from(err))),
}
};
}
impl Iterator for Client {
type Item = Result<Event>;
fn next(&mut self) -> Option<Result<Event>> {
if self.response.is_none() {
if let Some(last_try) = self.last_try {
let elapsed = last_try.elapsed();
if elapsed < self.retry {
::std::thread::sleep(self.retry - elapsed);
}
}
self.last_try = Some(Instant::now());
try_option!(self.next_request());
}
let result = {
let mut event = Event::new();
let mut line = String::new();
let reader = self.response.as_mut().unwrap();
loop {
match reader.read_line(&mut line) {
Ok(_n) if _n > 0 => {
match parse_event_line(&line, &mut event) {
ParseResult::Next => (), ParseResult::Dispatch => {
if let Some(ref id) = event.id {
self.last_event_id = Some(id.clone());
}
return Some(Ok(event));
}
ParseResult::SetRetry(ref retry) => {
self.retry = *retry;
}
}
line.clear();
}
Ok(_) => break None,
Err(err) => break Some(Err(::std::convert::From::from(err))),
}
}
};
match result {
None | Some(Err(_)) => {
self.last_try = Some(Instant::now());
self.response = None;
self.next()
}
_ => result,
}
}
}