use std::time::Duration;
use anyhow::Result;
use indicatif::ProgressBar;
use cloudflare::endpoints::workerskv::delete_bulk::DeleteBulk;
use cloudflare::endpoints::workerskv::write_bulk::KeyValuePair;
use cloudflare::endpoints::workerskv::write_bulk::WriteBulk;
use cloudflare::framework::apiclient::ApiClient;
use cloudflare::framework::auth::Credentials;
use cloudflare::framework::{HttpApiClient, HttpApiClientConfig};
use crate::commands::kv::format_error;
use crate::http;
use crate::http::feature::headers;
use crate::settings::global_user::GlobalUser;
use crate::settings::toml::Target;
const API_MAX_PAIRS: usize = 10000;
pub const BATCH_KEY_MAX: usize = API_MAX_PAIRS / 2;
const UPLOAD_MAX_SIZE: usize = 50 * 1024 * 1024;
fn bulk_api_client(user: &GlobalUser) -> Result<HttpApiClient> {
let config = HttpApiClientConfig {
http_timeout: Duration::from_secs(5 * 60),
default_headers: headers(None),
};
let environment = http::get_environment()?;
HttpApiClient::new(Credentials::from(user.to_owned()), config, environment)
}
pub fn put(
target: &Target,
user: &GlobalUser,
namespace_id: &str,
pairs: Vec<KeyValuePair>,
progress_bar: &Option<ProgressBar>,
) -> Result<()> {
let client = bulk_api_client(user)?;
for b in batch_keys_values(pairs) {
match client.request(&WriteBulk {
account_identifier: target.account_id.load()?,
namespace_identifier: namespace_id,
bulk_key_value_pairs: b.to_owned(),
}) {
Ok(_) => {}
Err(e) => anyhow::bail!("{}", format_error(e)),
}
if let Some(pb) = &progress_bar {
pb.inc(b.len() as u64);
}
}
Ok(())
}
pub fn delete(
target: &Target,
user: &GlobalUser,
namespace_id: &str,
keys: Vec<String>,
progress_bar: &Option<ProgressBar>,
) -> Result<()> {
let client = bulk_api_client(user)?;
for b in batch_keys(keys) {
match client.request(&DeleteBulk {
account_identifier: target.account_id.load()?,
namespace_identifier: namespace_id,
bulk_keys: b.to_owned(),
}) {
Ok(_) => {}
Err(e) => anyhow::bail!("{}", format_error(e)),
}
if let Some(pb) = &progress_bar {
pb.inc(b.len() as u64);
}
}
Ok(())
}
fn batch_keys_values(mut pairs: Vec<KeyValuePair>) -> Vec<Vec<KeyValuePair>> {
let mut batches: Vec<Vec<KeyValuePair>> = Vec::new();
if !pairs.is_empty() {
let mut key_count = 0;
let mut key_pair_bytes = 0;
let mut key_value_batch: Vec<KeyValuePair> = Vec::new();
while !(pairs.is_empty() && key_value_batch.is_empty()) {
if pairs.is_empty() {
batches.push(key_value_batch.to_vec());
key_value_batch.clear();
} else {
let pair = pairs.pop().unwrap();
if key_count + 1 > BATCH_KEY_MAX
|| key_pair_bytes + pair.key.len() + pair.value.len() > UPLOAD_MAX_SIZE
{
batches.push(key_value_batch.to_vec());
key_count = 0;
key_pair_bytes = 0;
key_value_batch.clear();
}
key_count += 1;
key_pair_bytes = key_pair_bytes + pair.key.len() + pair.value.len();
key_value_batch.push(pair);
}
}
}
batches
}
fn batch_keys(mut keys: Vec<String>) -> Vec<Vec<String>> {
let mut batches = Vec::new();
while !keys.is_empty() {
let k: Vec<String> = if keys.len() > BATCH_KEY_MAX {
keys.drain(0..BATCH_KEY_MAX).collect()
} else {
keys.drain(0..).collect()
};
batches.push(k);
}
batches
}