1use std::collections::{HashMap, HashSet, VecDeque};
4use std::time::{Duration, Instant};
5
6use hkdf::Hkdf;
7use num_bigint::{BigInt, BigUint, ToBigInt};
8use num_traits::{One, Zero};
9use rand::RngCore;
10use sha2::Sha256;
11use thiserror::Error;
12
13use crate::key_exchange::{KeyExchangeError, KeyExchangeMessage, mod_pow, parse_key_exchange};
14use crate::policy::{AutoKeyExchangeMode, RoutingPolicy};
15use crate::protocol::{
16 CmrMessage, CmrTimestamp, MessageId, ParseContext, ParseError, Signature, TransportKind,
17 parse_message,
18};
19
20#[derive(Debug, Error)]
22pub enum CompressionError {
23 #[error("compressor unavailable: {0}")]
25 Unavailable(String),
26 #[error("compressor failure: {0}")]
28 Failed(String),
29}
30
31pub trait CompressionOracle: Send + Sync {
33 fn ncd_sym(&self, left: &[u8], right: &[u8]) -> Result<f64, CompressionError>;
35 fn compression_distance(&self, left: &[u8], right: &[u8]) -> Result<f64, CompressionError>;
38 fn intrinsic_dependence(&self, data: &[u8], max_order: i64) -> Result<f64, CompressionError>;
40 fn batch_compression_distance(
42 &self,
43 target: &[u8],
44 candidates: &[Vec<u8>],
45 ) -> Result<Vec<f64>, CompressionError> {
46 let mut out = Vec::with_capacity(candidates.len());
47 for candidate in candidates {
48 out.push(self.compression_distance(target, candidate)?);
49 }
50 Ok(out)
51 }
52 fn batch_ncd_sym(
54 &self,
55 target: &[u8],
56 candidates: &[Vec<u8>],
57 ) -> Result<Vec<f64>, CompressionError> {
58 let mut out = Vec::with_capacity(candidates.len());
59 for candidate in candidates {
60 out.push(self.ncd_sym(target, candidate)?);
61 }
62 Ok(out)
63 }
64}
65
66#[derive(Clone, Debug)]
67struct CacheEntry {
68 key: String,
69 message: CmrMessage,
70 encoded_size: usize,
71}
72
73#[derive(Debug)]
74struct MessageCache {
75 entries: HashMap<String, CacheEntry>,
76 order: VecDeque<String>,
77 id_counts: HashMap<String, usize>,
78 total_bytes: usize,
79 max_messages: usize,
80 max_bytes: usize,
81}
82
83impl MessageCache {
84 fn new(max_messages: usize, max_bytes: usize) -> Self {
85 Self {
86 entries: HashMap::new(),
87 order: VecDeque::new(),
88 id_counts: HashMap::new(),
89 total_bytes: 0,
90 max_messages,
91 max_bytes,
92 }
93 }
94
95 fn has_seen_any_id(&self, message: &CmrMessage) -> bool {
96 message
97 .header
98 .iter()
99 .any(|id| self.id_counts.contains_key(&id.to_string()))
100 }
101
102 fn insert(&mut self, mut message: CmrMessage) {
103 message.make_unsigned();
105 let key = cache_key(&message);
106 if self.entries.contains_key(&key) {
107 return;
108 }
109 let encoded_size = message.encoded_len();
110 let entry = CacheEntry {
111 key: key.clone(),
112 message,
113 encoded_size,
114 };
115 self.total_bytes = self.total_bytes.saturating_add(encoded_size);
116 self.order.push_back(key.clone());
117 self.add_message_ids(&entry.message);
118 self.entries.insert(key, entry);
119 self.evict_as_needed();
120 }
121
122 fn evict_as_needed(&mut self) {
123 while self.entries.len() > self.max_messages || self.total_bytes > self.max_bytes {
124 let Some(key) = self.order.pop_front() else {
125 break;
126 };
127 let Some(entry) = self.entries.remove(&key) else {
128 continue;
129 };
130 self.total_bytes = self.total_bytes.saturating_sub(entry.encoded_size);
131 self.remove_message_ids(&entry.message);
132 debug_assert_eq!(entry.key, key);
133 }
134 }
135
136 fn add_message_ids(&mut self, message: &CmrMessage) {
137 for id in &message.header {
138 *self.id_counts.entry(id.to_string()).or_default() += 1;
139 }
140 }
141
142 fn remove_message_ids(&mut self, message: &CmrMessage) {
143 for id in &message.header {
144 let id_key = id.to_string();
145 let mut remove = false;
146 if let Some(count) = self.id_counts.get_mut(&id_key) {
147 if *count > 1 {
148 *count -= 1;
149 } else {
150 remove = true;
151 }
152 }
153 if remove {
154 self.id_counts.remove(&id_key);
155 }
156 }
157 }
158
159 fn remove_by_key(&mut self, key: &str) -> Option<CacheEntry> {
160 let entry = self.entries.remove(key)?;
161 self.total_bytes = self.total_bytes.saturating_sub(entry.encoded_size);
162 self.remove_message_ids(&entry.message);
163 self.order.retain(|existing| existing != key);
164 Some(entry)
165 }
166
167 fn remove_if(&mut self, mut predicate: impl FnMut(&CmrMessage) -> bool) {
168 let to_remove = self
169 .order
170 .iter()
171 .filter_map(|key| {
172 self.entries
173 .get(key)
174 .filter(|entry| predicate(&entry.message))
175 .map(|_| key.clone())
176 })
177 .collect::<Vec<_>>();
178 for key in to_remove {
179 let _ = self.remove_by_key(&key);
180 }
181 }
182}
183
184#[derive(Clone, Debug)]
185struct PeerMetrics {
186 reputation: f64,
187 inbound_messages: u64,
188 inbound_bytes: u64,
189 outbound_messages: u64,
190 outbound_bytes: u64,
191 window: RateWindow,
192}
193
194impl Default for PeerMetrics {
195 fn default() -> Self {
196 Self {
197 reputation: 0.0,
198 inbound_messages: 0,
199 inbound_bytes: 0,
200 outbound_messages: 0,
201 outbound_bytes: 0,
202 window: RateWindow::new(),
203 }
204 }
205}
206
207#[derive(Clone, Debug)]
208struct RateWindow {
209 window: VecDeque<(Instant, u64)>,
210 bytes: u64,
211}
212
213impl RateWindow {
214 fn new() -> Self {
215 Self {
216 window: VecDeque::new(),
217 bytes: 0,
218 }
219 }
220
221 fn allow_and_record(
222 &mut self,
223 message_bytes: usize,
224 max_messages_per_minute: u32,
225 max_bytes_per_minute: u64,
226 ) -> bool {
227 let now = Instant::now();
228 let cutoff = Duration::from_secs(60);
229 while let Some((ts, bytes)) = self.window.front().copied() {
230 if now.duration_since(ts) < cutoff {
231 break;
232 }
233 self.window.pop_front();
234 self.bytes = self.bytes.saturating_sub(bytes);
235 }
236 let next_messages = self.window.len().saturating_add(1);
237 let next_bytes = self
238 .bytes
239 .saturating_add(u64::try_from(message_bytes).unwrap_or(u64::MAX));
240 if next_messages > usize::try_from(max_messages_per_minute).unwrap_or(usize::MAX)
241 || next_bytes > max_bytes_per_minute
242 {
243 return false;
244 }
245 self.window
246 .push_back((now, u64::try_from(message_bytes).unwrap_or(u64::MAX)));
247 self.bytes = next_bytes;
248 true
249 }
250}
251
252#[derive(Clone, Debug)]
253struct PendingRsaState {
254 n: BigUint,
255 d: BigUint,
256}
257
258#[derive(Clone, Debug)]
259struct PendingDhState {
260 p: BigUint,
261 a_secret: BigUint,
262}
263
264const MIN_RSA_MODULUS_BITS: u64 = 2048;
265const MIN_DH_MODULUS_BITS: u64 = 2048;
266
267#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
269pub enum ForwardReason {
270 MatchedForwardIncoming,
272 MatchedForwardCached,
274 CompensatoryReply,
276 KeyExchangeInitiation,
278 KeyExchangeReply,
280}
281
282#[derive(Clone, Debug, Default)]
283struct RoutingDecision {
284 best_peer: Option<String>,
285 matched_peers: Vec<String>,
286 matched_messages: Vec<CmrMessage>,
287 compensatory: Option<(String, CmrMessage)>,
288}
289
290#[derive(Clone, Debug)]
292pub struct ForwardAction {
293 pub destination: String,
295 pub message_bytes: Vec<u8>,
297 pub reason: ForwardReason,
299}
300
301#[derive(Debug, Error)]
303pub enum ProcessError {
304 #[error("parse error: {0}")]
306 Parse(#[from] ParseError),
307 #[error("duplicate message id in cache")]
309 DuplicateMessageId,
310 #[error("peer exceeded flood limits")]
312 FloodLimited,
313 #[error("global flood limits exceeded")]
315 GlobalFloodLimited,
316 #[error("peer reputation below threshold")]
318 ReputationTooLow,
319 #[error("unsigned message violates trust policy")]
321 UnsignedRejected,
322 #[error("signature verification failed")]
324 BadSignature,
325 #[error("signed message without known key rejected")]
327 SignedWithoutKnownKey,
328 #[error("message body exceeds content policy")]
330 BodyTooLarge,
331 #[error("binary content blocked by policy")]
333 BinaryContentBlocked,
334 #[error("executable payload blocked by policy")]
336 ExecutableBlocked,
337 #[error("message failed intrinsic dependence spam check")]
339 IntrinsicDependenceTooLow,
340 #[error("message intrinsic dependence score was not finite")]
342 IntrinsicDependenceInvalid,
343 #[error("compression oracle error: {0}")]
345 Compression(#[from] CompressionError),
346 #[error("key exchange parse error: {0}")]
348 KeyExchange(#[from] KeyExchangeError),
349 #[error("clear key exchange requires secure channel")]
351 ClearKeyOnInsecureChannel,
352 #[error("unexpected key exchange reply without pending state")]
354 MissingPendingKeyExchangeState,
355 #[error("weak key exchange parameters: {0}")]
357 WeakKeyExchangeParameters(&'static str),
358}
359
360#[derive(Debug)]
362pub struct ProcessOutcome {
363 pub accepted: bool,
365 pub drop_reason: Option<ProcessError>,
367 pub parsed_message: Option<CmrMessage>,
369 pub intrinsic_dependence: Option<f64>,
371 pub forwards: Vec<ForwardAction>,
373 pub matched_count: usize,
375 pub key_exchange_control: bool,
377}
378
379impl ProcessOutcome {
380 fn dropped(reason: ProcessError) -> Self {
381 Self {
382 accepted: false,
383 drop_reason: Some(reason),
384 parsed_message: None,
385 intrinsic_dependence: None,
386 forwards: Vec::new(),
387 matched_count: 0,
388 key_exchange_control: false,
389 }
390 }
391
392 fn accepted(message: CmrMessage) -> Self {
393 Self {
394 accepted: true,
395 drop_reason: None,
396 parsed_message: Some(message),
397 intrinsic_dependence: None,
398 forwards: Vec::new(),
399 matched_count: 0,
400 key_exchange_control: false,
401 }
402 }
403}
404
405pub struct Router<O: CompressionOracle> {
407 local_address: String,
408 policy: RoutingPolicy,
409 oracle: O,
410 cache: MessageCache,
411 peers: HashMap<String, PeerMetrics>,
412 global_window: RateWindow,
413 shared_keys: HashMap<String, Vec<u8>>,
414 pending_rsa: HashMap<String, PendingRsaState>,
415 pending_dh: HashMap<String, PendingDhState>,
416 forward_counter: u64,
417}
418
419impl<O: CompressionOracle> Router<O> {
420 #[must_use]
422 pub fn new(local_address: String, policy: RoutingPolicy, oracle: O) -> Self {
423 Self {
424 local_address,
425 cache: MessageCache::new(policy.cache_max_messages, policy.cache_max_bytes),
426 policy,
427 oracle,
428 peers: HashMap::new(),
429 global_window: RateWindow::new(),
430 shared_keys: HashMap::new(),
431 pending_rsa: HashMap::new(),
432 pending_dh: HashMap::new(),
433 forward_counter: 0,
434 }
435 }
436
437 pub fn set_shared_key(&mut self, peer: impl Into<String>, key: Vec<u8>) {
439 self.shared_keys.insert(peer.into(), key);
440 }
441
442 #[must_use]
444 pub fn shared_key(&self, peer: &str) -> Option<&[u8]> {
445 self.shared_keys.get(peer).map(Vec::as_slice)
446 }
447
448 pub fn register_pending_rsa_state(&mut self, peer: impl Into<String>, n: BigUint, d: BigUint) {
450 self.pending_rsa
451 .insert(peer.into(), PendingRsaState { n, d });
452 }
453
454 pub fn register_pending_dh_state(
456 &mut self,
457 peer: impl Into<String>,
458 p: BigUint,
459 a_secret: BigUint,
460 ) {
461 self.pending_dh
462 .insert(peer.into(), PendingDhState { p, a_secret });
463 }
464
465 #[must_use]
467 pub fn process_incoming(
468 &mut self,
469 raw_message: &[u8],
470 transport: TransportKind,
471 now: CmrTimestamp,
472 ) -> ProcessOutcome {
473 let parse_ctx = ParseContext {
474 now: now.clone(),
475 recipient_address: Some(self.local_address.as_str()),
476 max_message_bytes: self.policy.content.max_message_bytes,
477 max_header_ids: self.policy.content.max_header_ids,
478 };
479
480 let parsed = match parse_message(raw_message, &parse_ctx) {
481 Ok(m) => m,
482 Err(err) => return ProcessOutcome::dropped(ProcessError::Parse(err)),
483 };
484
485 if parsed.body.len() > self.policy.content.max_body_bytes {
486 return self.drop_for_peer(&parsed, ProcessError::BodyTooLarge, -2.0);
487 }
488
489 let sender = parsed.immediate_sender().to_owned();
490 if !self.check_global_rate(raw_message.len()) {
491 return self.drop_for_peer(&parsed, ProcessError::GlobalFloodLimited, -1.5);
492 }
493 if !self.check_peer_rate(&sender, raw_message.len()) {
494 return self.drop_for_peer(&parsed, ProcessError::FloodLimited, -2.0);
495 }
496 if self.peer_reputation(&sender) < self.policy.trust.min_reputation_score {
497 return self.drop_for_peer(&parsed, ProcessError::ReputationTooLow, -0.5);
498 }
499 if let Err(err) = self.validate_signature_policy(&parsed, &sender) {
500 return self.drop_for_peer(&parsed, err, -4.0);
501 }
502 if self.cache.has_seen_any_id(&parsed) {
503 return self.drop_for_peer(&parsed, ProcessError::DuplicateMessageId, -0.1);
504 }
505 if !self.policy.content.allow_binary_payloads && is_probably_binary(&parsed.body) {
506 return self.drop_for_peer(&parsed, ProcessError::BinaryContentBlocked, -0.4);
507 }
508 if self.policy.content.block_executable_magic && looks_like_executable(&parsed.body) {
509 return self.drop_for_peer(&parsed, ProcessError::ExecutableBlocked, -2.5);
510 }
511
512 match self.handle_key_exchange_control(&parsed, &sender, &transport, &now) {
513 Ok(Some(forwards)) => {
514 self.adjust_peer_reputation(&sender, 1.5);
515 self.record_peer_inbound(&sender, raw_message.len());
516 return ProcessOutcome {
517 accepted: true,
518 drop_reason: None,
519 parsed_message: Some(parsed),
520 intrinsic_dependence: None,
521 forwards,
522 matched_count: 0,
523 key_exchange_control: true,
524 };
525 }
526 Ok(None) => {}
527 Err(err) => return self.drop_for_peer(&parsed, err, -3.0),
528 }
529
530 let id_score = match self
531 .oracle
532 .intrinsic_dependence(&parsed.body, self.policy.spam.intrinsic_dependence_order)
533 {
534 Ok(score) => score,
535 Err(err) => {
536 return self.drop_for_peer(
537 &parsed,
538 ProcessError::Compression(err),
539 if self.policy.security_level == crate::policy::SecurityLevel::Trusted {
540 -0.2
541 } else {
542 -1.0
543 },
544 );
545 }
546 };
547 if !id_score.is_finite() {
548 return self.drop_for_peer(&parsed, ProcessError::IntrinsicDependenceInvalid, -1.5);
549 }
550 if id_score < self.policy.spam.min_intrinsic_dependence {
551 return self.drop_for_peer(&parsed, ProcessError::IntrinsicDependenceTooLow, -1.5);
552 }
553
554 let routing = match self.select_routing_decision(&parsed) {
555 Ok(decision) => decision,
556 Err(err) => return self.drop_for_peer(&parsed, err, -0.5),
557 };
558 let mut outcome = ProcessOutcome::accepted(parsed.clone());
559 outcome.intrinsic_dependence = Some(id_score);
560 outcome.matched_count = routing.matched_peers.len();
561
562 self.cache.insert(parsed.clone());
563 self.record_peer_inbound(&sender, raw_message.len());
564 self.adjust_peer_reputation(&sender, 0.4);
565
566 let mut limited = self.build_routing_forwards(&parsed, routing, &now);
567 limited.truncate(self.policy.throughput.max_forward_actions);
568 for action in &limited {
569 self.record_peer_outbound(&action.destination, action.message_bytes.len());
570 }
571 outcome.forwards = limited;
572 outcome
573 }
574
575 fn drop_for_peer(
576 &mut self,
577 parsed: &CmrMessage,
578 reason: ProcessError,
579 reputation_delta: f64,
580 ) -> ProcessOutcome {
581 let sender = parsed.immediate_sender().to_owned();
582 self.adjust_peer_reputation(&sender, reputation_delta);
583 ProcessOutcome {
584 accepted: false,
585 drop_reason: Some(reason),
586 parsed_message: Some(parsed.clone()),
587 intrinsic_dependence: None,
588 forwards: Vec::new(),
589 matched_count: 0,
590 key_exchange_control: false,
591 }
592 }
593
594 fn check_peer_rate(&mut self, peer: &str, message_bytes: usize) -> bool {
595 let metrics = self.peers.entry(peer.to_owned()).or_default();
596 metrics.window.allow_and_record(
597 message_bytes,
598 self.policy.throughput.per_peer_messages_per_minute,
599 self.policy.throughput.per_peer_bytes_per_minute,
600 )
601 }
602
603 fn check_global_rate(&mut self, message_bytes: usize) -> bool {
604 self.global_window.allow_and_record(
605 message_bytes,
606 self.policy.throughput.global_messages_per_minute,
607 self.policy.throughput.global_bytes_per_minute,
608 )
609 }
610
611 fn peer_reputation(&self, peer: &str) -> f64 {
612 self.peers.get(peer).map_or(0.0, |p| p.reputation)
613 }
614
615 fn adjust_peer_reputation(&mut self, peer: &str, delta: f64) {
616 let metrics = self.peers.entry(peer.to_owned()).or_default();
617 metrics.reputation = (metrics.reputation + delta).clamp(-100.0, 100.0);
618 }
619
620 fn record_peer_inbound(&mut self, peer: &str, bytes: usize) {
621 let metrics = self.peers.entry(peer.to_owned()).or_default();
622 metrics.inbound_messages = metrics.inbound_messages.saturating_add(1);
623 metrics.inbound_bytes = metrics
624 .inbound_bytes
625 .saturating_add(u64::try_from(bytes).unwrap_or(u64::MAX));
626 }
627
628 fn record_peer_outbound(&mut self, peer: &str, bytes: usize) {
629 let metrics = self.peers.entry(peer.to_owned()).or_default();
630 metrics.outbound_messages = metrics.outbound_messages.saturating_add(1);
631 metrics.outbound_bytes = metrics
632 .outbound_bytes
633 .saturating_add(u64::try_from(bytes).unwrap_or(u64::MAX));
634 }
635
636 fn can_forward_to_peer(&self, peer: &str) -> bool {
637 let Some(metrics) = self.peers.get(peer) else {
638 return true;
639 };
640 if metrics.inbound_bytes == 0 {
641 return metrics.outbound_bytes == 0;
642 }
643 let ratio = metrics.outbound_bytes as f64 / metrics.inbound_bytes as f64;
644 ratio <= self.policy.trust.max_outbound_inbound_ratio
645 }
646
647 fn validate_signature_policy(
648 &self,
649 message: &CmrMessage,
650 sender: &str,
651 ) -> Result<(), ProcessError> {
652 let known_key = self.shared_keys.get(sender);
653 match (&message.signature, known_key) {
654 (Signature::Unsigned, Some(_))
655 if self.policy.trust.require_signatures_from_known_peers =>
656 {
657 Err(ProcessError::UnsignedRejected)
658 }
659 (Signature::Unsigned, None) if !self.policy.trust.allow_unsigned_from_unknown_peers => {
660 Err(ProcessError::UnsignedRejected)
661 }
662 (Signature::Sha256(_), None) if self.policy.trust.reject_signed_without_known_key => {
663 Err(ProcessError::SignedWithoutKnownKey)
664 }
665 (Signature::Sha256(_), Some(key)) => {
666 if message
667 .signature
668 .verifies(&message.payload_without_signature_line(), Some(key))
669 {
670 Ok(())
671 } else {
672 Err(ProcessError::BadSignature)
673 }
674 }
675 _ => Ok(()),
676 }
677 }
678
679 fn handle_key_exchange_control(
680 &mut self,
681 message: &CmrMessage,
682 sender: &str,
683 transport: &TransportKind,
684 now: &CmrTimestamp,
685 ) -> Result<Option<Vec<ForwardAction>>, ProcessError> {
686 let Some(control) = parse_key_exchange(&message.body)? else {
687 return Ok(None);
688 };
689
690 if self.shared_keys.contains_key(sender) && matches!(message.signature, Signature::Unsigned)
691 {
692 return Err(ProcessError::UnsignedRejected);
693 }
694
695 let old_key = self.shared_keys.get(sender).cloned();
696 match control {
697 KeyExchangeMessage::ClearKey { key } => {
698 if !transport.is_secure_channel() {
699 return Err(ProcessError::ClearKeyOnInsecureChannel);
700 }
701 self.cache.insert(message.clone());
702 let derived =
703 derive_exchange_key_from_bytes(&self.local_address, sender, b"clear", &key);
704 self.shared_keys.insert(sender.to_owned(), derived);
705 self.purge_key_exchange_cache(sender);
706 Ok(Some(Vec::new()))
707 }
708 KeyExchangeMessage::RsaRequest { n, e } => {
709 validate_rsa_request_params(&n, &e)?;
710 let key = random_nonzero_biguint_below(&n).ok_or(
711 ProcessError::WeakKeyExchangeParameters("failed to generate RSA session key"),
712 )?;
713 self.cache.insert(message.clone());
714 let c = mod_pow(&key, &e, &n);
715 let reply_body = KeyExchangeMessage::RsaReply { c }.render().into_bytes();
716 let reply = self.build_control_reply(sender, reply_body, old_key.as_deref(), now);
717 self.shared_keys.insert(
718 sender.to_owned(),
719 derive_exchange_key(&self.local_address, sender, b"rsa", &key),
720 );
721 self.purge_key_exchange_cache(sender);
722 Ok(Some(vec![reply]))
723 }
724 KeyExchangeMessage::RsaReply { c } => {
725 let Some(state) = self.pending_rsa.remove(sender) else {
726 return Err(ProcessError::MissingPendingKeyExchangeState);
727 };
728 if c >= state.n {
729 return Err(ProcessError::WeakKeyExchangeParameters(
730 "RSA reply ciphertext out of range",
731 ));
732 }
733 let key = mod_pow(&c, &state.d, &state.n);
734 if key.is_zero() {
735 return Err(ProcessError::WeakKeyExchangeParameters(
736 "RSA shared key reduced to zero",
737 ));
738 }
739 self.cache.insert(message.clone());
740 self.shared_keys.insert(
741 sender.to_owned(),
742 derive_exchange_key(&self.local_address, sender, b"rsa", &key),
743 );
744 self.purge_key_exchange_cache(sender);
745 Ok(Some(Vec::new()))
746 }
747 KeyExchangeMessage::DhRequest { g, p, a_pub } => {
748 validate_dh_request_params(&g, &p, &a_pub)?;
749 let b_secret =
750 random_dh_secret(&p).ok_or(ProcessError::WeakKeyExchangeParameters(
751 "failed to generate DH secret exponent",
752 ))?;
753 let b_pub = mod_pow(&g, &b_secret, &p);
754 let shared = mod_pow(&a_pub, &b_secret, &p);
755 if shared <= BigUint::one() {
756 return Err(ProcessError::WeakKeyExchangeParameters(
757 "DH derived weak shared key",
758 ));
759 }
760 self.cache.insert(message.clone());
761 let reply_body = KeyExchangeMessage::DhReply { b_pub }.render().into_bytes();
762 let reply = self.build_control_reply(sender, reply_body, old_key.as_deref(), now);
763 self.shared_keys.insert(
764 sender.to_owned(),
765 derive_exchange_key(&self.local_address, sender, b"dh", &shared),
766 );
767 self.purge_key_exchange_cache(sender);
768 Ok(Some(vec![reply]))
769 }
770 KeyExchangeMessage::DhReply { b_pub } => {
771 let Some(state) = self.pending_dh.remove(sender) else {
772 return Err(ProcessError::MissingPendingKeyExchangeState);
773 };
774 validate_dh_reply_params(&b_pub, &state.p)?;
775 let shared = mod_pow(&b_pub, &state.a_secret, &state.p);
776 if shared <= BigUint::one() {
777 return Err(ProcessError::WeakKeyExchangeParameters(
778 "DH derived weak shared key",
779 ));
780 }
781 self.cache.insert(message.clone());
782 self.shared_keys.insert(
783 sender.to_owned(),
784 derive_exchange_key(&self.local_address, sender, b"dh", &shared),
785 );
786 self.purge_key_exchange_cache(sender);
787 Ok(Some(Vec::new()))
788 }
789 }
790 }
791
792 fn build_control_reply(
793 &mut self,
794 destination: &str,
795 body: Vec<u8>,
796 signing_key: Option<&[u8]>,
797 now: &CmrTimestamp,
798 ) -> ForwardAction {
799 let mut msg = CmrMessage {
800 signature: Signature::Unsigned,
801 header: vec![MessageId {
802 timestamp: self.next_forward_timestamp(now),
803 address: self.local_address.clone(),
804 }],
805 body,
806 };
807 if let Some(key) = signing_key {
808 msg.sign_with_key(key);
809 }
810 self.cache.insert(msg.clone());
811 ForwardAction {
812 destination: destination.to_owned(),
813 message_bytes: msg.to_bytes(),
814 reason: ForwardReason::KeyExchangeReply,
815 }
816 }
817
818 fn purge_key_exchange_cache(&mut self, peer: &str) {
819 let local = self.local_address.clone();
820 self.cache.remove_if(|message| {
821 parse_key_exchange(&message.body).ok().flatten().is_some()
822 && (message_contains_sender(message, peer)
823 || message_contains_sender(message, &local))
824 });
825 }
826
827 fn select_routing_decision(
828 &self,
829 incoming: &CmrMessage,
830 ) -> Result<RoutingDecision, ProcessError> {
831 let peer_corpora = self.collect_peer_corpora();
832 if peer_corpora.is_empty() {
833 return Ok(RoutingDecision::default());
834 }
835
836 let mut canonical_incoming = incoming.clone();
837 canonical_incoming.make_unsigned();
838 let incoming_bytes = canonical_incoming.to_bytes();
839 let mut peers: Vec<(&String, &Vec<u8>)> = peer_corpora.iter().collect();
840 peers.sort_by(|left, right| left.0.cmp(right.0));
841 let peer_names: Vec<String> = peers.iter().map(|(peer, _)| (*peer).to_owned()).collect();
842 let corpora: Vec<Vec<u8>> = peers.iter().map(|(_, corpus)| (*corpus).clone()).collect();
843 let distances = self
844 .oracle
845 .batch_compression_distance(&incoming_bytes, &corpora)
846 .map_err(ProcessError::Compression)?;
847
848 let mut best: Option<(String, f64)> = None;
849 let mut matched_peers = Vec::<String>::new();
850 for (peer, distance) in peer_names.into_iter().zip(distances.into_iter()) {
851 if !distance.is_finite() {
852 continue;
853 }
854 if distance <= self.policy.spam.max_match_distance {
855 matched_peers.push(peer.clone());
856 }
857 if best
858 .as_ref()
859 .is_none_or(|(_, best_distance)| distance < *best_distance)
860 {
861 best = Some((peer, distance));
862 }
863 }
864
865 let Some((best_peer, best_distance)) = best else {
866 return Ok(RoutingDecision::default());
867 };
868 if best_distance > self.policy.spam.max_match_distance || matched_peers.is_empty() {
869 return Ok(RoutingDecision::default());
870 }
871
872 let matched_set = matched_peers
873 .iter()
874 .map(String::as_str)
875 .collect::<HashSet<_>>();
876 let matched_messages = self
877 .cache
878 .order
879 .iter()
880 .filter_map(|key| self.cache.entries.get(key))
881 .filter(|entry| matched_set.contains(entry.message.immediate_sender()))
882 .map(|entry| entry.message.clone())
883 .collect::<Vec<_>>();
884
885 let compensatory = if message_contains_sender(incoming, &best_peer) {
886 self.select_compensatory_message(incoming, &best_peer, &peer_corpora)?
887 .map(|message| (best_peer.clone(), message))
888 } else {
889 None
890 };
891
892 Ok(RoutingDecision {
893 best_peer: Some(best_peer),
894 matched_peers,
895 matched_messages,
896 compensatory,
897 })
898 }
899
900 fn collect_peer_corpora(&self) -> HashMap<String, Vec<u8>> {
901 let mut peer_corpora = HashMap::<String, Vec<u8>>::new();
902 for key in &self.cache.order {
903 let Some(entry) = self.cache.entries.get(key) else {
904 continue;
905 };
906 let sender = entry.message.immediate_sender();
907 if sender == self.local_address.as_str() {
908 continue;
909 }
910 peer_corpora
911 .entry(sender.to_owned())
912 .or_default()
913 .extend_from_slice(&entry.message.to_bytes());
914 }
915 peer_corpora
916 }
917
918 fn select_compensatory_message(
919 &self,
920 incoming: &CmrMessage,
921 best_peer: &str,
922 peer_corpora: &HashMap<String, Vec<u8>>,
923 ) -> Result<Option<CmrMessage>, ProcessError> {
924 let ordered_entries = self
925 .cache
926 .order
927 .iter()
928 .filter_map(|key| self.cache.entries.get(key))
929 .collect::<Vec<_>>();
930 if ordered_entries.len() <= 1 {
931 return Ok(None);
932 }
933
934 let encoded_entries = ordered_entries
935 .iter()
936 .map(|entry| (entry.message.clone(), entry.message.to_bytes()))
937 .collect::<Vec<_>>();
938 let total_bytes = encoded_entries
939 .iter()
940 .map(|(_, bytes)| bytes.len())
941 .sum::<usize>();
942 let mut canonical_incoming = incoming.clone();
943 canonical_incoming.make_unsigned();
944 let mut x_guess = Vec::with_capacity(
945 canonical_incoming.encoded_len()
946 + peer_corpora.get(best_peer).map_or(0, std::vec::Vec::len),
947 );
948 x_guess.extend_from_slice(&canonical_incoming.to_bytes());
949 if let Some(known_from_best_peer) = peer_corpora.get(best_peer) {
950 x_guess.extend_from_slice(known_from_best_peer);
951 }
952
953 let mut best_score = f64::NEG_INFINITY;
954 let mut best_message = None;
955 for (idx, (candidate_message, candidate_bytes)) in encoded_entries.iter().enumerate() {
956 if message_contains_sender(candidate_message, best_peer) {
957 continue;
958 }
959 if total_bytes <= candidate_bytes.len() {
960 continue;
961 }
962
963 let mut remainder =
964 Vec::with_capacity(total_bytes.saturating_sub(candidate_bytes.len()));
965 for (other_idx, (_, other_bytes)) in encoded_entries.iter().enumerate() {
966 if idx == other_idx {
967 continue;
968 }
969 remainder.extend_from_slice(other_bytes);
970 }
971 if remainder.is_empty() {
972 continue;
973 }
974
975 let d_cache = self
976 .oracle
977 .compression_distance(candidate_bytes, &remainder)
978 .map_err(ProcessError::Compression)?;
979 let d_guess = self
980 .oracle
981 .compression_distance(&x_guess, candidate_bytes)
982 .map_err(ProcessError::Compression)?;
983 if !d_cache.is_finite() || !d_guess.is_finite() {
984 continue;
985 }
986 let score = d_cache - d_guess;
987 if score > best_score {
988 best_score = score;
989 best_message = Some(candidate_message.clone());
990 }
991 }
992
993 Ok(best_message)
994 }
995
996 fn build_routing_forwards(
997 &mut self,
998 incoming: &CmrMessage,
999 decision: RoutingDecision,
1000 now: &CmrTimestamp,
1001 ) -> Vec<ForwardAction> {
1002 let mut out = Vec::new();
1003 let mut dedupe = HashSet::<(String, String)>::new();
1004 let incoming_key = cache_key(incoming);
1005 let incoming_destinations = sorted_unique_addresses(&incoming.header);
1006 let suppress_best = decision
1007 .best_peer
1008 .as_deref()
1009 .filter(|peer| message_contains_sender(incoming, peer));
1010
1011 if let Some((destination, message)) = decision.compensatory.clone() {
1012 let dedupe_key = (destination.clone(), cache_key(&message));
1013 if !dedupe.contains(&dedupe_key) {
1014 let actions = self.forward_with_optional_key_exchange(
1015 &message,
1016 &destination,
1017 now,
1018 ForwardReason::CompensatoryReply,
1019 );
1020 if !actions.is_empty() {
1021 dedupe.insert(dedupe_key);
1022 out.extend(actions);
1023 }
1024 }
1025 }
1026
1027 for matched in &decision.matched_messages {
1028 for destination in sorted_unique_addresses(&matched.header) {
1029 if destination == self.local_address {
1030 continue;
1031 }
1032 if suppress_best.is_some_and(|peer| peer == destination) {
1033 continue;
1034 }
1035 let dedupe_key = (destination.clone(), incoming_key.clone());
1036 if dedupe.contains(&dedupe_key) {
1037 continue;
1038 }
1039 let actions = self.forward_with_optional_key_exchange(
1040 incoming,
1041 &destination,
1042 now,
1043 ForwardReason::MatchedForwardIncoming,
1044 );
1045 if !actions.is_empty() {
1046 dedupe.insert(dedupe_key);
1047 out.extend(actions);
1048 }
1049 }
1050
1051 let matched_key = cache_key(matched);
1052 for destination in &incoming_destinations {
1053 if destination == &self.local_address {
1054 continue;
1055 }
1056 let dedupe_key = (destination.clone(), matched_key.clone());
1057 if dedupe.contains(&dedupe_key) {
1058 continue;
1059 }
1060 let actions = self.forward_with_optional_key_exchange(
1061 matched,
1062 destination,
1063 now,
1064 ForwardReason::MatchedForwardCached,
1065 );
1066 if !actions.is_empty() {
1067 dedupe.insert(dedupe_key);
1068 out.extend(actions);
1069 }
1070 }
1071 }
1072
1073 out
1074 }
1075
1076 fn forward_with_optional_key_exchange(
1077 &mut self,
1078 message: &CmrMessage,
1079 destination: &str,
1080 now: &CmrTimestamp,
1081 reason: ForwardReason,
1082 ) -> Vec<ForwardAction> {
1083 if destination == self.local_address
1084 || !self.can_forward_to_peer(destination)
1085 || message_contains_sender(message, destination)
1086 {
1087 return Vec::new();
1088 }
1089
1090 let mut out = Vec::with_capacity(2);
1091 out.push(self.wrap_and_forward(message, destination, now, reason));
1092 if self.shared_keys.contains_key(destination)
1093 || self.pending_rsa.contains_key(destination)
1094 || self.pending_dh.contains_key(destination)
1095 {
1096 return out;
1097 }
1098
1099 if let Some(initiation) = self.build_key_exchange_initiation(destination, now) {
1100 out.push(initiation);
1101 }
1102 out
1103 }
1104
1105 fn build_key_exchange_initiation(
1106 &mut self,
1107 destination: &str,
1108 now: &CmrTimestamp,
1109 ) -> Option<ForwardAction> {
1110 match self.policy.trust.auto_key_exchange_mode {
1111 AutoKeyExchangeMode::Rsa => self
1112 .build_rsa_initiation(destination, now)
1113 .or_else(|| self.build_dh_initiation(destination, now)),
1114 AutoKeyExchangeMode::Dh => self
1115 .build_dh_initiation(destination, now)
1116 .or_else(|| self.build_rsa_initiation(destination, now)),
1117 }
1118 }
1119
1120 fn build_rsa_initiation(
1121 &mut self,
1122 destination: &str,
1123 now: &CmrTimestamp,
1124 ) -> Option<ForwardAction> {
1125 let e = BigUint::from(65_537_u32);
1126 let bits_each = usize::try_from(MIN_RSA_MODULUS_BITS / 2).ok()?;
1127 let mut generated = None;
1128 for _ in 0..8 {
1129 let p = generate_probable_prime(bits_each, 12)?;
1130 let mut q = generate_probable_prime(bits_each, 12)?;
1131 if q == p {
1132 q = generate_probable_prime(bits_each, 12)?;
1133 }
1134 if q == p {
1135 continue;
1136 }
1137
1138 let n = &p * &q;
1139 if n.bits() < MIN_RSA_MODULUS_BITS {
1140 continue;
1141 }
1142 let p1 = &p - BigUint::one();
1143 let q1 = &q - BigUint::one();
1144 let lambda = lcm_biguint(&p1, &q1);
1145 if gcd_biguint(&e, &lambda) != BigUint::one() {
1146 continue;
1147 }
1148 let Some(d) = mod_inverse_biguint(&e, &lambda) else {
1149 continue;
1150 };
1151 generated = Some((n, d));
1152 break;
1153 }
1154 let (n, d) = generated?;
1155 self.pending_rsa
1156 .insert(destination.to_owned(), PendingRsaState { n: n.clone(), d });
1157
1158 let body = KeyExchangeMessage::RsaRequest { n, e }
1159 .render()
1160 .into_bytes();
1161 let msg = CmrMessage {
1162 signature: Signature::Unsigned,
1163 header: vec![MessageId {
1164 timestamp: self.next_forward_timestamp(now),
1165 address: self.local_address.clone(),
1166 }],
1167 body,
1168 };
1169 self.cache.insert(msg.clone());
1170 Some(ForwardAction {
1171 destination: destination.to_owned(),
1172 message_bytes: msg.to_bytes(),
1173 reason: ForwardReason::KeyExchangeInitiation,
1174 })
1175 }
1176
1177 fn build_dh_initiation(
1178 &mut self,
1179 destination: &str,
1180 now: &CmrTimestamp,
1181 ) -> Option<ForwardAction> {
1182 let bits = usize::try_from(MIN_DH_MODULUS_BITS).ok()?;
1183 let p = generate_probable_safe_prime(bits, 10)?;
1184 let g = find_primitive_root_for_safe_prime(&p)?;
1185 let a_secret = random_dh_secret(&p)?;
1186 let a_pub = mod_pow(&g, &a_secret, &p);
1187 self.pending_dh.insert(
1188 destination.to_owned(),
1189 PendingDhState {
1190 p: p.clone(),
1191 a_secret,
1192 },
1193 );
1194
1195 let body = KeyExchangeMessage::DhRequest { g, p, a_pub }
1196 .render()
1197 .into_bytes();
1198 let msg = CmrMessage {
1199 signature: Signature::Unsigned,
1200 header: vec![MessageId {
1201 timestamp: self.next_forward_timestamp(now),
1202 address: self.local_address.clone(),
1203 }],
1204 body,
1205 };
1206 self.cache.insert(msg.clone());
1207 Some(ForwardAction {
1208 destination: destination.to_owned(),
1209 message_bytes: msg.to_bytes(),
1210 reason: ForwardReason::KeyExchangeInitiation,
1211 })
1212 }
1213
1214 fn wrap_and_forward(
1215 &mut self,
1216 message: &CmrMessage,
1217 destination: &str,
1218 now: &CmrTimestamp,
1219 reason: ForwardReason,
1220 ) -> ForwardAction {
1221 let mut forwarded = message.clone();
1222 forwarded.make_unsigned();
1223 forwarded.prepend_hop(MessageId {
1224 timestamp: self.next_forward_timestamp(now),
1225 address: self.local_address.clone(),
1226 });
1227 if let Some(key) = self.shared_keys.get(destination) {
1228 forwarded.sign_with_key(key);
1229 }
1230 ForwardAction {
1231 destination: destination.to_owned(),
1232 message_bytes: forwarded.to_bytes(),
1233 reason,
1234 }
1235 }
1236
1237 fn next_forward_timestamp(&mut self, now: &CmrTimestamp) -> CmrTimestamp {
1238 self.forward_counter = self.forward_counter.saturating_add(1);
1239 let fraction = format!("{:020}", self.forward_counter);
1240 now.clone().with_fraction(fraction)
1241 }
1242}
1243
1244fn cache_key(message: &CmrMessage) -> String {
1245 message
1246 .origin_id()
1247 .map_or_else(|| message.header[0].to_string(), MessageId::to_string)
1248}
1249
1250fn message_contains_sender(message: &CmrMessage, sender: &str) -> bool {
1251 message.header.iter().any(|id| id.address == sender)
1252}
1253
1254fn sorted_unique_addresses(header: &[MessageId]) -> Vec<String> {
1255 let mut addresses = header
1256 .iter()
1257 .map(|id| id.address.clone())
1258 .collect::<Vec<_>>();
1259 addresses.sort();
1260 addresses.dedup();
1261 addresses
1262}
1263
1264fn is_probably_binary(body: &[u8]) -> bool {
1265 if body.is_empty() {
1266 return false;
1267 }
1268 let non_text = body
1269 .iter()
1270 .copied()
1271 .filter(|b| !matches!(b, 0x09 | 0x0A | 0x0D | 0x20..=0x7E))
1272 .count();
1273 non_text * 10 > body.len() * 3
1274}
1275
1276fn looks_like_executable(body: &[u8]) -> bool {
1277 body.starts_with(b"\x7fELF")
1278 || body.starts_with(b"MZ")
1279 || body.starts_with(b"\xfe\xed\xfa\xce")
1280 || body.starts_with(b"\xce\xfa\xed\xfe")
1281 || body.starts_with(b"\xcf\xfa\xed\xfe")
1282 || body.starts_with(b"\xfe\xed\xfa\xcf")
1283}
1284
1285fn validate_rsa_request_params(n: &BigUint, e: &BigUint) -> Result<(), ProcessError> {
1286 if n.bits() < MIN_RSA_MODULUS_BITS {
1287 return Err(ProcessError::WeakKeyExchangeParameters(
1288 "RSA modulus too small",
1289 ));
1290 }
1291 let two = BigUint::from(2_u8);
1292 if n <= &two || (n % &two).is_zero() {
1293 return Err(ProcessError::WeakKeyExchangeParameters(
1294 "RSA modulus must be odd and > 2",
1295 ));
1296 }
1297 if e <= &two || (e % &two).is_zero() {
1298 return Err(ProcessError::WeakKeyExchangeParameters(
1299 "RSA exponent must be odd and > 2",
1300 ));
1301 }
1302 if e >= n {
1303 return Err(ProcessError::WeakKeyExchangeParameters(
1304 "RSA exponent must be smaller than modulus",
1305 ));
1306 }
1307 if is_probably_prime(n, 10) {
1308 return Err(ProcessError::WeakKeyExchangeParameters(
1309 "RSA modulus must be composite",
1310 ));
1311 }
1312 Ok(())
1313}
1314
1315fn validate_dh_request_params(
1316 g: &BigUint,
1317 p: &BigUint,
1318 a_pub: &BigUint,
1319) -> Result<(), ProcessError> {
1320 if p.bits() < MIN_DH_MODULUS_BITS {
1321 return Err(ProcessError::WeakKeyExchangeParameters(
1322 "DH modulus too small",
1323 ));
1324 }
1325 let two = BigUint::from(2_u8);
1326 if p <= &two || (p % &two).is_zero() {
1327 return Err(ProcessError::WeakKeyExchangeParameters(
1328 "DH modulus must be odd and > 2",
1329 ));
1330 }
1331 if !is_probably_safe_prime(p, 10) {
1332 return Err(ProcessError::WeakKeyExchangeParameters(
1333 "DH modulus must be a safe prime",
1334 ));
1335 }
1336
1337 let p_minus_one = p - BigUint::one();
1338 if g <= &BigUint::one() || g >= &p_minus_one {
1339 return Err(ProcessError::WeakKeyExchangeParameters(
1340 "DH generator must be in range (1, p-1)",
1341 ));
1342 }
1343 if a_pub <= &BigUint::one() || a_pub >= &p_minus_one {
1344 return Err(ProcessError::WeakKeyExchangeParameters(
1345 "DH public value must be in range (1, p-1)",
1346 ));
1347 }
1348 if !is_primitive_root_for_safe_prime(g, p) {
1349 return Err(ProcessError::WeakKeyExchangeParameters(
1350 "DH generator must be a primitive root of p",
1351 ));
1352 }
1353 Ok(())
1354}
1355
1356fn validate_dh_reply_params(b_pub: &BigUint, p: &BigUint) -> Result<(), ProcessError> {
1357 if !is_probably_safe_prime(p, 10) {
1358 return Err(ProcessError::WeakKeyExchangeParameters(
1359 "DH modulus must be a safe prime",
1360 ));
1361 }
1362 let p_minus_one = p - BigUint::one();
1363 if b_pub <= &BigUint::one() || b_pub >= &p_minus_one {
1364 return Err(ProcessError::WeakKeyExchangeParameters(
1365 "DH reply value must be in range (1, p-1)",
1366 ));
1367 }
1368 Ok(())
1369}
1370
1371fn is_primitive_root_for_safe_prime(g: &BigUint, p: &BigUint) -> bool {
1372 if p <= &BigUint::from(3_u8) {
1373 return false;
1374 }
1375 let p_minus_one = p - BigUint::one();
1376 if g <= &BigUint::one() || g >= &p_minus_one {
1377 return false;
1378 }
1379 let q = &p_minus_one >> 1usize;
1382 let one = BigUint::one();
1383 let two = BigUint::from(2_u8);
1384 mod_pow(g, &two, p) != one && mod_pow(g, &q, p) != one
1385}
1386
1387fn find_primitive_root_for_safe_prime(p: &BigUint) -> Option<BigUint> {
1388 for candidate in 2_u32..=65_537_u32 {
1389 let g = BigUint::from(candidate);
1390 if is_primitive_root_for_safe_prime(&g, p) {
1391 return Some(g);
1392 }
1393 }
1394 None
1395}
1396
1397fn random_nonzero_biguint_below(modulus: &BigUint) -> Option<BigUint> {
1398 let modulus_bits = usize::try_from(modulus.bits()).ok()?;
1399 if modulus_bits == 0 {
1400 return None;
1401 }
1402 let byte_len = modulus_bits.div_ceil(8);
1403 let excess_bits = byte_len.saturating_mul(8).saturating_sub(modulus_bits);
1404 let mut rng = rand::rng();
1405 let mut raw = vec![0_u8; byte_len];
1406 for _ in 0..256 {
1407 rng.fill_bytes(&mut raw);
1408 if excess_bits > 0 {
1409 raw[0] &= 0xff_u8 >> excess_bits;
1410 }
1411 let value = BigUint::from_bytes_be(&raw);
1412 if !value.is_zero() && &value < modulus {
1413 return Some(value);
1414 }
1415 }
1416 None
1417}
1418
1419fn random_dh_secret(p: &BigUint) -> Option<BigUint> {
1420 if p <= &BigUint::one() {
1421 return None;
1422 }
1423 let upper_bound = p - BigUint::one();
1424 for _ in 0..256 {
1425 let candidate = random_nonzero_biguint_below(&upper_bound)?;
1426 if candidate > BigUint::one() {
1427 return Some(candidate);
1428 }
1429 }
1430 None
1431}
1432
1433fn generate_probable_prime(bits: usize, rounds: usize) -> Option<BigUint> {
1434 if bits < 2 {
1435 return None;
1436 }
1437 for _ in 0..4096 {
1438 let candidate = random_odd_biguint_with_bits(bits)?;
1439 if is_probably_prime(&candidate, rounds) {
1440 return Some(candidate);
1441 }
1442 }
1443 None
1444}
1445
1446fn generate_probable_safe_prime(bits: usize, rounds: usize) -> Option<BigUint> {
1447 if bits < 3 {
1448 return None;
1449 }
1450 for _ in 0..256 {
1451 let q = generate_probable_prime(bits.saturating_sub(1), rounds)?;
1452 let p: BigUint = (&q << 1usize) + BigUint::one();
1453 if p.bits() >= u64::try_from(bits).ok()? && is_probably_prime(&p, rounds) {
1454 return Some(p);
1455 }
1456 }
1457 None
1458}
1459
1460fn random_odd_biguint_with_bits(bits: usize) -> Option<BigUint> {
1461 if bits < 2 {
1462 return None;
1463 }
1464 let byte_len = bits.div_ceil(8);
1465 let excess_bits = byte_len.saturating_mul(8).saturating_sub(bits);
1466 let mut bytes = vec![0_u8; byte_len];
1467 rand::rng().fill_bytes(&mut bytes);
1468 if excess_bits > 0 {
1469 bytes[0] &= 0xff_u8 >> excess_bits;
1470 }
1471 let top_bit = 7_u8.saturating_sub(u8::try_from(excess_bits).ok()?);
1472 bytes[0] |= 1_u8 << top_bit;
1473 bytes[byte_len.saturating_sub(1)] |= 1;
1474 Some(BigUint::from_bytes_be(&bytes))
1475}
1476
1477fn gcd_biguint(left: &BigUint, right: &BigUint) -> BigUint {
1478 let mut a = left.clone();
1479 let mut b = right.clone();
1480 while !b.is_zero() {
1481 let r = &a % &b;
1482 a = b;
1483 b = r;
1484 }
1485 a
1486}
1487
1488fn lcm_biguint(left: &BigUint, right: &BigUint) -> BigUint {
1489 if left.is_zero() || right.is_zero() {
1490 return BigUint::zero();
1491 }
1492 (left / gcd_biguint(left, right)) * right
1493}
1494
1495fn mod_inverse_biguint(value: &BigUint, modulus: &BigUint) -> Option<BigUint> {
1496 let a = value.to_bigint()?;
1497 let m = modulus.to_bigint()?;
1498 let (g, x, _) = extended_gcd_bigint(a, m.clone());
1499 if g != BigInt::one() {
1500 return None;
1501 }
1502 let mut reduced = x % &m;
1503 if reduced < BigInt::zero() {
1504 reduced += &m;
1505 }
1506 reduced.try_into().ok()
1507}
1508
1509fn extended_gcd_bigint(a: BigInt, b: BigInt) -> (BigInt, BigInt, BigInt) {
1510 let mut old_r = a;
1511 let mut r = b;
1512 let mut old_s = BigInt::one();
1513 let mut s = BigInt::zero();
1514 let mut old_t = BigInt::zero();
1515 let mut t = BigInt::one();
1516
1517 while r != BigInt::zero() {
1518 let q = &old_r / &r;
1519
1520 let new_r = &old_r - &q * &r;
1521 old_r = r;
1522 r = new_r;
1523
1524 let new_s = &old_s - &q * &s;
1525 old_s = s;
1526 s = new_s;
1527
1528 let new_t = &old_t - &q * &t;
1529 old_t = t;
1530 t = new_t;
1531 }
1532 (old_r, old_s, old_t)
1533}
1534
1535fn derive_exchange_key(local: &str, peer: &str, label: &[u8], secret: &BigUint) -> Vec<u8> {
1536 let mut ikm = secret.to_bytes_be();
1537 if ikm.is_empty() {
1538 ikm.push(0);
1539 }
1540 derive_exchange_key_from_bytes(local, peer, label, &ikm)
1541}
1542
1543fn derive_exchange_key_from_bytes(local: &str, peer: &str, label: &[u8], secret: &[u8]) -> Vec<u8> {
1544 let (left, right) = if local <= peer {
1545 (local.as_bytes(), peer.as_bytes())
1546 } else {
1547 (peer.as_bytes(), local.as_bytes())
1548 };
1549
1550 let hk = Hkdf::<Sha256>::new(Some(b"cmr-v1-key-exchange"), secret);
1551 let mut info = Vec::with_capacity(3 + label.len() + left.len() + right.len());
1552 info.extend_from_slice(b"cmr");
1553 info.push(0);
1554 info.extend_from_slice(label);
1555 info.push(0);
1556 info.extend_from_slice(left);
1557 info.push(0);
1558 info.extend_from_slice(right);
1559
1560 let mut out = [0_u8; 32];
1561 hk.expand(&info, &mut out)
1562 .expect("HKDF expand length is fixed and valid");
1563 out.to_vec()
1564}
1565
1566fn is_probably_safe_prime(p: &BigUint, rounds: usize) -> bool {
1567 if !is_probably_prime(p, rounds) {
1568 return false;
1569 }
1570 let one = BigUint::one();
1571 let two = BigUint::from(2_u8);
1572 if p <= &two {
1573 return false;
1574 }
1575 let q = (p - &one) >> 1;
1576 is_probably_prime(&q, rounds)
1577}
1578
1579fn is_probably_prime(n: &BigUint, rounds: usize) -> bool {
1580 let two = BigUint::from(2_u8);
1581 let three = BigUint::from(3_u8);
1582 if n < &two {
1583 return false;
1584 }
1585 if n == &two || n == &three {
1586 return true;
1587 }
1588 if (n % &two).is_zero() {
1589 return false;
1590 }
1591
1592 let one = BigUint::one();
1593 let n_minus_one = n - &one;
1594 let mut d = n_minus_one.clone();
1595 let mut s = 0_u32;
1596 while (&d % &two).is_zero() {
1597 d >>= 1;
1598 s = s.saturating_add(1);
1599 }
1600
1601 const BASES: [u8; 12] = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37];
1602 for &base in &BASES {
1603 let a = BigUint::from(base);
1604 if a >= n_minus_one {
1605 continue;
1606 }
1607 if is_miller_rabin_witness(n, &d, s, &a) {
1608 return false;
1609 }
1610 }
1611
1612 let three = BigUint::from(3_u8);
1613 let n_minus_three = n - &three;
1614 for _ in 0..rounds {
1615 let Some(offset) = random_nonzero_biguint_below(&n_minus_three) else {
1616 return false;
1617 };
1618 let a = offset + &two;
1619 if is_miller_rabin_witness(n, &d, s, &a) {
1620 return false;
1621 }
1622 }
1623
1624 true
1625}
1626
1627fn is_miller_rabin_witness(n: &BigUint, d: &BigUint, s: u32, a: &BigUint) -> bool {
1628 let one = BigUint::one();
1629 let n_minus_one = n - &one;
1630 let mut x = mod_pow(a, d, n);
1631 if x == one || x == n_minus_one {
1632 return false;
1633 }
1634 for _ in 1..s {
1635 x = (&x * &x) % n;
1636 if x == n_minus_one {
1637 return false;
1638 }
1639 }
1640 true
1641}
1642
1643#[cfg(test)]
1644mod tests {
1645 use super::*;
1646
1647 struct StubOracle;
1648
1649 impl CompressionOracle for StubOracle {
1650 fn ncd_sym(&self, _left: &[u8], _right: &[u8]) -> Result<f64, CompressionError> {
1651 Ok(0.4)
1652 }
1653
1654 fn compression_distance(
1655 &self,
1656 _left: &[u8],
1657 _right: &[u8],
1658 ) -> Result<f64, CompressionError> {
1659 Ok(0.4)
1660 }
1661
1662 fn intrinsic_dependence(
1663 &self,
1664 _data: &[u8],
1665 _max_order: i64,
1666 ) -> Result<f64, CompressionError> {
1667 Ok(0.5)
1668 }
1669 }
1670
1671 fn now() -> CmrTimestamp {
1672 CmrTimestamp::parse("2030/01/01 00:00:10").expect("ts")
1673 }
1674
1675 #[test]
1676 fn accepts_minimal_message() {
1677 let policy = RoutingPolicy::default();
1678 let mut router = Router::new("http://bob".to_owned(), policy, StubOracle);
1679 let raw = b"0\r\n2029/12/31 23:59:59 http://alice\r\n\r\n5\r\nhello";
1680 let outcome = router.process_incoming(raw, TransportKind::Http, now());
1681 assert!(outcome.accepted);
1682 assert!(outcome.drop_reason.is_none());
1683 }
1684
1685 #[test]
1686 fn rejects_duplicate_id() {
1687 let policy = RoutingPolicy::default();
1688 let mut router = Router::new("http://bob".to_owned(), policy, StubOracle);
1689 let raw = b"0\r\n2029/12/31 23:59:59 http://alice\r\n\r\n5\r\nhello";
1690 let first = router.process_incoming(raw, TransportKind::Http, now());
1691 assert!(first.accepted);
1692 let second = router.process_incoming(raw, TransportKind::Http, now());
1693 assert!(!second.accepted);
1694 assert!(matches!(
1695 second.drop_reason,
1696 Some(ProcessError::DuplicateMessageId)
1697 ));
1698 }
1699
1700 #[test]
1701 fn cache_inserts_messages_in_unsigned_canonical_form() {
1702 let mut cache = MessageCache::new(16, 1024 * 1024);
1703 let mut message = CmrMessage {
1704 signature: Signature::Unsigned,
1705 header: vec![MessageId {
1706 timestamp: CmrTimestamp::parse("2029/12/31 23:59:59").expect("timestamp"),
1707 address: "http://alice".to_owned(),
1708 }],
1709 body: b"payload".to_vec(),
1710 };
1711 message.sign_with_key(b"shared-key");
1712 assert!(matches!(message.signature, Signature::Sha256(_)));
1713
1714 let key = cache_key(&message);
1715 cache.insert(message);
1716 let stored = cache.entries.get(&key).expect("cached entry");
1717 assert!(matches!(stored.message.signature, Signature::Unsigned));
1718 assert!(stored.message.to_bytes().starts_with(b"0\r\n"));
1719 }
1720}