qudag_network/
message.rs

1#![deny(unsafe_code)]
2
3use crate::traffic_obfuscation::{TrafficObfuscationConfig, TrafficObfuscator};
4use crate::types::{MessagePriority, NetworkError, NetworkMessage};
5use anyhow::Result;
6use blake3::Hash;
7use serde::{Deserialize, Serialize};
8use std::collections::VecDeque;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11use tokio::sync::{mpsc, Mutex, RwLock};
12
13/// Serializable wrapper for blake3::Hash
14#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
15pub struct SerializableHash(pub [u8; 32]);
16
17impl From<Hash> for SerializableHash {
18    fn from(hash: Hash) -> Self {
19        SerializableHash(*hash.as_bytes())
20    }
21}
22
23impl From<SerializableHash> for Hash {
24    fn from(hash: SerializableHash) -> Self {
25        Hash::from(hash.0)
26    }
27}
28
29impl SerializableHash {
30    /// Get the hash as bytes
31    pub fn as_bytes(&self) -> &[u8; 32] {
32        &self.0
33    }
34}
35
36/// High-performance message queue for network messages
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct MessageEnvelope {
39    /// The actual message
40    pub message: NetworkMessage,
41    /// Message hash for integrity
42    pub hash: SerializableHash,
43    /// Timestamp
44    pub timestamp: u64,
45    /// Signature
46    pub signature: Option<Vec<u8>>,
47}
48
49impl MessageEnvelope {
50    pub fn new(message: NetworkMessage) -> Self {
51        let timestamp = SystemTime::now()
52            .duration_since(UNIX_EPOCH)
53            .unwrap()
54            .as_secs();
55
56        let mut hasher = blake3::Hasher::new();
57        hasher.update(&bincode::serialize(&message).unwrap());
58        hasher.update(&timestamp.to_le_bytes());
59
60        Self {
61            message,
62            hash: hasher.finalize().into(),
63            timestamp,
64            signature: None,
65        }
66    }
67
68    pub fn verify(&self) -> bool {
69        let mut hasher = blake3::Hasher::new();
70        hasher.update(&bincode::serialize(&self.message).unwrap());
71        hasher.update(&self.timestamp.to_le_bytes());
72
73        self.hash == hasher.finalize().into()
74    }
75
76    pub fn sign(&mut self, key: &[u8]) -> Result<(), NetworkError> {
77        // Sign the message hash
78        let signature = ring::signature::Ed25519KeyPair::from_seed_unchecked(key)
79            .map_err(|e| NetworkError::EncryptionError(e.to_string()))?;
80
81        self.signature = Some(signature.sign(self.hash.as_bytes()).as_ref().to_vec());
82        Ok(())
83    }
84
85    pub fn verify_signature(&self, public_key: &[u8]) -> Result<bool, NetworkError> {
86        match &self.signature {
87            Some(sig) => {
88                let peer_public_key =
89                    ring::signature::UnparsedPublicKey::new(&ring::signature::ED25519, public_key);
90
91                peer_public_key
92                    .verify(self.hash.as_bytes(), sig)
93                    .map(|_| true)
94                    .map_err(|e| NetworkError::EncryptionError(e.to_string()))
95            }
96            None => Ok(false),
97        }
98    }
99}
100
101pub struct MessageQueue {
102    /// High priority message queue
103    high_priority: Arc<Mutex<VecDeque<MessageEnvelope>>>,
104    /// Normal priority message queue  
105    normal_priority: Arc<Mutex<VecDeque<MessageEnvelope>>>,
106    /// Low priority message queue
107    low_priority: Arc<Mutex<VecDeque<MessageEnvelope>>>,
108    /// Channel for message notifications
109    notify_tx: mpsc::Sender<()>,
110    /// Traffic obfuscator for message processing
111    obfuscator: Option<Arc<TrafficObfuscator>>,
112    /// Configuration for traffic obfuscation
113    obfuscation_config: Arc<RwLock<TrafficObfuscationConfig>>,
114}
115
116impl MessageQueue {
117    /// Creates a new message queue
118    pub fn new() -> (Self, mpsc::Receiver<()>) {
119        let (tx, rx) = mpsc::channel(1000);
120
121        let queue = Self {
122            high_priority: Arc::new(Mutex::new(VecDeque::with_capacity(10000))),
123            normal_priority: Arc::new(Mutex::new(VecDeque::with_capacity(50000))),
124            low_priority: Arc::new(Mutex::new(VecDeque::with_capacity(100000))),
125            notify_tx: tx,
126            obfuscator: None,
127            obfuscation_config: Arc::new(RwLock::new(TrafficObfuscationConfig::default())),
128        };
129
130        (queue, rx)
131    }
132
133    /// Creates a new message queue with traffic obfuscation
134    pub fn with_obfuscation(config: TrafficObfuscationConfig) -> (Self, mpsc::Receiver<()>) {
135        let (tx, rx) = mpsc::channel(1000);
136        let obfuscator = Arc::new(TrafficObfuscator::new(config.clone()));
137
138        let queue = Self {
139            high_priority: Arc::new(Mutex::new(VecDeque::with_capacity(10000))),
140            normal_priority: Arc::new(Mutex::new(VecDeque::with_capacity(50000))),
141            low_priority: Arc::new(Mutex::new(VecDeque::with_capacity(100000))),
142            notify_tx: tx,
143            obfuscator: Some(obfuscator),
144            obfuscation_config: Arc::new(RwLock::new(config)),
145        };
146
147        (queue, rx)
148    }
149
150    /// Enable traffic obfuscation
151    pub async fn enable_obfuscation(&mut self, config: TrafficObfuscationConfig) {
152        self.obfuscator = Some(Arc::new(TrafficObfuscator::new(config.clone())));
153        *self.obfuscation_config.write().await = config;
154
155        // Start the obfuscator
156        if let Some(obfuscator) = &self.obfuscator {
157            obfuscator.start().await;
158        }
159    }
160
161    /// Enqueues a message with the specified priority
162    pub async fn enqueue(&self, mut msg: NetworkMessage) -> Result<(), NetworkError> {
163        // Apply obfuscation if enabled
164        if let Some(obfuscator) = &self.obfuscator {
165            // Process message through obfuscation pipeline
166            let obfuscated_payload = obfuscator.obfuscate_message(msg.clone()).await?;
167
168            // If obfuscation returns empty (batching), don't enqueue directly
169            if obfuscated_payload.is_empty() {
170                return Ok(());
171            }
172
173            // Update message with obfuscated payload
174            msg.payload = obfuscated_payload;
175        }
176
177        let envelope = MessageEnvelope::new(msg.clone());
178
179        // Verify message integrity
180        if !envelope.verify() {
181            return Err(NetworkError::Internal(
182                "Message integrity check failed".into(),
183            ));
184        }
185        let queue = match msg.priority {
186            MessagePriority::High => &self.high_priority,
187            MessagePriority::Normal => &self.normal_priority,
188            MessagePriority::Low => &self.low_priority,
189        };
190
191        queue.lock().await.push_back(envelope);
192        let _ = self.notify_tx.send(()).await;
193        Ok(())
194    }
195
196    /// Dequeues the next message by priority
197    pub async fn dequeue(&self) -> Option<MessageEnvelope> {
198        if let Some(msg) = self.high_priority.lock().await.pop_front() {
199            return Some(msg);
200        }
201
202        if let Some(msg) = self.normal_priority.lock().await.pop_front() {
203            return Some(msg);
204        }
205
206        self.low_priority.lock().await.pop_front()
207    }
208
209    /// Returns the total number of queued messages
210    pub async fn len(&self) -> usize {
211        let high = self.high_priority.lock().await.len();
212        let normal = self.normal_priority.lock().await.len();
213        let low = self.low_priority.lock().await.len();
214        high + normal + low
215    }
216
217    /// Returns true if the queue is empty
218    pub async fn is_empty(&self) -> bool {
219        self.len().await == 0
220    }
221
222    /// Purge expired messages
223    pub async fn purge_expired(&self) {
224        let now = SystemTime::now()
225            .duration_since(UNIX_EPOCH)
226            .unwrap()
227            .as_secs();
228
229        // Purge high priority
230        let mut high = self.high_priority.lock().await;
231        high.retain(|env| env.message.ttl.as_secs() + env.timestamp > now);
232
233        // Purge normal priority
234        let mut normal = self.normal_priority.lock().await;
235        normal.retain(|env| env.message.ttl.as_secs() + env.timestamp > now);
236
237        // Purge low priority
238        let mut low = self.low_priority.lock().await;
239        low.retain(|env| env.message.ttl.as_secs() + env.timestamp > now);
240    }
241
242    /// Process batched messages if obfuscation is enabled
243    pub async fn process_batch(&self) -> Result<Vec<MessageEnvelope>, NetworkError> {
244        if let Some(obfuscator) = &self.obfuscator {
245            let obfuscated_messages = obfuscator.process_batch().await?;
246
247            let mut envelopes = Vec::new();
248            for obfuscated_data in obfuscated_messages {
249                // Create a dummy message envelope for obfuscated data
250                let msg = NetworkMessage {
251                    id: uuid::Uuid::new_v4().to_string(),
252                    source: vec![],
253                    destination: vec![],
254                    payload: obfuscated_data,
255                    priority: MessagePriority::Normal,
256                    ttl: std::time::Duration::from_secs(300),
257                };
258                envelopes.push(MessageEnvelope::new(msg));
259            }
260
261            Ok(envelopes)
262        } else {
263            Ok(Vec::new())
264        }
265    }
266
267    /// Get obfuscation statistics
268    pub async fn get_obfuscation_stats(
269        &self,
270    ) -> Option<crate::traffic_obfuscation::ObfuscationStats> {
271        if let Some(obfuscator) = &self.obfuscator {
272            Some(obfuscator.get_stats().await)
273        } else {
274            None
275        }
276    }
277}
278
279#[cfg(test)]
280mod tests {
281    use super::*;
282    use std::time::Duration;
283
284    #[tokio::test]
285    async fn test_message_queue() {
286        use std::thread;
287
288        let (queue, _rx) = MessageQueue::new();
289
290        // Create test messages
291        let msg1 = NetworkMessage {
292            id: "1".into(),
293            source: vec![1],
294            destination: vec![2],
295            payload: vec![0; 100],
296            priority: MessagePriority::High,
297            ttl: Duration::from_secs(60),
298        };
299
300        let msg2 = NetworkMessage {
301            id: "2".into(),
302            source: vec![1],
303            destination: vec![2],
304            payload: vec![0; 100],
305            priority: MessagePriority::Normal,
306            ttl: Duration::from_secs(60),
307        };
308
309        // Test enqueue
310        assert!(queue.enqueue(msg1.clone()).await.is_ok());
311
312        // Test message verification
313        let envelope = queue.dequeue().await.unwrap();
314        assert!(envelope.verify());
315        assert!(queue.enqueue(msg2.clone()).await.is_ok());
316        assert_eq!(queue.len().await, 2);
317
318        // Test priority dequeue
319        let dequeued = queue.dequeue().await.unwrap();
320        assert_eq!(dequeued.message.id, "1"); // High priority dequeued first
321
322        let dequeued = queue.dequeue().await.unwrap();
323        assert_eq!(dequeued.message.id, "2"); // Normal priority dequeued second
324
325        // Test message expiry
326        let msg3 = NetworkMessage {
327            id: "3".into(),
328            source: vec![1],
329            destination: vec![2],
330            payload: vec![0; 100],
331            priority: MessagePriority::Low,
332            ttl: Duration::from_secs(1), // Short TTL
333        };
334
335        assert!(queue.enqueue(msg3).await.is_ok());
336        assert_eq!(queue.len().await, 1);
337
338        // Wait for message to expire
339        thread::sleep(Duration::from_secs(2));
340        queue.purge_expired().await;
341        assert_eq!(queue.len().await, 0);
342    }
343}