use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use colored::Colorize;
use log::info;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use structopt::StructOpt;
use reinfer_client::{
resources::{bucket::GetKeyedSyncStateIdsRequest, project::ForceDeleteProject},
BucketIdentifier, Client, CommentId, CommentsIter, CommentsIterTimerange, DatasetIdentifier,
ProjectName, Source, SourceIdentifier, UserIdentifier,
};
use crate::progress::{Options as ProgressOptions, Progress};
#[derive(Debug, StructOpt)]
pub enum DeleteArgs {
#[structopt(name = "source")]
Source {
#[structopt(name = "source")]
source: SourceIdentifier,
},
#[structopt(name = "comments")]
Comments {
#[structopt(short = "s", long = "source")]
source: SourceIdentifier,
#[structopt(name = "comment id")]
comments: Vec<CommentId>,
},
#[structopt(name = "bulk")]
BulkComments {
#[structopt(short = "s", long = "source")]
source: SourceIdentifier,
#[structopt(long, parse(try_from_str))]
include_annotated: bool,
#[structopt(long)]
from_timestamp: Option<DateTime<Utc>>,
#[structopt(long)]
to_timestamp: Option<DateTime<Utc>>,
#[structopt(long)]
no_progress: bool,
},
#[structopt(name = "bucket")]
Bucket {
#[structopt(name = "bucket")]
bucket: BucketIdentifier,
},
#[structopt(name = "dataset")]
Dataset {
#[structopt(name = "dataset")]
dataset: DatasetIdentifier,
},
#[structopt(name = "user")]
User {
#[structopt(name = "user")]
user: UserIdentifier,
},
#[structopt(name = "project")]
Project {
#[structopt(name = "project")]
project: ProjectName,
#[structopt(long)]
force: bool,
},
#[structopt(name = "keyed-sync-states")]
KeyedSyncStates {
bucket: BucketIdentifier,
mailbox_name: String,
},
}
pub fn run(delete_args: &DeleteArgs, client: Client) -> Result<()> {
match delete_args {
DeleteArgs::Source { source } => {
client
.delete_source(source.clone())
.context("Operation to delete source has failed.")?;
log::info!("Deleted source.");
}
DeleteArgs::User { user } => {
client
.delete_user(user.clone())
.context("Operation to delete user has failed.")?;
log::info!("Deleted user.");
}
DeleteArgs::Comments { source, comments } => {
client
.delete_comments(source.clone(), comments)
.context("Operation to delete comments has failed.")?;
log::info!("Deleted comments.");
}
DeleteArgs::BulkComments {
source: source_identifier,
include_annotated,
from_timestamp,
to_timestamp,
no_progress,
} => {
let source = client.get_source(source_identifier.clone())?;
let show_progress = !no_progress;
delete_comments_in_period(
&client,
source,
*include_annotated,
CommentsIterTimerange {
from: *from_timestamp,
to: *to_timestamp,
},
show_progress,
)
.context("Operation to delete comments has failed.")?;
}
DeleteArgs::Dataset { dataset } => {
client
.delete_dataset(dataset.clone())
.context("Operation to delete dataset has failed.")?;
log::info!("Deleted dataset.");
}
DeleteArgs::Bucket { bucket } => {
client
.delete_bucket(bucket.clone())
.context("Operation to delete bucket has failed.")?;
log::info!("Deleted bucket.");
}
DeleteArgs::Project { project, force } => {
let force_delete = if *force {
ForceDeleteProject::Yes
} else {
ForceDeleteProject::No
};
client
.delete_project(project, force_delete)
.context("Operation to delete project has failed.")?;
log::info!("Deleted project.");
}
DeleteArgs::KeyedSyncStates {
bucket,
mailbox_name,
} => {
let bucket = client.get_bucket(bucket.clone())?;
let keyed_sync_state_ids = client.get_keyed_sync_state_ids(
&bucket.id,
&GetKeyedSyncStateIdsRequest {
mailbox_name: mailbox_name.clone(),
},
)?;
for id in keyed_sync_state_ids {
client.delete_keyed_sync_state(&bucket.id, &id)?;
info!("Delete keyed sync state {}", id.0)
}
}
};
Ok(())
}
fn delete_comments_in_period(
client: &Client,
source: Source,
include_annotated: bool,
timerange: CommentsIterTimerange,
show_progress: bool,
) -> Result<()> {
log::info!(
"Deleting comments in source `{}`{} (include-annotated: {})",
source.full_name().0,
match (timerange.from, timerange.to) {
(None, None) => "".into(),
(Some(start), None) => format!(" after {start}"),
(None, Some(end)) => format!(" before {end}"),
(Some(start), Some(end)) => format!(" in range {start} -> {end}"),
},
include_annotated,
);
let statistics = Arc::new(Statistics::new());
{
let _progress = if show_progress {
Some(delete_comments_progress_bar(&statistics))
} else {
None
};
const DELETION_BATCH_SIZE: usize = 32;
let mut comments_to_delete =
Vec::with_capacity(DELETION_BATCH_SIZE + CommentsIter::MAX_PAGE_SIZE);
let delete_batch = |comment_ids: Vec<CommentId>| -> Result<()> {
client
.delete_comments(&source, &comment_ids)
.context("Operation to delete comments failed")?;
statistics.increment_deleted(comment_ids.len());
Ok(())
};
client
.get_comments_iter(
&source.full_name(),
Some(CommentsIter::MAX_PAGE_SIZE),
timerange,
)
.try_for_each(|page| -> Result<()> {
let page = page.context("Operation to get comments failed")?;
let num_comments = page.len();
let comment_ids = page
.into_iter()
.filter_map(|comment| {
if !include_annotated && comment.has_annotations {
None
} else {
Some(comment.id)
}
})
.collect::<Vec<_>>();
let num_skipped = num_comments - comment_ids.len();
statistics.increment_skipped(num_skipped);
comments_to_delete.extend(comment_ids);
while comments_to_delete.len() >= DELETION_BATCH_SIZE {
let remainder = comments_to_delete.split_off(DELETION_BATCH_SIZE);
delete_batch(std::mem::replace(&mut comments_to_delete, remainder))?;
}
Ok(())
})?;
if !comments_to_delete.is_empty() {
assert!(comments_to_delete.len() < DELETION_BATCH_SIZE);
delete_batch(comments_to_delete)?;
}
}
log::info!(
"Deleted {} comments (skipped {}).",
statistics.deleted(),
statistics.skipped()
);
Ok(())
}
#[derive(Debug)]
pub struct Statistics {
deleted: AtomicUsize,
skipped: AtomicUsize,
}
impl Statistics {
fn new() -> Self {
Self {
deleted: AtomicUsize::new(0),
skipped: AtomicUsize::new(0),
}
}
#[inline]
fn increment_deleted(&self, num_deleted: usize) {
self.deleted.fetch_add(num_deleted, Ordering::SeqCst);
}
#[inline]
fn increment_skipped(&self, num_skipped: usize) {
self.skipped.fetch_add(num_skipped, Ordering::SeqCst);
}
#[inline]
fn deleted(&self) -> usize {
self.deleted.load(Ordering::SeqCst)
}
#[inline]
fn skipped(&self) -> usize {
self.skipped.load(Ordering::SeqCst)
}
}
fn delete_comments_progress_bar(statistics: &Arc<Statistics>) -> Progress {
Progress::new(
move |statistics| {
let num_deleted = statistics.deleted() as u64;
let num_skipped = statistics.skipped() as u64;
(
num_deleted + num_skipped,
format!(
"{} {} [{} {}] total",
num_deleted.to_string().bold(),
"deleted".dimmed(),
num_skipped,
"skipped".dimmed()
),
)
},
statistics,
None,
ProgressOptions { bytes_units: false },
)
}