#![allow(dead_code)]
use std::collections::VecDeque;
use std::time::Instant;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct UploadId(
pub u64,
);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum UploadPriority {
Low = 0,
#[default]
Normal = 1,
High = 2,
Critical = 3,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UploadState {
Queued,
Transferring,
Completed,
Failed,
Cancelled,
}
#[derive(Debug, Clone)]
pub struct UploadTarget {
pub buffer_id: u64,
pub offset: usize,
pub buffer_size: usize,
}
impl UploadTarget {
#[must_use]
pub fn new(buffer_id: u64, offset: usize, buffer_size: usize) -> Self {
Self {
buffer_id,
offset,
buffer_size,
}
}
#[must_use]
pub fn fits(&self, data_size: usize) -> bool {
self.offset + data_size <= self.buffer_size
}
}
#[derive(Debug, Clone)]
pub struct UploadRequest {
pub id: UploadId,
pub data: Vec<u8>,
pub target: UploadTarget,
pub priority: UploadPriority,
pub state: UploadState,
pub enqueue_time: Instant,
pub complete_time: Option<Instant>,
}
impl UploadRequest {
#[must_use]
pub fn new(
id: UploadId,
data: Vec<u8>,
target: UploadTarget,
priority: UploadPriority,
) -> Self {
Self {
id,
data,
target,
priority,
state: UploadState::Queued,
enqueue_time: Instant::now(),
complete_time: None,
}
}
#[must_use]
pub fn data_size(&self) -> usize {
self.data.len()
}
#[must_use]
pub fn latency(&self) -> Option<std::time::Duration> {
self.complete_time
.map(|t| t.duration_since(self.enqueue_time))
}
}
#[derive(Debug, Clone)]
pub struct UploadQueueConfig {
pub max_pending: usize,
pub max_queued_bytes: usize,
pub flush_batch_size: usize,
pub priority_sort: bool,
}
impl Default for UploadQueueConfig {
fn default() -> Self {
Self {
max_pending: 256,
max_queued_bytes: 256 * 1024 * 1024, flush_batch_size: 32,
priority_sort: true,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct UploadQueueStats {
pub queued_count: usize,
pub queued_bytes: usize,
pub total_processed: u64,
pub total_bytes_transferred: u64,
pub failed_count: u64,
pub cancelled_count: u64,
pub avg_throughput_bps: f64,
}
impl UploadQueueStats {
#[allow(clippy::cast_precision_loss)]
#[must_use]
pub fn queued_mb(&self) -> f64 {
self.queued_bytes as f64 / (1024.0 * 1024.0)
}
#[allow(clippy::cast_precision_loss)]
#[must_use]
pub fn transferred_mb(&self) -> f64 {
self.total_bytes_transferred as f64 / (1024.0 * 1024.0)
}
}
pub struct UploadQueue {
queue: VecDeque<UploadRequest>,
completed: Vec<UploadRequest>,
config: UploadQueueConfig,
next_id: u64,
total_bytes_transferred: u64,
failed_count: u64,
cancelled_count: u64,
}
impl UploadQueue {
#[must_use]
pub fn new() -> Self {
Self::with_config(UploadQueueConfig::default())
}
#[must_use]
pub fn with_config(config: UploadQueueConfig) -> Self {
Self {
queue: VecDeque::new(),
completed: Vec::new(),
config,
next_id: 0,
total_bytes_transferred: 0,
failed_count: 0,
cancelled_count: 0,
}
}
pub fn enqueue(
&mut self,
data: Vec<u8>,
target: UploadTarget,
priority: UploadPriority,
) -> Option<UploadId> {
let current_bytes: usize = self.queue.iter().map(|r| r.data.len()).sum();
if self.queue.len() >= self.config.max_pending {
return None;
}
if current_bytes + data.len() > self.config.max_queued_bytes {
return None;
}
let id = UploadId(self.next_id);
self.next_id += 1;
let request = UploadRequest::new(id, data, target, priority);
self.queue.push_back(request);
Some(id)
}
pub fn flush(&mut self) -> Vec<UploadId> {
if self.config.priority_sort {
let mut items: Vec<UploadRequest> = self.queue.drain(..).collect();
items.sort_by(|a, b| b.priority.cmp(&a.priority));
self.queue = items.into_iter().collect();
}
let batch_size = self.config.flush_batch_size.min(self.queue.len());
let mut processed = Vec::with_capacity(batch_size);
for _ in 0..batch_size {
if let Some(mut request) = self.queue.pop_front() {
if request.target.fits(request.data.len()) {
request.state = UploadState::Completed;
request.complete_time = Some(Instant::now());
self.total_bytes_transferred += request.data.len() as u64;
} else {
request.state = UploadState::Failed;
self.failed_count += 1;
}
processed.push(request.id);
self.completed.push(request);
}
}
processed
}
pub fn cancel(&mut self, id: UploadId) -> bool {
if let Some(pos) = self.queue.iter().position(|r| r.id == id) {
let mut request = match self.queue.remove(pos) {
Some(r) => r,
None => return false,
};
request.state = UploadState::Cancelled;
self.cancelled_count += 1;
self.completed.push(request);
true
} else {
false
}
}
#[must_use]
pub fn pending_count(&self) -> usize {
self.queue.len()
}
#[must_use]
pub fn queued_bytes(&self) -> usize {
self.queue.iter().map(|r| r.data.len()).sum()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
#[must_use]
pub fn request_state(&self, id: UploadId) -> Option<UploadState> {
self.queue
.iter()
.find(|r| r.id == id)
.map(|r| r.state)
.or_else(|| self.completed.iter().find(|r| r.id == id).map(|r| r.state))
}
#[allow(clippy::cast_precision_loss)]
#[must_use]
pub fn stats(&self) -> UploadQueueStats {
let queued_bytes: usize = self.queue.iter().map(|r| r.data.len()).sum();
let total_processed = self.completed.len() as u64;
let latencies: Vec<f64> = self
.completed
.iter()
.filter_map(UploadRequest::latency)
.map(|d| d.as_secs_f64())
.collect();
let avg_throughput_bps = if latencies.is_empty() {
0.0
} else {
let total_secs: f64 = latencies.iter().sum();
if total_secs > 0.0 {
self.total_bytes_transferred as f64 / total_secs
} else {
0.0
}
};
UploadQueueStats {
queued_count: self.queue.len(),
queued_bytes,
total_processed,
total_bytes_transferred: self.total_bytes_transferred,
failed_count: self.failed_count,
cancelled_count: self.cancelled_count,
avg_throughput_bps,
}
}
pub fn clear(&mut self) {
while let Some(mut req) = self.queue.pop_front() {
req.state = UploadState::Cancelled;
self.cancelled_count += 1;
self.completed.push(req);
}
}
#[must_use]
pub fn peek_next(&self) -> Option<&UploadRequest> {
self.queue.front()
}
}
impl Default for UploadQueue {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_target(size: usize) -> UploadTarget {
UploadTarget::new(1, 0, size)
}
#[test]
fn test_create_queue() {
let queue = UploadQueue::new();
assert!(queue.is_empty());
assert_eq!(queue.pending_count(), 0);
}
#[test]
fn test_enqueue_request() {
let mut queue = UploadQueue::new();
let data = vec![0u8; 1024];
let target = make_target(2048);
let id = queue.enqueue(data, target, UploadPriority::Normal);
assert!(id.is_some());
assert_eq!(queue.pending_count(), 1);
}
#[test]
fn test_enqueue_limit() {
let config = UploadQueueConfig {
max_pending: 2,
..Default::default()
};
let mut queue = UploadQueue::with_config(config);
let t1 = make_target(4096);
let t2 = make_target(4096);
let t3 = make_target(4096);
assert!(queue
.enqueue(vec![0; 100], t1, UploadPriority::Normal)
.is_some());
assert!(queue
.enqueue(vec![0; 100], t2, UploadPriority::Normal)
.is_some());
assert!(queue
.enqueue(vec![0; 100], t3, UploadPriority::Normal)
.is_none());
}
#[test]
fn test_enqueue_byte_limit() {
let config = UploadQueueConfig {
max_queued_bytes: 200,
..Default::default()
};
let mut queue = UploadQueue::with_config(config);
let t1 = make_target(4096);
let t2 = make_target(4096);
assert!(queue
.enqueue(vec![0; 150], t1, UploadPriority::Normal)
.is_some());
assert!(queue
.enqueue(vec![0; 100], t2, UploadPriority::Normal)
.is_none());
}
#[test]
fn test_flush_batch() {
let mut queue = UploadQueue::new();
for i in 0..5 {
let target = make_target(4096);
queue.enqueue(vec![0u8; 100], target, UploadPriority::Normal);
let _ = i;
}
let processed = queue.flush();
assert!(!processed.is_empty());
}
#[test]
fn test_flush_priority_ordering() {
let config = UploadQueueConfig {
priority_sort: true,
flush_batch_size: 1,
..Default::default()
};
let mut queue = UploadQueue::with_config(config);
let t1 = make_target(4096);
let t2 = make_target(4096);
let _low_id = queue
.enqueue(vec![1], t1, UploadPriority::Low)
.expect("enqueue should succeed");
let high_id = queue
.enqueue(vec![2], t2, UploadPriority::Critical)
.expect("operation should succeed in test");
let processed = queue.flush();
assert_eq!(processed.len(), 1);
assert_eq!(processed[0], high_id);
}
#[test]
fn test_cancel_request() {
let mut queue = UploadQueue::new();
let target = make_target(4096);
let id = queue
.enqueue(vec![0; 100], target, UploadPriority::Normal)
.expect("operation should succeed in test");
assert!(queue.cancel(id));
assert!(queue.is_empty());
assert_eq!(queue.request_state(id), Some(UploadState::Cancelled));
}
#[test]
fn test_cancel_nonexistent() {
let mut queue = UploadQueue::new();
assert!(!queue.cancel(UploadId(999)));
}
#[test]
fn test_request_state_tracking() {
let mut queue = UploadQueue::new();
let target = make_target(4096);
let id = queue
.enqueue(vec![0; 100], target, UploadPriority::Normal)
.expect("operation should succeed in test");
assert_eq!(queue.request_state(id), Some(UploadState::Queued));
queue.flush();
assert_eq!(queue.request_state(id), Some(UploadState::Completed));
}
#[test]
fn test_failed_transfer() {
let mut queue = UploadQueue::new();
let target = UploadTarget::new(1, 0, 10);
let id = queue
.enqueue(vec![0; 100], target, UploadPriority::Normal)
.expect("operation should succeed in test");
queue.flush();
assert_eq!(queue.request_state(id), Some(UploadState::Failed));
}
#[test]
fn test_queued_bytes() {
let mut queue = UploadQueue::new();
let t1 = make_target(4096);
let t2 = make_target(4096);
queue.enqueue(vec![0; 100], t1, UploadPriority::Normal);
queue.enqueue(vec![0; 200], t2, UploadPriority::Normal);
assert_eq!(queue.queued_bytes(), 300);
}
#[test]
fn test_stats() {
let mut queue = UploadQueue::new();
let target = make_target(4096);
queue.enqueue(vec![0; 1024], target, UploadPriority::Normal);
queue.flush();
let stats = queue.stats();
assert_eq!(stats.total_processed, 1);
assert_eq!(stats.total_bytes_transferred, 1024);
}
#[test]
fn test_stats_mb_conversion() {
let stats = UploadQueueStats {
queued_bytes: 1_048_576,
total_bytes_transferred: 2_097_152,
..Default::default()
};
assert!((stats.queued_mb() - 1.0).abs() < 0.001);
assert!((stats.transferred_mb() - 2.0).abs() < 0.001);
}
#[test]
fn test_clear_queue() {
let mut queue = UploadQueue::new();
for _ in 0..5 {
let target = make_target(4096);
queue.enqueue(vec![0; 100], target, UploadPriority::Normal);
}
queue.clear();
assert!(queue.is_empty());
assert_eq!(queue.stats().cancelled_count, 5);
}
#[test]
fn test_peek_next() {
let mut queue = UploadQueue::new();
assert!(queue.peek_next().is_none());
let target = make_target(4096);
let id = queue
.enqueue(vec![0; 100], target, UploadPriority::Normal)
.expect("operation should succeed in test");
let peeked = queue.peek_next().expect("peek should return next item");
assert_eq!(peeked.id, id);
}
#[test]
fn test_upload_target_fits() {
let target = UploadTarget::new(1, 10, 100);
assert!(target.fits(90));
assert!(target.fits(0));
assert!(!target.fits(91));
}
}