// db_dump/load.rs

1use crate::error::{err, Result};
2use crate::DbDump;
3use csv::StringRecord;
4use flate2::read::GzDecoder;
5use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
6use memmap::Mmap;
7use serde::de::DeserializeOwned;
8use std::borrow::Cow;
9use std::fs::File;
10use std::io::Read;
11use std::path::Path;
12use tar::Archive;
13
14/// Perform a streaming load of only relevant database tables.
15///
16/// # Example
17///
18/// This example loads just the version_downloads.csv table, in which each row
19/// is the download count for a single version of a single crate on a single
20/// day. We do not store the rows individually in memory but instead stream from
21/// the csv to accumulate just a total count per day across all crates, which
22/// requires far less memory.
23///
24/// ```no_run
25/// use chrono::Utc;
26/// use db_dump::Date;
27/// use std::collections::BTreeMap as Map;
28///
29/// fn main() -> db_dump::Result<()> {
30///     let mut downloads = Map::<Date<Utc>, u64>::new();
31///     db_dump::Loader::new()
32///         .version_downloads(|row| {
33///             *downloads.entry(row.date).or_default() += row.downloads;
34///         })
35///         .load("./db-dump.tar.gz")?;
36///
37///     for (date, count) in downloads {
38///         println!("{},{}", date, count);
39///     }
40///
41///     Ok(())
42/// }
43/// ```
44#[derive(#[automatically_derived]
impl<'a> ::core::default::Default for Loader<'a> {
    #[inline]
    fn default() -> Loader<'a> {
        Loader {
            categories: ::core::default::Default::default(),
            crate_downloads: ::core::default::Default::default(),
            crate_owners: ::core::default::Default::default(),
            crates: ::core::default::Default::default(),
            crates_categories: ::core::default::Default::default(),
            crates_keywords: ::core::default::Default::default(),
            default_versions: ::core::default::Default::default(),
            deleted_crates: ::core::default::Default::default(),
            dependencies: ::core::default::Default::default(),
            keywords: ::core::default::Default::default(),
            metadata: ::core::default::Default::default(),
            reserved_crate_names: ::core::default::Default::default(),
            teams: ::core::default::Default::default(),
            users: ::core::default::Default::default(),
            version_downloads: ::core::default::Default::default(),
            versions: ::core::default::Default::default(),
        }
    }
}Default)]
45pub struct Loader<'a> {
46    categories: Option<Callback<'a, crate::categories::Row>>,
47    crate_downloads: Option<Callback<'a, crate::crate_downloads::Row>>,
48    crate_owners: Option<Callback<'a, crate::crate_owners::Row>>,
49    crates: Option<Callback<'a, crate::crates::Row>>,
50    crates_categories: Option<Callback<'a, crate::crates_categories::Row>>,
51    crates_keywords: Option<Callback<'a, crate::crates_keywords::Row>>,
52    default_versions: Option<Callback<'a, crate::default_versions::Row>>,
53    deleted_crates: Option<Callback<'a, crate::deleted_crates::Row>>,
54    dependencies: Option<Callback<'a, crate::dependencies::Row>>,
55    keywords: Option<Callback<'a, crate::keywords::Row>>,
56    metadata: Option<Callback<'a, crate::metadata::Row>>,
57    reserved_crate_names: Option<Callback<'a, crate::reserved_crate_names::Row>>,
58    teams: Option<Callback<'a, crate::teams::Row>>,
59    users: Option<Callback<'a, crate::users::Row>>,
60    version_downloads: Option<Callback<'a, crate::version_downloads::Row>>,
61    versions: Option<Callback<'a, crate::versions::Row>>,
62}
63
/// A per-table row handler together with a flag recording whether its table
/// has already been read to the end.
struct Callback<'a, T> {
    /// Invoked once for every deserialized row of type `T`.
    f: Box<dyn FnMut(T) + 'a>,
    /// Starts out `false`; set to `true` after the table has been fully read.
    done: bool,
}
68
69impl<'a> Loader<'a> {
70    pub fn new() -> Self {
71        Loader::default()
72    }
73
74    pub fn categories(&mut self, f: impl FnMut(crate::categories::Row) + 'a) -> &mut Self {
75        self.categories = Some(Callback::new(f));
76        self
77    }
78
79    pub fn crate_downloads(
80        &mut self,
81        f: impl FnMut(crate::crate_downloads::Row) + 'a,
82    ) -> &mut Self {
83        self.crate_downloads = Some(Callback::new(f));
84        self
85    }
86
87    pub fn crate_owners(&mut self, f: impl FnMut(crate::crate_owners::Row) + 'a) -> &mut Self {
88        self.crate_owners = Some(Callback::new(f));
89        self
90    }
91
92    pub fn crates(&mut self, f: impl FnMut(crate::crates::Row) + 'a) -> &mut Self {
93        self.crates = Some(Callback::new(f));
94        self
95    }
96
97    pub fn crates_categories(
98        &mut self,
99        f: impl FnMut(crate::crates_categories::Row) + 'a,
100    ) -> &mut Self {
101        self.crates_categories = Some(Callback::new(f));
102        self
103    }
104
105    pub fn crates_keywords(
106        &mut self,
107        f: impl FnMut(crate::crates_keywords::Row) + 'a,
108    ) -> &mut Self {
109        self.crates_keywords = Some(Callback::new(f));
110        self
111    }
112
113    pub fn default_versions(
114        &mut self,
115        f: impl FnMut(crate::default_versions::Row) + 'a,
116    ) -> &mut Self {
117        self.default_versions = Some(Callback::new(f));
118        self
119    }
120
121    pub fn deleted_crates(&mut self, f: impl FnMut(crate::deleted_crates::Row) + 'a) -> &mut Self {
122        self.deleted_crates = Some(Callback::new(f));
123        self
124    }
125
126    pub fn dependencies(&mut self, f: impl FnMut(crate::dependencies::Row) + 'a) -> &mut Self {
127        self.dependencies = Some(Callback::new(f));
128        self
129    }
130
131    pub fn keywords(&mut self, f: impl FnMut(crate::keywords::Row) + 'a) -> &mut Self {
132        self.keywords = Some(Callback::new(f));
133        self
134    }
135
136    pub fn metadata(&mut self, f: impl FnMut(crate::metadata::Row) + 'a) -> &mut Self {
137        self.metadata = Some(Callback::new(f));
138        self
139    }
140
141    pub fn reserved_crate_names(
142        &mut self,
143        f: impl FnMut(crate::reserved_crate_names::Row) + 'a,
144    ) -> &mut Self {
145        self.reserved_crate_names = Some(Callback::new(f));
146        self
147    }
148
149    pub fn teams(&mut self, f: impl FnMut(crate::teams::Row) + 'a) -> &mut Self {
150        self.teams = Some(Callback::new(f));
151        self
152    }
153
154    pub fn users(&mut self, f: impl FnMut(crate::users::Row) + 'a) -> &mut Self {
155        self.users = Some(Callback::new(f));
156        self
157    }
158
159    pub fn version_downloads(
160        &mut self,
161        f: impl FnMut(crate::version_downloads::Row) + 'a,
162    ) -> &mut Self {
163        self.version_downloads = Some(Callback::new(f));
164        self
165    }
166
167    pub fn versions(&mut self, f: impl FnMut(crate::versions::Row) + 'a) -> &mut Self {
168        self.versions = Some(Callback::new(f));
169        self
170    }
171
172    pub fn load(&mut self, path: impl AsRef<Path>) -> Result<()> {
173        do_load(path.as_ref(), self)
174    }
175}
176
177impl<'a, T> Callback<'a, T> {
178    fn new(f: impl FnMut(T) + 'a) -> Self {
179        Callback {
180            f: Box::new(f),
181            done: false,
182        }
183    }
184
185    fn done(&self) -> bool {
186        self.done
187    }
188}
189
190fn do_load(path: &Path, loader: &mut Loader) -> Result<()> {
191    let file = File::open(path)?;
192    let mmap = unsafe { Mmap::map(&file) }?;
193
194    let pb = ProgressBar::hidden();
195    pb.set_length(mmap.len() as u64);
196    pb.set_style(
197        ProgressStyle::default_bar()
198            .template("[{wide_bar:.cyan/blue}] {percent}% {msg:>24}")
199            .unwrap()
200            .progress_chars(". "),
201    );
202    pb.set_draw_target(ProgressDrawTarget::stderr());
203    let input = pb.wrap_read(&*mmap);
204
205    let mut archive = Archive::new(GzDecoder::new(input));
206    for entry in archive.entries()? {
207        #[deny(unused_variables)]
208        let Loader {
209            categories,
210            crate_downloads,
211            crate_owners,
212            crates,
213            crates_categories,
214            crates_keywords,
215            default_versions,
216            deleted_crates,
217            dependencies,
218            keywords,
219            metadata,
220            reserved_crate_names,
221            teams,
222            users,
223            version_downloads,
224            versions,
225        } = loader;
226
227        if categories.as_ref().map_or(true, Callback::done)
228            && crate_downloads.as_ref().map_or(true, Callback::done)
229            && crate_owners.as_ref().map_or(true, Callback::done)
230            && crates.as_ref().map_or(true, Callback::done)
231            && crates_categories.as_ref().map_or(true, Callback::done)
232            && crates_keywords.as_ref().map_or(true, Callback::done)
233            && default_versions.as_ref().map_or(true, Callback::done)
234            && deleted_crates.as_ref().map_or(true, Callback::done)
235            && dependencies.as_ref().map_or(true, Callback::done)
236            && keywords.as_ref().map_or(true, Callback::done)
237            && metadata.as_ref().map_or(true, Callback::done)
238            && reserved_crate_names.as_ref().map_or(true, Callback::done)
239            && teams.as_ref().map_or(true, Callback::done)
240            && users.as_ref().map_or(true, Callback::done)
241            && version_downloads.as_ref().map_or(true, Callback::done)
242            && versions.as_ref().map_or(true, Callback::done)
243        {
244            break;
245        }
246
247        let entry = entry?;
248        let path = entry.path()?;
249        if path.extension().map_or(true, |ext| ext != "csv") {
250            continue;
251        }
252
253        pb.set_message(match path.file_name() {
254            Some(file_name) => Cow::Owned(file_name.to_string_lossy().into_owned()),
255            None => Cow::Borrowed(""),
256        });
257
258        #[deny(unused_variables)]
259        let Loader {
260            categories,
261            crate_downloads,
262            crate_owners,
263            crates,
264            crates_categories,
265            crates_keywords,
266            default_versions,
267            deleted_crates,
268            dependencies,
269            keywords,
270            metadata,
271            reserved_crate_names,
272            teams,
273            users,
274            version_downloads,
275            versions,
276        } = loader;
277
278        let (path, result) = if path.ends_with("badges.csv") {
279            continue; // https://github.com/rust-lang/crates.io/pull/8155
280        } else if path.ends_with("categories.csv") {
281            ("categories", read(categories, entry))
282        } else if path.ends_with("crate_downloads.csv") {
283            ("crate_downloads", read(crate_downloads, entry))
284        } else if path.ends_with("crate_owners.csv") {
285            ("crate_owners", read(crate_owners, entry))
286        } else if path.ends_with("crates.csv") {
287            ("crates", read(crates, entry))
288        } else if path.ends_with("crates_categories.csv") {
289            ("crates_categories", read(crates_categories, entry))
290        } else if path.ends_with("crates_keywords.csv") {
291            ("crates_keywords", read(crates_keywords, entry))
292        } else if path.ends_with("default_versions.csv") {
293            ("default_versions", read(default_versions, entry))
294        } else if path.ends_with("deleted_crates.csv") {
295            ("deleted_crates", read(deleted_crates, entry))
296        } else if path.ends_with("dependencies.csv") {
297            ("dependencies", read(dependencies, entry))
298        } else if path.ends_with("keywords.csv") {
299            ("keywords", read(keywords, entry))
300        } else if path.ends_with("metadata.csv") {
301            ("metadata", read(metadata, entry))
302        } else if path.ends_with("reserved_crate_names.csv") {
303            ("reserved_crate_names", read(reserved_crate_names, entry))
304        } else if path.ends_with("teams.csv") {
305            ("teams", read(teams, entry))
306        } else if path.ends_with("users.csv") {
307            ("users", read(users, entry))
308        } else if path.ends_with("version_authors.csv") {
309            continue; // https://github.com/rust-lang/crates.io/pull/3549
310        } else if path.ends_with("version_downloads.csv") {
311            ("version_downloads", read(version_downloads, entry))
312        } else if path.ends_with("versions.csv") {
313            ("versions", read(versions, entry))
314        } else {
315            if falsecfg!(db_dump_panic_on_unrecognized_csv) {
316                {
    ::core::panicking::panic_fmt(format_args!("unimplemented: {0}",
            path.display()));
};panic!("unimplemented: {}", path.display());
317            } else {
318                { ::std::io::_eprint(format_args!("unimplemented: {0}\n", path.display())); };eprintln!("unimplemented: {}", path.display());
319            }
320            continue;
321        };
322
323        if let Err(mut err) = result {
324            err.e.path = Some(Path::new(path));
325            return Err(err);
326        }
327    }
328
329    Ok(())
330}
331
332pub(crate) trait FromRecord: Sized {
333    fn from_record(record: &StringRecord, headers: &StringRecord) -> Result<Self>;
334}
335
336impl<T> FromRecord for T
337where
338    T: DeserializeOwned,
339{
340    fn from_record(record: &StringRecord, headers: &StringRecord) -> Result<Self> {
341        record.deserialize(Some(headers)).map_err(err)
342    }
343}
344
345fn read<T>(loader: &mut Option<Callback<T>>, entry: impl Read) -> Result<()>
346where
347    T: FromRecord,
348{
349    if let Some(loader) = loader {
350        let mut csv = csv::Reader::from_reader(entry);
351        let headers = csv.headers().map_err(err)?.clone();
352        let mut record = StringRecord::new();
353        while csv.read_record(&mut record).map_err(err)? {
354            let record = T::from_record(&record, &headers)?;
355            (loader.f)(record);
356        }
357        loader.done = true;
358    }
359    Ok(())
360}
361
362/// Deserialize *everything* in a crates.io DB dump into memory.
363///
364/// This function is equivalent to the following [`Loader`]-based invocation:
365///
366/// ```
367/// # use std::path::Path;
368/// # use db_dump::Result;
369/// #
370/// # struct DbDump {
371/// #     categories: Vec<db_dump::categories::Row>,
372/// #     crate_owners: Vec<db_dump::crate_owners::Row>,
373/// #     versions: Vec<db_dump::versions::Row>,
374/// # }
375/// #
376/// # pub fn load_all(path: impl AsRef<Path>) -> Result<DbDump> {
377/// #     let path = path.as_ref();
378/// let mut categories = Vec::new();
379/// let mut crate_owners = Vec::new();
380/// /* ... */
381/// let mut versions = Vec::new();
382///
383/// db_dump::Loader::new()
384///     .categories(|row| categories.push(row))
385///     .crate_owners(|row| crate_owners.push(row))
386///     /* ... */
387///     .versions(|row| versions.push(row))
388///     .load(path)?;
389///
390/// Ok(DbDump {
391///     categories,
392///     crate_owners,
393///     /* ... */
394///     versions,
395/// })
396/// # }
397/// ```
398///
399/// Usually whatever you are doing will not require *all* of the information in
400/// a dump, in which case utilizing `Loader` to load just what you need can be
401/// significantly more efficient.
402pub fn load_all(path: impl AsRef<Path>) -> Result<DbDump> {
403    do_load_all(path.as_ref())
404}
405
406fn do_load_all(path: &Path) -> Result<DbDump> {
407    let mut categories = Vec::new();
408    let mut crate_downloads = Vec::new();
409    let mut crate_owners = Vec::new();
410    let mut crates = Vec::new();
411    let mut crates_categories = Vec::new();
412    let mut crates_keywords = Vec::new();
413    let mut default_versions = Vec::new();
414    let mut deleted_crates = Vec::new();
415    let mut dependencies = Vec::new();
416    let mut keywords = Vec::new();
417    let mut metadata = crate::metadata::Row { total_downloads: 0 };
418    let mut reserved_crate_names = Vec::new();
419    let mut teams = Vec::new();
420    let mut users = Vec::new();
421    let mut version_downloads = Vec::new();
422    let mut versions = Vec::new();
423
424    let mut loader = Loader {
425        categories: Some(Callback::new(|row| categories.push(row))),
426        crate_downloads: Some(Callback::new(|row| crate_downloads.push(row))),
427        crate_owners: Some(Callback::new(|row| crate_owners.push(row))),
428        crates: Some(Callback::new(|row| crates.push(row))),
429        crates_categories: Some(Callback::new(|row| crates_categories.push(row))),
430        crates_keywords: Some(Callback::new(|row| crates_keywords.push(row))),
431        default_versions: Some(Callback::new(|row| default_versions.push(row))),
432        deleted_crates: Some(Callback::new(|row| deleted_crates.push(row))),
433        dependencies: Some(Callback::new(|row| dependencies.push(row))),
434        keywords: Some(Callback::new(|row| keywords.push(row))),
435        metadata: Some(Callback::new(|row| metadata = row)),
436        reserved_crate_names: Some(Callback::new(|row| reserved_crate_names.push(row))),
437        teams: Some(Callback::new(|row| teams.push(row))),
438        users: Some(Callback::new(|row| users.push(row))),
439        version_downloads: Some(Callback::new(|row| version_downloads.push(row))),
440        versions: Some(Callback::new(|row| versions.push(row))),
441    };
442
443    loader.load(path)?;
444    drop(loader);
445
446    Ok(DbDump {
447        categories,
448        crate_downloads,
449        crate_owners,
450        crates,
451        crates_categories,
452        crates_keywords,
453        default_versions,
454        deleted_crates,
455        dependencies,
456        keywords,
457        metadata,
458        reserved_crate_names,
459        teams,
460        users,
461        version_downloads,
462        versions,
463    })
464}