use std::ops::ControlFlow;
use crate::Request;
use super::Part;
use crate::types::PartialResponse;
pub fn fetch_streaming_blocking(
request: Request,
on_data: Box<dyn Fn(crate::Result<Part>) -> ControlFlow<()> + Send>,
) {
let mut req = ureq::request(&request.method, &request.url);
for (k, v) in &request.headers {
req = req.set(k, v);
}
let resp = if request.body.is_empty() {
req.call()
} else {
req.send_bytes(&request.body)
};
let (ok, resp) = match resp {
Ok(resp) => (true, resp),
Err(ureq::Error::Status(_, resp)) => (false, resp), Err(ureq::Error::Transport(err)) => {
on_data(Err(err.to_string()));
return;
}
};
let url = resp.get_url().to_owned();
let status = resp.status();
let status_text = resp.status_text().to_owned();
let mut headers = crate::Headers::default();
for key in &resp.headers_names() {
if let Some(value) = resp.header(key) {
headers.insert(key.to_ascii_lowercase(), value.to_owned());
}
}
headers.sort();
let response = PartialResponse {
url,
ok,
status,
status_text,
headers,
};
if on_data(Ok(Part::Response(response))).is_break() {
return;
};
let mut reader = resp.into_reader();
loop {
let mut buf = vec![0; 2048];
match reader.read(&mut buf) {
Ok(n) if n > 0 => {
let chunk = buf[..n].to_vec();
if on_data(Ok(Part::Chunk(chunk))).is_break() {
return;
};
}
Ok(_) => {
on_data(Ok(Part::Chunk(vec![])));
break;
}
Err(err) => {
if request.method == "HEAD" && err.kind() == std::io::ErrorKind::UnexpectedEof {
on_data(Ok(Part::Chunk(vec![])));
break;
} else {
on_data(Err(format!("Failed to read response body: {err}")));
return;
}
}
};
}
}
/// Runs [`fetch_streaming_blocking`] for `request` on a newly spawned thread
/// named `"ehttp"`, forwarding every streamed part to `on_data`.
///
/// Returns as soon as the thread has been started; the thread is not joined.
///
/// # Panics
///
/// Panics if the thread cannot be spawned.
pub(crate) fn fetch_streaming(
    request: Request,
    on_data: Box<dyn Fn(crate::Result<Part>) -> ControlFlow<()> + Send>,
) {
    let builder = std::thread::Builder::new().name(String::from("ehttp"));
    let worker = move || fetch_streaming_blocking(request, on_data);
    // The join handle is dropped right away, leaving the thread detached.
    drop(builder.spawn(worker).expect("Failed to spawn ehttp thread"));
}