actix_chain/
service.rs

1use std::{ops::Deref, rc::Rc};
2
3use actix_service::boxed::{BoxService, BoxServiceFactory};
4use actix_web::{
5    HttpMessage,
6    body::BoxBody,
7    dev::{self, Service, ServiceRequest, ServiceResponse},
8    error::Error,
9};
10use futures_core::future::LocalBoxFuture;
11
12use crate::link::{LinkInner, default_response};
13use crate::payload::PayloadRef;
14
15pub type HttpService = BoxService<ServiceRequest, ServiceResponse, Error>;
16pub type HttpNewService = BoxServiceFactory<(), ServiceRequest, ServiceResponse, Error, ()>;
17
18/// Assembled chain service.
19#[derive(Clone)]
20pub struct ChainService(pub(crate) Rc<ChainInner>);
21
22impl Deref for ChainService {
23    type Target = ChainInner;
24
25    fn deref(&self) -> &Self::Target {
26        &self.0
27    }
28}
29
/// State shared by every clone of a [`ChainService`].
pub struct ChainInner {
    /// Links making up the chain. `ChainService::call` filters them against the incoming
    /// request and invokes the matching ones in this order.
    pub(crate) links: Vec<LinkInner>,
    /// Value handed to `PayloadRef::new` when the request body is wrapped so that it can
    /// be replayed for successive links.
    /// NOTE(review): presumably a buffer limit in bytes — confirm against `PayloadRef`.
    pub(crate) body_buffer_size: usize,
}
34
35impl Service<ServiceRequest> for ChainService {
36    type Response = ServiceResponse<BoxBody>;
37    type Error = Error;
38    type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
39
40    dev::always_ready!();
41
42    fn call(&self, mut req: ServiceRequest) -> Self::Future {
43        if self.links.is_empty() {
44            tracing::warn!("no links present in chain!");
45            return Box::pin(async move { Ok(default_response(req)) });
46        }
47
48        let this = self.clone();
49        if self.links.len() == 1 {
50            return Box::pin(async move { this.links[0].call_once(req).await });
51        }
52
53        Box::pin(async move {
54            let payload = req.take_payload();
55            let buf = PayloadRef::new(payload, this.body_buffer_size);
56            req.set_payload(buf.into_payload());
57
58            let ctx = req.guard_ctx();
59            let active_links: Vec<_> = this
60                .links
61                .iter()
62                .enumerate()
63                .filter(|(_, link)| link.matches(req.uri().path(), &ctx))
64                .collect();
65
66            let addr = req
67                .peer_addr()
68                .map(|addr| addr.to_string())
69                .unwrap_or_default();
70            tracing::debug!(
71                "{addr} {}/{} links matched {:?} {:?}",
72                active_links.len(),
73                this.links.len(),
74                req.method(),
75                req.uri()
76            );
77
78            let mut link_iter = active_links.into_iter().peekable();
79            while let Some((n, link)) = link_iter.next() {
80                tracing::debug!("{addr} calling link {n}");
81                let mut original_uri = None;
82                if let Some(uri) = link.new_uri(req.uri()) {
83                    original_uri = Some(req.uri().clone());
84                    tracing::debug!("{addr} updated uri {:?} -> {uri:?}", req.uri());
85                    req.head_mut().uri = uri;
86                }
87
88                let res = link.service.call(req).await?;
89                let (http_req, http_res) = res.into_parts();
90                tracing::debug!("{addr} link {n} response={:?}", http_res.status());
91                if link_iter.peek().is_none() || !link.go_next(&http_res) {
92                    return Ok(ServiceResponse::new(http_req, http_res));
93                }
94
95                buf.get_mut().reset_stream();
96                req = ServiceRequest::from_parts(http_req, buf.into_payload());
97
98                if let Some(uri) = original_uri {
99                    req.head_mut().uri = uri;
100                }
101            }
102
103            Ok(default_response(req))
104        })
105    }
106}