upstream_module/
lib.rs

1// Copyright 2024 Wladimir Palant
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # Upstream Module for Pingora
16//!
17//! This crate helps configure Pingora’s upstream functionality. It is most useful in combination
18//! with the `virtual-hosts-module` crate that allows applying multiple upstream configurations
19//! conditionally.
20//!
21//! Currently only one configuration option is provided: `upstream` (`--upstream` as command line
22//! option). The value should be a URL like `http://127.0.0.1:8081` or `https://example.com`.
23//!
24//! Supported URL schemes are `http://` and `https://`. Other than the scheme, only host name and
25//! port are considered. Other parts of the URL are ignored if present.
26//!
27//! The `UpstreamHandler` type has to be called in both `request_filter` and `upstream_peer`
28//! Pingora Proxy phases. The former selects an upstream peer and modifies the request by adding
29//! the appropriate `Host` header. The latter retrieves the previously selected upstream peer.
30//!
31//! ```rust
32//! use async_trait::async_trait;
33//! use upstream_module::UpstreamHandler;
34//! use module_utils::RequestFilter;
35//! use pingora_core::Error;
36//! use pingora_core::upstreams::peer::HttpPeer;
37//! use pingora_proxy::{ProxyHttp, Session};
38//!
39//! pub struct MyServer {
40//!     upstream_handler: UpstreamHandler,
41//! }
42//!
43//! #[async_trait]
44//! impl ProxyHttp for MyServer {
45//!     type CTX = <UpstreamHandler as RequestFilter>::CTX;
46//!     fn new_ctx(&self) -> Self::CTX {
47//!         UpstreamHandler::new_ctx()
48//!     }
49//!
50//!     async fn request_filter(
51//!         &self,
52//!         session: &mut Session,
53//!         ctx: &mut Self::CTX,
54//!     ) -> Result<bool, Box<Error>> {
55//!         // Select upstream peer according to configuration. This could be called based on some
56//!         // conditions.
57//!         self.upstream_handler.handle(session, ctx).await
58//!     }
59//!
60//!     async fn upstream_peer(
61//!         &self,
62//!         session: &mut Session,
63//!         ctx: &mut Self::CTX,
64//!     ) -> Result<Box<HttpPeer>, Box<Error>> {
65//!         // Return previously selected peer if any
66//!         UpstreamHandler::upstream_peer(session, ctx).await
67//!     }
68//! }
69//! ```
70//!
71//! To create a handler, you will typically read its configuration from a configuration file,
72//! optionally combined with command line options. The following code will extend Pingora's usual
73//! configuration file and command line options accordingly.
74//!
75//! ```rust
76//! use upstream_module::{UpstreamConf, UpstreamHandler, UpstreamOpt};
77//! use module_utils::{merge_conf, merge_opt, FromYaml};
78//! use pingora_core::server::Server;
79//! use pingora_core::server::configuration::{Opt as ServerOpt, ServerConf};
80//! use structopt::StructOpt;
81//!
82//! #[merge_opt]
83//! struct Opt {
84//!     server: ServerOpt,
85//!     upstream: UpstreamOpt,
86//! }
87//!
88//! #[merge_conf]
89//! struct Conf {
90//!     server: ServerConf,
91//!     upstream: UpstreamConf,
92//! }
93//!
94//! let opt = Opt::from_args();
95//! let mut conf = opt
96//!     .server
97//!     .conf
98//!     .as_ref()
99//!     .and_then(|path| Conf::load_from_yaml(path).ok())
100//!     .unwrap_or_else(Conf::default);
101//! conf.upstream.merge_with_opt(opt.upstream);
102//!
103//! let mut server = Server::new_with_opt_and_conf(opt.server, conf.server);
104//! server.bootstrap();
105//!
106//! let upstream_handler: UpstreamHandler = conf.upstream.try_into().unwrap();
107//! ```
108//!
109//! For complete and more realistic code see `virtual-hosts` example in the repository.
110
111use async_trait::async_trait;
112use http::header;
113use http::uri::{Scheme, Uri};
114use log::error;
115use module_utils::{RequestFilter, RequestFilterResult};
116use pingora_core::upstreams::peer::HttpPeer;
117use pingora_core::{Error, ErrorType};
118use pingora_proxy::Session;
119use serde::{
120    de::{Deserializer, Error as _},
121    Deserialize,
122};
123use std::net::{SocketAddr, ToSocketAddrs};
124use structopt::StructOpt;
125
126/// Command line options of the compression module
127#[derive(Debug, Default, StructOpt)]
128pub struct UpstreamOpt {
129    /// http:// or https:// URL identifying the server that requests should be forwarded for.
130    /// Path and query parts of the URL have no effect.
131    #[structopt(long, parse(try_from_str))]
132    pub upstream: Option<Uri>,
133}
134
135fn deserialize_uri<'de, D>(d: D) -> Result<Option<Uri>, D::Error>
136where
137    D: Deserializer<'de>,
138{
139    let uri = String::deserialize(d)?;
140    let uri = uri
141        .parse()
142        .map_err(|err| D::Error::custom(format!("URL {uri} could not be parsed: {err}")))?;
143    Ok(Some(uri))
144}
145
146/// Configuration settings of the compression module
147#[derive(Debug, Default, Deserialize)]
148#[serde(default)]
149pub struct UpstreamConf {
150    /// http:// or https:// URL identifying the server that requests should be forwarded for.
151    /// Path and query parts of the URL have no effect.
152    #[serde(deserialize_with = "deserialize_uri")]
153    pub upstream: Option<Uri>,
154}
155
156impl UpstreamConf {
157    /// Merges the command line options into the current configuration. Any command line options
158    /// present overwrite existing settings.
159    pub fn merge_with_opt(&mut self, opt: UpstreamOpt) {
160        if opt.upstream.is_some() {
161            self.upstream = opt.upstream;
162        }
163    }
164}
165
166/// Context data of the handler
167#[derive(Debug, Clone)]
168pub struct UpstreamContext {
169    addr: SocketAddr,
170    tls: bool,
171    sni: String,
172}
173
174/// Handler for Pingora’s `request_filter` phase
175#[derive(Debug)]
176pub struct UpstreamHandler {
177    host_port: String,
178    context: Option<UpstreamContext>,
179}
180
181impl UpstreamHandler {
182    /// This function should be called during the `upstream_peer` phase of Pingora Proxy to produce
183    /// the upstream peer which was determined in the `request_filter` phase. Will return a 404 Not
184    /// Found error if no upstream is configured.
185    pub async fn upstream_peer(
186        _session: &mut Session,
187        ctx: &mut Option<UpstreamContext>,
188    ) -> Result<Box<HttpPeer>, Box<Error>> {
189        if let Some(context) = ctx {
190            Ok(Box::new(HttpPeer::new(
191                context.addr,
192                context.tls,
193                context.sni.clone(),
194            )))
195        } else {
196            Err(Error::new(ErrorType::HTTPStatus(404)))
197        }
198    }
199}
200
201impl TryFrom<UpstreamConf> for UpstreamHandler {
202    type Error = Box<Error>;
203
204    fn try_from(conf: UpstreamConf) -> Result<Self, Self::Error> {
205        if let Some(upstream) = conf.upstream {
206            let scheme = upstream.scheme().ok_or_else(|| {
207                error!("provided upstream URL has no scheme: {upstream}");
208                Error::new(ErrorType::InternalError)
209            })?;
210
211            let tls = if scheme == &Scheme::HTTP {
212                false
213            } else if scheme == &Scheme::HTTPS {
214                true
215            } else {
216                error!("provided upstream URL is neither HTTP nor HTTPS: {upstream}");
217                return Err(Error::new(ErrorType::InternalError));
218            };
219
220            let host = upstream.host().ok_or_else(|| {
221                error!("provided upstream URL has no host name: {upstream}");
222                Error::new(ErrorType::InternalError)
223            })?;
224
225            let port = upstream.port_u16().unwrap_or(if tls { 443 } else { 80 });
226
227            let addr = (host, port)
228                .to_socket_addrs()
229                .map_err(|err| {
230                    error!("failed resolving upstream host name {host}: {err}");
231                    Error::new(ErrorType::InternalError)
232                })?
233                .next()
234                .ok_or_else(|| {
235                    error!("DNS lookup of upstream host name {host} didn't produce any results");
236                    Error::new(ErrorType::InternalError)
237                })?;
238
239            let mut host_port = host.to_owned();
240            if let Some(port) = upstream.port() {
241                host_port.push(':');
242                host_port.push_str(port.as_str());
243            }
244
245            Ok(Self {
246                host_port,
247                context: Some(UpstreamContext {
248                    tls,
249                    addr,
250                    sni: host.to_owned(),
251                }),
252            })
253        } else {
254            Ok(Self {
255                host_port: Default::default(),
256                context: None,
257            })
258        }
259    }
260}
261
262#[async_trait]
263impl RequestFilter for UpstreamHandler {
264    type Conf = UpstreamConf;
265    type CTX = Option<UpstreamContext>;
266    fn new_ctx() -> Self::CTX {
267        None
268    }
269
270    async fn request_filter(
271        &self,
272        session: &mut Session,
273        ctx: &mut Self::CTX,
274    ) -> Result<RequestFilterResult, Box<Error>> {
275        if let Some(context) = &self.context {
276            session
277                .req_header_mut()
278                .insert_header(header::HOST, &self.host_port)?;
279
280            *ctx = Some(context.clone());
281
282            Ok(RequestFilterResult::Handled)
283        } else {
284            Ok(RequestFilterResult::Unhandled)
285        }
286    }
287}