1use super::{Proxy, StringFilter};
2use rama_net::{
3 address::ProxyAddress,
4 asn::{Asn, InvalidAsn},
5 user::ProxyCredential,
6};
7use std::path::Path;
8use tokio::{
9 fs::File,
10 io::{AsyncBufReadExt, BufReader, Lines},
11};
12
/// Reads CSV rows one at a time and parses each of them into a [`Proxy`].
///
/// Rows come either from a file (see [`ProxyCsvRowReader::open`]) or from an
/// in-memory string (see [`ProxyCsvRowReader::raw`]).
#[derive(Debug)]
pub struct ProxyCsvRowReader {
    /// Source of the rows that have not been handed out yet.
    data: ProxyCsvRowReaderData,
}
18
19impl ProxyCsvRowReader {
20 pub async fn open(path: impl AsRef<Path>) -> Result<Self, ProxyCsvRowReaderError> {
22 let file = tokio::fs::File::open(path).await?;
23 let reader = BufReader::new(file);
24 let lines = reader.lines();
25 Ok(ProxyCsvRowReader {
26 data: ProxyCsvRowReaderData::File(lines),
27 })
28 }
29
30 pub fn raw(data: impl AsRef<str>) -> Self {
32 let lines: Vec<_> = data.as_ref().lines().rev().map(str::to_owned).collect();
33 ProxyCsvRowReader {
34 data: ProxyCsvRowReaderData::Raw(lines),
35 }
36 }
37
38 pub async fn next(&mut self) -> Result<Option<Proxy>, ProxyCsvRowReaderError> {
40 match &mut self.data {
41 ProxyCsvRowReaderData::File(lines) => {
42 let line = lines.next_line().await?;
43 match line {
44 Some(line) => Ok(Some(match parse_csv_row(&line) {
45 Some(proxy) => proxy,
46 None => {
47 return Err(ProxyCsvRowReaderError {
48 kind: ProxyCsvRowReaderErrorKind::InvalidRow(line),
49 });
50 }
51 })),
52 None => Ok(None),
53 }
54 }
55 ProxyCsvRowReaderData::Raw(lines) => match lines.pop() {
56 Some(line) => Ok(Some(match parse_csv_row(&line) {
57 Some(proxy) => proxy,
58 None => {
59 return Err(ProxyCsvRowReaderError {
60 kind: ProxyCsvRowReaderErrorKind::InvalidRow(line),
61 });
62 }
63 })),
64 None => Ok(None),
65 },
66 }
67 }
68}
69
/// Removes one pair of surrounding double quotes from a CSV cell.
///
/// The cell is returned unchanged unless it both starts and ends with `"`
/// (a lone `"` is therefore left as is). Quotes inside the cell are not
/// touched.
fn strip_csv_quotes(p: &str) -> &str {
    match p.strip_prefix('"') {
        // Only accept the stripped form when a matching closing quote exists.
        Some(rest) => rest.strip_suffix('"').unwrap_or(p),
        None => p,
    }
}
75
76pub(crate) fn parse_csv_row(row: &str) -> Option<Proxy> {
77 let mut iter = row.split(',').map(strip_csv_quotes);
78
79 let id = iter.next().and_then(|s| s.try_into().ok())?;
80
81 let tcp = iter.next().and_then(parse_csv_bool)?;
82 let udp = iter.next().and_then(parse_csv_bool)?;
83 let http = iter.next().and_then(parse_csv_bool)?;
84 let https = iter.next().and_then(parse_csv_bool)?;
85 let socks5 = iter.next().and_then(parse_csv_bool)?;
86 let socks5h = iter.next().and_then(parse_csv_bool)?;
87 let datacenter = iter.next().and_then(parse_csv_bool)?;
88 let residential = iter.next().and_then(parse_csv_bool)?;
89 let mobile = iter.next().and_then(parse_csv_bool)?;
90 let mut address = iter.next().and_then(|s| {
91 if s.is_empty() {
92 None
93 } else {
94 ProxyAddress::try_from(s).ok()
95 }
96 })?;
97 let pool_id = parse_csv_opt_string_filter(iter.next()?);
98 let continent = parse_csv_opt_string_filter(iter.next()?);
99 let country = parse_csv_opt_string_filter(iter.next()?);
100 let state = parse_csv_opt_string_filter(iter.next()?);
101 let city = parse_csv_opt_string_filter(iter.next()?);
102 let carrier = parse_csv_opt_string_filter(iter.next()?);
103 let asn = parse_csv_opt_asn(iter.next()?).ok()?;
104
105 if let Some(value) = iter.next() {
107 if !value.is_empty() {
108 let credential = ProxyCredential::try_from_header_str(value)
109 .or_else(|_| ProxyCredential::try_from_clear_str(value.to_owned()))
110 .ok()?;
111 address.credential = Some(credential);
112 }
113 }
114
115 if iter.next().is_some() {
117 return None;
118 }
119
120 Some(Proxy {
121 id,
122 address,
123 tcp,
124 udp,
125 http,
126 https,
127 socks5,
128 socks5h,
129 datacenter,
130 residential,
131 mobile,
132 pool_id,
133 continent,
134 country,
135 state,
136 city,
137 carrier,
138 asn,
139 })
140}
141
/// Parses a boolean CSV cell, ignoring ASCII case.
///
/// `"true"` and `"1"` yield `Some(true)`; the empty string, `"false"`, `"0"`,
/// `"null"` and `"nil"` yield `Some(false)`; any other value yields `None`.
fn parse_csv_bool(value: &str) -> Option<bool> {
    rama_utils::macros::match_ignore_ascii_case_str! {
        match(value) {
            "true" | "1" => Some(true),
            // An empty cell counts as `false`, not as an error.
            "" | "false" | "0" | "null" | "nil" => Some(false),
            _ => None,
        }
    }
}
151
152fn parse_csv_opt_string_filter(value: &str) -> Option<StringFilter> {
153 if value.is_empty() {
154 None
155 } else {
156 Some(StringFilter::from(value))
157 }
158}
159
160fn parse_csv_opt_asn(value: &str) -> Result<Option<Asn>, InvalidAsn> {
161 if value.is_empty() {
162 Ok(None)
163 } else {
164 Asn::try_from(value).map(Some)
165 }
166}
167
/// Backing source of rows for [`ProxyCsvRowReader`].
#[derive(Debug)]
enum ProxyCsvRowReaderData {
    /// Lines read from a buffered file.
    File(Lines<BufReader<File>>),
    /// In-memory lines; `ProxyCsvRowReader::raw` stores them in reverse order
    /// and `ProxyCsvRowReader::next` pops them off the end.
    Raw(Vec<String>),
}
173
/// Error returned by [`ProxyCsvRowReader`] when opening a file or reading a
/// row fails.
#[derive(Debug)]
pub struct ProxyCsvRowReaderError {
    /// What went wrong.
    kind: ProxyCsvRowReaderErrorKind,
}
179
/// The kinds of failure a [`ProxyCsvRowReaderError`] can carry.
#[derive(Debug)]
pub enum ProxyCsvRowReaderErrorKind {
    /// An I/O error occurred while opening or reading the file.
    IoError(std::io::Error),
    /// A row could not be parsed into a `Proxy`; carries the offending row.
    InvalidRow(String),
}
188
189impl std::fmt::Display for ProxyCsvRowReaderError {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 match &self.kind {
192 ProxyCsvRowReaderErrorKind::IoError(err) => write!(f, "I/O error: {}", err),
193 ProxyCsvRowReaderErrorKind::InvalidRow(row) => write!(f, "Invalid row: {}", row),
194 }
195 }
196}
197
198impl std::error::Error for ProxyCsvRowReaderError {
199 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
200 match &self.kind {
201 ProxyCsvRowReaderErrorKind::IoError(err) => Some(err),
202 ProxyCsvRowReaderErrorKind::InvalidRow(_) => None,
203 }
204 }
205}
206
207impl From<std::io::Error> for ProxyCsvRowReaderError {
208 fn from(err: std::io::Error) -> Self {
209 Self {
210 kind: ProxyCsvRowReaderErrorKind::IoError(err),
211 }
212 }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{ProxyFilter, proxydb::ProxyContext};
    use rama_net::transport::TransportProtocol;
    use rama_utils::str::NonEmptyString;
    use std::str::FromStr;

    /// Every accepted spelling of a boolean cell, plus one rejected value.
    #[test]
    fn test_parse_csv_bool() {
        for (input, output) in &[
            ("1", Some(true)),
            ("true", Some(true)),
            ("True", Some(true)),
            ("TRUE", Some(true)),
            ("0", Some(false)),
            ("false", Some(false)),
            ("False", Some(false)),
            ("FALSE", Some(false)),
            ("null", Some(false)),
            ("nil", Some(false)),
            ("NULL", Some(false)),
            ("NIL", Some(false)),
            ("", Some(false)),
            ("invalid", None),
        ] {
            assert_eq!(parse_csv_bool(input), *output);
        }
    }

    /// Empty cells become `None`; other cells become a filter whose string
    /// form is compared against the expected value.
    #[test]
    fn test_parse_csv_opt_string_filter() {
        for (input, output) in [
            ("", None),
            ("value", Some("value")),
            ("*", Some("*")),
            ("Foo", Some("foo")),
            (" ok ", Some("ok")),
            (" NO ", Some("no")),
        ] {
            assert_eq!(
                parse_csv_opt_string_filter(input)
                    .as_ref()
                    .map(|f| f.as_ref()),
                output,
            );
        }
    }

    #[test]
    fn test_parse_csv_opt_string_filter_is_any() {
        let filter = parse_csv_opt_string_filter("*").unwrap();
        assert!(venndb::Any::is_any(&filter));
    }

    /// Valid rows and the `Proxy` each of them is expected to produce.
    #[test]
    fn test_parse_csv_row_happy_path() {
        for (input, output) in [
            (
                "id,,,,,,,,,,authority,,,,,,,,",
                Proxy {
                    id: NonEmptyString::from_static("id"),
                    address: ProxyAddress::from_str("authority").unwrap(),
                    tcp: false,
                    udp: false,
                    http: false,
                    https: false,
                    socks5: false,
                    socks5h: false,
                    datacenter: false,
                    residential: false,
                    mobile: false,
                    pool_id: None,
                    continent: None,
                    country: None,
                    state: None,
                    city: None,
                    carrier: None,
                    asn: None,
                },
            ),
            (
                "id,true,false,true,,false,,true,false,true,authority,pool_id,,country,,city,carrier,,Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
                Proxy {
                    id: NonEmptyString::from_static("id"),
                    address: ProxyAddress::from_str("username:password@authority").unwrap(),
                    tcp: true,
                    udp: false,
                    http: true,
                    https: false,
                    socks5: false,
                    socks5h: false,
                    datacenter: true,
                    residential: false,
                    mobile: true,
                    pool_id: Some("pool_id".into()),
                    continent: None,
                    country: Some("country".into()),
                    state: None,
                    city: Some("city".into()),
                    carrier: Some("carrier".into()),
                    asn: None,
                },
            ),
            (
                "123,1,0,False,,True,,null,false,true,host:1234,,americas,*,*,*,carrier,13335,",
                Proxy {
                    id: NonEmptyString::from_static("123"),
                    address: ProxyAddress::from_str("host:1234").unwrap(),
                    tcp: true,
                    udp: false,
                    http: false,
                    https: false,
                    socks5: true,
                    socks5h: false,
                    datacenter: false,
                    residential: false,
                    mobile: true,
                    pool_id: None,
                    continent: Some("americas".into()),
                    country: Some("*".into()),
                    state: Some("*".into()),
                    city: Some("*".into()),
                    carrier: Some("carrier".into()),
                    asn: Some(Asn::from_static(13335)),
                },
            ),
            (
                "123,1,0,False,,True,,null,false,true,host:1234,,europe,*,,*,carrier,0",
                Proxy {
                    id: NonEmptyString::from_static("123"),
                    address: ProxyAddress::from_str("host:1234").unwrap(),
                    tcp: true,
                    udp: false,
                    http: false,
                    https: false,
                    socks5: true,
                    socks5h: false,
                    datacenter: false,
                    residential: false,
                    mobile: true,
                    pool_id: None,
                    continent: Some("europe".into()),
                    country: Some("*".into()),
                    state: None,
                    city: Some("*".into()),
                    carrier: Some("carrier".into()),
                    asn: Some(Asn::unspecified()),
                },
            ),
            (
                "foo,1,0,1,,0,,1,0,0,bar,baz,,US,,,,",
                Proxy {
                    id: NonEmptyString::from_static("foo"),
                    address: ProxyAddress::from_str("bar").unwrap(),
                    tcp: true,
                    udp: false,
                    http: true,
                    https: false,
                    socks5: false,
                    socks5h: false,
                    datacenter: true,
                    residential: false,
                    mobile: false,
                    pool_id: Some("baz".into()),
                    continent: None,
                    country: Some("us".into()),
                    state: None,
                    city: None,
                    carrier: None,
                    asn: None,
                },
            ),
        ] {
            let proxy = parse_csv_row(input).unwrap();
            // Compared field by field; every column of the row is covered so
            // that a shifted column cannot go unnoticed.
            assert_eq!(proxy.id, output.id);
            assert_eq!(proxy.address, output.address);
            assert_eq!(proxy.tcp, output.tcp);
            assert_eq!(proxy.udp, output.udp);
            assert_eq!(proxy.http, output.http);
            assert_eq!(proxy.https, output.https);
            assert_eq!(proxy.socks5, output.socks5);
            assert_eq!(proxy.socks5h, output.socks5h);
            assert_eq!(proxy.datacenter, output.datacenter);
            assert_eq!(proxy.residential, output.residential);
            assert_eq!(proxy.mobile, output.mobile);
            assert_eq!(proxy.pool_id, output.pool_id);
            assert_eq!(proxy.continent, output.continent);
            assert_eq!(proxy.country, output.country);
            assert_eq!(proxy.state, output.state);
            assert_eq!(proxy.city, output.city);
            assert_eq!(proxy.carrier, output.carrier);
            assert_eq!(proxy.asn, output.asn);
        }
    }

    /// Rows that must be rejected by `parse_csv_row`.
    #[test]
    fn test_parse_csv_row_mistakes() {
        for input in [
            "",
            ",",
            ",,,,,,",
            ",,,,,,,,,,,,,,,,,,,,",
            ",,,,,,,,,,,,,,,,,,,,,,",
            ",,,,,,,,,,,,,,,,,,,,,,,",
            "id,true,false,true,false,true,false,true,authority,pool_id,continent,country,state,city,carrier,15169,Basic dXNlcm5hbWU6cGFzc3dvcmQ=,",
            "id,,,,,,,,,,,,,,,,",
            ",,,,,,,,authority,,,,,,,,",
            "id,foo,,,,,,,,,authority,,,,,,,,",
            "id,,foo,,,,,,,,authority,,,,,,,,",
            "id,,,foo,,,,,,,authority,,,,,,,,",
            "id,,,,,foo,,,,,authority,,,,,,,,",
            "id,,,,,,foo,,,,authority,,,,,,,,",
            "id,,,,,,,,foo,,authority,,,,,,,,",
            "id,,,,,,,foo,authority,,,,,,,,",
            "id,,,,,,,,authority,,,,,:foo",
        ] {
            assert!(parse_csv_row(input).is_none(), "input: {}", input);
        }
    }

    #[tokio::test]
    async fn test_proxy_csv_row_reader_happy_one_row() {
        let mut reader = ProxyCsvRowReader::raw(
            "id,true,false,true,,false,,true,false,true,authority,pool_id,continent,country,state,city,carrier,13335,Basic dXNlcm5hbWU6cGFzc3dvcmQ=",
        );
        let proxy = reader.next().await.unwrap().unwrap();

        assert_eq!(proxy.id, "id");
        assert_eq!(
            proxy.address,
            ProxyAddress::from_str("username:password@authority").unwrap()
        );
        assert!(proxy.tcp);
        assert!(!proxy.udp);
        assert!(proxy.http);
        assert!(!proxy.https);
        assert!(!proxy.socks5);
        assert!(!proxy.socks5h);
        assert!(proxy.datacenter);
        assert!(!proxy.residential);
        assert!(proxy.mobile);
        assert_eq!(proxy.pool_id, Some("pool_id".into()));
        assert_eq!(proxy.continent, Some("continent".into()));
        assert_eq!(proxy.country, Some("country".into()));
        assert_eq!(proxy.state, Some("state".into()));
        assert_eq!(proxy.city, Some("city".into()));
        assert_eq!(proxy.carrier, Some("carrier".into()));
        assert_eq!(proxy.asn, Some(Asn::from_static(13335)));

        // The single row has been consumed.
        assert!(reader.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_proxy_csv_row_reader_happy_multi_row() {
        let mut reader = ProxyCsvRowReader::raw(
            "id,true,false,false,true,true,false,true,false,true,authority,pool_id,continent,country,state,city,carrier,42,Basic dXNlcm5hbWU6cGFzc3dvcmQ=\nid2,1,0,0,0,0,0,1,0,0,authority2,pool_id2,continent2,country2,state2,city2,carrier2,1",
        );

        let proxy = reader.next().await.unwrap().unwrap();
        assert_eq!(proxy.id, "id");
        assert_eq!(
            proxy.address,
            ProxyAddress::from_str("username:password@authority").unwrap()
        );
        assert!(proxy.tcp);
        assert!(!proxy.udp);
        assert!(!proxy.http);
        assert!(proxy.https);
        assert!(proxy.socks5);
        assert!(!proxy.socks5h);
        assert!(proxy.datacenter);
        assert!(!proxy.residential);
        assert!(proxy.mobile);
        assert_eq!(proxy.pool_id, Some("pool_id".into()));
        assert_eq!(proxy.continent, Some("continent".into()));
        assert_eq!(proxy.country, Some("country".into()));
        assert_eq!(proxy.state, Some("state".into()));
        assert_eq!(proxy.city, Some("city".into()));
        assert_eq!(proxy.carrier, Some("carrier".into()));
        assert_eq!(proxy.asn, Some(Asn::from_static(42)));

        let proxy = reader.next().await.unwrap().unwrap();

        assert_eq!(proxy.id, "id2");
        assert_eq!(proxy.address, ProxyAddress::from_str("authority2").unwrap());
        assert!(proxy.tcp);
        assert!(!proxy.udp);
        assert!(!proxy.http);
        assert!(!proxy.https);
        assert!(!proxy.socks5);
        assert!(!proxy.socks5h);
        assert!(proxy.datacenter);
        assert!(!proxy.residential);
        assert!(!proxy.mobile);
        assert_eq!(proxy.pool_id, Some("pool_id2".into()));
        assert_eq!(proxy.continent, Some("continent2".into()));
        assert_eq!(proxy.country, Some("country2".into()));
        assert_eq!(proxy.city, Some("city2".into()));
        assert_eq!(proxy.state, Some("state2".into()));
        assert_eq!(proxy.carrier, Some("carrier2".into()));
        assert_eq!(proxy.asn, Some(Asn::from_static(1)));

        // Both rows have been consumed.
        assert!(reader.next().await.unwrap().is_none());
    }

    /// Empty input contains no rows, so the reader ends immediately without
    /// an error.
    #[tokio::test]
    async fn test_proxy_csv_row_reader_failure_empty_data() {
        let mut reader = ProxyCsvRowReader::raw("");
        assert!(reader.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_proxy_csv_row_reader_failure_invalid_row() {
        let mut reader = ProxyCsvRowReader::raw(",,,,,,,,,,,");
        assert!(reader.next().await.is_err());
    }

    /// A proxy whose string columns are all `*` is expected to match every
    /// filter listed below.
    #[test]
    fn test_proxy_is_match_happy_path_proxy_with_any_filter_string_cases() {
        let proxy = parse_csv_row("id,1,,1,,,,,,,authority,*,*,*,*,*,*,0").unwrap();
        let ctx = ProxyContext {
            protocol: TransportProtocol::Tcp,
        };

        for filter in [
            ProxyFilter::default(),
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("pool_a")]),
                country: Some(vec![StringFilter::new("country_a")]),
                city: Some(vec![StringFilter::new("city_a")]),
                carrier: Some(vec![StringFilter::new("carrier_a")]),
                ..Default::default()
            },
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("pool_a")]),
                ..Default::default()
            },
            ProxyFilter {
                continent: Some(vec![StringFilter::new("continent_a")]),
                ..Default::default()
            },
            ProxyFilter {
                country: Some(vec![StringFilter::new("country_a")]),
                ..Default::default()
            },
            ProxyFilter {
                state: Some(vec![StringFilter::new("state_a")]),
                ..Default::default()
            },
            ProxyFilter {
                city: Some(vec![StringFilter::new("city_a")]),
                carrier: Some(vec![StringFilter::new("carrier_a")]),
                ..Default::default()
            },
            ProxyFilter {
                carrier: Some(vec![StringFilter::new("carrier_a")]),
                ..Default::default()
            },
        ] {
            assert!(proxy.is_match(&ctx, &filter), "filter: {:?}", filter);
        }
    }

    /// A proxy with concrete column values is expected to match filters that
    /// use `*` and/or those same values.
    #[test]
    fn test_proxy_is_match_happy_path_proxy_with_any_filters_cases() {
        let proxy =
            parse_csv_row("id,1,,1,,,,,,,authority,pool,continent,country,state,city,carrier,42")
                .unwrap();
        let ctx = ProxyContext {
            protocol: TransportProtocol::Tcp,
        };

        for filter in [
            ProxyFilter::default(),
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                continent: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                country: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                state: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                city: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                carrier: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("pool")]),
                continent: Some(vec![StringFilter::new("continent")]),
                country: Some(vec![StringFilter::new("country")]),
                state: Some(vec![StringFilter::new("state")]),
                city: Some(vec![StringFilter::new("city")]),
                carrier: Some(vec![StringFilter::new("carrier")]),
                asn: Some(vec![Asn::from_static(42)]),
                ..Default::default()
            },
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("*")]),
                country: Some(vec![StringFilter::new("country")]),
                city: Some(vec![StringFilter::new("city")]),
                carrier: Some(vec![StringFilter::new("carrier")]),
                ..Default::default()
            },
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("pool")]),
                country: Some(vec![StringFilter::new("*")]),
                city: Some(vec![StringFilter::new("city")]),
                carrier: Some(vec![StringFilter::new("carrier")]),
                ..Default::default()
            },
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("pool")]),
                country: Some(vec![StringFilter::new("country")]),
                city: Some(vec![StringFilter::new("*")]),
                carrier: Some(vec![StringFilter::new("carrier")]),
                ..Default::default()
            },
            ProxyFilter {
                pool_id: Some(vec![StringFilter::new("pool")]),
                country: Some(vec![StringFilter::new("country")]),
                city: Some(vec![StringFilter::new("city")]),
                carrier: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
            ProxyFilter {
                continent: Some(vec![StringFilter::new("*")]),
                ..Default::default()
            },
        ] {
            assert!(proxy.is_match(&ctx, &filter), "filter: {:?}", filter);
        }
    }
}