upstream_module/lib.rs
1// Copyright 2024 Wladimir Palant
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # Upstream Module for Pingora
16//!
17//! This crate helps configure Pingora’s upstream functionality. It is most useful in combination
18//! with the `virtual-hosts-module` crate that allows applying multiple upstream configurations
19//! conditionally.
20//!
21//! Currently only one configuration option is provided: `upstream` (`--upstream` as command line
22//! option). The value should be a URL like `http://127.0.0.1:8081` or `https://example.com`.
23//!
24//! Supported URL schemes are `http://` and `https://`. Other than the scheme, only host name and
25//! port are considered. Other parts of the URL are ignored if present.
26//!
27//! The `UpstreamHandler` type has to be called in both `request_filter` and `upstream_peer`
28//! Pingora Proxy phases. The former selects an upstream peer and modifies the request by adding
29//! the appropriate `Host` header. The latter retrieves the previously selected upstream peer.
30//!
31//! ```rust
32//! use async_trait::async_trait;
33//! use upstream_module::UpstreamHandler;
34//! use module_utils::RequestFilter;
35//! use pingora_core::Error;
36//! use pingora_core::upstreams::peer::HttpPeer;
37//! use pingora_proxy::{ProxyHttp, Session};
38//!
39//! pub struct MyServer {
40//! upstream_handler: UpstreamHandler,
41//! }
42//!
43//! #[async_trait]
44//! impl ProxyHttp for MyServer {
45//! type CTX = <UpstreamHandler as RequestFilter>::CTX;
46//! fn new_ctx(&self) -> Self::CTX {
47//! UpstreamHandler::new_ctx()
48//! }
49//!
50//! async fn request_filter(
51//! &self,
52//! session: &mut Session,
53//! ctx: &mut Self::CTX,
54//! ) -> Result<bool, Box<Error>> {
55//! // Select upstream peer according to configuration. This could be called based on some
56//! // conditions.
57//! self.upstream_handler.handle(session, ctx).await
58//! }
59//!
60//! async fn upstream_peer(
61//! &self,
62//! session: &mut Session,
63//! ctx: &mut Self::CTX,
64//! ) -> Result<Box<HttpPeer>, Box<Error>> {
65//! // Return previously selected peer if any
66//! UpstreamHandler::upstream_peer(session, ctx).await
67//! }
68//! }
69//! ```
70//!
71//! To create a handler, you will typically read its configuration from a configuration file,
72//! optionally combined with command line options. The following code will extend Pingora's usual
73//! configuration file and command line options accordingly.
74//!
75//! ```rust
76//! use upstream_module::{UpstreamConf, UpstreamHandler, UpstreamOpt};
77//! use module_utils::{merge_conf, merge_opt, FromYaml};
78//! use pingora_core::server::Server;
79//! use pingora_core::server::configuration::{Opt as ServerOpt, ServerConf};
80//! use structopt::StructOpt;
81//!
82//! #[merge_opt]
83//! struct Opt {
84//! server: ServerOpt,
85//! upstream: UpstreamOpt,
86//! }
87//!
88//! #[merge_conf]
89//! struct Conf {
90//! server: ServerConf,
91//! upstream: UpstreamConf,
92//! }
93//!
94//! let opt = Opt::from_args();
95//! let mut conf = opt
96//! .server
97//! .conf
98//! .as_ref()
99//! .and_then(|path| Conf::load_from_yaml(path).ok())
100//! .unwrap_or_else(Conf::default);
101//! conf.upstream.merge_with_opt(opt.upstream);
102//!
103//! let mut server = Server::new_with_opt_and_conf(opt.server, conf.server);
104//! server.bootstrap();
105//!
106//! let upstream_handler: UpstreamHandler = conf.upstream.try_into().unwrap();
107//! ```
108//!
//! For a complete and more realistic example, see the `virtual-hosts` example in the repository.
110
111use async_trait::async_trait;
112use http::header;
113use http::uri::{Scheme, Uri};
114use log::error;
115use module_utils::{RequestFilter, RequestFilterResult};
116use pingora_core::upstreams::peer::HttpPeer;
117use pingora_core::{Error, ErrorType};
118use pingora_proxy::Session;
119use serde::{
120 de::{Deserializer, Error as _},
121 Deserialize,
122};
123use std::net::{SocketAddr, ToSocketAddrs};
124use structopt::StructOpt;
125
126/// Command line options of the compression module
127#[derive(Debug, Default, StructOpt)]
128pub struct UpstreamOpt {
129 /// http:// or https:// URL identifying the server that requests should be forwarded for.
130 /// Path and query parts of the URL have no effect.
131 #[structopt(long, parse(try_from_str))]
132 pub upstream: Option<Uri>,
133}
134
135fn deserialize_uri<'de, D>(d: D) -> Result<Option<Uri>, D::Error>
136where
137 D: Deserializer<'de>,
138{
139 let uri = String::deserialize(d)?;
140 let uri = uri
141 .parse()
142 .map_err(|err| D::Error::custom(format!("URL {uri} could not be parsed: {err}")))?;
143 Ok(Some(uri))
144}
145
146/// Configuration settings of the compression module
147#[derive(Debug, Default, Deserialize)]
148#[serde(default)]
149pub struct UpstreamConf {
150 /// http:// or https:// URL identifying the server that requests should be forwarded for.
151 /// Path and query parts of the URL have no effect.
152 #[serde(deserialize_with = "deserialize_uri")]
153 pub upstream: Option<Uri>,
154}
155
156impl UpstreamConf {
157 /// Merges the command line options into the current configuration. Any command line options
158 /// present overwrite existing settings.
159 pub fn merge_with_opt(&mut self, opt: UpstreamOpt) {
160 if opt.upstream.is_some() {
161 self.upstream = opt.upstream;
162 }
163 }
164}
165
/// Context data of the handler
///
/// Holds everything needed to construct the upstream `HttpPeer` for a request.
#[derive(Clone, Debug)]
pub struct UpstreamContext {
    /// Resolved socket address of the upstream server.
    addr: SocketAddr,
    /// Whether the upstream connection uses TLS (`https://` upstream URL).
    tls: bool,
    /// Host name passed to `HttpPeer::new` as the SNI value.
    sni: String,
}
173
174/// Handler for Pingora’s `request_filter` phase
175#[derive(Debug)]
176pub struct UpstreamHandler {
177 host_port: String,
178 context: Option<UpstreamContext>,
179}
180
181impl UpstreamHandler {
182 /// This function should be called during the `upstream_peer` phase of Pingora Proxy to produce
183 /// the upstream peer which was determined in the `request_filter` phase. Will return a 404 Not
184 /// Found error if no upstream is configured.
185 pub async fn upstream_peer(
186 _session: &mut Session,
187 ctx: &mut Option<UpstreamContext>,
188 ) -> Result<Box<HttpPeer>, Box<Error>> {
189 if let Some(context) = ctx {
190 Ok(Box::new(HttpPeer::new(
191 context.addr,
192 context.tls,
193 context.sni.clone(),
194 )))
195 } else {
196 Err(Error::new(ErrorType::HTTPStatus(404)))
197 }
198 }
199}
200
201impl TryFrom<UpstreamConf> for UpstreamHandler {
202 type Error = Box<Error>;
203
204 fn try_from(conf: UpstreamConf) -> Result<Self, Self::Error> {
205 if let Some(upstream) = conf.upstream {
206 let scheme = upstream.scheme().ok_or_else(|| {
207 error!("provided upstream URL has no scheme: {upstream}");
208 Error::new(ErrorType::InternalError)
209 })?;
210
211 let tls = if scheme == &Scheme::HTTP {
212 false
213 } else if scheme == &Scheme::HTTPS {
214 true
215 } else {
216 error!("provided upstream URL is neither HTTP nor HTTPS: {upstream}");
217 return Err(Error::new(ErrorType::InternalError));
218 };
219
220 let host = upstream.host().ok_or_else(|| {
221 error!("provided upstream URL has no host name: {upstream}");
222 Error::new(ErrorType::InternalError)
223 })?;
224
225 let port = upstream.port_u16().unwrap_or(if tls { 443 } else { 80 });
226
227 let addr = (host, port)
228 .to_socket_addrs()
229 .map_err(|err| {
230 error!("failed resolving upstream host name {host}: {err}");
231 Error::new(ErrorType::InternalError)
232 })?
233 .next()
234 .ok_or_else(|| {
235 error!("DNS lookup of upstream host name {host} didn't produce any results");
236 Error::new(ErrorType::InternalError)
237 })?;
238
239 let mut host_port = host.to_owned();
240 if let Some(port) = upstream.port() {
241 host_port.push(':');
242 host_port.push_str(port.as_str());
243 }
244
245 Ok(Self {
246 host_port,
247 context: Some(UpstreamContext {
248 tls,
249 addr,
250 sni: host.to_owned(),
251 }),
252 })
253 } else {
254 Ok(Self {
255 host_port: Default::default(),
256 context: None,
257 })
258 }
259 }
260}
261
262#[async_trait]
263impl RequestFilter for UpstreamHandler {
264 type Conf = UpstreamConf;
265 type CTX = Option<UpstreamContext>;
266 fn new_ctx() -> Self::CTX {
267 None
268 }
269
270 async fn request_filter(
271 &self,
272 session: &mut Session,
273 ctx: &mut Self::CTX,
274 ) -> Result<RequestFilterResult, Box<Error>> {
275 if let Some(context) = &self.context {
276 session
277 .req_header_mut()
278 .insert_header(header::HOST, &self.host_port)?;
279
280 *ctx = Some(context.clone());
281
282 Ok(RequestFilterResult::Handled)
283 } else {
284 Ok(RequestFilterResult::Unhandled)
285 }
286 }
287}