use crate::crawler::common::{
crawl_months_list, extract_link_size, fetch_body, get_crawler_month_concurrency,
remove_trailing_slash,
};
use crate::crawler::Collector;
use crate::{BrokerError, BrokerItem};
use chrono::{NaiveDate, NaiveDateTime};
use futures::stream::StreamExt;
use regex::Regex;
use tracing::debug;
/// Crawls the monthly directories of a RIPE RIS collector and gathers all broker items found.
///
/// The months to visit come from `crawl_months_list`, which is given the collector URL (with its
/// trailing slash removed) and `from_ts`. Each month is crawled via `crawl_month` at
/// `<collector url>/<YYYY.MM>/`, with at most `get_crawler_month_concurrency()` month crawls
/// in flight at once.
///
/// # Arguments
/// * `collector` - collector whose `url` is crawled and whose `id` is passed to every month crawl.
/// * `from_ts` - optional date forwarded unchanged to `crawl_months_list`.
///
/// # Returns
/// All items from all months. Months are appended in the order their crawls complete, not in
/// calendar order.
///
/// # Errors
/// Returns the error from `crawl_months_list`, or the first error yielded by any month crawl;
/// in the latter case the remaining month crawls are abandoned.
pub async fn crawl_ripe_ris(
    collector: &Collector,
    from_ts: Option<NaiveDate>,
) -> Result<Vec<BrokerItem>, BrokerError> {
    let base_url = remove_trailing_slash(collector.url.as_str());
    let months = crawl_months_list(base_url.as_str(), from_ts).await?;

    // One lazy future per month; nothing runs until the buffered stream polls it.
    let month_jobs = months.into_iter().map(|month| {
        let month_url = format!("{}/{}", base_url.as_str(), month.format("%Y.%m/"));
        crawl_month(month_url, collector.id.clone())
    });
    let mut pending =
        futures::stream::iter(month_jobs).buffer_unordered(get_crawler_month_concurrency());

    let mut all_items = Vec::new();
    loop {
        match pending.next().await {
            Some(Ok(month_items)) => all_items.extend(month_items),
            // Bail out on the first failing month, dropping the crawls still in flight.
            Some(Err(err)) => return Err(err),
            None => break,
        }
    }

    Ok(all_items)
}
/// Crawls one monthly directory listing and turns every dump link in it into a [`BrokerItem`].
///
/// The page body is fetched with `fetch_body`; link extraction and parsing then run on a
/// blocking task so the regex work does not occupy the async executor.
///
/// # Arguments
/// * `url` - URL of the month directory; a trailing slash is removed before use.
/// * `collector_id` - identifier copied into every produced item.
///
/// # Returns
/// One item per `(link, size)` pair from `extract_link_size` whose full URL contains a
/// `YYYYMMDD.HHMM` timestamp directly before `.gz`. Pairs without a parsable timestamp are
/// skipped. Links containing `"update"` become `"updates"` items spanning five minutes; all
/// other links become `"rib"` items with `ts_end == ts_start`. `exact_size` is always `0`.
/// Item URLs always use `https://` when the directory URL was given as `http://`.
///
/// # Errors
/// Propagates the error returned by `fetch_body`.
///
/// # Panics
/// Panics if the blocking parsing task itself panics.
async fn crawl_month(url: String, collector_id: String) -> Result<Vec<BrokerItem>, BrokerError> {
    let url = remove_trailing_slash(url.as_str());

    debug!("crawling data for {} ...", url.as_str());
    let body = fetch_body(url.as_str()).await?;
    debug!(" download for {} finished ", url.as_str());

    // Upgrade only the scheme, and only once per month. A global "http" -> "https" replace on the
    // joined URL would also rewrite any "http" that happens to appear in the path or file name.
    let secure_base = match url.as_str().strip_prefix("http://") {
        Some(rest) => format!("https://{}", rest),
        None => url.as_str().to_owned(),
    };

    let data_items: Vec<BrokerItem> = tokio::task::spawn_blocking(move || {
        // Compile the pattern once per listing instead of once per link.
        let timestamp_pattern =
            Regex::new(r".*(........\.....)\.gz.*").expect("invalid regex pattern");
        let items = extract_link_size(body.as_str());
        items
            .iter()
            .filter_map(|(link, size)| {
                let item_url = format!("{}/{}", secure_base, link);
                let time_str = timestamp_pattern.captures(&item_url)?.get(1)?.as_str();
                let ts_start = NaiveDateTime::parse_from_str(time_str, "%Y%m%d.%H%M").ok()?;

                // Update files are given a five-minute window; everything else is treated as a
                // RIB snapshot taken at a single instant.
                let (ts_end, data_type) = if link.contains("update") {
                    (ts_start + chrono::Duration::seconds(5 * 60), "updates")
                } else {
                    (ts_start, "rib")
                };

                Some(BrokerItem {
                    ts_start,
                    ts_end,
                    url: item_url,
                    rough_size: *size,
                    collector_id: collector_id.clone(),
                    data_type: data_type.to_string(),
                    exact_size: 0,
                })
            })
            .collect()
    })
    .await
    .expect("blocking task panicked");

    debug!("crawling data for {} ... finished", url.as_str());
    Ok(data_items)
}
// NOTE(review): every test below targets the live `data.ris.ripe.net` host, so they presumably
// need network access and depend on the remote listing format — confirm before running in CI.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Smoke test: crawling roughly the last two months of `rrc00` completes without error.
    #[tokio::test]
    async fn test_crawl_ripe_ris() {
        // `try_init` rather than `init`: `init` panics when a global subscriber has already been
        // installed, which happens as soon as another test in the same binary sets one up.
        let _ = tracing_subscriber::fmt::try_init();

        let collector = Collector {
            id: "rrc00".to_string(),
            project: "riperis".to_string(),
            url: "https://data.ris.ripe.net/rrc00/".to_string(),
        };

        let two_months_ago = Utc::now().date_naive() - chrono::Duration::days(60);
        let _items = crawl_ripe_ris(&collector, Some(two_months_ago))
            .await
            .unwrap();
    }

    /// Listing months without a start date succeeds, and starting from today yields exactly one
    /// month.
    #[tokio::test]
    async fn test_crawl_months() {
        let months = crawl_months_list("https://data.ris.ripe.net/rrc00/", None)
            .await
            .unwrap();
        println!("{:?}", months);

        let current_month = crawl_months_list(
            "https://data.ris.ripe.net/rrc00/",
            Some(Utc::now().date_naive()),
        )
        .await
        .unwrap();
        assert_eq!(current_month.len(), 1);
    }

    /// Crawling a single historical month succeeds; the item URLs are printed for inspection.
    #[tokio::test]
    async fn test_crawl_month() {
        let items = crawl_month(
            "https://data.ris.ripe.net/rrc00/2008.09/".to_string(),
            "rrc00".to_string(),
        )
        .await
        .unwrap();
        for item in items {
            println!("{}", item.url);
        }
    }
}