#![doc = include_str!("../README.md")]
pub(crate) mod cache;
mod config;
mod element_handler;
mod error;
mod expression;
mod functions;
mod literals;
mod parser;
pub(crate) mod parser_types;
use crate::element_handler::{ElementHandler, Flow};
use crate::expression::EvalContext;
use crate::parser_types::{DcaMode, IncludeAttributes};
#[cfg(not(feature = "expose-internals"))]
use crate::parser_types::{Element, Expr};
use bytes::{Bytes, BytesMut};
use fastly::http::request::{select, PendingRequest};
use fastly::http::{header, Method, StatusCode, Url};
use fastly::{mime, Backend, Request, Response};
use log::debug;
use std::borrow::Cow;
use std::collections::{HashMap, VecDeque};
use std::io::{BufRead, Write};
use std::time::Duration;
pub use crate::error::{ESIError, Result};
#[cfg(feature = "expose-internals")]
pub use crate::parser::parse;
#[cfg(feature = "expose-internals")]
pub use crate::parser::{interpolated_content, parse_complete, parse_expression};
pub use crate::cache::CacheConfig;
pub use crate::config::Configuration;
#[cfg(feature = "expose-internals")]
pub use crate::parser_types::{Element, Expr, Tag};
/// Callback that issues the backend request for a single fragment.
///
/// It is called with the fragment's [`Request`] and an optional `u32`; the call sites in this
/// file pass the include's `maxwait` value, which the default dispatcher applies as a
/// first-byte timeout in milliseconds. It returns the (possibly still in-flight) content.
type FragmentRequestDispatcher = dyn Fn(Request, Option<u32>) -> Result<PendingFragmentContent>;
/// Callback that can inspect or replace a fragment response before it is used.
///
/// It receives a mutable, body-less clone of the fragment request together with the response
/// and returns the response whose status and body are used from then on.
type FragmentResponseProcessor = dyn Fn(&mut Request, Response) -> Result<Response>;
/// The state of a fragment's content as returned by a fragment dispatcher.
pub enum PendingFragmentContent {
    /// A backend request that has been sent but not yet waited on.
    PendingRequest(Box<PendingRequest>),
    /// A response that is already available.
    CompletedRequest(Box<Response>),
    /// No content at all; `wait` turns this into an empty `204 No Content` response.
    NoContent,
}
impl From<PendingRequest> for PendingFragmentContent {
    /// Wraps an in-flight request as pending fragment content.
    fn from(value: PendingRequest) -> Self {
        let in_flight = Box::new(value);
        Self::PendingRequest(in_flight)
    }
}
impl From<Response> for PendingFragmentContent {
    /// Wraps an already available response as completed fragment content.
    fn from(value: Response) -> Self {
        let completed = Box::new(value);
        Self::CompletedRequest(completed)
    }
}
/// Evaluated per-include settings that shape the fragment request and the handling of its
/// response.
struct FragmentMetadata {
    /// Evaluated `method` attribute, if present (the request builder accepts GET and POST).
    method: Option<Bytes>,
    /// Evaluated `entity` attribute; used as the request body only for POST requests.
    entity: Option<Bytes>,
    /// `(name, value)` header pairs applied with `set_header` on the fragment request.
    setheaders: Vec<(String, Bytes)>,
    /// `(name, value)` header pairs applied with `append_header` on the fragment request.
    appendheaders: Vec<(String, Bytes)>,
    /// Names of headers removed from the fragment request.
    removeheaders: Vec<String>,
    /// When `false`, the fragment request is marked as pass (`set_pass(true)`).
    cacheable: bool,
    /// TTL parsed from the include's `ttl` attribute; takes precedence over the TTL
    /// calculated from the response.
    ttl_override: Option<u32>,
    /// Whether a failed fragment is tolerated instead of aborting with an error.
    continue_on_error: bool,
    /// Value handed to the dispatcher as its optional second argument.
    maxwait: Option<u32>,
    /// When equal to `DcaMode::Esi`, the fragment body is itself parsed and processed as ESI.
    dca: DcaMode,
}
/// A dispatched include: the request that was built, its fallback, and its pending content.
pub struct Fragment {
    /// The fragment request as built; a body-less clone of it is what was dispatched.
    pub(crate) req: Request,
    /// Evaluated `alt` source, tried when the main request does not succeed.
    pub(crate) alt_bytes: Option<Bytes>,
    /// The content returned by the dispatcher (in flight, completed, or empty).
    pub(crate) pending_fragment: PendingFragmentContent,
    /// Evaluated include settings used while building and handling the request.
    pub(crate) metadata: FragmentMetadata,
}
/// An item waiting in the processor's output queue, kept in document order.
enum QueuedElement {
    /// Literal bytes to be written to the output.
    Content(Bytes),
    /// A dispatched include whose response still has to be turned into output.
    Include(Box<Fragment>),
    /// A try block that has not been executed yet.
    Try {
        /// One element list per attempt.
        attempt_elements: Vec<Vec<Element>>,
        /// Elements rendered when at least one attempt failed.
        except_elements: Vec<Element>,
    },
}
/// Method + URL pair used as a hash-map key to match a completed backend request with the
/// fragment that is waiting for it.
#[derive(Hash, Eq, PartialEq, Clone)]
struct RequestKey {
    /// HTTP method of the request.
    method: Method,
    /// Full request URL as a string.
    url: String,
}
/// Bookkeeping for one try block whose attempts are resolved while draining the queue.
struct TryBlockTracker {
    /// Index of the output slot that receives the assembled result of the whole try block.
    outer_slot: usize,
    /// One tracker per attempt, in the order the attempts were listed.
    attempts: Vec<AttemptTracker>,
    /// Elements rendered if any attempt failed; taken (emptied) when the block is assembled.
    except_elements: Vec<Element>,
    /// Number of this block's fragment requests that are still in flight.
    pending_count: usize,
}
/// State of a single attempt inside a try block.
struct AttemptTracker {
    /// Indices of the output slots that hold this attempt's pieces, in order.
    buf_slots: Vec<usize>,
    /// Set once any include belonging to this attempt has failed.
    failed: bool,
}
/// An in-flight fragment together with the output slot it will fill.
struct SlotEntry {
    /// Index of the output slot reserved for this fragment.
    buf_slot: usize,
    /// The fragment whose request is in flight.
    fragment: Box<Fragment>,
    /// `(tracker index, attempt index)` when the fragment belongs to a try-block attempt.
    try_info: Option<(usize, usize)>,
}
impl PendingFragmentContent {
    /// Returns `true` unless the content is still an in-flight request.
    pub const fn is_ready(&self) -> bool {
        match self {
            Self::PendingRequest(_) => false,
            Self::CompletedRequest(_) | Self::NoContent => true,
        }
    }

    /// Resolves the content into a response.
    ///
    /// A completed response is returned as is, `NoContent` becomes an empty `204` response,
    /// and an in-flight request is waited on.
    ///
    /// # Errors
    /// Returns `ESIError::FragmentRequestError` when waiting on the in-flight request fails.
    pub fn wait(self) -> Result<Response> {
        let in_flight = match self {
            Self::CompletedRequest(done) => return Ok(*done),
            Self::NoContent => return Ok(Response::from_status(StatusCode::NO_CONTENT)),
            Self::PendingRequest(in_flight) => in_flight,
        };
        match in_flight.wait() {
            Ok(response) => Ok(response),
            Err(e) => Err(ESIError::FragmentRequestError(format!(
                "fragment request wait failed: {e}"
            ))),
        }
    }
}
/// Processes a document: evaluates elements, dispatches includes and writes the output.
pub struct Processor {
    /// Evaluation context (request, registered functions, response overrides, cache state).
    ctx: EvalContext,
    /// Processing configuration supplied at construction time.
    configuration: Configuration,
    /// Output items that cannot be written yet, kept in document order.
    queue: VecDeque<QueuedElement>,
}
/// Adapter that lets the generic element handler drive a [`Processor`] and an output writer.
struct DocumentHandler<'a, W: Write> {
    /// The processor whose context and queue are used.
    processor: &'a mut Processor,
    /// Destination for bytes that can be written immediately.
    output: &'a mut W,
    /// Callback used to send fragment requests.
    dispatch_fragment_request: &'a FragmentRequestDispatcher,
    /// Optional callback applied to fragment responses.
    fragment_response_handler: Option<&'a FragmentResponseProcessor>,
}
impl<W: Write> ElementHandler for DocumentHandler<'_, W> {
/// Gives the element handler mutable access to the processor's evaluation context.
fn ctx(&mut self) -> &mut EvalContext {
    let processor = &mut *self.processor;
    &mut processor.ctx
}
/// Forwards to the processor's `process_queue` with this handler's writer and callbacks.
fn process_queue(&mut self) -> crate::Result<()> {
    let dispatcher = self.dispatch_fragment_request;
    let response_handler = self.fragment_response_handler;
    self.processor
        .process_queue(self.output, dispatcher, response_handler)
}
/// Emits literal bytes, preserving document order.
///
/// If anything is already queued, the bytes are queued behind it; otherwise they go straight
/// to the output writer.
fn write_bytes(&mut self, bytes: Bytes) -> crate::Result<()> {
    let queue = &mut self.processor.queue;
    if !queue.is_empty() {
        queue.push_back(QueuedElement::Content(bytes));
        return Ok(());
    }
    self.output
        .write_all(&bytes)
        .map_err(ESIError::WriterError)?;
    Ok(())
}
/// Handles a return value at document level: the value is ignored and processing continues.
fn on_return(&mut self, _value: &Expr) -> crate::Result<Flow> {
    let flow = Flow::Continue;
    Ok(flow)
}
/// Dispatches an include and appends the resulting element to the processor's queue.
fn on_include(&mut self, attrs: &IncludeAttributes) -> crate::Result<Flow> {
    let dispatcher = self.dispatch_fragment_request;
    let processor = &mut *self.processor;
    let queued = processor.dispatch_include(attrs, dispatcher)?;
    processor.queue.push_back(queued);
    Ok(Flow::Continue)
}
/// Handles an eval: fetches the fragment synchronously, parses its body as ESI and processes
/// the resulting elements in place.
///
/// With `dca == DcaMode::Esi` the body is first processed by an isolated [`Processor`]
/// (fresh context, same request and configuration); the output of that pass is parsed again
/// and processed in the current context. Otherwise the parsed body is processed directly in
/// the current context.
///
/// # Errors
/// Returns an error when dispatching or waiting fails, when the response status is not a
/// success and `continue_on_error` is not set, or when the body (or the isolated output)
/// cannot be parsed completely.
fn on_eval(&mut self, attrs: &IncludeAttributes) -> crate::Result<Flow> {
    let queued_element = self
        .processor
        .dispatch_include(attrs, self.dispatch_fragment_request)?;
    match queued_element {
        QueuedElement::Include(fragment) => {
            let response = fragment.pending_fragment.wait()?;
            if !response.get_status().is_success() {
                if fragment.metadata.continue_on_error {
                    return Ok(Flow::Continue);
                }
                return Err(ESIError::UnexpectedStatus {
                    url: fragment.req.get_url_str().to_string(),
                    status: response.get_status().as_u16(),
                });
            }
            let body_bytes = response.into_body_bytes();
            let body_as_bytes = Bytes::from(body_bytes);
            let eval_url = fragment.req.get_url_str().to_string();
            let (rest, elements) = parser::parse_complete(&body_as_bytes).map_err(|e| {
                ESIError::ParseError(format!("failed to parse eval fragment {eval_url}: {e}"))
            })?;
            if !rest.is_empty() {
                return Err(ESIError::ParseError(format!(
                    "incomplete parse of eval fragment {eval_url}"
                )));
            }
            if fragment.metadata.dca == DcaMode::Esi {
                let dispatcher = self.dispatch_fragment_request;
                let resp_handler = self.fragment_response_handler;
                // First pass: run the fragment in its own processor so it does not see or
                // modify the current context.
                let mut isolated_processor = Processor::new(
                    Some(self.processor.ctx.get_request().clone_without_body()),
                    self.processor.configuration.clone(),
                );
                let mut isolated_output = Vec::new();
                {
                    let mut isolated_handler = DocumentHandler {
                        processor: &mut isolated_processor,
                        output: &mut isolated_output,
                        dispatch_fragment_request: dispatcher,
                        fragment_response_handler: resp_handler,
                    };
                    for element in elements {
                        isolated_handler.process(&element)?;
                    }
                }
                isolated_processor.drain_queue(
                    &mut isolated_output,
                    dispatcher,
                    resp_handler,
                )?;
                // Second pass: the isolated output is parsed again and processed in the
                // current context.
                let isolated_bytes = Bytes::from(isolated_output);
                let (rest, output_elements) =
                    parser::parse_complete(&isolated_bytes).map_err(|e| {
                        ESIError::ParseError(format!(
                            "failed to parse eval isolated output: {e}",
                        ))
                    })?;
                if !rest.is_empty() {
                    return Err(ESIError::ParseError(
                        "incomplete parse of eval isolated output".into(),
                    ));
                }
                for element in output_elements {
                    if matches!(self.process(&element)?, Flow::Break) {
                        return Ok(Flow::Break);
                    }
                }
            } else {
                for element in elements {
                    if matches!(self.process(&element)?, Flow::Break) {
                        return Ok(Flow::Break);
                    }
                }
            }
            Ok(Flow::Continue)
        }
        // The dispatch failed but was tolerated; the placeholder content is not evaluated.
        QueuedElement::Content(_) => Ok(Flow::Continue),
        QueuedElement::Try { .. } => {
            unreachable!("dispatch_include should only return Include or Content")
        }
    }
}
/// Queues a try block; it is executed later, when the queue is processed or drained.
fn on_try(
    &mut self,
    attempt_events: Vec<Vec<Element>>,
    except_events: Vec<Element>,
) -> crate::Result<Flow> {
    let queued = QueuedElement::Try {
        attempt_elements: attempt_events,
        except_elements: except_events,
    };
    self.processor.queue.push_back(queued);
    Ok(Flow::Continue)
}
/// Registers a user-defined function in the evaluation context.
fn on_function(&mut self, name: String, body: Vec<Element>) -> crate::Result<Flow> {
    let ctx = &mut self.processor.ctx;
    ctx.register_function(name, body);
    Ok(Flow::Continue)
}
}
impl Processor {
/// Creates a processor for one document.
///
/// `original_request_metadata` becomes the context's request; when it is `None`, a
/// placeholder `GET http://localhost` request is used instead. The function recursion limit
/// is taken from `configuration`.
pub fn new(original_request_metadata: Option<Request>, configuration: Configuration) -> Self {
    let mut ctx = EvalContext::new();
    let request = original_request_metadata
        .unwrap_or_else(|| Request::new(Method::GET, "http://localhost"));
    ctx.set_request(request);
    ctx.set_max_function_recursion_depth(configuration.function_recursion_depth);
    Self {
        ctx,
        configuration,
        queue: VecDeque::new(),
    }
}
pub const fn context(&self) -> &EvalContext {
&self.ctx
}
/// Placeholder written in place of a tolerated failed fragment: the HTML comment when
/// `is_escaped_content` is set, nothing otherwise.
const fn fragment_req_failed(&self) -> &'static [u8] {
    if !self.configuration.is_escaped_content {
        return b"";
    }
    FRAGMENT_REQUEST_FAILED
}
/// Processes the body of `src_stream` into a buffer and sends the result to the client.
///
/// Because the whole document is buffered first, metadata collected during processing can
/// still be applied to the response: the rendered `Cache-Control` header (when enabled in
/// the configuration), response headers, a response status and a body override, all taken
/// from the evaluation context.
///
/// `client_response_metadata` is the response to send; when `None`, a `200 OK` response with
/// an HTML content type is created. The two callbacks are forwarded to `process_stream`.
///
/// # Errors
/// Returns any error from `process_stream`, or `ESIError::FunctionError` when the status
/// stored in the context is not a valid HTTP status code. Nothing is sent in either case.
pub fn process_response(
    mut self,
    src_stream: &mut Response,
    client_response_metadata: Option<Response>,
    dispatch_fragment_request: Option<&FragmentRequestDispatcher>,
    process_fragment_response: Option<&FragmentResponseProcessor>,
) -> Result<()> {
    let mut output = Vec::new();
    self.process_stream(
        src_stream.take_body(),
        &mut output,
        dispatch_fragment_request,
        process_fragment_response,
    )?;
    let mut resp = client_response_metadata.unwrap_or_else(|| {
        Response::from_status(StatusCode::OK).with_content_type(mime::TEXT_HTML)
    });
    if self.configuration.cache.rendered_cache_control {
        if let Some(cache_control_value) = self
            .ctx
            .cache_control_header(self.configuration.cache.rendered_ttl)
        {
            resp.set_header(header::CACHE_CONTROL, cache_control_value);
        }
    }
    for (name, value) in self.ctx.response_headers() {
        resp.set_header(name, value);
    }
    if let Some(status) = self.ctx.response_status() {
        // A checked conversion is required here: an `as u16` cast would wrap out-of-range
        // values (65736 would become 200) instead of rejecting them.
        let status_code = <u16 as std::convert::TryFrom<_>>::try_from(status)
            .ok()
            .and_then(|code| StatusCode::from_u16(code).ok())
            .ok_or_else(|| {
                ESIError::FunctionError("set_response_code: invalid status code".to_string())
            })?;
        resp.set_status(status_code);
    }
    // A body override set during processing replaces the rendered document.
    let body_bytes = self
        .ctx
        .response_body_override()
        .cloned()
        .unwrap_or_else(|| Bytes::from(output));
    resp.set_body(body_bytes.as_ref());
    resp.send_to_client();
    Ok(())
}
/// Processes the body of `src_stream` and streams the output to the client as it is produced.
///
/// The response head is sent before processing starts, so headers, status, body override
/// and `Cache-Control` collected during processing cannot be applied; a warning is printed
/// for each of them. When `client_response_metadata` is `None`, a `200 OK` response with an
/// HTML content type is used.
///
/// # Errors
/// Returns `ESIError::WriterError` when finishing the client stream fails; otherwise returns
/// the outcome of `process_stream`.
pub fn process_response_streaming(
    mut self,
    src_stream: &mut Response,
    client_response_metadata: Option<Response>,
    dispatch_fragment_request: Option<&FragmentRequestDispatcher>,
    process_fragment_response: Option<&FragmentResponseProcessor>,
) -> Result<()> {
    let resp = match client_response_metadata {
        Some(resp) => resp,
        None => Response::from_status(StatusCode::OK).with_content_type(mime::TEXT_HTML),
    };
    let mut client_stream = resp.stream_to_client();
    let outcome = self.process_stream(
        src_stream.take_body(),
        &mut client_stream,
        dispatch_fragment_request,
        process_fragment_response,
    );
    self.warn_ignored_streaming_metadata();
    // The stream is finished even when processing failed; a finish error takes precedence.
    if let Err(e) = client_stream.finish() {
        return Err(ESIError::WriterError(e));
    }
    outcome
}
/// Prints one warning to stdout for every piece of response metadata that was collected
/// during processing but cannot be applied in streaming mode.
fn warn_ignored_streaming_metadata(&self) {
    let ctx = &self.ctx;
    for (name, value) in ctx.response_headers() {
        println!(
            "warning: $add_header('{name}', '{value}') has no effect in streaming mode \
             — response headers were already sent to the client"
        );
    }
    if let Some(status) = ctx.response_status() {
        println!(
            "warning: $set_response_code({status}) has no effect in streaming mode \
             — response headers were already sent to the client"
        );
    }
    if ctx.response_body_override().is_some() {
        println!(
            "warning: $set_response_code() body override has no effect in streaming mode \
             — response body is already being streamed to the client"
        );
    }
    let cache = &self.configuration.cache;
    if cache.rendered_cache_control {
        let would_set_header = ctx.cache_control_header(cache.rendered_ttl).is_some();
        if would_set_header {
            println!(
                "warning: Cache-Control header cannot be applied in streaming mode \
                 — response headers were already sent to the client"
            );
        }
    }
}
pub fn process_stream(
&mut self,
mut src_stream: impl BufRead,
output_writer: &mut impl Write,
dispatch_fragment_request: Option<&FragmentRequestDispatcher>,
process_fragment_response: Option<&FragmentResponseProcessor>,
) -> Result<()> {
const MAX_ITERATIONS: usize = 10000;
let chunk_size = self.configuration.chunk_size;
let dispatcher = dispatch_fragment_request.unwrap_or(&default_fragment_dispatcher);
let mut buffer = BytesMut::with_capacity(chunk_size);
let mut read_buf = vec![0u8; chunk_size];
let mut eof = false;
let mut iterations = 0;
loop {
iterations += 1;
if iterations > MAX_ITERATIONS {
return Err(ESIError::InfiniteLoop {
iterations,
buffer_len: buffer.len(),
eof,
});
}
if !eof {
match src_stream.read(&mut read_buf) {
Ok(0) => eof = true,
Ok(n) => buffer.extend_from_slice(&read_buf[..n]),
Err(e) => return Err(ESIError::WriterError(e)),
}
}
let frozen = buffer.split().freeze();
let parse_result = if eof {
parser::parse_eof(&frozen)
} else {
parser::parse(&frozen)
};
match parse_result {
Ok((remaining, elements)) => {
let mut handler = DocumentHandler {
processor: self,
output: output_writer,
dispatch_fragment_request: dispatcher,
fragment_response_handler: process_fragment_response,
};
for element in elements {
handler.process(&element)?;
handler.process_queue()?;
}
if eof {
break;
}
if !remaining.is_empty() {
let consumed = frozen.len() - remaining.len();
buffer.extend_from_slice(&frozen[consumed..]);
}
}
Err(nom::Err::Incomplete(_)) => {
debug_assert!(!eof, "parse_eof should not return Incomplete");
}
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => {
if eof {
if e.code == nom::error::ErrorKind::Eof {
return Err(ESIError::UnexpectedEndOfDocument);
}
return Err(ESIError::ParseError(format!("parser error: {e:?}")));
}
output_writer.write_all(&buffer)?;
buffer.clear();
}
}
}
self.drain_queue(output_writer, dispatcher, process_fragment_response)?;
Ok(())
}
/// Evaluates the include's request-related attributes into a [`FragmentMetadata`].
///
/// Header expressions that are not valid UTF-8 or contain no `:` separator are skipped, as
/// are header names to remove that are not valid UTF-8.
///
/// # Errors
/// Returns the error of the first attribute expression that fails to evaluate.
fn evaluate_request_params(&mut self, attrs: &IncludeAttributes) -> Result<FragmentMetadata> {
    let ttl_override = match &attrs.ttl {
        Some(ttl_str) => cache::parse_ttl(ttl_str),
        None => None,
    };
    let method = match attrs.method.as_ref() {
        Some(expr) => Some(eval_expr_to_bytes(expr, &mut self.ctx)?),
        None => None,
    };
    let entity = match attrs.entity.as_ref() {
        Some(expr) => Some(eval_expr_to_bytes(expr, &mut self.ctx)?),
        None => None,
    };

    let mut setheaders = Vec::with_capacity(attrs.setheaders.len());
    for expr in &attrs.setheaders {
        let evaluated = eval_expr_to_bytes(expr, &mut self.ctx)?;
        // `extend` with an `Option` pushes the pair only when the split succeeded.
        setheaders.extend(split_header_value(&evaluated));
    }

    let mut appendheaders = Vec::with_capacity(attrs.appendheaders.len());
    for expr in &attrs.appendheaders {
        let evaluated = eval_expr_to_bytes(expr, &mut self.ctx)?;
        appendheaders.extend(split_header_value(&evaluated));
    }

    let mut removeheaders = Vec::with_capacity(attrs.removeheaders.len());
    for expr in &attrs.removeheaders {
        let raw_name = eval_expr_to_bytes(expr, &mut self.ctx)?;
        match std::str::from_utf8(raw_name.as_ref()) {
            Ok(name) => removeheaders.push(name.trim().to_string()),
            Err(_) => {}
        }
    }

    let cacheable = self.configuration.cache.is_includes_cacheable && !attrs.no_store;
    let metadata = FragmentMetadata {
        method,
        entity,
        setheaders,
        appendheaders,
        removeheaders,
        cacheable,
        ttl_override,
        continue_on_error: attrs.continue_on_error,
        maxwait: attrs.maxwait,
        dca: attrs.dca,
    };
    Ok(metadata)
}
/// Evaluates an include's attributes, builds the fragment request and hands it to
/// `dispatcher`.
///
/// Returns `QueuedElement::Include` when a request (main or `alt`) was dispatched, or
/// `QueuedElement::Content` holding the failure placeholder when dispatching failed and
/// `continue_on_error` is set.
///
/// # Errors
/// Returns an error when an attribute fails to evaluate, when the request cannot be built,
/// or when dispatching fails and `continue_on_error` is not set.
fn dispatch_include(
    &mut self,
    attrs: &IncludeAttributes,
    dispatcher: &FragmentRequestDispatcher,
) -> Result<QueuedElement> {
    let src_bytes = eval_expr_to_bytes(&attrs.src, &mut self.ctx)?;
    let alt_bytes = attrs
        .alt
        .as_ref()
        .map(|e| eval_expr_to_bytes(e, &mut self.ctx))
        .transpose()?;
    let metadata = self.evaluate_request_params(attrs)?;
    // Append the include's params to the URL as query parameters.
    // NOTE(review): names and values are appended verbatim, without percent-encoding.
    let final_src = if attrs.params.is_empty() {
        src_bytes
    } else {
        let url_cow = String::from_utf8_lossy(&src_bytes);
        let mut url = String::with_capacity(url_cow.len() + attrs.params.len() * 20);
        url.push_str(&url_cow);
        // The first parameter starts the query string unless the URL already has one.
        let mut separator = if url.contains('?') { '&' } else { '?' };
        for (name, value_expr) in &attrs.params {
            let value = eval_expr_to_bytes(value_expr, &mut self.ctx)?;
            let value_str = String::from_utf8_lossy(&value);
            url.push(separator);
            url.push_str(name);
            url.push('=');
            url.push_str(&value_str);
            separator = '&';
        }
        Bytes::from(url)
    };
    let req = build_fragment_request(
        self.ctx.get_request().clone_without_body(),
        &final_src,
        &metadata,
        &self.configuration,
    )?;
    // The dispatcher consumes its request, so it gets a body-less clone and `req` is kept.
    let req_clone = req.clone_without_body();
    match dispatcher(req_clone, metadata.maxwait) {
        Ok(pending_fragment) => {
            let fragment = Fragment {
                req,
                alt_bytes,
                pending_fragment,
                metadata,
            };
            Ok(QueuedElement::Include(Box::new(fragment)))
        }
        // Dispatch failed but errors are tolerated: try the alt source, else the placeholder.
        // NOTE(review): alt is only tried here when continue_on_error is set, whereas
        // process_include tries alt regardless of that flag — confirm this is intended.
        Err(_) if metadata.continue_on_error => {
            if let Some(alt_src) = &alt_bytes {
                let alt_req = build_fragment_request(
                    self.ctx.get_request().clone_without_body(),
                    alt_src,
                    &metadata,
                    &self.configuration,
                )?;
                let alt_req_without_body = alt_req.clone_without_body();
                dispatcher(alt_req_without_body, metadata.maxwait).map_or_else(
                    |_| {
                        Ok(QueuedElement::Content(Bytes::from_static(
                            self.fragment_req_failed(),
                        )))
                    },
                    |alt_pending| {
                        // The alt request becomes the fragment; it has no further fallback.
                        let fragment = Fragment {
                            req: alt_req,
                            alt_bytes: None,
                            pending_fragment: alt_pending,
                            metadata,
                        };
                        Ok(QueuedElement::Include(Box::new(fragment)))
                    },
                )
            } else {
                Ok(QueuedElement::Content(Bytes::from_static(
                    self.fragment_req_failed(),
                )))
            }
        }
        Err(e) => Err(ESIError::FragmentRequestError(format!(
            "fragment dispatch failed for {}: {e}",
            req.get_url_str()
        ))),
    }
}
/// Writes queued elements from the front of the queue, stopping at the first include whose
/// request is still in flight.
///
/// Content is written as is, ready includes are processed, and try blocks are executed. An
/// include that is not ready is put back at the front so that output order is preserved.
fn process_queue(
    &mut self,
    output_writer: &mut impl Write,
    dispatcher: &FragmentRequestDispatcher,
    processor: Option<&FragmentResponseProcessor>,
) -> Result<()> {
    while let Some(element) = self.queue.pop_front() {
        match element {
            QueuedElement::Content(content) => {
                output_writer.write_all(&content)?;
            }
            QueuedElement::Include(fragment) => {
                if !fragment.pending_fragment.is_ready() {
                    self.queue.push_front(QueuedElement::Include(fragment));
                    break;
                }
                self.process_include(*fragment, output_writer, dispatcher, processor)?;
            }
            QueuedElement::Try {
                attempt_elements,
                except_elements,
            } => {
                self.process_try_block(
                    attempt_elements,
                    &except_elements,
                    output_writer,
                    dispatcher,
                    processor,
                )?;
            }
        }
    }
    Ok(())
}
/// Builds the method + URL key that identifies `req` among the in-flight fragments.
fn make_request_key(req: &Request) -> RequestKey {
    let method = req.get_method().clone();
    let url = req.get_url_str().to_string();
    RequestKey { method, url }
}
/// Resolves everything left in the queue and writes it to `output_writer` in document order.
///
/// Every queued element gets a slot in an ordered buffer. Slots whose content is known are
/// filled immediately; slots waiting for a backend response stay `None`. In-flight requests
/// are waited on together with `select`, so responses are handled in completion order while
/// the output is still flushed strictly in slot order.
///
/// # Errors
/// Returns an error when an include outside a try block fails, when a completed request
/// cannot be matched to a waiting fragment, when writing fails, or when a slot is still
/// empty after all requests have resolved.
fn drain_queue(
    &mut self,
    output_writer: &mut impl Write,
    dispatch_fragment_request: &FragmentRequestDispatcher,
    process_fragment_response: Option<&FragmentResponseProcessor>,
) -> Result<()> {
    // Ordered output slots; `None` means the slot is still waiting for its content.
    let mut buf: Vec<Option<Bytes>> = Vec::with_capacity(self.queue.len());
    // Index of the next slot to write to the output.
    let mut next_out: usize = 0;
    // Fragments waiting for a response, keyed by method + URL, first-in first-out per key.
    let mut url_map: HashMap<RequestKey, VecDeque<SlotEntry>> = HashMap::new();
    // The in-flight requests handed to `select`.
    let mut pending: Vec<PendingRequest> = Vec::new();
    let mut try_trackers: Vec<TryBlockTracker> = Vec::new();
    loop {
        // Step 1: move every queued element into a slot.
        while let Some(elem) = self.queue.pop_front() {
            match elem {
                QueuedElement::Content(bytes) => {
                    buf.push(Some(bytes));
                }
                QueuedElement::Include(mut fragment) => {
                    let slot = buf.len();
                    buf.push(None);
                    let pending_content = std::mem::replace(
                        &mut fragment.pending_fragment,
                        PendingFragmentContent::NoContent,
                    );
                    match pending_content {
                        PendingFragmentContent::PendingRequest(req) => {
                            // Still in flight: remember which slot this request fills.
                            let key = Self::make_request_key(&fragment.req);
                            url_map.entry(key).or_default().push_back(SlotEntry {
                                buf_slot: slot,
                                fragment,
                                try_info: None,
                            });
                            pending.push(*req);
                        }
                        ready => {
                            // Already resolved: render it into its slot right away.
                            fragment.pending_fragment = ready;
                            let mut slot_buf = Vec::new();
                            self.process_include(
                                *fragment,
                                &mut slot_buf,
                                dispatch_fragment_request,
                                process_fragment_response,
                            )?;
                            buf[slot] = Some(Bytes::from(slot_buf));
                        }
                    }
                }
                QueuedElement::Try {
                    attempt_elements,
                    except_elements,
                } => {
                    // The outer slot receives the assembled try block; the attempts' own
                    // slots are blanked when the block is assembled.
                    let outer_slot = buf.len();
                    buf.push(None);
                    let tracker_idx = try_trackers.len();
                    try_trackers.push(TryBlockTracker {
                        outer_slot,
                        attempts: Vec::with_capacity(attempt_elements.len()),
                        except_elements,
                        pending_count: 0,
                    });
                    for (attempt_idx, attempt_elems) in attempt_elements.into_iter().enumerate()
                    {
                        try_trackers[tracker_idx].attempts.push(AttemptTracker {
                            buf_slots: Vec::new(),
                            failed: false,
                        });
                        // Output written directly while the attempt's elements are processed.
                        let mut pre_buf: Vec<u8> = Vec::new();
                        let mut pre_failed = false;
                        self.execute_isolated(
                            &attempt_elems,
                            &mut pre_buf,
                            dispatch_fragment_request,
                            process_fragment_response,
                            |this, pre_out| {
                                if !pre_out.is_empty() {
                                    let slot = buf.len();
                                    buf.push(Some(Bytes::from(pre_out.clone())));
                                    try_trackers[tracker_idx].attempts[attempt_idx]
                                        .buf_slots
                                        .push(slot);
                                }
                                // Everything the attempt queued gets a slot owned by the
                                // attempt.
                                while let Some(qe) = this.queue.pop_front() {
                                    match qe {
                                        QueuedElement::Content(bytes) => {
                                            let slot = buf.len();
                                            buf.push(Some(bytes));
                                            try_trackers[tracker_idx].attempts[attempt_idx]
                                                .buf_slots
                                                .push(slot);
                                        }
                                        QueuedElement::Include(mut frag) => {
                                            let slot = buf.len();
                                            buf.push(None);
                                            try_trackers[tracker_idx].attempts[attempt_idx]
                                                .buf_slots
                                                .push(slot);
                                            let pc = std::mem::replace(
                                                &mut frag.pending_fragment,
                                                PendingFragmentContent::NoContent,
                                            );
                                            match pc {
                                                PendingFragmentContent::PendingRequest(req) => {
                                                    let key = Self::make_request_key(&frag.req);
                                                    url_map.entry(key).or_default().push_back(
                                                        SlotEntry {
                                                            buf_slot: slot,
                                                            fragment: frag,
                                                            try_info: Some((
                                                                tracker_idx,
                                                                attempt_idx,
                                                            )),
                                                        },
                                                    );
                                                    pending.push(*req);
                                                    try_trackers[tracker_idx].pending_count +=
                                                        1;
                                                }
                                                ready => {
                                                    frag.pending_fragment = ready;
                                                    let mut slot_buf = Vec::new();
                                                    // Inside a try block an include error
                                                    // marks the attempt as failed instead of
                                                    // aborting.
                                                    if this
                                                        .process_include(
                                                            *frag,
                                                            &mut slot_buf,
                                                            dispatch_fragment_request,
                                                            process_fragment_response,
                                                        )
                                                        .is_err()
                                                    {
                                                        pre_failed = true;
                                                    }
                                                    buf[slot] = Some(Bytes::from(slot_buf));
                                                }
                                            }
                                        }
                                        QueuedElement::Try {
                                            attempt_elements: nested_attempts,
                                            except_elements: nested_except,
                                        } => {
                                            // Nested try blocks are resolved synchronously
                                            // into a single slot.
                                            let slot = buf.len();
                                            buf.push(None);
                                            try_trackers[tracker_idx].attempts[attempt_idx]
                                                .buf_slots
                                                .push(slot);
                                            let mut slot_buf = Vec::new();
                                            this.process_try_block(
                                                nested_attempts,
                                                &nested_except,
                                                &mut slot_buf,
                                                dispatch_fragment_request,
                                                process_fragment_response,
                                            )?;
                                            buf[slot] = Some(Bytes::from(slot_buf));
                                        }
                                    }
                                }
                                Ok(())
                            },
                        )?;
                        if pre_failed {
                            try_trackers[tracker_idx].attempts[attempt_idx].failed = true;
                        }
                    }
                    // Nothing in flight for this try block: it can be assembled immediately.
                    if try_trackers[tracker_idx].pending_count == 0 {
                        Self::assemble_try_block(
                            self,
                            tracker_idx,
                            &mut try_trackers,
                            &mut buf,
                            dispatch_fragment_request,
                            process_fragment_response,
                        )?;
                    }
                }
            }
        }
        // Step 2: flush the contiguous run of filled slots, stopping at the first gap.
        while next_out < buf.len() {
            match &buf[next_out] {
                Some(bytes) => {
                    output_writer.write_all(bytes)?;
                    // The written slot is replaced by empty bytes (presumably to release
                    // the data early).
                    buf[next_out] = Some(Bytes::new()); next_out += 1;
                }
                None => break, }
        }
        if pending.is_empty() {
            break;
        }
        // Step 3: wait for any one in-flight request to complete.
        let (result, remaining) = select(pending);
        pending = remaining;
        let (key, completed_content) = match result {
            Ok(resp) => {
                let key = resp
                    .get_backend_request()
                    .map(Self::make_request_key)
                    .ok_or_else(|| {
                        ESIError::InternalError(
                            "drain_queue: response missing backend request for correlation"
                                .into(),
                        )
                    })?;
                (
                    key,
                    PendingFragmentContent::CompletedRequest(Box::new(resp)),
                )
            }
            Err(e) => {
                // A failed send is turned into a synthetic 500 response so that
                // `process_include` applies its alt / continue-on-error handling.
                let req = e.into_sent_req();
                let key = Self::make_request_key(&req);
                debug!(
                    "Fragment request to {} {} failed; triggering alt/onerror handling",
                    key.method, key.url
                );
                (
                    key,
                    PendingFragmentContent::CompletedRequest(Box::new(Response::from_status(
                        StatusCode::INTERNAL_SERVER_ERROR,
                    ))),
                )
            }
        };
        // Step 4: match the completed request with the oldest fragment waiting on that key.
        let entry = url_map
            .get_mut(&key)
            .and_then(VecDeque::pop_front)
            .ok_or_else(|| {
                ESIError::InternalError(format!(
                    "drain_queue: no in-flight fragment for {}/{}",
                    key.method, key.url
                ))
            })?;
        let SlotEntry {
            buf_slot,
            mut fragment,
            try_info,
        } = entry;
        match try_info {
            None => {
                // Top-level include: an error aborts the whole drain.
                fragment.pending_fragment = completed_content;
                let mut slot_buf = Vec::new();
                self.process_include(
                    *fragment,
                    &mut slot_buf,
                    dispatch_fragment_request,
                    process_fragment_response,
                )?;
                buf[buf_slot] = Some(Bytes::from(slot_buf));
            }
            Some((tracker_idx, attempt_idx)) => {
                // Include inside a try attempt: an error only marks the attempt as failed.
                fragment.pending_fragment = completed_content;
                let mut slot_buf = Vec::new();
                let include_failed = self
                    .process_include(
                        *fragment,
                        &mut slot_buf,
                        dispatch_fragment_request,
                        process_fragment_response,
                    )
                    .is_err();
                buf[buf_slot] = Some(Bytes::from(slot_buf));
                if include_failed {
                    try_trackers[tracker_idx].attempts[attempt_idx].failed = true;
                }
                try_trackers[tracker_idx].pending_count -= 1;
                // The last outstanding request of the try block has resolved.
                if try_trackers[tracker_idx].pending_count == 0 {
                    Self::assemble_try_block(
                        self,
                        tracker_idx,
                        &mut try_trackers,
                        &mut buf,
                        dispatch_fragment_request,
                        process_fragment_response,
                    )?;
                }
            }
        }
    }
    // Final flush: every remaining slot must be filled by now.
    while next_out < buf.len() {
        match &buf[next_out] {
            Some(bytes) => {
                output_writer.write_all(bytes)?;
                next_out += 1;
            }
            None => {
                return Err(ESIError::InternalError(
                    "drain_queue: slot still pending after all requests resolved".into(),
                ));
            }
        }
    }
    Ok(())
}
fn assemble_try_block(
&mut self,
tracker_idx: usize,
try_trackers: &mut [TryBlockTracker],
buf: &mut [Option<Bytes>],
dispatch_fragment_request: &FragmentRequestDispatcher,
process_fragment_response: Option<&FragmentResponseProcessor>,
) -> Result<()> {
let mut any_failed = false;
let mut output: Vec<u8> = Vec::new();
for attempt in &try_trackers[tracker_idx].attempts {
if attempt.failed {
any_failed = true;
for &slot_idx in &attempt.buf_slots {
buf[slot_idx] = Some(Bytes::new());
}
} else {
for &slot_idx in &attempt.buf_slots {
if let Some(bytes) = &buf[slot_idx] {
output.extend_from_slice(bytes);
}
buf[slot_idx] = Some(Bytes::new());
}
}
}
if any_failed {
let except_elements = std::mem::take(&mut try_trackers[tracker_idx].except_elements);
if !except_elements.is_empty() {
let except_buf = self.process_try_task(
&except_elements,
dispatch_fragment_request,
process_fragment_response,
)?;
output.extend_from_slice(&except_buf);
}
}
buf[try_trackers[tracker_idx].outer_slot] = Some(Bytes::from(output));
Ok(())
}
/// Executes a try block synchronously and writes its result to `output_writer`.
///
/// Each attempt is rendered on its own; the output of successful attempts is written in
/// order. If at least one attempt failed, the except elements are rendered and written
/// afterwards.
fn process_try_block(
    &mut self,
    attempt_elements: Vec<Vec<Element>>,
    except_elements: &[Element],
    output_writer: &mut impl Write,
    dispatcher: &FragmentRequestDispatcher,
    processor: Option<&FragmentResponseProcessor>,
) -> Result<()> {
    let mut saw_failure = false;
    for attempt in attempt_elements {
        if let Ok(rendered) = self.process_try_task(&attempt, dispatcher, processor) {
            output_writer.write_all(&rendered)?;
        } else {
            saw_failure = true;
        }
    }
    if !saw_failure {
        return Ok(());
    }
    let fallback = self.process_try_task(except_elements, dispatcher, processor)?;
    output_writer.write_all(&fallback)?;
    Ok(())
}
/// Processes `elements` against a temporarily empty queue, then lets `after` deal with
/// whatever they queued.
///
/// The caller's queue is set aside before processing and restored before returning on
/// every path, including when processing an element or `after` fails. Anything the
/// isolated run left in the queue is discarded at that point.
///
/// * `output` — receives bytes the elements write directly.
/// * `after` — called with the processor and `output` once all elements were processed
///   successfully; it is skipped when processing fails.
///
/// # Errors
/// Returns the first element-processing error, or otherwise the result of `after`.
fn execute_isolated<R, W: Write>(
    &mut self,
    elements: &[Element],
    output: &mut W,
    dispatcher: &FragmentRequestDispatcher,
    processor: Option<&FragmentResponseProcessor>,
    after: impl FnOnce(&mut Self, &mut W) -> Result<R>,
) -> Result<R> {
    let saved_queue = std::mem::take(&mut self.queue);
    let processed = {
        let mut handler = DocumentHandler {
            processor: &mut *self,
            output: &mut *output,
            dispatch_fragment_request: dispatcher,
            fragment_response_handler: processor,
        };
        elements
            .iter()
            .try_for_each(|elem| handler.process(elem).map(|_flow| ()))
    };
    // No `?` above: an early return would skip the restore below and leave the failed
    // run's leftovers in `self.queue` in place of the caller's elements.
    let result = match processed {
        Ok(()) => after(self, output),
        Err(err) => Err(err),
    };
    self.queue = saved_queue;
    result
}
/// Renders `elements` in isolation and returns their complete output as bytes.
///
/// The elements run against an empty queue, which is then fully drained into the returned
/// buffer.
fn process_try_task(
    &mut self,
    elements: &[Element],
    dispatcher: &FragmentRequestDispatcher,
    processor: Option<&FragmentResponseProcessor>,
) -> Result<Vec<u8>> {
    let mut rendered = Vec::new();
    self.execute_isolated(
        elements,
        &mut rendered,
        dispatcher,
        processor,
        |isolated, sink| isolated.drain_queue(sink, dispatcher, processor),
    )?;
    Ok(rendered)
}
/// Turns a dispatched fragment into output.
///
/// Waits for the response, passes it through `process_fragment_response` when given, and
/// tracks cache TTL information for the rendered document. A successful response has its
/// body written (or processed as ESI, depending on `dca`). Otherwise the `alt` source is
/// requested if present; failing that, the failure placeholder is written when
/// `continue_on_error` is set.
///
/// # Errors
/// Returns an error when waiting or a callback fails, when the body cannot be processed,
/// when both main and alt dispatch fail without `continue_on_error`, or
/// `ESIError::UnexpectedStatus` for a non-success response with neither `alt` nor
/// `continue_on_error`.
fn process_include(
    &mut self,
    fragment: Fragment,
    output_writer: &mut impl Write,
    dispatch_fragment_request: &FragmentRequestDispatcher,
    process_fragment_response: Option<&FragmentResponseProcessor>,
) -> Result<()> {
    let continue_on_error = fragment.metadata.continue_on_error;
    let fragment_url = fragment.req.get_url_str().to_string();
    let response = fragment.pending_fragment.wait()?;
    let final_response = if let Some(proc) = process_fragment_response {
        let mut req_for_processor = fragment.req.clone_without_body();
        proc(&mut req_for_processor, response)?
    } else {
        response
    };
    // Cache tracking only matters when the rendered document is cacheable or is to carry a
    // Cache-Control header.
    if final_response.get_status().is_success()
        && (self.configuration.cache.is_rendered_cacheable
            || self.configuration.cache.rendered_cache_control)
    {
        let ttl = if let Some(override_ttl) = fragment.metadata.ttl_override {
            debug!("Using TTL override from include tag: {override_ttl} seconds");
            Some(override_ttl)
        } else {
            match cache::calculate_ttl(&final_response, &self.configuration.cache) {
                Ok(Some(ttl)) => {
                    debug!("Calculated TTL from response: {ttl} seconds");
                    Some(ttl)
                }
                Ok(None) => {
                    debug!("Response not cacheable (private/no-cache/set-cookie)");
                    self.ctx.mark_document_uncacheable();
                    None
                }
                Err(e) => {
                    // A TTL calculation error is logged and otherwise ignored.
                    debug!("Error calculating TTL: {e:?}");
                    None
                }
            }
        };
        if let Some(ttl_value) = ttl {
            self.ctx.update_cache_min_ttl(ttl_value);
            debug!("Tracking TTL {ttl_value} for rendered document");
        }
    }
    if final_response.get_status().is_success() {
        let body_bytes = final_response.into_body_bytes();
        self.process_fragment_body(
            body_bytes,
            fragment.metadata.dca,
            &fragment_url,
            output_writer,
            dispatch_fragment_request,
            process_fragment_response,
        )?;
        Ok(())
    } else if let Some(alt_src) = fragment.alt_bytes {
        debug!("Main request failed, trying alt");
        let alt_req = build_fragment_request(
            self.ctx.get_request().clone_without_body(),
            &alt_src,
            &fragment.metadata,
            &self.configuration,
        )?;
        let alt_req_without_body = alt_req.clone_without_body();
        match dispatch_fragment_request(alt_req_without_body, fragment.metadata.maxwait) {
            Ok(alt_pending) => {
                // The alt request is waited on synchronously.
                let alt_response = alt_pending.wait()?;
                let final_alt = if let Some(proc) = process_fragment_response {
                    let mut alt_req_for_proc = alt_req.clone_without_body();
                    proc(&mut alt_req_for_proc, alt_response)?
                } else {
                    alt_response
                };
                // NOTE(review): the alt response's status is not checked; its body is used
                // even for a non-success status — confirm this is intended.
                let body_bytes = final_alt.into_body_bytes();
                self.process_fragment_body(
                    body_bytes,
                    fragment.metadata.dca,
                    &String::from_utf8_lossy(&alt_src),
                    output_writer,
                    dispatch_fragment_request,
                    process_fragment_response,
                )?;
                Ok(())
            }
            Err(_) if continue_on_error => {
                output_writer.write_all(self.fragment_req_failed())?;
                Ok(())
            }
            Err(_) => Err(ESIError::FragmentRequestError(format!(
                "both main and alt failed: main={fragment_url}, alt={}",
                String::from_utf8_lossy(&alt_src)
            ))),
        }
    } else if continue_on_error {
        output_writer.write_all(self.fragment_req_failed())?;
        Ok(())
    } else {
        Err(ESIError::UnexpectedStatus {
            url: fragment_url,
            status: final_response.get_status().as_u16(),
        })
    }
}
/// Writes a fragment body to `output_writer`, processing it as ESI first when
/// `dca_mode` is `DcaMode::Esi`.
///
/// In ESI mode the body is parsed completely and processed by a separate [`Processor`]
/// created from the current request and configuration. Processing stops early, without
/// draining that processor's queue, when an element yields `Flow::Break`. `url` is only
/// used in error messages.
///
/// # Errors
/// Returns `ESIError::ParseError` when the body cannot be parsed completely, or any error
/// raised while processing elements or writing.
fn process_fragment_body(
    &mut self,
    body_bytes: Vec<u8>,
    dca_mode: DcaMode,
    url: &str,
    output_writer: &mut impl Write,
    dispatcher: &FragmentRequestDispatcher,
    process_fragment_response: Option<&FragmentResponseProcessor>,
) -> Result<()> {
    if dca_mode != DcaMode::Esi {
        // Not ESI: the body is passed through verbatim.
        output_writer.write_all(&body_bytes)?;
        return Ok(());
    }

    let markup = Bytes::from(body_bytes);
    let (unparsed, elements) = parser::parse_complete(&markup).map_err(|e| {
        ESIError::ParseError(format!("failed to parse fragment {url} with dca=esi: {e}"))
    })?;
    if !unparsed.is_empty() {
        return Err(ESIError::ParseError(format!(
            "incomplete parse of fragment {url} with dca=esi"
        )));
    }

    let parent_request = self.ctx.get_request().clone_without_body();
    let mut fragment_processor =
        Processor::new(Some(parent_request), self.configuration.clone());
    {
        let mut handler = DocumentHandler {
            processor: &mut fragment_processor,
            output: &mut *output_writer,
            dispatch_fragment_request: dispatcher,
            fragment_response_handler: process_fragment_response,
        };
        for element in elements {
            let flow = handler.process(&element)?;
            if matches!(flow, Flow::Break) {
                return Ok(());
            }
        }
    }
    fragment_processor.drain_queue(output_writer, dispatcher, process_fragment_response)?;
    Ok(())
}
}
/// HTML comment written in place of a tolerated failed fragment when the configuration's
/// `is_escaped_content` flag is set.
const FRAGMENT_REQUEST_FAILED: &[u8] = b"<!-- fragment request failed -->";
/// Evaluates `expr` in `ctx` and converts the resulting value to bytes.
///
/// # Errors
/// Returns `ESIError::ExpressionError` naming the failing expression when evaluation fails.
fn eval_expr_to_bytes(expr: &Expr, ctx: &mut EvalContext) -> Result<Bytes> {
    match expression::eval_expr(expr, ctx) {
        Ok(value) => Ok(value.to_bytes()),
        Err(e) => Err(ESIError::ExpressionError(format!(
            "{e}, in expression: {expr}"
        ))),
    }
}
/// Dispatcher used when the caller does not supply one: sends the request asynchronously to
/// a dynamic backend derived from the request URL's host.
///
/// The backend uses the host as its name, target, host override and SNI hostname, with TLS
/// enabled. When `maxwait` is given, it is applied as the first-byte timeout in
/// milliseconds.
///
/// # Errors
/// * `ESIError::InvalidRequestUrl` when the request URL has no host.
/// * `ESIError::FragmentRequestError` when the backend cannot be created.
/// * The converted send error when the request cannot be sent.
fn default_fragment_dispatcher(
    req: Request,
    maxwait: Option<u32>,
) -> Result<PendingFragmentContent> {
    debug!("no dispatch method configured, defaulting to hostname");
    // The URL originates from document markup, so a missing host is reported as an error
    // rather than a panic.
    let host = req
        .get_url()
        .host()
        .map(|h| h.to_string())
        .ok_or_else(|| ESIError::InvalidRequestUrl(req.get_url_str().to_string()))?;
    let mut builder = Backend::builder(&host, &host)
        .override_host(&host)
        .enable_ssl()
        .sni_hostname(&host);
    if let Some(timeout_ms) = maxwait {
        builder = builder.first_byte_timeout(Duration::from_millis(u64::from(timeout_ms)));
    }
    let backend = builder.finish().map_err(|e| {
        ESIError::FragmentRequestError(format!("failed to create backend for {host}: {e}"))
    })?;
    let pending_req = req.send_async(backend)?;
    Ok(PendingFragmentContent::PendingRequest(Box::new(
        pending_req,
    )))
}
/// Builds the request for a fragment from a copy of the original request.
///
/// * `request` — request to start from; its URL, Host header, method, body and headers are
///   adjusted in place.
/// * `url` — evaluated fragment source. A value starting with `/` replaces only the path and
///   query of `request`'s URL; anything else must be an absolute URL and replaces it
///   entirely. HTML entities are decoded first when `config.is_escaped_content` is set.
/// * `metadata` — evaluated include settings (method, entity, header changes, cacheability).
///
/// Header changes are applied in the order remove, set, append. A non-cacheable fragment is
/// marked as pass.
///
/// # Errors
/// * `ESIError::InvalidFragmentConfig` for a non-UTF-8 URL or method, or a method other than
///   GET or POST.
/// * `ESIError::InvalidRequestUrl` when the URL cannot be parsed or has no host.
fn build_fragment_request(
    mut request: Request,
    url: &Bytes,
    metadata: &FragmentMetadata,
    config: &Configuration,
) -> Result<Request> {
    let url_str = std::str::from_utf8(url).map_err(|_| {
        ESIError::InvalidFragmentConfig(format!(
            "invalid UTF-8 in URL: {}",
            String::from_utf8_lossy(url)
        ))
    })?;
    // `decode_html_entities` already returns a `Cow`, borrowing when nothing was decoded.
    let resolved_url: Cow<'_, str> = if config.is_escaped_content {
        html_escape::decode_html_entities(url_str)
    } else {
        Cow::Borrowed(url_str)
    };
    if resolved_url.starts_with('/') {
        // Parse against a dummy authority only to split the value into path and query.
        match Url::parse(
            format!("{}://0.0.0.0{}", request.get_url().scheme(), resolved_url).as_str(),
        ) {
            Ok(u) => {
                request.get_url_mut().set_path(u.path());
                request.get_url_mut().set_query(u.query());
            }
            Err(_err) => {
                return Err(ESIError::InvalidRequestUrl(resolved_url.into_owned()));
            }
        }
    } else {
        request.set_url(match Url::parse(&resolved_url) {
            Ok(url) => url,
            Err(_err) => {
                return Err(ESIError::InvalidRequestUrl(resolved_url.into_owned()));
            }
        });
    }
    // A syntactically valid URL may still lack a host (for example `mailto:` or `data:`);
    // since the URL comes from markup this is reported as an error rather than a panic.
    let hostname = request
        .get_url()
        .host()
        .map(|h| h.to_string())
        .ok_or_else(|| ESIError::InvalidRequestUrl(request.get_url_str().to_string()))?;
    request.set_header(header::HOST, &hostname);
    if let Some(method_bytes) = &metadata.method {
        let method_str = std::str::from_utf8(method_bytes)
            .map_err(|_| {
                ESIError::InvalidFragmentConfig(format!(
                    "invalid UTF-8 in method for {}",
                    request.get_url_str()
                ))
            })?
            .to_uppercase();
        match method_str.as_str() {
            "GET" => request.set_method(Method::GET),
            "POST" => request.set_method(Method::POST),
            _ => {
                return Err(ESIError::InvalidFragmentConfig(format!(
                    "unsupported HTTP method: {method_str}"
                )))
            }
        }
    }
    // The entity is only sent as a body with POST requests.
    if let Some(entity_bytes) = &metadata.entity {
        if request.get_method() == Method::POST {
            request.set_body(entity_bytes.as_ref());
        }
    }
    for header_name in &metadata.removeheaders {
        request.remove_header(header_name);
    }
    for (name, value) in &metadata.setheaders {
        request.set_header(name, value.as_ref());
    }
    for (name, value) in &metadata.appendheaders {
        request.append_header(name, value.as_ref());
    }
    if !metadata.cacheable {
        request.set_pass(true);
    }
    Ok(request)
}
/// Splits a `Name: value` header line into its trimmed name and trimmed value.
///
/// Returns `None` when the input is not valid UTF-8, contains no `:`, or has an empty name
/// (for example `": value"`), so that such entries are skipped instead of being applied to
/// a request as a header with an empty name.
fn split_header_value(full: &Bytes) -> Option<(String, Bytes)> {
    let text = std::str::from_utf8(full.as_ref()).ok()?;
    let (name, value) = text.split_once(':')?;
    let name = name.trim();
    if name.is_empty() {
        return None;
    }
    Some((
        name.to_string(),
        Bytes::copy_from_slice(value.trim().as_bytes()),
    ))
}