use crate::client::PutError;
use futures::stream::{self, StreamExt};
use std::future::Future;
pub(crate) async fn process_tasks_with_max_concurrency<I, R>(tasks: I, batch_size: usize) -> Vec<R>
where
I: IntoIterator,
I::Item: Future<Output = R> + Send,
R: Send,
{
let tasks: Vec<_> = tasks.into_iter().collect();
info!(
"Processing {} tasks with max concurrency of {batch_size}",
tasks.len()
);
let result: Vec<R> = stream::iter(tasks)
.buffer_unordered(batch_size)
.collect()
.await;
info!("Completed {} tasks in parallel.", result.len());
result
}
pub(crate) fn extract_gas_values(err_str: &str) -> Option<(String, String)> {
if let Some(max_fee_start) = err_str.find("maxFeePerGas: ") {
let max_fee_str = &err_str[max_fee_start + 14..];
if let Some(comma_pos) = max_fee_str.find(',') {
let max_fee = &max_fee_str[..comma_pos];
if let Some(base_fee_start) = err_str.find("baseFee: ") {
let base_fee_str = &err_str[base_fee_start + 9..];
let base_fee = base_fee_str.split(|c: char| !c.is_numeric()).next()?;
return Some((max_fee.to_string(), base_fee.to_string()));
}
}
}
None
}
pub(crate) fn format_upload_error(err: &PutError) -> String {
let err_str = format!("{err:?}");
if err_str.contains("max fee per gas less than block base fee") {
if let Some((max_fee, base_fee)) = extract_gas_values(&err_str) {
format!(
"❌ Gas fee too low!\n💰 Your max fee per gas: {max_fee} wei\n📈 Network base fee: {base_fee} wei\n💡 Increase your --max-fee-per-gas if you want the upload to be executed faster",
)
} else {
"💸 Gas fee too low - current base fee exceeds your setting".to_string()
}
} else if err_str.contains("insufficient funds") {
"💰 Insufficient funds for transaction".to_string()
} else if let PutError::Batch(upload_state) = err {
format!(
"❌ Upload batch failed: {} chunks failed",
upload_state.failed.len()
)
} else {
"❌ Upload error occurred".to_string()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::client::ChunkBatchUploadState;
#[tokio::test]
async fn test_process_tasks_with_max_concurrency_empty_vec() {
let tasks: Vec<std::future::Ready<i32>> = vec![];
let results = process_tasks_with_max_concurrency(tasks, 10).await;
assert!(results.is_empty());
}
#[test]
fn test_extract_gas_values() {
let err_str = "Error: max fee per gas less than block base fee: maxFeePerGas: 1000000000, baseFee: 2000000000";
let result = extract_gas_values(err_str);
assert_eq!(
result,
Some(("1000000000".to_string(), "2000000000".to_string()))
);
let err_str = "maxFeePerGas: 500, baseFee: 1000 (retry later)";
let result = extract_gas_values(err_str);
assert_eq!(result, Some(("500".to_string(), "1000".to_string())));
let err_str = "baseFee: 1000";
let result = extract_gas_values(err_str);
assert_eq!(result, None);
let err_str = "maxFeePerGas: 500";
let result = extract_gas_values(err_str);
assert_eq!(result, None);
let err_str = "";
let result = extract_gas_values(err_str);
assert_eq!(result, None);
}
#[test]
fn test_format_upload_error() {
let mut batch_state = ChunkBatchUploadState::default();
let chunk_addr1 = ant_protocol::storage::ChunkAddress::new(xor_name::XorName([1; 32]));
let chunk_addr2 = ant_protocol::storage::ChunkAddress::new(xor_name::XorName([2; 32]));
batch_state
.failed
.push((chunk_addr1, "test error".to_string()));
batch_state
.failed
.push((chunk_addr2, "test error 2".to_string()));
let batch_err = PutError::Batch(batch_state);
let err_msg = format_upload_error(&batch_err);
assert_eq!(err_msg, "❌ Upload batch failed: 2 chunks failed");
let generic_err = PutError::Batch(ChunkBatchUploadState::default());
let err_msg = format_upload_error(&generic_err);
assert_eq!(err_msg, "❌ Upload batch failed: 0 chunks failed");
}
}