mmdb_grpc/
lib.rs

1pub mod proto;
2
3use crate::proto::geoip2::*;
4use crate::proto::geoip2_grpc::*;
5use futures::prelude::*;
6use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
7use grpcio_health::proto::*;
8use log::{debug, error};
9use maxminddb::{self, geoip2, MaxMindDBError, Metadata};
10use spin::RwLock;
11use std::collections::{BTreeMap, HashMap, HashSet};
12use std::sync::Arc;
13
14#[derive(Clone)]
15pub struct CityService<T, R>(Arc<RwLock<maxminddb::Reader<T>>>, R)
16where
17    T: AsRef<[u8]>,
18    R: Fn() -> Result<maxminddb::Reader<T>, MaxMindDBError>;
19
20impl<T, R> CityService<T, R>
21where
22    T: AsRef<[u8]>,
23    R: Fn() -> Result<maxminddb::Reader<T>, MaxMindDBError>,
24{
25    pub fn new(db: Arc<RwLock<maxminddb::Reader<T>>>, reloader: R) -> CityService<T, R> {
26        CityService(db, reloader)
27    }
28}
29
30impl<T, R> GeoIp for CityService<T, R>
31where
32    T: AsRef<[u8]>,
33    R: Fn() -> Result<maxminddb::Reader<T>, MaxMindDBError>,
34{
35    fn lookup(&mut self, ctx: RpcContext<'_>, req: Message, sink: UnarySink<CityReply>) {
36        debug!("received the message: {:?}", req);
37
38        let Message { ip, locales, .. } = req;
39        let result = ip
40            .parse()
41            .map_err(|_| {
42                RpcStatus::with_message(
43                    RpcStatusCode::INVALID_ARGUMENT,
44                    format!("The request must be IP address but given '{}'", ip),
45                )
46            })
47            .and_then(|ip| {
48                let db = (*self.0).read();
49                match db.lookup::<geoip2::City>(ip) {
50                    Ok(value) => {
51                        let ns = locales.iter().map(|l| l.to_string()).collect::<HashSet<_>>();
52                        Ok(CityReply::from(WrappedCity(value, ns)))
53                    }
54                    Err(err) => Err(convert_error(err)),
55                }
56            });
57
58        let f = match result {
59            Ok(reply) => sink.success(reply),
60            Err(status) => sink.fail(status),
61        };
62
63        let f = f
64            .map_err(move |err| error!("failed to reply, cause: {:?}", err))
65            .map(|_| ());
66
67        ctx.spawn(f)
68    }
69
70    fn metadata(&mut self, ctx: RpcContext<'_>, _req: Empty, sink: UnarySink<MetadataReply>) {
71        let result = MetadataReply::from(&self.0.read().metadata);
72        let f = sink
73            .success(result)
74            .map_err(move |err| error!("failed to reply, cause: {:?}", err))
75            .map(|_| ());
76        ctx.spawn(f)
77    }
78
79    fn reload(&mut self, ctx: RpcContext<'_>, _req: Empty, sink: UnarySink<MetadataReply>) {
80        let result = self.1()
81            .map(|reader| {
82                let mut guard = self.0.write();
83                *guard = reader;
84                MetadataReply::from(&guard.metadata)
85            })
86            .map_err(convert_error);
87
88        let f = match result {
89            Ok(reply) => sink.success(reply),
90            Err(status) => sink.fail(status),
91        };
92
93        let f = f
94            .map_err(move |err| error!("failed to reply, cause: {:?}", err))
95            .map(|_| ());
96
97        ctx.spawn(f)
98    }
99}
100
101impl ToString for Message_Locale {
102    fn to_string(&self) -> String {
103        match self {
104            Message_Locale::BRAZLIAN_PORTUGUESE => "pt-BR".into(),
105            Message_Locale::ENGLISH => "en".into(),
106            Message_Locale::FRENCH => "fr".into(),
107            Message_Locale::GERMAN => "de".into(),
108            Message_Locale::JAPANESE => "ja".into(),
109            Message_Locale::RUSSIAN => "ru".into(),
110            Message_Locale::SIMPLIFIED_CHINESE => "zh-CN".into(),
111            Message_Locale::SPANISH => "es".into(),
112            Message_Locale::UNSPECIFIED => "".into(), // TODO: should it panic?
113        }
114    }
115}
116
117struct WrappedCity<'a>(geoip2::City<'a>, HashSet<String>);
118
119impl<'a> From<WrappedCity<'a>> for CityReply {
120    fn from(geo_city: WrappedCity) -> CityReply {
121        let mut reply = CityReply::default();
122
123        let filter = geo_city.1;
124
125        if let Some(c) = geo_city.0.city {
126            reply.set_city(City::from(MCity(c, &filter)));
127        }
128
129        if let Some(c) = geo_city.0.continent {
130            reply.set_continent(Continent::from(MContinent(c, &filter)));
131        }
132
133        if let Some(c) = geo_city.0.country {
134            reply.set_country(Country::from(MCountry(c, &filter)));
135        }
136
137        if let Some(c) = geo_city.0.location {
138            reply.set_location(Location::from(c));
139        }
140
141        if let Some(c) = geo_city.0.postal {
142            reply.set_postal(Postal::from(c));
143        }
144
145        if let Some(c) = geo_city.0.registered_country {
146            reply.set_registered_country(Country::from(MCountry(c, &filter)));
147        }
148
149        if let Some(c) = geo_city.0.represented_country {
150            reply.set_represented_country(RepresentedCountry::from(MRepresentedCountry(c, &filter)));
151        }
152
153        if let Some(xs) = geo_city.0.subdivisions {
154            let subs = Subdivisions::from(xs);
155            let vs = ::protobuf::RepeatedField::from_vec(subs.0);
156            reply.set_subdivisions(vs);
157        }
158
159        if let Some(c) = geo_city.0.traits {
160            reply.set_traits(Traits::from(c));
161        }
162
163        reply
164    }
165}
166
167struct MCity<'a>(geoip2::city::City<'a>, &'a HashSet<String>);
168
169impl<'a> From<MCity<'a>> for City {
170    fn from(c: MCity) -> City {
171        let mut r = City::default();
172        if let Some(a) = c.0.geoname_id {
173            r.set_geoname_id(a);
174        }
175        if let Some(n) = c.0.names {
176            r.set_names(filter_locales(&n, c.1));
177        }
178        r
179    }
180}
181
182struct MContinent<'a>(geoip2::city::Continent<'a>, &'a HashSet<String>);
183
184impl<'a> From<MContinent<'a>> for Continent {
185    fn from(c: MContinent) -> Continent {
186        let mut r = Continent::default();
187        if let Some(a) = c.0.code {
188            r.set_code(a.to_string())
189        }
190        if let Some(a) = c.0.geoname_id {
191            r.set_geoname_id(a)
192        }
193        if let Some(n) = c.0.names {
194            r.set_names(filter_locales(&n, c.1));
195        }
196        r
197    }
198}
199
200struct MCountry<'a>(geoip2::city::Country<'a>, &'a HashSet<String>);
201
202impl<'a> From<MCountry<'a>> for Country {
203    fn from(c: MCountry) -> Country {
204        let mut r = Country::default();
205        if let Some(a) = c.0.geoname_id {
206            r.set_geoname_id(a);
207        }
208        if let Some(a) = c.0.is_in_european_union {
209            r.is_in_european_union = a;
210        }
211        if let Some(a) = c.0.iso_code {
212            r.set_iso_code(a.to_string());
213        }
214        if let Some(n) = &c.0.names {
215            r.set_names(filter_locales(n, c.1)); // TODO: clone
216        }
217        r
218    }
219}
220
221impl<'a> From<geoip2::city::Location<'a>> for Location {
222    fn from(c: geoip2::city::Location) -> Location {
223        let mut r = Location::default();
224        if let Some(a) = c.latitude {
225            r.set_latitude(a);
226        }
227        if let Some(a) = c.longitude {
228            r.set_longitude(a);
229        }
230        if let Some(a) = c.metro_code {
231            r.set_metro_code(a as u32);
232        }
233        if let Some(a) = c.time_zone {
234            r.set_time_zone(a.to_string());
235        }
236        r
237    }
238}
239
240impl<'a> From<geoip2::city::Postal<'a>> for Postal {
241    fn from(c: geoip2::city::Postal) -> Postal {
242        let mut r = Postal::default();
243        if let Some(a) = c.code {
244            r.set_code(a.to_string());
245        }
246        r
247    }
248}
249
250struct MRepresentedCountry<'a>(geoip2::city::RepresentedCountry<'a>, &'a HashSet<String>);
251
252impl<'a> From<MRepresentedCountry<'a>> for RepresentedCountry {
253    fn from(c: MRepresentedCountry) -> RepresentedCountry {
254        let mut r = RepresentedCountry::default();
255        if let Some(a) = c.0.geoname_id {
256            r.set_geoname_id(a);
257        }
258        if let Some(a) = c.0.iso_code {
259            r.set_iso_code(a.to_string());
260        }
261        if let Some(n) = c.0.names {
262            r.set_names(filter_locales(&n, c.1));
263        }
264        r
265    }
266}
267
268#[derive(PartialEq, Clone, Default)]
269struct Subdivisions(Vec<Subdivision>);
270
271impl<'a> From<Vec<geoip2::city::Subdivision<'a>>> for Subdivisions {
272    fn from(vs: Vec<geoip2::city::Subdivision>) -> Subdivisions {
273        let mut subs = Vec::with_capacity(vs.len());
274
275        for s in vs {
276            let mut sub = Subdivision::default();
277            if let Some(v) = s.geoname_id {
278                sub.set_geoname_id(v);
279            }
280            if let Some(v) = s.iso_code {
281                sub.set_iso_code(v.to_string());
282            }
283            subs.push(sub);
284        }
285        Subdivisions(subs)
286    }
287}
288
289impl From<geoip2::city::Traits> for Traits {
290    fn from(c: geoip2::city::Traits) -> Traits {
291        let mut t = Traits::default();
292        if let Some(v) = c.is_anonymous_proxy {
293            t.is_anonymous_proxy = v;
294        }
295        if let Some(v) = c.is_satellite_provider {
296            t.is_satellite_provider = v;
297        }
298        t
299    }
300}
301
302fn convert_error(err: MaxMindDBError) -> RpcStatus {
303    match err {
304        MaxMindDBError::AddressNotFoundError(msg) => RpcStatus::with_message(RpcStatusCode::NOT_FOUND, msg),
305        MaxMindDBError::InvalidNetworkError(msg) => RpcStatus::with_message(RpcStatusCode::INTERNAL, msg),
306        MaxMindDBError::InvalidDatabaseError(msg) => RpcStatus::with_message(RpcStatusCode::INTERNAL, msg),
307        MaxMindDBError::IoError(msg) => RpcStatus::with_message(RpcStatusCode::INTERNAL, msg),
308        MaxMindDBError::MapError(msg) => RpcStatus::with_message(RpcStatusCode::INTERNAL, msg),
309        MaxMindDBError::DecodingError(msg) => RpcStatus::with_message(RpcStatusCode::INTERNAL, msg),
310    }
311}
312
/// Copies the localized `names` whose locale key is listed in `filter` into
/// an owned map.
///
/// An empty `filter` means "no filtering": every entry of `names` is kept.
/// Keys in `filter` that do not occur in `names` are ignored.
fn filter_locales<'a>(names: &'a BTreeMap<&'a str, &'a str>, filter: &'a HashSet<String>) -> HashMap<String, String> {
    let keep_all = filter.is_empty();
    // The result can never hold more entries than either input provides.
    let cap = if keep_all {
        names.len()
    } else {
        names.len().min(filter.len())
    };
    let mut h = HashMap::with_capacity(cap);
    for (&k, &v) in names {
        // Look the key up as `&str` so rejected entries never allocate a `String`.
        if keep_all || filter.contains(k) {
            h.insert(k.to_string(), v.to_string());
        }
    }
    h
}
324
325impl From<&Metadata> for MetadataReply {
326    fn from(v: &Metadata) -> MetadataReply {
327        let mut r = MetadataReply::default();
328        r.set_binary_format_major_version(v.binary_format_major_version as u32);
329        r.set_binary_format_minor_version(v.binary_format_minor_version as u32);
330        r.set_build_epoch(v.build_epoch);
331        r.set_database_type(v.database_type.clone());
332        let d =
333            v.description
334                .clone()
335                .into_iter()
336                .fold(HashMap::with_capacity(v.description.len()), |mut acc, (k, v)| {
337                    acc.insert(k, v);
338                    acc
339                });
340        r.set_description(d);
341        r.set_ip_version(v.ip_version as u32);
342        r.set_languages(::protobuf::RepeatedField::from_vec(v.languages.clone()));
343        r.set_node_count(v.node_count);
344        r.set_record_size(v.record_size as u32);
345        r
346    }
347}
348
349#[derive(Clone)]
350pub struct HealthService;
351
352impl Health for HealthService {
353    fn check(&mut self, ctx: RpcContext<'_>, req: HealthCheckRequest, sink: UnarySink<HealthCheckResponse>) {
354        debug!("check the service: {}", req.get_service());
355        let mut resp = HealthCheckResponse::default();
356        resp.set_status(ServingStatus::Serving);
357        ctx.spawn(
358            sink.success(resp)
359                .map_err(|e| error!("failed to report result: {:?}", e))
360                .map(|_| ()),
361        );
362    }
363}
364
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks both modes of `filter_locales`: a non-empty filter keeps only
    /// the listed keys that exist, and an empty filter keeps everything.
    #[test]
    fn test_filter_locales() {
        let src: BTreeMap<&str, &str> = [("1", "one"), ("2", "two"), ("3", "three"), ("4", "four")]
            .iter()
            .cloned()
            .collect();

        // "11" is requested but absent from the source, so it must not show up.
        let filters: HashSet<String> = ["11", "2", "3"].iter().map(|s| s.to_string()).collect();
        let expected: HashMap<String, String> = [("2", "two"), ("3", "three")]
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();
        assert_eq!(filter_locales(&src, &filters), expected);

        // With no filter at all, every source entry is copied.
        let filters = HashSet::new();
        let expected: HashMap<String, String> = src.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
        assert_eq!(expected.len(), 4);
        assert_eq!(filter_locales(&src, &filters), expected);
    }
}