rama_proxy/proxydb/
csv.rs

1use super::{Proxy, StringFilter};
2use rama_net::{
3    address::ProxyAddress,
4    asn::{Asn, InvalidAsn},
5    user::ProxyCredential,
6};
7use std::path::Path;
8use tokio::{
9    fs::File,
10    io::{AsyncBufReadExt, BufReader, Lines},
11};
12
13#[derive(Debug)]
14/// A CSV Reader that can be used to create a [`Proxy`] database from a CSV file or raw data.
15pub struct ProxyCsvRowReader {
16    data: ProxyCsvRowReaderData,
17}
18
19impl ProxyCsvRowReader {
20    /// Create a new [`ProxyCsvRowReader`] from the given CSV file.
21    pub async fn open(path: impl AsRef<Path>) -> Result<Self, ProxyCsvRowReaderError> {
22        let file = tokio::fs::File::open(path).await?;
23        let reader = BufReader::new(file);
24        let lines = reader.lines();
25        Ok(ProxyCsvRowReader {
26            data: ProxyCsvRowReaderData::File(lines),
27        })
28    }
29
30    /// Create a new [`ProxyCsvRowReader`] from the given CSV data.
31    pub fn raw(data: impl AsRef<str>) -> Self {
32        let lines: Vec<_> = data.as_ref().lines().rev().map(str::to_owned).collect();
33        ProxyCsvRowReader {
34            data: ProxyCsvRowReaderData::Raw(lines),
35        }
36    }
37
38    /// Read the next row from the CSV file.
39    pub async fn next(&mut self) -> Result<Option<Proxy>, ProxyCsvRowReaderError> {
40        match &mut self.data {
41            ProxyCsvRowReaderData::File(lines) => {
42                let line = lines.next_line().await?;
43                match line {
44                    Some(line) => Ok(Some(match parse_csv_row(&line) {
45                        Some(proxy) => proxy,
46                        None => {
47                            return Err(ProxyCsvRowReaderError {
48                                kind: ProxyCsvRowReaderErrorKind::InvalidRow(line),
49                            });
50                        }
51                    })),
52                    None => Ok(None),
53                }
54            }
55            ProxyCsvRowReaderData::Raw(lines) => match lines.pop() {
56                Some(line) => Ok(Some(match parse_csv_row(&line) {
57                    Some(proxy) => proxy,
58                    None => {
59                        return Err(ProxyCsvRowReaderError {
60                            kind: ProxyCsvRowReaderErrorKind::InvalidRow(line),
61                        });
62                    }
63                })),
64                None => Ok(None),
65            },
66        }
67    }
68}
69
/// Remove one pair of surrounding double quotes from a CSV column.
///
/// The column is returned untouched unless it both starts and ends with a
/// `"` (a lone `"` therefore stays as-is).
fn strip_csv_quotes(p: &str) -> &str {
    match p.strip_prefix('"') {
        Some(unprefixed) => unprefixed.strip_suffix('"').unwrap_or(p),
        None => p,
    }
}
75
76pub(crate) fn parse_csv_row(row: &str) -> Option<Proxy> {
77    let mut iter = row.split(',').map(strip_csv_quotes);
78
79    let id = iter.next().and_then(|s| s.try_into().ok())?;
80
81    let tcp = iter.next().and_then(parse_csv_bool)?;
82    let udp = iter.next().and_then(parse_csv_bool)?;
83    let http = iter.next().and_then(parse_csv_bool)?;
84    let https = iter.next().and_then(parse_csv_bool)?;
85    let socks5 = iter.next().and_then(parse_csv_bool)?;
86    let socks5h = iter.next().and_then(parse_csv_bool)?;
87    let datacenter = iter.next().and_then(parse_csv_bool)?;
88    let residential = iter.next().and_then(parse_csv_bool)?;
89    let mobile = iter.next().and_then(parse_csv_bool)?;
90    let mut address = iter.next().and_then(|s| {
91        if s.is_empty() {
92            None
93        } else {
94            ProxyAddress::try_from(s).ok()
95        }
96    })?;
97    let pool_id = parse_csv_opt_string_filter(iter.next()?);
98    let continent = parse_csv_opt_string_filter(iter.next()?);
99    let country = parse_csv_opt_string_filter(iter.next()?);
100    let state = parse_csv_opt_string_filter(iter.next()?);
101    let city = parse_csv_opt_string_filter(iter.next()?);
102    let carrier = parse_csv_opt_string_filter(iter.next()?);
103    let asn = parse_csv_opt_asn(iter.next()?).ok()?;
104
105    // support header format or cleartext format
106    if let Some(value) = iter.next() {
107        if !value.is_empty() {
108            let credential = ProxyCredential::try_from_header_str(value)
109                .or_else(|_| ProxyCredential::try_from_clear_str(value.to_owned()))
110                .ok()?;
111            address.credential = Some(credential);
112        }
113    }
114
115    // Ensure there are no more values in the row
116    if iter.next().is_some() {
117        return None;
118    }
119
120    Some(Proxy {
121        id,
122        address,
123        tcp,
124        udp,
125        http,
126        https,
127        socks5,
128        socks5h,
129        datacenter,
130        residential,
131        mobile,
132        pool_id,
133        continent,
134        country,
135        state,
136        city,
137        carrier,
138        asn,
139    })
140}
141
/// Interpret a CSV column as a boolean, ignoring ASCII case.
///
/// `true` / `1` map to `Some(true)`; the empty string, `false`, `0`, `null`
/// and `nil` map to `Some(false)`; anything else yields `None`.
fn parse_csv_bool(value: &str) -> Option<bool> {
    const TRUTHY: [&str; 2] = ["true", "1"];
    const FALSY: [&str; 5] = ["", "false", "0", "null", "nil"];

    let is_one_of = |candidates: &[&str]| {
        candidates
            .iter()
            .any(|candidate| value.eq_ignore_ascii_case(candidate))
    };

    if is_one_of(&TRUTHY) {
        Some(true)
    } else if is_one_of(&FALSY) {
        Some(false)
    } else {
        None
    }
}
151
152fn parse_csv_opt_string_filter(value: &str) -> Option<StringFilter> {
153    if value.is_empty() {
154        None
155    } else {
156        Some(StringFilter::from(value))
157    }
158}
159
160fn parse_csv_opt_asn(value: &str) -> Result<Option<Asn>, InvalidAsn> {
161    if value.is_empty() {
162        Ok(None)
163    } else {
164        Asn::try_from(value).map(Some)
165    }
166}
167
168#[derive(Debug)]
169enum ProxyCsvRowReaderData {
170    File(Lines<BufReader<File>>),
171    Raw(Vec<String>),
172}
173
174#[derive(Debug)]
175/// An error that can occur when reading a Proxy CSV row.
176pub struct ProxyCsvRowReaderError {
177    kind: ProxyCsvRowReaderErrorKind,
178}
179
/// The different ways in which reading a Proxy CSV row can fail.
#[derive(Debug)]
pub enum ProxyCsvRowReaderErrorKind {
    /// Reading from the underlying source failed with an I/O error.
    IoError(std::io::Error),
    /// The row could not be parsed; the payload is the offending line.
    InvalidRow(String),
}
188
189impl std::fmt::Display for ProxyCsvRowReaderError {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        match &self.kind {
192            ProxyCsvRowReaderErrorKind::IoError(err) => write!(f, "I/O error: {}", err),
193            ProxyCsvRowReaderErrorKind::InvalidRow(row) => write!(f, "Invalid row: {}", row),
194        }
195    }
196}
197
198impl std::error::Error for ProxyCsvRowReaderError {
199    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
200        match &self.kind {
201            ProxyCsvRowReaderErrorKind::IoError(err) => Some(err),
202            ProxyCsvRowReaderErrorKind::InvalidRow(_) => None,
203        }
204    }
205}
206
207impl From<std::io::Error> for ProxyCsvRowReaderError {
208    fn from(err: std::io::Error) -> Self {
209        Self {
210            kind: ProxyCsvRowReaderErrorKind::IoError(err),
211        }
212    }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ProxyFilter, proxydb::ProxyContext};
    use rama_net::transport::TransportProtocol;
    use rama_utils::str::NonEmptyString;
    use std::str::FromStr;

    /// Quotes are only stripped when they surround the whole column.
    #[test]
    fn test_strip_csv_quotes() {
        for (input, output) in [
            ("", ""),
            ("a", "a"),
            ("\"a\"", "a"),
            ("\"\"", ""),
            ("\"", "\""),
            ("\"a", "\"a"),
            ("a\"", "a\""),
        ] {
            assert_eq!(strip_csv_quotes(input), output, "input: {}", input);
        }
    }

    #[test]
    fn test_parse_csv_bool() {
        for (input, output) in &[
            ("1", Some(true)),
            ("true", Some(true)),
            ("True", Some(true)),
            ("TRUE", Some(true)),
            ("0", Some(false)),
            ("false", Some(false)),
            ("False", Some(false)),
            ("FALSE", Some(false)),
            ("null", Some(false)),
            ("nil", Some(false)),
            ("NULL", Some(false)),
            ("NIL", Some(false)),
            ("", Some(false)),
            ("invalid", None),
        ] {
            assert_eq!(parse_csv_bool(input), *output);
        }
    }

    #[test]
    fn test_parse_csv_opt_string_filter() {
        for (input, output) in [
            ("", None),
            ("value", Some("value")),
            ("*", Some("*")),
            ("Foo", Some("foo")),
            ("  ok ", Some("ok")),
            (" NO  ", Some("no")),
        ] {
            assert_eq!(
                parse_csv_opt_string_filter(input)
                    .as_ref()
                    .map(|f| f.as_ref()),
                output,
            );
        }
    }

    #[test]
    fn test_parse_csv_opt_string_filter_is_any() {
        let filter = parse_csv_opt_string_filter("*").unwrap();
        assert!(venndb::Any::is_any(&filter));
    }

    #[test]
    fn test_parse_csv_row_happy_path() {
        for (input, output) in [
            // most minimal
            (
                "id,,,,,,,,,,authority,,,,,,,,",
                Proxy {
                    id: NonEmptyString::from_static("id"),
                    address: ProxyAddress::from_str("authority").unwrap(),
                    tcp: false,
                    udp: false,
                    http: false,
                    https: false,
                    socks5: false,
                    socks5h: false,
                    datacenter: false,
                    residential: false,
                    mobile: false,
                    pool_id: None,
                    continent: None,
                    country: None,
                    state: None,
                    city: None,
                    carrier: None,
                    asn: None,
                },
            ),
            // more happy row tests
            (
                "id,true,false,true,,false,,true,false,true,authority,pool_id,,country,,city,carrier,,Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
                Proxy {
                    id: NonEmptyString::from_static("id"),
                    address: ProxyAddress::from_str("username:password@authority").unwrap(),
                    tcp: true,
                    udp: false,
                    http: true,
                    https: false,
                    socks5: false,
                    socks5h: false,
                    datacenter: true,
                    residential: false,
                    mobile: true,
                    pool_id: Some("pool_id".into()),
                    continent: None,
                    country: Some("country".into()),
                    state: None,
                    city: Some("city".into()),
                    carrier: Some("carrier".into()),
                    asn: None,
                },
            ),
            (
                "123,1,0,False,,True,,null,false,true,host:1234,,americas,*,*,*,carrier,13335,",
                Proxy {
                    id: NonEmptyString::from_static("123"),
                    address: ProxyAddress::from_str("host:1234").unwrap(),
                    tcp: true,
                    udp: false,
                    http: false,
                    https: false,
                    socks5: true,
                    socks5h: false,
                    datacenter: false,
                    residential: false,
                    mobile: true,
                    pool_id: None,
                    continent: Some("americas".into()),
                    country: Some("*".into()),
                    state: Some("*".into()),
                    city: Some("*".into()),
                    carrier: Some("carrier".into()),
                    asn: Some(Asn::from_static(13335)),
                },
            ),
            (
                "123,1,0,False,,True,,null,false,true,host:1234,,europe,*,,*,carrier,0",
                Proxy {
                    id: NonEmptyString::from_static("123"),
                    address: ProxyAddress::from_str("host:1234").unwrap(),
                    tcp: true,
                    udp: false,
                    http: false,
                    https: false,
                    socks5: true,
                    socks5h: false,
                    datacenter: false,
                    residential: false,
                    mobile: true,
                    pool_id: None,
                    continent: Some("europe".into()),
                    country: Some("*".into()),
                    state: None,
                    city: Some("*".into()),
                    carrier: Some("carrier".into()),
                    asn: Some(Asn::unspecified()),
                },
            ),
            (
                "foo,1,0,1,,0,,1,0,0,bar,baz,,US,,,,",
                Proxy {
                    id: NonEmptyString::from_static("foo"),
                    address: ProxyAddress::from_str("bar").unwrap(),
                    tcp: true,
                    udp: false,
                    http: true,
                    https: false,
                    socks5: false,
                    socks5h: false,
                    datacenter: true,
                    residential: false,
                    mobile: false,
                    pool_id: Some("baz".into()),
                    continent: None,
                    country: Some("us".into()),
                    state: None,
                    city: None,
                    carrier: None,
                    asn: None,
                },
            ),
        ] {
            let proxy = parse_csv_row(input).unwrap();
            assert_eq!(proxy.id, output.id);
            assert_eq!(proxy.address, output.address);
            assert_eq!(proxy.tcp, output.tcp);
            assert_eq!(proxy.udp, output.udp);
            assert_eq!(proxy.http, output.http);
            // `https` and `socks5h` are parsed like every other flag and
            // must be verified as well.
            assert_eq!(proxy.https, output.https);
            assert_eq!(proxy.socks5, output.socks5);
            assert_eq!(proxy.socks5h, output.socks5h);
            assert_eq!(proxy.datacenter, output.datacenter);
            assert_eq!(proxy.residential, output.residential);
            assert_eq!(proxy.mobile, output.mobile);
            assert_eq!(proxy.pool_id, output.pool_id);
            assert_eq!(proxy.continent, output.continent);
            assert_eq!(proxy.country, output.country);
            assert_eq!(proxy.state, output.state);
            assert_eq!(proxy.city, output.city);
            assert_eq!(proxy.carrier, output.carrier);
            assert_eq!(proxy.asn, output.asn);
        }
    }

    #[test]
    fn test_parse_csv_row_mistakes() {
        for input in [
            // garbage rows
            "",
            ",",
            ",,,,,,",
            ",,,,,,,,,,,,,,,,,,,,",
            ",,,,,,,,,,,,,,,,,,,,,,",
            ",,,,,,,,,,,,,,,,,,,,,,,",
            // too many columns: a fully valid 19-column row followed by one
            // extra (empty) column
            "id,true,false,true,false,true,false,true,false,true,authority,pool_id,continent,country,state,city,carrier,15169,Basic dXNlcm5hbWU6cGFzc3dvcmQ=,",
            // too few flag columns: `authority` ends up in a boolean column
            "id,true,false,true,false,true,false,true,authority,pool_id,continent,country,state,city,carrier,15169,Basic dXNlcm5hbWU6cGFzc3dvcmQ=,",
            // missing authority
            "id,,,,,,,,,,,,,,,,",
            // missing proxy id
            ",,,,,,,,authority,,,,,,,,",
            // invalid bool values
            "id,foo,,,,,,,,,authority,,,,,,,,",
            "id,,foo,,,,,,,,authority,,,,,,,,",
            "id,,,foo,,,,,,,authority,,,,,,,,",
            "id,,,,,foo,,,,,authority,,,,,,,,",
            "id,,,,,,foo,,,,authority,,,,,,,,",
            "id,,,,,,,,foo,,authority,,,,,,,,",
            "id,,,,,,,foo,authority,,,,,,,,",
            // invalid credentials
            // NOTE(review): this row only has seven flag columns, so it is
            // already rejected when `authority` is read as a boolean and the
            // credential column is never reached. Confirm that `:foo` is
            // rejected by `ProxyCredential` before replacing it with a
            // correctly shaped row.
            "id,,,,,,,,authority,,,,,:foo",
        ] {
            assert!(parse_csv_row(input).is_none(), "input: {}", input);
        }
    }

    #[tokio::test]
    async fn test_proxy_csv_row_reader_happy_one_row() {
        let mut reader = ProxyCsvRowReader::raw(
            "id,true,false,true,,false,,true,false,true,authority,pool_id,continent,country,state,city,carrier,13335,Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
        );
        let proxy = reader.next().await.unwrap().unwrap();

        assert_eq!(proxy.id, "id");
        assert_eq!(
            proxy.address,
            ProxyAddress::from_str("username:password@authority").unwrap()
        );
        assert!(proxy.tcp);
        assert!(!proxy.udp);
        assert!(proxy.http);
        assert!(!proxy.https);
        assert!(!proxy.socks5);
        assert!(!proxy.socks5h);
        assert!(proxy.datacenter);
        assert!(!proxy.residential);
        assert!(proxy.mobile);
        assert_eq!(proxy.pool_id, Some("pool_id".into()));
        assert_eq!(proxy.continent, Some("continent".into()));
        assert_eq!(proxy.country, Some("country".into()));
        assert_eq!(proxy.state, Some("state".into()));
        assert_eq!(proxy.city, Some("city".into()));
        assert_eq!(proxy.carrier, Some("carrier".into()));
        assert_eq!(proxy.asn, Some(Asn::from_static(13335)));

        // no more rows to read
        assert!(reader.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_proxy_csv_row_reader_happy_multi_row() {
        let mut reader = ProxyCsvRowReader::raw(
            "id,true,false,false,true,true,false,true,false,true,authority,pool_id,continent,country,state,city,carrier,42,Basic dXNlcm5hbWU6cGFzc3dvcmQ=\nid2,1,0,0,0,0,0,1,0,0,authority2,pool_id2,continent2,country2,state2,city2,carrier2,1",
        );

        let proxy = reader.next().await.unwrap().unwrap();
        assert_eq!(proxy.id, "id");
        assert_eq!(
            proxy.address,
            ProxyAddress::from_str("username:password@authority").unwrap()
        );
        assert!(proxy.tcp);
        assert!(!proxy.udp);
        assert!(!proxy.http);
        assert!(proxy.https);
        assert!(proxy.socks5);
        assert!(!proxy.socks5h);
        assert!(proxy.datacenter);
        assert!(!proxy.residential);
        assert!(proxy.mobile);
        assert_eq!(proxy.pool_id, Some("pool_id".into()));
        assert_eq!(proxy.continent, Some("continent".into()));
        assert_eq!(proxy.country, Some("country".into()));
        assert_eq!(proxy.state, Some("state".into()));
        assert_eq!(proxy.city, Some("city".into()));
        assert_eq!(proxy.carrier, Some("carrier".into()));
        assert_eq!(proxy.asn, Some(Asn::from_static(42)));

        let proxy = reader.next().await.unwrap().unwrap();

        assert_eq!(proxy.id, "id2");
        assert_eq!(proxy.address, ProxyAddress::from_str("authority2").unwrap());
        assert!(proxy.tcp);
        assert!(!proxy.udp);
        assert!(!proxy.http);
        assert!(!proxy.https);
        assert!(!proxy.socks5);
        assert!(!proxy.socks5h);
        assert!(proxy.datacenter);
        assert!(!proxy.residential);
        assert!(!proxy.mobile);
        assert_eq!(proxy.pool_id, Some("pool_id2".into()));
        assert_eq!(proxy.continent, Some("continent2".into()));
        assert_eq!(proxy.country, Some("country2".into()));
        assert_eq!(proxy.city, Some("city2".into()));
        assert_eq!(proxy.state, Some("state2".into()));
        assert_eq!(proxy.carrier, Some("carrier2".into()));
        assert_eq!(proxy.asn, Some(Asn::from_static(1)));

        // no more rows to read
        assert!(reader.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_proxy_csv_row_reader_failure_empty_data() {
        let mut reader = ProxyCsvRowReader::raw("");
        assert!(reader.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_proxy_csv_row_reader_failure_invalid_row() {
        let mut reader = ProxyCsvRowReader::raw(",,,,,,,,,,,");
        assert!(reader.next().await.is_err());
    }

    #[test]
    fn test_proxy_is_match_happy_path_proxy_with_any_filter_string_cases() {
        let proxy = parse_csv_row("id,1,,1,,,,,,,authority,*,*,*,*,*,*,0").unwrap();
        let ctx = ProxyContext {
            protocol: TransportProtocol::Tcp,
        };

        for filter in [
            ProxyFilter::default(),
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("pool_a")]),
                country: Some(vec![StringFilter::new("country_a")]),
                city: Some(vec![StringFilter::new("city_a")]),
                carrier: Some(vec![StringFilter::new("carrier_a")]),
                ..Default::default()
            },
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("pool_a")]),
                ..Default::default()
            },
            ProxyFilter {
                continent: Some(vec![StringFilter::new("continent_a")]),
                ..Default::default()
            },
            ProxyFilter {
                country: Some(vec![StringFilter::new("country_a")]),
                ..Default::default()
            },
            ProxyFilter {
                state: Some(vec![StringFilter::new("state_a")]),
                ..Default::default()
            },
            ProxyFilter {
                city: Some(vec![StringFilter::new("city_a")]),
                carrier: Some(vec![StringFilter::new("carrier_a")]),
                ..Default::default()
            },
            ProxyFilter {
                carrier: Some(vec![StringFilter::new("carrier_a")]),
                ..Default::default()
            },
        ] {
            assert!(proxy.is_match(&ctx, &filter), "filter: {:?}", filter);
        }
    }

    #[test]
    fn test_proxy_is_match_happy_path_proxy_with_any_filters_cases() {
        let proxy =
            parse_csv_row("id,1,,1,,,,,,,authority,pool,continent,country,state,city,carrier,42")
                .unwrap();
        let ctx = ProxyContext {
            protocol: TransportProtocol::Tcp,
        };

        for filter in [
            ProxyFilter::default(),
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                continent: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                country: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                state: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                city: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                carrier: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("pool")]),
                continent: Some(vec![StringFilter::new("continent")]),
                country: Some(vec![StringFilter::new("country")]),
                state: Some(vec![StringFilter::new("state")]),
                city: Some(vec![StringFilter::new("city")]),
                carrier: Some(vec![StringFilter::new("carrier")]),
                asn: Some(vec![Asn::from_static(42)]),
                ..Default::default()
            },
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("*")]),
                country: Some(vec![StringFilter::new("country")]),
                city: Some(vec![StringFilter::new("city")]),
                carrier: Some(vec![StringFilter::new("carrier")]),
                ..Default::default()
            },
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("pool")]),
                country: Some(vec![StringFilter::new("*")]),
                city: Some(vec![StringFilter::new("city")]),
                carrier: Some(vec![StringFilter::new("carrier")]),
                ..Default::default()
            },
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("pool")]),
                country: Some(vec![StringFilter::new("country")]),
                city: Some(vec![StringFilter::new("*")]),
                carrier: Some(vec![StringFilter::new("carrier")]),
                ..Default::default()
            },
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("pool")]),
                country: Some(vec![StringFilter::new("country")]),
                city: Some(vec![StringFilter::new("city")]),
                carrier: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                continent: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
        ] {
            assert!(proxy.is_match(&ctx, &filter), "filter: {:?}", filter);
        }
    }
}