use std::time::Duration;
use crate::Packet;
use super::error::Result;
use super::send::{send_packet, SendMode, SendOptions, SendReport};
use super::send_recv::SendRecv;
pub use super::send_recv::{
send_recv_packets, BatchSendRecv, BatchSendRecvEntry, BatchSendRecvReport,
PacketBatchSendRecvExt,
};
/// Builder-style configuration for sending a slice of packets in chunks.
///
/// Every configuration method takes the builder by value and returns it, while
/// [`BatchSend::send_all`] only borrows it. `Clone` is derived so that one
/// configured base can be specialised several ways (for example a live and a
/// dry-run variant) without rebuilding it from scratch.
#[derive(Clone)]
pub struct BatchSend {
    /// Options passed (as a fresh clone) to `send_packet` for every send.
    send_options: SendOptions,
    /// Number of packets grouped into one chunk by `send_all`; always >= 1.
    concurrency_limit: usize,
    /// Number of times each packet is sent by `send_all`; always >= 1.
    retries: usize,
    /// Pause inserted between consecutive attempts of a chunk on live sends.
    retry_timeout: Duration,
}
impl BatchSend {
pub fn new() -> Self {
Self {
send_options: SendOptions::new(),
concurrency_limit: 64,
retries: 1,
retry_timeout: Duration::ZERO,
}
}
pub fn interface(mut self, interface: impl Into<String>) -> Self {
self.send_options = self.send_options.interface(interface);
self
}
pub fn iface(self, interface: impl Into<String>) -> Self {
self.interface(interface)
}
pub fn mode(mut self, mode: SendMode) -> Self {
self.send_options = self.send_options.mode(mode);
self
}
pub fn link_layer(self) -> Self {
self.mode(SendMode::LinkLayer)
}
pub fn network_layer(self) -> Self {
self.mode(SendMode::NetworkLayer)
}
pub fn dry_run(mut self) -> Self {
self.send_options = self.send_options.dry_run();
self
}
pub fn live(mut self) -> Self {
self.send_options = self.send_options.live();
self
}
pub fn write_timeout(mut self, timeout: Duration) -> Self {
self.send_options = self.send_options.write_timeout(timeout);
self
}
pub fn write_buffer_size(mut self, size: usize) -> Self {
self.send_options = self.send_options.write_buffer_size(size);
self
}
pub fn concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit = limit.max(1);
self
}
pub fn retries(mut self, retries: usize) -> Self {
self.retries = retries.max(1);
self
}
pub fn retry(self, retries: usize) -> Self {
self.retries(retries)
}
pub fn retry_timeout(mut self, timeout: Duration) -> Self {
self.retry_timeout = timeout;
self
}
pub fn timeout(self, timeout: Duration) -> Self {
self.retry_timeout(timeout)
}
pub const fn send_options(&self) -> &SendOptions {
&self.send_options
}
pub const fn concurrency_limit_value(&self) -> usize {
self.concurrency_limit
}
pub const fn retries_value(&self) -> usize {
self.retries
}
pub const fn retry_timeout_value(&self) -> Duration {
self.retry_timeout
}
pub fn send_all(&self, packets: &[Packet]) -> Result<BatchSendReport> {
let mut entries = (0..packets.len())
.map(BatchSendEntry::new)
.collect::<Vec<_>>();
for chunk_start in (0..packets.len()).step_by(self.concurrency_limit) {
let chunk_end = (chunk_start + self.concurrency_limit).min(packets.len());
for attempt in 0..self.retries {
for request_index in chunk_start..chunk_end {
entries[request_index].send_reports.push(send_packet(
&packets[request_index],
self.send_options.clone(),
)?);
}
maybe_wait_between_live_retries(
self.send_options.is_dry_run(),
self.retry_timeout,
attempt,
self.retries,
);
}
}
Ok(BatchSendReport::new(
entries,
self.concurrency_limit,
self.retries,
self.retry_timeout,
self.send_options.is_dry_run(),
))
}
}
impl Default for BatchSend {
fn default() -> Self {
Self::new()
}
}
/// Wraps existing send options in a batch builder, keeping the other batch
/// settings at the values chosen by [`BatchSend::new`].
impl From<SendOptions> for BatchSend {
    fn from(send_options: SendOptions) -> Self {
        Self {
            send_options,
            ..Self::new()
        }
    }
}
/// Carries the send options, retry count and timeout of a `SendRecv`
/// configuration over into a batch builder; the chunk size keeps its default.
impl From<SendRecv> for BatchSend {
    fn from(send_recv: SendRecv) -> Self {
        let batch = Self::new();
        let batch = batch.with_send_options(send_recv.send_options().clone());
        // `retries` clamps a zero count up to one attempt.
        let batch = batch.retries(send_recv.retries_value());
        batch.retry_timeout(send_recv.timeout_value())
    }
}
impl From<&str> for BatchSend {
fn from(interface: &str) -> Self {
Self::new().iface(interface)
}
}
impl From<String> for BatchSend {
fn from(interface: String) -> Self {
Self::new().iface(interface)
}
}
impl BatchSend {
    /// Replaces the stored send options wholesale, leaving the chunk size,
    /// retry count and retry timeout untouched.
    fn with_send_options(self, send_options: SendOptions) -> Self {
        Self {
            send_options,
            ..self
        }
    }
}
/// Outcome of sending one packet of a batch: which request it was and the
/// report of every send performed for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchSendEntry {
/// Index of the packet within the slice given to `BatchSend::send_all`.
request_index: usize,
/// One report per completed send of this packet, in the order they were made.
send_reports: Vec<SendReport>,
}
impl BatchSendEntry {
    /// Creates an entry for the packet at `request_index` with no reports yet.
    fn new(request_index: usize) -> Self {
        let send_reports = Vec::new();
        Self {
            request_index,
            send_reports,
        }
    }

    /// Returns the index of the packet this entry describes.
    pub const fn request_index(&self) -> usize {
        self.request_index
    }

    /// Returns the reports recorded for this packet, oldest first.
    pub fn send_reports(&self) -> &[SendReport] {
        self.send_reports.as_slice()
    }

    /// Returns how many sends were recorded for this packet.
    pub fn attempts(&self) -> usize {
        self.send_reports().len()
    }
}
/// Result of a completed `BatchSend::send_all` call: the per-packet entries
/// plus a snapshot of the batch settings that were in effect.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchSendReport {
/// One entry per input packet; `send_all` builds them in request order.
entries: Vec<BatchSendEntry>,
/// Chunk size the batch was sent with.
concurrency_limit: usize,
/// Number of attempts configured per packet.
retries: usize,
/// Pause configured between attempts.
retry_timeout: Duration,
/// Value of `SendOptions::is_dry_run` when the report was built.
dry_run: bool,
}
impl BatchSendReport {
    /// Bundles the collected entries with the settings used to produce them.
    fn new(
        entries: Vec<BatchSendEntry>,
        concurrency_limit: usize,
        retries: usize,
        retry_timeout: Duration,
        dry_run: bool,
    ) -> Self {
        Self {
            dry_run,
            retry_timeout,
            retries,
            concurrency_limit,
            entries,
        }
    }

    /// Returns all entries in the order they are stored.
    pub fn entries(&self) -> &[BatchSendEntry] {
        self.entries.as_slice()
    }

    /// Returns the entry stored at position `request_index`, if any.
    ///
    /// The lookup is positional; `send_all` in this file stores entries in
    /// request order, so position and request index coincide there.
    pub fn entry(&self, request_index: usize) -> Option<&BatchSendEntry> {
        self.entries().get(request_index)
    }

    /// Returns the number of entries in the report.
    pub fn len(&self) -> usize {
        self.entries().len()
    }

    /// Returns `true` when the report holds no entries.
    pub fn is_empty(&self) -> bool {
        self.entries().is_empty()
    }

    /// Returns the chunk size the batch was sent with.
    pub const fn concurrency_limit(&self) -> usize {
        self.concurrency_limit
    }

    /// Returns the number of attempts configured per packet.
    pub const fn retries(&self) -> usize {
        self.retries
    }

    /// Returns the pause configured between attempts.
    pub const fn retry_timeout(&self) -> Duration {
        self.retry_timeout
    }

    /// Returns whether the batch was flagged as a dry run.
    pub const fn is_dry_run(&self) -> bool {
        self.dry_run
    }
}
/// Extension trait adding batch-send entry points to packet collections.
///
/// This file implements it for `[Packet]`.
pub trait PacketBatchSendExt {
/// Sends the packets with the batch configuration obtained from `options`.
///
/// The `[Packet]` implementation converts `options` into a `BatchSend` and
/// calls `BatchSend::send_all`.
fn batch_send(&self, options: impl Into<BatchSend>) -> Result<BatchSendReport>;
/// Like `batch_send`, but the `[Packet]` implementation applies
/// `BatchSend::dry_run` to the configuration before sending.
fn batch_send_dry_run(&self, options: impl Into<BatchSend>) -> Result<BatchSendReport>;
}
impl PacketBatchSendExt for [Packet] {
    /// Converts `options` into a [`BatchSend`] and sends this slice with it.
    fn batch_send(&self, options: impl Into<BatchSend>) -> Result<BatchSendReport> {
        let batch: BatchSend = options.into();
        batch.send_all(self)
    }

    /// Same as `batch_send`, with [`BatchSend::dry_run`] applied first.
    fn batch_send_dry_run(&self, options: impl Into<BatchSend>) -> Result<BatchSendReport> {
        let batch: BatchSend = options.into();
        let batch = batch.dry_run();
        batch.send_all(self)
    }
}
/// Sends `packets` using the batch configuration obtained from `options`.
///
/// Free-function form of [`BatchSend::send_all`]; errors are passed through
/// unchanged.
pub fn send_packets(packets: &[Packet], options: impl Into<BatchSend>) -> Result<BatchSendReport> {
    let batch: BatchSend = options.into();
    batch.send_all(packets)
}
/// Blocks the current thread for `timeout` between two attempts of a live send.
///
/// Nothing happens when `dry_run` is set, when `timeout` is zero, or when
/// `attempt` (zero-based) is the last of `attempts`.
fn maybe_wait_between_live_retries(
    dry_run: bool,
    timeout: Duration,
    attempt: usize,
    attempts: usize,
) {
    if dry_run || timeout.is_zero() {
        return;
    }
    // No pause after the final attempt: nothing follows it.
    let is_final_attempt = attempt + 1 >= attempts;
    if is_final_attempt {
        return;
    }
    std::thread::sleep(timeout);
}