quilkin/net/
maxmind_db.rs

1/*
2 * Copyright 2024 Google LLC All Rights Reserved.
3 *
4 *  Licensed under the Apache License, Version 2.0 (the "License");
5 *  you may not use this file except in compliance with the License.
6 *  You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 *  Unless required by applicable law or agreed to in writing, software
11 *  distributed under the License is distributed on an "AS IS" BASIS,
12 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 *  See the License for the specific language governing permissions and
14 *  limitations under the License.
15 */
16
17use std::sync::Arc;
18
19use bytes::Bytes;
20use hyper_util::client::legacy;
21use maxminddb::Reader;
22use once_cell::sync::Lazy;
23
24type Result<T, E = Error> = std::result::Result<T, E>;
25
26static HTTP: Lazy<
27    legacy::Client<
28        hyper_rustls::HttpsConnector<legacy::connect::HttpConnector>,
29        http_body_util::Empty<Bytes>,
30    >,
31> = Lazy::new(|| {
32    legacy::Client::builder(hyper_util::rt::TokioExecutor::new()).build(
33        hyper_rustls::HttpsConnectorBuilder::new()
34            .with_webpki_roots()
35            .https_or_http()
36            .enable_http1()
37            .enable_http2()
38            .build(),
39    )
40});
41pub static CLIENT: Lazy<arc_swap::ArcSwapOption<MaxmindDb>> = Lazy::new(<_>::default);
42
43#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, schemars::JsonSchema)]
44#[serde(tag = "kind")]
45pub enum Source {
46    File { path: std::path::PathBuf },
47    Url { url: url::Url },
48}
49
50impl std::str::FromStr for Source {
51    type Err = eyre::Error;
52
53    fn from_str(input: &str) -> Result<Self, Self::Err> {
54        if let Ok(url) = input.parse() {
55            Ok(Self::Url { url })
56        } else if let Ok(path) = input.parse() {
57            Ok(Self::File { path })
58        } else {
59            Err(eyre::eyre!("'{}' is not a valid URL or path", input))
60        }
61    }
62}
63
64#[derive(Debug)]
65pub struct MaxmindDb {
66    reader: Reader<Bytes>,
67}
68
69impl MaxmindDb {
70    fn new(reader: Reader<Bytes>) -> Self {
71        Self { reader }
72    }
73
74    pub fn instance() -> arc_swap::Guard<Option<Arc<MaxmindDb>>> {
75        CLIENT.load()
76    }
77
78    pub fn lookup(ip: std::net::IpAddr) -> Option<IpNetEntry> {
79        let mmdb = match crate::MaxmindDb::instance().clone() {
80            Some(mmdb) => mmdb,
81            None => {
82                tracing::trace!("skipping mmdb telemetry, no maxmind database available");
83                return None;
84            }
85        };
86
87        match mmdb.lookup::<IpNetEntry>(ip) {
88            Ok(asn) => Some(asn),
89            Err(error) => {
90                tracing::warn!(%ip, %error, "ip not found in maxmind database");
91                None
92            }
93        }
94    }
95
96    #[tracing::instrument(skip_all)]
97    pub async fn update(source: Source) -> Result<()> {
98        let db = Self::from_source(source).await?;
99        CLIENT.store(Some(Arc::new(db)));
100        tracing::info!("maxmind database updated");
101        Ok(())
102    }
103
104    #[tracing::instrument(skip_all)]
105    pub async fn from_source(source: Source) -> Result<Self> {
106        match source {
107            Source::File { path } => Self::open(path).await,
108            Source::Url { url } => Self::open_url(&url).await,
109        }
110    }
111
112    #[tracing::instrument(skip_all, fields(path = %path.as_ref().display()))]
113    pub async fn open<A: AsRef<std::path::Path>>(path: A) -> Result<Self> {
114        let path = path.as_ref();
115        tracing::info!(path=%path.display(), "trying to read local maxmind database");
116        let bytes = Bytes::from(tokio::fs::read(path).await?);
117        Reader::from_source(bytes)
118            .map(Self::new)
119            .map_err(From::from)
120    }
121
122    /// Reads a Maxmind DB from `url`, and if `cache` is `true`, then will use
123    /// the cached result, retreiving a fresh copy otherwise.
124    #[tracing::instrument(skip_all, fields(url = %url))]
125    pub async fn open_url(url: &url::Url) -> Result<Self> {
126        tracing::info!("requesting maxmind database from network");
127
128        use http_body_util::BodyExt;
129        let data = HTTP
130            .get(url.as_str().try_into().unwrap())
131            .await?
132            .into_body()
133            .collect()
134            .await?
135            .to_bytes();
136
137        tracing::debug!("finished download");
138        let reader = Reader::from_source(data)?;
139
140        Ok(Self { reader })
141    }
142}
143
144impl std::ops::Deref for MaxmindDb {
145    type Target = Reader<Bytes>;
146
147    fn deref(&self) -> &Self::Target {
148        &self.reader
149    }
150}
151
152impl std::ops::DerefMut for MaxmindDb {
153    fn deref_mut(&mut self) -> &mut Self::Target {
154        &mut self.reader
155    }
156}
157
158#[derive(Clone, serde::Deserialize)]
159pub struct IpNetEntry {
160    #[serde(default, rename = "as")]
161    pub id: u64,
162    #[serde(default)]
163    pub as_cc: String,
164    #[serde(default)]
165    pub as_name: String,
166    #[serde(default)]
167    pub prefix_entity: String,
168    #[serde(default)]
169    pub prefix_name: String,
170    #[serde(default)]
171    pub prefix: String,
172}
173
174#[derive(Clone)]
175pub struct MetricsIpNetEntry {
176    pub prefix: String,
177    pub id: u64,
178}
179
180impl<'a> From<&'a IpNetEntry> for MetricsIpNetEntry {
181    fn from(value: &'a IpNetEntry) -> Self {
182        Self {
183            prefix: value.prefix.clone(),
184            id: value.id,
185        }
186    }
187}
188
189#[derive(Debug, thiserror::Error)]
190pub enum Error {
191    #[error(transparent)]
192    MaxmindDb(#[from] maxminddb::MaxMindDBError),
193    #[error(transparent)]
194    Http(#[from] hyper::Error),
195    #[error(transparent)]
196    HttpClient(#[from] legacy::Error),
197
198    #[error(transparent)]
199    Io(#[from] std::io::Error),
200}