use super::{
CategoriesTable, CrateDownloadsTable, CrateOwnersTable, CratesCategoriesTable, CratesKeywordsTable, CratesTable, DependenciesTable,
KeywordsTable, Table, TeamsTable, UsersTable, VersionDownloadsTable, VersionsTable,
};
#[cfg(all_tables)]
use super::{DefaultVersionsTable, MetadataTable, ReservedCrateNamesTable};
use crate::Result;
use crate::facts::progress::Progress;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use core::sync::atomic::Ordering;
use core::time::Duration;
use flate2::bufread::GzDecoder;
use futures_util::StreamExt;
use mmap_rs::{MmapFlags, MmapOptions};
use ohno::{EnrichableExt, IntoAppError, bail};
use crate::HashMap;
use std::fs::{self, File};
use std::io::{BufRead, Error as IoError, Read};
use std::path::Path;
use std::sync::Arc;
use std::thread;
use std::time::Instant;
use tar::Archive;
use tokio::sync::mpsc;
use url::Url;
// Log target used by every `log::` call in this module.
// NOTE(review): the leading space in " crates" looks unintentional — confirm it
// is not relied upon for log-target alignment/filtering before removing it.
const LOG_TARGET: &str = " crates";
/// Generates `TableMgr` — the owner of one shared handle per crates.io table —
/// together with per-table accessors and the free functions that open, delete,
/// and (re)create the backing table files.
///
/// Each `$field: $type` entry may carry attributes such as
/// `#[cfg(all_tables)]`; those attributes are replicated onto every generated
/// field, accessor, and statement so that conditionally-compiled tables are
/// consistently included or excluded everywhere.
macro_rules! define_tables {
    ($(
        $(#[$meta:meta])*
        $field:ident: $type:ty
    ),* $(,)?) => {
        /// Holds one shared, reference-counted handle per crates.io table.
        #[derive(Debug)]
        pub struct TableMgr {
            $(
                $(#[$meta])*
                $field: Arc<$type>,
            )*
        }
        impl TableMgr {
            $(
                $(#[$meta])*
                /// Returns a reference to this table.
                #[must_use]
                pub fn $field(&self) -> &$type {
                    &self.$field
                }
            )*
            /// Opens every table from the files already cached under
            /// `tables_root`, failing if any table is missing or stale
            /// (judged against `max_ttl` and `now`).
            fn open_tables_from_scratch(
                tables_root: impl AsRef<Path>,
                max_ttl: Duration,
                now: DateTime<Utc>,
                progress: &dyn Progress,
            ) -> Result<Self> {
                // Count only the tables compiled into this build (the field
                // attributes are forwarded so `#[cfg]`-gated tables are skipped);
                // otherwise the progress total would exceed the number of
                // `fetch_add(1)` calls below and never reach 100%.
                const NUM_TABLES: u64 = count_tables!($($(#[$meta])* $field)*);
                let finished_tables = Arc::new(core::sync::atomic::AtomicU64::new(0));
                let finished_tables_clone = Arc::clone(&finished_tables);
                progress.set_determinate(Box::new(move || {
                    // Fixed typo: was "FOpening tables" (inconsistent with
                    // `open_tables_from_files`).
                    (NUM_TABLES, finished_tables_clone.load(Ordering::Relaxed), "Opening tables".to_string())
                }));
                $(
                    $(#[$meta])*
                    let table_start = Instant::now();
                    $(#[$meta])*
                    log::debug!(target: LOG_TARGET, "Opening table '{}'", <$type>::TABLE_NAME);
                    $(#[$meta])*
                    let table = <$type>::open(&tables_root, max_ttl, now)
                        .into_app_err(concat!("opening ", stringify!($field), " table"))?;
                    $(#[$meta])*
                    let $field = Arc::new(table);
                    $(#[$meta])*
                    {
                        log::debug!(target: LOG_TARGET, "Finished opening table '{}' in {:.3}s", <$type>::TABLE_NAME, table_start.elapsed().as_secs_f64());
                        let _ = finished_tables.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
                    }
                )*
                Ok(Self {
                    $(
                        $(#[$meta])*
                        $field,
                    )*
                })
            }
            /// Opens every table from the freshly created files in `files`
            /// (keyed by `TABLE_NAME`), memory-mapping each file before
            /// validating and opening it.
            fn open_tables_from_files(
                files: HashMap<&'static str, File>,
                max_ttl: Duration,
                now: DateTime<Utc>,
                progress: &dyn Progress,
            ) -> Result<Self> {
                // Count only the tables compiled into this build so the progress
                // total matches the number of `fetch_add(1)` calls below.
                const NUM_TABLES: u64 = count_tables!($($(#[$meta])* $field)*);
                let finished_tables = Arc::new(core::sync::atomic::AtomicU64::new(0));
                let finished_tables_clone = Arc::clone(&finished_tables);
                progress.set_determinate(Box::new(move || {
                    (NUM_TABLES, finished_tables_clone.load(Ordering::Relaxed), "Opening tables".to_string())
                }));
                $(
                    $(#[$meta])*
                    let table_start = Instant::now();
                    $(#[$meta])*
                    log::debug!(target: LOG_TARGET, "Opening table '{}'", <$type>::TABLE_NAME);
                    $(#[$meta])*
                    let file = files.get(<$type>::TABLE_NAME)
                        .into_app_err_with(|| format!("missing file for table {}", <$type>::TABLE_NAME))?;
                    $(#[$meta])*
                    let mmap_start = Instant::now();
                    $(#[$meta])*
                    let metadata = file.metadata()
                        .into_app_err_with(|| format!("getting metadata for {}", <$type>::TABLE_NAME))?;
                    $(#[$meta])*
                    #[expect(clippy::cast_possible_truncation, reason = "Table files won't exceed usize::MAX on any supported platform")]
                    let file_size = metadata.len() as usize;
                    $(#[$meta])*
                    #[expect(clippy::multiple_unsafe_ops_per_block, reason = "All operations are part of the same logical mmap creation sequence")]
                    // SAFETY(review): mapping a file is sound only if it is not
                    // truncated/modified while mapped; these files were just
                    // created by this process — confirm no concurrent writers.
                    let mmap = unsafe {
                        MmapOptions::new(file_size)
                            .into_app_err_with(|| format!("creating mmap options for {}", <$type>::TABLE_NAME))?
                            .with_flags(MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL)
                            .with_file(file, 0)
                            .map()
                            .into_app_err_with(|| format!("memory-mapping {}", <$type>::TABLE_NAME))?
                    };
                    $(#[$meta])*
                    log::debug!(target: LOG_TARGET, "Finished mapping '{}' in {:.3}s", <$type>::TABLE_NAME, mmap_start.elapsed().as_secs_f64());
                    $(#[$meta])*
                    let open_start = Instant::now();
                    $(#[$meta])*
                    let table = <$type>::open_with(mmap, max_ttl, now)
                        .into_app_err(concat!("opening ", stringify!($field), " table"))?;
                    $(#[$meta])*
                    log::debug!(target: LOG_TARGET, "Finished validating {} in {:.3}s", <$type>::TABLE_NAME, open_start.elapsed().as_secs_f64());
                    $(#[$meta])*
                    let $field = Arc::new(table);
                    $(#[$meta])*
                    {
                        log::debug!(target: LOG_TARGET, "Finished opening '{}' in {:.3}s", <$type>::TABLE_NAME, table_start.elapsed().as_secs_f64());
                        let _ = finished_tables.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
                    }
                )*
                Ok(Self {
                    $(
                        $(#[$meta])*
                        $field,
                    )*
                })
            }
        }
        /// Deletes every table file under `tables_root`.
        ///
        /// Returns `Ok(true)` when all files are gone. On Windows, returns
        /// `Ok(false)` when one or more files could not be removed because they
        /// are still locked (OS error 32, sharing violation), so the caller can
        /// retry; any other removal failure is an error on all platforms.
        fn delete_all_tables(tables_root: impl AsRef<Path>) -> Result<bool> {
            let tables_root = tables_root.as_ref();
            #[cfg(windows)]
            let mut any_locked = false;
            $(
                $(#[$meta])*
                let table_path = tables_root.join(<$type>::TABLE_NAME);
                $(#[$meta])*
                if table_path.exists() {
                    if let Err(e) = fs::remove_file(&table_path) {
                        #[cfg(windows)]
                        if e.raw_os_error() == Some(32) {
                            any_locked = true;
                        } else {
                            return Err(e).into_app_err_with(|| format!("removing {}", table_path.display()));
                        }
                        #[cfg(not(windows))]
                        {
                            return Err(e).into_app_err_with(|| format!("removing {}", table_path.display()));
                        }
                    }
                }
            )*
            #[cfg(windows)]
            return Ok(!any_locked);
            #[cfg(not(windows))]
            return Ok(true);
        }
        /// If `filename` matches a known table's CSV name, converts the tar
        /// entry into a table file under `tables_root` and returns the table
        /// name together with the created file handle; otherwise `Ok(None)`.
        fn process_csv_entry(
            filename: &str,
            entry: &mut tar::Entry<impl Read>,
            tables_root: &Path,
            now: DateTime<Utc>,
        ) -> Result<Option<(&'static str, File)>> {
            match filename {
                $(
                    $(#[$meta])*
                    <$type>::CSV_NAME => {
                        log::info!(target: LOG_TARGET, "Processing CSV file '{}' from database", <$type>::CSV_NAME);
                        let file = <$type>::create_table(tables_root, entry, now)?;
                        Ok(Some((<$type>::TABLE_NAME, file)))
                    }
                )*
                _ => Ok(None),
            }
        }
    };
}
/// Counts the given fields at compile time, yielding a `u64`.
///
/// Any attributes in front of a field (e.g. `#[cfg(all_tables)]`) are applied
/// to that field's increment, so conditionally-compiled fields are only
/// counted when they are actually compiled in. Plain-ident invocations
/// (no attributes) remain supported.
macro_rules! count_tables {
    ($($(#[$meta:meta])* $field:ident)*) => {{
        let mut n: u64 = 0;
        $(
            $(#[$meta])*
            { n += 1; }
        )*
        n
    }};
}
// Concrete table registry: one field per crates.io dump table. The listed
// order is the order in which tables are opened (and logged). Tables gated
// behind `all_tables` are only compiled in when that cfg is enabled.
define_tables! {
    crates_table: CratesTable,
    versions_table: VersionsTable,
    version_downloads_table: VersionDownloadsTable,
    dependencies_table: DependenciesTable,
    crate_downloads_table: CrateDownloadsTable,
    crates_categories_table: CratesCategoriesTable,
    crates_keywords_table: CratesKeywordsTable,
    categories_table: CategoriesTable,
    keywords_table: KeywordsTable,
    teams_table: TeamsTable,
    users_table: UsersTable,
    crate_owners_table: CrateOwnersTable,
    #[cfg(all_tables)]
    metadata_table: MetadataTable,
    #[cfg(all_tables)]
    default_versions_table: DefaultVersionsTable,
    #[cfg(all_tables)]
    reserved_crate_names_table: ReservedCrateNamesTable,
}
impl TableMgr {
    /// Opens the crates.io tables, preferring the cached copy under
    /// `tables_root`; when the cache is unusable (or `ignore_cached` is set),
    /// stale table files are removed and a fresh database dump is downloaded
    /// from `source`.
    ///
    /// # Errors
    /// Fails when neither the cache nor a fresh download yields a complete,
    /// valid set of tables.
    pub async fn new(
        source: &Url,
        tables_root: impl AsRef<Path>,
        max_ttl: Duration,
        now: DateTime<Utc>,
        ignore_cached: bool,
        progress: Arc<dyn Progress>,
    ) -> Result<Self> {
        let tables_root = tables_root.as_ref();
        if !ignore_cached {
            // Consistency fix: this was the only log call in the module that
            // did not route through LOG_TARGET.
            log::info!(target: LOG_TARGET, "Opening the crates database");
            let result = Self::open_tables_from_scratch(tables_root, max_ttl, now, progress.as_ref());
            if let Ok(ref table_mgr) = result {
                log::debug!(
                    target: LOG_TARGET,
                    "successfully opened cached crates.io tables from {} (created at {})",
                    tables_root.display(),
                    table_mgr.created_at()
                );
                return result;
            }
        }
        log::info!(target: LOG_TARGET, "Cached crates database not found or out of date, downloading a fresh copy");
        // Best-effort cleanup: a failure here is only logged, because the
        // download below will recreate the table files anyway.
        if let Err(e) = Self::cleanup_tables(tables_root) {
            log::debug!(
                target: LOG_TARGET,
                "unable to cleanup stale table files from {}, continuing anyway: {}",
                tables_root.display(),
                e
            );
        }
        match prep_tables(source, tables_root, max_ttl, now, progress).await {
            Ok(table_mgr) => Ok(table_mgr),
            Err(e) => Err(e.enrich("could not prepare crates.io tables")),
        }
    }
    /// Timestamp of the underlying database dump, taken from the crates table.
    #[must_use]
    pub fn created_at(&self) -> DateTime<Utc> {
        self.crates_table.timestamp()
    }
    /// Removes all table files under `tables_root`, retrying with exponential
    /// backoff (capped at `MAX_DELAY_MS`, total budget `MAX_WAIT_MS`) because
    /// Windows may report files as locked (sharing violations) for a short
    /// while after they are unmapped.
    fn cleanup_tables(tables_root: impl AsRef<Path>) -> Result<()> {
        const MAX_WAIT_MS: u64 = 4000;
        const INITIAL_DELAY_MS: u64 = 100;
        const MAX_DELAY_MS: u64 = 1000;
        let tables_root = tables_root.as_ref();
        let start = Instant::now();
        let mut delay_ms = INITIAL_DELAY_MS;
        loop {
            // `Ok(true)` means every file was removed (or absent).
            if delete_all_tables(tables_root)? {
                return Ok(());
            }
            #[expect(
                clippy::cast_possible_truncation,
                reason = "Elapsed time won't exceed u64::MAX in practice (would require ~584 million years)"
            )]
            let elapsed_ms = start.elapsed().as_millis() as u64;
            if elapsed_ms >= MAX_WAIT_MS {
                return Err(ohno::app_err!(
                    "unable to remove all table files in {}: some files remain locked after {}ms of retrying",
                    tables_root.display(),
                    elapsed_ms,
                ));
            }
            // Never sleep past the overall deadline.
            let remaining_ms = MAX_WAIT_MS - elapsed_ms;
            let sleep_ms = delay_ms.min(remaining_ms);
            #[expect(
                clippy::cast_precision_loss,
                reason = "sleep_ms is capped at 1000ms, well within f64 precision range"
            )]
            let sleep_seconds = sleep_ms as f64 / 1000.0;
            log::debug!(
                target: LOG_TARGET,
                "unable to delete all table files in {}, retrying in {} seconds",
                tables_root.display(),
                sleep_seconds
            );
            thread::sleep(Duration::from_millis(sleep_ms));
            delay_ms = (delay_ms * 2).min(MAX_DELAY_MS);
        }
    }
}
// Capacity of the download channel: bounds how many network chunks may be in
// flight between the async download loop and the blocking unpacker before the
// sender has to wait (backpressure).
const NUM_CHANNEL_BUFFERS: usize = 64;
/// Downloads the crates.io database dump and builds the tables from it,
/// running the whole operation through the resilient-download helper with a
/// 30-minute duration limit (see `resilient_download` for its exact meaning).
async fn prep_tables(
    source: &Url,
    tables_root: impl AsRef<Path>,
    max_ttl: Duration,
    now: DateTime<Utc>,
    progress: Arc<dyn Progress>,
) -> Result<TableMgr> {
    // The retry helper owns its context so the closure can be re-invoked.
    let context = (
        source.clone(),
        tables_root.as_ref().to_path_buf(),
        max_ttl,
        now,
        progress,
    );
    crate::facts::resilient_http::resilient_download(
        "crates_db_download",
        context,
        Some(Duration::from_mins(30)),
        move |(source, tables_root, max_ttl, now, progress)| async move {
            prep_tables_core(&source, tables_root, max_ttl, now, progress).await
        },
    )
    .await
}
/// Streams the crates.io database dump from `source` into a bounded channel
/// while a blocking task unpacks it and builds the tables under `tables_root`.
///
/// The download and the processing run concurrently: network chunks are
/// forwarded over the channel and consumed by `process_download` on a
/// blocking thread via `ChannelReader`.
async fn prep_tables_core(
    source: &Url,
    tables_root: std::path::PathBuf,
    max_ttl: Duration,
    now: DateTime<Utc>,
    progress: Arc<dyn Progress>,
) -> Result<TableMgr> {
    log::info!(target: LOG_TARGET, "Starting crates database download from {source}");
    let client = reqwest::Client::builder()
        .user_agent("cargo-aprz")
        .build()
        .into_app_err("creating HTTP client")?;
    let response = crate::facts::resilient_http::resilient_get(&client, source.as_str())
        .await
        .into_app_err("starting crates database dump download")?;
    if !response.status().is_success() {
        bail!("unable to download crates database dump: HTTP {}", response.status());
    }
    // Progress is determinate only when the server reports a Content-Length.
    let content_length = response.content_length();
    let downloaded_bytes = Arc::new(core::sync::atomic::AtomicU64::new(0));
    let downloaded_bytes_clone = Arc::clone(&downloaded_bytes);
    if let Some(total) = content_length {
        progress.set_determinate(Box::new(move || {
            let downloaded_bytes = downloaded_bytes_clone.load(Ordering::Relaxed);
            let downloaded_mb = downloaded_bytes / (1024 * 1024);
            let total_mb = total / (1024 * 1024);
            let message = format!("{downloaded_mb}/{total_mb} MB: Downloading crates database");
            (total, downloaded_bytes, message)
        }));
    } else {
        progress.set_indeterminate(Box::new(move || {
            let downloaded_bytes = downloaded_bytes_clone.load(Ordering::Relaxed);
            let downloaded_mb = downloaded_bytes / (1024 * 1024);
            format!("{downloaded_mb} MB: Downloading crates database")
        }));
    }
    // Bounded channel applies backpressure: the download stalls when the
    // blocking unpacker falls more than NUM_CHANNEL_BUFFERS chunks behind.
    let (tx, rx) = mpsc::channel::<Result<Bytes>>(NUM_CHANNEL_BUFFERS);
    let processing_progress = Arc::clone(&progress);
    let processing_handle =
        tokio::task::spawn_blocking(move || process_download(rx, &tables_root, max_ttl, now, processing_progress.as_ref()));
    let mut stream = response.bytes_stream();
    while let Some(chunk) = stream.next().await {
        match chunk {
            Ok(bytes) => {
                let _ = downloaded_bytes.fetch_add(bytes.len() as u64, Ordering::Relaxed);
                // A closed channel means the processing task exited (likely
                // with an error): stop downloading; its result surfaces below.
                if tx.send(Ok(bytes)).await.is_err() {
                    break;
                }
            }
            Err(e) => {
                // Forward the network error so the reader fails instead of
                // silently seeing EOF; a send failure (receiver gone) is
                // deliberately ignored.
                let _ = tx.send(Err(e.into())).await;
                break;
            }
        }
    }
    // Force the counter to the reported total once the loop ends; any failure
    // still surfaces from the join below.
    if let Some(total) = content_length {
        downloaded_bytes.store(total, Ordering::Relaxed);
    }
    // Dropping the sender closes the channel, which `ChannelReader` sees as EOF.
    drop(tx);
    // First `?` propagates the JoinError (panic/cancellation), the second the
    // processing result itself.
    let table_mgr = processing_handle.await??;
    Ok(table_mgr)
}
/// Decompresses and unpacks the downloaded `.tar.gz` database dump as it
/// streams in over `rx`, converting each recognized CSV member into a table
/// file under `tables_root`, then opens the resulting tables.
fn process_download(
    rx: mpsc::Receiver<Result<Bytes>>,
    tables_root: &Path,
    max_ttl: Duration,
    now: DateTime<Utc>,
    progress: &dyn Progress,
) -> Result<TableMgr> {
    log::info!(target: LOG_TARGET, "Processing crates database download");
    // Channel -> gzip -> tar pipeline, consumed entry by entry.
    let mut archive = Archive::new(GzDecoder::new(ChannelReader::new(rx)));
    let mut files = HashMap::default();
    for entry in archive.entries()? {
        let mut entry = entry?;
        let path = entry.path()?.to_path_buf();
        let filename = path.file_name().and_then(|name| name.to_str()).unwrap_or("");
        let entry_start = Instant::now();
        // Entries that don't correspond to a known table CSV are skipped.
        let Some((table_name, file)) = process_csv_entry(filename, &mut entry, tables_root, now)? else {
            continue;
        };
        let _ = files.insert(table_name, file);
        log::info!(
            target: LOG_TARGET,
            "Finished processing CSV file '{}' in {:.3}s",
            filename,
            entry_start.elapsed().as_secs_f64()
        );
    }
    TableMgr::open_tables_from_files(files, max_ttl, now, progress)
}
/// Adapts the async byte-chunk channel into a blocking `Read`/`BufRead`
/// source so the tar/gzip pipeline can consume the download stream.
struct ChannelReader {
    // Receiving end of the download channel; the async loop is the producer.
    rx: mpsc::Receiver<Result<Bytes>>,
    // Chunk currently being consumed; `None` before the first receive.
    current_chunk: Option<Bytes>,
    // Read offset into `current_chunk`.
    position: usize,
}
impl ChannelReader {
const fn new(rx: mpsc::Receiver<Result<Bytes>>) -> Self {
Self {
rx,
current_chunk: None,
position: 0,
}
}
}
impl BufRead for ChannelReader {
    /// Returns the unread tail of the current chunk, blocking on the channel
    /// until a chunk with unread bytes arrives. A closed channel is EOF
    /// (empty slice); a forwarded download error becomes an I/O error.
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
        // Keep receiving while there is no chunk or it is fully consumed
        // (a missing chunk counts as length 0, which `position` always meets).
        while self.position >= self.current_chunk.as_ref().map_or(0, |chunk| chunk.len()) {
            match self.rx.blocking_recv() {
                None => return Ok(&[]),
                Some(Err(e)) => return Err(IoError::other(e.to_string())),
                Some(Ok(chunk)) => {
                    self.position = 0;
                    self.current_chunk = Some(chunk);
                }
            }
        }
        let chunk = self.current_chunk.as_ref().expect("guaranteed by while condition");
        Ok(&chunk[self.position..])
    }
    /// Advances past `amount` bytes previously returned by `fill_buf`.
    fn consume(&mut self, amount: usize) {
        self.position += amount;
    }
}
impl Read for ChannelReader {
    /// Serves reads straight out of the buffered chunk, refilling from the
    /// channel as needed; returns 0 only at end of stream.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let available = self.fill_buf()?;
        let count = buf.len().min(available.len());
        buf[..count].copy_from_slice(&available[..count]);
        self.consume(count);
        Ok(count)
    }
}