1use std::{ops::Deref, rc::Rc};
2
3use actix_service::boxed::{BoxService, BoxServiceFactory};
4use actix_web::{
5 HttpMessage,
6 body::BoxBody,
7 dev::{self, Service, ServiceRequest, ServiceResponse},
8 error::Error,
9};
10use futures_core::future::LocalBoxFuture;
11
12use crate::link::{LinkInner, default_response};
13use crate::payload::PayloadRef;
14
15pub type HttpService = BoxService<ServiceRequest, ServiceResponse, Error>;
16pub type HttpNewService = BoxServiceFactory<(), ServiceRequest, ServiceResponse, Error, ()>;
17
18#[derive(Clone)]
20pub struct ChainService(pub(crate) Rc<ChainInner>);
21
22impl Deref for ChainService {
23 type Target = ChainInner;
24
25 fn deref(&self) -> &Self::Target {
26 &self.0
27 }
28}
29
30pub struct ChainInner {
31 pub(crate) links: Vec<LinkInner>,
32 pub(crate) body_buffer_size: usize,
33}
34
35impl Service<ServiceRequest> for ChainService {
36 type Response = ServiceResponse<BoxBody>;
37 type Error = Error;
38 type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
39
40 dev::always_ready!();
41
42 fn call(&self, mut req: ServiceRequest) -> Self::Future {
43 if self.links.is_empty() {
44 tracing::warn!("no links present in chain!");
45 return Box::pin(async move { Ok(default_response(req)) });
46 }
47
48 let this = self.clone();
49 if self.links.len() == 1 {
50 return Box::pin(async move { this.links[0].call_once(req).await });
51 }
52
53 Box::pin(async move {
54 let payload = req.take_payload();
55 let buf = PayloadRef::new(payload, this.body_buffer_size);
56 req.set_payload(buf.into_payload());
57
58 let ctx = req.guard_ctx();
59 let active_links: Vec<_> = this
60 .links
61 .iter()
62 .enumerate()
63 .filter(|(_, link)| link.matches(req.uri().path(), &ctx))
64 .collect();
65
66 let addr = req
67 .peer_addr()
68 .map(|addr| addr.to_string())
69 .unwrap_or_default();
70 tracing::debug!(
71 "{addr} {}/{} links matched {:?} {:?}",
72 active_links.len(),
73 this.links.len(),
74 req.method(),
75 req.uri()
76 );
77
78 let mut link_iter = active_links.into_iter().peekable();
79 while let Some((n, link)) = link_iter.next() {
80 tracing::debug!("{addr} calling link {n}");
81 let mut original_uri = None;
82 if let Some(uri) = link.new_uri(req.uri()) {
83 original_uri = Some(req.uri().clone());
84 tracing::debug!("{addr} updated uri {:?} -> {uri:?}", req.uri());
85 req.head_mut().uri = uri;
86 }
87
88 let res = link.service.call(req).await?;
89 let (http_req, http_res) = res.into_parts();
90 tracing::debug!("{addr} link {n} response={:?}", http_res.status());
91 if link_iter.peek().is_none() || !link.go_next(&http_res) {
92 return Ok(ServiceResponse::new(http_req, http_res));
93 }
94
95 buf.get_mut().reset_stream();
96 req = ServiceRequest::from_parts(http_req, buf.into_payload());
97
98 if let Some(uri) = original_uri {
99 req.head_mut().uri = uri;
100 }
101 }
102
103 Ok(default_response(req))
104 })
105 }
106}