1#![doc = include_str!("../README.md")]
2
3mod config;
4mod document;
5mod error;
6mod expression;
7mod functions;
8mod parse;
9
10use crate::document::{FetchState, Task};
11use crate::expression::{evaluate_expression, try_evaluate_interpolated, EvalContext};
12use fastly::http::request::PendingRequest;
13use fastly::http::{header, Method, StatusCode, Url};
14use fastly::{mime, Body, Request, Response};
15use log::{debug, error, trace};
16use std::collections::VecDeque;
17use std::io::{BufRead, Write};
18
19pub use crate::document::{Element, Fragment};
20pub use crate::error::Result;
21pub use crate::parse::{parse_tags, Event, Include, Tag, Tag::Try};
22
23pub use crate::config::Configuration;
24pub use crate::error::ExecutionError;
25
26pub use quick_xml::{Reader, Writer};
28
29type FragmentRequestDispatcher = dyn Fn(Request) -> Result<PendingFragmentContent>;
30
31type FragmentResponseProcessor = dyn Fn(&mut Request, Response) -> Result<Response>;
32
33pub enum PendingFragmentContent {
35 PendingRequest(PendingRequest),
36 CompletedRequest(Response),
37 NoContent,
38}
39
40impl From<PendingRequest> for PendingFragmentContent {
41 fn from(value: PendingRequest) -> Self {
42 Self::PendingRequest(value)
43 }
44}
45
46impl From<Response> for PendingFragmentContent {
47 fn from(value: Response) -> Self {
48 Self::CompletedRequest(value)
49 }
50}
51
52impl PendingFragmentContent {
53 fn wait_for_content(self) -> Result<Response> {
54 Ok(match self {
55 Self::PendingRequest(pending_request) => pending_request.wait()?,
56 Self::CompletedRequest(response) => response,
57 Self::NoContent => Response::from_status(StatusCode::NO_CONTENT),
58 })
59 }
60}
61
62pub struct Processor {
87 original_request_metadata: Option<Request>,
89 configuration: Configuration,
91}
92
93impl Processor {
94 pub const fn new(
95 original_request_metadata: Option<Request>,
96 configuration: Configuration,
97 ) -> Self {
98 Self {
99 original_request_metadata,
100 configuration,
101 }
102 }
103
104 pub fn process_response(
153 self,
154 src_document: &mut Response,
155 client_response_metadata: Option<Response>,
156 dispatch_fragment_request: Option<&FragmentRequestDispatcher>,
157 process_fragment_response: Option<&FragmentResponseProcessor>,
158 ) -> Result<()> {
159 let resp = client_response_metadata.unwrap_or_else(|| {
161 Response::from_status(StatusCode::OK).with_content_type(mime::TEXT_HTML)
162 });
163
164 let output_writer = resp.stream_to_client();
166
167 let mut xml_writer = Writer::new(output_writer);
169
170 match self.process_document(
171 reader_from_body(src_document.take_body()),
172 &mut xml_writer,
173 dispatch_fragment_request,
174 process_fragment_response,
175 ) {
176 Ok(()) => {
177 xml_writer.into_inner().finish()?;
178 Ok(())
179 }
180 Err(err) => {
181 error!("error processing ESI document: {err}");
182 Err(err)
183 }
184 }
185 }
186
187 pub fn process_parsed_document(
233 self,
234 src_events: VecDeque<Event>,
235 output_writer: &mut Writer<impl Write>,
236 dispatch_fragment_request: Option<&FragmentRequestDispatcher>,
237 process_fragment_response: Option<&FragmentResponseProcessor>,
238 ) -> Result<()> {
239 let dispatch_fragment_request =
241 dispatch_fragment_request.unwrap_or(&default_fragment_dispatcher);
242
243 let original_request_metadata = self.original_request_metadata.as_ref().map_or_else(
245 || Request::new(Method::GET, "http://localhost"),
246 Request::clone_without_body,
247 );
248
249 let root_task = &mut Task::new();
251
252 let mut ctx = EvalContext::new();
254 ctx.set_request(original_request_metadata.clone_without_body());
255
256 for event in src_events {
257 event_receiver(
258 event,
259 &mut root_task.queue,
260 self.configuration.is_escaped_content,
261 &original_request_metadata,
262 dispatch_fragment_request,
263 &mut ctx,
264 )?;
265 }
266
267 Self::process_root_task(
268 root_task,
269 output_writer,
270 dispatch_fragment_request,
271 process_fragment_response,
272 )
273 }
274
275 pub fn process_document(
324 self,
325 mut src_document: Reader<impl BufRead>,
326 output_writer: &mut Writer<impl Write>,
327 dispatch_fragment_request: Option<&FragmentRequestDispatcher>,
328 process_fragment_response: Option<&FragmentResponseProcessor>,
329 ) -> Result<()> {
330 let dispatch_fragment_request =
332 dispatch_fragment_request.unwrap_or(&default_fragment_dispatcher);
333
334 let original_request_metadata = self.original_request_metadata.as_ref().map_or_else(
336 || Request::new(Method::GET, "http://localhost"),
337 Request::clone_without_body,
338 );
339
340 let root_task = &mut Task::new();
342
343 let mut ctx = EvalContext::new();
345 ctx.set_request(original_request_metadata.clone_without_body());
346
347 parse_tags(
351 &self.configuration.namespace,
352 &mut src_document,
353 &mut |event| {
354 event_receiver(
355 event,
356 &mut root_task.queue,
357 self.configuration.is_escaped_content,
358 &original_request_metadata,
359 dispatch_fragment_request,
360 &mut ctx,
361 )
362 },
363 )?;
364
365 Self::process_root_task(
366 root_task,
367 output_writer,
368 dispatch_fragment_request,
369 process_fragment_response,
370 )
371 }
372
373 fn process_root_task(
374 root_task: &mut Task,
375 output_writer: &mut Writer<impl Write>,
376 dispatch_fragment_request: &FragmentRequestDispatcher,
377 process_fragment_response: Option<&FragmentResponseProcessor>,
378 ) -> Result<()> {
379 let mut depth = 0;
381
382 debug!("Elements to fetch: {:?}", root_task.queue);
383
384 fetch_elements(
388 &mut depth,
389 root_task,
390 output_writer,
391 dispatch_fragment_request,
392 process_fragment_response,
393 )?;
394
395 Ok(())
396 }
397}
398
399fn default_fragment_dispatcher(req: Request) -> Result<PendingFragmentContent> {
400 debug!("no dispatch method configured, defaulting to hostname");
401 let backend = req
402 .get_url()
403 .host()
404 .unwrap_or_else(|| panic!("no host in request: {}", req.get_url()))
405 .to_string();
406 let pending_req = req.send_async(backend)?;
407 Ok(PendingFragmentContent::PendingRequest(pending_req))
408}
409
410fn fetch_elements(
414 depth: &mut usize,
415 task: &mut Task,
416 output_writer: &mut Writer<impl Write>,
417 dispatch_fragment_request: &FragmentRequestDispatcher,
418 process_fragment_response: Option<&FragmentResponseProcessor>,
419) -> Result<FetchState> {
420 while let Some(element) = task.queue.pop_front() {
421 match element {
422 Element::Raw(raw) => {
423 process_raw(task, output_writer, &raw, *depth)?;
424 }
425 Element::Include(fragment) => {
426 let result = process_include(
427 task,
428 *fragment,
429 output_writer,
430 *depth,
431 dispatch_fragment_request,
432 process_fragment_response,
433 )?;
434 if let FetchState::Failed(_, _) = result {
435 return Ok(result);
436 }
437 }
438 Element::Try {
439 mut attempt_task,
440 mut except_task,
441 } => {
442 *depth += 1;
443 process_try(
444 task,
445 output_writer,
446 &mut attempt_task,
447 &mut except_task,
448 depth,
449 dispatch_fragment_request,
450 process_fragment_response,
451 )?;
452 *depth -= 1;
453 if *depth == 0 {
454 debug!(
455 "Writing try result: {:?}",
456 String::from_utf8(task.output.get_mut().as_slice().to_vec())
457 );
458 output_handler(output_writer, task.output.get_mut().as_ref())?;
459 task.output.get_mut().clear();
460 }
461 }
462 }
463 }
464 Ok(FetchState::Succeeded)
465}
466
467fn process_include(
468 task: &mut Task,
469 fragment: Fragment,
470 output_writer: &mut Writer<impl Write>,
471 depth: usize,
472 dispatch_fragment_request: &FragmentRequestDispatcher,
473 process_fragment_response: Option<&FragmentResponseProcessor>,
474) -> Result<FetchState> {
475 let Fragment {
477 mut request,
478 alt,
479 continue_on_error,
480 pending_content,
481 } = fragment;
482
483 let resp = pending_content.wait_for_content()?;
485
486 let processed_resp = if let Some(process_response) = process_fragment_response {
487 process_response(&mut request, resp)?
488 } else {
489 resp
490 };
491
492 if processed_resp.get_status().is_success() {
494 if depth == 0 && task.output.get_mut().is_empty() {
495 debug!("Include is not nested, writing content to the output stream");
496 output_handler(output_writer, &processed_resp.into_body_bytes())?;
497 } else {
498 debug!("Include is nested, writing content to a buffer");
499 task.output
500 .get_mut()
501 .extend_from_slice(&processed_resp.into_body_bytes());
502 }
503
504 Ok(FetchState::Succeeded)
505 } else {
506 if let Some(request) = alt {
508 debug!("request poll DONE ERROR, trying alt");
509 if let Some(fragment) =
510 send_fragment_request(request?, None, continue_on_error, dispatch_fragment_request)?
511 {
512 task.queue.push_front(Element::Include(Box::new(fragment)));
513 return Ok(FetchState::Pending);
514 }
515 debug!("guest returned None, continuing");
516 return Ok(FetchState::Succeeded);
517 } else if continue_on_error {
518 debug!("request poll DONE ERROR, NO ALT, continuing");
519 return Ok(FetchState::Succeeded);
520 }
521
522 debug!("request poll DONE ERROR, NO ALT, failing");
523 Ok(FetchState::Failed(
524 request,
525 processed_resp.get_status().into(),
526 ))
527 }
528}
529
530fn process_raw(
534 task: &mut Task,
535 output_writer: &mut Writer<impl Write>,
536 raw: &[u8],
537 depth: usize,
538) -> Result<()> {
539 if depth == 0 && task.output.get_mut().is_empty() {
540 debug!("writing previously queued content");
541 output_writer
542 .get_mut()
543 .write_all(raw)
544 .map_err(ExecutionError::WriterError)?;
545 output_writer.get_mut().flush()?;
546 } else {
547 trace!("-- Depth: {depth}");
548 debug!(
549 "writing blocked content to a queue {:?} ",
550 String::from_utf8(raw.to_owned())
551 );
552 task.output.get_mut().extend_from_slice(raw);
553 }
554 Ok(())
555}
556
557fn process_try(
559 task: &mut Task,
560 output_writer: &mut Writer<impl Write>,
561 attempt_task: &mut Task,
562 except_task: &mut Task,
563 depth: &mut usize,
564 dispatch_fragment_request: &FragmentRequestDispatcher,
565 process_fragment_response: Option<&FragmentResponseProcessor>,
566) -> Result<()> {
567 let attempt_state = fetch_elements(
568 depth,
569 attempt_task,
570 output_writer,
571 dispatch_fragment_request,
572 process_fragment_response,
573 )?;
574
575 let except_state = fetch_elements(
576 depth,
577 except_task,
578 output_writer,
579 dispatch_fragment_request,
580 process_fragment_response,
581 )?;
582
583 trace!("*** Depth: {depth}");
584
585 match (attempt_state, except_state) {
586 (FetchState::Succeeded, _) => {
587 task.output
588 .get_mut()
589 .extend_from_slice(&std::mem::take(attempt_task).output.into_inner());
590 }
591 (FetchState::Failed(_, _), FetchState::Succeeded) => {
592 task.output
593 .get_mut()
594 .extend_from_slice(&std::mem::take(except_task).output.into_inner());
595 }
596 (FetchState::Failed(req, res), FetchState::Failed(_req, _res)) => {
597 return Err(ExecutionError::UnexpectedStatus(
599 req.get_url_str().to_string(),
600 res,
601 ));
602 }
603 (FetchState::Pending, _) | (FetchState::Failed(_, _), FetchState::Pending) => {
604 task.queue.push_front(Element::Try {
606 attempt_task: Box::new(std::mem::take(attempt_task)),
607 except_task: Box::new(std::mem::take(except_task)),
608 });
609 }
610 }
611 Ok(())
612}
613
614fn event_receiver(
617 event: Event,
618 queue: &mut VecDeque<Element>,
619 is_escaped: bool,
620 original_request_metadata: &Request,
621 dispatch_fragment_request: &FragmentRequestDispatcher,
622 ctx: &mut EvalContext,
623) -> Result<()> {
624 match event {
625 Event::ESI(Tag::Include {
626 src,
627 alt,
628 continue_on_error,
629 }) => {
630 debug!("Handling <esi:include> tag with src: {src}");
631 let interpolated_src = try_evaluate_interpolated_string(&src, ctx)?;
633
634 let interpolated_alt = alt
636 .map(|a| try_evaluate_interpolated_string(&a, ctx))
637 .transpose()?;
638 let req = build_fragment_request(
639 original_request_metadata.clone_without_body(),
640 &interpolated_src,
641 is_escaped,
642 );
643 let alt_req = interpolated_alt.map(|alt| {
644 build_fragment_request(
645 original_request_metadata.clone_without_body(),
646 &alt,
647 is_escaped,
648 )
649 });
650 if let Some(fragment) =
651 send_fragment_request(req?, alt_req, continue_on_error, dispatch_fragment_request)?
652 {
653 queue.push_back(Element::Include(Box::new(fragment)));
655 }
656 }
657 Event::ESI(Tag::Try {
658 attempt_events,
659 except_events,
660 }) => {
661 let attempt_task = task_handler(
662 attempt_events,
663 is_escaped,
664 original_request_metadata,
665 dispatch_fragment_request,
666 ctx,
667 )?;
668 let except_task = task_handler(
669 except_events,
670 is_escaped,
671 original_request_metadata,
672 dispatch_fragment_request,
673 ctx,
674 )?;
675
676 trace!(
677 "*** pushing try content to queue: Attempt - {:?}, Except - {:?}",
678 attempt_task.queue,
679 except_task.queue
680 );
681 queue.push_back(Element::Try {
683 attempt_task: Box::new(attempt_task),
684 except_task: Box::new(except_task),
685 });
686 }
687 Event::ESI(Tag::Assign { name, value }) => {
688 let result = evaluate_expression(&value, ctx)?;
690 ctx.set_variable(&name, None, result);
691 }
692 Event::ESI(Tag::Vars { name }) => {
693 debug!("Handling <esi:vars> tag with name: {name:?}");
694 if let Some(name) = name {
695 let result = evaluate_expression(&name, ctx)?;
696 debug!("Evaluated <esi:vars> result: {result:?}");
697 queue.push_back(Element::Raw(result.to_string().into_bytes()));
698 }
699 }
700 Event::ESI(Tag::When { .. }) => unreachable!(),
701 Event::ESI(Tag::Choose {
702 when_branches,
703 otherwise_events,
704 }) => {
705 let mut chose_branch = false;
706 for (when, events) in when_branches {
707 if let Tag::When { test, match_name } = when {
708 if let Some(match_name) = match_name {
709 ctx.set_match_name(&match_name);
710 }
711 let result = evaluate_expression(&test, ctx)?;
712 if result.to_bool() {
713 chose_branch = true;
714 for event in events {
715 event_receiver(
716 event,
717 queue,
718 is_escaped,
719 original_request_metadata,
720 dispatch_fragment_request,
721 ctx,
722 )?;
723 }
724 break;
725 }
726 } else {
727 unreachable!()
728 }
729 }
730
731 if !chose_branch {
732 for event in otherwise_events {
733 event_receiver(
734 event,
735 queue,
736 is_escaped,
737 original_request_metadata,
738 dispatch_fragment_request,
739 ctx,
740 )?;
741 }
742 }
743 }
744
745 Event::InterpolatedContent(event) => {
746 debug!("Handling interpolated content: {event:?}");
747 let event_str = String::from_utf8(event.iter().copied().collect()).unwrap_or_default();
748
749 process_interpolated_chars(&event_str, ctx, |segment| {
750 queue.push_back(Element::Raw(segment.into_bytes()));
751 Ok(())
752 })?;
753 }
754 Event::Content(event) => {
755 debug!("pushing content to buffer, len: {}", queue.len());
756 let mut buf = vec![];
757 let mut writer = Writer::new(&mut buf);
758 writer.write_event(event)?;
759 queue.push_back(Element::Raw(buf));
760 }
761 }
762 Ok(())
763}
764
765fn task_handler(
768 events: Vec<Event>,
769 is_escaped: bool,
770 original_request_metadata: &Request,
771 dispatch_fragment_request: &FragmentRequestDispatcher,
772 ctx: &mut EvalContext,
773) -> Result<Task> {
774 let mut task = Task::new();
775 for event in events {
776 event_receiver(
777 event,
778 &mut task.queue,
779 is_escaped,
780 original_request_metadata,
781 dispatch_fragment_request,
782 ctx,
783 )?;
784 }
785 Ok(task)
786}
787
788fn build_fragment_request(mut request: Request, url: &str, is_escaped: bool) -> Result<Request> {
792 let escaped_url = if is_escaped {
793 match quick_xml::escape::unescape(url) {
794 Ok(url) => url.to_string(),
795 Err(err) => {
796 return Err(ExecutionError::InvalidRequestUrl(err.to_string()));
797 }
798 }
799 } else {
800 url.to_string()
801 };
802
803 if escaped_url.starts_with('/') {
804 match Url::parse(
805 format!("{}://0.0.0.0{}", request.get_url().scheme(), escaped_url).as_str(),
806 ) {
807 Ok(u) => {
808 request.get_url_mut().set_path(u.path());
809 request.get_url_mut().set_query(u.query());
810 }
811 Err(_err) => {
812 return Err(ExecutionError::InvalidRequestUrl(escaped_url));
813 }
814 }
815 } else {
816 request.set_url(match Url::parse(&escaped_url) {
817 Ok(url) => url,
818 Err(_err) => {
819 return Err(ExecutionError::InvalidRequestUrl(escaped_url));
820 }
821 });
822 }
823
824 let hostname = request.get_url().host().expect("no host").to_string();
825
826 request.set_header(header::HOST, &hostname);
827
828 Ok(request)
829}
830
831fn send_fragment_request(
832 req: Request,
833 alt: Option<Result<Request>>,
834 continue_on_error: bool,
835 dispatch_request: &FragmentRequestDispatcher,
836) -> Result<Option<Fragment>> {
837 debug!("Requesting ESI fragment: {}", req.get_url());
838
839 let request = req.clone_without_body();
840
841 let pending_content: PendingFragmentContent = dispatch_request(req)?;
842
843 Ok(Some(Fragment {
844 request,
845 alt,
846 continue_on_error,
847 pending_content,
848 }))
849}
850
851fn reader_from_body(body: Body) -> Reader<Body> {
853 let mut reader = Reader::from_reader(body);
854
855 let config = reader.config_mut();
857 config.check_end_names = false;
858
859 reader
860}
861
862fn output_handler(output_writer: &mut Writer<impl Write>, buffer: &[u8]) -> Result<()> {
864 output_writer.get_mut().write_all(buffer)?;
865 output_writer.get_mut().flush()?;
866 Ok(())
867}
868
869pub fn process_interpolated_chars<F>(
883 input: &str,
884 ctx: &mut EvalContext,
885 mut segment_handler: F,
886) -> Result<()>
887where
888 F: FnMut(String) -> Result<()>,
889{
890 let mut buf = vec![];
891 let mut cur = input.chars().peekable();
892
893 while let Some(c) = cur.peek() {
894 if *c == '$' {
895 let mut new_cur = cur.clone();
896
897 if let Some(value) = try_evaluate_interpolated(&mut new_cur, ctx) {
898 if !buf.is_empty() {
900 segment_handler(buf.into_iter().collect())?;
901 buf = vec![];
902 }
903
904 segment_handler(value.to_string())?;
906 }
907 cur = new_cur;
909 } else {
910 buf.push(cur.next().unwrap());
911 }
912 }
913
914 if !buf.is_empty() {
916 segment_handler(buf.into_iter().collect())?;
917 }
918
919 Ok(())
920}
921
922pub fn try_evaluate_interpolated_string(input: &str, ctx: &mut EvalContext) -> Result<String> {
938 let mut result = String::new();
939
940 process_interpolated_chars(input, ctx, |segment| {
941 result.push_str(&segment);
942 Ok(())
943 })?;
944
945 Ok(result)
946}