#![cfg(feature = "google-books")]
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use super::super::checkpoint::ImportCheckpoint;
use super::super::storage::NgramStorage;
use super::{shutdown_signal, GoogleBooksImporter, ImportError, ImportProgress, ImportStats};
#[cfg(feature = "google-books")]
/// State shared between the import loop and the periodic checkpoint task.
///
/// `run_import_with_periodic_checkpoints` wraps one instance in an `Arc` and
/// hands a clone to the cron closure that calls
/// [`CheckpointState::perform_checkpoint`].
pub struct CheckpointState {
/// N-gram counter; `perform_checkpoint` copies it into
/// `checkpoint.stats.ngrams_processed`.
pub ngrams_processed: AtomicU64,
/// Unique n-gram counter; `perform_checkpoint` copies it into
/// `checkpoint.stats.unique_ngrams`.
pub unique_ngrams: AtomicU64,
/// Storage handle that `perform_checkpoint` merges, syncs and checkpoints,
/// and into which the checkpoint metadata is saved.
pub storage: Arc<NgramStorage>,
/// Most recent checkpoint; replaced with the updated copy after the
/// metadata has been saved successfully.
pub checkpoint: arc_swap::ArcSwap<ImportCheckpoint>,
/// `true` while `perform_checkpoint` is running; a call that finds it
/// already set returns immediately without doing any work.
pub checkpoint_in_progress: AtomicBool,
/// Reference instant used to compute `checkpoint.stats.elapsed_seconds`.
pub start_time: Instant,
}
#[cfg(feature = "google-books")]
impl CheckpointState {
    /// Runs one periodic checkpoint against the shared storage.
    ///
    /// Steps, in order:
    /// 1. Claim the `checkpoint_in_progress` flag; if it was already set the
    ///    call is a no-op and returns `Ok(())`.
    /// 2. Snapshot the counters and build an updated copy of the current
    ///    checkpoint (processed / unique n-grams, elapsed seconds).
    /// 3. Merge and rotate the vocabulary WAL, sync the shards, checkpoint the
    ///    storage, then save the checkpoint metadata.
    /// 4. Publish the updated checkpoint through `self.checkpoint`.
    ///
    /// The flag is released by a drop guard, so it is cleared on every exit
    /// path — success, early error return, or an unwinding panic inside one of
    /// the storage calls. Without that, a single panic would leave the flag
    /// set and every later periodic checkpoint would be skipped.
    ///
    /// # Errors
    ///
    /// Returns [`ImportError::Trie`] carrying a description of the first
    /// storage step that failed. The published checkpoint is left untouched in
    /// that case.
    pub fn perform_checkpoint(&self) -> Result<(), ImportError> {
        /// Clears the wrapped flag when dropped.
        struct InProgressGuard<'a>(&'a AtomicBool);

        impl Drop for InProgressGuard<'_> {
            fn drop(&mut self) {
                self.0.store(false, Ordering::Release);
            }
        }

        if self.checkpoint_in_progress.swap(true, Ordering::AcqRel) {
            log::debug!("Checkpoint already in progress, skipping");
            return Ok(());
        }
        // From here on the flag is owned by this call; the guard releases it.
        let in_progress = InProgressGuard(&self.checkpoint_in_progress);

        let ngrams = self.ngrams_processed.load(Ordering::Acquire);
        let unique = self.unique_ngrams.load(Ordering::Acquire);

        // Clone out of the ArcSwap immediately so the load guard is dropped at
        // the end of this statement instead of being held across the slow
        // storage operations below.
        let mut checkpoint = (**self.checkpoint.load()).clone();
        checkpoint.stats.ngrams_processed = ngrams;
        checkpoint.stats.unique_ngrams = unique;
        checkpoint.stats.elapsed_seconds = self.start_time.elapsed().as_secs();

        log::debug!("Periodic checkpoint: merging vocabulary and rotating WAL...");
        self.storage.merge_and_rotate_vocabulary_wal().map_err(|e| {
            ImportError::Trie(format!("Failed to merge and rotate vocabulary WAL: {}", e))
        })?;

        log::debug!("Periodic checkpoint: syncing shards...");
        self.storage
            .sync_parallel(8)
            .map_err(|e| ImportError::Trie(format!("Failed to sync storage: {}", e)))?;
        self.storage
            .checkpoint_parallel(8)
            .map_err(|e| ImportError::Trie(format!("Failed to checkpoint storage: {}", e)))?;

        log::debug!("Periodic checkpoint: saving metadata...");
        self.storage
            .save_import_checkpoint(&checkpoint)
            .map_err(|e| ImportError::Trie(format!("Failed to save checkpoint to trie: {}", e)))?;

        // Publish only after the metadata is saved.
        self.checkpoint.store(Arc::new(checkpoint));
        // Release the flag before logging, matching the previous ordering.
        drop(in_progress);
        log::info!("Periodic checkpoint completed: {} n-grams", ngrams);
        Ok(())
    }

    /// Returns `true` while a call to [`Self::perform_checkpoint`] holds the
    /// in-progress flag.
    pub fn is_checkpoint_in_progress(&self) -> bool {
        self.checkpoint_in_progress.load(Ordering::Acquire)
    }
}
#[cfg(feature = "google-books")]
/// Runs `importer.import_http` while a cron task periodically calls
/// [`CheckpointState::perform_checkpoint`], then saves a final checkpoint.
///
/// # Parameters
///
/// * `importer` - the importer to drive; its storage, checkpoint, counters
///   and start time seed the shared [`CheckpointState`].
/// * `progress` - user callback, invoked for every [`ImportProgress`] after
///   the shared `ngrams_processed` counter has been updated from it.
/// * `checkpoint_interval_ms` - passed as both leading arguments of
///   `schedule_recurring` (presumably initial delay and period in
///   milliseconds — TODO confirm against the cron API).
///
/// # Returns
///
/// The result of `import_http`. A failure of the final `save_checkpoint` is
/// only logged and does not change the returned value.
pub async fn run_import_with_periodic_checkpoints<F>(
importer: GoogleBooksImporter,
progress: F,
checkpoint_interval_ms: u64,
) -> Result<ImportStats, ImportError>
where
F: FnMut(ImportProgress) + Send + 'static,
{
use crate::util::cron::spawn_cron_with_interval;
use std::sync::atomic::Ordering as AtomicOrdering;
// Flag shared with the cron thread and the shutdown task; set to `true`
// both on a shutdown signal and once the import has returned.
let terminating = Arc::new(AtomicBool::new(false));
let checkpoint_state = Arc::new(CheckpointState {
ngrams_processed: AtomicU64::new(importer.total_ngrams.load(Ordering::Relaxed)),
unique_ngrams: AtomicU64::new(importer.unique_ngrams.load(Ordering::Relaxed)),
storage: Arc::clone(&importer.storage),
checkpoint: arc_swap::ArcSwap::from_pointee(importer.checkpoint.clone()),
checkpoint_in_progress: AtomicBool::new(false),
start_time: importer.start_time,
});
// NOTE(review): the literal 50 is presumably the cron tick interval in
// milliseconds — confirm against `spawn_cron_with_interval`.
let (cron_handle, cron_thread, cron_stats, _cron_ready) =
spawn_cron_with_interval(Arc::clone(&terminating), 50);
let checkpoint_state_for_cron = Arc::clone(&checkpoint_state);
let checkpoint_interval = checkpoint_interval_ms;
cron_handle.schedule_recurring(
checkpoint_interval,
checkpoint_interval,
"periodic-checkpoint",
move || {
// The closure returns `true` on success and on failure alike, so a
// failed checkpoint is only logged (presumably `true` keeps the task
// scheduled — TODO confirm).
match checkpoint_state_for_cron.perform_checkpoint() {
Ok(()) => true,
Err(e) => {
log::error!("Periodic checkpoint failed: {}", e);
true
}
}
},
);
let importer_ref = Arc::new(parking_lot::Mutex::new(importer));
let importer_for_shutdown = Arc::clone(&importer_ref);
let checkpoint_state_for_shutdown = Arc::clone(&checkpoint_state);
let terminating_for_shutdown = Arc::clone(&terminating);
// Background task: waits for the shutdown signal, announces it, raises the
// `terminating` flag and tries to interrupt the importer.
let shutdown_handle = tokio::spawn(async move {
shutdown_signal().await;
eprintln!();
log::warn!("╔══════════════════════════════════════════════════════════╗");
log::warn!("║ Shutdown signal received - saving progress... ║");
log::warn!("║ Please wait for checkpoint to complete. ║");
log::warn!("║ Press Ctrl+C again to force quit (may lose progress). ║");
log::warn!("╚══════════════════════════════════════════════════════════╝");
// Informational only: this branch logs but does not itself wait.
if checkpoint_state_for_shutdown.is_checkpoint_in_progress() {
log::info!("Waiting for in-progress checkpoint to complete...");
}
terminating_for_shutdown.store(true, AtomicOrdering::Release);
// NOTE(review): the import block below holds the importer lock for the
// whole `import_http(...).await`, so this `try_lock` cannot succeed while
// the import is running and `interrupt()` is skipped in that case.
// Verify that the importer observes shutdown some other way.
if let Some(importer) = importer_for_shutdown.try_lock() {
importer.interrupt();
}
});
let result = {
// The guard lives until the end of this block, i.e. across the await.
let mut importer = importer_ref.lock();
let checkpoint_state_for_progress = Arc::clone(&checkpoint_state);
let mut user_progress = progress;
// Mirror the running n-gram total into the shared state before
// forwarding the event to the caller's callback.
// NOTE(review): only `ngrams_processed` is refreshed here;
// `unique_ngrams` keeps its initial value as far as this function shows.
let progress_wrapper = move |p: ImportProgress| {
checkpoint_state_for_progress
.ngrams_processed
.store(p.total_ngrams, AtomicOrdering::Release);
user_progress(p);
};
importer.import_http(progress_wrapper).await
};
// Import finished (successfully or not): signal the cron thread and wait
// for it to finish before the final checkpoint is taken.
terminating.store(true, AtomicOrdering::Release);
log::info!("Stopping periodic checkpoint scheduler...");
// NOTE(review): `join()` is called inside an async fn; if it blocks like
// `std::thread::JoinHandle::join`, it stalls the executor thread meanwhile.
if let Err(e) = cron_thread.join() {
log::warn!("Cron thread panicked: {:?}", e);
}
let stats = cron_stats;
log::info!(
"Cron manager stopped. Tasks executed: {}, failed: {}, panicked: {}",
stats.tasks_executed.load(AtomicOrdering::Relaxed),
stats.tasks_failed.load(AtomicOrdering::Relaxed),
stats.tasks_panicked.load(AtomicOrdering::Relaxed)
);
log::info!("╔══════════════════════════════════════════════════════════╗");
log::info!("║ Saving final checkpoint and flushing data to disk... ║");
log::info!("╚══════════════════════════════════════════════════════════╝");
// The three step lines are logged up front; the work itself is the single
// `save_checkpoint()` call below.
log::info!(" → Syncing vocabulary WAL...");
log::info!(" → Syncing n-gram shards...");
log::info!(" → Writing checkpoint metadata...");
let checkpoint_start = Instant::now();
{
let mut importer = importer_ref.lock();
// Best effort: a failure here is logged and `result` is still returned.
if let Err(e) = importer.save_checkpoint() {
log::error!("Final checkpoint failed: {}", e);
} else {
let elapsed = checkpoint_start.elapsed();
log::info!(
" ✓ Checkpoint saved successfully in {:.2}s",
elapsed.as_secs_f64()
);
}
}
log::info!("╔══════════════════════════════════════════════════════════╗");
log::info!("║ Shutdown complete. Safe to exit. ║");
log::info!("╚══════════════════════════════════════════════════════════╝");
// Cancel the shutdown listener task now that the run is over.
shutdown_handle.abort();
result
}