1use std::collections::{HashMap, HashSet, VecDeque};
4use std::time::{Duration, Instant};
5
6use hkdf::Hkdf;
7use num_bigint::{BigInt, BigUint, ToBigInt};
8use num_traits::{One, Zero};
9use rand::RngCore;
10use serde::Serialize;
11use sha2::Sha256;
12use thiserror::Error;
13use url::Url;
14
15use crate::key_exchange::{KeyExchangeError, KeyExchangeMessage, mod_pow, parse_key_exchange};
16use crate::policy::{AutoKeyExchangeMode, RoutingPolicy};
17use crate::protocol::{
18 CmrMessage, CmrTimestamp, MessageId, ParseContext, ParseError, Signature, TransportKind,
19 parse_message,
20};
21
/// Errors a [`CompressionOracle`] can report.
#[derive(Debug, Error)]
pub enum CompressionError {
    /// The compressor could not be used; the payload is a human-readable detail
    /// that is interpolated into the error message.
    #[error("compressor unavailable: {0}")]
    Unavailable(String),
    /// The compressor ran but failed; the payload is a human-readable detail
    /// that is interpolated into the error message.
    #[error("compressor failure: {0}")]
    Failed(String),
}
32
33pub trait CompressionOracle: Send + Sync {
35 fn compression_distance(&self, left: &[u8], right: &[u8]) -> Result<f64, CompressionError>;
38 fn compression_distance_chain(
41 &self,
42 left_parts: &[&[u8]],
43 right_parts: &[&[u8]],
44 ) -> Result<f64, CompressionError> {
45 fn join(parts: &[&[u8]]) -> Vec<u8> {
46 let total = parts.iter().map(|p| p.len()).sum::<usize>();
47 let mut out = Vec::with_capacity(total);
48 for part in parts {
49 out.extend_from_slice(part);
50 }
51 out
52 }
53 let left = join(left_parts);
54 let right = join(right_parts);
55 self.compression_distance(&left, &right)
56 }
57 fn intrinsic_dependence(&self, data: &[u8], max_order: i64) -> Result<f64, CompressionError>;
59 fn batch_compression_distance(
61 &self,
62 target: &[u8],
63 candidates: &[Vec<u8>],
64 ) -> Result<Vec<f64>, CompressionError> {
65 let mut out = Vec::with_capacity(candidates.len());
66 for candidate in candidates {
67 out.push(self.compression_distance(target, candidate)?);
68 }
69 Ok(out)
70 }
71}
72
/// One cached message together with the bookkeeping data stored alongside it.
#[derive(Clone, Debug)]
struct CacheEntry {
    /// Key under which the entry is stored in the cache (produced by `cache_key`).
    key: String,
    /// The cached message (stored after `make_unsigned` has been applied).
    message: CmrMessage,
    /// Value of `message.encoded_len()` at insertion time; used for byte accounting.
    encoded_size: usize,
}
79
/// Bounded message cache that evicts in insertion order.
#[derive(Debug)]
struct MessageCache {
    /// Cached entries indexed by cache key.
    entries: HashMap<String, CacheEntry>,
    /// Cache keys in insertion order; the front is evicted first.
    order: VecDeque<String>,
    /// For every header ID (as a string), how many cached messages carry it.
    id_counts: HashMap<String, usize>,
    /// Sum of `encoded_size` over all cached entries.
    total_bytes: usize,
    /// Eviction starts once the entry count exceeds this value.
    max_messages: usize,
    /// Eviction starts once `total_bytes` exceeds this value.
    max_bytes: usize,
    /// Number of entries removed by limit-driven eviction so far.
    total_evictions: u64,
}
90
91impl MessageCache {
92 fn new(max_messages: usize, max_bytes: usize) -> Self {
93 Self {
94 entries: HashMap::new(),
95 order: VecDeque::new(),
96 id_counts: HashMap::new(),
97 total_bytes: 0,
98 max_messages,
99 max_bytes,
100 total_evictions: 0,
101 }
102 }
103
104 fn has_seen_any_id(&self, message: &CmrMessage) -> bool {
105 message
106 .header
107 .iter()
108 .any(|id| self.id_counts.contains_key(&id.to_string()))
109 }
110
111 fn insert(&mut self, mut message: CmrMessage) {
112 message.make_unsigned();
114 let key = cache_key(&message);
115 if self.entries.contains_key(&key) {
116 return;
117 }
118 let encoded_size = message.encoded_len();
119 let entry = CacheEntry {
120 key: key.clone(),
121 message,
122 encoded_size,
123 };
124 self.total_bytes = self.total_bytes.saturating_add(encoded_size);
125 self.order.push_back(key.clone());
126 self.add_message_ids(&entry.message);
127 self.entries.insert(key, entry);
128 self.evict_as_needed();
129 }
130
131 fn evict_as_needed(&mut self) {
132 while self.entries.len() > self.max_messages || self.total_bytes > self.max_bytes {
133 let Some(oldest_key) = self.order.pop_front() else {
134 break;
135 };
136 if let Some(entry) = self.entries.remove(&oldest_key) {
137 self.total_bytes = self.total_bytes.saturating_sub(entry.encoded_size);
138 self.remove_message_ids(&entry.message);
139 self.total_evictions = self.total_evictions.saturating_add(1);
140 }
141 }
142 }
143
144 fn add_message_ids(&mut self, message: &CmrMessage) {
145 for id in &message.header {
146 *self.id_counts.entry(id.to_string()).or_default() += 1;
147 }
148 }
149
150 fn remove_message_ids(&mut self, message: &CmrMessage) {
151 for id in &message.header {
152 let id_key = id.to_string();
153 let mut remove = false;
154 if let Some(count) = self.id_counts.get_mut(&id_key) {
155 if *count > 1 {
156 *count -= 1;
157 } else {
158 remove = true;
159 }
160 }
161 if remove {
162 self.id_counts.remove(&id_key);
163 }
164 }
165 }
166
167 fn remove_by_key(&mut self, key: &str) -> Option<CacheEntry> {
168 let entry = self.entries.remove(key)?;
169 self.total_bytes = self.total_bytes.saturating_sub(entry.encoded_size);
170 self.remove_message_ids(&entry.message);
171 self.order.retain(|existing| existing != key);
172 Some(entry)
173 }
174
175 fn remove_if(&mut self, mut predicate: impl FnMut(&CmrMessage) -> bool) {
176 let to_remove = self
177 .order
178 .iter()
179 .filter_map(|key| {
180 self.entries
181 .get(key)
182 .filter(|entry| predicate(&entry.message))
183 .map(|_| key.clone())
184 })
185 .collect::<Vec<_>>();
186 for key in to_remove {
187 let _ = self.remove_by_key(&key);
188 }
189 }
190}
191
/// Serializable summary of the message cache's size and limits.
#[derive(Clone, Debug, Serialize)]
pub struct CacheStats {
    /// Number of messages currently cached.
    pub entry_count: usize,
    /// Total encoded size of all cached messages.
    pub total_bytes: usize,
    /// Configured limit on the number of cached messages.
    pub max_messages: usize,
    /// Configured limit on the total cached bytes.
    pub max_bytes: usize,
    /// Number of entries evicted because a limit was exceeded.
    pub total_evictions: u64,
}
206
/// Serializable, display-oriented view of a single cached message.
#[derive(Clone, Debug, Serialize)]
pub struct CacheEntryView {
    /// Cache key of the entry.
    pub key: String,
    /// Encoded size recorded for the entry.
    pub encoded_size: usize,
    /// Immediate sender of the cached message.
    pub sender: String,
    /// Timestamp of the message's origin ID, or empty when there is none.
    pub timestamp: String,
    /// Leading characters of the body, lossily decoded as UTF-8.
    pub body_preview: String,
}
221
/// Per-peer counters, reputation and rate-limit window tracked by the router.
#[derive(Clone, Debug)]
struct PeerMetrics {
    /// Reputation score; the router keeps it within [-100, 100].
    reputation: f64,
    /// Number of messages recorded as received from the peer.
    inbound_messages: u64,
    /// Number of bytes recorded as received from the peer.
    inbound_bytes: u64,
    /// Number of messages recorded as sent to the peer.
    outbound_messages: u64,
    /// Number of bytes recorded as sent to the peer.
    outbound_bytes: u64,
    /// Sliding rate-limit window for this peer.
    window: RateWindow,
}
231
/// Serializable snapshot of everything the router knows about one peer.
#[derive(Clone, Debug, Serialize)]
pub struct PeerSnapshot {
    /// Peer name (address string used as the map key).
    pub peer: String,
    /// Current reputation score.
    pub reputation: f64,
    /// Messages recorded as received from the peer.
    pub inbound_messages: u64,
    /// Bytes recorded as received from the peer.
    pub inbound_bytes: u64,
    /// Messages recorded as sent to the peer.
    pub outbound_messages: u64,
    /// Bytes recorded as sent to the peer.
    pub outbound_bytes: u64,
    /// Number of messages held in the peer's rate window when the snapshot was taken.
    pub current_window_messages: usize,
    /// Number of bytes held in the peer's rate window when the snapshot was taken.
    pub current_window_bytes: u64,
    /// Whether a shared key is stored for the peer.
    pub has_shared_key: bool,
    /// Whether an RSA or DH key exchange with the peer is awaiting a reply.
    pub pending_key_exchange: bool,
}
256
257impl Default for PeerMetrics {
258 fn default() -> Self {
259 Self {
260 reputation: 0.0,
261 inbound_messages: 0,
262 inbound_bytes: 0,
263 outbound_messages: 0,
264 outbound_bytes: 0,
265 window: RateWindow::new(),
266 }
267 }
268}
269
/// Record of recently admitted messages used for rate limiting.
#[derive(Clone, Debug)]
struct RateWindow {
    /// Admission time and byte size of each recorded message, oldest first.
    window: VecDeque<(Instant, u64)>,
    /// Sum of the byte sizes currently held in `window`.
    bytes: u64,
}
275
276impl RateWindow {
277 fn new() -> Self {
278 Self {
279 window: VecDeque::new(),
280 bytes: 0,
281 }
282 }
283
284 fn allow_and_record(
285 &mut self,
286 message_bytes: usize,
287 max_messages_per_minute: u32,
288 max_bytes_per_minute: u64,
289 ) -> bool {
290 let now = Instant::now();
291 let cutoff = Duration::from_secs(60);
292 while let Some((ts, bytes)) = self.window.front().copied() {
293 if now.duration_since(ts) < cutoff {
294 break;
295 }
296 self.window.pop_front();
297 self.bytes = self.bytes.saturating_sub(bytes);
298 }
299 let next_messages = self.window.len().saturating_add(1);
300 let next_bytes = self
301 .bytes
302 .saturating_add(u64::try_from(message_bytes).unwrap_or(u64::MAX));
303 if next_messages > usize::try_from(max_messages_per_minute).unwrap_or(usize::MAX)
304 || next_bytes > max_bytes_per_minute
305 {
306 return false;
307 }
308 self.window
309 .push_back((now, u64::try_from(message_bytes).unwrap_or(u64::MAX)));
310 self.bytes = next_bytes;
311 true
312 }
313
314 fn current_messages(&self) -> usize {
315 self.window.len()
316 }
317
318 fn current_bytes(&self) -> u64 {
319 self.bytes
320 }
321}
322
/// Local state kept while waiting for a peer's RSA key-exchange reply.
#[derive(Clone, Debug)]
struct PendingRsaState {
    /// Modulus; the reply ciphertext must be smaller than it.
    n: BigUint,
    /// Exponent used to recover the key from the reply ciphertext.
    d: BigUint,
}
328
/// Local state kept while waiting for a peer's Diffie-Hellman reply.
#[derive(Clone, Debug)]
struct PendingDhState {
    /// Modulus of the exchange.
    p: BigUint,
    /// Local secret exponent applied to the peer's public value.
    a_secret: BigUint,
}
334
// Minimum modulus sizes, in bits, for key-exchange parameters.
// NOTE(review): presumably enforced by the parameter validators defined
// elsewhere in this file; they are not referenced in this section.
const MIN_RSA_MODULUS_BITS: u64 = 2048;
const MIN_DH_MODULUS_BITS: u64 = 2048;
337
/// Why the router produced an outbound message.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ForwardReason {
    /// Forwarding of the incoming message because of a match.
    MatchedForwardIncoming,
    /// Forwarding of a cached message because of a match.
    MatchedForwardCached,
    /// Compensatory reply.
    CompensatoryReply,
    /// Message that starts a key exchange.
    KeyExchangeInitiation,
    /// Reply to a key-exchange request received from a peer.
    KeyExchangeReply,
}
352
/// Internal result of routing selection for one incoming message.
#[derive(Clone, Debug, Default)]
struct RoutingDecision {
    /// Best-matching peer, if one was found.
    best_peer: Option<String>,
    /// Raw distance of the best match.
    best_distance_raw: Option<f64>,
    /// Normalized distance of the best match.
    best_distance_normalized: Option<f64>,
    /// Raw distance threshold taken from the spam policy (`max_match_distance`).
    threshold_raw: f64,
    /// Cached messages that matched the incoming message.
    matched_messages: Vec<CmrMessage>,
    /// Optional compensatory message as a (string, message) pair.
    /// NOTE(review): the string is presumably the destination peer; confirm
    /// against the code that fills this field.
    compensatory: Option<(String, CmrMessage)>,
}
362
/// Outbound work produced for one processed message.
#[derive(Clone, Debug, Default)]
struct RoutingActions {
    /// Already-encoded messages to send.
    forwards: Vec<ForwardAction>,
    /// Messages for which only the body and destination have been planned.
    client_plans: Vec<ClientMessagePlan>,
}
368
/// A fully encoded message to be delivered to a destination.
#[derive(Clone, Debug)]
pub struct ForwardAction {
    /// Address to deliver to.
    pub destination: String,
    /// Encoded message bytes to send.
    pub message_bytes: Vec<u8>,
    /// Why this forward was produced.
    pub reason: ForwardReason,
}
379
/// A planned outbound message described by its body rather than its encoding.
#[derive(Clone, Debug)]
pub struct ClientMessagePlan {
    /// Address to deliver to.
    pub destination: String,
    /// Message body to send.
    pub body: Vec<u8>,
    /// Key to sign the message with, if any.
    pub signing_key: Option<Vec<u8>>,
    /// Why this message was planned.
    pub reason: ForwardReason,
}
392
/// Outbound work produced while handling a key-exchange control message.
#[derive(Clone, Debug, Default)]
struct KeyExchangeControlOutcome {
    /// Already-encoded messages to send.
    forwards: Vec<ForwardAction>,
    /// Planned replies (e.g. the RSA or DH reply to the requesting peer).
    client_plans: Vec<ClientMessagePlan>,
}
398
/// Reasons the router rejects a message.
#[derive(Debug, Error)]
pub enum ProcessError {
    /// The raw bytes could not be parsed as a message.
    #[error("parse error: {0}")]
    Parse(#[from] ParseError),
    /// One of the message's header IDs is already present in the cache.
    #[error("duplicate message id in cache")]
    DuplicateMessageId,
    /// The sending peer exceeded its per-peer rate limits.
    #[error("peer exceeded flood limits")]
    FloodLimited,
    /// The router-wide rate limits were exceeded.
    #[error("global flood limits exceeded")]
    GlobalFloodLimited,
    /// The sender's reputation is below the configured minimum.
    #[error("peer reputation below threshold")]
    ReputationTooLow,
    /// An unsigned message was rejected by the trust policy.
    #[error("unsigned message violates trust policy")]
    UnsignedRejected,
    /// The signature did not verify against the sender's shared key.
    #[error("signature verification failed")]
    BadSignature,
    /// A signed message arrived from a peer with no stored key and policy rejects that.
    #[error("signed message without known key rejected")]
    SignedWithoutKnownKey,
    /// The body is larger than the content policy allows.
    #[error("message body exceeds content policy")]
    BodyTooLarge,
    /// The body looks binary and binary payloads are not allowed.
    #[error("binary content blocked by policy")]
    BinaryContentBlocked,
    /// The body looks like an executable and executables are blocked.
    #[error("executable payload blocked by policy")]
    ExecutableBlocked,
    /// The intrinsic-dependence score is below the configured minimum.
    #[error("message failed intrinsic dependence spam check")]
    IntrinsicDependenceTooLow,
    /// The intrinsic-dependence score was NaN or infinite.
    #[error("message intrinsic dependence score was not finite")]
    IntrinsicDependenceInvalid,
    /// The compression oracle failed.
    #[error("compression oracle error: {0}")]
    Compression(#[from] CompressionError),
    /// The body could not be parsed as a key-exchange message.
    #[error("key exchange parse error: {0}")]
    KeyExchange(#[from] KeyExchangeError),
    /// A clear-key exchange arrived over a transport that is not a secure channel.
    #[error("clear key exchange requires secure channel")]
    ClearKeyOnInsecureChannel,
    /// A key-exchange reply arrived without a matching pending exchange.
    #[error("unexpected key exchange reply without pending state")]
    MissingPendingKeyExchangeState,
    /// Key-exchange parameters or derived values were rejected; the payload says why.
    #[error("weak key exchange parameters: {0}")]
    WeakKeyExchangeParameters(&'static str),
}
457
/// Result of running one message through the router.
#[derive(Debug)]
pub struct ProcessOutcome {
    /// Whether the message was accepted.
    pub accepted: bool,
    /// Why the message was dropped; `None` when it was accepted.
    pub drop_reason: Option<ProcessError>,
    /// The parsed message, when parsing succeeded.
    pub parsed_message: Option<CmrMessage>,
    /// Intrinsic-dependence score, set for messages that went through routing.
    pub intrinsic_dependence: Option<f64>,
    /// Encoded messages to send as a consequence of this message.
    pub forwards: Vec<ForwardAction>,
    /// Planned messages to send as a consequence of this message.
    pub client_plans: Vec<ClientMessagePlan>,
    /// Matched cached messages, filled only for locally submitted messages.
    pub local_matches: Vec<CmrMessage>,
    /// Number of cached messages that matched.
    pub matched_count: usize,
    /// Details of the routing decision, when one was made.
    pub routing_diagnostics: Option<RoutingDiagnostics>,
    /// Whether the message was handled as a key-exchange control message.
    pub key_exchange_control: bool,
}
482
/// Publicly visible summary of a routing decision.
#[derive(Clone, Debug)]
pub struct RoutingDiagnostics {
    /// Best-matching peer, if one was found.
    pub best_peer: Option<String>,
    /// Raw distance of the best match.
    pub best_distance_raw: Option<f64>,
    /// Normalized distance of the best match.
    pub best_distance_normalized: Option<f64>,
    /// Raw distance threshold that was applied.
    pub threshold_raw: f64,
}
495
496impl ProcessOutcome {
497 fn dropped(reason: ProcessError) -> Self {
498 Self {
499 accepted: false,
500 drop_reason: Some(reason),
501 parsed_message: None,
502 intrinsic_dependence: None,
503 forwards: Vec::new(),
504 client_plans: Vec::new(),
505 local_matches: Vec::new(),
506 matched_count: 0,
507 routing_diagnostics: None,
508 key_exchange_control: false,
509 }
510 }
511
512 fn accepted(message: CmrMessage) -> Self {
513 Self {
514 accepted: true,
515 drop_reason: None,
516 parsed_message: Some(message),
517 intrinsic_dependence: None,
518 forwards: Vec::new(),
519 client_plans: Vec::new(),
520 local_matches: Vec::new(),
521 matched_count: 0,
522 routing_diagnostics: None,
523 key_exchange_control: false,
524 }
525 }
526}
527
/// Message router: validates incoming messages, caches them, tracks peers and
/// key material, and produces the outbound actions that result.
pub struct Router<O: CompressionOracle> {
    /// Address this router identifies itself with.
    local_address: String,
    /// Active routing policy.
    policy: RoutingPolicy,
    /// Oracle used for compression-based scoring.
    oracle: O,
    /// Cache of accepted messages.
    cache: MessageCache,
    /// Metrics per peer, keyed by peer address.
    peers: HashMap<String, PeerMetrics>,
    /// Rate window shared by all inbound traffic.
    global_window: RateWindow,
    /// Shared keys per peer.
    shared_keys: HashMap<String, Vec<u8>>,
    /// RSA exchanges awaiting a reply, per peer.
    pending_rsa: HashMap<String, PendingRsaState>,
    /// Diffie-Hellman exchanges awaiting a reply, per peer.
    pending_dh: HashMap<String, PendingDhState>,
    /// Counter initialised to zero; not otherwise used in this section of the file.
    forward_counter: u64,
}
541
542impl<O: CompressionOracle> Router<O> {
543 fn normalized_match_distance(&self, raw_distance: f64, incoming_len: usize) -> Option<f64> {
544 if !raw_distance.is_finite() {
545 return None;
546 }
547 let bounded = raw_distance.max(0.0);
548 let scale = incoming_len.max(1) as f64;
549 Some((bounded / scale).clamp(0.0, 1.0))
550 }
551
552 #[must_use]
554 pub fn new(local_address: String, policy: RoutingPolicy, oracle: O) -> Self {
555 Self {
556 local_address,
557 cache: MessageCache::new(policy.cache_max_messages, policy.cache_max_bytes),
558 policy,
559 oracle,
560 peers: HashMap::new(),
561 global_window: RateWindow::new(),
562 shared_keys: HashMap::new(),
563 pending_rsa: HashMap::new(),
564 pending_dh: HashMap::new(),
565 forward_counter: 0,
566 }
567 }
568
569 pub fn set_shared_key(&mut self, peer: impl Into<String>, key: Vec<u8>) {
571 self.shared_keys.insert(peer.into(), key);
572 }
573
574 #[must_use]
576 pub fn local_address(&self) -> &str {
577 &self.local_address
578 }
579
580 #[must_use]
582 pub fn shared_key(&self, peer: &str) -> Option<&[u8]> {
583 self.shared_keys.get(peer).map(Vec::as_slice)
584 }
585
    /// Returns the routing policy currently in effect.
    #[must_use]
    pub fn policy(&self) -> &RoutingPolicy {
        &self.policy
    }
591
592 pub fn set_policy(&mut self, policy: RoutingPolicy) {
594 self.cache.max_messages = policy.cache_max_messages;
595 self.cache.max_bytes = policy.cache_max_bytes;
596 self.policy = policy;
597 self.cache.evict_as_needed();
598 }
599
600 #[must_use]
602 pub fn peer_snapshots(&self) -> Vec<PeerSnapshot> {
603 let mut names = self.peers.keys().cloned().collect::<HashSet<_>>();
604 names.extend(self.shared_keys.keys().cloned());
605 names.extend(self.pending_rsa.keys().cloned());
606 names.extend(self.pending_dh.keys().cloned());
607 let mut peers = names
608 .into_iter()
609 .filter(|peer| !self.is_local_peer_alias(peer))
610 .map(|peer| {
611 let metrics = self.peers.get(&peer).cloned().unwrap_or_default();
612 PeerSnapshot {
613 reputation: metrics.reputation,
614 inbound_messages: metrics.inbound_messages,
615 inbound_bytes: metrics.inbound_bytes,
616 outbound_messages: metrics.outbound_messages,
617 outbound_bytes: metrics.outbound_bytes,
618 current_window_messages: metrics.window.current_messages(),
619 current_window_bytes: metrics.window.current_bytes(),
620 has_shared_key: self.shared_keys.contains_key(&peer),
621 pending_key_exchange: self.pending_rsa.contains_key(&peer)
622 || self.pending_dh.contains_key(&peer),
623 peer,
624 }
625 })
626 .collect::<Vec<_>>();
627 peers.sort_by(|left, right| left.peer.cmp(&right.peer));
628 peers
629 }
630
631 #[must_use]
633 pub fn peer_count(&self) -> usize {
634 let mut names = self.peers.keys().cloned().collect::<HashSet<_>>();
635 names.extend(self.shared_keys.keys().cloned());
636 names.extend(self.pending_rsa.keys().cloned());
637 names.extend(self.pending_dh.keys().cloned());
638 names
639 .into_iter()
640 .filter(|peer| !self.is_local_peer_alias(peer))
641 .count()
642 }
643
    /// Returns the number of peers for which a shared key is stored.
    #[must_use]
    pub fn known_keys_count(&self) -> usize {
        self.shared_keys.len()
    }
649
650 #[must_use]
652 pub fn pending_key_exchange_count(&self) -> usize {
653 self.pending_rsa.len().saturating_add(self.pending_dh.len())
654 }
655
    /// Adds `delta` to `peer`'s reputation.
    ///
    /// Public entry point that delegates to `adjust_peer_reputation`, which
    /// creates the peer's metrics on first use and clamps the result.
    pub fn adjust_reputation(&mut self, peer: &str, delta: f64) {
        self.adjust_peer_reputation(peer, delta);
    }
660
661 pub fn remove_peer(&mut self, peer: &str) -> bool {
663 let mut removed = false;
664 removed |= self.peers.remove(peer).is_some();
665 removed |= self.shared_keys.remove(peer).is_some();
666 removed |= self.pending_rsa.remove(peer).is_some();
667 removed |= self.pending_dh.remove(peer).is_some();
668 removed
669 }
670
    /// Records that a message of `bytes` bytes was sent to `peer`.
    ///
    /// Public entry point that delegates to `record_peer_outbound`.
    pub fn record_successful_outbound(&mut self, peer: &str, bytes: usize) {
        self.record_peer_outbound(peer, bytes);
    }
675
676 pub fn cache_local_client_message(
678 &mut self,
679 raw_message: &[u8],
680 now: CmrTimestamp,
681 ) -> Result<(), ProcessError> {
682 let parse_ctx = ParseContext {
683 now,
684 recipient_address: None,
685 max_message_bytes: self.policy.content.max_message_bytes,
686 max_header_ids: self.policy.content.max_header_ids,
687 };
688 let parsed = parse_message(raw_message, &parse_ctx)?;
689 if parsed.body.len() > self.policy.content.max_body_bytes {
690 return Err(ProcessError::BodyTooLarge);
691 }
692 if !self.policy.content.allow_binary_payloads && is_probably_binary(&parsed.body) {
693 return Err(ProcessError::BinaryContentBlocked);
694 }
695 if self.policy.content.block_executable_magic && looks_like_executable(&parsed.body) {
696 return Err(ProcessError::ExecutableBlocked);
697 }
698 if self.cache.has_seen_any_id(&parsed) {
699 return Err(ProcessError::DuplicateMessageId);
700 }
701 self.cache.insert(parsed);
702 Ok(())
703 }
704
705 #[must_use]
707 pub fn cache_stats(&self) -> CacheStats {
708 CacheStats {
709 entry_count: self.cache.entries.len(),
710 total_bytes: self.cache.total_bytes,
711 max_messages: self.cache.max_messages,
712 max_bytes: self.cache.max_bytes,
713 total_evictions: self.cache.total_evictions,
714 }
715 }
716
717 #[must_use]
719 pub fn cache_entries(&self) -> Vec<CacheEntryView> {
720 self.cache
721 .order
722 .iter()
723 .filter_map(|key| self.cache.entries.get(key))
724 .map(|entry| {
725 let timestamp = entry
726 .message
727 .origin_id()
728 .map_or_else(String::new, |id| id.timestamp.to_string());
729 let body_preview = String::from_utf8_lossy(&entry.message.body)
730 .chars()
731 .take(128)
732 .collect::<String>();
733 CacheEntryView {
734 key: entry.key.clone(),
735 encoded_size: entry.encoded_size,
736 sender: entry.message.immediate_sender().to_owned(),
737 timestamp,
738 body_preview,
739 }
740 })
741 .collect()
742 }
743
744 pub fn cache_message_distance(
746 &self,
747 left_key: &str,
748 right_key: &str,
749 ) -> Result<Option<f64>, CompressionError> {
750 let Some(left) = self.cache.entries.get(left_key) else {
751 return Ok(None);
752 };
753 let Some(right) = self.cache.entries.get(right_key) else {
754 return Ok(None);
755 };
756 let distance = self
757 .oracle
758 .compression_distance(&left.message.to_bytes(), &right.message.to_bytes())?;
759 Ok(Some(distance))
760 }
761
762 pub fn register_pending_rsa_state(&mut self, peer: impl Into<String>, n: BigUint, d: BigUint) {
764 self.pending_rsa
765 .insert(peer.into(), PendingRsaState { n, d });
766 }
767
768 pub fn register_pending_dh_state(
770 &mut self,
771 peer: impl Into<String>,
772 p: BigUint,
773 a_secret: BigUint,
774 ) {
775 self.pending_dh
776 .insert(peer.into(), PendingDhState { p, a_secret });
777 }
778
    /// Builds the message plan that starts an RSA key exchange with `destination`.
    ///
    /// Public entry point that delegates to `build_rsa_initiation_plan`
    /// (defined outside this section); `None` means no plan was produced.
    pub fn initiate_rsa_key_exchange(
        &mut self,
        destination: &str,
        now: &CmrTimestamp,
    ) -> Option<ClientMessagePlan> {
        self.build_rsa_initiation_plan(destination, now)
    }
787
    /// Builds the message plan that starts a Diffie-Hellman key exchange with
    /// `destination`.
    ///
    /// Public entry point that delegates to `build_dh_initiation_plan`
    /// (defined outside this section); `None` means no plan was produced.
    pub fn initiate_dh_key_exchange(
        &mut self,
        destination: &str,
        now: &CmrTimestamp,
    ) -> Option<ClientMessagePlan> {
        self.build_dh_initiation_plan(destination, now)
    }
796
797 pub fn initiate_clear_key_exchange(
799 &mut self,
800 destination: &str,
801 clear_key: Vec<u8>,
802 _now: &CmrTimestamp,
803 ) -> Option<ClientMessagePlan> {
804 if clear_key.is_empty() {
805 return None;
806 }
807 let signing_key = self.shared_keys.get(destination).cloned();
808 self.shared_keys.insert(
809 destination.to_owned(),
810 derive_exchange_key_from_bytes(&self.local_address, destination, b"clear", &clear_key),
811 );
812 self.purge_key_exchange_cache(destination);
813 Some(ClientMessagePlan {
814 destination: destination.to_owned(),
815 body: KeyExchangeMessage::ClearKey { key: clear_key }
816 .render()
817 .into_bytes(),
818 signing_key,
819 reason: ForwardReason::KeyExchangeInitiation,
820 })
821 }
822
    /// Processes a message received from the network.
    ///
    /// Runs the full processing pipeline with the recipient guard enabled,
    /// i.e. the local address is handed to the parser as the recipient address.
    #[must_use]
    pub fn process_incoming(
        &mut self,
        raw_message: &[u8],
        transport: TransportKind,
        now: CmrTimestamp,
    ) -> ProcessOutcome {
        self.process_message(raw_message, transport, now, true)
    }
833
    /// Processes a message submitted by a local client.
    ///
    /// Runs the same pipeline as `process_incoming` but with the recipient
    /// guard disabled: no recipient address is handed to the parser, and the
    /// matched cached messages are returned in `local_matches`.
    #[must_use]
    pub fn process_local_client_message(
        &mut self,
        raw_message: &[u8],
        transport: TransportKind,
        now: CmrTimestamp,
    ) -> ProcessOutcome {
        self.process_message(raw_message, transport, now, false)
    }
848
    /// Shared pipeline behind `process_incoming` and `process_local_client_message`.
    ///
    /// Steps, in order: parse, body-size check, global then per-peer rate
    /// limiting, reputation gate, signature policy, duplicate-ID check,
    /// binary/executable content filters, key-exchange control handling,
    /// intrinsic-dependence spam check, routing decision, cache insertion and
    /// construction of forwards and client plans.
    ///
    /// A parse failure yields a dropped outcome without a parsed message.
    /// Every later rejection goes through `drop_for_peer`, which attaches the
    /// parsed message and adjusts the sender's reputation by the delta given
    /// at the call site.
    ///
    /// When `enforce_recipient_guard` is `true` the local address is passed to
    /// the parser as `recipient_address`; when `false` it is `None` and the
    /// matched messages are additionally returned in `local_matches`.
    fn process_message(
        &mut self,
        raw_message: &[u8],
        transport: TransportKind,
        now: CmrTimestamp,
        enforce_recipient_guard: bool,
    ) -> ProcessOutcome {
        let parse_ctx = ParseContext {
            now: now.clone(),
            recipient_address: enforce_recipient_guard.then_some(self.local_address.as_str()),
            max_message_bytes: self.policy.content.max_message_bytes,
            max_header_ids: self.policy.content.max_header_ids,
        };

        // No sender is known before parsing succeeds, so no reputation change here.
        let parsed = match parse_message(raw_message, &parse_ctx) {
            Ok(m) => m,
            Err(err) => return ProcessOutcome::dropped(ProcessError::Parse(err)),
        };

        if parsed.body.len() > self.policy.content.max_body_bytes {
            return self.drop_for_peer(&parsed, ProcessError::BodyTooLarge, -2.0);
        }

        let sender = parsed.immediate_sender().to_owned();
        // Both rate checks record the message in their window when they admit
        // it, so a message admitted globally but refused per-peer still
        // counts against the global window.
        if !self.check_global_rate(raw_message.len()) {
            return self.drop_for_peer(&parsed, ProcessError::GlobalFloodLimited, -1.5);
        }
        if !self.check_peer_rate(&sender, raw_message.len()) {
            return self.drop_for_peer(&parsed, ProcessError::FloodLimited, -2.0);
        }
        if self.peer_reputation(&sender) < self.policy.trust.min_reputation_score {
            return self.drop_for_peer(&parsed, ProcessError::ReputationTooLow, -0.5);
        }
        if let Err(err) = self.validate_signature_policy(&parsed, &sender) {
            return self.drop_for_peer(&parsed, err, -4.0);
        }
        if self.cache.has_seen_any_id(&parsed) {
            return self.drop_for_peer(&parsed, ProcessError::DuplicateMessageId, -0.1);
        }
        if !self.policy.content.allow_binary_payloads && is_probably_binary(&parsed.body) {
            return self.drop_for_peer(&parsed, ProcessError::BinaryContentBlocked, -0.4);
        }
        if self.policy.content.block_executable_magic && looks_like_executable(&parsed.body) {
            return self.drop_for_peer(&parsed, ProcessError::ExecutableBlocked, -2.5);
        }

        // Key-exchange control messages are fully handled here and never
        // reach the spam check or routing below.
        match self.handle_key_exchange_control(&parsed, &sender, &transport) {
            Ok(Some(control)) => {
                self.adjust_peer_reputation(&sender, 1.5);
                self.record_peer_inbound(&sender, raw_message.len());
                return ProcessOutcome {
                    accepted: true,
                    drop_reason: None,
                    parsed_message: Some(parsed),
                    intrinsic_dependence: None,
                    forwards: control.forwards,
                    client_plans: control.client_plans,
                    local_matches: Vec::new(),
                    matched_count: 0,
                    routing_diagnostics: None,
                    key_exchange_control: true,
                };
            }
            // Not a key-exchange message: continue with normal processing.
            Ok(None) => {}
            Err(err) => {
                // A reply without pending state is dropped without a reputation penalty.
                let penalty = if matches!(err, ProcessError::MissingPendingKeyExchangeState) {
                    0.0
                } else {
                    -3.0
                };
                return self.drop_for_peer(&parsed, err, penalty);
            }
        }

        let id_score = match self
            .oracle
            .intrinsic_dependence(&parsed.body, self.policy.spam.intrinsic_dependence_order)
        {
            Ok(score) => score,
            Err(err) => {
                // Oracle failures cost the sender less at the `Trusted` security level.
                return self.drop_for_peer(
                    &parsed,
                    ProcessError::Compression(err),
                    if self.policy.security_level == crate::policy::SecurityLevel::Trusted {
                        -0.2
                    } else {
                        -1.0
                    },
                );
            }
        };
        if !id_score.is_finite() {
            return self.drop_for_peer(&parsed, ProcessError::IntrinsicDependenceInvalid, -1.5);
        }
        if id_score < self.policy.spam.min_intrinsic_dependence {
            return self.drop_for_peer(&parsed, ProcessError::IntrinsicDependenceTooLow, -1.5);
        }

        // The routing decision is computed before the message is inserted
        // into the cache below.
        let routing = match self.select_routing_decision(&parsed) {
            Ok(decision) => decision,
            Err(err) => return self.drop_for_peer(&parsed, err, -0.5),
        };
        let mut outcome = ProcessOutcome::accepted(parsed.clone());
        outcome.intrinsic_dependence = Some(id_score);
        outcome.matched_count = routing.matched_messages.len();
        outcome.routing_diagnostics = Some(RoutingDiagnostics {
            best_peer: routing.best_peer.clone(),
            best_distance_raw: routing.best_distance_raw,
            best_distance_normalized: routing.best_distance_normalized,
            threshold_raw: routing.threshold_raw,
        });
        // Only locally submitted messages get the matches handed back, capped
        // at `max_forward_actions`.
        if !enforce_recipient_guard {
            let mut local_matches = routing.matched_messages.clone();
            local_matches.truncate(self.policy.throughput.max_forward_actions);
            outcome.local_matches = local_matches;
        }

        self.cache.insert(parsed.clone());
        self.record_peer_inbound(&sender, raw_message.len());
        self.adjust_peer_reputation(&sender, 0.4);

        // Forwards and client plans are each capped at `max_forward_actions`.
        let mut actions = self.build_routing_actions(&parsed, routing, &now);
        actions
            .forwards
            .truncate(self.policy.throughput.max_forward_actions);
        actions
            .client_plans
            .truncate(self.policy.throughput.max_forward_actions);
        outcome.forwards = actions.forwards;
        outcome.client_plans = actions.client_plans;
        outcome
    }
981
982 fn drop_for_peer(
983 &mut self,
984 parsed: &CmrMessage,
985 reason: ProcessError,
986 reputation_delta: f64,
987 ) -> ProcessOutcome {
988 let sender = parsed.immediate_sender().to_owned();
989 self.adjust_peer_reputation(&sender, reputation_delta);
990 ProcessOutcome {
991 accepted: false,
992 drop_reason: Some(reason),
993 parsed_message: Some(parsed.clone()),
994 intrinsic_dependence: None,
995 forwards: Vec::new(),
996 client_plans: Vec::new(),
997 local_matches: Vec::new(),
998 matched_count: 0,
999 routing_diagnostics: None,
1000 key_exchange_control: false,
1001 }
1002 }
1003
1004 fn check_peer_rate(&mut self, peer: &str, message_bytes: usize) -> bool {
1005 let metrics = self.peers.entry(peer.to_owned()).or_default();
1006 metrics.window.allow_and_record(
1007 message_bytes,
1008 self.policy.throughput.per_peer_messages_per_minute,
1009 self.policy.throughput.per_peer_bytes_per_minute,
1010 )
1011 }
1012
1013 fn check_global_rate(&mut self, message_bytes: usize) -> bool {
1014 self.global_window.allow_and_record(
1015 message_bytes,
1016 self.policy.throughput.global_messages_per_minute,
1017 self.policy.throughput.global_bytes_per_minute,
1018 )
1019 }
1020
1021 fn peer_reputation(&self, peer: &str) -> f64 {
1022 self.peers.get(peer).map_or(0.0, |p| p.reputation)
1023 }
1024
1025 fn adjust_peer_reputation(&mut self, peer: &str, delta: f64) {
1026 let metrics = self.peers.entry(peer.to_owned()).or_default();
1027 metrics.reputation = (metrics.reputation + delta).clamp(-100.0, 100.0);
1028 }
1029
1030 fn is_local_peer_alias(&self, peer: &str) -> bool {
1031 let local = self.local_address.trim_end_matches('/');
1032 let candidate = peer.trim_end_matches('/');
1033 if candidate == local {
1034 return true;
1035 }
1036
1037 if let (Ok(local_url), Ok(candidate_url)) = (Url::parse(local), Url::parse(candidate)) {
1038 if !same_origin(&local_url, &candidate_url) {
1039 return false;
1040 }
1041 return is_path_alias(local_url.path(), candidate_url.path());
1042 }
1043
1044 candidate.starts_with(&format!("{local}/"))
1045 }
1046
1047 fn record_peer_inbound(&mut self, peer: &str, bytes: usize) {
1048 let metrics = self.peers.entry(peer.to_owned()).or_default();
1049 metrics.inbound_messages = metrics.inbound_messages.saturating_add(1);
1050 metrics.inbound_bytes = metrics
1051 .inbound_bytes
1052 .saturating_add(u64::try_from(bytes).unwrap_or(u64::MAX));
1053 }
1054
1055 fn record_peer_outbound(&mut self, peer: &str, bytes: usize) {
1056 let metrics = self.peers.entry(peer.to_owned()).or_default();
1057 metrics.outbound_messages = metrics.outbound_messages.saturating_add(1);
1058 metrics.outbound_bytes = metrics
1059 .outbound_bytes
1060 .saturating_add(u64::try_from(bytes).unwrap_or(u64::MAX));
1061 }
1062
1063 fn can_forward_to_peer(&self, peer: &str) -> bool {
1064 let Some(metrics) = self.peers.get(peer) else {
1065 return true;
1066 };
1067 if metrics.inbound_bytes == 0 {
1068 return true;
1071 }
1072 let ratio = metrics.outbound_bytes as f64 / metrics.inbound_bytes as f64;
1073 ratio <= self.policy.trust.max_outbound_inbound_ratio
1074 }
1075
1076 fn validate_signature_policy(
1077 &self,
1078 message: &CmrMessage,
1079 sender: &str,
1080 ) -> Result<(), ProcessError> {
1081 let known_key = self.shared_keys.get(sender);
1082 match (&message.signature, known_key) {
1083 (Signature::Unsigned, Some(_))
1084 if self.policy.trust.require_signatures_from_known_peers =>
1085 {
1086 Err(ProcessError::UnsignedRejected)
1087 }
1088 (Signature::Unsigned, None) if !self.policy.trust.allow_unsigned_from_unknown_peers => {
1089 Err(ProcessError::UnsignedRejected)
1090 }
1091 (Signature::Sha256(_), None) if self.policy.trust.reject_signed_without_known_key => {
1092 Err(ProcessError::SignedWithoutKnownKey)
1093 }
1094 (Signature::Sha256(_), Some(key)) => {
1095 if message
1096 .signature
1097 .verifies(&message.payload_without_signature_line(), Some(key))
1098 {
1099 Ok(())
1100 } else {
1101 Err(ProcessError::BadSignature)
1102 }
1103 }
1104 _ => Ok(()),
1105 }
1106 }
1107
    /// Handles `message` if its body is a key-exchange control message.
    ///
    /// Returns `Ok(None)` when the body is not a key-exchange message, and
    /// `Ok(Some(outcome))` when it was handled; the outcome holds any reply
    /// to send back to `sender`. On success the control message is cached, a
    /// new shared key for `sender` is installed, and cached key-exchange
    /// messages involving `sender` or the local address are purged. Replies
    /// are planned with the key that was in place before this message.
    ///
    /// # Errors
    /// * `KeyExchange` when the body fails key-exchange parsing.
    /// * `UnsignedRejected` when a key already exists for `sender` but the
    ///   message is unsigned.
    /// * `ClearKeyOnInsecureChannel` for a clear key over a transport that is
    ///   not a secure channel.
    /// * `MissingPendingKeyExchangeState` for a reply without a pending exchange.
    /// * `WeakKeyExchangeParameters` for out-of-range or degenerate values,
    ///   plus whatever the parameter validators return.
    fn handle_key_exchange_control(
        &mut self,
        message: &CmrMessage,
        sender: &str,
        transport: &TransportKind,
    ) -> Result<Option<KeyExchangeControlOutcome>, ProcessError> {
        let Some(control) = parse_key_exchange(&message.body)? else {
            return Ok(None);
        };

        // Once a key is established, only signed messages may change it.
        if self.shared_keys.contains_key(sender) && matches!(message.signature, Signature::Unsigned)
        {
            return Err(ProcessError::UnsignedRejected);
        }

        // Captured before any branch replaces the key; replies are signed with it.
        let old_key = self.shared_keys.get(sender).cloned();
        match control {
            KeyExchangeMessage::ClearKey { key } => {
                if !transport.is_secure_channel() {
                    return Err(ProcessError::ClearKeyOnInsecureChannel);
                }
                self.cache.insert(message.clone());
                let derived =
                    derive_exchange_key_from_bytes(&self.local_address, sender, b"clear", &key);
                self.shared_keys.insert(sender.to_owned(), derived);
                self.purge_key_exchange_cache(sender);
                Ok(Some(KeyExchangeControlOutcome::default()))
            }
            KeyExchangeMessage::RsaRequest { n, e } => {
                validate_rsa_request_params(&n, &e)?;
                // Pick a session key, send it back raised to `e` modulo `n`.
                let key = random_nonzero_biguint_below(&n).ok_or(
                    ProcessError::WeakKeyExchangeParameters("failed to generate RSA session key"),
                )?;
                self.cache.insert(message.clone());
                let c = mod_pow(&key, &e, &n);
                let reply_body = KeyExchangeMessage::RsaReply { c }.render().into_bytes();
                self.shared_keys.insert(
                    sender.to_owned(),
                    derive_exchange_key(&self.local_address, sender, b"rsa", &key),
                );
                self.purge_key_exchange_cache(sender);
                Ok(Some(KeyExchangeControlOutcome {
                    forwards: Vec::new(),
                    client_plans: vec![ClientMessagePlan {
                        destination: sender.to_owned(),
                        body: reply_body,
                        signing_key: old_key,
                        reason: ForwardReason::KeyExchangeReply,
                    }],
                }))
            }
            KeyExchangeMessage::RsaReply { c } => {
                // The pending state is consumed here, even if the checks below reject the reply.
                let Some(state) = self.pending_rsa.remove(sender) else {
                    return Err(ProcessError::MissingPendingKeyExchangeState);
                };
                if c >= state.n {
                    return Err(ProcessError::WeakKeyExchangeParameters(
                        "RSA reply ciphertext out of range",
                    ));
                }
                let key = mod_pow(&c, &state.d, &state.n);
                if key.is_zero() {
                    return Err(ProcessError::WeakKeyExchangeParameters(
                        "RSA shared key reduced to zero",
                    ));
                }
                self.cache.insert(message.clone());
                self.shared_keys.insert(
                    sender.to_owned(),
                    derive_exchange_key(&self.local_address, sender, b"rsa", &key),
                );
                self.purge_key_exchange_cache(sender);
                Ok(Some(KeyExchangeControlOutcome::default()))
            }
            KeyExchangeMessage::DhRequest { g, p, a_pub } => {
                validate_dh_request_params(&g, &p, &a_pub)?;
                let b_secret =
                    random_dh_secret(&p).ok_or(ProcessError::WeakKeyExchangeParameters(
                        "failed to generate DH secret exponent",
                    ))?;
                let b_pub = mod_pow(&g, &b_secret, &p);
                let shared = mod_pow(&a_pub, &b_secret, &p);
                // A shared value of 0 or 1 is rejected as degenerate.
                if shared <= BigUint::one() {
                    return Err(ProcessError::WeakKeyExchangeParameters(
                        "DH derived weak shared key",
                    ));
                }
                self.cache.insert(message.clone());
                let reply_body = KeyExchangeMessage::DhReply { b_pub }.render().into_bytes();
                self.shared_keys.insert(
                    sender.to_owned(),
                    derive_exchange_key(&self.local_address, sender, b"dh", &shared),
                );
                self.purge_key_exchange_cache(sender);
                Ok(Some(KeyExchangeControlOutcome {
                    forwards: Vec::new(),
                    client_plans: vec![ClientMessagePlan {
                        destination: sender.to_owned(),
                        body: reply_body,
                        signing_key: old_key,
                        reason: ForwardReason::KeyExchangeReply,
                    }],
                }))
            }
            KeyExchangeMessage::DhReply { b_pub } => {
                // The pending state is consumed here, even if the checks below reject the reply.
                let Some(state) = self.pending_dh.remove(sender) else {
                    return Err(ProcessError::MissingPendingKeyExchangeState);
                };
                validate_dh_reply_params(&b_pub, &state.p)?;
                let shared = mod_pow(&b_pub, &state.a_secret, &state.p);
                // A shared value of 0 or 1 is rejected as degenerate.
                if shared <= BigUint::one() {
                    return Err(ProcessError::WeakKeyExchangeParameters(
                        "DH derived weak shared key",
                    ));
                }
                self.cache.insert(message.clone());
                self.shared_keys.insert(
                    sender.to_owned(),
                    derive_exchange_key(&self.local_address, sender, b"dh", &shared),
                );
                self.purge_key_exchange_cache(sender);
                Ok(Some(KeyExchangeControlOutcome::default()))
            }
        }
    }
1233
1234 fn purge_key_exchange_cache(&mut self, peer: &str) {
1235 let local = self.local_address.clone();
1236 self.cache.remove_if(|message| {
1237 parse_key_exchange(&message.body).ok().flatten().is_some()
1238 && (message_contains_sender(message, peer)
1239 || message_contains_sender(message, &local))
1240 });
1241 }
1242
1243 fn select_routing_decision(
1244 &self,
1245 incoming: &CmrMessage,
1246 ) -> Result<RoutingDecision, ProcessError> {
1247 let threshold_raw = self.policy.spam.max_match_distance;
1248 let mut decision = RoutingDecision {
1249 best_peer: None,
1250 best_distance_raw: None,
1251 best_distance_normalized: None,
1252 threshold_raw,
1253 matched_messages: Vec::new(),
1254 compensatory: None,
1255 };
1256
1257 let peer_corpora = self.collect_peer_corpora();
1258 if peer_corpora.is_empty() {
1259 return Ok(decision);
1260 }
1261
1262 let mut canonical_incoming = incoming.clone();
1263 canonical_incoming.make_unsigned();
1264 let incoming_bytes = canonical_incoming.to_bytes();
1265 let mut peers: Vec<(&String, &Vec<u8>)> = peer_corpora.iter().collect();
1266 peers.sort_by(|left, right| left.0.cmp(right.0));
1267 let peer_names: Vec<String> = peers.iter().map(|(peer, _)| (*peer).to_owned()).collect();
1268 let corpora: Vec<Vec<u8>> = peers.iter().map(|(_, corpus)| (*corpus).clone()).collect();
1269 let distances = self
1270 .oracle
1271 .batch_compression_distance(&incoming_bytes, &corpora)
1272 .map_err(ProcessError::Compression)?;
1273
1274 let mut best: Option<(String, f64)> = None;
1275 let mut matched_peers = Vec::<String>::new();
1276 let incoming_len = incoming_bytes.len();
1277 for ((peer, _corpus), distance) in peer_names
1278 .into_iter()
1279 .zip(corpora.iter())
1280 .zip(distances.into_iter())
1281 {
1282 if !distance.is_finite() {
1283 continue;
1284 }
1285 let passed_threshold = distance <= threshold_raw;
1286 if passed_threshold {
1287 matched_peers.push(peer.clone());
1288 }
1289 if best
1290 .as_ref()
1291 .is_none_or(|(_, best_distance)| distance < *best_distance)
1292 {
1293 best = Some((peer, distance));
1294 }
1295 }
1296
1297 let Some((best_peer, best_distance)) = best else {
1298 return Ok(decision);
1299 };
1300 let Some(best_normalized) = self.normalized_match_distance(best_distance, incoming_len)
1301 else {
1302 return Ok(decision);
1303 };
1304 decision.best_peer = Some(best_peer.clone());
1305 decision.best_distance_raw = Some(best_distance);
1306 decision.best_distance_normalized = Some(best_normalized);
1307
1308 let passes_best_threshold = best_distance <= threshold_raw;
1309 if !passes_best_threshold || matched_peers.is_empty() {
1310 return Ok(decision);
1311 }
1312
1313 let matched_messages =
1314 self.select_matched_messages(&incoming_bytes, &matched_peers, threshold_raw)?;
1315 if matched_messages.is_empty() {
1316 return Ok(decision);
1317 }
1318
1319 let compensatory = if message_contains_sender(incoming, &best_peer) {
1320 self.select_compensatory_message(incoming, &best_peer, &peer_corpora)?
1321 .map(|message| (best_peer.clone(), message))
1322 } else {
1323 None
1324 };
1325
1326 decision.matched_messages = matched_messages;
1327 decision.compensatory = compensatory;
1328 Ok(decision)
1329 }
1330
1331 fn collect_peer_corpora(&self) -> HashMap<String, Vec<u8>> {
1332 let mut peer_corpora = HashMap::<String, Vec<u8>>::new();
1333 for key in &self.cache.order {
1334 let Some(entry) = self.cache.entries.get(key) else {
1335 continue;
1336 };
1337 if is_key_exchange_control_message(&entry.message) {
1338 continue;
1339 }
1340 let sender = entry.message.immediate_sender();
1341 peer_corpora
1342 .entry(sender.to_owned())
1343 .or_default()
1344 .extend_from_slice(&entry.message.to_bytes());
1345 }
1346 peer_corpora
1347 }
1348
1349 fn select_matched_messages(
1350 &self,
1351 incoming_bytes: &[u8],
1352 matched_peers: &[String],
1353 threshold_raw: f64,
1354 ) -> Result<Vec<CmrMessage>, ProcessError> {
1355 let matched = matched_peers
1356 .iter()
1357 .map(String::as_str)
1358 .collect::<HashSet<_>>();
1359 let mut out = Vec::new();
1360 for key in &self.cache.order {
1361 let Some(entry) = self.cache.entries.get(key) else {
1362 continue;
1363 };
1364 if is_key_exchange_control_message(&entry.message) {
1365 continue;
1366 }
1367 if !matched.contains(entry.message.immediate_sender()) {
1368 continue;
1369 }
1370 let mut candidate = entry.message.clone();
1371 candidate.make_unsigned();
1372 let candidate_bytes = candidate.to_bytes();
1373 let distance = self
1374 .oracle
1375 .compression_distance(incoming_bytes, &candidate_bytes)
1376 .map_err(ProcessError::Compression)?;
1377 if distance.is_finite() && distance <= threshold_raw {
1378 out.push(entry.message.clone());
1379 }
1380 }
1381 Ok(out)
1382 }
1383
    /// Picks at most one cached message to pair with `best_peer` as a compensatory message.
    ///
    /// Every cached non-key-exchange message whose header does not mention `best_peer`
    /// is scored as `d_cache - d_guess`, where
    ///
    /// * `d_cache` is the oracle distance between the candidate and the rest of the
    ///   cache (all other eligible messages, concatenated in cache order), and
    /// * `d_guess` is the oracle distance between the candidate and a "guess" blob made
    ///   of the unsigned incoming message followed by `best_peer`'s corpus.
    ///
    /// The candidate with the highest finite score is returned; on equal scores the
    /// earlier cache entry is kept. `None` is returned when fewer than two eligible
    /// messages are cached or when no candidate yields finite distances.
    ///
    /// # Errors
    ///
    /// Oracle failures are returned as `ProcessError::Compression`.
    fn select_compensatory_message(
        &self,
        incoming: &CmrMessage,
        best_peer: &str,
        peer_corpora: &HashMap<String, Vec<u8>>,
    ) -> Result<Option<CmrMessage>, ProcessError> {
        // Eligible entries: everything in cache order except key-exchange control messages.
        let ordered_entries = self
            .cache
            .order
            .iter()
            .filter_map(|key| self.cache.entries.get(key))
            .filter(|entry| !is_key_exchange_control_message(&entry.message))
            .collect::<Vec<_>>();
        // With zero or one entry there is no "rest of the cache" to compare against.
        if ordered_entries.len() <= 1 {
            return Ok(None);
        }

        let encoded_entries = ordered_entries
            .iter()
            .map(|entry| (entry.message.clone(), entry.message.to_bytes()))
            .collect::<Vec<_>>();
        let total_bytes = encoded_entries
            .iter()
            .map(|(_, bytes)| bytes.len())
            .sum::<usize>();
        // One contiguous blob of all encoded entries; `ranges[i]` is the byte span of
        // entry `i` inside it, so "the cache without entry i" is two slices of the blob.
        let mut cache_blob = Vec::with_capacity(total_bytes);
        let mut ranges = Vec::with_capacity(encoded_entries.len());
        for (_, bytes) in &encoded_entries {
            let start = cache_blob.len();
            cache_blob.extend_from_slice(bytes);
            let end = cache_blob.len();
            ranges.push((start, end));
        }

        // Guess blob: unsigned incoming message followed by what is cached from `best_peer`.
        let mut canonical_incoming = incoming.clone();
        canonical_incoming.make_unsigned();
        // NOTE(review): `encoded_len()` is presumably the length of `to_bytes()`; it is
        // only used as a capacity hint here.
        let mut x_guess = Vec::with_capacity(
            canonical_incoming.encoded_len()
                + peer_corpora.get(best_peer).map_or(0, std::vec::Vec::len),
        );
        x_guess.extend_from_slice(&canonical_incoming.to_bytes());
        if let Some(known_from_best_peer) = peer_corpora.get(best_peer) {
            x_guess.extend_from_slice(known_from_best_peer);
        }
        // One batched oracle call; `guess_distances[i]` belongs to `encoded_entries[i]`.
        let guess_distances = self
            .oracle
            .batch_compression_distance(
                &x_guess,
                &encoded_entries
                    .iter()
                    .map(|(_, bytes)| bytes.clone())
                    .collect::<Vec<_>>(),
            )
            .map_err(ProcessError::Compression)?;

        let mut best_score = f64::NEG_INFINITY;
        let mut best_message = None;
        for (idx, (candidate_message, candidate_bytes)) in encoded_entries.iter().enumerate() {
            // Skip messages whose header already lists `best_peer`.
            if message_contains_sender(candidate_message, best_peer) {
                continue;
            }
            // The candidate makes up the whole blob: nothing left to compare against.
            if total_bytes <= candidate_bytes.len() {
                continue;
            }
            // Collect the blob slices before and after the candidate, omitting empty ones.
            let (start, end) = ranges[idx];
            let mut right_parts: [&[u8]; 2] = [&[][..], &[][..]];
            let mut right_count = 0;
            if start > 0 {
                right_parts[right_count] = &cache_blob[..start];
                right_count += 1;
            }
            if end < cache_blob.len() {
                right_parts[right_count] = &cache_blob[end..];
                right_count += 1;
            }
            if right_count == 0 {
                continue;
            }

            // Distance between the candidate and the cache with the candidate removed.
            let d_cache = self
                .oracle
                .compression_distance_chain(
                    &[candidate_bytes.as_slice()],
                    &right_parts[..right_count],
                )
                .map_err(ProcessError::Compression)?;
            // The batch result may be shorter than the candidate list; skip missing entries.
            let Some(d_guess) = guess_distances.get(idx).copied() else {
                continue;
            };
            if !d_cache.is_finite() || !d_guess.is_finite() {
                continue;
            }
            // Strict `>` keeps the earliest candidate when scores tie.
            let score = d_cache - d_guess;
            if score > best_score {
                best_score = score;
                best_message = Some(candidate_message.clone());
            }
        }

        Ok(best_message)
    }
1485
1486 fn build_routing_actions(
1487 &mut self,
1488 incoming: &CmrMessage,
1489 decision: RoutingDecision,
1490 now: &CmrTimestamp,
1491 ) -> RoutingActions {
1492 let mut out = RoutingActions::default();
1493 let mut dedupe = HashSet::<(String, String)>::new();
1494 let incoming_key = cache_key(incoming);
1495 let incoming_destinations = sorted_unique_addresses(&incoming.header);
1496 let suppress_best = decision
1497 .best_peer
1498 .as_deref()
1499 .filter(|peer| message_contains_sender(incoming, peer));
1500
1501 if let Some((destination, message)) = decision.compensatory.clone() {
1502 let dedupe_key = (destination.clone(), cache_key(&message));
1503 if !dedupe.contains(&dedupe_key) {
1504 let actions = self.forward_with_optional_key_exchange(
1505 &message,
1506 &destination,
1507 now,
1508 ForwardReason::CompensatoryReply,
1509 );
1510 if !actions.forwards.is_empty() {
1511 dedupe.insert(dedupe_key);
1512 out.forwards.extend(actions.forwards);
1513 out.client_plans.extend(actions.client_plans);
1514 }
1515 }
1516 }
1517
1518 for matched in &decision.matched_messages {
1519 for destination in sorted_unique_addresses(&matched.header) {
1520 if destination == self.local_address {
1521 continue;
1522 }
1523 if suppress_best.is_some_and(|peer| peer == destination) {
1524 continue;
1525 }
1526 let dedupe_key = (destination.clone(), incoming_key.clone());
1527 if dedupe.contains(&dedupe_key) {
1528 continue;
1529 }
1530 let actions = self.forward_with_optional_key_exchange(
1531 incoming,
1532 &destination,
1533 now,
1534 ForwardReason::MatchedForwardIncoming,
1535 );
1536 if !actions.forwards.is_empty() {
1537 dedupe.insert(dedupe_key);
1538 out.forwards.extend(actions.forwards);
1539 out.client_plans.extend(actions.client_plans);
1540 }
1541 }
1542
1543 let matched_key = cache_key(matched);
1544 for destination in &incoming_destinations {
1545 if destination == &self.local_address {
1546 continue;
1547 }
1548 let dedupe_key = (destination.clone(), matched_key.clone());
1549 if dedupe.contains(&dedupe_key) {
1550 continue;
1551 }
1552 let actions = self.forward_with_optional_key_exchange(
1553 matched,
1554 destination,
1555 now,
1556 ForwardReason::MatchedForwardCached,
1557 );
1558 if !actions.forwards.is_empty() {
1559 dedupe.insert(dedupe_key);
1560 out.forwards.extend(actions.forwards);
1561 out.client_plans.extend(actions.client_plans);
1562 }
1563 }
1564 }
1565
1566 out
1567 }
1568
1569 fn forward_with_optional_key_exchange(
1570 &mut self,
1571 message: &CmrMessage,
1572 destination: &str,
1573 now: &CmrTimestamp,
1574 reason: ForwardReason,
1575 ) -> RoutingActions {
1576 if destination == self.local_address
1577 || !self.can_forward_to_peer(destination)
1578 || message_contains_sender(message, destination)
1579 {
1580 return RoutingActions::default();
1581 }
1582
1583 let mut out = RoutingActions::default();
1584 if !self.shared_keys.contains_key(destination)
1585 && !self.pending_rsa.contains_key(destination)
1586 && !self.pending_dh.contains_key(destination)
1587 {
1588 let kx = match self.policy.trust.auto_key_exchange_mode {
1589 AutoKeyExchangeMode::Rsa => self.build_rsa_initiation_plan(destination, now),
1590 AutoKeyExchangeMode::Dh => self.build_dh_initiation_plan(destination, now),
1591 };
1592 if let Some(plan) = kx {
1593 out.client_plans.push(plan);
1594 }
1595 }
1596 out.forwards
1597 .push(self.wrap_and_forward(message, destination, now, reason));
1598 out
1599 }
1600
1601 fn build_rsa_initiation_plan(
1602 &mut self,
1603 destination: &str,
1604 _now: &CmrTimestamp,
1605 ) -> Option<ClientMessagePlan> {
1606 let e = BigUint::from(65_537_u32);
1607 let bits_each = usize::try_from(MIN_RSA_MODULUS_BITS / 2).ok()?;
1608 let mut generated = None;
1609 for _ in 0..8 {
1610 let p = generate_probable_prime(bits_each, 12)?;
1611 let mut q = generate_probable_prime(bits_each, 12)?;
1612 if q == p {
1613 q = generate_probable_prime(bits_each, 12)?;
1614 }
1615 if q == p {
1616 continue;
1617 }
1618
1619 let n = &p * &q;
1620 if n.bits() < MIN_RSA_MODULUS_BITS {
1621 continue;
1622 }
1623 let p1 = &p - BigUint::one();
1624 let q1 = &q - BigUint::one();
1625 let lambda = lcm_biguint(&p1, &q1);
1626 if gcd_biguint(&e, &lambda) != BigUint::one() {
1627 continue;
1628 }
1629 let Some(d) = mod_inverse_biguint(&e, &lambda) else {
1630 continue;
1631 };
1632 generated = Some((n, d));
1633 break;
1634 }
1635 let (n, d) = generated?;
1636 self.pending_rsa
1637 .insert(destination.to_owned(), PendingRsaState { n: n.clone(), d });
1638
1639 let body = KeyExchangeMessage::RsaRequest { n, e }
1640 .render()
1641 .into_bytes();
1642 Some(ClientMessagePlan {
1643 destination: destination.to_owned(),
1644 body,
1645 signing_key: None,
1646 reason: ForwardReason::KeyExchangeInitiation,
1647 })
1648 }
1649
1650 fn build_dh_initiation_plan(
1651 &mut self,
1652 destination: &str,
1653 _now: &CmrTimestamp,
1654 ) -> Option<ClientMessagePlan> {
1655 let bits = usize::try_from(MIN_DH_MODULUS_BITS).ok()?;
1656 let p = generate_probable_safe_prime(bits, 10)?;
1657 let g = find_primitive_root_for_safe_prime(&p)?;
1658 let a_secret = random_dh_secret(&p)?;
1659 let a_pub = mod_pow(&g, &a_secret, &p);
1660 self.pending_dh.insert(
1661 destination.to_owned(),
1662 PendingDhState {
1663 p: p.clone(),
1664 a_secret,
1665 },
1666 );
1667
1668 let body = KeyExchangeMessage::DhRequest { g, p, a_pub }
1669 .render()
1670 .into_bytes();
1671 Some(ClientMessagePlan {
1672 destination: destination.to_owned(),
1673 body,
1674 signing_key: None,
1675 reason: ForwardReason::KeyExchangeInitiation,
1676 })
1677 }
1678
1679 fn wrap_and_forward(
1680 &mut self,
1681 message: &CmrMessage,
1682 destination: &str,
1683 now: &CmrTimestamp,
1684 reason: ForwardReason,
1685 ) -> ForwardAction {
1686 let mut forwarded = message.clone();
1687 forwarded.make_unsigned();
1688 forwarded.prepend_hop(MessageId {
1689 timestamp: self
1690 .next_forward_timestamp(now, message.header.first().map(|id| &id.timestamp)),
1691 address: self.local_address.clone(),
1692 });
1693 if let Some(key) = self.shared_keys.get(destination) {
1694 forwarded.sign_with_key(key);
1695 }
1696 ForwardAction {
1697 destination: destination.to_owned(),
1698 message_bytes: forwarded.to_bytes(),
1699 reason,
1700 }
1701 }
1702
1703 fn next_forward_timestamp(
1704 &mut self,
1705 now: &CmrTimestamp,
1706 newer_than: Option<&CmrTimestamp>,
1707 ) -> CmrTimestamp {
1708 self.forward_counter = self.forward_counter.saturating_add(1);
1709 let now_text = now.to_string();
1710 let (date_part, now_fraction) = split_timestamp_text(&now_text);
1711 let counter_suffix = format!("{:011}", self.forward_counter % 100_000_000_000);
1712 let mut fraction = if now_fraction.is_empty() {
1713 format!("{:09}", self.forward_counter % 1_000_000_000)
1714 } else {
1715 format!("{now_fraction}{counter_suffix}")
1716 };
1717 let mut candidate = parse_timestamp_with_fraction(date_part, &fraction)
1718 .unwrap_or_else(|| now.clone().with_fraction(fraction.clone()));
1719 if let Some(min_ts) = newer_than
1720 && candidate <= *min_ts
1721 {
1722 let min_text = min_ts.to_string();
1723 let (min_date, min_fraction) = split_timestamp_text(&min_text);
1724 fraction = format!("{min_fraction}1");
1725 candidate = parse_timestamp_with_fraction(min_date, &fraction)
1726 .unwrap_or_else(|| min_ts.clone().with_fraction(fraction));
1727 }
1728 candidate
1729 }
1730}
1731
1732fn same_origin(left: &Url, right: &Url) -> bool {
1733 left.scheme().eq_ignore_ascii_case(right.scheme())
1734 && left.host_str().map(|h| h.to_ascii_lowercase())
1735 == right.host_str().map(|h| h.to_ascii_lowercase())
1736 && left.port_or_known_default() == right.port_or_known_default()
1737}
1738
1739fn is_path_alias(local_path: &str, candidate_path: &str) -> bool {
1740 let local = normalize_alias_path(local_path);
1741 let candidate = normalize_alias_path(candidate_path);
1742 candidate == local || candidate.starts_with(&format!("{local}/"))
1743}
1744
/// Strips all trailing slashes from `path`, mapping an empty result to `/`.
fn normalize_alias_path(path: &str) -> String {
    match path.trim_end_matches('/') {
        "" => "/".to_owned(),
        stem => stem.to_owned(),
    }
}
1753
/// Splits timestamp text at the first `.` into `(date, fraction)`.
///
/// Without a `.` the whole input is the date part and the fraction is empty.
fn split_timestamp_text(input: &str) -> (&str, &str) {
    input.split_once('.').unwrap_or((input, ""))
}
1761
1762fn parse_timestamp_with_fraction(date_part: &str, fraction: &str) -> Option<CmrTimestamp> {
1763 let text = if fraction.is_empty() {
1764 date_part.to_owned()
1765 } else {
1766 format!("{date_part}.{fraction}")
1767 };
1768 CmrTimestamp::parse(&text).ok()
1769}
1770
1771fn cache_key(message: &CmrMessage) -> String {
1772 message
1773 .origin_id()
1774 .map_or_else(|| message.header[0].to_string(), MessageId::to_string)
1775}
1776
1777fn message_contains_sender(message: &CmrMessage, sender: &str) -> bool {
1778 message.header.iter().any(|id| id.address == sender)
1779}
1780
1781fn is_key_exchange_control_message(message: &CmrMessage) -> bool {
1782 parse_key_exchange(&message.body).ok().flatten().is_some()
1783}
1784
1785fn sorted_unique_addresses(header: &[MessageId]) -> Vec<String> {
1786 let mut addresses = header
1787 .iter()
1788 .map(|id| id.address.clone())
1789 .collect::<Vec<_>>();
1790 addresses.sort();
1791 addresses.dedup();
1792 addresses
1793}
1794
/// Heuristically classifies `body` as binary when more than 30% of its bytes are
/// neither printable ASCII (0x20..=0x7E) nor tab, line feed, or carriage return.
///
/// An empty body is never binary.
fn is_probably_binary(body: &[u8]) -> bool {
    let is_text = |byte: u8| byte == b'\t' || byte == b'\n' || byte == b'\r' || (0x20..=0x7E).contains(&byte);
    let non_text = body.iter().filter(|&&byte| !is_text(byte)).count();
    // Integer form of `non_text / len > 0.3`; false for an empty body (0 > 0).
    non_text * 10 > body.len() * 3
}
1806
/// Reports whether `body` starts with one of the recognized magic prefixes:
/// `\x7fELF`, `MZ`, or one of four 4-byte `FEEDFACE`/`FEEDFACF` patterns in either
/// byte order.
fn looks_like_executable(body: &[u8]) -> bool {
    const MAGIC_PREFIXES: [&[u8]; 6] = [
        b"\x7fELF",
        b"MZ",
        b"\xfe\xed\xfa\xce",
        b"\xce\xfa\xed\xfe",
        b"\xcf\xfa\xed\xfe",
        b"\xfe\xed\xfa\xcf",
    ];
    MAGIC_PREFIXES.iter().any(|magic| body.starts_with(magic))
}
1815
1816fn validate_rsa_request_params(n: &BigUint, e: &BigUint) -> Result<(), ProcessError> {
1817 if n.bits() < MIN_RSA_MODULUS_BITS {
1818 return Err(ProcessError::WeakKeyExchangeParameters(
1819 "RSA modulus too small",
1820 ));
1821 }
1822 let two = BigUint::from(2_u8);
1823 if n <= &two || (n % &two).is_zero() {
1824 return Err(ProcessError::WeakKeyExchangeParameters(
1825 "RSA modulus must be odd and > 2",
1826 ));
1827 }
1828 if e <= &two || (e % &two).is_zero() {
1829 return Err(ProcessError::WeakKeyExchangeParameters(
1830 "RSA exponent must be odd and > 2",
1831 ));
1832 }
1833 if e >= n {
1834 return Err(ProcessError::WeakKeyExchangeParameters(
1835 "RSA exponent must be smaller than modulus",
1836 ));
1837 }
1838 if is_probably_prime(n, 10) {
1839 return Err(ProcessError::WeakKeyExchangeParameters(
1840 "RSA modulus must be composite",
1841 ));
1842 }
1843 Ok(())
1844}
1845
1846fn validate_dh_request_params(
1847 g: &BigUint,
1848 p: &BigUint,
1849 a_pub: &BigUint,
1850) -> Result<(), ProcessError> {
1851 if p.bits() < MIN_DH_MODULUS_BITS {
1852 return Err(ProcessError::WeakKeyExchangeParameters(
1853 "DH modulus too small",
1854 ));
1855 }
1856 let two = BigUint::from(2_u8);
1857 if p <= &two || (p % &two).is_zero() {
1858 return Err(ProcessError::WeakKeyExchangeParameters(
1859 "DH modulus must be odd and > 2",
1860 ));
1861 }
1862 if !is_probably_safe_prime(p, 10) {
1863 return Err(ProcessError::WeakKeyExchangeParameters(
1864 "DH modulus must be a safe prime",
1865 ));
1866 }
1867
1868 let p_minus_one = p - BigUint::one();
1869 if g <= &BigUint::one() || g >= &p_minus_one {
1870 return Err(ProcessError::WeakKeyExchangeParameters(
1871 "DH generator must be in range (1, p-1)",
1872 ));
1873 }
1874 if a_pub <= &BigUint::one() || a_pub >= &p_minus_one {
1875 return Err(ProcessError::WeakKeyExchangeParameters(
1876 "DH public value must be in range (1, p-1)",
1877 ));
1878 }
1879 if !is_primitive_root_for_safe_prime(g, p) {
1880 return Err(ProcessError::WeakKeyExchangeParameters(
1881 "DH generator must be a primitive root of p",
1882 ));
1883 }
1884 Ok(())
1885}
1886
1887fn validate_dh_reply_params(b_pub: &BigUint, p: &BigUint) -> Result<(), ProcessError> {
1888 if !is_probably_safe_prime(p, 10) {
1889 return Err(ProcessError::WeakKeyExchangeParameters(
1890 "DH modulus must be a safe prime",
1891 ));
1892 }
1893 let p_minus_one = p - BigUint::one();
1894 if b_pub <= &BigUint::one() || b_pub >= &p_minus_one {
1895 return Err(ProcessError::WeakKeyExchangeParameters(
1896 "DH reply value must be in range (1, p-1)",
1897 ));
1898 }
1899 Ok(())
1900}
1901
1902fn is_primitive_root_for_safe_prime(g: &BigUint, p: &BigUint) -> bool {
1903 if p <= &BigUint::from(3_u8) {
1904 return false;
1905 }
1906 let p_minus_one = p - BigUint::one();
1907 if g <= &BigUint::one() || g >= &p_minus_one {
1908 return false;
1909 }
1910 let q = &p_minus_one >> 1usize;
1913 let one = BigUint::one();
1914 let two = BigUint::from(2_u8);
1915 mod_pow(g, &two, p) != one && mod_pow(g, &q, p) != one
1916}
1917
1918fn find_primitive_root_for_safe_prime(p: &BigUint) -> Option<BigUint> {
1919 for candidate in 2_u32..=65_537_u32 {
1920 let g = BigUint::from(candidate);
1921 if is_primitive_root_for_safe_prime(&g, p) {
1922 return Some(g);
1923 }
1924 }
1925 None
1926}
1927
1928fn random_nonzero_biguint_below(modulus: &BigUint) -> Option<BigUint> {
1929 let modulus_bits = usize::try_from(modulus.bits()).ok()?;
1930 if modulus_bits == 0 {
1931 return None;
1932 }
1933 let byte_len = modulus_bits.div_ceil(8);
1934 let excess_bits = byte_len.saturating_mul(8).saturating_sub(modulus_bits);
1935 let mut rng = rand::rng();
1936 let mut raw = vec![0_u8; byte_len];
1937 for _ in 0..256 {
1938 rng.fill_bytes(&mut raw);
1939 if excess_bits > 0 {
1940 raw[0] &= 0xff_u8 >> excess_bits;
1941 }
1942 let value = BigUint::from_bytes_be(&raw);
1943 if !value.is_zero() && &value < modulus {
1944 return Some(value);
1945 }
1946 }
1947 None
1948}
1949
1950fn random_dh_secret(p: &BigUint) -> Option<BigUint> {
1951 if p <= &BigUint::one() {
1952 return None;
1953 }
1954 let upper_bound = p - BigUint::one();
1955 for _ in 0..256 {
1956 let candidate = random_nonzero_biguint_below(&upper_bound)?;
1957 if candidate > BigUint::one() {
1958 return Some(candidate);
1959 }
1960 }
1961 None
1962}
1963
1964fn generate_probable_prime(bits: usize, rounds: usize) -> Option<BigUint> {
1965 if bits < 2 {
1966 return None;
1967 }
1968 for _ in 0..4096 {
1969 let candidate = random_odd_biguint_with_bits(bits)?;
1970 if is_probably_prime(&candidate, rounds) {
1971 return Some(candidate);
1972 }
1973 }
1974 None
1975}
1976
1977fn generate_probable_safe_prime(bits: usize, rounds: usize) -> Option<BigUint> {
1978 if bits < 3 {
1979 return None;
1980 }
1981 for _ in 0..256 {
1982 let q = generate_probable_prime(bits.saturating_sub(1), rounds)?;
1983 let p: BigUint = (&q << 1usize) + BigUint::one();
1984 if p.bits() >= u64::try_from(bits).ok()? && is_probably_prime(&p, rounds) {
1985 return Some(p);
1986 }
1987 }
1988 None
1989}
1990
1991fn random_odd_biguint_with_bits(bits: usize) -> Option<BigUint> {
1992 if bits < 2 {
1993 return None;
1994 }
1995 let byte_len = bits.div_ceil(8);
1996 let excess_bits = byte_len.saturating_mul(8).saturating_sub(bits);
1997 let mut bytes = vec![0_u8; byte_len];
1998 rand::rng().fill_bytes(&mut bytes);
1999 if excess_bits > 0 {
2000 bytes[0] &= 0xff_u8 >> excess_bits;
2001 }
2002 let top_bit = 7_u8.saturating_sub(u8::try_from(excess_bits).ok()?);
2003 bytes[0] |= 1_u8 << top_bit;
2004 bytes[byte_len.saturating_sub(1)] |= 1;
2005 Some(BigUint::from_bytes_be(&bytes))
2006}
2007
2008fn gcd_biguint(left: &BigUint, right: &BigUint) -> BigUint {
2009 let mut a = left.clone();
2010 let mut b = right.clone();
2011 while !b.is_zero() {
2012 let r = &a % &b;
2013 a = b;
2014 b = r;
2015 }
2016 a
2017}
2018
2019fn lcm_biguint(left: &BigUint, right: &BigUint) -> BigUint {
2020 if left.is_zero() || right.is_zero() {
2021 return BigUint::zero();
2022 }
2023 (left / gcd_biguint(left, right)) * right
2024}
2025
2026fn mod_inverse_biguint(value: &BigUint, modulus: &BigUint) -> Option<BigUint> {
2027 let a = value.to_bigint()?;
2028 let m = modulus.to_bigint()?;
2029 let (g, x, _) = extended_gcd_bigint(a, m.clone());
2030 if g != BigInt::one() {
2031 return None;
2032 }
2033 let mut reduced = x % &m;
2034 if reduced < BigInt::zero() {
2035 reduced += &m;
2036 }
2037 reduced.try_into().ok()
2038}
2039
2040fn extended_gcd_bigint(a: BigInt, b: BigInt) -> (BigInt, BigInt, BigInt) {
2041 let mut old_r = a;
2042 let mut r = b;
2043 let mut old_s = BigInt::one();
2044 let mut s = BigInt::zero();
2045 let mut old_t = BigInt::zero();
2046 let mut t = BigInt::one();
2047
2048 while r != BigInt::zero() {
2049 let q = &old_r / &r;
2050
2051 let new_r = &old_r - &q * &r;
2052 old_r = r;
2053 r = new_r;
2054
2055 let new_s = &old_s - &q * &s;
2056 old_s = s;
2057 s = new_s;
2058
2059 let new_t = &old_t - &q * &t;
2060 old_t = t;
2061 t = new_t;
2062 }
2063 (old_r, old_s, old_t)
2064}
2065
2066fn derive_exchange_key(local: &str, peer: &str, label: &[u8], secret: &BigUint) -> Vec<u8> {
2067 let mut ikm = secret.to_bytes_be();
2068 if ikm.is_empty() {
2069 ikm.push(0);
2070 }
2071 derive_exchange_key_from_bytes(local, peer, label, &ikm)
2072}
2073
2074fn derive_exchange_key_from_bytes(local: &str, peer: &str, label: &[u8], secret: &[u8]) -> Vec<u8> {
2075 let (left, right) = if local <= peer {
2076 (local.as_bytes(), peer.as_bytes())
2077 } else {
2078 (peer.as_bytes(), local.as_bytes())
2079 };
2080
2081 let hk = Hkdf::<Sha256>::new(Some(b"cmr-v1-key-exchange"), secret);
2082 let mut info = Vec::with_capacity(3 + label.len() + left.len() + right.len());
2083 info.extend_from_slice(b"cmr");
2084 info.push(0);
2085 info.extend_from_slice(label);
2086 info.push(0);
2087 info.extend_from_slice(left);
2088 info.push(0);
2089 info.extend_from_slice(right);
2090
2091 let mut out = [0_u8; 32];
2092 hk.expand(&info, &mut out)
2093 .expect("HKDF expand length is fixed and valid");
2094 out.to_vec()
2095}
2096
2097fn is_probably_safe_prime(p: &BigUint, rounds: usize) -> bool {
2098 if !is_probably_prime(p, rounds) {
2099 return false;
2100 }
2101 let one = BigUint::one();
2102 let two = BigUint::from(2_u8);
2103 if p <= &two {
2104 return false;
2105 }
2106 let q = (p - &one) >> 1;
2107 is_probably_prime(&q, rounds)
2108}
2109
2110fn is_probably_prime(n: &BigUint, rounds: usize) -> bool {
2111 let two = BigUint::from(2_u8);
2112 let three = BigUint::from(3_u8);
2113 if n < &two {
2114 return false;
2115 }
2116 if n == &two || n == &three {
2117 return true;
2118 }
2119 if (n % &two).is_zero() {
2120 return false;
2121 }
2122
2123 let one = BigUint::one();
2124 let n_minus_one = n - &one;
2125 let mut d = n_minus_one.clone();
2126 let mut s = 0_u32;
2127 while (&d % &two).is_zero() {
2128 d >>= 1;
2129 s = s.saturating_add(1);
2130 }
2131
2132 const BASES: [u8; 12] = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37];
2133 for &base in &BASES {
2134 let a = BigUint::from(base);
2135 if a >= n_minus_one {
2136 continue;
2137 }
2138 if is_miller_rabin_witness(n, &d, s, &a) {
2139 return false;
2140 }
2141 }
2142
2143 let three = BigUint::from(3_u8);
2144 let n_minus_three = n - &three;
2145 for _ in 0..rounds {
2146 let Some(offset) = random_nonzero_biguint_below(&n_minus_three) else {
2147 return false;
2148 };
2149 let a = offset + &two;
2150 if is_miller_rabin_witness(n, &d, s, &a) {
2151 return false;
2152 }
2153 }
2154
2155 true
2156}
2157
2158fn is_miller_rabin_witness(n: &BigUint, d: &BigUint, s: u32, a: &BigUint) -> bool {
2159 let one = BigUint::one();
2160 let n_minus_one = n - &one;
2161 let mut x = mod_pow(a, d, n);
2162 if x == one || x == n_minus_one {
2163 return false;
2164 }
2165 for _ in 1..s {
2166 x = (&x * &x) % n;
2167 if x == n_minus_one {
2168 return false;
2169 }
2170 }
2171 true
2172}
2173
#[cfg(test)]
mod tests {
    //! Unit tests for router ingress, caching, forward timestamps, and local aliases.

    use super::*;

    /// Oracle stub with constant answers, so routing decisions are deterministic.
    struct StubOracle;

    impl CompressionOracle for StubOracle {
        // Every pair of inputs is reported as being at distance 0.4.
        fn compression_distance(
            &self,
            _left: &[u8],
            _right: &[u8],
        ) -> Result<f64, CompressionError> {
            Ok(0.4)
        }

        // Every input is reported with intrinsic dependence 0.5.
        fn intrinsic_dependence(
            &self,
            _data: &[u8],
            _max_order: i64,
        ) -> Result<f64, CompressionError> {
            Ok(0.5)
        }
    }

    /// Fixed "current time" shared by the tests; later than the message timestamps below.
    fn now() -> CmrTimestamp {
        CmrTimestamp::parse("2030/01/01 00:00:10").expect("ts")
    }

    /// An unsigned single-hop message from another peer is accepted.
    #[test]
    fn accepts_minimal_message() {
        let policy = RoutingPolicy::default();
        let mut router = Router::new("http://bob".to_owned(), policy, StubOracle);
        let raw = b"0\r\n2029/12/31 23:59:59 http://alice\r\n\r\n5\r\nhello";
        let outcome = router.process_incoming(raw, TransportKind::Http, now());
        assert!(outcome.accepted);
        assert!(outcome.drop_reason.is_none());
    }

    /// Replaying the same bytes is rejected with `DuplicateMessageId`.
    #[test]
    fn rejects_duplicate_id() {
        let policy = RoutingPolicy::default();
        let mut router = Router::new("http://bob".to_owned(), policy, StubOracle);
        let raw = b"0\r\n2029/12/31 23:59:59 http://alice\r\n\r\n5\r\nhello";
        let first = router.process_incoming(raw, TransportKind::Http, now());
        assert!(first.accepted);
        let second = router.process_incoming(raw, TransportKind::Http, now());
        assert!(!second.accepted);
        assert!(matches!(
            second.drop_reason,
            Some(ProcessError::DuplicateMessageId)
        ));
    }

    /// A message whose header names the router itself is refused on network ingress
    /// but accepted through the local-client entry point.
    #[test]
    fn local_client_processing_allows_local_sender_while_network_ingress_rejects_it() {
        let policy = RoutingPolicy::default();
        let mut router = Router::new("http://bob/".to_owned(), policy, StubOracle);
        let local_sender = b"0\r\n2029/12/31 23:59:59 http://bob/\r\n\r\n2\r\nhi";

        let ingress = router.process_incoming(local_sender, TransportKind::Http, now());
        assert!(!ingress.accepted);
        assert!(matches!(
            ingress.drop_reason,
            Some(ProcessError::Parse(
                crate::protocol::ParseError::RecipientAddressInHeader
            ))
        ));

        let local = router.process_local_client_message(local_sender, TransportKind::Http, now());
        assert!(local.accepted);
        assert!(local.drop_reason.is_none());
    }

    /// Inserting a signed message stores it unsigned (encoding starts with `0\r\n`).
    #[test]
    fn cache_inserts_messages_in_unsigned_canonical_form() {
        let mut cache = MessageCache::new(16, 1024 * 1024);
        let mut message = CmrMessage {
            signature: Signature::Unsigned,
            header: vec![MessageId {
                timestamp: CmrTimestamp::parse("2029/12/31 23:59:59").expect("timestamp"),
                address: "http://alice".to_owned(),
            }],
            body: b"payload".to_vec(),
        };
        message.sign_with_key(b"shared-key");
        assert!(matches!(message.signature, Signature::Sha256(_)));

        // The key is taken before the message is moved into the cache.
        let key = cache_key(&message);
        cache.insert(message);
        let stored = cache.entries.get(&key).expect("cached entry");
        assert!(matches!(stored.message.signature, Signature::Unsigned));
        assert!(stored.message.to_bytes().starts_with(b"0\r\n"));
    }

    /// When the existing header timestamp is later than `now`, the forward timestamp
    /// must still compare strictly greater than it.
    #[test]
    fn forward_timestamp_is_strictly_newer_than_existing_header() {
        let policy = RoutingPolicy::default();
        let mut router = Router::new("http://bob".to_owned(), policy, StubOracle);
        let now = CmrTimestamp::parse("2030/01/01 00:00:10.000000001").expect("now");
        let newest_existing = CmrTimestamp::parse("2030/01/01 00:00:10.9").expect("existing");
        let forwarded = router.next_forward_timestamp(&now, Some(&newest_existing));
        assert!(forwarded > newest_existing);
    }

    /// A path that merely shares a textual prefix (`/cmr-admin` vs `/cmr`) is not an alias.
    #[test]
    fn local_peer_alias_rejects_prefix_collisions() {
        let policy = RoutingPolicy::default();
        let router = Router::new("http://peer.example/cmr".to_owned(), policy, StubOracle);
        assert!(!router.is_local_peer_alias("http://peer.example/cmr-admin"));
    }

    /// A sub-path of the local path on the same origin is an alias.
    #[test]
    fn local_peer_alias_accepts_same_origin_subpath() {
        let policy = RoutingPolicy::default();
        let router = Router::new("http://peer.example/cmr".to_owned(), policy, StubOracle);
        assert!(router.is_local_peer_alias("http://peer.example/cmr/inbox"));
    }
}