mod config;
mod document;
mod error;
mod parse;
use fastly::http::request::PendingRequest;
use fastly::http::{header, Method, StatusCode, Url};
use fastly::{mime, Body, Request, Response};
use log::{debug, error};
use quick_xml::{Reader, Writer};
use std::collections::VecDeque;
use std::io::{BufRead, Write};
pub use crate::document::Element;
use crate::error::Result;
pub use crate::parse::{parse_tags, Event, Tag};
pub use crate::config::Configuration;
pub use crate::error::ExecutionError;
pub struct Processor {
original_request_metadata: Option<Request>,
client_response_metadata: Option<Response>,
configuration: Configuration,
}
impl Processor {
pub fn new(
original_request_metadata: Option<Request>,
client_response_metadata: Option<Response>,
configuration: Configuration,
) -> Self {
Self {
original_request_metadata,
client_response_metadata,
configuration,
}
}
pub fn process_response(
self,
src_document: &mut Response,
dispatch_fragment_request: Option<&dyn Fn(Request) -> Result<Option<PendingRequest>>>,
process_fragment_response: Option<&dyn Fn(Request, Response) -> Result<Response>>,
) -> Result<()> {
let resp = self
.client_response_metadata
.as_ref()
.map(|c| c.clone_without_body())
.unwrap_or_else(|| {
Response::from_status(StatusCode::OK).with_content_type(mime::TEXT_HTML)
});
let output_writer = resp.stream_to_client();
let mut xml_writer = Writer::new(output_writer);
match self.process_document(
reader_from_body(src_document.take_body()),
&mut xml_writer,
dispatch_fragment_request,
process_fragment_response,
) {
Ok(()) => {
xml_writer.into_inner().finish().unwrap();
Ok(())
}
Err(err) => {
error!("error processing ESI document: {}", err);
Err(err)
}
}
}
pub fn process_document(
self,
mut src_document: Reader<impl BufRead>,
output_writer: &mut Writer<impl Write>,
dispatch_fragment_request: Option<&dyn Fn(Request) -> Result<Option<PendingRequest>>>,
process_fragment_response: Option<&dyn Fn(Request, Response) -> Result<Response>>,
) -> Result<()> {
let dispatch_fragment_request = dispatch_fragment_request.unwrap_or({
&|req| {
debug!("no dispatch method configured, defaulting to hostname");
let backend = req
.get_url()
.host()
.unwrap_or_else(|| panic!("no host in request: {}", req.get_url()))
.to_string();
let pending_req = req.send_async(backend)?;
Ok(Some(pending_req))
}
});
let mut elements: VecDeque<Element> = VecDeque::new();
let original_request_metadata = if let Some(req) = &self.original_request_metadata {
req.clone_without_body()
} else {
Request::new(Method::GET, "http://localhost")
};
parse_tags(
&self.configuration.namespace,
&mut src_document,
&mut |event| {
debug!("got {:?}", event);
match event {
Event::ESI(Tag::Include {
src,
alt,
continue_on_error,
}) => {
let req = build_fragment_request(
original_request_metadata.clone_without_body(),
&src,
);
let alt_req = alt.map(|alt| {
build_fragment_request(
original_request_metadata.clone_without_body(),
&alt,
)
});
if let Some(element) = send_fragment_request(
req?,
alt_req,
continue_on_error,
dispatch_fragment_request,
)? {
elements.push_back(element);
}
}
Event::XML(event) => {
if elements.is_empty() {
debug!("nothing waiting so streaming directly to client");
output_writer.write_event(event)?;
output_writer
.inner()
.flush()
.expect("failed to flush output");
} else {
debug!("pushing content to buffer, len: {}", elements.len());
let mut vec = Vec::new();
let mut writer = Writer::new(&mut vec);
writer.write_event(event)?;
elements.push_back(Element::Raw(vec));
}
}
}
poll_elements(
&mut elements,
output_writer,
dispatch_fragment_request,
process_fragment_response,
)?;
Ok(())
},
)?;
loop {
if elements.is_empty() {
break;
}
poll_elements(
&mut elements,
output_writer,
dispatch_fragment_request,
process_fragment_response,
)?;
}
Ok(())
}
}
fn build_fragment_request(mut request: Request, url: &str) -> Result<Request> {
let escaped_url = match quick_xml::escape::unescape(url) {
Ok(url) => url,
Err(err) => {
return Err(ExecutionError::InvalidRequestUrl(err.to_string()));
}
}
.to_string();
if escaped_url.starts_with('/') {
match Url::parse(
format!("{}://0.0.0.0{}", request.get_url().scheme(), escaped_url).as_str(),
) {
Ok(u) => {
request.get_url_mut().set_path(u.path());
request.get_url_mut().set_query(u.query());
}
Err(_err) => {
return Err(ExecutionError::InvalidRequestUrl(escaped_url));
}
};
} else {
request.set_url(match Url::parse(&escaped_url) {
Ok(url) => url,
Err(_err) => {
return Err(ExecutionError::InvalidRequestUrl(escaped_url));
}
});
}
let hostname = request.get_url().host().expect("no host").to_string();
request.set_header(header::HOST, &hostname);
Ok(request)
}
fn send_fragment_request(
req: Request,
alt: Option<Result<Request>>,
continue_on_error: bool,
dispatch_request: &dyn Fn(Request) -> Result<Option<PendingRequest>>,
) -> Result<Option<Element>> {
debug!("Requesting ESI fragment: {}", req.get_url());
let req_metadata = req.clone_without_body();
let pending_request = match dispatch_request(req) {
Ok(Some(req)) => req,
Ok(None) => {
debug!("No pending request returned, skipping");
return Ok(None);
}
Err(err) => {
error!("Failed to dispatch request: {:?}", err);
return Err(err);
}
};
Ok(Some(Element::Fragment(
req_metadata,
alt,
continue_on_error,
pending_request,
)))
}
fn poll_elements(
elements: &mut VecDeque<Element>,
output_writer: &mut Writer<impl Write>,
dispatch_fragment_request: &dyn Fn(Request) -> Result<Option<PendingRequest>>,
process_fragment_response: Option<&dyn Fn(Request, Response) -> Result<Response>>,
) -> Result<()> {
loop {
let element = elements.pop_front();
if let Some(element) = element {
match element {
Element::Raw(raw) => {
debug!("writing previously queued other content");
output_writer.inner().write_all(&raw).unwrap();
}
Element::Fragment(request, alt, continue_on_error, pending_request) => {
match pending_request.poll() {
fastly::http::request::PollResult::Pending(pending_request) => {
elements.insert(
0,
Element::Fragment(request, alt, continue_on_error, pending_request),
);
break;
}
fastly::http::request::PollResult::Done(Ok(res)) => {
if !res.get_status().is_success() {
if let Some(alt) = alt {
debug!("request poll DONE ERROR, trying alt");
if let Some(pending_request) = dispatch_fragment_request(alt?)?
{
elements.insert(
0,
Element::Fragment(
request,
None,
continue_on_error,
pending_request,
),
);
break;
} else {
debug!("guest returned None, continuing");
continue;
}
} else if continue_on_error {
debug!("request poll DONE ERROR, NO ALT, continuing");
continue;
} else {
debug!("request poll DONE ERROR, NO ALT, failing");
return Err(ExecutionError::UnexpectedStatus(
request.get_url_str().to_string(),
res.get_status().into(),
));
}
} else {
let res = if let Some(process_response) = process_fragment_response
{
process_response(request, res)?
} else {
res
};
output_writer
.inner()
.write_all(&res.into_body_bytes())
.unwrap();
output_writer
.inner()
.flush()
.expect("failed to flush output");
}
}
fastly::http::request::PollResult::Done(Err(err)) => {
return Err(ExecutionError::RequestError(err))
}
}
}
}
} else {
break;
}
}
Ok(())
}
fn reader_from_body(body: Body) -> Reader<Body> {
let mut reader = Reader::from_reader(body);
reader.check_end_names(false);
reader
}