use super::*;
use tempfile::tempdir;
/// Builds an `ImportProgress` value field by field and checks that the
/// order and phase read back exactly as they were written.
#[test]
fn test_import_progress() {
    let snapshot = ImportProgress {
        current_order: 3,
        current_prefix: String::from("th"),
        ngrams_in_file: 1_000,
        total_ngrams: 50_000,
        files_completed: 10,
        total_files: 678,
        bytes_downloaded: 1024 * 1024,
        ngrams_per_second: 5000.0,
        eta_seconds: Some(3600),
        phase: ImportPhase::Importing,
    };

    assert_eq!(snapshot.current_order, 3);
    assert_eq!(snapshot.phase, ImportPhase::Importing);
}
/// A default-constructed `ImportStats` reports zero n-grams and zero
/// processed files.
#[test]
fn test_import_stats_default() {
    let fresh: ImportStats = Default::default();

    assert_eq!(fresh.total_ngrams, 0);
    assert_eq!(fresh.files_processed, 0);
}
/// Constructing an importer from an "en", order-1 configuration returns `Ok`.
#[test]
fn test_importer_creation() {
    // The temp dir guard must stay bound so the directory outlives the importer.
    let scratch = tempdir().expect("Failed to create temp dir");
    let trie_path = scratch.path().join("test.artrie");

    let builder = GoogleBooksConfig::builder()
        .language("en")
        .orders(1..=1)
        .output_path(trie_path);
    let config = builder.build().expect("Failed to build config");

    let created = GoogleBooksImporter::new(config);
    assert!(created.is_ok());
}
/// The config builder accepts the language string "invalid", but creating
/// the importer must then fail with `ImportError::UnsupportedLanguage`.
#[test]
fn test_unsupported_language() {
    let scratch = tempdir().expect("Failed to create temp dir");
    let trie_path = scratch.path().join("test.artrie");

    let builder = GoogleBooksConfig::builder()
        .language("invalid")
        .orders(1..=1)
        .output_path(trie_path);
    let config = builder.build().expect("Failed to build config");

    let outcome = GoogleBooksImporter::new(config);
    assert!(matches!(
        outcome,
        Err(ImportError::UnsupportedLanguage(_))
    ));
}
/// A new importer is not interrupted; after `interrupt()` it reports that it is.
#[test]
fn test_interrupt_flag() {
    let scratch = tempdir().expect("Failed to create temp dir");
    let trie_path = scratch.path().join("test.artrie");

    let builder = GoogleBooksConfig::builder()
        .language("en")
        .orders(1..=1)
        .output_path(trie_path);
    let config = builder.build().expect("Failed to build config");
    let importer = GoogleBooksImporter::new(config).expect("Failed to create importer");

    // Before: flag clear.
    assert!(!importer.is_interrupted());

    // After: flag set.
    importer.interrupt();
    assert!(importer.is_interrupted());
}
/// Writes `ngrams` to `path` as a gzip-compressed text file.
///
/// Each tuple becomes one line with its four fields, in tuple order,
/// separated by tabs and terminated by a newline. Panics if the file cannot
/// be created, a line cannot be written, or the gzip stream cannot be
/// finished.
fn create_mock_ngram_file(path: &std::path::Path, ngrams: &[(&str, u16, u64, u32)]) {
    use flate2::write::GzEncoder;
    use flate2::Compression;
    use std::io::Write;

    let sink = std::fs::File::create(path).expect("Failed to create file");
    let mut gz = GzEncoder::new(sink, Compression::default());

    ngrams
        .iter()
        .for_each(|&(ngram, year, count, volume_count)| {
            writeln!(gz, "{}\t{}\t{}\t{}", ngram, year, count, volume_count)
                .expect("Failed to write");
        });

    // `finish` writes out the end of the gzip stream; its error is checked here.
    gz.finish().expect("Failed to finish compression");
}
/// Imports a small gzip-compressed 1-gram file from a local directory and
/// checks that the resulting stats report at least one n-gram.
#[test]
fn test_file_import_with_mock_data() {
    let dir = tempdir().expect("Failed to create temp dir");
    let output_path = dir.path().join("test.artrie");
    let ngram_dir = dir.path().join("ngrams");
    std::fs::create_dir(&ngram_dir).expect("Failed to create ngram dir");

    let file_path = ngram_dir.join("googlebooks-eng-all-1gram-20200217-t.gz");
    create_mock_ngram_file(
        &file_path,
        &[
            ("the", 2000, 50000, 1000),
            ("the", 2001, 55000, 1100),
            ("the", 2002, 60000, 1200),
            ("this", 2000, 10000, 500),
            ("this", 2001, 11000, 550),
            ("that", 2000, 20000, 800),
            ("that", 2001, 21000, 850),
            ("test", 2000, 5000, 200),
        ],
    );

    let config = GoogleBooksConfig::builder()
        .language("en")
        .orders(1..=1)
        .min_count(1)
        .output_path(output_path)
        .build()
        .expect("Failed to build config");
    let mut importer = GoogleBooksImporter::new(config).expect("Failed to create importer");

    // `expect` replaces the former `assert!(result.is_ok())` + `unwrap()` pair:
    // the assert fired first and hid the actual error, whereas `expect`
    // includes the error's `Debug` output in the panic message.
    let stats = importer
        .import_files(&ngram_dir, |progress| {
            assert!(progress.current_order >= 1);
        })
        .expect("Import of mock n-gram file should succeed");

    assert!(stats.total_ngrams > 0, "Should have imported n-grams");
}
/// Runs a file import with `year_range(2000, 2010)` over rows dated 1990,
/// 2000 and 2010 and checks that the import completes without error.
///
/// NOTE(review): only success is asserted; nothing here verifies that the
/// year range changed which rows were imported.
#[test]
fn test_year_filtering() {
    let dir = tempdir().expect("Failed to create temp dir");
    let output_path = dir.path().join("test.artrie");
    let ngram_dir = dir.path().join("ngrams");
    std::fs::create_dir(&ngram_dir).expect("Failed to create ngram dir");

    let file_path = ngram_dir.join("googlebooks-eng-all-1gram-20200217-a.gz");
    create_mock_ngram_file(
        &file_path,
        &[
            ("apple", 1990, 1000, 100),
            ("apple", 2000, 2000, 200),
            ("apple", 2010, 3000, 300),
            ("ant", 1990, 500, 50),
            ("ant", 2000, 600, 60),
        ],
    );

    let config = GoogleBooksConfig::builder()
        .language("en")
        .orders(1..=1)
        .min_count(1)
        .year_range(2000, 2010)
        .output_path(output_path)
        .build()
        .expect("Failed to build config");
    let mut importer = GoogleBooksImporter::new(config).expect("Failed to create importer");

    // `expect` keeps the underlying error in the panic message, which a bare
    // `assert!(result.is_ok())` would discard.
    importer
        .import_files(&ngram_dir, |_| {})
        .expect("Import with a year range should succeed");
}
/// Runs a file import with `min_count(40)` over rows whose counts are
/// 100000, 50000 and 10, and checks that the import completes without error.
///
/// NOTE(review): only success is asserted; nothing here verifies that the
/// low-count row ("bxyz") was excluded.
#[test]
fn test_min_count_filtering() {
    let dir = tempdir().expect("Failed to create temp dir");
    let output_path = dir.path().join("test.artrie");
    let ngram_dir = dir.path().join("ngrams");
    std::fs::create_dir(&ngram_dir).expect("Failed to create ngram dir");

    let file_path = ngram_dir.join("googlebooks-eng-all-1gram-20200217-b.gz");
    create_mock_ngram_file(
        &file_path,
        &[
            ("big", 2000, 100000, 5000),
            ("bear", 2000, 50000, 2500),
            ("bxyz", 2000, 10, 2),
        ],
    );

    let config = GoogleBooksConfig::builder()
        .language("en")
        .orders(1..=1)
        .min_count(40)
        .output_path(output_path)
        .build()
        .expect("Failed to build config");
    let mut importer = GoogleBooksImporter::new(config).expect("Failed to create importer");

    // `expect` keeps the underlying error in the panic message, which a bare
    // `assert!(result.is_ok())` would discard.
    importer
        .import_files(&ngram_dir, |_| {})
        .expect("Import with a minimum count should succeed");
}
/// Runs a file import with `skip_pos_tags` enabled over a mix of plain
/// entries and entries carrying `_NOUN` / `_DET` suffixes, and checks that
/// the import completes without error.
///
/// NOTE(review): only success is asserted; nothing here verifies that the
/// suffixed entries were skipped.
#[test]
fn test_pos_tag_filtering() {
    let dir = tempdir().expect("Failed to create temp dir");
    let output_path = dir.path().join("test.artrie");
    let ngram_dir = dir.path().join("ngrams");
    std::fs::create_dir(&ngram_dir).expect("Failed to create ngram dir");

    let file_path = ngram_dir.join("googlebooks-eng-all-1gram-20200217-c.gz");
    create_mock_ngram_file(
        &file_path,
        &[
            ("cat", 2000, 50000, 2500),
            ("cat_NOUN", 2000, 45000, 2300),
            ("car", 2000, 40000, 2000),
            ("the_DET", 2000, 100000, 5000),
        ],
    );

    let mut config = GoogleBooksConfig::builder()
        .language("en")
        .orders(1..=1)
        .min_count(1)
        .output_path(output_path)
        .build()
        .expect("Failed to build config");
    // Set directly on the built config; this test does not go through a builder method for it.
    config.skip_pos_tags = true;
    let mut importer = GoogleBooksImporter::new(config).expect("Failed to create importer");

    // `expect` keeps the underlying error in the panic message, which a bare
    // `assert!(result.is_ok())` would discard.
    importer
        .import_files(&ngram_dir, |_| {})
        .expect("Import with POS-tag skipping should succeed");
}
/// Saves a checkpoint on a freshly created importer, loads it back through
/// the importer's `storage`, and expects an empty `order_progress` and no
/// completed orders.
#[test]
fn test_checkpoint_save_and_load() {
    let dir = tempdir().expect("Failed to create temp dir");
    let output_path = dir.path().join("test.artrie");

    // `output_path` and `config` are each used exactly once, so they are
    // moved instead of cloned.
    let config = GoogleBooksConfig::builder()
        .language("en")
        .orders(1..=1)
        .output_path(output_path)
        .build()
        .expect("Failed to build config");
    let mut importer = GoogleBooksImporter::new(config).expect("Failed to create importer");

    importer
        .save_checkpoint()
        .expect("Failed to save checkpoint");

    // Outer `expect`: the load call itself succeeded.
    // Inner `expect`: a checkpoint was actually present.
    let loaded = importer
        .storage
        .load_import_checkpoint()
        .expect("Failed to load checkpoint from trie")
        .expect("Checkpoint should exist in trie");

    assert!(loaded.order_progress.is_empty());
    assert!(loaded.completed_orders().is_empty());
}
// Tests for `wait_for_worker_exits_before_checkpoint`, compiled only when the
// "google-books" feature is enabled.
//
// Each test hand-builds the state the function receives by reference: a
// worker counter, a result counter, maps of worker handles and shutdown
// senders keyed by worker id, the worker-exit and job-result receivers, and a
// force-quit flag.
#[cfg(feature = "google-books")]
mod shutdown_checkpoint_safety {
use super::super::import_ops::wait_for_worker_exits_before_checkpoint;
use super::super::worker_pool::{JobOutcome, JobResult};
use super::*;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
// Builds a `JobResult` for order 1, prefix "a", with a `Success` outcome of
// one n-gram.
fn successful_job_result() -> JobResult {
JobResult {
order: 1,
prefix: Arc::from("a"),
outcome: JobOutcome::Success { ngram_count: 1 },
}
}
// With one result queued but no worker-exit message, the wait must still be
// pending after 25 ms. Once exit id 7 is sent, it must finish with `Ok` and
// leave the worker counter at 0, the result counter at 1 and both maps empty.
#[tokio::test]
async fn graceful_cancel_waits_for_worker_exit_before_checkpoint() {
let (result_tx, mut result_rx) = tokio::sync::mpsc::channel::<JobResult>(1);
let (worker_exit_tx, mut worker_exit_rx) = tokio::sync::mpsc::channel::<usize>(1);
let (shutdown_tx, _shutdown_rx) = tokio::sync::watch::channel(false);
let mut active_workers = 1usize;
let mut results_received = 0u64;
let mut worker_shutdown_txs = HashMap::from([(7usize, shutdown_tx)]);
// A spawned no-op task supplies the join handle stored under worker id 7.
let mut worker_handles = HashMap::from([(7usize, tokio::spawn(async {}))]);
let force_quit = AtomicBool::new(false);
result_tx
.send(successful_job_result())
.await
.expect("result receiver should be open");
// Scoped so the wait future's mutable borrows end before the assertions below.
{
let wait = wait_for_worker_exits_before_checkpoint(
&mut active_workers,
&mut results_received,
&mut worker_handles,
&mut worker_shutdown_txs,
&mut worker_exit_rx,
&mut result_rx,
&force_quit,
);
// Pinning lets the same future be polled inside `select!` and awaited again afterwards.
tokio::pin!(wait);
tokio::select! {
result = &mut wait => {
panic!("checkpoint wait completed before worker exit: {:?}", result);
}
_ = tokio::time::sleep(Duration::from_millis(25)) => {}
}
// Only now report the exit of worker 7; the pending wait should then complete.
worker_exit_tx
.send(7)
.await
.expect("worker exit receiver should be open");
wait.await.expect("worker exit should permit checkpointing");
}
assert_eq!(active_workers, 0);
assert_eq!(results_received, 1);
assert!(worker_handles.is_empty());
assert!(worker_shutdown_txs.is_empty());
}
// Both a job result and the worker-exit message are queued before the call.
// The queued result must still be counted (`results_received == 1`) by the
// time the call returns `Ok`.
#[tokio::test]
async fn graceful_cancel_drains_result_when_exit_wins_select() {
let (result_tx, mut result_rx) = tokio::sync::mpsc::channel::<JobResult>(1);
let (worker_exit_tx, mut worker_exit_rx) = tokio::sync::mpsc::channel::<usize>(1);
let (shutdown_tx, _shutdown_rx) = tokio::sync::watch::channel(false);
let mut active_workers = 1usize;
let mut results_received = 0u64;
let mut worker_shutdown_txs = HashMap::from([(7usize, shutdown_tx)]);
let mut worker_handles = HashMap::from([(7usize, tokio::spawn(async {}))]);
let force_quit = AtomicBool::new(false);
result_tx
.send(successful_job_result())
.await
.expect("result receiver should be open");
worker_exit_tx
.send(7)
.await
.expect("worker exit receiver should be open");
wait_for_worker_exits_before_checkpoint(
&mut active_workers,
&mut results_received,
&mut worker_handles,
&mut worker_shutdown_txs,
&mut worker_exit_rx,
&mut result_rx,
&force_quit,
)
.await
.expect("worker exit should permit checkpointing");
assert_eq!(active_workers, 0);
assert_eq!(results_received, 1);
assert!(worker_handles.is_empty());
assert!(worker_shutdown_txs.is_empty());
}
// With the force-quit flag already set and nothing queued on either channel,
// the call must return `Err(ImportError::Interrupted)` and leave all of the
// passed-in state unchanged.
#[tokio::test]
async fn force_quit_during_graceful_cancel_prevents_checkpoint() {
// The senders are bound to names (not dropped) so both channels stay open during the call.
let (_result_tx, mut result_rx) = tokio::sync::mpsc::channel::<JobResult>(1);
let (_worker_exit_tx, mut worker_exit_rx) = tokio::sync::mpsc::channel::<usize>(1);
let (shutdown_tx, _shutdown_rx) = tokio::sync::watch::channel(false);
let mut active_workers = 1usize;
let mut results_received = 0u64;
let mut worker_shutdown_txs = HashMap::from([(7usize, shutdown_tx)]);
let mut worker_handles = HashMap::from([(7usize, tokio::spawn(async {}))]);
let force_quit = AtomicBool::new(true);
let result = wait_for_worker_exits_before_checkpoint(
&mut active_workers,
&mut results_received,
&mut worker_handles,
&mut worker_shutdown_txs,
&mut worker_exit_rx,
&mut result_rx,
&force_quit,
)
.await;
assert!(matches!(result, Err(ImportError::Interrupted)));
// Worker 7 is still counted and still present in both maps.
assert_eq!(active_workers, 1);
assert_eq!(results_received, 0);
assert!(worker_handles.contains_key(&7));
assert!(worker_shutdown_txs.contains_key(&7));
}
}
#[cfg(feature = "google-books")]
mod cache_files {
    //! Exercises `download_to_cache` and `cleanup_cache_file` against a local
    //! `wiremock` server and a temporary directory.
    use super::*;
    use wiremock::matchers::{header_exists, method};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Returns a `reqwest` client configured with a 10-second timeout.
    fn build_client() -> reqwest::Client {
        let ten_seconds = std::time::Duration::from_secs(10);
        reqwest::Client::builder()
            .timeout(ten_seconds)
            .build()
            .expect("Failed to build test HTTP client")
    }

    /// Path used by these tests for the in-progress download that sits next
    /// to `cache_path`: the same path with its extension replaced by
    /// `gz.downloading`.
    fn partial_path_for(cache_path: &std::path::Path) -> std::path::PathBuf {
        cache_path.with_extension("gz.downloading")
    }

    /// A plain 200 response ends up at the cache path byte-for-byte.
    #[tokio::test]
    async fn download_to_cache_creates_file() {
        let server = MockServer::start().await;
        let payload: &[u8] = &[0x1f, 0x8b, 0x08, 0x00, b'h', b'i', 0x00, 0x00];
        let ok_with_payload = ResponseTemplate::new(200).set_body_bytes(payload);
        Mock::given(method("GET"))
            .respond_with(ok_with_payload)
            .mount(&server)
            .await;

        let scratch = tempdir().expect("tempdir");
        let cache_path = scratch.path().join("test.gz");
        let url = format!("{}/test.gz", server.uri());
        let http = build_client();

        download_to_cache(&url, &cache_path, &http)
            .await
            .expect("download should succeed");

        assert!(
            cache_path.exists(),
            "cache file should exist after download"
        );
        let on_disk = std::fs::read(&cache_path).expect("read cache file");
        assert_eq!(on_disk, payload, "cache file should contain server body");
    }

    /// When the cache file already exists, its content is left untouched even
    /// though the server would serve different bytes.
    #[tokio::test]
    async fn download_to_cache_skips_if_exists() {
        let server = MockServer::start().await;
        let ok_with_other_bytes = ResponseTemplate::new(200).set_body_bytes(b"server bytes");
        Mock::given(method("GET"))
            .respond_with(ok_with_other_bytes)
            .mount(&server)
            .await;

        let scratch = tempdir().expect("tempdir");
        let cache_path = scratch.path().join("test.gz");
        std::fs::write(&cache_path, b"sentinel").expect("pre-populate cache");

        let url = format!("{}/test.gz", server.uri());
        let http = build_client();
        download_to_cache(&url, &cache_path, &http)
            .await
            .expect("download should succeed (no-op)");

        let on_disk = std::fs::read(&cache_path).expect("read cache file");
        assert_eq!(
            on_disk, b"sentinel",
            "cache should keep its sentinel content"
        );
    }

    /// With the first 8 bytes already in the `.gz.downloading` file and a
    /// server that answers range requests with 206 plus the remaining bytes,
    /// the final cache file holds the full 16-byte body and the partial file
    /// is gone.
    #[tokio::test]
    async fn download_to_cache_resume_via_range() {
        let full_body: &[u8] = b"0123456789abcdef";
        let (cached_half, served_half) = (&full_body[..8], &full_body[8..]);

        let server = MockServer::start().await;
        // Only requests carrying a `range` header are matched by this mock.
        Mock::given(method("GET"))
            .and(header_exists("range"))
            .respond_with(ResponseTemplate::new(206).set_body_bytes(served_half))
            .mount(&server)
            .await;

        let scratch = tempdir().expect("tempdir");
        let cache_path = scratch.path().join("test.gz");
        let partial = partial_path_for(&cache_path);
        std::fs::write(&partial, cached_half).expect("seed partial");

        let url = format!("{}/test.gz", server.uri());
        let http = build_client();
        download_to_cache(&url, &cache_path, &http)
            .await
            .expect("download should resume");

        assert!(cache_path.exists());
        assert!(
            !partial.exists(),
            "downloading remnant should be renamed away"
        );
        let on_disk = std::fs::read(&cache_path).expect("read");
        assert_eq!(
            on_disk, full_body,
            "resumed download should assemble first-half (cached) + second-half (server) = full body"
        );
    }

    /// A partial file longer than the real body triggers a 416 on the ranged
    /// request; the cache file must nevertheless end up equal to the server's
    /// full body.
    #[tokio::test]
    async fn download_to_cache_416_recovery() {
        let full_body: &[u8] = b"short";
        let server = MockServer::start().await;

        // Mounted first and limited to a single use: ranged GET -> 416.
        let range_rejected = Mock::given(method("GET"))
            .and(header_exists("range"))
            .respond_with(ResponseTemplate::new(416))
            .up_to_n_times(1);
        range_rejected.mount(&server).await;

        // Mounted second: any GET -> 200 with the full body.
        let full_response = Mock::given(method("GET"))
            .respond_with(ResponseTemplate::new(200).set_body_bytes(full_body));
        full_response.mount(&server).await;

        let scratch = tempdir().expect("tempdir");
        let cache_path = scratch.path().join("test.gz");
        let partial = partial_path_for(&cache_path);
        std::fs::write(&partial, b"toolongbye").expect("seed oversized partial");

        let url = format!("{}/test.gz", server.uri());
        let http = build_client();
        download_to_cache(&url, &cache_path, &http)
            .await
            .expect("download should recover from 416");

        assert!(cache_path.exists());
        let on_disk = std::fs::read(&cache_path).expect("read");
        assert_eq!(
            on_disk, full_body,
            "after 416 recovery, file matches server's full body"
        );
    }

    /// `cleanup_cache_file` removes both the final file and its
    /// `.gz.downloading` sibling.
    #[tokio::test]
    async fn cleanup_cache_file_removes_both() {
        let scratch = tempdir().expect("tempdir");
        let cache_path = scratch.path().join("test.gz");
        let partial = partial_path_for(&cache_path);
        std::fs::write(&cache_path, b"final").expect("write final");
        std::fs::write(&partial, b"partial").expect("write partial");
        assert!(cache_path.exists() && partial.exists());

        cleanup_cache_file(&cache_path).await;

        assert!(!cache_path.exists(), ".gz should be removed");
        assert!(!partial.exists(), ".gz.downloading should be removed");
    }

    /// Calling `cleanup_cache_file` on a path that was never created
    /// completes without panicking and leaves the path absent.
    #[tokio::test]
    async fn cleanup_cache_file_is_idempotent() {
        let scratch = tempdir().expect("tempdir");
        let never_created = scratch.path().join("nope.gz");

        cleanup_cache_file(&never_created).await;

        assert!(!never_created.exists());
    }
}