ip2location_bin_format/content/
querier.rs1use std::{
2 collections::BTreeMap,
3 io::{Error as IoError, SeekFrom},
4};
5
6use futures_util::{AsyncRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _};
7
8use crate::{
9 content::UNKNOWN_STR,
10 record_field::{RecordFieldContent, RecordFieldContents},
11};
12
/// Distance, in stream index units, between the index of a country code
/// string and the index of the matching country name string.
///
/// `Querier::fill` adds this value to a COUNTRY field's index to find (and
/// cache) the country name that follows the country code.
pub const COUNTRY_NAME_INDEX_OFFSET: usize = 3;
15
/// Reads length-prefixed strings out of a seekable stream and writes them
/// into record fields, caching strings it has already read.
#[derive(Debug)]
pub struct Querier<S> {
    /// Stream the strings are read from; `fill` seeks it to each field's index.
    stream: S,
    /// Scratch buffer that holds the length byte and the string bytes of the
    /// value currently being read.
    buf: Vec<u8>,
    /// Cache of strings keyed by the stream index they were read from;
    /// `fill` consults it for COUNTRY, NETSPEED, PROXYTYPE and USAGETYPE fields.
    static_cache: BTreeMap<u32, Box<str>>,
    /// LRU cache keyed by stream index, used by `fill` for REGION, CITY,
    /// ZIPCODE and TIMEZONE fields. Only present with the `lru` feature.
    #[cfg(feature = "lru")]
    lru_cache: lru::LruCache<u32, Box<str>>,
}
25
26impl<S> Querier<S> {
30 pub fn new(stream: S) -> Self {
31 Self {
32 stream,
33 buf: {
34 let len = 1 + 255;
35 let mut buf = Vec::with_capacity(len);
36 buf.resize_with(len, Default::default);
37 buf
38 },
39 static_cache: BTreeMap::default(),
40 #[cfg(feature = "lru")]
41 lru_cache: lru::LruCache::new(core::num::NonZeroUsize::new(10000).expect("")),
42 }
43 }
44}
45
46impl<S> Querier<S>
50where
51 S: AsyncSeek + AsyncRead + Unpin,
52{
53 pub async fn fill(
54 &mut self,
55 record_field_contents: &mut RecordFieldContents,
56 ) -> Result<(), FillError> {
57 for record_field_content in record_field_contents.iter_mut() {
58 let (seek_from_start, s_len_estimatable) = match record_field_content {
60 RecordFieldContent::COUNTRY(i, v, v_name) => {
61 if let Some(s) = self.static_cache.get(i) {
62 *v = filter_str(s);
63
64 if let Some(s) = self
65 .static_cache
66 .get(&(*i + COUNTRY_NAME_INDEX_OFFSET as u32))
67 {
68 *v_name = filter_str(s);
69
70 continue;
71 }
72 }
73
74 (*i, 28)
75 }
76 #[allow(unused_variables)]
77 RecordFieldContent::REGION(i, v) => {
78 #[cfg(feature = "lru")]
79 {
80 if let Some(s) = self.lru_cache.get(i) {
81 *v = filter_str(s);
82
83 continue;
84 }
85 }
86
87 (*i, 20)
88 }
89 #[allow(unused_variables)]
90 RecordFieldContent::CITY(i, v) => {
91 #[cfg(feature = "lru")]
92 {
93 if let Some(s) = self.lru_cache.get(i) {
94 *v = filter_str(s);
95
96 continue;
97 }
98 }
99
100 (*i, 20)
101 }
102 RecordFieldContent::ISP(i, _) => (*i, 10),
103 RecordFieldContent::DOMAIN(i, _) => (*i, 30),
104 RecordFieldContent::LATITUDE(_) => continue,
106 RecordFieldContent::LONGITUDE(_) => continue,
107 #[allow(unused_variables)]
108 RecordFieldContent::ZIPCODE(i, v) => {
109 #[cfg(feature = "lru")]
110 {
111 if let Some(s) = self.lru_cache.get(i) {
112 *v = filter_str(s);
113
114 continue;
115 }
116 }
117
118 (*i, 8)
119 }
120 #[allow(unused_variables)]
121 RecordFieldContent::TIMEZONE(i, v) => {
122 #[cfg(feature = "lru")]
123 {
124 if let Some(s) = self.lru_cache.get(i) {
125 *v = filter_str(s);
126
127 continue;
128 }
129 }
130
131 (*i, 8)
132 }
133 RecordFieldContent::NETSPEED(i, v) => {
134 if let Some(s) = self.static_cache.get(i) {
135 *v = filter_str(s);
136
137 continue;
138 }
139
140 (*i, 10)
142 }
143 RecordFieldContent::PROXYTYPE(i, v) => {
145 if let Some(s) = self.static_cache.get(i) {
146 *v = filter_str(s);
147
148 continue;
149 }
150
151 (*i, 3)
152 }
153 RecordFieldContent::USAGETYPE(i, v) => {
154 if let Some(s) = self.static_cache.get(i) {
155 *v = filter_str(s);
156
157 continue;
158 }
159
160 (*i, 3)
161 }
162 RecordFieldContent::ASN(i, _) => (*i, 10),
163 RecordFieldContent::AS(i, _) => (*i, 30),
164 RecordFieldContent::LASTSEEN(i, _) => (*i, 6),
165 RecordFieldContent::THREAT(i, _) => (*i, 30),
166 RecordFieldContent::RESIDENTIAL(i, _) => (*i, 30),
167 RecordFieldContent::PROVIDER(i, _) => (*i, 30),
168 };
169
170 self.stream
175 .seek(SeekFrom::Start(seek_from_start as u64))
176 .await
177 .map_err(FillError::SeekFailed)?;
178
179 let mut n_read = 0;
181
182 let n = self
184 .stream
185 .read(&mut self.buf[..s_len_estimatable + 1])
186 .await
187 .map_err(FillError::ReadFailed)?;
188 n_read += n;
189 if n == 0 {
190 return Err(FillError::Other("read is not completed in first read"));
191 }
192
193 let mut n_loop = 0;
195 loop {
196 loop {
198 if !self.buf.is_empty() {
199 let len = self.buf[0];
200
201 #[allow(clippy::int_plus_one)]
202 if (len as usize) <= n_read - 1 {
203 break;
204 }
205 }
206
207 let n = self
208 .stream
209 .read(&mut self.buf[n_read..])
210 .await
211 .map_err(FillError::ReadFailed)?;
212 n_read += n;
213
214 if n == 0 {
215 return Err(FillError::Other("read is not completed in loop read"));
216 }
217 }
218
219 let s_len = self.buf[0];
220 let s = core::str::from_utf8(&self.buf[1..1 + s_len as usize])
221 .map_err(FillError::ToUtf8Failed)?;
222
223 match record_field_content {
224 RecordFieldContent::COUNTRY(i, v, v_name) => {
225 match n_loop {
226 0 => {
227 *v = filter_str(s);
228 self.static_cache.insert(*i, s.into());
229
230 n_loop += 1;
231 self.buf.rotate_left(COUNTRY_NAME_INDEX_OFFSET);
234 n_read -= COUNTRY_NAME_INDEX_OFFSET;
235
236 continue;
237 }
238 1 => {
239 *v_name = filter_str(s);
240 self.static_cache
241 .insert(*i + COUNTRY_NAME_INDEX_OFFSET as u32, s.into());
242 }
243 _ => unreachable!(),
244 }
245 }
246 #[allow(unused_variables)]
247 RecordFieldContent::REGION(i, v) => {
248 *v = filter_str(s);
249 #[cfg(feature = "lru")]
250 {
251 self.lru_cache.push(*i, s.into());
252 }
253 }
254 #[allow(unused_variables)]
255 RecordFieldContent::CITY(i, v) => {
256 *v = filter_str(s);
257 #[cfg(feature = "lru")]
258 {
259 self.lru_cache.push(*i, s.into());
260 }
261 }
262 RecordFieldContent::ISP(_, v) => {
263 *v = filter_str(s);
264 }
265 RecordFieldContent::DOMAIN(_, v) => {
266 *v = filter_str(s);
267 }
268 RecordFieldContent::LATITUDE(_) => {}
270 RecordFieldContent::LONGITUDE(_) => {}
271 #[allow(unused_variables)]
272 RecordFieldContent::ZIPCODE(i, v) => {
273 *v = filter_str(s);
274 #[cfg(feature = "lru")]
275 {
276 self.lru_cache.push(*i, s.into());
277 }
278 }
279 #[allow(unused_variables)]
280 RecordFieldContent::TIMEZONE(i, v) => {
281 *v = filter_str(s);
282 #[cfg(feature = "lru")]
283 {
284 self.lru_cache.push(*i, s.into());
285 }
286 }
287 RecordFieldContent::NETSPEED(_, v) => {
288 *v = filter_str(s);
289 }
290 RecordFieldContent::PROXYTYPE(i, v) => {
292 *v = filter_str(s);
293 self.static_cache.insert(*i, s.into());
294 }
295 RecordFieldContent::USAGETYPE(i, v) => {
296 *v = filter_str(s);
297 self.static_cache.insert(*i, s.into());
298 }
299 RecordFieldContent::ASN(_, v) => {
300 *v = filter_str(s);
301 }
302 RecordFieldContent::AS(_, v) => {
303 *v = filter_str(s);
304 }
305 RecordFieldContent::LASTSEEN(_, v) => {
306 *v = filter_str(s);
307 }
308 RecordFieldContent::THREAT(_, v) => {
309 *v = filter_str(s);
310 }
311 RecordFieldContent::RESIDENTIAL(_, v) => {
312 *v = filter_str(s);
313 }
314 RecordFieldContent::PROVIDER(_, v) => {
315 *v = filter_str(s);
316 }
317 }
318
319 break;
320 }
321 }
322
323 Ok(())
324 }
325}
326
327fn filter_str(s: impl AsRef<str>) -> Option<Box<str>> {
328 let s = s.as_ref();
329 if s == UNKNOWN_STR {
330 None
331 } else {
332 Some(s.into())
333 }
334}
335
/// Error returned by `Querier::fill`.
#[derive(Debug)]
pub enum FillError {
    /// Seeking the stream to a field's index failed.
    SeekFailed(IoError),
    /// Reading from the stream failed.
    ReadFailed(IoError),
    /// The bytes read for a value were not valid UTF-8.
    ToUtf8Failed(core::str::Utf8Error),
    /// Any other failure, described by a static message (for example the
    /// stream ending before a value was complete).
    Other(&'static str),
}

impl core::fmt::Display for FillError {
    /// Formats the error using its `Debug` representation.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        write!(f, "{self:?}")
    }
}

impl std::error::Error for FillError {
    /// Exposes the wrapped I/O or UTF-8 error so callers can walk the error
    /// chain; `Other` has no underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::SeekFailed(err) | Self::ReadFailed(err) => Some(err),
            Self::ToUtf8Failed(err) => Some(err),
            Self::Other(_) => None,
        }
    }
}