use super::{
args::Backup,
bars::{forall_progress, wait_with_progress},
connection::SolrClient,
fails::{BoxedError, raise},
helpers::{IntegerHelpers, wait, wait_by},
models::{Compression, Documents, SolrCore, Step},
save::Archiver,
state::{UserInterruption, monitor_term_sinal},
steps::{Requests, Slices},
};
use crossbeam_channel::{Receiver, Sender, bounded};
use log::{debug, error, info};
use std::thread;
use std::{path::PathBuf, time::Instant};
/// Runs the full backup: inspects the Solr core, then streams document
/// batches through a scoped-thread pipeline until every thread joins.
///
/// Pipeline (channels are bounded to provide back-pressure):
///   Generator --Step--> Readers --Documents--> Writers --u64--> Progress
///
/// Returns `Err` only when the user aborted (see `finish_progress`).
pub(crate) fn backup_main(params: &Backup) -> BoxedError {
    debug!("# BACKUP {:?}", params);
    // NOTE(review): the start-up countdown is shown only when `is_quiet()` is
    // true, which looks inverted (below, `is_quiet()` is passed to
    // `forall_progress` apparently to suppress output) — confirm the polarity.
    if params.options.is_quiet() {
        wait_with_progress(params.transfer.delay_before, "Starting the copy...");
    }
    let schema = params.inspect_core()?;
    let num_found = schema.num_found;
    let num_retrieve = params.get_docs_to_retrieve(num_found);
    // Final count of documents actually downloaded, set by the progress thread.
    let mut retrieved = 0;
    info!(
        "retrieving {} documents from range {} to {} of {} total returned by the query on solr core: {}.",
        num_retrieve,
        params.skip + 1,
        params.skip + num_retrieve,
        num_found,
        params.options.core
    );
    let started = Instant::now();
    thread::scope(|pool| {
        let transfer = &params.transfer;
        // Channel capacities scale with the number of worker threads.
        let readers_channel = transfer.readers * 4;
        let writers_channel = transfer.writers * 3;
        let (generator, sequence) = bounded::<Step>(readers_channel.to_usize());
        let (sender, receiver) = bounded::<Documents>(writers_channel.to_usize());
        let (progress, reporter) = bounded::<u64>(transfer.writers.to_usize());
        let gen_handle = thread::Builder::new()
            .name("Generator".to_string())
            .spawn_scoped(pool, || {
                start_querying_core(params, &schema, generator);
            })
            .unwrap();
        let reader_handles = start_solr_readers(pool, params, sender, sequence);
        let writer_handles = start_archive_writers(pool, params, receiver, progress, num_retrieve);
        // BUG FIX: this thread was also named "Generator" (copy-paste of the
        // builder above); give the progress thread its own diagnostic name.
        let bar_handle = thread::Builder::new()
            .name("Progress".to_string())
            .spawn_scoped(pool, || {
                retrieved = forall_progress(reporter, num_retrieve, params.options.is_quiet());
            })
            .unwrap();
        let mut handles = vec![];
        handles.push(gen_handle);
        handles.extend(reader_handles);
        handles.extend(writer_handles);
        handles.push(bar_handle);
        for handle in handles {
            handle.join().unwrap();
        }
    });
    finish_progress(started, num_retrieve, retrieved, params.transfer.delay_after)
}
/// Spawns one scoped thread per configured reader; each pulls `Step`s from
/// `sequence`, fetches the matching documents from Solr and forwards them to
/// the writers through `sender`.
fn start_solr_readers<'scope>(
    pool: &'scope thread::Scope<'scope, '_>, params: &Backup, sender: Sender<Documents>,
    sequence: Receiver<Step>,
) -> Vec<thread::ScopedJoinHandle<'scope, ()>> {
    let max_errors = params.transfer.max_errors;
    let delay = params.transfer.delay_per_request;
    // The originals of `sender`/`sequence` drop when this function returns, so
    // the channels close as soon as the last per-thread clone is gone.
    (0..params.transfer.readers)
        .map(|reader| {
            let producer = sender.clone();
            let iterator = sequence.clone();
            thread::Builder::new()
                .name(format!("Reader_{}", reader))
                .spawn_scoped(pool, move || {
                    debug!("Started reader #{}", reader);
                    start_retrieving_docs(reader, iterator, producer, max_errors, delay);
                    debug!("Finished reader #{}", reader);
                })
                .unwrap()
        })
        .collect()
}
/// Spawns one scoped thread per configured writer; each drains `receiver`,
/// stores the document batches into archive files and reports counts on
/// `progress`.
fn start_archive_writers<'scope>(
    pool: &'scope thread::Scope<'scope, '_>, params: &Backup, receiver: Receiver<Documents>,
    progress: Sender<u64>, num_retrieve: u64,
) -> Vec<thread::ScopedJoinHandle<'scope, ()>> {
    let pattern = params.get_archive_pattern(num_retrieve);
    let max_files = params.archive_files;
    let compression = params.archive_compression;
    // The originals of `receiver`/`progress` drop when this function returns,
    // so the channels close once every per-thread clone is gone.
    (0..params.transfer.writers)
        .map(|writer| {
            let consumer = receiver.clone();
            let updater = progress.clone();
            let dir = params.transfer.dir.clone();
            let name = pattern.clone();
            thread::Builder::new()
                .name(format!("Writer_{}", writer))
                .spawn_scoped(pool, move || {
                    debug!("Started writer #{}", writer);
                    start_storing_docs(writer, dir, name, compression, max_files, consumer, updater);
                    debug!("Finished writer #{}", writer);
                })
                .unwrap()
        })
        .collect()
}
/// Emits the final summary — or an abort error — after the pipeline finished.
fn finish_progress(
    started: Instant, num_retrieve: u64, retrieved: u64, delay_after: u64,
) -> BoxedError {
    if monitor_term_sinal().aborted() {
        return raise("# Execution aborted by user!");
    }
    if retrieved > 0 {
        wait_with_progress(delay_after, "Finished exporting documents to archives...");
    }
    info!("Downloaded {} of {} documents in {:?}.", retrieved, num_retrieve, started.elapsed());
    Ok(())
}
/// Generator thread body: walks the configured slices of the query, builds the
/// paged `Step` requests for each non-empty range and feeds them to the reader
/// threads through `generator`. Stops early on Ctrl-C or when all readers
/// have hung up (send error).
fn start_querying_core(params: &Backup, schema: &SolrCore, generator: Sender<Step>) {
    let ctrl_c = monitor_term_sinal();
    let core_fields = params.merge_core_fields(schema);
    let slices: Slices<String> = params.get_slices();
    let partitions = slices.get_iterator();
    // Running offset of documents already scheduled by previous ranges.
    let mut retrieved = 0u64;
    'outer: for range in partitions {
        let num_found = params.query_num_found(&range.begin, &range.end).unwrap_or(0);
        if num_found == 0 {
            continue;
        }
        // With the shard workaround enabled, pass the count we expect so the
        // readers can re-query until Solr returns a consistent num_found.
        let expected = if params.workaround_shards > 0 { num_found } else { 0 };
        let num_retrieve = params.get_docs_to_retrieve(num_found);
        let requests: Requests = params.get_requests_for_range(
            retrieved,
            num_retrieve,
            &core_fields,
            expected,
            &range.begin,
            &range.end,
        );
        for step in requests {
            let status = generator.send(step);
            // A send error means every receiver dropped (readers are gone).
            if status.is_err() || ctrl_c.aborted() {
                break 'outer;
            }
        }
        // NOTE(review): the offset advances by `num_found`, not by the possibly
        // smaller `num_retrieve` actually scheduled above — confirm intent.
        retrieved += num_found;
    }
    // Closing the channel signals the readers that no more steps are coming.
    drop(generator);
}
/// Reader thread body: consumes `Step`s from `iterator`, fetches the matching
/// documents from Solr and forwards them to the writers via `producer`.
///
/// Stops when the step channel closes, on Ctrl-C, or after more than
/// `max_errors` failed fetches. A non-zero `delay` throttles between requests
/// (unit per `wait_by` — presumably milliseconds; confirm against helper).
fn start_retrieving_docs(
    reader: u64, iterator: Receiver<Step>, producer: Sender<Documents>, max_errors: u64, delay: u64,
) {
    let ctrl_c = monitor_term_sinal();
    let mut error_count = 0;
    let mut client = SolrClient::new();
    loop {
        // BUG FIX: a recv error means the channel is disconnected (the
        // generator finished or aborted) — crossbeam's recv only errors when
        // the channel is empty AND disconnected. Previously that was counted
        // as a retrievable failure, so a normal shutdown spun through the
        // whole error budget, sleeping `delay` per iteration, before the
        // thread exited. Break out immediately instead.
        let step = match iterator.recv() {
            Ok(step) => step,
            Err(_) => break,
        };
        if ctrl_c.aborted() {
            break;
        }
        // `true` return = this step failed (fetch, parse, or closed channel).
        if retrieve_docs_from_solr(reader, &producer, step, &mut client) {
            if error_count < max_errors {
                error_count += 1;
            } else {
                break;
            }
        }
        if ctrl_c.aborted() {
            break;
        } else if delay > 0 {
            wait_by(delay.to_usize());
        }
    }
    // Closing our clone of the sender; writers stop once all readers finish.
    drop(producer);
}
/// Fetches one `Step`'s worth of documents and pushes them to the writers.
/// Returns `true` when anything failed: the fetch, parsing the response, or
/// sending on a closed channel.
fn retrieve_docs_from_solr(
    reader: u64, producer: &Sender<Documents>, step: Step, client: &mut SolrClient,
) -> bool {
    let query_url = step.url.as_str();
    let content = match fetch_docs_from_solr(reader, client, query_url, step.expected) {
        Ok(content) => content,
        Err(_) => return true,
    };
    match SolrCore::parse_docs_from_query(&content) {
        Some(json) => {
            let docs = Documents { step, docs: json.to_string() };
            producer.send(docs).is_err()
        }
        None => {
            error!("Error in thread #{} parsing from solr query: {}", reader, query_url);
            true
        }
    }
}
/// Performs the HTTP GET for `query_url` and returns the raw response body.
///
/// When `expected > 0` (shard-workaround mode, see `start_querying_core`) the
/// response's `num_found` is compared against `expected`; on a mismatch the
/// request is retried after `wait(times)` — an increasing back-off — for up
/// to 13 attempts before the mismatched response is accepted anyway.
/// Transport and parse errors are logged and mapped to `Err(())`.
fn fetch_docs_from_solr(
    reader: u64, client: &mut SolrClient, query_url: &str, expected: u64,
) -> Result<String, ()> {
    // Retry counter; doubles as the back-off factor passed to `wait`.
    let mut times = 0;
    loop {
        let response = client.get_as_json(query_url);
        match response {
            Err(cause) => {
                error!("Error in thread #{} retrieving docs from solr: {}", reader, cause);
                return Err(());
            }
            Ok(content) => {
                if expected > 0 {
                    match SolrCore::parse_num_found(&content) {
                        Ok(num_found) => {
                            // 13 appears to be an arbitrary retry cap — after
                            // that the inconsistent response is used as-is.
                            if expected != num_found.to_u64() && times < 13 {
                                debug!(
                                    "#{} got num_found {} but expected {}",
                                    times, num_found, expected
                                );
                                times += 1;
                                wait(times);
                                continue;
                            }
                        }
                        Err(cause) => {
                            error!("Error in Solr response: {}", cause);
                            return Err(());
                        }
                    }
                }
                break Ok(content);
            }
        }
    }
}
/// Writer thread body: drains `consumer`, appends each document batch to the
/// archive and reports the batch size on `progress`. Exits when the channel
/// closes, on a write failure, or when the progress receiver has hung up.
fn start_storing_docs(
    writer: u64, dir: PathBuf, name: String, compression: Compression, max: u64,
    consumer: Receiver<Documents>, progress: Sender<u64>,
) {
    let mut archiver = Archiver::write_on(&dir, &name, compression, max.to_usize());
    while let Ok(docs) = consumer.recv() {
        if let Err(cause) = archiver.write_documents(&docs) {
            error!("Error in thread #{} writing file into archive: {}", writer, cause);
            break;
        }
        if progress.send(docs.step.curr).is_err() {
            break;
        }
    }
    drop(consumer);
}