Skip to main content

oximedia_gpu/
upload_queue.rs

1#![allow(dead_code)]
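//! Asynchronous upload queue for staging CPU data to GPU buffers.
//!
//! This module provides a batched upload mechanism that stages host-side data
//! into a queue and flushes it to GPU-side buffers in optimized batches,
//! reducing per-transfer overhead.
//!
//! NOTE(review): nothing in this file spawns tasks or talks to a GPU; `flush`
//! runs synchronously on the caller and only simulates the transfer, so
//! "asynchronous" describes the intended usage rather than this implementation.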
7
8use std::collections::VecDeque;
9use std::time::Instant;
10
/// Unique identifier for an upload request.
///
/// `UploadQueue::enqueue` hands these out from a per-queue counter that starts
/// at zero, so an ID is only meaningful for the queue that issued it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct UploadId(
    /// Inner identifier value.
    pub u64,
);
17
/// Priority level for upload requests.
///
/// Variants are declared in ascending order of urgency, so the derived `Ord`
/// gives `Low < Normal < High < Critical`. `Normal` is the `Default` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum UploadPriority {
    /// Low priority - background uploads.
    Low = 0,
    /// Normal priority - standard uploads.
    #[default]
    Normal = 1,
    /// High priority - latency-sensitive uploads.
    High = 2,
    /// Critical priority - must be processed immediately.
    Critical = 3,
}
31
/// The current state of an upload request.
///
/// A request starts out as `Queued` and ends up `Completed`, `Failed`, or
/// `Cancelled` once it leaves the pending queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UploadState {
    /// Queued and waiting to be processed.
    Queued,
    /// Currently being transferred.
    ///
    /// NOTE(review): no code in this file assigns this state — confirm whether
    /// it is reserved for a real (non-simulated) transfer backend.
    Transferring,
    /// Transfer completed successfully.
    Completed,
    /// Transfer failed.
    Failed,
    /// Transfer was cancelled.
    Cancelled,
}
46
/// Describes the destination for an upload.
///
/// A target names a buffer, the byte offset at which the write starts, and the
/// total size of that buffer so the write can be bounds-checked with
/// [`UploadTarget::fits`].
#[derive(Debug, Clone)]
pub struct UploadTarget {
    /// Target buffer identifier.
    pub buffer_id: u64,
    /// Byte offset within the target buffer.
    pub offset: usize,
    /// Expected total size of the target buffer.
    pub buffer_size: usize,
}

impl UploadTarget {
    /// Create a new upload target.
    ///
    /// No validation is performed here; an `offset` beyond `buffer_size` simply
    /// makes [`UploadTarget::fits`] return `false` for any non-fitting write.
    #[must_use]
    pub fn new(buffer_id: u64, offset: usize, buffer_size: usize) -> Self {
        Self {
            buffer_id,
            offset,
            buffer_size,
        }
    }

    /// Check whether the given data size fits within the target at the offset.
    ///
    /// Returns `true` when `offset + data_size <= buffer_size`. If the sum does
    /// not fit in a `usize` the write cannot possibly fit, so `false` is
    /// returned instead of panicking (debug) or wrapping around (release).
    #[must_use]
    pub fn fits(&self, data_size: usize) -> bool {
        match self.offset.checked_add(data_size) {
            Some(end) => end <= self.buffer_size,
            // Overflowing end offset: larger than any representable buffer.
            None => false,
        }
    }
}
75
/// A single upload request in the queue.
///
/// Owns its payload bytes and carries the lifecycle state together with the
/// timestamps that `latency` is computed from.
#[derive(Debug, Clone)]
pub struct UploadRequest {
    /// Unique identifier for this request.
    pub id: UploadId,
    /// The data to upload.
    pub data: Vec<u8>,
    /// Target destination for the data.
    pub target: UploadTarget,
    /// Priority level.
    pub priority: UploadPriority,
    /// Current state.
    pub state: UploadState,
    /// Timestamp when the request was enqueued.
    pub enqueue_time: Instant,
    /// Timestamp when the transfer completed; `None` until that happens.
    pub complete_time: Option<Instant>,
}
94
95impl UploadRequest {
96    /// Create a new upload request.
97    #[must_use]
98    pub fn new(
99        id: UploadId,
100        data: Vec<u8>,
101        target: UploadTarget,
102        priority: UploadPriority,
103    ) -> Self {
104        Self {
105            id,
106            data,
107            target,
108            priority,
109            state: UploadState::Queued,
110            enqueue_time: Instant::now(),
111            complete_time: None,
112        }
113    }
114
115    /// Return the size of the data in bytes.
116    #[must_use]
117    pub fn data_size(&self) -> usize {
118        self.data.len()
119    }
120
121    /// Return the transfer latency if the upload is completed.
122    #[must_use]
123    pub fn latency(&self) -> Option<std::time::Duration> {
124        self.complete_time
125            .map(|t| t.duration_since(self.enqueue_time))
126    }
127}
128
/// Limits and flush behaviour for the upload queue.
#[derive(Debug, Clone)]
pub struct UploadQueueConfig {
    /// Upper bound on the number of requests waiting in the queue.
    pub max_pending: usize,
    /// Upper bound on the combined payload size of waiting requests, in bytes.
    pub max_queued_bytes: usize,
    /// Maximum number of requests handled by one flush.
    pub flush_batch_size: usize,
    /// When `true`, pending requests are ordered by priority before a flush.
    pub priority_sort: bool,
}

impl Default for UploadQueueConfig {
    /// Defaults: 256 pending requests, 256 MiB of queued data, batches of 32,
    /// priority sorting enabled.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;
        const DEFAULT_MAX_PENDING: usize = 256;
        const DEFAULT_BATCH: usize = 32;

        Self {
            priority_sort: true,
            flush_batch_size: DEFAULT_BATCH,
            max_queued_bytes: 256 * MIB,
            max_pending: DEFAULT_MAX_PENDING,
        }
    }
}
152
/// Snapshot of upload queue counters.
#[derive(Debug, Clone, Default)]
pub struct UploadQueueStats {
    /// How many requests are waiting in the queue.
    pub queued_count: usize,
    /// Combined payload size of the waiting requests, in bytes.
    pub queued_bytes: usize,
    /// Total number of requests processed.
    pub total_processed: u64,
    /// Total bytes transferred.
    pub total_bytes_transferred: u64,
    /// How many requests failed.
    pub failed_count: u64,
    /// How many requests were cancelled.
    pub cancelled_count: u64,
    /// Average transfer throughput in bytes per second.
    pub avg_throughput_bps: f64,
}

impl UploadQueueStats {
    /// Bytes per megabyte as used by the conversions below (1024 * 1024).
    const BYTES_PER_MB: f64 = 1024.0 * 1024.0;

    /// Currently queued payload size expressed in megabytes.
    #[allow(clippy::cast_precision_loss)]
    #[must_use]
    pub fn queued_mb(&self) -> f64 {
        let bytes = self.queued_bytes as f64;
        bytes / Self::BYTES_PER_MB
    }

    /// Total transferred payload size expressed in megabytes.
    #[allow(clippy::cast_precision_loss)]
    #[must_use]
    pub fn transferred_mb(&self) -> f64 {
        let bytes = self.total_bytes_transferred as f64;
        bytes / Self::BYTES_PER_MB
    }
}
187
/// An asynchronous upload queue that batches host-to-device transfers.
///
/// Requests wait in `queue` until a flush, cancel, or clear moves them into
/// `completed`, where their final state stays available for lookups.
pub struct UploadQueue {
    /// Pending upload requests.
    queue: VecDeque<UploadRequest>,
    /// Completed upload requests (for stats).
    ///
    /// Also receives failed and cancelled requests. Nothing in this file ever
    /// removes entries, so it grows for the lifetime of the queue.
    completed: Vec<UploadRequest>,
    /// Configuration.
    config: UploadQueueConfig,
    /// Counter for generating unique upload IDs.
    next_id: u64,
    /// Total bytes transferred.
    total_bytes_transferred: u64,
    /// Failed request count.
    failed_count: u64,
    /// Cancelled request count.
    cancelled_count: u64,
}
205
206impl UploadQueue {
207    /// Create a new upload queue with default configuration.
208    #[must_use]
209    pub fn new() -> Self {
210        Self::with_config(UploadQueueConfig::default())
211    }
212
213    /// Create a new upload queue with the given configuration.
214    #[must_use]
215    pub fn with_config(config: UploadQueueConfig) -> Self {
216        Self {
217            queue: VecDeque::new(),
218            completed: Vec::new(),
219            config,
220            next_id: 0,
221            total_bytes_transferred: 0,
222            failed_count: 0,
223            cancelled_count: 0,
224        }
225    }
226
227    /// Enqueue an upload request. Returns the `UploadId` if successful,
228    /// or `None` if the queue is full.
229    pub fn enqueue(
230        &mut self,
231        data: Vec<u8>,
232        target: UploadTarget,
233        priority: UploadPriority,
234    ) -> Option<UploadId> {
235        let current_bytes: usize = self.queue.iter().map(|r| r.data.len()).sum();
236        if self.queue.len() >= self.config.max_pending {
237            return None;
238        }
239        if current_bytes + data.len() > self.config.max_queued_bytes {
240            return None;
241        }
242        let id = UploadId(self.next_id);
243        self.next_id += 1;
244        let request = UploadRequest::new(id, data, target, priority);
245        self.queue.push_back(request);
246        Some(id)
247    }
248
249    /// Flush a batch of uploads, simulating the transfer to GPU.
250    /// Returns the IDs of requests that were processed.
251    pub fn flush(&mut self) -> Vec<UploadId> {
252        if self.config.priority_sort {
253            let mut items: Vec<UploadRequest> = self.queue.drain(..).collect();
254            items.sort_by(|a, b| b.priority.cmp(&a.priority));
255            self.queue = items.into_iter().collect();
256        }
257
258        let batch_size = self.config.flush_batch_size.min(self.queue.len());
259        let mut processed = Vec::with_capacity(batch_size);
260
261        for _ in 0..batch_size {
262            if let Some(mut request) = self.queue.pop_front() {
263                if request.target.fits(request.data.len()) {
264                    request.state = UploadState::Completed;
265                    request.complete_time = Some(Instant::now());
266                    self.total_bytes_transferred += request.data.len() as u64;
267                } else {
268                    request.state = UploadState::Failed;
269                    self.failed_count += 1;
270                }
271                processed.push(request.id);
272                self.completed.push(request);
273            }
274        }
275
276        processed
277    }
278
279    /// Cancel a pending upload by ID.
280    pub fn cancel(&mut self, id: UploadId) -> bool {
281        if let Some(pos) = self.queue.iter().position(|r| r.id == id) {
282            let mut request = self.queue.remove(pos).unwrap();
283            request.state = UploadState::Cancelled;
284            self.cancelled_count += 1;
285            self.completed.push(request);
286            true
287        } else {
288            false
289        }
290    }
291
292    /// Return the number of pending requests.
293    #[must_use]
294    pub fn pending_count(&self) -> usize {
295        self.queue.len()
296    }
297
298    /// Return the total queued bytes.
299    #[must_use]
300    pub fn queued_bytes(&self) -> usize {
301        self.queue.iter().map(|r| r.data.len()).sum()
302    }
303
304    /// Check if the queue is empty.
305    #[must_use]
306    pub fn is_empty(&self) -> bool {
307        self.queue.is_empty()
308    }
309
310    /// Return the state of a request by ID.
311    #[must_use]
312    pub fn request_state(&self, id: UploadId) -> Option<UploadState> {
313        self.queue
314            .iter()
315            .find(|r| r.id == id)
316            .map(|r| r.state)
317            .or_else(|| self.completed.iter().find(|r| r.id == id).map(|r| r.state))
318    }
319
320    /// Return statistics about the queue.
321    #[allow(clippy::cast_precision_loss)]
322    #[must_use]
323    pub fn stats(&self) -> UploadQueueStats {
324        let queued_bytes: usize = self.queue.iter().map(|r| r.data.len()).sum();
325        let total_processed = self.completed.len() as u64;
326        let latencies: Vec<f64> = self
327            .completed
328            .iter()
329            .filter_map(UploadRequest::latency)
330            .map(|d| d.as_secs_f64())
331            .collect();
332        let avg_throughput_bps = if latencies.is_empty() {
333            0.0
334        } else {
335            let total_secs: f64 = latencies.iter().sum();
336            if total_secs > 0.0 {
337                self.total_bytes_transferred as f64 / total_secs
338            } else {
339                0.0
340            }
341        };
342
343        UploadQueueStats {
344            queued_count: self.queue.len(),
345            queued_bytes,
346            total_processed,
347            total_bytes_transferred: self.total_bytes_transferred,
348            failed_count: self.failed_count,
349            cancelled_count: self.cancelled_count,
350            avg_throughput_bps,
351        }
352    }
353
354    /// Clear all pending requests (marking them cancelled).
355    pub fn clear(&mut self) {
356        while let Some(mut req) = self.queue.pop_front() {
357            req.state = UploadState::Cancelled;
358            self.cancelled_count += 1;
359            self.completed.push(req);
360        }
361    }
362
363    /// Peek at the next request that would be processed.
364    #[must_use]
365    pub fn peek_next(&self) -> Option<&UploadRequest> {
366        self.queue.front()
367    }
368}
369
370impl Default for UploadQueue {
371    fn default() -> Self {
372        Self::new()
373    }
374}
375
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a target for buffer 1 at offset 0 with the given capacity.
    fn make_target(size: usize) -> UploadTarget {
        UploadTarget::new(1, 0, size)
    }

    #[test]
    fn test_create_queue() {
        let queue = UploadQueue::new();
        assert!(queue.is_empty());
        assert_eq!(queue.pending_count(), 0);
    }

    #[test]
    fn test_enqueue_request() {
        let mut queue = UploadQueue::new();
        let data = vec![0u8; 1024];
        let target = make_target(2048);
        let id = queue.enqueue(data, target, UploadPriority::Normal);
        assert!(id.is_some());
        assert_eq!(queue.pending_count(), 1);
    }

    #[test]
    fn test_enqueue_limit() {
        let config = UploadQueueConfig {
            max_pending: 2,
            ..Default::default()
        };
        let mut queue = UploadQueue::with_config(config);
        let t1 = make_target(4096);
        let t2 = make_target(4096);
        let t3 = make_target(4096);
        assert!(queue
            .enqueue(vec![0; 100], t1, UploadPriority::Normal)
            .is_some());
        assert!(queue
            .enqueue(vec![0; 100], t2, UploadPriority::Normal)
            .is_some());
        assert!(queue
            .enqueue(vec![0; 100], t3, UploadPriority::Normal)
            .is_none());
    }

    #[test]
    fn test_enqueue_byte_limit() {
        let config = UploadQueueConfig {
            max_queued_bytes: 200,
            ..Default::default()
        };
        let mut queue = UploadQueue::with_config(config);
        let t1 = make_target(4096);
        let t2 = make_target(4096);
        assert!(queue
            .enqueue(vec![0; 150], t1, UploadPriority::Normal)
            .is_some());
        assert!(queue
            .enqueue(vec![0; 100], t2, UploadPriority::Normal)
            .is_none());
    }

    #[test]
    fn test_flush_batch() {
        let mut queue = UploadQueue::new();
        let ids: Vec<UploadId> = (0..5)
            .map(|_| {
                queue
                    .enqueue(vec![0u8; 100], make_target(4096), UploadPriority::Normal)
                    .unwrap()
            })
            .collect();
        let processed = queue.flush();
        // Five requests fit in the default batch of 32 and share one priority,
        // so all of them are processed in enqueue order.
        assert_eq!(processed, ids);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_flush_respects_batch_size() {
        let config = UploadQueueConfig {
            flush_batch_size: 2,
            ..Default::default()
        };
        let mut queue = UploadQueue::with_config(config);
        for _ in 0..5 {
            queue
                .enqueue(vec![0; 10], make_target(4096), UploadPriority::Normal)
                .unwrap();
        }
        assert_eq!(queue.flush().len(), 2);
        assert_eq!(queue.pending_count(), 3);
    }

    #[test]
    fn test_flush_priority_ordering() {
        let config = UploadQueueConfig {
            priority_sort: true,
            flush_batch_size: 1,
            ..Default::default()
        };
        let mut queue = UploadQueue::with_config(config);
        let t1 = make_target(4096);
        let t2 = make_target(4096);
        let _low_id = queue.enqueue(vec![1], t1, UploadPriority::Low).unwrap();
        let high_id = queue
            .enqueue(vec![2], t2, UploadPriority::Critical)
            .unwrap();
        let processed = queue.flush();
        assert_eq!(processed.len(), 1);
        assert_eq!(processed[0], high_id);
    }

    #[test]
    fn test_cancel_request() {
        let mut queue = UploadQueue::new();
        let target = make_target(4096);
        let id = queue
            .enqueue(vec![0; 100], target, UploadPriority::Normal)
            .unwrap();
        assert!(queue.cancel(id));
        assert!(queue.is_empty());
        assert_eq!(queue.request_state(id), Some(UploadState::Cancelled));
    }

    #[test]
    fn test_cancel_nonexistent() {
        let mut queue = UploadQueue::new();
        assert!(!queue.cancel(UploadId(999)));
    }

    #[test]
    fn test_request_state_tracking() {
        let mut queue = UploadQueue::new();
        let target = make_target(4096);
        let id = queue
            .enqueue(vec![0; 100], target, UploadPriority::Normal)
            .unwrap();
        assert_eq!(queue.request_state(id), Some(UploadState::Queued));
        queue.flush();
        assert_eq!(queue.request_state(id), Some(UploadState::Completed));
    }

    #[test]
    fn test_failed_transfer() {
        let mut queue = UploadQueue::new();
        // Target too small for data
        let target = UploadTarget::new(1, 0, 10);
        let id = queue
            .enqueue(vec![0; 100], target, UploadPriority::Normal)
            .unwrap();
        queue.flush();
        assert_eq!(queue.request_state(id), Some(UploadState::Failed));
    }

    #[test]
    fn test_queued_bytes() {
        let mut queue = UploadQueue::new();
        let t1 = make_target(4096);
        let t2 = make_target(4096);
        queue.enqueue(vec![0; 100], t1, UploadPriority::Normal);
        queue.enqueue(vec![0; 200], t2, UploadPriority::Normal);
        assert_eq!(queue.queued_bytes(), 300);
    }

    #[test]
    fn test_stats() {
        let mut queue = UploadQueue::new();
        let target = make_target(4096);
        queue.enqueue(vec![0; 1024], target, UploadPriority::Normal);
        queue.flush();
        let stats = queue.stats();
        assert_eq!(stats.total_processed, 1);
        assert_eq!(stats.total_bytes_transferred, 1024);
    }

    #[test]
    fn test_stats_mb_conversion() {
        let stats = UploadQueueStats {
            queued_bytes: 1_048_576,
            total_bytes_transferred: 2_097_152,
            ..Default::default()
        };
        assert!((stats.queued_mb() - 1.0).abs() < 0.001);
        assert!((stats.transferred_mb() - 2.0).abs() < 0.001);
    }

    #[test]
    fn test_clear_queue() {
        let mut queue = UploadQueue::new();
        for _ in 0..5 {
            let target = make_target(4096);
            queue.enqueue(vec![0; 100], target, UploadPriority::Normal);
        }
        queue.clear();
        assert!(queue.is_empty());
        assert_eq!(queue.stats().cancelled_count, 5);
    }

    #[test]
    fn test_peek_next() {
        let mut queue = UploadQueue::new();
        assert!(queue.peek_next().is_none());
        let target = make_target(4096);
        let id = queue
            .enqueue(vec![0; 100], target, UploadPriority::Normal)
            .unwrap();
        let peeked = queue.peek_next().unwrap();
        assert_eq!(peeked.id, id);
    }

    #[test]
    fn test_upload_target_fits() {
        let target = UploadTarget::new(1, 10, 100);
        assert!(target.fits(90));
        assert!(target.fits(0));
        assert!(!target.fits(91));
    }
}