crate_activity/
crate_activity_data.rs

1crate::ix!();
2
3#[derive(Debug, Getters)]
4pub struct CrateActivityData {
5
6    #[getset(get = "pub")]
7    summaries: Vec<CrateUsageSummary>,
8    
9    #[getset(get = "pub")]
10    interval_downloads_1d: HashMap<String, i64>,
11    
12    #[getset(get = "pub")]
13    interval_downloads_3d: HashMap<String, i64>,
14
15    #[getset(get = "pub")]
16    interval_downloads_7d: HashMap<String, i64>,
17}
18
19pub async fn gather_crate_activity_data(
20    ignore_cache:   bool,
21    crate_names:    &[String],
22    user_agent:     &str,
23    config_dir:     &Path,
24    one_day_ago:    NaiveDate,
25    three_days_ago: NaiveDate,
26    seven_days_ago: NaiveDate,
27) -> Result<CrateActivityData, CrateActivityError> {
28    use futures::{StreamExt};
29
30    println!(
31        "Gathering crate activity data for {} crates (ignore_cache={})",
32        crate_names.len(),
33        ignore_cache
34    );
35
36    // We'll limit concurrency to avoid overwhelming crates.io.
37    let concurrency_limit = 8usize;
38
39    // Create a stream of futures (one for each crate).
40    let crate_fetches = futures::stream::iter(crate_names.iter().map(|crate_name| {
41        let crate_name = crate_name.clone();
42        let ua = user_agent.to_string();
43        let cfg_dir = config_dir.to_path_buf();
44        async move {
45            debug!("Fetching usage for crate '{}'", crate_name);
46            match fetch_usage(ignore_cache, &ua, &cfg_dir, &crate_name).await {
47                Ok(Some(response)) => {
48                    debug!("Successfully fetched usage for crate '{}'", crate_name);
49                    Some((crate_name, response))
50                },
51                Ok(None) => {
52                    warn!("No data for crate '{}'", crate_name);
53                    None
54                },
55                Err(e) => {
56                    error!("Failed to fetch data for '{}': {:?}", crate_name, e);
57                    None
58                }
59            }
60        }
61    }))
62    .buffer_unordered(concurrency_limit);
63
64    // Collect all results in parallel.
65    let results: Vec<Option<(String, CrateResponse)>> = crate_fetches.collect().await;
66
67    let mut summaries             = Vec::new();
68    let mut interval_downloads_1d = HashMap::new();
69    let mut interval_downloads_3d = HashMap::new();
70    let mut interval_downloads_7d = HashMap::new();
71
72    // Process the completed fetches.
73    for item in results {
74        if let Some((crate_name, response)) = item {
75            let summary = analyze_usage(&crate_name, response.version_downloads().to_vec());
76            summaries.push(summary);
77
78            let downloads_last_1d: i64 = response
79                .version_downloads()
80                .iter()
81                .filter(|d| *d.date() >= one_day_ago)
82                .map(|d| d.downloads())
83                .sum();
84
85            let downloads_last_3d: i64 = response
86                .version_downloads()
87                .iter()
88                .filter(|d| *d.date() >= three_days_ago)
89                .map(|d| d.downloads())
90                .sum();
91
92            let downloads_last_7d: i64 = response
93                .version_downloads()
94                .iter()
95                .filter(|d| *d.date() >= seven_days_ago)
96                .map(|d| d.downloads())
97                .sum();
98
99            interval_downloads_1d.insert(crate_name.clone(), downloads_last_1d);
100            interval_downloads_3d.insert(crate_name.clone(), downloads_last_3d);
101            interval_downloads_7d.insert(crate_name.clone(), downloads_last_7d);
102        }
103    }
104
105    println!(
106        "Collected activity data for {} crates (out of {} requested).",
107        summaries.len(),
108        crate_names.len()
109    );
110
111    Ok(CrateActivityData {
112        summaries,
113        interval_downloads_1d,
114        interval_downloads_3d,
115        interval_downloads_7d,
116    })
117}