use super::{
args::{ParallelArgs, Restore},
bars::*,
connection::SolrClient,
fails::*,
helpers::*,
ingest::*,
state::*,
};
use crossbeam_channel::{Receiver, Sender, bounded};
use log::{debug, error, info, trace};
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
};
use std::thread;
use std::{path::Path, path::PathBuf, time::Instant};
/// Entry point of the `restore` command.
///
/// Locates the archives selected by `params`, optionally waits before starting,
/// runs the pre-processing step, pushes every archive into the target core, runs
/// the post-processing step and finally waits `delay_after` when anything was updated.
///
/// # Errors
/// Fails when no archive is found, or when archive discovery, pre/post-processing
/// or the restore pipeline itself reports an error.
///
/// NOTE(review): when the pipeline fails, the early return skips the post-processing
/// call — confirm that leaving replication untouched in that case is intended.
pub(crate) fn restore_main(params: &Restore) -> BoxedError {
    debug!("# RESTORE {:?}", params);

    let found = params.find_archives()?;
    if found.is_empty() {
        let pattern = params.get_pattern();
        throw(format!(
            "Found no archives to restore from: {}\n note: try to specify the option \
             --pattern with the source core name",
            pattern
        ))?;
    }

    let core = params.options.core.as_str();
    info!(
        "Found {} zip archives in {:?} for updating into core {:?}",
        found.len(),
        params.transfer.dir,
        core
    );

    // The initial delay is only applied when the quiet option is set.
    if params.options.is_quiet() {
        let banner = format!("Starting restore for core {}...", core);
        wait_with_progress(params.transfer.delay_before, &banner);
    }

    // Pre-processing step (`enable == false`).
    pre_post_processing(params, false)?;

    let started = Instant::now();
    let updated = unzip_archives_and_send(params, &found)?;
    info!("Updated {} batches in solr core {} in {:?}.", updated, core, started.elapsed());

    // Post-processing step (`enable == true`).
    pre_post_processing(params, true)?;

    if updated > 0 {
        wait_with_progress(params.transfer.delay_after, "Restoring documents...");
    }
    Ok(())
}
fn unzip_archives_and_send(params: &Restore, found: &[PathBuf]) -> BoxedResult<u64> {
let doc_count = estimate_batch_count(found)?;
let mut updated = 0;
let core = params.options.core.clone();
info!("Estimated {} batches for indexing in solr core {}", doc_count, core);
thread::scope(|pool| {
let transfer = ¶ms.transfer;
let readers_channel = transfer.readers * 2;
let writers_channel = transfer.writers * 2;
let (generator, sequence) = bounded::<&Path>(readers_channel.to_usize());
let (sender, receiver) = bounded::<Docs>(writers_channel.to_usize());
let (progress, reporter) = bounded::<u64>(transfer.writers.to_usize());
let scan_handle = thread::Builder::new()
.name("Scanner".to_string())
.spawn_scoped(pool, || {
start_listing_archives(found, generator);
})
.unwrap();
let reader_handles = start_archive_readers(pool, transfer, sequence, sender);
let update_hadler_url = params.get_update_url();
debug!("Solr Update Handler: {}", update_hadler_url);
let writer_handles =
start_archive_writers(pool, transfer, receiver, progress, update_hadler_url);
let bar_handle = thread::Builder::new()
.name("Generator".to_string())
.spawn_scoped(pool, || {
updated = foreach_progress(reporter, doc_count, params.options.is_quiet());
})
.unwrap();
let mut handles = vec![];
handles.push(scan_handle);
handles.extend(reader_handles);
handles.extend(writer_handles);
handles.push(bar_handle);
for handle in handles {
handle.join().unwrap();
}
});
if updated > 0 && !params.no_final_commit {
crate::commit::commit_main(¶ms.options.to_command())?;
}
finish_progress(updated)
}
/// Spawns `transfer.readers` scoped threads named `Reader_<n>`.
///
/// Each thread gets its own clone of the archive-path receiver and of the `Docs`
/// sender and runs `start_reading_archive` with them. The channel ends passed in
/// are dropped before returning, so only the spawned threads keep them alive.
///
/// # Panics
/// Panics when a thread cannot be spawned.
fn start_archive_readers<'scope>(
    pool: &'scope thread::Scope<'scope, '_>, transfer: &ParallelArgs,
    sequence: Receiver<&'scope Path>, sender: Sender<Docs>,
) -> Vec<thread::ScopedJoinHandle<'scope, ()>> {
    let handles: Vec<_> = (0..transfer.readers)
        .map(|reader| {
            let output = sender.clone();
            let archives = sequence.clone();
            thread::Builder::new()
                .name(format!("Reader_{}", reader))
                .spawn_scoped(pool, move || {
                    debug!("Started reader #{}", reader);
                    start_reading_archive(reader, archives, output);
                    debug!("Finished reader #{}", reader);
                })
                .unwrap()
        })
        .collect();

    // Release this function's own channel ends.
    drop(sequence);
    drop(sender);
    handles
}
/// Spawns `transfer.writers` scoped threads named `Writer_<n>`.
///
/// Every thread receives a clone of the `Docs` receiver, of the progress sender and
/// of the update handler URL, and runs `start_indexing_docs`. All threads share a
/// single error counter; `max_errors` and `delay_per_request` are read from
/// `transfer` and passed along. The channel ends passed in are dropped before
/// returning.
///
/// # Panics
/// Panics when a thread cannot be spawned.
fn start_archive_writers<'scope>(
    pool: &'scope thread::Scope<'scope, '_>, transfer: &ParallelArgs, receiver: Receiver<Docs>,
    progress: Sender<u64>, update_hadler_url: String,
) -> Vec<thread::ScopedJoinHandle<'scope, ()>> {
    // One counter shared by all writer threads.
    let update_errors = Arc::new(AtomicU64::new(0));
    let max_errors = transfer.max_errors;
    let delay = transfer.delay_per_request;

    let handles: Vec<_> = (0..transfer.writers)
        .map(|writer| {
            let input = receiver.clone();
            let updater = progress.clone();
            let errors = Arc::clone(&update_errors);
            let url = update_hadler_url.clone();
            thread::Builder::new()
                .name(format!("Writer_{}", writer))
                .spawn_scoped(pool, move || {
                    debug!("Started writer #{}", writer);
                    start_indexing_docs(writer, &url, input, updater, &errors, max_errors, delay);
                    debug!("Finished writer #{}", writer);
                })
                .unwrap()
        })
        .collect();

    // Release this function's own channel ends.
    drop(receiver);
    drop(progress);
    handles
}
/// Turns the final batch count into the function result.
///
/// # Errors
/// Returns an error instead of `updated` when the termination monitor reports
/// that execution was aborted.
fn finish_progress(updated: u64) -> BoxedResult<u64> {
    let abort_flag = monitor_term_sinal();
    if abort_flag.aborted() {
        return raise("# Execution aborted by user!");
    }
    Ok(updated)
}
/// Estimates how many batches the restore will send.
///
/// The estimate is the file count that `ArchiveReader::get_archive_file_count`
/// reports for the first archive, multiplied by the number of archives in `found`.
/// An empty `found` yields an estimate of `0`.
///
/// # Errors
/// Fails when no file count can be obtained for the first archive.
fn estimate_batch_count(found: &[PathBuf]) -> BoxedResult<u64> {
    // Nothing to restore means nothing to estimate; do not panic on an empty slice.
    let first = match found.first() {
        Some(path) => path,
        None => return Ok(0),
    };
    let zip_count = found.len();
    match ArchiveReader::get_archive_file_count(first) {
        None => throw(format!("Error opening archive: {:?}", first))?,
        Some(doc_count) => {
            // Extrapolate from the first archive to all of them.
            let doc_total = doc_count * zip_count;
            Ok(doc_total.to_u64())
        }
    }
}
/// Toggles replication on the target core around a restore.
///
/// Does nothing unless `disable_replication` is set. Otherwise it calls the core's
/// replication handler with `enablereplication` when `enable` is `true`, or with
/// `disablereplication` when it is `false`.
///
/// # Errors
/// Propagates the failure of the request sent to the replication handler.
fn pre_post_processing(params: &Restore, enable: bool) -> BoxedResult<()> {
    if !params.disable_replication {
        return Ok(());
    }

    let (verb, handler_path) = match enable {
        true => ("enabling", "replication?command=enablereplication"),
        false => ("disabling", "replication?command=disablereplication"),
    };
    info!("Now {} replication in {}.", verb, params.options.core.as_str());

    let url = params.options.get_core_handler_url(handler_path);
    SolrClient::send_get_as_json(&url)?;
    Ok(())
}
/// Sends the path of every archive in `found` to `generator`, in order.
///
/// Stops at the first failed send, then drops the sender.
fn start_listing_archives<'a>(found: &'a [PathBuf], generator: Sender<&'a Path>) {
    // `try_for_each` short-circuits on the first send error, which is ignored on purpose.
    let _ = found
        .iter()
        .map(PathBuf::as_path)
        .try_for_each(|archive| generator.send(archive));
    drop(generator);
}
/// Body of a reader thread: processes archive paths received from `iterator`.
///
/// The loop ends when receiving fails, when an archive could not be processed,
/// or when the termination monitor reports an abort. The `Docs` sender is
/// dropped on exit.
fn start_reading_archive(reader: u64, iterator: Receiver<&Path>, producer: Sender<Docs>) {
    let ctrl_c = monitor_term_sinal();
    while let Ok(archive_path) = iterator.recv() {
        // Abort requested while waiting for the next archive.
        if ctrl_c.aborted() {
            break;
        }
        let failed = handle_reading_archive(reader, &producer, archive_path, &ctrl_c);
        if failed || ctrl_c.aborted() {
            break;
        }
    }
    drop(producer);
}
/// Reads one archive and sends each of its entries to `producer` as a `Docs`.
///
/// Returns `true` when the reader should stop: the archive could not be opened,
/// a send failed, or an abort was reported. Returns `false` after every entry
/// was sent.
///
/// # Panics
/// Panics when `get_filename` yields no name for `archive_path`.
fn handle_reading_archive(
    reader: u64, producer: &Sender<Docs>, archive_path: &Path, ctrl_c: &Arc<AtomicBool>,
) -> bool {
    let zip_name: String = get_filename(archive_path).unwrap();
    trace!("Reading zip archive: {}", zip_name);

    let archive_reader = match ArchiveReader::create_reader(archive_path) {
        Ok(opened) => opened,
        Err(cause) => {
            error!("Error in thread #{} while reading docs in zip: {}", reader, cause);
            return true;
        }
    };

    for (entry_name, entry_contents) in archive_reader {
        trace!("  Uncompressing json: '{}' from '{}'", entry_name, zip_name);
        let docs = Docs::new(zip_name.clone(), entry_name, entry_contents);
        if producer.send(docs).is_err() || ctrl_c.aborted() {
            return true;
        }
    }
    false
}
/// Body of a writer thread: posts every `Docs` received from `consumer` to `url`.
///
/// One `SolrClient` is created for the thread and used for all requests. The loop
/// ends when receiving fails, when `send_to_solr` asks to stop, or when the
/// termination monitor reports an abort. After a successful round the thread
/// calls `wait_by(delay)` when `delay` is positive.
fn start_indexing_docs(
    writer: u64, url: &str, consumer: Receiver<Docs>, progress: Sender<u64>,
    error_count: &Arc<AtomicU64>, max_errors: u64, delay: u64,
) {
    let ctrl_c = monitor_term_sinal();
    let mut client = SolrClient::new();
    while let Ok(docs) = consumer.recv() {
        // Abort requested while waiting for the next batch.
        if ctrl_c.aborted() {
            break;
        }
        let failed =
            send_to_solr(docs, writer, url, &mut client, &progress, error_count, max_errors);
        if failed || ctrl_c.aborted() {
            break;
        }
        if delay > 0 {
            wait_by(delay.to_usize());
        }
    }
    drop(consumer);
}
/// Posts one batch of documents to the Solr update handler at `url`.
///
/// On success, reports one unit of progress and returns `true` only when the
/// progress channel can no longer be sent to. On failure, increments the shared
/// error counter, logs the error and returns `true` once the number of errors
/// seen so far exceeds `max_errors`.
///
/// A return value of `true` tells the calling writer to stop.
fn send_to_solr(
    docs: Docs, writer: u64, url: &str, client: &mut SolrClient, progress: &Sender<u64>,
    error_count: &Arc<AtomicU64>, max_errors: u64,
) -> bool {
    let outcome = client.post_as_json(url, docs.json.as_str());
    if let Err(cause) = outcome {
        // `fetch_add` returns the value *before* the increment; add one so that this
        // failure is counted both in the log line and in the limit check.
        let current = error_count.fetch_add(1, Ordering::SeqCst) + 1;
        error!(
            "Error #{}/{} in thread #{} when indexing solr core:\n{}{:?}",
            current, max_errors, writer, cause, docs
        );
        current > max_errors
    } else {
        progress.send(1).is_err()
    }
}
#[cfg(test)]
mod tests {
    use crate::{
        args::{Cli, Commands, Restore},
        fails::{BoxedResult, raise},
    };
    use log::debug;
    use pretty_assertions::assert_eq;

    impl Commands {
        /// Test helper: returns the arguments of the `restore` command,
        /// or an error when `self` is any other command.
        pub(crate) fn put(&self) -> BoxedResult<&Restore> {
            if let Self::Restore(puts) = self {
                Ok(puts)
            } else {
                raise("command must be 'restore' !")
            }
        }
    }

    /// The archive search pattern of the mocked restore arguments ends with `.zip`.
    #[test]
    fn check_restore_pattern() {
        let command = Cli::mockup_args_restore();
        let restore = command.put().unwrap();
        let wildcard = restore.get_pattern();
        assert_eq!(wildcard.ends_with(".zip"), true);
    }

    /// Every archive found for the mocked restore arguments has a `.zip` path.
    #[test]
    fn check_restore_iterator() {
        let command = Cli::mockup_args_restore();
        let restore = command.put().unwrap();
        let archives = restore.find_archives().unwrap();
        for archive in archives {
            debug!("{:?}", archive);
            let path = archive.to_str().unwrap();
            assert_eq!(path.ends_with(".zip"), true);
        }
    }
}