ipfrs_network/
offline_queue.rs

1//! Offline request queue for mobile and intermittent connectivity
2//!
3//! This module provides queuing for network operations when offline:
4//! - Queue requests when network is unavailable
5//! - Automatic replay when connection is restored
6//! - Request prioritization
7//! - Timeout management
8//! - Persistent storage support
9
10use parking_lot::RwLock;
11use serde::{Deserialize, Serialize};
12use std::collections::VecDeque;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use thiserror::Error;
16
17/// Errors that can occur during offline queue operations
18#[derive(Error, Debug, Clone)]
19pub enum OfflineQueueError {
20    #[error("Queue is full")]
21    QueueFull,
22
23    #[error("Request expired")]
24    RequestExpired,
25
26    #[error("Invalid configuration: {0}")]
27    InvalidConfig(String),
28
29    #[error("Serialization error: {0}")]
30    SerializationError(String),
31}
32
33/// Configuration for offline queue
34#[derive(Debug, Clone)]
35pub struct OfflineQueueConfig {
36    /// Maximum number of queued requests
37    pub max_queue_size: usize,
38
39    /// Enable persistent storage of queue
40    pub enable_persistence: bool,
41
42    /// Default request timeout
43    pub default_timeout: Duration,
44
45    /// Enable request prioritization
46    pub enable_prioritization: bool,
47
48    /// Maximum age for a request before automatic removal
49    pub max_request_age: Duration,
50
51    /// Enable automatic replay when online
52    pub enable_auto_replay: bool,
53
54    /// Batch size for replay
55    pub replay_batch_size: usize,
56
57    /// Delay between replay batches
58    pub replay_batch_delay: Duration,
59}
60
61impl Default for OfflineQueueConfig {
62    fn default() -> Self {
63        Self {
64            max_queue_size: 1000,
65            enable_persistence: false,
66            default_timeout: Duration::from_secs(300), // 5 minutes
67            enable_prioritization: true,
68            max_request_age: Duration::from_secs(3600), // 1 hour
69            enable_auto_replay: true,
70            replay_batch_size: 10,
71            replay_batch_delay: Duration::from_millis(100),
72        }
73    }
74}
75
76impl OfflineQueueConfig {
77    /// Configuration for mobile devices
78    pub fn mobile() -> Self {
79        Self {
80            max_queue_size: 500,
81            enable_persistence: true,
82            default_timeout: Duration::from_secs(600),
83            enable_prioritization: true,
84            max_request_age: Duration::from_secs(1800), // 30 minutes
85            enable_auto_replay: true,
86            replay_batch_size: 5,
87            replay_batch_delay: Duration::from_millis(200),
88        }
89    }
90
91    /// Configuration for IoT devices
92    pub fn iot() -> Self {
93        Self {
94            max_queue_size: 100,
95            enable_persistence: true,
96            default_timeout: Duration::from_secs(900), // 15 minutes
97            enable_prioritization: true,
98            max_request_age: Duration::from_secs(3600),
99            enable_auto_replay: true,
100            replay_batch_size: 3,
101            replay_batch_delay: Duration::from_millis(500),
102        }
103    }
104
105    /// Validate the configuration
106    pub fn validate(&self) -> Result<(), OfflineQueueError> {
107        if self.max_queue_size == 0 {
108            return Err(OfflineQueueError::InvalidConfig(
109                "max_queue_size must be > 0".to_string(),
110            ));
111        }
112
113        if self.replay_batch_size == 0 {
114            return Err(OfflineQueueError::InvalidConfig(
115                "replay_batch_size must be > 0".to_string(),
116            ));
117        }
118
119        Ok(())
120    }
121}
122
123/// Priority level for queued requests
124#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
125pub enum RequestPriority {
126    Low = 0,
127    Normal = 1,
128    High = 2,
129    Critical = 3,
130}
131
132/// Type of queued request
133#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
134pub enum QueuedRequestType {
135    /// Provide content to DHT
136    ProvideContent(String),
137    /// Find providers for content
138    FindProviders(String),
139    /// Get value from DHT
140    GetValue(String),
141    /// Put value to DHT
142    PutValue { key: String, value: Vec<u8> },
143    /// Custom request
144    Custom { operation: String, data: Vec<u8> },
145}
146
147/// A queued request waiting for network connectivity
148#[derive(Debug, Clone, Serialize, Deserialize)]
149pub struct QueuedRequest {
150    /// Unique request ID
151    pub id: String,
152    /// Request type
153    pub request_type: QueuedRequestType,
154    /// Priority level
155    pub priority: RequestPriority,
156    /// When the request was queued
157    #[serde(skip)]
158    pub queued_at: Option<Instant>,
159    /// Serialized timestamp for persistence
160    pub queued_timestamp: u64,
161    /// Request timeout
162    pub timeout: Duration,
163    /// Number of retry attempts
164    pub retry_count: u32,
165    /// Maximum retry attempts
166    pub max_retries: u32,
167}
168
169impl QueuedRequest {
170    /// Create a new queued request
171    pub fn new(
172        id: String,
173        request_type: QueuedRequestType,
174        priority: RequestPriority,
175        timeout: Duration,
176    ) -> Self {
177        let now = Instant::now();
178        Self {
179            id,
180            request_type,
181            priority,
182            queued_at: Some(now),
183            queued_timestamp: now.elapsed().as_secs(),
184            timeout,
185            retry_count: 0,
186            max_retries: 3,
187        }
188    }
189
190    /// Check if request has expired
191    pub fn is_expired(&self, max_age: Duration) -> bool {
192        if let Some(queued_at) = self.queued_at {
193            Instant::now().duration_since(queued_at) > max_age
194        } else {
195            false
196        }
197    }
198
199    /// Check if request has timed out
200    pub fn is_timed_out(&self) -> bool {
201        if let Some(queued_at) = self.queued_at {
202            Instant::now().duration_since(queued_at) > self.timeout
203        } else {
204            false
205        }
206    }
207
208    /// Check if should retry
209    pub fn should_retry(&self) -> bool {
210        self.retry_count < self.max_retries
211    }
212}
213
214/// Offline queue state
215struct QueueState {
216    /// Pending requests
217    requests: VecDeque<QueuedRequest>,
218    /// Requests currently being replayed
219    in_flight: Vec<String>,
220    /// Network status
221    is_online: bool,
222    /// Last cleanup time
223    last_cleanup: Instant,
224}
225
226impl QueueState {
227    fn new() -> Self {
228        Self {
229            requests: VecDeque::new(),
230            in_flight: Vec::new(),
231            is_online: false,
232            last_cleanup: Instant::now(),
233        }
234    }
235}
236
237/// Offline request queue
238pub struct OfflineQueue {
239    config: OfflineQueueConfig,
240    state: Arc<RwLock<QueueState>>,
241    stats: Arc<RwLock<OfflineQueueStats>>,
242}
243
244impl OfflineQueue {
245    /// Create a new offline queue
246    pub fn new(config: OfflineQueueConfig) -> Result<Self, OfflineQueueError> {
247        config.validate()?;
248
249        Ok(Self {
250            config,
251            state: Arc::new(RwLock::new(QueueState::new())),
252            stats: Arc::new(RwLock::new(OfflineQueueStats::default())),
253        })
254    }
255
256    /// Add a request to the queue
257    pub fn enqueue(&self, request: QueuedRequest) -> Result<(), OfflineQueueError> {
258        let mut state = self.state.write();
259        let mut stats = self.stats.write();
260
261        // Check queue size
262        if state.requests.len() >= self.config.max_queue_size {
263            stats.requests_dropped += 1;
264            return Err(OfflineQueueError::QueueFull);
265        }
266
267        // Insert with priority
268        if self.config.enable_prioritization {
269            // Find insertion point based on priority
270            let insert_pos = state
271                .requests
272                .iter()
273                .position(|r| r.priority < request.priority)
274                .unwrap_or(state.requests.len());
275
276            state.requests.insert(insert_pos, request);
277        } else {
278            state.requests.push_back(request);
279        }
280
281        stats.requests_queued += 1;
282
283        Ok(())
284    }
285
286    /// Get the next request to replay
287    pub fn dequeue(&self) -> Option<QueuedRequest> {
288        let mut state = self.state.write();
289
290        if !state.is_online {
291            return None;
292        }
293
294        while let Some(request) = state.requests.pop_front() {
295            // Check if expired
296            if request.is_expired(self.config.max_request_age) {
297                let mut stats = self.stats.write();
298                stats.requests_expired += 1;
299                continue;
300            }
301
302            // Check if timed out
303            if request.is_timed_out() {
304                let mut stats = self.stats.write();
305                stats.requests_timed_out += 1;
306                continue;
307            }
308
309            state.in_flight.push(request.id.clone());
310            return Some(request);
311        }
312
313        None
314    }
315
316    /// Mark a request as completed
317    pub fn mark_completed(&self, request_id: &str, success: bool) {
318        let mut state = self.state.write();
319        let mut stats = self.stats.write();
320
321        state.in_flight.retain(|id| id != request_id);
322
323        if success {
324            stats.requests_completed += 1;
325        } else {
326            stats.requests_failed += 1;
327        }
328    }
329
330    /// Requeue a failed request for retry
331    pub fn requeue(&self, mut request: QueuedRequest) -> Result<(), OfflineQueueError> {
332        let mut state = self.state.write();
333
334        state.in_flight.retain(|id| id != &request.id);
335
336        if !request.should_retry() {
337            let mut stats = self.stats.write();
338            stats.requests_failed += 1;
339            return Ok(());
340        }
341
342        request.retry_count += 1;
343
344        if self.config.enable_prioritization {
345            let insert_pos = state
346                .requests
347                .iter()
348                .position(|r| r.priority < request.priority)
349                .unwrap_or(state.requests.len());
350
351            state.requests.insert(insert_pos, request);
352        } else {
353            state.requests.push_back(request);
354        }
355
356        let mut stats = self.stats.write();
357        stats.requests_retried += 1;
358
359        Ok(())
360    }
361
362    /// Set network online status
363    pub fn set_online(&self, online: bool) {
364        let mut state = self.state.write();
365        state.is_online = online;
366
367        if online {
368            let mut stats = self.stats.write();
369            stats.online_transitions += 1;
370        } else {
371            let mut stats = self.stats.write();
372            stats.offline_transitions += 1;
373        }
374    }
375
376    /// Check if network is online
377    pub fn is_online(&self) -> bool {
378        self.state.read().is_online
379    }
380
381    /// Get number of pending requests
382    pub fn pending_count(&self) -> usize {
383        self.state.read().requests.len()
384    }
385
386    /// Get number of in-flight requests
387    pub fn in_flight_count(&self) -> usize {
388        self.state.read().in_flight.len()
389    }
390
391    /// Clean up expired requests
392    pub fn cleanup_expired(&self) {
393        let mut state = self.state.write();
394        let mut stats = self.stats.write();
395
396        let initial_len = state.requests.len();
397
398        state
399            .requests
400            .retain(|r| !r.is_expired(self.config.max_request_age));
401
402        let removed = initial_len - state.requests.len();
403        stats.requests_expired += removed as u64;
404
405        state.last_cleanup = Instant::now();
406    }
407
408    /// Get a batch of requests for replay
409    pub fn get_replay_batch(&self) -> Vec<QueuedRequest> {
410        let mut batch = Vec::with_capacity(self.config.replay_batch_size);
411
412        for _ in 0..self.config.replay_batch_size {
413            if let Some(request) = self.dequeue() {
414                batch.push(request);
415            } else {
416                break;
417            }
418        }
419
420        batch
421    }
422
423    /// Get current statistics
424    pub fn stats(&self) -> OfflineQueueStats {
425        self.stats.read().clone()
426    }
427
428    /// Reset statistics
429    pub fn reset_stats(&self) {
430        *self.stats.write() = OfflineQueueStats::default();
431    }
432
433    /// Clear all pending requests
434    pub fn clear(&self) {
435        let mut state = self.state.write();
436        state.requests.clear();
437        state.in_flight.clear();
438    }
439}
440
441/// Statistics for offline queue
442#[derive(Debug, Clone, Default)]
443pub struct OfflineQueueStats {
444    /// Total requests queued
445    pub requests_queued: u64,
446    /// Requests dropped (queue full)
447    pub requests_dropped: u64,
448    /// Requests completed successfully
449    pub requests_completed: u64,
450    /// Requests that failed
451    pub requests_failed: u64,
452    /// Requests that expired
453    pub requests_expired: u64,
454    /// Requests that timed out
455    pub requests_timed_out: u64,
456    /// Requests retried
457    pub requests_retried: u64,
458    /// Online transitions
459    pub online_transitions: u64,
460    /// Offline transitions
461    pub offline_transitions: u64,
462}
463
464impl OfflineQueueStats {
465    /// Calculate success rate
466    pub fn success_rate(&self) -> f64 {
467        let total = self.requests_completed + self.requests_failed;
468        if total == 0 {
469            return 0.0;
470        }
471        self.requests_completed as f64 / total as f64
472    }
473
474    /// Calculate completion rate (including expired/dropped)
475    pub fn completion_rate(&self) -> f64 {
476        if self.requests_queued == 0 {
477            return 0.0;
478        }
479        self.requests_completed as f64 / self.requests_queued as f64
480    }
481}
482
483#[cfg(test)]
484mod tests {
485    use super::*;
486
487    #[test]
488    fn test_config_default() {
489        let config = OfflineQueueConfig::default();
490        assert!(config.validate().is_ok());
491        assert_eq!(config.max_queue_size, 1000);
492    }
493
494    #[test]
495    fn test_config_mobile() {
496        let config = OfflineQueueConfig::mobile();
497        assert!(config.validate().is_ok());
498        assert_eq!(config.max_queue_size, 500);
499    }
500
501    #[test]
502    fn test_config_iot() {
503        let config = OfflineQueueConfig::iot();
504        assert!(config.validate().is_ok());
505        assert_eq!(config.max_queue_size, 100);
506    }
507
508    #[test]
509    fn test_enqueue() {
510        let config = OfflineQueueConfig::default();
511        let queue = OfflineQueue::new(config).unwrap();
512
513        let request = QueuedRequest::new(
514            "test1".to_string(),
515            QueuedRequestType::FindProviders("QmTest".to_string()),
516            RequestPriority::Normal,
517            Duration::from_secs(60),
518        );
519
520        assert!(queue.enqueue(request).is_ok());
521        assert_eq!(queue.pending_count(), 1);
522    }
523
524    #[test]
525    fn test_priority_ordering() {
526        let config = OfflineQueueConfig::default();
527        let queue = OfflineQueue::new(config).unwrap();
528
529        // Add low priority
530        let req1 = QueuedRequest::new(
531            "low".to_string(),
532            QueuedRequestType::FindProviders("QmTest1".to_string()),
533            RequestPriority::Low,
534            Duration::from_secs(60),
535        );
536        queue.enqueue(req1).unwrap();
537
538        // Add high priority
539        let req2 = QueuedRequest::new(
540            "high".to_string(),
541            QueuedRequestType::FindProviders("QmTest2".to_string()),
542            RequestPriority::High,
543            Duration::from_secs(60),
544        );
545        queue.enqueue(req2).unwrap();
546
547        // Set online to enable dequeue
548        queue.set_online(true);
549
550        // High priority should come out first
551        let next = queue.dequeue().unwrap();
552        assert_eq!(next.id, "high");
553    }
554
555    #[test]
556    fn test_dequeue_when_offline() {
557        let config = OfflineQueueConfig::default();
558        let queue = OfflineQueue::new(config).unwrap();
559
560        let request = QueuedRequest::new(
561            "test1".to_string(),
562            QueuedRequestType::FindProviders("QmTest".to_string()),
563            RequestPriority::Normal,
564            Duration::from_secs(60),
565        );
566
567        queue.enqueue(request).unwrap();
568
569        // Should return None when offline
570        assert!(queue.dequeue().is_none());
571    }
572
573    #[test]
574    fn test_dequeue_when_online() {
575        let config = OfflineQueueConfig::default();
576        let queue = OfflineQueue::new(config).unwrap();
577
578        let request = QueuedRequest::new(
579            "test1".to_string(),
580            QueuedRequestType::FindProviders("QmTest".to_string()),
581            RequestPriority::Normal,
582            Duration::from_secs(60),
583        );
584
585        queue.enqueue(request).unwrap();
586        queue.set_online(true);
587
588        // Should return request when online
589        let req = queue.dequeue();
590        assert!(req.is_some());
591        assert_eq!(req.unwrap().id, "test1");
592    }
593
594    #[test]
595    fn test_mark_completed() {
596        let config = OfflineQueueConfig::default();
597        let queue = OfflineQueue::new(config).unwrap();
598
599        let request = QueuedRequest::new(
600            "test1".to_string(),
601            QueuedRequestType::FindProviders("QmTest".to_string()),
602            RequestPriority::Normal,
603            Duration::from_secs(60),
604        );
605
606        queue.enqueue(request).unwrap();
607        queue.set_online(true);
608        let req = queue.dequeue().unwrap();
609
610        queue.mark_completed(&req.id, true);
611
612        let stats = queue.stats();
613        assert_eq!(stats.requests_completed, 1);
614    }
615
616    #[test]
617    fn test_requeue() {
618        let config = OfflineQueueConfig::default();
619        let queue = OfflineQueue::new(config).unwrap();
620
621        let mut request = QueuedRequest::new(
622            "test1".to_string(),
623            QueuedRequestType::FindProviders("QmTest".to_string()),
624            RequestPriority::Normal,
625            Duration::from_secs(60),
626        );
627        request.max_retries = 3;
628
629        queue.enqueue(request.clone()).unwrap();
630        queue.set_online(true);
631        let req = queue.dequeue().unwrap();
632
633        queue.requeue(req).unwrap();
634
635        let stats = queue.stats();
636        assert_eq!(stats.requests_retried, 1);
637    }
638
639    #[test]
640    fn test_queue_full() {
641        let config = OfflineQueueConfig {
642            max_queue_size: 2,
643            ..Default::default()
644        };
645        let queue = OfflineQueue::new(config).unwrap();
646
647        let req1 = QueuedRequest::new(
648            "test1".to_string(),
649            QueuedRequestType::FindProviders("QmTest1".to_string()),
650            RequestPriority::Normal,
651            Duration::from_secs(60),
652        );
653        let req2 = QueuedRequest::new(
654            "test2".to_string(),
655            QueuedRequestType::FindProviders("QmTest2".to_string()),
656            RequestPriority::Normal,
657            Duration::from_secs(60),
658        );
659        let req3 = QueuedRequest::new(
660            "test3".to_string(),
661            QueuedRequestType::FindProviders("QmTest3".to_string()),
662            RequestPriority::Normal,
663            Duration::from_secs(60),
664        );
665
666        assert!(queue.enqueue(req1).is_ok());
667        assert!(queue.enqueue(req2).is_ok());
668        assert!(matches!(
669            queue.enqueue(req3),
670            Err(OfflineQueueError::QueueFull)
671        ));
672    }
673
674    #[test]
675    fn test_get_replay_batch() {
676        let config = OfflineQueueConfig {
677            replay_batch_size: 3,
678            ..Default::default()
679        };
680        let queue = OfflineQueue::new(config).unwrap();
681
682        for i in 0..5 {
683            let req = QueuedRequest::new(
684                format!("test{}", i),
685                QueuedRequestType::FindProviders(format!("QmTest{}", i)),
686                RequestPriority::Normal,
687                Duration::from_secs(60),
688            );
689            queue.enqueue(req).unwrap();
690        }
691
692        queue.set_online(true);
693
694        let batch = queue.get_replay_batch();
695        assert_eq!(batch.len(), 3);
696    }
697
698    #[test]
699    fn test_stats_success_rate() {
700        let stats = OfflineQueueStats {
701            requests_completed: 8,
702            requests_failed: 2,
703            ..Default::default()
704        };
705
706        assert_eq!(stats.success_rate(), 0.8);
707    }
708
709    #[test]
710    fn test_clear() {
711        let config = OfflineQueueConfig::default();
712        let queue = OfflineQueue::new(config).unwrap();
713
714        let req = QueuedRequest::new(
715            "test1".to_string(),
716            QueuedRequestType::FindProviders("QmTest".to_string()),
717            RequestPriority::Normal,
718            Duration::from_secs(60),
719        );
720        queue.enqueue(req).unwrap();
721
722        assert_eq!(queue.pending_count(), 1);
723
724        queue.clear();
725        assert_eq!(queue.pending_count(), 0);
726    }
727}