#![cfg(feature = "google-books")]
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use super::super::config::GoogleBooksConfig;
use super::super::reader::ReaderError;
use super::super::storage::{NgramStorage, StoragePrefixTx};
use super::super::task_manager::RetryAfter;
use super::{
cleanup_cache_file, download_to_cache, extract_retry_after, is_retryable_error,
store_ngram_shared, ImportError, WorkerUpdate, COUNTER_BATCH_SIZE,
};
/// Retry ceiling: a retryable failure is re-queued only while the job's
/// `attempt` counter is still below this value.
#[cfg(feature = "google-books")]
pub(super) const MAX_RETRIES: u8 = 5;
/// Backoff (in milliseconds) a freshly created `Job` starts with. The value is
/// doubled before it is first used as a delay, so the first retry without a
/// `Retry-After` hint waits twice this long.
#[cfg(feature = "google-books")]
pub(super) const INITIAL_BACKOFF_MS: u64 = 1000;
/// One unit of import work consumed by `worker_task`: a single prefix file to
/// fetch and store, together with its retry bookkeeping.
#[cfg(feature = "google-books")]
#[derive(Clone)]
pub(super) struct Job {
/// Location of the prefix file; handed to the HTTP reader or the cache downloader.
url: Arc<str>,
/// Prefix identifier; passed to storage when opening the prefix transaction.
prefix: Arc<str>,
/// N-gram order; passed to storage alongside `prefix`.
order: u8,
/// Number of retries already scheduled for this job (0 for a fresh job).
attempt: u8,
/// Current backoff in milliseconds; doubled each time a retry is scheduled.
backoff_ms: u64,
/// Earliest instant at which the job may run; `None` means it is ready immediately.
ready_at: Option<std::time::Instant>,
}
#[cfg(feature = "google-books")]
impl Job {
    /// Builds a first-attempt job that is ready to run immediately.
    ///
    /// The attempt counter starts at zero and the backoff at
    /// `INITIAL_BACKOFF_MS`.
    pub(super) fn new(url: impl Into<Arc<str>>, prefix: impl Into<Arc<str>>, order: u8) -> Self {
        let (url, prefix) = (url.into(), prefix.into());
        Self {
            url,
            prefix,
            order,
            attempt: 0,
            backoff_ms: INITIAL_BACKOFF_MS,
            ready_at: None,
        }
    }

    /// Returns the follow-up job for a retry.
    ///
    /// The backoff is doubled (saturating) and the attempt counter is bumped.
    /// The job becomes ready after the server-provided `retry_after` when one
    /// is given, otherwise after the doubled backoff.
    fn with_retry_after(&self, retry_after: Option<RetryAfter>) -> Self {
        let backoff_ms = self.backoff_ms.saturating_mul(2);
        let delay = retry_after.map_or_else(
            || Duration::from_millis(backoff_ms),
            |hint| hint.to_duration(),
        );
        Self {
            attempt: self.attempt + 1,
            backoff_ms,
            ready_at: Some(Instant::now() + delay),
            // `url`, `prefix` and `order` carry over unchanged.
            ..self.clone()
        }
    }

    /// Whether the job's scheduled start time (if any) has been reached.
    fn is_ready(&self) -> bool {
        match self.ready_at {
            None => true,
            Some(deadline) => deadline <= Instant::now(),
        }
    }
}
/// Terminal report for one job, sent by `worker_task` on its result channel.
#[cfg(feature = "google-books")]
#[derive(Debug)]
pub(super) struct JobResult {
/// N-gram order of the job this result belongs to.
pub(super) order: u8,
/// Prefix of the job this result belongs to.
pub(super) prefix: Arc<str>,
/// How the job ended.
pub(super) outcome: JobOutcome,
}
/// Final state of a job as reported by `worker_task`.
#[cfg(feature = "google-books")]
#[derive(Debug)]
pub(super) enum JobOutcome {
/// The prefix was processed; `ngram_count` is the count returned by the attempt.
Success {
ngram_count: u64,
},
/// The job ended with an error that was not classified as retryable.
Failed {
error: ImportError,
attempts: u32,
},
/// The job kept failing with a retryable error until the retry limit was reached.
Skipped {
error: ImportError,
attempts: u32,
},
}
/// Diagnostic details about a failed attempt, used only for log output.
#[cfg(feature = "google-books")]
#[derive(Debug, Clone)]
struct RequestDebugInfo {
/// URL of the job that failed.
url: String,
/// HTTP status guessed from the error text, when one of the recognised codes appears in it.
status_code: Option<u16>,
/// Duration of the attempt in milliseconds.
response_time_ms: u64,
/// Display rendering of the error.
error_message: String,
}
#[cfg(feature = "google-books")]
impl RequestDebugInfo {
    /// Captures log-friendly details about a failed request.
    ///
    /// For reader errors the status code is guessed by searching the rendered
    /// error text for a known code; the first match in table order wins. Any
    /// other error kind yields no status code.
    fn from_error(url: &str, error: &ImportError, response_time: Duration) -> Self {
        // Checked in this order; the substring test is a heuristic, not a parse.
        const STATUS_MARKERS: [(&str, u16); 4] =
            [("404", 404), ("429", 429), ("500", 500), ("503", 503)];

        let status_code = if let ImportError::Reader(reader_err) = error {
            let rendered = reader_err.to_string();
            STATUS_MARKERS
                .iter()
                .find(|(marker, _)| rendered.contains(*marker))
                .map(|&(_, code)| code)
        } else {
            None
        };

        let response_time_ms = response_time.as_millis() as u64;
        let error_message = error.to_string();
        Self {
            url: url.to_string(),
            status_code,
            response_time_ms,
            error_message,
        }
    }
}
/// Result of one `process_prefix_file` / `process_prefix_file_cached` call.
#[cfg(feature = "google-books")]
pub(super) enum PrefixOutcome {
/// The prefix file was imported; `ngram_count` is the number of n-grams processed.
Success {
prefix: Arc<str>,
ngram_count: u64,
},
/// A retryable error occurred below the retry limit. The fields carry the
/// inputs for the next attempt: `attempt` is already incremented and
/// `backoff_ms` already doubled.
Deferred {
url: Arc<str>,
prefix: Arc<str>,
order: u8,
attempt: u8,
backoff_ms: u64,
},
/// The prefix could not be imported (non-retryable error or retries exhausted).
Failed {
prefix: Arc<str>,
error: ImportError,
attempts: u32,
},
}
/// State shared by every `worker_task` instance.
#[cfg(feature = "google-books")]
pub(super) struct WorkerSharedState {
/// Import settings (filters, year range, chunk size, cache toggle).
pub(super) config: GoogleBooksConfig,
/// Destination store for imported n-grams.
pub(super) storage: Arc<NgramStorage>,
/// Running total of processed n-grams, updated after each successful attempt.
pub(super) total_ngrams: Arc<AtomicU64>,
/// Running total of newly stored n-grams (updated on the non-transactional path).
pub(super) unique_ngrams: Arc<AtomicU64>,
/// Channel for progress events; workers send with `try_send` and ignore failures.
pub(super) progress_tx: tokio::sync::mpsc::Sender<WorkerUpdate>,
/// While set, workers poll every 100 ms instead of starting a job.
pub(super) paused: Arc<AtomicBool>,
/// Count of jobs not yet finished; decremented when a job reaches a terminal outcome.
pub(super) queue_size: Arc<AtomicUsize>,
/// Per-worker live stats indexed by worker id: the processed count is packed
/// into the upper 32 bits and the unique count into the lower 32 bits; reset
/// to 0 when an attempt finishes.
pub(super) worker_stats: Vec<AtomicU64>,
/// HTTP client shared by all downloads.
pub(super) http_client: reqwest::Client,
}
/// Shared context for `process_prefix_file` and `process_prefix_file_cached`.
#[cfg(feature = "google-books")]
pub(super) struct PrefixProcessingContext {
/// Import settings (filters, year range, chunk size, cache toggle).
pub(super) config: GoogleBooksConfig,
/// Destination store for imported n-grams.
pub(super) storage: Arc<NgramStorage>,
/// Running total of processed n-grams.
pub(super) total_ngrams: Arc<AtomicU64>,
/// Running total of newly stored n-grams.
pub(super) unique_ngrams: Arc<AtomicU64>,
/// Optional channel for progress events; sends use `try_send` and ignore failures.
pub(super) progress_tx: Option<tokio::sync::mpsc::Sender<WorkerUpdate>>,
/// HTTP client used by the cached download path.
pub(super) http_client: reqwest::Client,
/// Sender side of the worker-id pool: an id is sent back here once a prefix call is done.
pub(super) worker_id_pool_tx: tokio::sync::mpsc::Sender<usize>,
/// Receiver side of the worker-id pool: each prefix call takes one id from here first.
pub(super) worker_id_pool_rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<usize>>>,
}
/// Drains `stream` into the open prefix transaction `tx`.
///
/// Every item is inserted through the transaction. When the configured
/// `tx_chunk_size` is non-zero the transaction is committed and renewed each
/// time that many items have been inserted. A progress event is emitted every
/// `NGRAM_PROGRESS_INTERVAL` items.
///
/// On the first stream, insert or chunk-commit error the transaction is
/// aborted (an abort failure is only logged) and the error is returned. On
/// success the final commit is performed, the shared counters are updated and
/// the number of processed items is returned.
#[cfg(feature = "google-books")]
async fn process_aggregated_stream<S>(
    stream: S,
    mut tx: StoragePrefixTx,
    ctx: &Arc<PrefixProcessingContext>,
    prefix: &str,
    order: u8,
    worker_id: usize,
    source_label: &str,
) -> Result<u64, ImportError>
where
    S: tokio_stream::Stream<Item = Result<super::super::aggregator::AggregatedNgram, ReaderError>>,
{
    use tokio_stream::StreamExt;
    const NGRAM_PROGRESS_INTERVAL: u64 = 50_000;

    tokio::pin!(stream);
    let chunk_limit = ctx.config.tx_chunk_size;
    let mut processed = 0u64;
    let mut pending_in_chunk = 0u64;

    // The loop evaluates to the first failure, or `None` once the stream ends.
    let failure: Option<ImportError> = loop {
        let Some(item) = stream.next().await else {
            break None;
        };
        let entry = match item {
            Ok(entry) => entry,
            Err(read_err) => break Some(read_err.into()),
        };
        if let Err(insert_err) =
            ctx.storage
                .tx_insert_ngram(&mut tx, &entry.ngram, entry.total_count)
        {
            break Some(insert_err.into());
        }
        processed += 1;
        pending_in_chunk += 1;

        if chunk_limit > 0 && pending_in_chunk >= chunk_limit {
            match ctx
                .storage
                .commit_and_renew_prefix_tx(&mut tx, prefix, order)
            {
                Ok(flushed) => {
                    log::trace!(
                        "Worker {}: committed chunk for {} '{}' ({} n-grams)",
                        worker_id,
                        source_label,
                        prefix,
                        flushed
                    );
                    pending_in_chunk = 0;
                }
                Err(commit_err) => break Some(commit_err.into()),
            }
        }

        if processed % NGRAM_PROGRESS_INTERVAL == 0 {
            if let Some(ref progress) = ctx.progress_tx {
                let _ = progress.try_send(WorkerUpdate::NgramProgress {
                    worker_id,
                    ngram_count: processed,
                });
            }
        }
    };

    if let Some(err) = failure {
        // Best-effort rollback; the original error is what the caller sees.
        if let Err(abort_err) = ctx.storage.abort_prefix_tx(tx) {
            log::warn!(
                "Worker {}: failed to abort transaction for {} '{}': {}",
                worker_id,
                source_label,
                prefix,
                abort_err
            );
        }
        return Err(err);
    }

    let inserted = ctx.storage.commit_prefix_tx(tx)?;
    ctx.total_ngrams.fetch_add(processed, Ordering::Relaxed);
    ctx.unique_ngrams
        .fetch_add(inserted as u64, Ordering::Relaxed);
    log::trace!(
        "Worker {}: committed {} '{}' with {} n-grams ({} inserted)",
        worker_id,
        source_label,
        prefix,
        processed,
        inserted
    );
    Ok(processed)
}
/// Runs one streaming import attempt for `job` directly from HTTP.
///
/// After a random jitter of up to 500 ms the prefix file is streamed and each
/// aggregated n-gram is stored, either through a prefix transaction (when
/// storage provides one) or through `store_ngram_shared`. While running, the
/// worker's slot in `shared.worker_stats` carries the live count in its upper
/// 32 bits (and, on the non-transactional path, the unique count in the lower
/// 32 bits).
///
/// Returns the number of n-grams processed. On success that number is added to
/// `shared.total_ngrams`.
///
/// # Errors
///
/// Returns the first stream, storage or commit error. The worker's stats slot
/// is reset to 0 on every exit path that runs after streaming has started.
#[cfg(feature = "google-books")]
async fn process_single_attempt(
    job: &Job,
    shared: &WorkerSharedState,
    worker_id: usize,
) -> Result<u64, ImportError> {
    use super::super::reader::HttpNgramReader;
    use tokio_stream::StreamExt;

    // Spread simultaneous requests from different workers.
    let jitter_ms = rand::random::<u64>() % 500;
    tokio::time::sleep(Duration::from_millis(jitter_ms)).await;

    let mut reader = HttpNgramReader::with_options(
        &job.url,
        shared.config.skip_pos_tags,
        shared.config.min_count,
    );
    let stream = reader
        .stream_aggregated_with_client(shared.config.year_range, Some(shared.http_client.clone()));
    tokio::pin!(stream);

    let mut count = 0u64;
    // Nothing has touched the stats slot yet, so an early return here is safe.
    let maybe_tx = shared.storage.begin_prefix_tx(&job.prefix, job.order)?;
    let tx_chunk_size = shared.config.tx_chunk_size;
    let stats_slot = shared.worker_stats.get(worker_id);

    let result = if let Some(mut tx) = maybe_tx {
        let tx_result: Result<u64, ImportError> = async {
            let mut chunk_count = 0u64;
            let mut stream_err: Option<ImportError> = None;
            while let Some(item) = stream.next().await {
                let agg = match item {
                    Ok(agg) => agg,
                    Err(e) => {
                        stream_err = Some(e.into());
                        break;
                    }
                };
                if let Err(e) = shared
                    .storage
                    .tx_insert_ngram(&mut tx, &agg.ngram, agg.total_count)
                {
                    stream_err = Some(e.into());
                    break;
                }
                count += 1;
                chunk_count += 1;
                if tx_chunk_size > 0 && chunk_count >= tx_chunk_size {
                    match shared
                        .storage
                        .commit_and_renew_prefix_tx(&mut tx, &job.prefix, job.order)
                    {
                        Ok(committed) => {
                            log::trace!(
                                "Worker {}: committed chunk for prefix '{}' ({} n-grams)",
                                worker_id,
                                job.prefix,
                                committed
                            );
                            chunk_count = 0;
                        }
                        Err(e) => {
                            stream_err = Some(e.into());
                            break;
                        }
                    }
                }
                if let Some(slot) = stats_slot {
                    slot.store(count << 32, Ordering::Relaxed);
                }
            }
            if let Some(e) = stream_err {
                // Best-effort rollback; the original error is what gets reported.
                if let Err(abort_err) = shared.storage.abort_prefix_tx(tx) {
                    log::warn!(
                        "Worker {}: failed to abort transaction for prefix '{}': {}",
                        worker_id,
                        job.prefix,
                        abort_err
                    );
                }
                return Err(e);
            }
            let committed = shared.storage.commit_prefix_tx(tx)?;
            log::trace!(
                "Worker {}: committed prefix '{}' with {} n-grams",
                worker_id,
                job.prefix,
                committed
            );
            Ok(count)
        }
        .await;
        tx_result
    } else {
        // The async block confines `?` so that an error no longer returns from
        // the whole function and skips the stats reset below.
        let direct_result: Result<u64, ImportError> = async {
            let mut unique_count = 0u64;
            while let Some(item) = stream.next().await {
                let agg = item?;
                let storage_result =
                    store_ngram_shared(&agg.ngram, agg.total_count, &shared.storage)?;
                count += 1;
                if storage_result.is_new {
                    unique_count += 1;
                }
                if let Some(slot) = stats_slot {
                    let packed = (count << 32) | (unique_count & 0xFFFF_FFFF);
                    slot.store(packed, Ordering::Relaxed);
                }
            }
            if unique_count > 0 {
                shared
                    .unique_ngrams
                    .fetch_add(unique_count, Ordering::Relaxed);
            }
            Ok(count)
        }
        .await;
        direct_result
    };

    if let Ok(ngram_count) = result {
        shared
            .total_ngrams
            .fetch_add(ngram_count, Ordering::Relaxed);
    }
    // Always clear the live stats, whether the attempt succeeded or failed.
    if let Some(slot) = stats_slot {
        slot.store(0, Ordering::Relaxed);
    }
    result
}
/// Runs one import attempt for `job` via a local cache file.
///
/// The prefix file is first downloaded to the configured cache path, then
/// streamed from disk and stored, either through a prefix transaction (when
/// storage provides one) or through `store_ngram_shared`. While running, the
/// worker's slot in `shared.worker_stats` carries the live count in its upper
/// 32 bits (and, on the non-transactional path, the unique count in the lower
/// 32 bits).
///
/// Returns the number of n-grams processed. On success that number is added to
/// `shared.total_ngrams`.
///
/// # Errors
///
/// Returns a configuration error when no cache path can be derived, the
/// download error, or the first stream/storage/commit error. Once the download
/// has succeeded, the cache file is cleaned up and the stats slot is reset on
/// every exit path.
#[cfg(feature = "google-books")]
async fn process_single_attempt_cached(
    job: &Job,
    shared: &WorkerSharedState,
    worker_id: usize,
) -> Result<u64, ImportError> {
    use super::super::reader::stream_aggregated_from_cached_file;
    use tokio_stream::StreamExt;

    let cache_path = shared
        .config
        .cache_file_path(job.order, &job.prefix)
        .ok_or_else(|| {
            ImportError::Config(format!(
                "Unknown language '{}' for cache file path",
                shared.config.language
            ))
        })?;
    download_to_cache(&job.url, &cache_path, &shared.http_client).await?;

    let stream = stream_aggregated_from_cached_file(
        &cache_path,
        shared.config.year_range,
        shared.config.skip_pos_tags,
        shared.config.min_count,
    );
    tokio::pin!(stream);

    let mut count = 0u64;
    let tx_chunk_size = shared.config.tx_chunk_size;
    let stats_slot = shared.worker_stats.get(worker_id);

    // Everything fallible after the download lives in this async block, so
    // `?` and `return` leave only the block. The cache cleanup and the stats
    // reset below therefore run even when the attempt fails.
    let result: Result<u64, ImportError> = async {
        let maybe_tx = shared.storage.begin_prefix_tx(&job.prefix, job.order)?;
        if let Some(mut tx) = maybe_tx {
            let mut chunk_count = 0u64;
            let mut stream_err: Option<ImportError> = None;
            while let Some(item) = stream.next().await {
                let agg = match item {
                    Ok(agg) => agg,
                    Err(e) => {
                        stream_err = Some(e.into());
                        break;
                    }
                };
                if let Err(e) = shared
                    .storage
                    .tx_insert_ngram(&mut tx, &agg.ngram, agg.total_count)
                {
                    stream_err = Some(e.into());
                    break;
                }
                count += 1;
                chunk_count += 1;
                if tx_chunk_size > 0 && chunk_count >= tx_chunk_size {
                    match shared
                        .storage
                        .commit_and_renew_prefix_tx(&mut tx, &job.prefix, job.order)
                    {
                        Ok(committed) => {
                            log::trace!(
                                "Worker {}: committed chunk for cached prefix '{}' ({} n-grams)",
                                worker_id,
                                job.prefix,
                                committed
                            );
                            chunk_count = 0;
                        }
                        Err(e) => {
                            stream_err = Some(e.into());
                            break;
                        }
                    }
                }
                if let Some(slot) = stats_slot {
                    slot.store(count << 32, Ordering::Relaxed);
                }
            }
            if let Some(e) = stream_err {
                // Best-effort rollback; the original error is what gets reported.
                if let Err(abort_err) = shared.storage.abort_prefix_tx(tx) {
                    log::warn!(
                        "Worker {}: failed to abort transaction for prefix '{}': {}",
                        worker_id,
                        job.prefix,
                        abort_err
                    );
                }
                return Err(e);
            }
            let committed = shared.storage.commit_prefix_tx(tx)?;
            log::trace!(
                "Worker {}: committed cached prefix '{}' with {} n-grams",
                worker_id,
                job.prefix,
                committed
            );
            Ok(count)
        } else {
            let mut unique_count = 0u64;
            while let Some(item) = stream.next().await {
                let agg = item?;
                let storage_result =
                    store_ngram_shared(&agg.ngram, agg.total_count, &shared.storage)?;
                count += 1;
                if storage_result.is_new {
                    unique_count += 1;
                }
                if let Some(slot) = stats_slot {
                    let packed = (count << 32) | (unique_count & 0xFFFF_FFFF);
                    slot.store(packed, Ordering::Relaxed);
                }
            }
            if unique_count > 0 {
                shared
                    .unique_ngrams
                    .fetch_add(unique_count, Ordering::Relaxed);
            }
            Ok(count)
        }
    }
    .await;

    cleanup_cache_file(&cache_path).await;
    if let Ok(ngram_count) = result {
        shared
            .total_ngrams
            .fetch_add(ngram_count, Ordering::Relaxed);
    }
    // Always clear the live stats, whether the attempt succeeded or failed.
    if let Some(slot) = stats_slot {
        slot.store(0, Ordering::Relaxed);
    }
    result
}
/// Long-running worker loop: pulls jobs from `job_rx` and processes them until
/// shutdown is signalled or the job channel is closed.
///
/// Per job the loop:
/// 1. re-queues it on `job_tx` if it is not ready yet (`ready_at` in the
///    future) or if storage reports its shard as syncing;
/// 2. waits while `shared.paused` is set;
/// 3. runs one attempt (cached or streaming, depending on `config.cache_files`);
/// 4. on success or terminal failure decrements `shared.queue_size` and sends a
///    `JobResult` on `result_tx`; on a retryable error below `MAX_RETRIES`
///    re-queues a follow-up job built by `Job::with_retry_after`.
///
/// Progress events are sent with `try_send` and dropped if the channel is full
/// or closed. The worker id is sent on `worker_exit_tx` before the task returns.
#[cfg(feature = "google-books")]
pub(super) async fn worker_task(
worker_id: usize,
job_rx: async_channel::Receiver<Job>,
job_tx: async_channel::Sender<Job>,
mut shutdown_rx: tokio::sync::watch::Receiver<bool>,
shared: Arc<WorkerSharedState>,
result_tx: tokio::sync::mpsc::Sender<JobResult>,
worker_exit_tx: tokio::sync::mpsc::Sender<usize>,
) {
// Number of jobs this worker has put back in a row without processing one.
let mut consecutive_deferred = 0usize;
// Soonest `ready_at` seen among the jobs deferred in the current streak.
let mut earliest_ready: Option<Instant> = None;
loop {
if *shutdown_rx.borrow() {
log::debug!("Worker {} shutting down", worker_id);
break;
}
// `biased` makes the shutdown branch win when both branches are ready.
let job = tokio::select! {
biased;
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
log::debug!("Worker {} received shutdown signal while waiting for job", worker_id);
break;
}
continue;
}
result = job_rx.recv() => result.ok(),
};
// `None` means `recv` failed, i.e. the job channel is closed.
let Some(job) = job else {
if consecutive_deferred > 0 {
log::debug!(
"Worker {} queue closed with {} deferred jobs pending",
worker_id,
consecutive_deferred
);
}
log::debug!("Worker {} finished - queue empty", worker_id);
break;
};
// Not yet due: put the job back and, once this worker has bounced at least
// `queue_size` jobs in a row, sleep until the earliest one becomes ready.
if !job.is_ready() {
consecutive_deferred += 1;
if let Some(ready_at) = job.ready_at {
earliest_ready = Some(match earliest_ready {
Some(e) => e.min(ready_at),
None => ready_at,
});
}
// A failed send is ignored here.
let _ = job_tx.send(job).await;
let queue_size = shared.queue_size.load(Ordering::SeqCst);
if queue_size > 0 && consecutive_deferred >= queue_size {
if let Some(ready_at) = earliest_ready {
let wait = ready_at.saturating_duration_since(Instant::now());
if !wait.is_zero() {
// Per-worker offset plus random jitter so workers do not all wake together.
let jitter = Duration::from_millis(
(worker_id as u64 * 100) + (rand::random::<u64>() % 500),
);
let staggered_wait = wait + jitter;
log::debug!(
"Worker {} blocking {}ms (+{}ms jitter) - all {} jobs deferred",
worker_id,
wait.as_millis(),
jitter.as_millis(),
queue_size
);
tokio::time::sleep(staggered_wait).await;
}
}
consecutive_deferred = 0;
earliest_ready = None;
}
continue;
}
// NOTE(review): the streak is reset here, before the shard-sync check below
// increments it again, so that check always sees `consecutive_deferred == 1`
// and its blocking branch can only fire when `queue_size` is 1 — confirm
// whether the reset was meant to happen after the sync check.
consecutive_deferred = 0;
earliest_ready = None;
if shared
.storage
.is_prefix_shard_syncing(&job.prefix, job.order)
{
// Same job (attempt and backoff unchanged), made ready again in 50 ms.
let deferred_job = Job {
url: Arc::clone(&job.url),
prefix: Arc::clone(&job.prefix),
order: job.order,
attempt: job.attempt, backoff_ms: job.backoff_ms, ready_at: Some(Instant::now() + Duration::from_millis(50)), };
log::trace!(
"Worker {} deferring {} (order {}) - shard syncing",
worker_id,
job.prefix,
job.order
);
let _ = job_tx.send(deferred_job).await; consecutive_deferred += 1;
let queue_size = shared.queue_size.load(Ordering::SeqCst);
if queue_size > 0 && consecutive_deferred >= queue_size {
let jitter =
Duration::from_millis((worker_id as u64 * 10) + (rand::random::<u64>() % 100));
log::debug!(
"Worker {} blocking {}ms - all {} jobs targeting syncing shards",
worker_id,
jitter.as_millis(),
queue_size
);
tokio::time::sleep(jitter).await;
consecutive_deferred = 0;
}
continue;
}
// Poll the pause flag every 100 ms.
// NOTE(review): the `break` only leaves this pause loop; the job already
// taken from the queue is still processed below even when shutdown was
// requested — confirm this is intended.
while shared.paused.load(Ordering::SeqCst) {
tokio::time::sleep(Duration::from_millis(100)).await;
if *shutdown_rx.borrow() {
break;
}
}
let _ = shared.progress_tx.try_send(WorkerUpdate::Started {
worker_id,
order: job.order,
prefix: job.prefix.clone(),
attempt: job.attempt,
});
let start_time = Instant::now();
let result = if shared.config.cache_files {
process_single_attempt_cached(&job, &shared, worker_id).await
} else {
process_single_attempt(&job, &shared, worker_id).await
};
let elapsed = start_time.elapsed();
match result {
Ok(count) => {
// Terminal outcome: the job leaves the queue.
shared.queue_size.fetch_sub(1, Ordering::SeqCst);
let _ = shared.progress_tx.try_send(WorkerUpdate::Finished {
worker_id,
order: job.order,
prefix: job.prefix.clone(),
ngram_count: count,
duration: elapsed,
});
let job_result = JobResult {
order: job.order,
prefix: job.prefix,
outcome: JobOutcome::Success { ngram_count: count },
};
// The result receiver is gone: announce the exit and stop.
if result_tx.send(job_result).await.is_err() {
let _ = worker_exit_tx.send(worker_id).await;
let _ = shared
.progress_tx
.try_send(WorkerUpdate::Exited { worker_id });
return;
}
}
// Retryable failure below the retry limit: re-queue a follow-up job.
// `queue_size` is left unchanged because the job stays in the queue.
Err(e) if is_retryable_error(&e) && job.attempt < MAX_RETRIES => {
let retry_after = extract_retry_after(&e);
let retry_job = job.with_retry_after(retry_after.clone());
let debug_info = RequestDebugInfo::from_error(&job.url, &e, elapsed);
// Remaining delay until the retry becomes ready, for logging only.
let delay_ms = retry_job
.ready_at
.map(|ra| {
ra.saturating_duration_since(std::time::Instant::now())
.as_millis() as u64
})
.unwrap_or(retry_job.backoff_ms);
log::debug!(
"Worker {} deferring {} (order {}) - attempt {}/{}, retry at +{}ms{}\n\
URL: {}\n\
Error: {}\n\
Status code: {:?}\n\
Response time: {}ms",
worker_id,
retry_job.prefix,
retry_job.order,
retry_job.attempt,
MAX_RETRIES,
delay_ms,
if retry_after.is_some() {
" (from Retry-After header)"
} else {
""
},
debug_info.url,
debug_info.error_message,
debug_info.status_code,
debug_info.response_time_ms,
);
let _ = shared.progress_tx.try_send(WorkerUpdate::Retrying {
worker_id,
order: retry_job.order,
prefix: Arc::clone(&retry_job.prefix),
attempt: retry_job.attempt as u32,
error: Arc::from(e.to_string()),
});
// A failed send is ignored here.
let _ = job_tx.send(retry_job).await;
}
// Terminal failure: either retries are exhausted (reported as Skipped) or
// the error is not retryable (reported as Failed).
Err(error) => {
shared.queue_size.fetch_sub(1, Ordering::SeqCst);
let debug_info = RequestDebugInfo::from_error(&job.url, &error, elapsed);
let is_max_retries = is_retryable_error(&error) && job.attempt >= MAX_RETRIES;
if is_max_retries {
log::warn!(
"Worker {} SKIPPING prefix {} (order {}) after {} failed attempts - will retry next session\n\
URL: {}\n\
Final error: {}\n\
Status code: {:?}\n\
Response time: {}ms",
worker_id,
job.prefix,
job.order,
job.attempt + 1,
debug_info.url,
debug_info.error_message,
debug_info.status_code,
debug_info.response_time_ms,
);
let job_result = JobResult {
order: job.order,
prefix: job.prefix,
outcome: JobOutcome::Skipped {
error,
attempts: (job.attempt + 1) as u32,
},
};
if result_tx.send(job_result).await.is_err() {
let _ = worker_exit_tx.send(worker_id).await;
let _ = shared
.progress_tx
.try_send(WorkerUpdate::Exited { worker_id });
return;
}
} else {
log::warn!(
"Worker {} FAILED on prefix {} (order {}) - non-retryable error after {} attempts\n\
URL: {}\n\
Error: {}\n\
Status code: {:?}\n\
Response time: {}ms",
worker_id,
job.prefix,
job.order,
job.attempt + 1,
debug_info.url,
debug_info.error_message,
debug_info.status_code,
debug_info.response_time_ms,
);
let job_result = JobResult {
order: job.order,
prefix: job.prefix,
outcome: JobOutcome::Failed {
error,
attempts: (job.attempt + 1) as u32,
},
};
if result_tx.send(job_result).await.is_err() {
let _ = worker_exit_tx.send(worker_id).await;
let _ = shared
.progress_tx
.try_send(WorkerUpdate::Exited { worker_id });
return;
}
}
}
}
}
// Normal exit path (shutdown or closed queue): announce the exit.
let _ = worker_exit_tx.send(worker_id).await;
let _ = shared
.progress_tx
.try_send(WorkerUpdate::Exited { worker_id });
log::debug!("Worker {} exited", worker_id);
}
/// Imports one prefix file and reports what should happen to it next.
///
/// A worker id is taken from the context's id pool for the duration of the
/// call and handed back before returning. When `config.cache_files` is set the
/// work is delegated to `process_prefix_file_cached`; otherwise the file is
/// streamed over HTTP after a random jitter of up to 500 ms.
///
/// # Returns
///
/// * `PrefixOutcome::Success` with the number of n-grams processed;
/// * `PrefixOutcome::Deferred` for a retryable error while `attempt` is below
///   `MAX_RETRIES`, carrying the incremented attempt and the doubled
///   (saturating) backoff;
/// * `PrefixOutcome::Failed` otherwise.
///
/// # Panics
///
/// Panics if the worker-id pool channel is closed.
#[cfg(feature = "google-books")]
pub(super) async fn process_prefix_file(
    ctx: Arc<PrefixProcessingContext>,
    url: Arc<str>,
    prefix: Arc<str>,
    order: u8,
    attempt: u8,
    backoff_ms: u64,
) -> PrefixOutcome {
    use super::super::reader::HttpNgramReader;
    use tokio_stream::StreamExt;

    let worker_id = {
        let mut rx = ctx.worker_id_pool_rx.lock().await;
        rx.recv().await.expect("Worker ID pool closed unexpectedly")
    };

    if let Some(ref tx) = ctx.progress_tx {
        let _ = tx.try_send(WorkerUpdate::Started {
            worker_id,
            order,
            prefix: Arc::clone(&prefix),
            attempt,
        });
    }

    if ctx.config.cache_files {
        let outcome =
            process_prefix_file_cached(&ctx, worker_id, url, prefix, order, attempt, backoff_ms)
                .await;
        let _ = ctx.worker_id_pool_tx.send(worker_id).await;
        return outcome;
    }

    // Spread simultaneous requests from concurrent tasks.
    let jitter_ms = rand::random::<u64>() % 500;
    tokio::time::sleep(Duration::from_millis(jitter_ms)).await;

    let start_time = Instant::now();
    // The async block confines `?` so the worker id is always returned below.
    let result: Result<u64, ImportError> = async {
        let mut reader =
            HttpNgramReader::with_options(&url, ctx.config.skip_pos_tags, ctx.config.min_count);
        let stream = reader.stream_aggregated(ctx.config.year_range);
        let maybe_tx = ctx.storage.begin_prefix_tx(&prefix, order)?;
        if let Some(tx) = maybe_tx {
            process_aggregated_stream(stream, tx, &ctx, &prefix, order, worker_id, "prefix").await
        } else {
            tokio::pin!(stream);
            const NGRAM_PROGRESS_INTERVAL: u64 = 50_000;
            // Counters are accumulated locally and flushed to the shared
            // atomics every `COUNTER_BATCH_SIZE` items.
            let mut local_total: u64 = 0;
            let mut local_unique: u64 = 0;
            let mut count = 0u64;
            while let Some(item) = stream.next().await {
                let agg = item?;
                let storage_result = store_ngram_shared(&agg.ngram, agg.total_count, &ctx.storage)?;
                count += 1;
                local_total += 1;
                if storage_result.is_new {
                    local_unique += 1;
                }
                if local_total >= COUNTER_BATCH_SIZE {
                    ctx.total_ngrams.fetch_add(local_total, Ordering::Relaxed);
                    if local_unique > 0 {
                        ctx.unique_ngrams.fetch_add(local_unique, Ordering::Relaxed);
                    }
                    local_total = 0;
                    local_unique = 0;
                }
                if count % NGRAM_PROGRESS_INTERVAL == 0 {
                    if let Some(ref tx) = ctx.progress_tx {
                        let _ = tx.try_send(WorkerUpdate::NgramProgress {
                            worker_id,
                            ngram_count: count,
                        });
                    }
                }
            }
            if local_total > 0 {
                ctx.total_ngrams.fetch_add(local_total, Ordering::Relaxed);
            }
            if local_unique > 0 {
                ctx.unique_ngrams.fetch_add(local_unique, Ordering::Relaxed);
            }
            Ok(count)
        }
    }
    .await;
    let elapsed = start_time.elapsed();

    // Widen before adding so the attempt number cannot overflow `u8`.
    let attempts_made = u32::from(attempt) + 1;
    let outcome = match result {
        Ok(count) => {
            if let Some(ref tx) = ctx.progress_tx {
                let _ = tx.try_send(WorkerUpdate::Finished {
                    worker_id,
                    order,
                    prefix: Arc::clone(&prefix),
                    ngram_count: count,
                    duration: elapsed,
                });
            }
            PrefixOutcome::Success {
                prefix,
                ngram_count: count,
            }
        }
        Err(e) if attempt < MAX_RETRIES && is_retryable_error(&e) => {
            // Saturating, like `Job::with_retry_after`: a plain `* 2` would
            // panic in debug builds or wrap in release builds on overflow.
            let next_backoff_ms = backoff_ms.saturating_mul(2);
            tracing::debug!(
                "Prefix '{}' (order {}) failed attempt {} with retryable error, deferring: {}",
                prefix,
                order,
                attempts_made,
                e
            );
            if let Some(ref tx) = ctx.progress_tx {
                let _ = tx.try_send(WorkerUpdate::Deferred {
                    worker_id,
                    order,
                    prefix: Arc::clone(&prefix),
                    attempt: attempts_made,
                    delay_seconds: backoff_ms / 1000,
                    error: Arc::from(e.to_string()),
                });
            }
            PrefixOutcome::Deferred {
                url,
                prefix,
                order,
                // Cannot overflow: guarded by `attempt < MAX_RETRIES`.
                attempt: attempt + 1,
                backoff_ms: next_backoff_ms,
            }
        }
        Err(e) => {
            tracing::warn!(
                "Prefix '{}' (order {}) failed permanently after {} attempts: {}",
                prefix,
                order,
                attempts_made,
                e
            );
            PrefixOutcome::Failed {
                prefix,
                error: e,
                attempts: attempts_made,
            }
        }
    };

    // Hand the id back only after the final progress event for it was sent,
    // matching the cached path, so another task cannot reuse the id before
    // this task's `Finished`/`Deferred` update is queued.
    let _ = ctx.worker_id_pool_tx.send(worker_id).await;
    outcome
}
/// Imports one prefix file through a local cache file.
///
/// The file is downloaded to the configured cache path, streamed from disk
/// into storage (through a prefix transaction when storage provides one,
/// otherwise through `store_ngram_shared`), and the cache file is cleaned up
/// after processing regardless of the processing result.
///
/// # Returns
///
/// * `PrefixOutcome::Success` with the number of n-grams processed;
/// * `PrefixOutcome::Deferred` for a retryable download or processing error
///   while `attempt` is below `MAX_RETRIES`, carrying the incremented attempt
///   and the doubled (saturating) backoff;
/// * `PrefixOutcome::Failed` otherwise, including when no cache path can be
///   derived from the configuration.
#[cfg(feature = "google-books")]
pub(super) async fn process_prefix_file_cached(
    ctx: &Arc<PrefixProcessingContext>,
    worker_id: usize,
    url: Arc<str>,
    prefix: Arc<str>,
    order: u8,
    attempt: u8,
    backoff_ms: u64,
) -> PrefixOutcome {
    use super::super::reader::stream_aggregated_from_cached_file;
    use tokio_stream::StreamExt;

    // Widen before adding so the attempt number cannot overflow `u8`.
    let attempts_made = u32::from(attempt) + 1;

    let Some(cache_path) = ctx.config.cache_file_path(order, &prefix) else {
        return PrefixOutcome::Failed {
            prefix,
            error: ImportError::Config(format!(
                "Unknown language '{}' for cache file path",
                ctx.config.language
            )),
            attempts: attempts_made,
        };
    };

    let client = ctx.http_client.clone();
    // Download and processing failures are classified identically, so both
    // are funnelled into one result. The timer covers only the processing
    // phase, and cleanup runs only after a successful download.
    let result: Result<(u64, Duration), ImportError> =
        match download_to_cache(&url, &cache_path, &client).await {
            Err(download_err) => Err(download_err),
            Ok(_) => {
                let start_time = Instant::now();
                let processed: Result<u64, ImportError> = async {
                    let stream = stream_aggregated_from_cached_file(
                        &cache_path,
                        ctx.config.year_range,
                        ctx.config.skip_pos_tags,
                        ctx.config.min_count,
                    );
                    let maybe_tx = ctx.storage.begin_prefix_tx(&prefix, order)?;
                    if let Some(tx) = maybe_tx {
                        process_aggregated_stream(
                            stream,
                            tx,
                            ctx,
                            &prefix,
                            order,
                            worker_id,
                            "cached prefix",
                        )
                        .await
                    } else {
                        tokio::pin!(stream);
                        const NGRAM_PROGRESS_INTERVAL: u64 = 50_000;
                        // Counters are accumulated locally and flushed to the
                        // shared atomics every `COUNTER_BATCH_SIZE` items.
                        let mut local_total: u64 = 0;
                        let mut local_unique: u64 = 0;
                        let mut count = 0u64;
                        while let Some(item) = stream.next().await {
                            let agg = item?;
                            let storage_result =
                                store_ngram_shared(&agg.ngram, agg.total_count, &ctx.storage)?;
                            count += 1;
                            local_total += 1;
                            if storage_result.is_new {
                                local_unique += 1;
                            }
                            if local_total >= COUNTER_BATCH_SIZE {
                                ctx.total_ngrams.fetch_add(local_total, Ordering::Relaxed);
                                if local_unique > 0 {
                                    ctx.unique_ngrams.fetch_add(local_unique, Ordering::Relaxed);
                                }
                                local_total = 0;
                                local_unique = 0;
                            }
                            if count % NGRAM_PROGRESS_INTERVAL == 0 {
                                if let Some(ref ptx) = ctx.progress_tx {
                                    let _ = ptx.try_send(WorkerUpdate::NgramProgress {
                                        worker_id,
                                        ngram_count: count,
                                    });
                                }
                            }
                        }
                        if local_total > 0 {
                            ctx.total_ngrams.fetch_add(local_total, Ordering::Relaxed);
                        }
                        if local_unique > 0 {
                            ctx.unique_ngrams.fetch_add(local_unique, Ordering::Relaxed);
                        }
                        Ok(count)
                    }
                }
                .await;
                cleanup_cache_file(&cache_path).await;
                let elapsed = start_time.elapsed();
                processed.map(|count| (count, elapsed))
            }
        };

    match result {
        Ok((count, elapsed)) => {
            if let Some(ref tx) = ctx.progress_tx {
                let _ = tx.try_send(WorkerUpdate::Finished {
                    worker_id,
                    order,
                    prefix: Arc::clone(&prefix),
                    ngram_count: count,
                    duration: elapsed,
                });
            }
            PrefixOutcome::Success {
                prefix,
                ngram_count: count,
            }
        }
        Err(e) if attempt < MAX_RETRIES && is_retryable_error(&e) => {
            // Saturating, like `Job::with_retry_after`: a plain `* 2` would
            // panic in debug builds or wrap in release builds on overflow.
            let next_backoff_ms = backoff_ms.saturating_mul(2);
            if let Some(ref tx) = ctx.progress_tx {
                let _ = tx.try_send(WorkerUpdate::Deferred {
                    worker_id,
                    order,
                    prefix: Arc::clone(&prefix),
                    attempt: attempts_made,
                    delay_seconds: backoff_ms / 1000,
                    error: Arc::from(e.to_string()),
                });
            }
            PrefixOutcome::Deferred {
                url,
                prefix,
                order,
                // Cannot overflow: guarded by `attempt < MAX_RETRIES`.
                attempt: attempt + 1,
                backoff_ms: next_backoff_ms,
            }
        }
        Err(e) => PrefixOutcome::Failed {
            prefix,
            error: e,
            attempts: attempts_made,
        },
    }
}