db_dump/
load.rs

1use crate::error::{err, Result};
2use crate::DbDump;
3use csv::StringRecord;
4use flate2::read::GzDecoder;
5use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
6use memmap::Mmap;
7use serde::de::DeserializeOwned;
8use std::borrow::Cow;
9use std::fs::File;
10use std::io::Read;
11use std::path::Path;
12use tar::Archive;
13
14/// Perform a streaming load of only relevant database tables.
15///
16/// # Example
17///
18/// This example loads just the version_downloads.csv table, in which each row
19/// is the download count for a single version of a single crate on a single
20/// day. We do not store the rows individually in memory but instead stream from
21/// the csv to accumulate just a total count per day across all crates, which
22/// requires far less memory.
23///
24/// ```no_run
25/// use chrono::Utc;
26/// use db_dump::Date;
27/// use std::collections::BTreeMap as Map;
28///
29/// fn main() -> db_dump::Result<()> {
30///     let mut downloads = Map::<Date<Utc>, u64>::new();
31///     db_dump::Loader::new()
32///         .version_downloads(|row| {
33///             *downloads.entry(row.date).or_default() += row.downloads;
34///         })
35///         .load("./db-dump.tar.gz")?;
36///
37///     for (date, count) in downloads {
38///         println!("{},{}", date, count);
39///     }
40///
41///     Ok(())
42/// }
43/// ```
44#[derive(Default)]
45pub struct Loader<'a> {
46    categories: Option<Callback<'a, crate::categories::Row>>,
47    crate_downloads: Option<Callback<'a, crate::crate_downloads::Row>>,
48    crate_owners: Option<Callback<'a, crate::crate_owners::Row>>,
49    crates: Option<Callback<'a, crate::crates::Row>>,
50    crates_categories: Option<Callback<'a, crate::crates_categories::Row>>,
51    crates_keywords: Option<Callback<'a, crate::crates_keywords::Row>>,
52    default_versions: Option<Callback<'a, crate::default_versions::Row>>,
53    dependencies: Option<Callback<'a, crate::dependencies::Row>>,
54    keywords: Option<Callback<'a, crate::keywords::Row>>,
55    metadata: Option<Callback<'a, crate::metadata::Row>>,
56    reserved_crate_names: Option<Callback<'a, crate::reserved_crate_names::Row>>,
57    teams: Option<Callback<'a, crate::teams::Row>>,
58    users: Option<Callback<'a, crate::users::Row>>,
59    version_downloads: Option<Callback<'a, crate::version_downloads::Row>>,
60    versions: Option<Callback<'a, crate::versions::Row>>,
61}
62
/// A user-supplied row handler for one table, together with a flag recording
/// whether that table has already been read to the end.
struct Callback<'a, T> {
    /// Invoked once for every deserialized row of the table.
    f: Box<dyn FnMut(T) + 'a>,
    /// Set once the table's csv has been consumed without error.
    done: bool,
}
67
68impl<'a> Loader<'a> {
69    pub fn new() -> Self {
70        Loader::default()
71    }
72
73    pub fn categories(&mut self, f: impl FnMut(crate::categories::Row) + 'a) -> &mut Self {
74        self.categories = Some(Callback::new(f));
75        self
76    }
77
78    pub fn crate_downloads(
79        &mut self,
80        f: impl FnMut(crate::crate_downloads::Row) + 'a,
81    ) -> &mut Self {
82        self.crate_downloads = Some(Callback::new(f));
83        self
84    }
85
86    pub fn crate_owners(&mut self, f: impl FnMut(crate::crate_owners::Row) + 'a) -> &mut Self {
87        self.crate_owners = Some(Callback::new(f));
88        self
89    }
90
91    pub fn crates(&mut self, f: impl FnMut(crate::crates::Row) + 'a) -> &mut Self {
92        self.crates = Some(Callback::new(f));
93        self
94    }
95
96    pub fn crates_categories(
97        &mut self,
98        f: impl FnMut(crate::crates_categories::Row) + 'a,
99    ) -> &mut Self {
100        self.crates_categories = Some(Callback::new(f));
101        self
102    }
103
104    pub fn crates_keywords(
105        &mut self,
106        f: impl FnMut(crate::crates_keywords::Row) + 'a,
107    ) -> &mut Self {
108        self.crates_keywords = Some(Callback::new(f));
109        self
110    }
111
112    pub fn default_versions(
113        &mut self,
114        f: impl FnMut(crate::default_versions::Row) + 'a,
115    ) -> &mut Self {
116        self.default_versions = Some(Callback::new(f));
117        self
118    }
119
120    pub fn dependencies(&mut self, f: impl FnMut(crate::dependencies::Row) + 'a) -> &mut Self {
121        self.dependencies = Some(Callback::new(f));
122        self
123    }
124
125    pub fn keywords(&mut self, f: impl FnMut(crate::keywords::Row) + 'a) -> &mut Self {
126        self.keywords = Some(Callback::new(f));
127        self
128    }
129
130    pub fn metadata(&mut self, f: impl FnMut(crate::metadata::Row) + 'a) -> &mut Self {
131        self.metadata = Some(Callback::new(f));
132        self
133    }
134
135    pub fn reserved_crate_names(
136        &mut self,
137        f: impl FnMut(crate::reserved_crate_names::Row) + 'a,
138    ) -> &mut Self {
139        self.reserved_crate_names = Some(Callback::new(f));
140        self
141    }
142
143    pub fn teams(&mut self, f: impl FnMut(crate::teams::Row) + 'a) -> &mut Self {
144        self.teams = Some(Callback::new(f));
145        self
146    }
147
148    pub fn users(&mut self, f: impl FnMut(crate::users::Row) + 'a) -> &mut Self {
149        self.users = Some(Callback::new(f));
150        self
151    }
152
153    pub fn version_downloads(
154        &mut self,
155        f: impl FnMut(crate::version_downloads::Row) + 'a,
156    ) -> &mut Self {
157        self.version_downloads = Some(Callback::new(f));
158        self
159    }
160
161    pub fn versions(&mut self, f: impl FnMut(crate::versions::Row) + 'a) -> &mut Self {
162        self.versions = Some(Callback::new(f));
163        self
164    }
165
166    pub fn load(&mut self, path: impl AsRef<Path>) -> Result<()> {
167        do_load(path.as_ref(), self)
168    }
169}
170
171impl<'a, T> Callback<'a, T> {
172    fn new(f: impl FnMut(T) + 'a) -> Self {
173        Callback {
174            f: Box::new(f),
175            done: false,
176        }
177    }
178
179    fn done(&self) -> bool {
180        self.done
181    }
182}
183
184fn do_load(path: &Path, loader: &mut Loader) -> Result<()> {
185    let file = File::open(path)?;
186    let mmap = unsafe { Mmap::map(&file) }?;
187
188    let pb = ProgressBar::hidden();
189    pb.set_length(mmap.len() as u64);
190    pb.set_style(
191        ProgressStyle::default_bar()
192            .template("[{wide_bar:.cyan/blue}] {percent}% {msg:>24}")
193            .unwrap()
194            .progress_chars(". "),
195    );
196    pb.set_draw_target(ProgressDrawTarget::stderr());
197    let input = pb.wrap_read(&*mmap);
198
199    let mut archive = Archive::new(GzDecoder::new(input));
200    for entry in archive.entries()? {
201        #[deny(unused_variables)]
202        let Loader {
203            categories,
204            crate_downloads,
205            crate_owners,
206            crates,
207            crates_categories,
208            crates_keywords,
209            default_versions,
210            dependencies,
211            keywords,
212            metadata,
213            reserved_crate_names,
214            teams,
215            users,
216            version_downloads,
217            versions,
218        } = loader;
219
220        if categories.as_ref().map_or(true, Callback::done)
221            && crate_downloads.as_ref().map_or(true, Callback::done)
222            && crate_owners.as_ref().map_or(true, Callback::done)
223            && crates.as_ref().map_or(true, Callback::done)
224            && crates_categories.as_ref().map_or(true, Callback::done)
225            && crates_keywords.as_ref().map_or(true, Callback::done)
226            && default_versions.as_ref().map_or(true, Callback::done)
227            && dependencies.as_ref().map_or(true, Callback::done)
228            && keywords.as_ref().map_or(true, Callback::done)
229            && metadata.as_ref().map_or(true, Callback::done)
230            && reserved_crate_names.as_ref().map_or(true, Callback::done)
231            && teams.as_ref().map_or(true, Callback::done)
232            && users.as_ref().map_or(true, Callback::done)
233            && version_downloads.as_ref().map_or(true, Callback::done)
234            && versions.as_ref().map_or(true, Callback::done)
235        {
236            break;
237        }
238
239        let entry = entry?;
240        let path = entry.path()?;
241        if path.extension().map_or(true, |ext| ext != "csv") {
242            continue;
243        }
244
245        pb.set_message(match path.file_name() {
246            Some(file_name) => Cow::Owned(file_name.to_string_lossy().into_owned()),
247            None => Cow::Borrowed(""),
248        });
249
250        #[deny(unused_variables)]
251        let Loader {
252            categories,
253            crate_downloads,
254            crate_owners,
255            crates,
256            crates_categories,
257            crates_keywords,
258            default_versions,
259            dependencies,
260            keywords,
261            metadata,
262            reserved_crate_names,
263            teams,
264            users,
265            version_downloads,
266            versions,
267        } = loader;
268
269        let (path, result) = if path.ends_with("badges.csv") {
270            continue; // https://github.com/rust-lang/crates.io/pull/8155
271        } else if path.ends_with("categories.csv") {
272            ("categories", read(categories, entry))
273        } else if path.ends_with("crate_downloads.csv") {
274            ("crate_downloads", read(crate_downloads, entry))
275        } else if path.ends_with("crate_owners.csv") {
276            ("crate_owners", read(crate_owners, entry))
277        } else if path.ends_with("crates.csv") {
278            ("crates", read(crates, entry))
279        } else if path.ends_with("crates_categories.csv") {
280            ("crates_categories", read(crates_categories, entry))
281        } else if path.ends_with("crates_keywords.csv") {
282            ("crates_keywords", read(crates_keywords, entry))
283        } else if path.ends_with("default_versions.csv") {
284            ("default_versions", read(default_versions, entry))
285        } else if path.ends_with("dependencies.csv") {
286            ("dependencies", read(dependencies, entry))
287        } else if path.ends_with("keywords.csv") {
288            ("keywords", read(keywords, entry))
289        } else if path.ends_with("metadata.csv") {
290            ("metadata", read(metadata, entry))
291        } else if path.ends_with("reserved_crate_names.csv") {
292            ("reserved_crate_names", read(reserved_crate_names, entry))
293        } else if path.ends_with("teams.csv") {
294            ("teams", read(teams, entry))
295        } else if path.ends_with("users.csv") {
296            ("users", read(users, entry))
297        } else if path.ends_with("version_authors.csv") {
298            continue; // https://github.com/rust-lang/crates.io/pull/3549
299        } else if path.ends_with("version_downloads.csv") {
300            ("version_downloads", read(version_downloads, entry))
301        } else if path.ends_with("versions.csv") {
302            ("versions", read(versions, entry))
303        } else {
304            if cfg!(db_dump_panic_on_unrecognized_csv) {
305                panic!("unimplemented: {}", path.display());
306            } else {
307                eprintln!("unimplemented: {}", path.display());
308            }
309            continue;
310        };
311
312        if let Err(mut err) = result {
313            err.e.path = Some(Path::new(path));
314            return Err(err);
315        }
316    }
317
318    Ok(())
319}
320
321pub(crate) trait FromRecord: Sized {
322    fn from_record(record: &StringRecord, headers: &StringRecord) -> Result<Self>;
323}
324
325impl<T> FromRecord for T
326where
327    T: DeserializeOwned,
328{
329    fn from_record(record: &StringRecord, headers: &StringRecord) -> Result<Self> {
330        record.deserialize(Some(headers)).map_err(err)
331    }
332}
333
334fn read<T>(loader: &mut Option<Callback<T>>, entry: impl Read) -> Result<()>
335where
336    T: FromRecord,
337{
338    if let Some(loader) = loader {
339        let mut csv = csv::Reader::from_reader(entry);
340        let headers = csv.headers().map_err(err)?.clone();
341        let mut record = StringRecord::new();
342        while csv.read_record(&mut record).map_err(err)? {
343            let record = T::from_record(&record, &headers)?;
344            (loader.f)(record);
345        }
346        loader.done = true;
347    }
348    Ok(())
349}
350
351/// Deserialize *everything* in a crates.io DB dump into memory.
352///
353/// This function is equivalent to the following [`Loader`]-based invocation:
354///
355/// ```
356/// # use std::path::Path;
357/// # use db_dump::Result;
358/// #
359/// # struct DbDump {
360/// #     categories: Vec<db_dump::categories::Row>,
361/// #     crate_owners: Vec<db_dump::crate_owners::Row>,
362/// #     versions: Vec<db_dump::versions::Row>,
363/// # }
364/// #
365/// # pub fn load_all(path: impl AsRef<Path>) -> Result<DbDump> {
366/// #     let path = path.as_ref();
367/// let mut categories = Vec::new();
368/// let mut crate_owners = Vec::new();
369/// /* ... */
370/// let mut versions = Vec::new();
371///
372/// db_dump::Loader::new()
373///     .categories(|row| categories.push(row))
374///     .crate_owners(|row| crate_owners.push(row))
375///     /* ... */
376///     .versions(|row| versions.push(row))
377///     .load(path)?;
378///
379/// Ok(DbDump {
380///     categories,
381///     crate_owners,
382///     /* ... */
383///     versions,
384/// })
385/// # }
386/// ```
387///
388/// Usually whatever you are doing will not require *all* of the information in
389/// a dump, in which case utilizing `Loader` to load just what you need can be
390/// significantly more efficient.
391pub fn load_all(path: impl AsRef<Path>) -> Result<DbDump> {
392    do_load_all(path.as_ref())
393}
394
395fn do_load_all(path: &Path) -> Result<DbDump> {
396    let mut categories = Vec::new();
397    let mut crate_downloads = Vec::new();
398    let mut crate_owners = Vec::new();
399    let mut crates = Vec::new();
400    let mut crates_categories = Vec::new();
401    let mut crates_keywords = Vec::new();
402    let mut default_versions = Vec::new();
403    let mut dependencies = Vec::new();
404    let mut keywords = Vec::new();
405    let mut metadata = crate::metadata::Row { total_downloads: 0 };
406    let mut reserved_crate_names = Vec::new();
407    let mut teams = Vec::new();
408    let mut users = Vec::new();
409    let mut version_downloads = Vec::new();
410    let mut versions = Vec::new();
411
412    let mut loader = Loader {
413        categories: Some(Callback::new(|row| categories.push(row))),
414        crate_downloads: Some(Callback::new(|row| crate_downloads.push(row))),
415        crate_owners: Some(Callback::new(|row| crate_owners.push(row))),
416        crates: Some(Callback::new(|row| crates.push(row))),
417        crates_categories: Some(Callback::new(|row| crates_categories.push(row))),
418        crates_keywords: Some(Callback::new(|row| crates_keywords.push(row))),
419        default_versions: Some(Callback::new(|row| default_versions.push(row))),
420        dependencies: Some(Callback::new(|row| dependencies.push(row))),
421        keywords: Some(Callback::new(|row| keywords.push(row))),
422        metadata: Some(Callback::new(|row| metadata = row)),
423        reserved_crate_names: Some(Callback::new(|row| reserved_crate_names.push(row))),
424        teams: Some(Callback::new(|row| teams.push(row))),
425        users: Some(Callback::new(|row| users.push(row))),
426        version_downloads: Some(Callback::new(|row| version_downloads.push(row))),
427        versions: Some(Callback::new(|row| versions.push(row))),
428    };
429
430    loader.load(path)?;
431    drop(loader);
432
433    Ok(DbDump {
434        categories,
435        crate_downloads,
436        crate_owners,
437        crates,
438        crates_categories,
439        crates_keywords,
440        default_versions,
441        dependencies,
442        keywords,
443        metadata,
444        reserved_crate_names,
445        teams,
446        users,
447        version_downloads,
448        versions,
449    })
450}