Skip to main content

oximedia_gpu/
upload_queue.rs

1#![allow(dead_code)]
2//! Asynchronous upload queue for staging CPU data to GPU buffers.
3//!
4//! This module provides a batched upload mechanism that stages host-side data
5//! into a queue and flushes it to GPU-side buffers in optimized batches,
6//! reducing per-transfer overhead.
7
8use std::collections::VecDeque;
9use std::time::Instant;
10
11/// Unique identifier for an upload request.
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
13pub struct UploadId(
14    /// Inner identifier value.
15    pub u64,
16);
17
18/// Priority level for upload requests.
19#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
20pub enum UploadPriority {
21    /// Low priority - background uploads.
22    Low = 0,
23    /// Normal priority - standard uploads.
24    #[default]
25    Normal = 1,
26    /// High priority - latency-sensitive uploads.
27    High = 2,
28    /// Critical priority - must be processed immediately.
29    Critical = 3,
30}
31
32/// The current state of an upload request.
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34pub enum UploadState {
35    /// Queued and waiting to be processed.
36    Queued,
37    /// Currently being transferred.
38    Transferring,
39    /// Transfer completed successfully.
40    Completed,
41    /// Transfer failed.
42    Failed,
43    /// Transfer was cancelled.
44    Cancelled,
45}
46
47/// Describes the destination for an upload.
48#[derive(Debug, Clone)]
49pub struct UploadTarget {
50    /// Target buffer identifier.
51    pub buffer_id: u64,
52    /// Byte offset within the target buffer.
53    pub offset: usize,
54    /// Expected total size of the target buffer.
55    pub buffer_size: usize,
56}
57
58impl UploadTarget {
59    /// Create a new upload target.
60    #[must_use]
61    pub fn new(buffer_id: u64, offset: usize, buffer_size: usize) -> Self {
62        Self {
63            buffer_id,
64            offset,
65            buffer_size,
66        }
67    }
68
69    /// Check whether the given data size fits within the target at the offset.
70    #[must_use]
71    pub fn fits(&self, data_size: usize) -> bool {
72        self.offset + data_size <= self.buffer_size
73    }
74}
75
76/// A single upload request in the queue.
77#[derive(Debug, Clone)]
78pub struct UploadRequest {
79    /// Unique identifier for this request.
80    pub id: UploadId,
81    /// The data to upload.
82    pub data: Vec<u8>,
83    /// Target destination for the data.
84    pub target: UploadTarget,
85    /// Priority level.
86    pub priority: UploadPriority,
87    /// Current state.
88    pub state: UploadState,
89    /// Timestamp when the request was enqueued.
90    pub enqueue_time: Instant,
91    /// Timestamp when the transfer completed.
92    pub complete_time: Option<Instant>,
93}
94
95impl UploadRequest {
96    /// Create a new upload request.
97    #[must_use]
98    pub fn new(
99        id: UploadId,
100        data: Vec<u8>,
101        target: UploadTarget,
102        priority: UploadPriority,
103    ) -> Self {
104        Self {
105            id,
106            data,
107            target,
108            priority,
109            state: UploadState::Queued,
110            enqueue_time: Instant::now(),
111            complete_time: None,
112        }
113    }
114
115    /// Return the size of the data in bytes.
116    #[must_use]
117    pub fn data_size(&self) -> usize {
118        self.data.len()
119    }
120
121    /// Return the transfer latency if the upload is completed.
122    #[must_use]
123    pub fn latency(&self) -> Option<std::time::Duration> {
124        self.complete_time
125            .map(|t| t.duration_since(self.enqueue_time))
126    }
127}
128
129/// Configuration for the upload queue.
130#[derive(Debug, Clone)]
131pub struct UploadQueueConfig {
132    /// Maximum number of pending requests.
133    pub max_pending: usize,
134    /// Maximum total bytes that can be queued.
135    pub max_queued_bytes: usize,
136    /// Batch size for flush operations.
137    pub flush_batch_size: usize,
138    /// Whether to sort by priority before flushing.
139    pub priority_sort: bool,
140}
141
142impl Default for UploadQueueConfig {
143    fn default() -> Self {
144        Self {
145            max_pending: 256,
146            max_queued_bytes: 256 * 1024 * 1024, // 256 MB
147            flush_batch_size: 32,
148            priority_sort: true,
149        }
150    }
151}
152
153/// Statistics for the upload queue.
154#[derive(Debug, Clone, Default)]
155pub struct UploadQueueStats {
156    /// Number of requests currently in the queue.
157    pub queued_count: usize,
158    /// Total bytes currently queued.
159    pub queued_bytes: usize,
160    /// Total number of requests processed.
161    pub total_processed: u64,
162    /// Total bytes transferred.
163    pub total_bytes_transferred: u64,
164    /// Number of failed requests.
165    pub failed_count: u64,
166    /// Number of cancelled requests.
167    pub cancelled_count: u64,
168    /// Average transfer throughput in bytes per second.
169    pub avg_throughput_bps: f64,
170}
171
172impl UploadQueueStats {
173    /// Return queued bytes in megabytes.
174    #[allow(clippy::cast_precision_loss)]
175    #[must_use]
176    pub fn queued_mb(&self) -> f64 {
177        self.queued_bytes as f64 / (1024.0 * 1024.0)
178    }
179
180    /// Return total transferred in megabytes.
181    #[allow(clippy::cast_precision_loss)]
182    #[must_use]
183    pub fn transferred_mb(&self) -> f64 {
184        self.total_bytes_transferred as f64 / (1024.0 * 1024.0)
185    }
186}
187
188/// An asynchronous upload queue that batches host-to-device transfers.
189pub struct UploadQueue {
190    /// Pending upload requests.
191    queue: VecDeque<UploadRequest>,
192    /// Completed upload requests (for stats).
193    completed: Vec<UploadRequest>,
194    /// Configuration.
195    config: UploadQueueConfig,
196    /// Counter for generating unique upload IDs.
197    next_id: u64,
198    /// Total bytes transferred.
199    total_bytes_transferred: u64,
200    /// Failed request count.
201    failed_count: u64,
202    /// Cancelled request count.
203    cancelled_count: u64,
204}
205
206impl UploadQueue {
207    /// Create a new upload queue with default configuration.
208    #[must_use]
209    pub fn new() -> Self {
210        Self::with_config(UploadQueueConfig::default())
211    }
212
213    /// Create a new upload queue with the given configuration.
214    #[must_use]
215    pub fn with_config(config: UploadQueueConfig) -> Self {
216        Self {
217            queue: VecDeque::new(),
218            completed: Vec::new(),
219            config,
220            next_id: 0,
221            total_bytes_transferred: 0,
222            failed_count: 0,
223            cancelled_count: 0,
224        }
225    }
226
227    /// Enqueue an upload request. Returns the `UploadId` if successful,
228    /// or `None` if the queue is full.
229    pub fn enqueue(
230        &mut self,
231        data: Vec<u8>,
232        target: UploadTarget,
233        priority: UploadPriority,
234    ) -> Option<UploadId> {
235        let current_bytes: usize = self.queue.iter().map(|r| r.data.len()).sum();
236        if self.queue.len() >= self.config.max_pending {
237            return None;
238        }
239        if current_bytes + data.len() > self.config.max_queued_bytes {
240            return None;
241        }
242        let id = UploadId(self.next_id);
243        self.next_id += 1;
244        let request = UploadRequest::new(id, data, target, priority);
245        self.queue.push_back(request);
246        Some(id)
247    }
248
249    /// Flush a batch of uploads, simulating the transfer to GPU.
250    /// Returns the IDs of requests that were processed.
251    pub fn flush(&mut self) -> Vec<UploadId> {
252        if self.config.priority_sort {
253            let mut items: Vec<UploadRequest> = self.queue.drain(..).collect();
254            items.sort_by(|a, b| b.priority.cmp(&a.priority));
255            self.queue = items.into_iter().collect();
256        }
257
258        let batch_size = self.config.flush_batch_size.min(self.queue.len());
259        let mut processed = Vec::with_capacity(batch_size);
260
261        for _ in 0..batch_size {
262            if let Some(mut request) = self.queue.pop_front() {
263                if request.target.fits(request.data.len()) {
264                    request.state = UploadState::Completed;
265                    request.complete_time = Some(Instant::now());
266                    self.total_bytes_transferred += request.data.len() as u64;
267                } else {
268                    request.state = UploadState::Failed;
269                    self.failed_count += 1;
270                }
271                processed.push(request.id);
272                self.completed.push(request);
273            }
274        }
275
276        processed
277    }
278
279    /// Cancel a pending upload by ID.
280    pub fn cancel(&mut self, id: UploadId) -> bool {
281        if let Some(pos) = self.queue.iter().position(|r| r.id == id) {
282            let mut request = match self.queue.remove(pos) {
283                Some(r) => r,
284                None => return false,
285            };
286            request.state = UploadState::Cancelled;
287            self.cancelled_count += 1;
288            self.completed.push(request);
289            true
290        } else {
291            false
292        }
293    }
294
295    /// Return the number of pending requests.
296    #[must_use]
297    pub fn pending_count(&self) -> usize {
298        self.queue.len()
299    }
300
301    /// Return the total queued bytes.
302    #[must_use]
303    pub fn queued_bytes(&self) -> usize {
304        self.queue.iter().map(|r| r.data.len()).sum()
305    }
306
307    /// Check if the queue is empty.
308    #[must_use]
309    pub fn is_empty(&self) -> bool {
310        self.queue.is_empty()
311    }
312
313    /// Return the state of a request by ID.
314    #[must_use]
315    pub fn request_state(&self, id: UploadId) -> Option<UploadState> {
316        self.queue
317            .iter()
318            .find(|r| r.id == id)
319            .map(|r| r.state)
320            .or_else(|| self.completed.iter().find(|r| r.id == id).map(|r| r.state))
321    }
322
323    /// Return statistics about the queue.
324    #[allow(clippy::cast_precision_loss)]
325    #[must_use]
326    pub fn stats(&self) -> UploadQueueStats {
327        let queued_bytes: usize = self.queue.iter().map(|r| r.data.len()).sum();
328        let total_processed = self.completed.len() as u64;
329        let latencies: Vec<f64> = self
330            .completed
331            .iter()
332            .filter_map(UploadRequest::latency)
333            .map(|d| d.as_secs_f64())
334            .collect();
335        let avg_throughput_bps = if latencies.is_empty() {
336            0.0
337        } else {
338            let total_secs: f64 = latencies.iter().sum();
339            if total_secs > 0.0 {
340                self.total_bytes_transferred as f64 / total_secs
341            } else {
342                0.0
343            }
344        };
345
346        UploadQueueStats {
347            queued_count: self.queue.len(),
348            queued_bytes,
349            total_processed,
350            total_bytes_transferred: self.total_bytes_transferred,
351            failed_count: self.failed_count,
352            cancelled_count: self.cancelled_count,
353            avg_throughput_bps,
354        }
355    }
356
357    /// Clear all pending requests (marking them cancelled).
358    pub fn clear(&mut self) {
359        while let Some(mut req) = self.queue.pop_front() {
360            req.state = UploadState::Cancelled;
361            self.cancelled_count += 1;
362            self.completed.push(req);
363        }
364    }
365
366    /// Peek at the next request that would be processed.
367    #[must_use]
368    pub fn peek_next(&self) -> Option<&UploadRequest> {
369        self.queue.front()
370    }
371}
372
373impl Default for UploadQueue {
374    fn default() -> Self {
375        Self::new()
376    }
377}
378
379#[cfg(test)]
380mod tests {
381    use super::*;
382
383    fn make_target(size: usize) -> UploadTarget {
384        UploadTarget::new(1, 0, size)
385    }
386
387    #[test]
388    fn test_create_queue() {
389        let queue = UploadQueue::new();
390        assert!(queue.is_empty());
391        assert_eq!(queue.pending_count(), 0);
392    }
393
394    #[test]
395    fn test_enqueue_request() {
396        let mut queue = UploadQueue::new();
397        let data = vec![0u8; 1024];
398        let target = make_target(2048);
399        let id = queue.enqueue(data, target, UploadPriority::Normal);
400        assert!(id.is_some());
401        assert_eq!(queue.pending_count(), 1);
402    }
403
404    #[test]
405    fn test_enqueue_limit() {
406        let config = UploadQueueConfig {
407            max_pending: 2,
408            ..Default::default()
409        };
410        let mut queue = UploadQueue::with_config(config);
411        let t1 = make_target(4096);
412        let t2 = make_target(4096);
413        let t3 = make_target(4096);
414        assert!(queue
415            .enqueue(vec![0; 100], t1, UploadPriority::Normal)
416            .is_some());
417        assert!(queue
418            .enqueue(vec![0; 100], t2, UploadPriority::Normal)
419            .is_some());
420        assert!(queue
421            .enqueue(vec![0; 100], t3, UploadPriority::Normal)
422            .is_none());
423    }
424
425    #[test]
426    fn test_enqueue_byte_limit() {
427        let config = UploadQueueConfig {
428            max_queued_bytes: 200,
429            ..Default::default()
430        };
431        let mut queue = UploadQueue::with_config(config);
432        let t1 = make_target(4096);
433        let t2 = make_target(4096);
434        assert!(queue
435            .enqueue(vec![0; 150], t1, UploadPriority::Normal)
436            .is_some());
437        assert!(queue
438            .enqueue(vec![0; 100], t2, UploadPriority::Normal)
439            .is_none());
440    }
441
442    #[test]
443    fn test_flush_batch() {
444        let mut queue = UploadQueue::new();
445        for i in 0..5 {
446            let target = make_target(4096);
447            queue.enqueue(vec![0u8; 100], target, UploadPriority::Normal);
448            let _ = i;
449        }
450        let processed = queue.flush();
451        assert!(!processed.is_empty());
452    }
453
454    #[test]
455    fn test_flush_priority_ordering() {
456        let config = UploadQueueConfig {
457            priority_sort: true,
458            flush_batch_size: 1,
459            ..Default::default()
460        };
461        let mut queue = UploadQueue::with_config(config);
462        let t1 = make_target(4096);
463        let t2 = make_target(4096);
464        let _low_id = queue
465            .enqueue(vec![1], t1, UploadPriority::Low)
466            .expect("enqueue should succeed");
467        let high_id = queue
468            .enqueue(vec![2], t2, UploadPriority::Critical)
469            .expect("operation should succeed in test");
470        let processed = queue.flush();
471        assert_eq!(processed.len(), 1);
472        assert_eq!(processed[0], high_id);
473    }
474
475    #[test]
476    fn test_cancel_request() {
477        let mut queue = UploadQueue::new();
478        let target = make_target(4096);
479        let id = queue
480            .enqueue(vec![0; 100], target, UploadPriority::Normal)
481            .expect("operation should succeed in test");
482        assert!(queue.cancel(id));
483        assert!(queue.is_empty());
484        assert_eq!(queue.request_state(id), Some(UploadState::Cancelled));
485    }
486
487    #[test]
488    fn test_cancel_nonexistent() {
489        let mut queue = UploadQueue::new();
490        assert!(!queue.cancel(UploadId(999)));
491    }
492
493    #[test]
494    fn test_request_state_tracking() {
495        let mut queue = UploadQueue::new();
496        let target = make_target(4096);
497        let id = queue
498            .enqueue(vec![0; 100], target, UploadPriority::Normal)
499            .expect("operation should succeed in test");
500        assert_eq!(queue.request_state(id), Some(UploadState::Queued));
501        queue.flush();
502        assert_eq!(queue.request_state(id), Some(UploadState::Completed));
503    }
504
505    #[test]
506    fn test_failed_transfer() {
507        let mut queue = UploadQueue::new();
508        // Target too small for data
509        let target = UploadTarget::new(1, 0, 10);
510        let id = queue
511            .enqueue(vec![0; 100], target, UploadPriority::Normal)
512            .expect("operation should succeed in test");
513        queue.flush();
514        assert_eq!(queue.request_state(id), Some(UploadState::Failed));
515    }
516
517    #[test]
518    fn test_queued_bytes() {
519        let mut queue = UploadQueue::new();
520        let t1 = make_target(4096);
521        let t2 = make_target(4096);
522        queue.enqueue(vec![0; 100], t1, UploadPriority::Normal);
523        queue.enqueue(vec![0; 200], t2, UploadPriority::Normal);
524        assert_eq!(queue.queued_bytes(), 300);
525    }
526
527    #[test]
528    fn test_stats() {
529        let mut queue = UploadQueue::new();
530        let target = make_target(4096);
531        queue.enqueue(vec![0; 1024], target, UploadPriority::Normal);
532        queue.flush();
533        let stats = queue.stats();
534        assert_eq!(stats.total_processed, 1);
535        assert_eq!(stats.total_bytes_transferred, 1024);
536    }
537
538    #[test]
539    fn test_stats_mb_conversion() {
540        let stats = UploadQueueStats {
541            queued_bytes: 1_048_576,
542            total_bytes_transferred: 2_097_152,
543            ..Default::default()
544        };
545        assert!((stats.queued_mb() - 1.0).abs() < 0.001);
546        assert!((stats.transferred_mb() - 2.0).abs() < 0.001);
547    }
548
549    #[test]
550    fn test_clear_queue() {
551        let mut queue = UploadQueue::new();
552        for _ in 0..5 {
553            let target = make_target(4096);
554            queue.enqueue(vec![0; 100], target, UploadPriority::Normal);
555        }
556        queue.clear();
557        assert!(queue.is_empty());
558        assert_eq!(queue.stats().cancelled_count, 5);
559    }
560
561    #[test]
562    fn test_peek_next() {
563        let mut queue = UploadQueue::new();
564        assert!(queue.peek_next().is_none());
565        let target = make_target(4096);
566        let id = queue
567            .enqueue(vec![0; 100], target, UploadPriority::Normal)
568            .expect("operation should succeed in test");
569        let peeked = queue.peek_next().expect("peek should return next item");
570        assert_eq!(peeked.id, id);
571    }
572
573    #[test]
574    fn test_upload_target_fits() {
575        let target = UploadTarget::new(1, 10, 100);
576        assert!(target.fits(90));
577        assert!(target.fits(0));
578        assert!(!target.fits(91));
579    }
580}