1use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use thiserror::Error;
16
17#[derive(Error, Debug, Clone)]
19pub enum OfflineQueueError {
20 #[error("Queue is full")]
21 QueueFull,
22
23 #[error("Request expired")]
24 RequestExpired,
25
26 #[error("Invalid configuration: {0}")]
27 InvalidConfig(String),
28
29 #[error("Serialization error: {0}")]
30 SerializationError(String),
31}
32
/// Tunable settings for an offline request queue.
#[derive(Clone, Debug)]
pub struct OfflineQueueConfig {
    /// Upper bound on the number of pending requests; enqueueing beyond this
    /// limit is rejected.
    pub max_queue_size: usize,

    /// Whether queued requests should be persisted.
    /// NOTE(review): not consulted by the queue code in this file.
    pub enable_persistence: bool,

    /// Timeout intended for requests that do not specify their own.
    /// NOTE(review): not consulted by the queue code in this file.
    pub default_timeout: Duration,

    /// When `true`, higher-priority requests are placed ahead of
    /// lower-priority ones; when `false`, the queue is strictly FIFO.
    pub enable_prioritization: bool,

    /// Requests that have waited longer than this are treated as expired.
    pub max_request_age: Duration,

    /// Whether queued requests should be replayed automatically.
    /// NOTE(review): not consulted by the queue code in this file.
    pub enable_auto_replay: bool,

    /// Maximum number of requests handed out per replay batch.
    pub replay_batch_size: usize,

    /// Pause intended between replay batches.
    /// NOTE(review): not consulted by the queue code in this file.
    pub replay_batch_delay: Duration,
}
60
61impl Default for OfflineQueueConfig {
62 fn default() -> Self {
63 Self {
64 max_queue_size: 1000,
65 enable_persistence: false,
66 default_timeout: Duration::from_secs(300), enable_prioritization: true,
68 max_request_age: Duration::from_secs(3600), enable_auto_replay: true,
70 replay_batch_size: 10,
71 replay_batch_delay: Duration::from_millis(100),
72 }
73 }
74}
75
76impl OfflineQueueConfig {
77 pub fn mobile() -> Self {
79 Self {
80 max_queue_size: 500,
81 enable_persistence: true,
82 default_timeout: Duration::from_secs(600),
83 enable_prioritization: true,
84 max_request_age: Duration::from_secs(1800), enable_auto_replay: true,
86 replay_batch_size: 5,
87 replay_batch_delay: Duration::from_millis(200),
88 }
89 }
90
91 pub fn iot() -> Self {
93 Self {
94 max_queue_size: 100,
95 enable_persistence: true,
96 default_timeout: Duration::from_secs(900), enable_prioritization: true,
98 max_request_age: Duration::from_secs(3600),
99 enable_auto_replay: true,
100 replay_batch_size: 3,
101 replay_batch_delay: Duration::from_millis(500),
102 }
103 }
104
105 pub fn validate(&self) -> Result<(), OfflineQueueError> {
107 if self.max_queue_size == 0 {
108 return Err(OfflineQueueError::InvalidConfig(
109 "max_queue_size must be > 0".to_string(),
110 ));
111 }
112
113 if self.replay_batch_size == 0 {
114 return Err(OfflineQueueError::InvalidConfig(
115 "replay_batch_size must be > 0".to_string(),
116 ));
117 }
118
119 Ok(())
120 }
121}
122
123#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
125pub enum RequestPriority {
126 Low = 0,
127 Normal = 1,
128 High = 2,
129 Critical = 3,
130}
131
132#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
134pub enum QueuedRequestType {
135 ProvideContent(String),
137 FindProviders(String),
139 GetValue(String),
141 PutValue { key: String, value: Vec<u8> },
143 Custom { operation: String, data: Vec<u8> },
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct QueuedRequest {
150 pub id: String,
152 pub request_type: QueuedRequestType,
154 pub priority: RequestPriority,
156 #[serde(skip)]
158 pub queued_at: Option<Instant>,
159 pub queued_timestamp: u64,
161 pub timeout: Duration,
163 pub retry_count: u32,
165 pub max_retries: u32,
167}
168
169impl QueuedRequest {
170 pub fn new(
172 id: String,
173 request_type: QueuedRequestType,
174 priority: RequestPriority,
175 timeout: Duration,
176 ) -> Self {
177 let now = Instant::now();
178 Self {
179 id,
180 request_type,
181 priority,
182 queued_at: Some(now),
183 queued_timestamp: now.elapsed().as_secs(),
184 timeout,
185 retry_count: 0,
186 max_retries: 3,
187 }
188 }
189
190 pub fn is_expired(&self, max_age: Duration) -> bool {
192 if let Some(queued_at) = self.queued_at {
193 Instant::now().duration_since(queued_at) > max_age
194 } else {
195 false
196 }
197 }
198
199 pub fn is_timed_out(&self) -> bool {
201 if let Some(queued_at) = self.queued_at {
202 Instant::now().duration_since(queued_at) > self.timeout
203 } else {
204 false
205 }
206 }
207
208 pub fn should_retry(&self) -> bool {
210 self.retry_count < self.max_retries
211 }
212}
213
214struct QueueState {
216 requests: VecDeque<QueuedRequest>,
218 in_flight: Vec<String>,
220 is_online: bool,
222 last_cleanup: Instant,
224}
225
226impl QueueState {
227 fn new() -> Self {
228 Self {
229 requests: VecDeque::new(),
230 in_flight: Vec::new(),
231 is_online: false,
232 last_cleanup: Instant::now(),
233 }
234 }
235}
236
237pub struct OfflineQueue {
239 config: OfflineQueueConfig,
240 state: Arc<RwLock<QueueState>>,
241 stats: Arc<RwLock<OfflineQueueStats>>,
242}
243
244impl OfflineQueue {
245 pub fn new(config: OfflineQueueConfig) -> Result<Self, OfflineQueueError> {
247 config.validate()?;
248
249 Ok(Self {
250 config,
251 state: Arc::new(RwLock::new(QueueState::new())),
252 stats: Arc::new(RwLock::new(OfflineQueueStats::default())),
253 })
254 }
255
256 pub fn enqueue(&self, request: QueuedRequest) -> Result<(), OfflineQueueError> {
258 let mut state = self.state.write();
259 let mut stats = self.stats.write();
260
261 if state.requests.len() >= self.config.max_queue_size {
263 stats.requests_dropped += 1;
264 return Err(OfflineQueueError::QueueFull);
265 }
266
267 if self.config.enable_prioritization {
269 let insert_pos = state
271 .requests
272 .iter()
273 .position(|r| r.priority < request.priority)
274 .unwrap_or(state.requests.len());
275
276 state.requests.insert(insert_pos, request);
277 } else {
278 state.requests.push_back(request);
279 }
280
281 stats.requests_queued += 1;
282
283 Ok(())
284 }
285
286 pub fn dequeue(&self) -> Option<QueuedRequest> {
288 let mut state = self.state.write();
289
290 if !state.is_online {
291 return None;
292 }
293
294 while let Some(request) = state.requests.pop_front() {
295 if request.is_expired(self.config.max_request_age) {
297 let mut stats = self.stats.write();
298 stats.requests_expired += 1;
299 continue;
300 }
301
302 if request.is_timed_out() {
304 let mut stats = self.stats.write();
305 stats.requests_timed_out += 1;
306 continue;
307 }
308
309 state.in_flight.push(request.id.clone());
310 return Some(request);
311 }
312
313 None
314 }
315
316 pub fn mark_completed(&self, request_id: &str, success: bool) {
318 let mut state = self.state.write();
319 let mut stats = self.stats.write();
320
321 state.in_flight.retain(|id| id != request_id);
322
323 if success {
324 stats.requests_completed += 1;
325 } else {
326 stats.requests_failed += 1;
327 }
328 }
329
330 pub fn requeue(&self, mut request: QueuedRequest) -> Result<(), OfflineQueueError> {
332 let mut state = self.state.write();
333
334 state.in_flight.retain(|id| id != &request.id);
335
336 if !request.should_retry() {
337 let mut stats = self.stats.write();
338 stats.requests_failed += 1;
339 return Ok(());
340 }
341
342 request.retry_count += 1;
343
344 if self.config.enable_prioritization {
345 let insert_pos = state
346 .requests
347 .iter()
348 .position(|r| r.priority < request.priority)
349 .unwrap_or(state.requests.len());
350
351 state.requests.insert(insert_pos, request);
352 } else {
353 state.requests.push_back(request);
354 }
355
356 let mut stats = self.stats.write();
357 stats.requests_retried += 1;
358
359 Ok(())
360 }
361
362 pub fn set_online(&self, online: bool) {
364 let mut state = self.state.write();
365 state.is_online = online;
366
367 if online {
368 let mut stats = self.stats.write();
369 stats.online_transitions += 1;
370 } else {
371 let mut stats = self.stats.write();
372 stats.offline_transitions += 1;
373 }
374 }
375
376 pub fn is_online(&self) -> bool {
378 self.state.read().is_online
379 }
380
381 pub fn pending_count(&self) -> usize {
383 self.state.read().requests.len()
384 }
385
386 pub fn in_flight_count(&self) -> usize {
388 self.state.read().in_flight.len()
389 }
390
391 pub fn cleanup_expired(&self) {
393 let mut state = self.state.write();
394 let mut stats = self.stats.write();
395
396 let initial_len = state.requests.len();
397
398 state
399 .requests
400 .retain(|r| !r.is_expired(self.config.max_request_age));
401
402 let removed = initial_len - state.requests.len();
403 stats.requests_expired += removed as u64;
404
405 state.last_cleanup = Instant::now();
406 }
407
408 pub fn get_replay_batch(&self) -> Vec<QueuedRequest> {
410 let mut batch = Vec::with_capacity(self.config.replay_batch_size);
411
412 for _ in 0..self.config.replay_batch_size {
413 if let Some(request) = self.dequeue() {
414 batch.push(request);
415 } else {
416 break;
417 }
418 }
419
420 batch
421 }
422
423 pub fn stats(&self) -> OfflineQueueStats {
425 self.stats.read().clone()
426 }
427
428 pub fn reset_stats(&self) {
430 *self.stats.write() = OfflineQueueStats::default();
431 }
432
433 pub fn clear(&self) {
435 let mut state = self.state.write();
436 state.requests.clear();
437 state.in_flight.clear();
438 }
439}
440
/// Counters describing what has happened to requests passing through the
/// queue. All counters start at zero.
#[derive(Clone, Debug, Default)]
pub struct OfflineQueueStats {
    /// Requests accepted into the queue.
    pub requests_queued: u64,
    /// Requests rejected because the queue was full.
    pub requests_dropped: u64,
    /// Requests reported as finished successfully.
    pub requests_completed: u64,
    /// Requests reported as failed, or dropped after exhausting retries.
    pub requests_failed: u64,
    /// Requests discarded for exceeding the maximum age.
    pub requests_expired: u64,
    /// Requests discarded for exceeding their own timeout.
    pub requests_timed_out: u64,
    /// Requests put back into the queue for another attempt.
    pub requests_retried: u64,
    /// Times the queue was switched online.
    pub online_transitions: u64,
    /// Times the queue was switched offline.
    pub offline_transitions: u64,
}
463
464impl OfflineQueueStats {
465 pub fn success_rate(&self) -> f64 {
467 let total = self.requests_completed + self.requests_failed;
468 if total == 0 {
469 return 0.0;
470 }
471 self.requests_completed as f64 / total as f64
472 }
473
474 pub fn completion_rate(&self) -> f64 {
476 if self.requests_queued == 0 {
477 return 0.0;
478 }
479 self.requests_completed as f64 / self.requests_queued as f64
480 }
481}
482
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a queue from the given configuration, panicking if it is invalid.
    fn queue_with(config: OfflineQueueConfig) -> OfflineQueue {
        OfflineQueue::new(config).unwrap()
    }

    /// Builds a `FindProviders` request with a 60 second timeout.
    fn find_providers(id: &str, cid: &str, priority: RequestPriority) -> QueuedRequest {
        QueuedRequest::new(
            id.to_string(),
            QueuedRequestType::FindProviders(cid.to_string()),
            priority,
            Duration::from_secs(60),
        )
    }

    /// Builds a normal-priority `FindProviders` request.
    fn normal_request(id: &str, cid: &str) -> QueuedRequest {
        find_providers(id, cid, RequestPriority::Normal)
    }

    #[test]
    fn test_config_default() {
        let config = OfflineQueueConfig::default();
        assert!(config.validate().is_ok());
        assert_eq!(config.max_queue_size, 1000);
    }

    #[test]
    fn test_config_mobile() {
        let config = OfflineQueueConfig::mobile();
        assert!(config.validate().is_ok());
        assert_eq!(config.max_queue_size, 500);
    }

    #[test]
    fn test_config_iot() {
        let config = OfflineQueueConfig::iot();
        assert!(config.validate().is_ok());
        assert_eq!(config.max_queue_size, 100);
    }

    #[test]
    fn test_enqueue() {
        let queue = queue_with(OfflineQueueConfig::default());

        assert!(queue.enqueue(normal_request("test1", "QmTest")).is_ok());
        assert_eq!(queue.pending_count(), 1);
    }

    #[test]
    fn test_priority_ordering() {
        let queue = queue_with(OfflineQueueConfig::default());

        // The low-priority request goes in first...
        queue
            .enqueue(find_providers("low", "QmTest1", RequestPriority::Low))
            .unwrap();
        // ...but the high-priority one must overtake it.
        queue
            .enqueue(find_providers("high", "QmTest2", RequestPriority::High))
            .unwrap();

        queue.set_online(true);

        let next = queue.dequeue().unwrap();
        assert_eq!(next.id, "high");
    }

    #[test]
    fn test_dequeue_when_offline() {
        let queue = queue_with(OfflineQueueConfig::default());

        queue.enqueue(normal_request("test1", "QmTest")).unwrap();

        // Nothing is handed out until the queue is switched online.
        assert!(queue.dequeue().is_none());
    }

    #[test]
    fn test_dequeue_when_online() {
        let queue = queue_with(OfflineQueueConfig::default());

        queue.enqueue(normal_request("test1", "QmTest")).unwrap();
        queue.set_online(true);

        let req = queue.dequeue();
        assert!(req.is_some());
        assert_eq!(req.unwrap().id, "test1");
    }

    #[test]
    fn test_mark_completed() {
        let queue = queue_with(OfflineQueueConfig::default());

        queue.enqueue(normal_request("test1", "QmTest")).unwrap();
        queue.set_online(true);
        let req = queue.dequeue().unwrap();

        queue.mark_completed(&req.id, true);

        assert_eq!(queue.stats().requests_completed, 1);
    }

    #[test]
    fn test_requeue() {
        let queue = queue_with(OfflineQueueConfig::default());

        let mut request = normal_request("test1", "QmTest");
        request.max_retries = 3;

        queue.enqueue(request).unwrap();
        queue.set_online(true);
        let req = queue.dequeue().unwrap();

        queue.requeue(req).unwrap();

        assert_eq!(queue.stats().requests_retried, 1);
    }

    #[test]
    fn test_queue_full() {
        let queue = queue_with(OfflineQueueConfig {
            max_queue_size: 2,
            ..Default::default()
        });

        let req1 = normal_request("test1", "QmTest1");
        let req2 = normal_request("test2", "QmTest2");
        let req3 = normal_request("test3", "QmTest3");

        assert!(queue.enqueue(req1).is_ok());
        assert!(queue.enqueue(req2).is_ok());
        assert!(matches!(
            queue.enqueue(req3),
            Err(OfflineQueueError::QueueFull)
        ));
    }

    #[test]
    fn test_get_replay_batch() {
        let queue = queue_with(OfflineQueueConfig {
            replay_batch_size: 3,
            ..Default::default()
        });

        for i in 0..5 {
            let id = format!("test{}", i);
            let cid = format!("QmTest{}", i);
            queue.enqueue(normal_request(&id, &cid)).unwrap();
        }

        queue.set_online(true);

        let batch = queue.get_replay_batch();
        assert_eq!(batch.len(), 3);
    }

    #[test]
    fn test_stats_success_rate() {
        let stats = OfflineQueueStats {
            requests_completed: 8,
            requests_failed: 2,
            ..Default::default()
        };

        assert_eq!(stats.success_rate(), 0.8);
    }

    #[test]
    fn test_clear() {
        let queue = queue_with(OfflineQueueConfig::default());

        queue.enqueue(normal_request("test1", "QmTest")).unwrap();
        assert_eq!(queue.pending_count(), 1);

        queue.clear();
        assert_eq!(queue.pending_count(), 0);
    }
}