1#![deny(unsafe_code)]
2
3use crate::traffic_obfuscation::{TrafficObfuscationConfig, TrafficObfuscator};
4use crate::types::{MessagePriority, NetworkError, NetworkMessage};
5use anyhow::Result;
6use blake3::Hash;
7use serde::{Deserialize, Serialize};
8use std::collections::VecDeque;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11use tokio::sync::{mpsc, Mutex, RwLock};
12
/// A 32-byte hash value stored as a plain byte array so that it can derive
/// `Serialize`/`Deserialize`.
///
/// Converts to and from [`blake3::Hash`] through the `From` impls in this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SerializableHash(pub [u8; 32]);
16
17impl From<Hash> for SerializableHash {
18 fn from(hash: Hash) -> Self {
19 SerializableHash(*hash.as_bytes())
20 }
21}
22
23impl From<SerializableHash> for Hash {
24 fn from(hash: SerializableHash) -> Self {
25 Hash::from(hash.0)
26 }
27}
28
29impl SerializableHash {
30 pub fn as_bytes(&self) -> &[u8; 32] {
32 &self.0
33 }
34}
35
/// A [`NetworkMessage`] bundled with an integrity hash, a creation timestamp
/// and an optional signature.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageEnvelope {
    /// The wrapped message.
    pub message: NetworkMessage,
    /// BLAKE3 hash over the bincode encoding of `message` followed by the
    /// little-endian bytes of `timestamp`.
    pub hash: SerializableHash,
    /// Creation time in whole seconds since the Unix epoch.
    pub timestamp: u64,
    /// Ed25519 signature over the bytes of `hash`; `None` until `sign` is called.
    pub signature: Option<Vec<u8>>,
}
48
49impl MessageEnvelope {
50 pub fn new(message: NetworkMessage) -> Self {
51 let timestamp = SystemTime::now()
52 .duration_since(UNIX_EPOCH)
53 .unwrap()
54 .as_secs();
55
56 let mut hasher = blake3::Hasher::new();
57 hasher.update(&bincode::serialize(&message).unwrap());
58 hasher.update(×tamp.to_le_bytes());
59
60 Self {
61 message,
62 hash: hasher.finalize().into(),
63 timestamp,
64 signature: None,
65 }
66 }
67
68 pub fn verify(&self) -> bool {
69 let mut hasher = blake3::Hasher::new();
70 hasher.update(&bincode::serialize(&self.message).unwrap());
71 hasher.update(&self.timestamp.to_le_bytes());
72
73 self.hash == hasher.finalize().into()
74 }
75
76 pub fn sign(&mut self, key: &[u8]) -> Result<(), NetworkError> {
77 let signature = ring::signature::Ed25519KeyPair::from_seed_unchecked(key)
79 .map_err(|e| NetworkError::EncryptionError(e.to_string()))?;
80
81 self.signature = Some(signature.sign(self.hash.as_bytes()).as_ref().to_vec());
82 Ok(())
83 }
84
85 pub fn verify_signature(&self, public_key: &[u8]) -> Result<bool, NetworkError> {
86 match &self.signature {
87 Some(sig) => {
88 let peer_public_key =
89 ring::signature::UnparsedPublicKey::new(&ring::signature::ED25519, public_key);
90
91 peer_public_key
92 .verify(self.hash.as_bytes(), sig)
93 .map(|_| true)
94 .map_err(|e| NetworkError::EncryptionError(e.to_string()))
95 }
96 None => Ok(false),
97 }
98 }
99}
100
/// A three-lane priority queue of [`MessageEnvelope`]s with optional traffic
/// obfuscation.
pub struct MessageQueue {
    /// Envelopes with `MessagePriority::High`; dequeued first.
    high_priority: Arc<Mutex<VecDeque<MessageEnvelope>>>,
    /// Envelopes with `MessagePriority::Normal`; dequeued when the high lane is empty.
    normal_priority: Arc<Mutex<VecDeque<MessageEnvelope>>>,
    /// Envelopes with `MessagePriority::Low`; dequeued last.
    low_priority: Arc<Mutex<VecDeque<MessageEnvelope>>>,
    /// Sender half of the notification channel; `enqueue` sends one `()` for
    /// every envelope it stores.
    notify_tx: mpsc::Sender<()>,
    /// Obfuscator applied to payloads in `enqueue`; `None` disables obfuscation.
    obfuscator: Option<Arc<TrafficObfuscator>>,
    /// The obfuscation configuration most recently supplied to this queue.
    obfuscation_config: Arc<RwLock<TrafficObfuscationConfig>>,
}
115
116impl MessageQueue {
117 pub fn new() -> (Self, mpsc::Receiver<()>) {
119 let (tx, rx) = mpsc::channel(1000);
120
121 let queue = Self {
122 high_priority: Arc::new(Mutex::new(VecDeque::with_capacity(10000))),
123 normal_priority: Arc::new(Mutex::new(VecDeque::with_capacity(50000))),
124 low_priority: Arc::new(Mutex::new(VecDeque::with_capacity(100000))),
125 notify_tx: tx,
126 obfuscator: None,
127 obfuscation_config: Arc::new(RwLock::new(TrafficObfuscationConfig::default())),
128 };
129
130 (queue, rx)
131 }
132
133 pub fn with_obfuscation(config: TrafficObfuscationConfig) -> (Self, mpsc::Receiver<()>) {
135 let (tx, rx) = mpsc::channel(1000);
136 let obfuscator = Arc::new(TrafficObfuscator::new(config.clone()));
137
138 let queue = Self {
139 high_priority: Arc::new(Mutex::new(VecDeque::with_capacity(10000))),
140 normal_priority: Arc::new(Mutex::new(VecDeque::with_capacity(50000))),
141 low_priority: Arc::new(Mutex::new(VecDeque::with_capacity(100000))),
142 notify_tx: tx,
143 obfuscator: Some(obfuscator),
144 obfuscation_config: Arc::new(RwLock::new(config)),
145 };
146
147 (queue, rx)
148 }
149
150 pub async fn enable_obfuscation(&mut self, config: TrafficObfuscationConfig) {
152 self.obfuscator = Some(Arc::new(TrafficObfuscator::new(config.clone())));
153 *self.obfuscation_config.write().await = config;
154
155 if let Some(obfuscator) = &self.obfuscator {
157 obfuscator.start().await;
158 }
159 }
160
161 pub async fn enqueue(&self, mut msg: NetworkMessage) -> Result<(), NetworkError> {
163 if let Some(obfuscator) = &self.obfuscator {
165 let obfuscated_payload = obfuscator.obfuscate_message(msg.clone()).await?;
167
168 if obfuscated_payload.is_empty() {
170 return Ok(());
171 }
172
173 msg.payload = obfuscated_payload;
175 }
176
177 let envelope = MessageEnvelope::new(msg.clone());
178
179 if !envelope.verify() {
181 return Err(NetworkError::Internal(
182 "Message integrity check failed".into(),
183 ));
184 }
185 let queue = match msg.priority {
186 MessagePriority::High => &self.high_priority,
187 MessagePriority::Normal => &self.normal_priority,
188 MessagePriority::Low => &self.low_priority,
189 };
190
191 queue.lock().await.push_back(envelope);
192 let _ = self.notify_tx.send(()).await;
193 Ok(())
194 }
195
196 pub async fn dequeue(&self) -> Option<MessageEnvelope> {
198 if let Some(msg) = self.high_priority.lock().await.pop_front() {
199 return Some(msg);
200 }
201
202 if let Some(msg) = self.normal_priority.lock().await.pop_front() {
203 return Some(msg);
204 }
205
206 self.low_priority.lock().await.pop_front()
207 }
208
209 pub async fn len(&self) -> usize {
211 let high = self.high_priority.lock().await.len();
212 let normal = self.normal_priority.lock().await.len();
213 let low = self.low_priority.lock().await.len();
214 high + normal + low
215 }
216
217 pub async fn is_empty(&self) -> bool {
219 self.len().await == 0
220 }
221
222 pub async fn purge_expired(&self) {
224 let now = SystemTime::now()
225 .duration_since(UNIX_EPOCH)
226 .unwrap()
227 .as_secs();
228
229 let mut high = self.high_priority.lock().await;
231 high.retain(|env| env.message.ttl.as_secs() + env.timestamp > now);
232
233 let mut normal = self.normal_priority.lock().await;
235 normal.retain(|env| env.message.ttl.as_secs() + env.timestamp > now);
236
237 let mut low = self.low_priority.lock().await;
239 low.retain(|env| env.message.ttl.as_secs() + env.timestamp > now);
240 }
241
242 pub async fn process_batch(&self) -> Result<Vec<MessageEnvelope>, NetworkError> {
244 if let Some(obfuscator) = &self.obfuscator {
245 let obfuscated_messages = obfuscator.process_batch().await?;
246
247 let mut envelopes = Vec::new();
248 for obfuscated_data in obfuscated_messages {
249 let msg = NetworkMessage {
251 id: uuid::Uuid::new_v4().to_string(),
252 source: vec![],
253 destination: vec![],
254 payload: obfuscated_data,
255 priority: MessagePriority::Normal,
256 ttl: std::time::Duration::from_secs(300),
257 };
258 envelopes.push(MessageEnvelope::new(msg));
259 }
260
261 Ok(envelopes)
262 } else {
263 Ok(Vec::new())
264 }
265 }
266
267 pub async fn get_obfuscation_stats(
269 &self,
270 ) -> Option<crate::traffic_obfuscation::ObfuscationStats> {
271 if let Some(obfuscator) = &self.obfuscator {
272 Some(obfuscator.get_stats().await)
273 } else {
274 None
275 }
276 }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a 100-byte test message with the given id, priority and TTL.
    fn message(id: &str, priority: MessagePriority, ttl: Duration) -> NetworkMessage {
        NetworkMessage {
            id: id.into(),
            source: vec![1],
            destination: vec![2],
            payload: vec![0; 100],
            priority,
            ttl,
        }
    }

    #[tokio::test]
    async fn test_message_queue() {
        let (queue, _rx) = MessageQueue::new();

        let msg1 = message("1", MessagePriority::High, Duration::from_secs(60));
        let msg2 = message("2", MessagePriority::Normal, Duration::from_secs(60));

        // Round trip: an enqueued message comes back out with a valid hash.
        assert!(queue.enqueue(msg1.clone()).await.is_ok());
        let envelope = queue.dequeue().await.unwrap();
        assert!(envelope.verify());
        assert!(queue.is_empty().await);

        // Priority ordering: the normal-priority message is enqueued first so
        // that arrival order alone cannot explain the dequeue order.
        assert!(queue.enqueue(msg2.clone()).await.is_ok());
        assert!(queue.enqueue(msg1.clone()).await.is_ok());
        assert_eq!(queue.len().await, 2);

        let dequeued = queue.dequeue().await.unwrap();
        assert_eq!(dequeued.message.id, "1");

        let dequeued = queue.dequeue().await.unwrap();
        assert_eq!(dequeued.message.id, "2");

        // Expiry: a zero TTL is already expired when purged, so no sleep is
        // needed; a live message in another lane must survive the purge.
        let expired = message("3", MessagePriority::Low, Duration::from_secs(0));
        let live = message("4", MessagePriority::Normal, Duration::from_secs(60));

        assert!(queue.enqueue(expired).await.is_ok());
        assert!(queue.enqueue(live).await.is_ok());
        assert_eq!(queue.len().await, 2);

        queue.purge_expired().await;
        assert_eq!(queue.len().await, 1);

        let survivor = queue.dequeue().await.unwrap();
        assert_eq!(survivor.message.id, "4");
        assert!(queue.is_empty().await);
    }
}