use std::{
fs::remove_file,
path::PathBuf,
str::FromStr,
sync::{
atomic::{AtomicUsize, Ordering},
mpsc::channel,
Arc,
},
};
use anyhow::{anyhow, Context, Result};
use itertools::Itertools;
use reinfer_client::{
resources::{
attachments::AttachmentMetadata,
bucket::Id as BucketId,
dataset::{DatasetFlag, QueryRequestParams},
},
Bucket, Client, CommentFilter, CommentId, Dataset, DatasetFullName, DatasetName,
HasAnnotations, SourceId,
};
use scoped_threadpool::Pool;
use structopt::StructOpt;
use crate::{
commands::package::{
AttachmentKey, CommentBatchKey, EmailBatchKey, PackageContentId, PackageWriter,
},
progress::Progress,
DEFAULT_PROJECT_NAME,
};
use colored::Colorize;
/// Identifies a dataset either by its full `project/name` form or by a bare
/// dataset name, which is resolved against the default project.
#[derive(Debug)]
struct IxpDatasetIdentifier(DatasetFullName);

impl FromStr for IxpDatasetIdentifier {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        // A fully-qualified "project/dataset" name parses directly.
        if let Ok(full_name) = DatasetFullName::from_str(s) {
            return Ok(IxpDatasetIdentifier(full_name));
        }
        // Otherwise treat the input as a bare dataset name under the
        // default project.
        let full_name =
            DatasetName(s.to_string()).with_project(&DEFAULT_PROJECT_NAME.clone())?;
        Ok(IxpDatasetIdentifier(full_name))
    }
}
// CLI arguments for the `download package` command.
// NB: deliberately plain `//` comments (not `///` doc comments) — structopt
// turns doc comments into --help text, which must stay unchanged.
#[derive(Debug, StructOpt)]
pub struct DownloadPackageArgs {
// Dataset identifier: either "project/dataset" or a bare dataset name
// resolved against the default project (see IxpDatasetIdentifier::from_str).
project: IxpDatasetIdentifier,
#[structopt(short = "f", long = "file", parse(from_os_str))]
// Path of the package file to write.
file: PathBuf,
#[structopt(long)]
// Remove an existing file at the target path before writing.
overwrite: bool,
#[structopt(long = "batch-size", default_value = "128")]
// Number of comments/emails fetched per API request.
batch_size: usize,
#[structopt(long = "max-attachment-memory-mb", default_value = "256")]
// Buffered attachment downloads are flushed to the package once their
// combined size (in MB) reaches this threshold.
max_attachment_memory_mb: u64,
}
/// Streams every email batch of `bucket` into the package.
///
/// Emails are fetched from the API in batches of `batch_size`; each written
/// batch is counted in `statistics`. Returns the first read/write error.
///
/// (The stale `#[allow(clippy::too_many_arguments)]` was removed: this
/// function takes five arguments, well under clippy's default threshold.)
fn package_bucket_contents(
    bucket: &Bucket,
    batch_size: usize,
    package_writer: &mut PackageWriter,
    client: &Client,
    statistics: &Arc<Statistics>,
) -> Result<()> {
    let bucket_full_name = bucket.full_name();
    let bucket_emails_iter = client.get_emails_iter(&bucket_full_name, Some(batch_size));
    bucket_emails_iter
        .enumerate()
        .try_for_each(|(idx, result)| -> Result<()> {
            let emails = result.context("Failed to read email batch")?;
            package_writer.write_email_batch(&bucket.id, EmailBatchKey(idx), &emails)?;
            statistics.add_emails(emails.len());
            Ok(())
        })
}
// Distinguishes how a source's contents should be packaged.
enum SourceType {
// Standard Communications Mining source: comments only.
CommunicationsMining,
// IXP source: comments plus their document attachments are packaged.
// NOTE(review): "Unstructed" looks like a typo for "Unstructured";
// renaming would touch every use site, so it is left as-is here.
UnstructedAndComplexDocs,
}
/// Packages all comments of `source_id` from `dataset` and, for IXP
/// sources, downloads and packages their document attachments.
///
/// Attachment downloads are buffered in `documents` and flushed in batches
/// (via the worker `pool`) whenever their combined size reaches
/// `max_attachment_memory_mb`; any remainder is flushed at the end.
///
/// Fix: the original cloned the *entire* comment per comment just to take
/// ownership of its attachments (`comment.comment.clone().attachments`);
/// now only the attachment entries themselves are cloned.
#[allow(clippy::too_many_arguments)]
fn package_source_contents(
    dataset: &DatasetFullName,
    source_id: &SourceId,
    batch_size: usize,
    package_writer: &mut PackageWriter,
    client: &Client,
    statistics: &Arc<Statistics>,
    max_attachment_memory_mb: &u64,
    pool: &mut Pool,
    source_type: SourceType,
) -> Result<()> {
    let mut query = QueryRequestParams {
        limit: Some(batch_size),
        filter: CommentFilter {
            sources: vec![source_id.clone()],
            ..Default::default()
        },
        ..Default::default()
    };
    let source_comments_iter = client.get_dataset_query_iter(dataset, &mut query);
    // Pending attachment downloads, flushed batch-wise below.
    let mut documents: Vec<(AttachmentMetadata, AttachmentKey, SourceId, CommentId)> = Vec::new();
    source_comments_iter
        .enumerate()
        .try_for_each(|(idx, result)| -> Result<()> {
            let comments = result.context("Failed to read comment batch")?;
            package_writer
                .write_comment_batch(source_id, CommentBatchKey(idx), &comments)
                .context("Failed to write comment batch")?;
            statistics.add_comments(comments.len());
            statistics.add_annotations(comments.iter().filter(|c| c.has_annotations()).count());
            if matches!(source_type, SourceType::UnstructedAndComplexDocs) {
                // Clone only the attachment metadata (plus ids), not the
                // whole comment.
                documents.extend(comments.iter().flat_map(|comment| {
                    comment
                        .comment
                        .attachments
                        .iter()
                        .cloned()
                        .enumerate()
                        .map(|(idx, attachment)| {
                            (
                                attachment,
                                AttachmentKey(idx),
                                source_id.clone(),
                                comment.comment.id.clone(),
                            )
                        })
                }));
                // Flush once the buffered attachments exceed the memory budget.
                if should_package_batch_of_documents(&documents, max_attachment_memory_mb) {
                    package_batch_of_documents(
                        &documents,
                        statistics,
                        package_writer,
                        client,
                        pool,
                    )
                    .context("Failed to package document batch")?;
                    documents.clear();
                }
            }
            Ok(())
        })?;
    // Flush any remaining buffered attachments (a no-op for CM sources,
    // where `documents` stays empty).
    package_batch_of_documents(&documents, statistics, package_writer, client, pool)
}
/// Returns true once the buffered attachments' combined size, truncated to
/// whole megabytes, reaches the configured memory budget.
fn should_package_batch_of_documents(
    documents: &[(AttachmentMetadata, AttachmentKey, SourceId, CommentId)],
    max_attachment_memory_mb: &u64,
) -> bool {
    let total_bytes: u64 = documents.iter().map(|entry| entry.0.size).sum();
    total_bytes / 1_000_000 >= *max_attachment_memory_mb
}
/// Downloads the given documents in parallel on `pool` and writes each one
/// into the package. If any download fails, the batch is aborted and
/// already-downloaded documents from this batch are discarded, not written.
fn package_batch_of_documents(
    documents: &[(AttachmentMetadata, AttachmentKey, SourceId, CommentId)],
    statistics: &Arc<Statistics>,
    package_writer: &mut PackageWriter,
    client: &Client,
    pool: &mut Pool,
) -> Result<()> {
    // Workers report failures on one channel and successful downloads (with
    // their package location) on the other.
    let (error_sender, error_receiver) = channel();
    let (document_sender, document_receiver) = channel();
    pool.scoped(|scope| {
        documents
            .iter()
            .for_each(|(metadata, key, source_id, comment_id)| {
                // Per-task clones of the senders; the originals stay alive
                // in this function.
                let error_sender = error_sender.clone();
                let sender = document_sender.clone();
                scope.execute(move || {
                    let result = client.get_ixp_document(source_id, comment_id);
                    let extension = metadata.extension();
                    match result {
                        Err(err) => {
                            error_sender.send(err).expect("Could not send error");
                        }
                        Ok(attachment) => {
                            statistics.add_document_download(1);
                            sender
                                .send((
                                    PackageContentId::Document {
                                        source_id,
                                        comment_id,
                                        key,
                                        extension,
                                    },
                                    attachment,
                                ))
                                .expect("could not send attachment");
                        }
                    }
                });
            });
    });
    // `scoped` blocks until every worker has finished, so a single
    // non-blocking receive is enough to observe a failure. Only the first
    // error is reported; any further errors are dropped.
    if let Ok(error) = error_receiver.try_recv() {
        return Err(anyhow!("Failed to download document {}", error));
    };
    // Drop the last remaining sender so the receiver iteration below
    // terminates after draining the queued documents (otherwise `iter()`
    // would block forever waiting for more messages).
    drop(document_sender);
    document_receiver
        .iter()
        .try_for_each(|(content_id, content)| -> Result<()> {
            package_writer
                .write_bytes(content_id, &content)
                .context("Failed to write document bytes")?;
            statistics.add_document_write(1);
            Ok(())
        })?;
    Ok(())
}
/// Fetches the total number of comments in `dataset` from the dataset
/// statistics endpoint.
fn get_comment_count(dataset: &Dataset, client: &Client) -> Result<u64> {
    let stats = client
        .get_dataset_statistics(&dataset.full_name(), &Default::default())
        .context("Failed to get dataset statistics")?;
    Ok(*stats.num_comments as u64)
}
/// Packages a Communications Mining project: the dataset, any buckets
/// backing its sources (with their emails), then each source's comments.
///
/// Fix: the writer is now finalized with `finish()` — the IXP path
/// (`package_ixp_project`) already did this, but this path previously
/// dropped the writer without finishing, leaving the package incomplete.
fn package_cm_project(
    client: &Client,
    dataset: &Dataset,
    file: &PathBuf,
    batch_size: &usize,
    max_attachment_memory_mb: &u64,
    pool: &mut Pool,
) -> Result<()> {
    let total_comments =
        get_comment_count(dataset, client).context("Failed to get comment count")?;
    let mut package_writer =
        PackageWriter::new(file.into()).context("Failed to create new package writer")?;
    package_writer
        .write_dataset(dataset)
        .context("Failed to write dataset")?;
    // Resolve the buckets backing the dataset's sources; sources without a
    // bucket are skipped (`flatten` drops the `None`s).
    let buckets: Vec<Bucket> = dataset
        .source_ids
        .iter()
        .map(|source_id| -> Result<Option<BucketId>> {
            let source = client.get_source(source_id.clone())?;
            Ok(source.bucket_id)
        })
        .try_collect::<Option<BucketId>, Vec<Option<BucketId>>, anyhow::Error>()?
        .into_iter()
        .flatten()
        .map(|bucket_id| client.get_bucket(bucket_id.clone()))
        .try_collect()?;
    let statistics = Arc::new(Statistics::new());
    // Bar total is doubled when buckets exist (emails count as extra work).
    let _progress_bar = get_cm_progress_bar(total_comments, !buckets.is_empty(), &statistics);
    for bucket in buckets {
        package_writer.write_bucket(&bucket)?;
        package_bucket_contents(
            &bucket,
            *batch_size,
            &mut package_writer,
            client,
            &statistics,
        )?;
    }
    for source_id in &dataset.source_ids {
        let source = client
            .get_source(source_id.clone())
            .context("Failed to get source")?;
        package_writer
            .write_source(&source)
            .context("Failed to write source")?;
        package_source_contents(
            &dataset.full_name(),
            source_id,
            *batch_size,
            &mut package_writer,
            client,
            &statistics,
            max_attachment_memory_mb,
            pool,
            SourceType::CommunicationsMining,
        )?;
    }
    // Finalize the archive so the on-disk package is complete.
    package_writer
        .finish()
        .context("Failed to finish package writer")?;
    Ok(())
}
/// Packages an IXP project: the dataset, then each source's comments and
/// their document attachments, finishing the writer at the end.
fn package_ixp_project(
    client: &Client,
    dataset: &Dataset,
    file: &PathBuf,
    batch_size: &usize,
    max_attachment_memory_mb: &u64,
    pool: &mut Pool,
) -> Result<()> {
    let total_comments =
        get_comment_count(dataset, client).context("Failed to get comment count")?;
    let statistics = Arc::new(Statistics::new());
    let _progress_bar = get_ixp_progress_bar(total_comments, &statistics);
    let mut package_writer =
        PackageWriter::new(file.into()).context("Failed to create new package writer")?;
    package_writer
        .write_dataset(dataset)
        .context("Failed to write dataset")?;
    // Package every source belonging to the dataset.
    dataset
        .source_ids
        .iter()
        .try_for_each(|source_id| -> Result<()> {
            let source = client
                .get_source(source_id.clone())
                .context("Failed to get source")?;
            package_writer
                .write_source(&source)
                .context("Failed to write source")?;
            package_source_contents(
                &dataset.full_name(),
                source_id,
                *batch_size,
                &mut package_writer,
                client,
                &statistics,
                max_attachment_memory_mb,
                pool,
                SourceType::UnstructedAndComplexDocs,
            )
            .context("Failed to package source context")
        })?;
    // Finalize the archive so the on-disk package is complete.
    package_writer
        .finish()
        .context("Failed to finish package writer")?;
    log::info!("Package exported");
    Ok(())
}
/// Entry point for `download package`: optionally removes an existing
/// output file, resolves the dataset, and dispatches to the IXP or
/// Communications Mining packaging path based on the dataset's flags.
pub fn run(args: &DownloadPackageArgs, client: &Client, pool: &mut Pool) -> Result<()> {
    let DownloadPackageArgs {
        project,
        file,
        overwrite,
        batch_size,
        max_attachment_memory_mb,
    } = args;
    if *overwrite && file.is_file() {
        remove_file(file).context("Failed to remove existing file")?;
    }
    // `with_context` formats the message lazily, only on the error path;
    // the previous `context(format!(..))` allocated on every call.
    let dataset = client
        .get_dataset(project.0.clone())
        .with_context(|| format!("Could not get project with the name {0}", project.0))?;
    if dataset.has_flag(DatasetFlag::Ixp) {
        package_ixp_project(
            client,
            &dataset,
            file,
            batch_size,
            max_attachment_memory_mb,
            pool,
        )
    } else {
        package_cm_project(
            client,
            &dataset,
            file,
            batch_size,
            max_attachment_memory_mb,
            pool,
        )
    }
}
/// Thread-safe progress counters, shared (via `Arc`) between worker
/// threads and the progress-bar callback.
#[derive(Debug)]
pub struct Statistics {
    comments: AtomicUsize,
    emails: AtomicUsize,
    document_downloads: AtomicUsize,
    document_writes: AtomicUsize,
    annotations: AtomicUsize,
}

impl Statistics {
    /// Creates a fresh set of zeroed counters.
    fn new() -> Self {
        Self {
            comments: AtomicUsize::new(0),
            emails: AtomicUsize::new(0),
            annotations: AtomicUsize::new(0),
            document_downloads: AtomicUsize::new(0),
            document_writes: AtomicUsize::new(0),
        }
    }

    // Each counter below has an `add_*` incrementer and a `num_*` reader,
    // grouped per counter for readability.

    #[inline]
    fn add_comments(&self, n: usize) {
        self.comments.fetch_add(n, Ordering::SeqCst);
    }

    #[inline]
    fn num_comments(&self) -> usize {
        self.comments.load(Ordering::SeqCst)
    }

    #[inline]
    fn add_emails(&self, n: usize) {
        self.emails.fetch_add(n, Ordering::SeqCst);
    }

    #[inline]
    fn num_emails(&self) -> usize {
        self.emails.load(Ordering::SeqCst)
    }

    #[inline]
    fn add_document_download(&self, n: usize) {
        self.document_downloads.fetch_add(n, Ordering::SeqCst);
    }

    #[inline]
    fn num_document_downloaded(&self) -> usize {
        self.document_downloads.load(Ordering::SeqCst)
    }

    #[inline]
    fn add_document_write(&self, n: usize) {
        self.document_writes.fetch_add(n, Ordering::SeqCst);
    }

    #[inline]
    fn num_document_writes(&self) -> usize {
        self.document_writes.load(Ordering::SeqCst)
    }

    #[inline]
    fn add_annotations(&self, n: usize) {
        self.annotations.fetch_add(n, Ordering::SeqCst);
    }

    #[inline]
    fn num_annotations(&self) -> usize {
        self.annotations.load(Ordering::SeqCst)
    }
}
/// Builds the progress bar for Communications Mining packaging.
///
/// When buckets are present the total is doubled, since comments and
/// emails each contribute one unit of progress.
fn get_cm_progress_bar(
    total_comments: u64,
    has_buckets: bool,
    statistics: &Arc<Statistics>,
) -> Progress {
    let total = total_comments * if has_buckets { 2 } else { 1 };
    Progress::new(
        move |stats| {
            let comments = stats.num_comments();
            let annotations = stats.num_annotations();
            let emails = stats.num_emails();
            let done = (comments + emails) as u64;
            let message = format!(
                "{} {} {} {} {} {}",
                comments.to_string().bold(),
                "comments".dimmed(),
                annotations.to_string().bold(),
                "annotations".dimmed(),
                emails.to_string().bold(),
                "emails downloaded".dimmed(),
            );
            (done, message)
        },
        statistics,
        Some(total),
        crate::progress::Options { bytes_units: false },
    )
}
/// Builds the progress bar for IXP packaging.
fn get_ixp_progress_bar(total_comments: u64, statistics: &Arc<Statistics>) -> Progress {
    Progress::new(
        move |stats| {
            let comments = stats.num_comments();
            let downloaded = stats.num_document_downloaded();
            let annotations = stats.num_annotations();
            let written = stats.num_document_writes();
            // Three counters (comments read, docs downloaded, docs written)
            // are averaged so the bar position stays within `total_comments`.
            let done = ((comments + downloaded + written) / 3) as u64;
            let message = format!(
                "{} {} {} {} {} {} {} {}",
                comments.to_string().bold(),
                "comments".dimmed(),
                annotations.to_string().bold(),
                "annotations".dimmed(),
                downloaded.to_string().bold(),
                "docs downloaded".dimmed(),
                written.to_string().bold(),
                "docs written".dimmed(),
            );
            (done, message)
        },
        statistics,
        Some(total_comments),
        crate::progress::Options { bytes_units: false },
    )
}