Skip to main content

cmr_core/
router.rs

1//! Router core: validation, security policy, and spec-driven forwarding.
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::time::{Duration, Instant};
5
6use hkdf::Hkdf;
7use num_bigint::{BigInt, BigUint, ToBigInt};
8use num_traits::{One, Zero};
9use rand::RngCore;
10use sha2::Sha256;
11use thiserror::Error;
12
13use crate::key_exchange::{KeyExchangeError, KeyExchangeMessage, mod_pow, parse_key_exchange};
14use crate::policy::{AutoKeyExchangeMode, RoutingPolicy};
15use crate::protocol::{
16    CmrMessage, CmrTimestamp, MessageId, ParseContext, ParseError, Signature, TransportKind,
17    parse_message,
18};
19
20/// Compression-oracle failures.
21#[derive(Debug, Error)]
22pub enum CompressionError {
23    /// Backing compressor unavailable.
24    #[error("compressor unavailable: {0}")]
25    Unavailable(String),
26    /// Backing compressor returned an error.
27    #[error("compressor failure: {0}")]
28    Failed(String),
29}
30
31/// Compression capability (intentionally abstracted from router process).
32pub trait CompressionOracle: Send + Sync {
33    /// Symmetric NCD-like distance.
34    fn ncd_sym(&self, left: &[u8], right: &[u8]) -> Result<f64, CompressionError>;
35    /// CMR Section 3.2 compression distance from spec:
36    /// `C(XY)-C(X) + C(YX)-C(Y)`.
37    fn compression_distance(&self, left: &[u8], right: &[u8]) -> Result<f64, CompressionError>;
38    /// Intrinsic dependence.
39    fn intrinsic_dependence(&self, data: &[u8], max_order: i64) -> Result<f64, CompressionError>;
40    /// Batch CMR distance, defaulting to repeated scalar calls.
41    fn batch_compression_distance(
42        &self,
43        target: &[u8],
44        candidates: &[Vec<u8>],
45    ) -> Result<Vec<f64>, CompressionError> {
46        let mut out = Vec::with_capacity(candidates.len());
47        for candidate in candidates {
48            out.push(self.compression_distance(target, candidate)?);
49        }
50        Ok(out)
51    }
52    /// Batch NCD, defaulting to repeated scalar calls.
53    fn batch_ncd_sym(
54        &self,
55        target: &[u8],
56        candidates: &[Vec<u8>],
57    ) -> Result<Vec<f64>, CompressionError> {
58        let mut out = Vec::with_capacity(candidates.len());
59        for candidate in candidates {
60            out.push(self.ncd_sym(target, candidate)?);
61        }
62        Ok(out)
63    }
64}
65
66#[derive(Clone, Debug)]
67struct CacheEntry {
68    key: String,
69    message: CmrMessage,
70    encoded_size: usize,
71}
72
73#[derive(Debug)]
74struct MessageCache {
75    entries: HashMap<String, CacheEntry>,
76    order: VecDeque<String>,
77    id_counts: HashMap<String, usize>,
78    total_bytes: usize,
79    max_messages: usize,
80    max_bytes: usize,
81}
82
83impl MessageCache {
84    fn new(max_messages: usize, max_bytes: usize) -> Self {
85        Self {
86            entries: HashMap::new(),
87            order: VecDeque::new(),
88            id_counts: HashMap::new(),
89            total_bytes: 0,
90            max_messages,
91            max_bytes,
92        }
93    }
94
95    fn has_seen_any_id(&self, message: &CmrMessage) -> bool {
96        message
97            .header
98            .iter()
99            .any(|id| self.id_counts.contains_key(&id.to_string()))
100    }
101
102    fn insert(&mut self, mut message: CmrMessage) {
103        // Cache canonical form without per-hop signature bytes.
104        message.make_unsigned();
105        let key = cache_key(&message);
106        if self.entries.contains_key(&key) {
107            return;
108        }
109        let encoded_size = message.encoded_len();
110        let entry = CacheEntry {
111            key: key.clone(),
112            message,
113            encoded_size,
114        };
115        self.total_bytes = self.total_bytes.saturating_add(encoded_size);
116        self.order.push_back(key.clone());
117        self.add_message_ids(&entry.message);
118        self.entries.insert(key, entry);
119        self.evict_as_needed();
120    }
121
122    fn evict_as_needed(&mut self) {
123        while self.entries.len() > self.max_messages || self.total_bytes > self.max_bytes {
124            let Some(key) = self.order.pop_front() else {
125                break;
126            };
127            let Some(entry) = self.entries.remove(&key) else {
128                continue;
129            };
130            self.total_bytes = self.total_bytes.saturating_sub(entry.encoded_size);
131            self.remove_message_ids(&entry.message);
132            debug_assert_eq!(entry.key, key);
133        }
134    }
135
136    fn add_message_ids(&mut self, message: &CmrMessage) {
137        for id in &message.header {
138            *self.id_counts.entry(id.to_string()).or_default() += 1;
139        }
140    }
141
142    fn remove_message_ids(&mut self, message: &CmrMessage) {
143        for id in &message.header {
144            let id_key = id.to_string();
145            let mut remove = false;
146            if let Some(count) = self.id_counts.get_mut(&id_key) {
147                if *count > 1 {
148                    *count -= 1;
149                } else {
150                    remove = true;
151                }
152            }
153            if remove {
154                self.id_counts.remove(&id_key);
155            }
156        }
157    }
158
159    fn remove_by_key(&mut self, key: &str) -> Option<CacheEntry> {
160        let entry = self.entries.remove(key)?;
161        self.total_bytes = self.total_bytes.saturating_sub(entry.encoded_size);
162        self.remove_message_ids(&entry.message);
163        self.order.retain(|existing| existing != key);
164        Some(entry)
165    }
166
167    fn remove_if(&mut self, mut predicate: impl FnMut(&CmrMessage) -> bool) {
168        let to_remove = self
169            .order
170            .iter()
171            .filter_map(|key| {
172                self.entries
173                    .get(key)
174                    .filter(|entry| predicate(&entry.message))
175                    .map(|_| key.clone())
176            })
177            .collect::<Vec<_>>();
178        for key in to_remove {
179            let _ = self.remove_by_key(&key);
180        }
181    }
182}
183
184#[derive(Clone, Debug)]
185struct PeerMetrics {
186    reputation: f64,
187    inbound_messages: u64,
188    inbound_bytes: u64,
189    outbound_messages: u64,
190    outbound_bytes: u64,
191    window: RateWindow,
192}
193
194impl Default for PeerMetrics {
195    fn default() -> Self {
196        Self {
197            reputation: 0.0,
198            inbound_messages: 0,
199            inbound_bytes: 0,
200            outbound_messages: 0,
201            outbound_bytes: 0,
202            window: RateWindow::new(),
203        }
204    }
205}
206
207#[derive(Clone, Debug)]
208struct RateWindow {
209    window: VecDeque<(Instant, u64)>,
210    bytes: u64,
211}
212
213impl RateWindow {
214    fn new() -> Self {
215        Self {
216            window: VecDeque::new(),
217            bytes: 0,
218        }
219    }
220
221    fn allow_and_record(
222        &mut self,
223        message_bytes: usize,
224        max_messages_per_minute: u32,
225        max_bytes_per_minute: u64,
226    ) -> bool {
227        let now = Instant::now();
228        let cutoff = Duration::from_secs(60);
229        while let Some((ts, bytes)) = self.window.front().copied() {
230            if now.duration_since(ts) < cutoff {
231                break;
232            }
233            self.window.pop_front();
234            self.bytes = self.bytes.saturating_sub(bytes);
235        }
236        let next_messages = self.window.len().saturating_add(1);
237        let next_bytes = self
238            .bytes
239            .saturating_add(u64::try_from(message_bytes).unwrap_or(u64::MAX));
240        if next_messages > usize::try_from(max_messages_per_minute).unwrap_or(usize::MAX)
241            || next_bytes > max_bytes_per_minute
242        {
243            return false;
244        }
245        self.window
246            .push_back((now, u64::try_from(message_bytes).unwrap_or(u64::MAX)));
247        self.bytes = next_bytes;
248        true
249    }
250}
251
252#[derive(Clone, Debug)]
253struct PendingRsaState {
254    n: BigUint,
255    d: BigUint,
256}
257
258#[derive(Clone, Debug)]
259struct PendingDhState {
260    p: BigUint,
261    a_secret: BigUint,
262}
263
264const MIN_RSA_MODULUS_BITS: u64 = 2048;
265const MIN_DH_MODULUS_BITS: u64 = 2048;
266
267/// Forward reason.
268#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
269pub enum ForwardReason {
270    /// Forwarded incoming message `X` using matched message header addresses.
271    MatchedForwardIncoming,
272    /// Forwarded matched cached message `Y` using incoming header addresses.
273    MatchedForwardCached,
274    /// Compensatory response chosen when best peer already sent X.
275    CompensatoryReply,
276    /// Key-exchange protocol initiation.
277    KeyExchangeInitiation,
278    /// Key-exchange protocol reply.
279    KeyExchangeReply,
280}
281
282#[derive(Clone, Debug, Default)]
283struct RoutingDecision {
284    best_peer: Option<String>,
285    matched_peers: Vec<String>,
286    matched_messages: Vec<CmrMessage>,
287    compensatory: Option<(String, CmrMessage)>,
288}
289
290/// Prepared outbound action.
291#[derive(Clone, Debug)]
292pub struct ForwardAction {
293    /// Recipient peer address.
294    pub destination: String,
295    /// Wire bytes.
296    pub message_bytes: Vec<u8>,
297    /// Reason for forwarding.
298    pub reason: ForwardReason,
299}
300
301/// Processing rejection reason.
302#[derive(Debug, Error)]
303pub enum ProcessError {
304    /// Parse failure.
305    #[error("parse error: {0}")]
306    Parse(#[from] ParseError),
307    /// Message duplicates cache by ID.
308    #[error("duplicate message id in cache")]
309    DuplicateMessageId,
310    /// Peer throttled by anti-flood controls.
311    #[error("peer exceeded flood limits")]
312    FloodLimited,
313    /// Global throttling triggered.
314    #[error("global flood limits exceeded")]
315    GlobalFloodLimited,
316    /// Peer reputation below threshold.
317    #[error("peer reputation below threshold")]
318    ReputationTooLow,
319    /// Unsigned message violates policy.
320    #[error("unsigned message violates trust policy")]
321    UnsignedRejected,
322    /// Signature cannot be verified.
323    #[error("signature verification failed")]
324    BadSignature,
325    /// Signed message from unknown key violates policy.
326    #[error("signed message without known key rejected")]
327    SignedWithoutKnownKey,
328    /// Message body exceeds policy.
329    #[error("message body exceeds content policy")]
330    BodyTooLarge,
331    /// Binary content blocked.
332    #[error("binary content blocked by policy")]
333    BinaryContentBlocked,
334    /// Executable payload blocked.
335    #[error("executable payload blocked by policy")]
336    ExecutableBlocked,
337    /// Intrinsic-dependence spam check failed.
338    #[error("message failed intrinsic dependence spam check")]
339    IntrinsicDependenceTooLow,
340    /// Intrinsic-dependence score was not finite.
341    #[error("message intrinsic dependence score was not finite")]
342    IntrinsicDependenceInvalid,
343    /// Compression oracle error.
344    #[error("compression oracle error: {0}")]
345    Compression(#[from] CompressionError),
346    /// Key-exchange parse error.
347    #[error("key exchange parse error: {0}")]
348    KeyExchange(#[from] KeyExchangeError),
349    /// Clear key exchange on insecure channel.
350    #[error("clear key exchange requires secure channel")]
351    ClearKeyOnInsecureChannel,
352    /// Malformed key-exchange state.
353    #[error("unexpected key exchange reply without pending state")]
354    MissingPendingKeyExchangeState,
355    /// Weak/unsafe key-exchange parameters.
356    #[error("weak key exchange parameters: {0}")]
357    WeakKeyExchangeParameters(&'static str),
358}
359
360/// Result of processing one inbound message.
361#[derive(Debug)]
362pub struct ProcessOutcome {
363    /// Whether message was accepted.
364    pub accepted: bool,
365    /// Drop reason when not accepted.
366    pub drop_reason: Option<ProcessError>,
367    /// Parsed message if available.
368    pub parsed_message: Option<CmrMessage>,
369    /// Intrinsic dependence score when computed.
370    pub intrinsic_dependence: Option<f64>,
371    /// Generated forwarding actions.
372    pub forwards: Vec<ForwardAction>,
373    /// Number of semantic matches found.
374    pub matched_count: usize,
375    /// Whether this was a key exchange control message.
376    pub key_exchange_control: bool,
377}
378
379impl ProcessOutcome {
380    fn dropped(reason: ProcessError) -> Self {
381        Self {
382            accepted: false,
383            drop_reason: Some(reason),
384            parsed_message: None,
385            intrinsic_dependence: None,
386            forwards: Vec::new(),
387            matched_count: 0,
388            key_exchange_control: false,
389        }
390    }
391
392    fn accepted(message: CmrMessage) -> Self {
393        Self {
394            accepted: true,
395            drop_reason: None,
396            parsed_message: Some(message),
397            intrinsic_dependence: None,
398            forwards: Vec::new(),
399            matched_count: 0,
400            key_exchange_control: false,
401        }
402    }
403}
404
405/// In-memory CMR router.
406pub struct Router<O: CompressionOracle> {
407    local_address: String,
408    policy: RoutingPolicy,
409    oracle: O,
410    cache: MessageCache,
411    peers: HashMap<String, PeerMetrics>,
412    global_window: RateWindow,
413    shared_keys: HashMap<String, Vec<u8>>,
414    pending_rsa: HashMap<String, PendingRsaState>,
415    pending_dh: HashMap<String, PendingDhState>,
416    forward_counter: u64,
417}
418
419impl<O: CompressionOracle> Router<O> {
420    /// Creates a router instance.
421    #[must_use]
422    pub fn new(local_address: String, policy: RoutingPolicy, oracle: O) -> Self {
423        Self {
424            local_address,
425            cache: MessageCache::new(policy.cache_max_messages, policy.cache_max_bytes),
426            policy,
427            oracle,
428            peers: HashMap::new(),
429            global_window: RateWindow::new(),
430            shared_keys: HashMap::new(),
431            pending_rsa: HashMap::new(),
432            pending_dh: HashMap::new(),
433            forward_counter: 0,
434        }
435    }
436
437    /// Registers a pairwise shared key.
438    pub fn set_shared_key(&mut self, peer: impl Into<String>, key: Vec<u8>) {
439        self.shared_keys.insert(peer.into(), key);
440    }
441
442    /// Gets known shared key.
443    #[must_use]
444    pub fn shared_key(&self, peer: &str) -> Option<&[u8]> {
445        self.shared_keys.get(peer).map(Vec::as_slice)
446    }
447
448    /// Stores pending RSA initiator state for incoming replies.
449    pub fn register_pending_rsa_state(&mut self, peer: impl Into<String>, n: BigUint, d: BigUint) {
450        self.pending_rsa
451            .insert(peer.into(), PendingRsaState { n, d });
452    }
453
454    /// Stores pending DH initiator state for incoming replies.
455    pub fn register_pending_dh_state(
456        &mut self,
457        peer: impl Into<String>,
458        p: BigUint,
459        a_secret: BigUint,
460    ) {
461        self.pending_dh
462            .insert(peer.into(), PendingDhState { p, a_secret });
463    }
464
465    /// Processes one inbound message.
466    #[must_use]
467    pub fn process_incoming(
468        &mut self,
469        raw_message: &[u8],
470        transport: TransportKind,
471        now: CmrTimestamp,
472    ) -> ProcessOutcome {
473        let parse_ctx = ParseContext {
474            now: now.clone(),
475            recipient_address: Some(self.local_address.as_str()),
476            max_message_bytes: self.policy.content.max_message_bytes,
477            max_header_ids: self.policy.content.max_header_ids,
478        };
479
480        let parsed = match parse_message(raw_message, &parse_ctx) {
481            Ok(m) => m,
482            Err(err) => return ProcessOutcome::dropped(ProcessError::Parse(err)),
483        };
484
485        if parsed.body.len() > self.policy.content.max_body_bytes {
486            return self.drop_for_peer(&parsed, ProcessError::BodyTooLarge, -2.0);
487        }
488
489        let sender = parsed.immediate_sender().to_owned();
490        if !self.check_global_rate(raw_message.len()) {
491            return self.drop_for_peer(&parsed, ProcessError::GlobalFloodLimited, -1.5);
492        }
493        if !self.check_peer_rate(&sender, raw_message.len()) {
494            return self.drop_for_peer(&parsed, ProcessError::FloodLimited, -2.0);
495        }
496        if self.peer_reputation(&sender) < self.policy.trust.min_reputation_score {
497            return self.drop_for_peer(&parsed, ProcessError::ReputationTooLow, -0.5);
498        }
499        if let Err(err) = self.validate_signature_policy(&parsed, &sender) {
500            return self.drop_for_peer(&parsed, err, -4.0);
501        }
502        if self.cache.has_seen_any_id(&parsed) {
503            return self.drop_for_peer(&parsed, ProcessError::DuplicateMessageId, -0.1);
504        }
505        if !self.policy.content.allow_binary_payloads && is_probably_binary(&parsed.body) {
506            return self.drop_for_peer(&parsed, ProcessError::BinaryContentBlocked, -0.4);
507        }
508        if self.policy.content.block_executable_magic && looks_like_executable(&parsed.body) {
509            return self.drop_for_peer(&parsed, ProcessError::ExecutableBlocked, -2.5);
510        }
511
512        match self.handle_key_exchange_control(&parsed, &sender, &transport, &now) {
513            Ok(Some(forwards)) => {
514                self.adjust_peer_reputation(&sender, 1.5);
515                self.record_peer_inbound(&sender, raw_message.len());
516                return ProcessOutcome {
517                    accepted: true,
518                    drop_reason: None,
519                    parsed_message: Some(parsed),
520                    intrinsic_dependence: None,
521                    forwards,
522                    matched_count: 0,
523                    key_exchange_control: true,
524                };
525            }
526            Ok(None) => {}
527            Err(err) => return self.drop_for_peer(&parsed, err, -3.0),
528        }
529
530        let id_score = match self
531            .oracle
532            .intrinsic_dependence(&parsed.body, self.policy.spam.intrinsic_dependence_order)
533        {
534            Ok(score) => score,
535            Err(err) => {
536                return self.drop_for_peer(
537                    &parsed,
538                    ProcessError::Compression(err),
539                    if self.policy.security_level == crate::policy::SecurityLevel::Trusted {
540                        -0.2
541                    } else {
542                        -1.0
543                    },
544                );
545            }
546        };
547        if !id_score.is_finite() {
548            return self.drop_for_peer(&parsed, ProcessError::IntrinsicDependenceInvalid, -1.5);
549        }
550        if id_score < self.policy.spam.min_intrinsic_dependence {
551            return self.drop_for_peer(&parsed, ProcessError::IntrinsicDependenceTooLow, -1.5);
552        }
553
554        let routing = match self.select_routing_decision(&parsed) {
555            Ok(decision) => decision,
556            Err(err) => return self.drop_for_peer(&parsed, err, -0.5),
557        };
558        let mut outcome = ProcessOutcome::accepted(parsed.clone());
559        outcome.intrinsic_dependence = Some(id_score);
560        outcome.matched_count = routing.matched_peers.len();
561
562        self.cache.insert(parsed.clone());
563        self.record_peer_inbound(&sender, raw_message.len());
564        self.adjust_peer_reputation(&sender, 0.4);
565
566        let mut limited = self.build_routing_forwards(&parsed, routing, &now);
567        limited.truncate(self.policy.throughput.max_forward_actions);
568        for action in &limited {
569            self.record_peer_outbound(&action.destination, action.message_bytes.len());
570        }
571        outcome.forwards = limited;
572        outcome
573    }
574
575    fn drop_for_peer(
576        &mut self,
577        parsed: &CmrMessage,
578        reason: ProcessError,
579        reputation_delta: f64,
580    ) -> ProcessOutcome {
581        let sender = parsed.immediate_sender().to_owned();
582        self.adjust_peer_reputation(&sender, reputation_delta);
583        ProcessOutcome {
584            accepted: false,
585            drop_reason: Some(reason),
586            parsed_message: Some(parsed.clone()),
587            intrinsic_dependence: None,
588            forwards: Vec::new(),
589            matched_count: 0,
590            key_exchange_control: false,
591        }
592    }
593
594    fn check_peer_rate(&mut self, peer: &str, message_bytes: usize) -> bool {
595        let metrics = self.peers.entry(peer.to_owned()).or_default();
596        metrics.window.allow_and_record(
597            message_bytes,
598            self.policy.throughput.per_peer_messages_per_minute,
599            self.policy.throughput.per_peer_bytes_per_minute,
600        )
601    }
602
603    fn check_global_rate(&mut self, message_bytes: usize) -> bool {
604        self.global_window.allow_and_record(
605            message_bytes,
606            self.policy.throughput.global_messages_per_minute,
607            self.policy.throughput.global_bytes_per_minute,
608        )
609    }
610
611    fn peer_reputation(&self, peer: &str) -> f64 {
612        self.peers.get(peer).map_or(0.0, |p| p.reputation)
613    }
614
615    fn adjust_peer_reputation(&mut self, peer: &str, delta: f64) {
616        let metrics = self.peers.entry(peer.to_owned()).or_default();
617        metrics.reputation = (metrics.reputation + delta).clamp(-100.0, 100.0);
618    }
619
620    fn record_peer_inbound(&mut self, peer: &str, bytes: usize) {
621        let metrics = self.peers.entry(peer.to_owned()).or_default();
622        metrics.inbound_messages = metrics.inbound_messages.saturating_add(1);
623        metrics.inbound_bytes = metrics
624            .inbound_bytes
625            .saturating_add(u64::try_from(bytes).unwrap_or(u64::MAX));
626    }
627
628    fn record_peer_outbound(&mut self, peer: &str, bytes: usize) {
629        let metrics = self.peers.entry(peer.to_owned()).or_default();
630        metrics.outbound_messages = metrics.outbound_messages.saturating_add(1);
631        metrics.outbound_bytes = metrics
632            .outbound_bytes
633            .saturating_add(u64::try_from(bytes).unwrap_or(u64::MAX));
634    }
635
636    fn can_forward_to_peer(&self, peer: &str) -> bool {
637        let Some(metrics) = self.peers.get(peer) else {
638            return true;
639        };
640        if metrics.inbound_bytes == 0 {
641            return metrics.outbound_bytes == 0;
642        }
643        let ratio = metrics.outbound_bytes as f64 / metrics.inbound_bytes as f64;
644        ratio <= self.policy.trust.max_outbound_inbound_ratio
645    }
646
647    fn validate_signature_policy(
648        &self,
649        message: &CmrMessage,
650        sender: &str,
651    ) -> Result<(), ProcessError> {
652        let known_key = self.shared_keys.get(sender);
653        match (&message.signature, known_key) {
654            (Signature::Unsigned, Some(_))
655                if self.policy.trust.require_signatures_from_known_peers =>
656            {
657                Err(ProcessError::UnsignedRejected)
658            }
659            (Signature::Unsigned, None) if !self.policy.trust.allow_unsigned_from_unknown_peers => {
660                Err(ProcessError::UnsignedRejected)
661            }
662            (Signature::Sha256(_), None) if self.policy.trust.reject_signed_without_known_key => {
663                Err(ProcessError::SignedWithoutKnownKey)
664            }
665            (Signature::Sha256(_), Some(key)) => {
666                if message
667                    .signature
668                    .verifies(&message.payload_without_signature_line(), Some(key))
669                {
670                    Ok(())
671                } else {
672                    Err(ProcessError::BadSignature)
673                }
674            }
675            _ => Ok(()),
676        }
677    }
678
679    fn handle_key_exchange_control(
680        &mut self,
681        message: &CmrMessage,
682        sender: &str,
683        transport: &TransportKind,
684        now: &CmrTimestamp,
685    ) -> Result<Option<Vec<ForwardAction>>, ProcessError> {
686        let Some(control) = parse_key_exchange(&message.body)? else {
687            return Ok(None);
688        };
689
690        if self.shared_keys.contains_key(sender) && matches!(message.signature, Signature::Unsigned)
691        {
692            return Err(ProcessError::UnsignedRejected);
693        }
694
695        let old_key = self.shared_keys.get(sender).cloned();
696        match control {
697            KeyExchangeMessage::ClearKey { key } => {
698                if !transport.is_secure_channel() {
699                    return Err(ProcessError::ClearKeyOnInsecureChannel);
700                }
701                self.cache.insert(message.clone());
702                let derived =
703                    derive_exchange_key_from_bytes(&self.local_address, sender, b"clear", &key);
704                self.shared_keys.insert(sender.to_owned(), derived);
705                self.purge_key_exchange_cache(sender);
706                Ok(Some(Vec::new()))
707            }
708            KeyExchangeMessage::RsaRequest { n, e } => {
709                validate_rsa_request_params(&n, &e)?;
710                let key = random_nonzero_biguint_below(&n).ok_or(
711                    ProcessError::WeakKeyExchangeParameters("failed to generate RSA session key"),
712                )?;
713                self.cache.insert(message.clone());
714                let c = mod_pow(&key, &e, &n);
715                let reply_body = KeyExchangeMessage::RsaReply { c }.render().into_bytes();
716                let reply = self.build_control_reply(sender, reply_body, old_key.as_deref(), now);
717                self.shared_keys.insert(
718                    sender.to_owned(),
719                    derive_exchange_key(&self.local_address, sender, b"rsa", &key),
720                );
721                self.purge_key_exchange_cache(sender);
722                Ok(Some(vec![reply]))
723            }
724            KeyExchangeMessage::RsaReply { c } => {
725                let Some(state) = self.pending_rsa.remove(sender) else {
726                    return Err(ProcessError::MissingPendingKeyExchangeState);
727                };
728                if c >= state.n {
729                    return Err(ProcessError::WeakKeyExchangeParameters(
730                        "RSA reply ciphertext out of range",
731                    ));
732                }
733                let key = mod_pow(&c, &state.d, &state.n);
734                if key.is_zero() {
735                    return Err(ProcessError::WeakKeyExchangeParameters(
736                        "RSA shared key reduced to zero",
737                    ));
738                }
739                self.cache.insert(message.clone());
740                self.shared_keys.insert(
741                    sender.to_owned(),
742                    derive_exchange_key(&self.local_address, sender, b"rsa", &key),
743                );
744                self.purge_key_exchange_cache(sender);
745                Ok(Some(Vec::new()))
746            }
747            KeyExchangeMessage::DhRequest { g, p, a_pub } => {
748                validate_dh_request_params(&g, &p, &a_pub)?;
749                let b_secret =
750                    random_dh_secret(&p).ok_or(ProcessError::WeakKeyExchangeParameters(
751                        "failed to generate DH secret exponent",
752                    ))?;
753                let b_pub = mod_pow(&g, &b_secret, &p);
754                let shared = mod_pow(&a_pub, &b_secret, &p);
755                if shared <= BigUint::one() {
756                    return Err(ProcessError::WeakKeyExchangeParameters(
757                        "DH derived weak shared key",
758                    ));
759                }
760                self.cache.insert(message.clone());
761                let reply_body = KeyExchangeMessage::DhReply { b_pub }.render().into_bytes();
762                let reply = self.build_control_reply(sender, reply_body, old_key.as_deref(), now);
763                self.shared_keys.insert(
764                    sender.to_owned(),
765                    derive_exchange_key(&self.local_address, sender, b"dh", &shared),
766                );
767                self.purge_key_exchange_cache(sender);
768                Ok(Some(vec![reply]))
769            }
770            KeyExchangeMessage::DhReply { b_pub } => {
771                let Some(state) = self.pending_dh.remove(sender) else {
772                    return Err(ProcessError::MissingPendingKeyExchangeState);
773                };
774                validate_dh_reply_params(&b_pub, &state.p)?;
775                let shared = mod_pow(&b_pub, &state.a_secret, &state.p);
776                if shared <= BigUint::one() {
777                    return Err(ProcessError::WeakKeyExchangeParameters(
778                        "DH derived weak shared key",
779                    ));
780                }
781                self.cache.insert(message.clone());
782                self.shared_keys.insert(
783                    sender.to_owned(),
784                    derive_exchange_key(&self.local_address, sender, b"dh", &shared),
785                );
786                self.purge_key_exchange_cache(sender);
787                Ok(Some(Vec::new()))
788            }
789        }
790    }
791
792    fn build_control_reply(
793        &mut self,
794        destination: &str,
795        body: Vec<u8>,
796        signing_key: Option<&[u8]>,
797        now: &CmrTimestamp,
798    ) -> ForwardAction {
799        let mut msg = CmrMessage {
800            signature: Signature::Unsigned,
801            header: vec![MessageId {
802                timestamp: self.next_forward_timestamp(now),
803                address: self.local_address.clone(),
804            }],
805            body,
806        };
807        if let Some(key) = signing_key {
808            msg.sign_with_key(key);
809        }
810        self.cache.insert(msg.clone());
811        ForwardAction {
812            destination: destination.to_owned(),
813            message_bytes: msg.to_bytes(),
814            reason: ForwardReason::KeyExchangeReply,
815        }
816    }
817
818    fn purge_key_exchange_cache(&mut self, peer: &str) {
819        let local = self.local_address.clone();
820        self.cache.remove_if(|message| {
821            parse_key_exchange(&message.body).ok().flatten().is_some()
822                && (message_contains_sender(message, peer)
823                    || message_contains_sender(message, &local))
824        });
825    }
826
827    fn select_routing_decision(
828        &self,
829        incoming: &CmrMessage,
830    ) -> Result<RoutingDecision, ProcessError> {
831        let peer_corpora = self.collect_peer_corpora();
832        if peer_corpora.is_empty() {
833            return Ok(RoutingDecision::default());
834        }
835
836        let mut canonical_incoming = incoming.clone();
837        canonical_incoming.make_unsigned();
838        let incoming_bytes = canonical_incoming.to_bytes();
839        let mut peers: Vec<(&String, &Vec<u8>)> = peer_corpora.iter().collect();
840        peers.sort_by(|left, right| left.0.cmp(right.0));
841        let peer_names: Vec<String> = peers.iter().map(|(peer, _)| (*peer).to_owned()).collect();
842        let corpora: Vec<Vec<u8>> = peers.iter().map(|(_, corpus)| (*corpus).clone()).collect();
843        let distances = self
844            .oracle
845            .batch_compression_distance(&incoming_bytes, &corpora)
846            .map_err(ProcessError::Compression)?;
847
848        let mut best: Option<(String, f64)> = None;
849        let mut matched_peers = Vec::<String>::new();
850        for (peer, distance) in peer_names.into_iter().zip(distances.into_iter()) {
851            if !distance.is_finite() {
852                continue;
853            }
854            if distance <= self.policy.spam.max_match_distance {
855                matched_peers.push(peer.clone());
856            }
857            if best
858                .as_ref()
859                .is_none_or(|(_, best_distance)| distance < *best_distance)
860            {
861                best = Some((peer, distance));
862            }
863        }
864
865        let Some((best_peer, best_distance)) = best else {
866            return Ok(RoutingDecision::default());
867        };
868        if best_distance > self.policy.spam.max_match_distance || matched_peers.is_empty() {
869            return Ok(RoutingDecision::default());
870        }
871
872        let matched_set = matched_peers
873            .iter()
874            .map(String::as_str)
875            .collect::<HashSet<_>>();
876        let matched_messages = self
877            .cache
878            .order
879            .iter()
880            .filter_map(|key| self.cache.entries.get(key))
881            .filter(|entry| matched_set.contains(entry.message.immediate_sender()))
882            .map(|entry| entry.message.clone())
883            .collect::<Vec<_>>();
884
885        let compensatory = if message_contains_sender(incoming, &best_peer) {
886            self.select_compensatory_message(incoming, &best_peer, &peer_corpora)?
887                .map(|message| (best_peer.clone(), message))
888        } else {
889            None
890        };
891
892        Ok(RoutingDecision {
893            best_peer: Some(best_peer),
894            matched_peers,
895            matched_messages,
896            compensatory,
897        })
898    }
899
900    fn collect_peer_corpora(&self) -> HashMap<String, Vec<u8>> {
901        let mut peer_corpora = HashMap::<String, Vec<u8>>::new();
902        for key in &self.cache.order {
903            let Some(entry) = self.cache.entries.get(key) else {
904                continue;
905            };
906            let sender = entry.message.immediate_sender();
907            if sender == self.local_address.as_str() {
908                continue;
909            }
910            peer_corpora
911                .entry(sender.to_owned())
912                .or_default()
913                .extend_from_slice(&entry.message.to_bytes());
914        }
915        peer_corpora
916    }
917
918    fn select_compensatory_message(
919        &self,
920        incoming: &CmrMessage,
921        best_peer: &str,
922        peer_corpora: &HashMap<String, Vec<u8>>,
923    ) -> Result<Option<CmrMessage>, ProcessError> {
924        let ordered_entries = self
925            .cache
926            .order
927            .iter()
928            .filter_map(|key| self.cache.entries.get(key))
929            .collect::<Vec<_>>();
930        if ordered_entries.len() <= 1 {
931            return Ok(None);
932        }
933
934        let encoded_entries = ordered_entries
935            .iter()
936            .map(|entry| (entry.message.clone(), entry.message.to_bytes()))
937            .collect::<Vec<_>>();
938        let total_bytes = encoded_entries
939            .iter()
940            .map(|(_, bytes)| bytes.len())
941            .sum::<usize>();
942        let mut canonical_incoming = incoming.clone();
943        canonical_incoming.make_unsigned();
944        let mut x_guess = Vec::with_capacity(
945            canonical_incoming.encoded_len()
946                + peer_corpora.get(best_peer).map_or(0, std::vec::Vec::len),
947        );
948        x_guess.extend_from_slice(&canonical_incoming.to_bytes());
949        if let Some(known_from_best_peer) = peer_corpora.get(best_peer) {
950            x_guess.extend_from_slice(known_from_best_peer);
951        }
952
953        let mut best_score = f64::NEG_INFINITY;
954        let mut best_message = None;
955        for (idx, (candidate_message, candidate_bytes)) in encoded_entries.iter().enumerate() {
956            if message_contains_sender(candidate_message, best_peer) {
957                continue;
958            }
959            if total_bytes <= candidate_bytes.len() {
960                continue;
961            }
962
963            let mut remainder =
964                Vec::with_capacity(total_bytes.saturating_sub(candidate_bytes.len()));
965            for (other_idx, (_, other_bytes)) in encoded_entries.iter().enumerate() {
966                if idx == other_idx {
967                    continue;
968                }
969                remainder.extend_from_slice(other_bytes);
970            }
971            if remainder.is_empty() {
972                continue;
973            }
974
975            let d_cache = self
976                .oracle
977                .compression_distance(candidate_bytes, &remainder)
978                .map_err(ProcessError::Compression)?;
979            let d_guess = self
980                .oracle
981                .compression_distance(&x_guess, candidate_bytes)
982                .map_err(ProcessError::Compression)?;
983            if !d_cache.is_finite() || !d_guess.is_finite() {
984                continue;
985            }
986            let score = d_cache - d_guess;
987            if score > best_score {
988                best_score = score;
989                best_message = Some(candidate_message.clone());
990            }
991        }
992
993        Ok(best_message)
994    }
995
996    fn build_routing_forwards(
997        &mut self,
998        incoming: &CmrMessage,
999        decision: RoutingDecision,
1000        now: &CmrTimestamp,
1001    ) -> Vec<ForwardAction> {
1002        let mut out = Vec::new();
1003        let mut dedupe = HashSet::<(String, String)>::new();
1004        let incoming_key = cache_key(incoming);
1005        let incoming_destinations = sorted_unique_addresses(&incoming.header);
1006        let suppress_best = decision
1007            .best_peer
1008            .as_deref()
1009            .filter(|peer| message_contains_sender(incoming, peer));
1010
1011        if let Some((destination, message)) = decision.compensatory.clone() {
1012            let dedupe_key = (destination.clone(), cache_key(&message));
1013            if !dedupe.contains(&dedupe_key) {
1014                let actions = self.forward_with_optional_key_exchange(
1015                    &message,
1016                    &destination,
1017                    now,
1018                    ForwardReason::CompensatoryReply,
1019                );
1020                if !actions.is_empty() {
1021                    dedupe.insert(dedupe_key);
1022                    out.extend(actions);
1023                }
1024            }
1025        }
1026
1027        for matched in &decision.matched_messages {
1028            for destination in sorted_unique_addresses(&matched.header) {
1029                if destination == self.local_address {
1030                    continue;
1031                }
1032                if suppress_best.is_some_and(|peer| peer == destination) {
1033                    continue;
1034                }
1035                let dedupe_key = (destination.clone(), incoming_key.clone());
1036                if dedupe.contains(&dedupe_key) {
1037                    continue;
1038                }
1039                let actions = self.forward_with_optional_key_exchange(
1040                    incoming,
1041                    &destination,
1042                    now,
1043                    ForwardReason::MatchedForwardIncoming,
1044                );
1045                if !actions.is_empty() {
1046                    dedupe.insert(dedupe_key);
1047                    out.extend(actions);
1048                }
1049            }
1050
1051            let matched_key = cache_key(matched);
1052            for destination in &incoming_destinations {
1053                if destination == &self.local_address {
1054                    continue;
1055                }
1056                let dedupe_key = (destination.clone(), matched_key.clone());
1057                if dedupe.contains(&dedupe_key) {
1058                    continue;
1059                }
1060                let actions = self.forward_with_optional_key_exchange(
1061                    matched,
1062                    destination,
1063                    now,
1064                    ForwardReason::MatchedForwardCached,
1065                );
1066                if !actions.is_empty() {
1067                    dedupe.insert(dedupe_key);
1068                    out.extend(actions);
1069                }
1070            }
1071        }
1072
1073        out
1074    }
1075
1076    fn forward_with_optional_key_exchange(
1077        &mut self,
1078        message: &CmrMessage,
1079        destination: &str,
1080        now: &CmrTimestamp,
1081        reason: ForwardReason,
1082    ) -> Vec<ForwardAction> {
1083        if destination == self.local_address
1084            || !self.can_forward_to_peer(destination)
1085            || message_contains_sender(message, destination)
1086        {
1087            return Vec::new();
1088        }
1089
1090        let mut out = Vec::with_capacity(2);
1091        out.push(self.wrap_and_forward(message, destination, now, reason));
1092        if self.shared_keys.contains_key(destination)
1093            || self.pending_rsa.contains_key(destination)
1094            || self.pending_dh.contains_key(destination)
1095        {
1096            return out;
1097        }
1098
1099        if let Some(initiation) = self.build_key_exchange_initiation(destination, now) {
1100            out.push(initiation);
1101        }
1102        out
1103    }
1104
1105    fn build_key_exchange_initiation(
1106        &mut self,
1107        destination: &str,
1108        now: &CmrTimestamp,
1109    ) -> Option<ForwardAction> {
1110        match self.policy.trust.auto_key_exchange_mode {
1111            AutoKeyExchangeMode::Rsa => self
1112                .build_rsa_initiation(destination, now)
1113                .or_else(|| self.build_dh_initiation(destination, now)),
1114            AutoKeyExchangeMode::Dh => self
1115                .build_dh_initiation(destination, now)
1116                .or_else(|| self.build_rsa_initiation(destination, now)),
1117        }
1118    }
1119
1120    fn build_rsa_initiation(
1121        &mut self,
1122        destination: &str,
1123        now: &CmrTimestamp,
1124    ) -> Option<ForwardAction> {
1125        let e = BigUint::from(65_537_u32);
1126        let bits_each = usize::try_from(MIN_RSA_MODULUS_BITS / 2).ok()?;
1127        let mut generated = None;
1128        for _ in 0..8 {
1129            let p = generate_probable_prime(bits_each, 12)?;
1130            let mut q = generate_probable_prime(bits_each, 12)?;
1131            if q == p {
1132                q = generate_probable_prime(bits_each, 12)?;
1133            }
1134            if q == p {
1135                continue;
1136            }
1137
1138            let n = &p * &q;
1139            if n.bits() < MIN_RSA_MODULUS_BITS {
1140                continue;
1141            }
1142            let p1 = &p - BigUint::one();
1143            let q1 = &q - BigUint::one();
1144            let lambda = lcm_biguint(&p1, &q1);
1145            if gcd_biguint(&e, &lambda) != BigUint::one() {
1146                continue;
1147            }
1148            let Some(d) = mod_inverse_biguint(&e, &lambda) else {
1149                continue;
1150            };
1151            generated = Some((n, d));
1152            break;
1153        }
1154        let (n, d) = generated?;
1155        self.pending_rsa
1156            .insert(destination.to_owned(), PendingRsaState { n: n.clone(), d });
1157
1158        let body = KeyExchangeMessage::RsaRequest { n, e }
1159            .render()
1160            .into_bytes();
1161        let msg = CmrMessage {
1162            signature: Signature::Unsigned,
1163            header: vec![MessageId {
1164                timestamp: self.next_forward_timestamp(now),
1165                address: self.local_address.clone(),
1166            }],
1167            body,
1168        };
1169        self.cache.insert(msg.clone());
1170        Some(ForwardAction {
1171            destination: destination.to_owned(),
1172            message_bytes: msg.to_bytes(),
1173            reason: ForwardReason::KeyExchangeInitiation,
1174        })
1175    }
1176
1177    fn build_dh_initiation(
1178        &mut self,
1179        destination: &str,
1180        now: &CmrTimestamp,
1181    ) -> Option<ForwardAction> {
1182        let bits = usize::try_from(MIN_DH_MODULUS_BITS).ok()?;
1183        let p = generate_probable_safe_prime(bits, 10)?;
1184        let g = find_primitive_root_for_safe_prime(&p)?;
1185        let a_secret = random_dh_secret(&p)?;
1186        let a_pub = mod_pow(&g, &a_secret, &p);
1187        self.pending_dh.insert(
1188            destination.to_owned(),
1189            PendingDhState {
1190                p: p.clone(),
1191                a_secret,
1192            },
1193        );
1194
1195        let body = KeyExchangeMessage::DhRequest { g, p, a_pub }
1196            .render()
1197            .into_bytes();
1198        let msg = CmrMessage {
1199            signature: Signature::Unsigned,
1200            header: vec![MessageId {
1201                timestamp: self.next_forward_timestamp(now),
1202                address: self.local_address.clone(),
1203            }],
1204            body,
1205        };
1206        self.cache.insert(msg.clone());
1207        Some(ForwardAction {
1208            destination: destination.to_owned(),
1209            message_bytes: msg.to_bytes(),
1210            reason: ForwardReason::KeyExchangeInitiation,
1211        })
1212    }
1213
1214    fn wrap_and_forward(
1215        &mut self,
1216        message: &CmrMessage,
1217        destination: &str,
1218        now: &CmrTimestamp,
1219        reason: ForwardReason,
1220    ) -> ForwardAction {
1221        let mut forwarded = message.clone();
1222        forwarded.make_unsigned();
1223        forwarded.prepend_hop(MessageId {
1224            timestamp: self.next_forward_timestamp(now),
1225            address: self.local_address.clone(),
1226        });
1227        if let Some(key) = self.shared_keys.get(destination) {
1228            forwarded.sign_with_key(key);
1229        }
1230        ForwardAction {
1231            destination: destination.to_owned(),
1232            message_bytes: forwarded.to_bytes(),
1233            reason,
1234        }
1235    }
1236
1237    fn next_forward_timestamp(&mut self, now: &CmrTimestamp) -> CmrTimestamp {
1238        self.forward_counter = self.forward_counter.saturating_add(1);
1239        let fraction = format!("{:020}", self.forward_counter);
1240        now.clone().with_fraction(fraction)
1241    }
1242}
1243
1244fn cache_key(message: &CmrMessage) -> String {
1245    message
1246        .origin_id()
1247        .map_or_else(|| message.header[0].to_string(), MessageId::to_string)
1248}
1249
1250fn message_contains_sender(message: &CmrMessage, sender: &str) -> bool {
1251    message.header.iter().any(|id| id.address == sender)
1252}
1253
1254fn sorted_unique_addresses(header: &[MessageId]) -> Vec<String> {
1255    let mut addresses = header
1256        .iter()
1257        .map(|id| id.address.clone())
1258        .collect::<Vec<_>>();
1259    addresses.sort();
1260    addresses.dedup();
1261    addresses
1262}
1263
1264fn is_probably_binary(body: &[u8]) -> bool {
1265    if body.is_empty() {
1266        return false;
1267    }
1268    let non_text = body
1269        .iter()
1270        .copied()
1271        .filter(|b| !matches!(b, 0x09 | 0x0A | 0x0D | 0x20..=0x7E))
1272        .count();
1273    non_text * 10 > body.len() * 3
1274}
1275
1276fn looks_like_executable(body: &[u8]) -> bool {
1277    body.starts_with(b"\x7fELF")
1278        || body.starts_with(b"MZ")
1279        || body.starts_with(b"\xfe\xed\xfa\xce")
1280        || body.starts_with(b"\xce\xfa\xed\xfe")
1281        || body.starts_with(b"\xcf\xfa\xed\xfe")
1282        || body.starts_with(b"\xfe\xed\xfa\xcf")
1283}
1284
1285fn validate_rsa_request_params(n: &BigUint, e: &BigUint) -> Result<(), ProcessError> {
1286    if n.bits() < MIN_RSA_MODULUS_BITS {
1287        return Err(ProcessError::WeakKeyExchangeParameters(
1288            "RSA modulus too small",
1289        ));
1290    }
1291    let two = BigUint::from(2_u8);
1292    if n <= &two || (n % &two).is_zero() {
1293        return Err(ProcessError::WeakKeyExchangeParameters(
1294            "RSA modulus must be odd and > 2",
1295        ));
1296    }
1297    if e <= &two || (e % &two).is_zero() {
1298        return Err(ProcessError::WeakKeyExchangeParameters(
1299            "RSA exponent must be odd and > 2",
1300        ));
1301    }
1302    if e >= n {
1303        return Err(ProcessError::WeakKeyExchangeParameters(
1304            "RSA exponent must be smaller than modulus",
1305        ));
1306    }
1307    if is_probably_prime(n, 10) {
1308        return Err(ProcessError::WeakKeyExchangeParameters(
1309            "RSA modulus must be composite",
1310        ));
1311    }
1312    Ok(())
1313}
1314
1315fn validate_dh_request_params(
1316    g: &BigUint,
1317    p: &BigUint,
1318    a_pub: &BigUint,
1319) -> Result<(), ProcessError> {
1320    if p.bits() < MIN_DH_MODULUS_BITS {
1321        return Err(ProcessError::WeakKeyExchangeParameters(
1322            "DH modulus too small",
1323        ));
1324    }
1325    let two = BigUint::from(2_u8);
1326    if p <= &two || (p % &two).is_zero() {
1327        return Err(ProcessError::WeakKeyExchangeParameters(
1328            "DH modulus must be odd and > 2",
1329        ));
1330    }
1331    if !is_probably_safe_prime(p, 10) {
1332        return Err(ProcessError::WeakKeyExchangeParameters(
1333            "DH modulus must be a safe prime",
1334        ));
1335    }
1336
1337    let p_minus_one = p - BigUint::one();
1338    if g <= &BigUint::one() || g >= &p_minus_one {
1339        return Err(ProcessError::WeakKeyExchangeParameters(
1340            "DH generator must be in range (1, p-1)",
1341        ));
1342    }
1343    if a_pub <= &BigUint::one() || a_pub >= &p_minus_one {
1344        return Err(ProcessError::WeakKeyExchangeParameters(
1345            "DH public value must be in range (1, p-1)",
1346        ));
1347    }
1348    if !is_primitive_root_for_safe_prime(g, p) {
1349        return Err(ProcessError::WeakKeyExchangeParameters(
1350            "DH generator must be a primitive root of p",
1351        ));
1352    }
1353    Ok(())
1354}
1355
1356fn validate_dh_reply_params(b_pub: &BigUint, p: &BigUint) -> Result<(), ProcessError> {
1357    if !is_probably_safe_prime(p, 10) {
1358        return Err(ProcessError::WeakKeyExchangeParameters(
1359            "DH modulus must be a safe prime",
1360        ));
1361    }
1362    let p_minus_one = p - BigUint::one();
1363    if b_pub <= &BigUint::one() || b_pub >= &p_minus_one {
1364        return Err(ProcessError::WeakKeyExchangeParameters(
1365            "DH reply value must be in range (1, p-1)",
1366        ));
1367    }
1368    Ok(())
1369}
1370
1371fn is_primitive_root_for_safe_prime(g: &BigUint, p: &BigUint) -> bool {
1372    if p <= &BigUint::from(3_u8) {
1373        return false;
1374    }
1375    let p_minus_one = p - BigUint::one();
1376    if g <= &BigUint::one() || g >= &p_minus_one {
1377        return false;
1378    }
1379    // For safe prime p = 2q+1, primitive root criterion:
1380    // g^2 != 1 (mod p) and g^q != 1 (mod p), where q=(p-1)/2.
1381    let q = &p_minus_one >> 1usize;
1382    let one = BigUint::one();
1383    let two = BigUint::from(2_u8);
1384    mod_pow(g, &two, p) != one && mod_pow(g, &q, p) != one
1385}
1386
1387fn find_primitive_root_for_safe_prime(p: &BigUint) -> Option<BigUint> {
1388    for candidate in 2_u32..=65_537_u32 {
1389        let g = BigUint::from(candidate);
1390        if is_primitive_root_for_safe_prime(&g, p) {
1391            return Some(g);
1392        }
1393    }
1394    None
1395}
1396
1397fn random_nonzero_biguint_below(modulus: &BigUint) -> Option<BigUint> {
1398    let modulus_bits = usize::try_from(modulus.bits()).ok()?;
1399    if modulus_bits == 0 {
1400        return None;
1401    }
1402    let byte_len = modulus_bits.div_ceil(8);
1403    let excess_bits = byte_len.saturating_mul(8).saturating_sub(modulus_bits);
1404    let mut rng = rand::rng();
1405    let mut raw = vec![0_u8; byte_len];
1406    for _ in 0..256 {
1407        rng.fill_bytes(&mut raw);
1408        if excess_bits > 0 {
1409            raw[0] &= 0xff_u8 >> excess_bits;
1410        }
1411        let value = BigUint::from_bytes_be(&raw);
1412        if !value.is_zero() && &value < modulus {
1413            return Some(value);
1414        }
1415    }
1416    None
1417}
1418
1419fn random_dh_secret(p: &BigUint) -> Option<BigUint> {
1420    if p <= &BigUint::one() {
1421        return None;
1422    }
1423    let upper_bound = p - BigUint::one();
1424    for _ in 0..256 {
1425        let candidate = random_nonzero_biguint_below(&upper_bound)?;
1426        if candidate > BigUint::one() {
1427            return Some(candidate);
1428        }
1429    }
1430    None
1431}
1432
1433fn generate_probable_prime(bits: usize, rounds: usize) -> Option<BigUint> {
1434    if bits < 2 {
1435        return None;
1436    }
1437    for _ in 0..4096 {
1438        let candidate = random_odd_biguint_with_bits(bits)?;
1439        if is_probably_prime(&candidate, rounds) {
1440            return Some(candidate);
1441        }
1442    }
1443    None
1444}
1445
1446fn generate_probable_safe_prime(bits: usize, rounds: usize) -> Option<BigUint> {
1447    if bits < 3 {
1448        return None;
1449    }
1450    for _ in 0..256 {
1451        let q = generate_probable_prime(bits.saturating_sub(1), rounds)?;
1452        let p: BigUint = (&q << 1usize) + BigUint::one();
1453        if p.bits() >= u64::try_from(bits).ok()? && is_probably_prime(&p, rounds) {
1454            return Some(p);
1455        }
1456    }
1457    None
1458}
1459
1460fn random_odd_biguint_with_bits(bits: usize) -> Option<BigUint> {
1461    if bits < 2 {
1462        return None;
1463    }
1464    let byte_len = bits.div_ceil(8);
1465    let excess_bits = byte_len.saturating_mul(8).saturating_sub(bits);
1466    let mut bytes = vec![0_u8; byte_len];
1467    rand::rng().fill_bytes(&mut bytes);
1468    if excess_bits > 0 {
1469        bytes[0] &= 0xff_u8 >> excess_bits;
1470    }
1471    let top_bit = 7_u8.saturating_sub(u8::try_from(excess_bits).ok()?);
1472    bytes[0] |= 1_u8 << top_bit;
1473    bytes[byte_len.saturating_sub(1)] |= 1;
1474    Some(BigUint::from_bytes_be(&bytes))
1475}
1476
1477fn gcd_biguint(left: &BigUint, right: &BigUint) -> BigUint {
1478    let mut a = left.clone();
1479    let mut b = right.clone();
1480    while !b.is_zero() {
1481        let r = &a % &b;
1482        a = b;
1483        b = r;
1484    }
1485    a
1486}
1487
1488fn lcm_biguint(left: &BigUint, right: &BigUint) -> BigUint {
1489    if left.is_zero() || right.is_zero() {
1490        return BigUint::zero();
1491    }
1492    (left / gcd_biguint(left, right)) * right
1493}
1494
1495fn mod_inverse_biguint(value: &BigUint, modulus: &BigUint) -> Option<BigUint> {
1496    let a = value.to_bigint()?;
1497    let m = modulus.to_bigint()?;
1498    let (g, x, _) = extended_gcd_bigint(a, m.clone());
1499    if g != BigInt::one() {
1500        return None;
1501    }
1502    let mut reduced = x % &m;
1503    if reduced < BigInt::zero() {
1504        reduced += &m;
1505    }
1506    reduced.try_into().ok()
1507}
1508
1509fn extended_gcd_bigint(a: BigInt, b: BigInt) -> (BigInt, BigInt, BigInt) {
1510    let mut old_r = a;
1511    let mut r = b;
1512    let mut old_s = BigInt::one();
1513    let mut s = BigInt::zero();
1514    let mut old_t = BigInt::zero();
1515    let mut t = BigInt::one();
1516
1517    while r != BigInt::zero() {
1518        let q = &old_r / &r;
1519
1520        let new_r = &old_r - &q * &r;
1521        old_r = r;
1522        r = new_r;
1523
1524        let new_s = &old_s - &q * &s;
1525        old_s = s;
1526        s = new_s;
1527
1528        let new_t = &old_t - &q * &t;
1529        old_t = t;
1530        t = new_t;
1531    }
1532    (old_r, old_s, old_t)
1533}
1534
1535fn derive_exchange_key(local: &str, peer: &str, label: &[u8], secret: &BigUint) -> Vec<u8> {
1536    let mut ikm = secret.to_bytes_be();
1537    if ikm.is_empty() {
1538        ikm.push(0);
1539    }
1540    derive_exchange_key_from_bytes(local, peer, label, &ikm)
1541}
1542
1543fn derive_exchange_key_from_bytes(local: &str, peer: &str, label: &[u8], secret: &[u8]) -> Vec<u8> {
1544    let (left, right) = if local <= peer {
1545        (local.as_bytes(), peer.as_bytes())
1546    } else {
1547        (peer.as_bytes(), local.as_bytes())
1548    };
1549
1550    let hk = Hkdf::<Sha256>::new(Some(b"cmr-v1-key-exchange"), secret);
1551    let mut info = Vec::with_capacity(3 + label.len() + left.len() + right.len());
1552    info.extend_from_slice(b"cmr");
1553    info.push(0);
1554    info.extend_from_slice(label);
1555    info.push(0);
1556    info.extend_from_slice(left);
1557    info.push(0);
1558    info.extend_from_slice(right);
1559
1560    let mut out = [0_u8; 32];
1561    hk.expand(&info, &mut out)
1562        .expect("HKDF expand length is fixed and valid");
1563    out.to_vec()
1564}
1565
1566fn is_probably_safe_prime(p: &BigUint, rounds: usize) -> bool {
1567    if !is_probably_prime(p, rounds) {
1568        return false;
1569    }
1570    let one = BigUint::one();
1571    let two = BigUint::from(2_u8);
1572    if p <= &two {
1573        return false;
1574    }
1575    let q = (p - &one) >> 1;
1576    is_probably_prime(&q, rounds)
1577}
1578
1579fn is_probably_prime(n: &BigUint, rounds: usize) -> bool {
1580    let two = BigUint::from(2_u8);
1581    let three = BigUint::from(3_u8);
1582    if n < &two {
1583        return false;
1584    }
1585    if n == &two || n == &three {
1586        return true;
1587    }
1588    if (n % &two).is_zero() {
1589        return false;
1590    }
1591
1592    let one = BigUint::one();
1593    let n_minus_one = n - &one;
1594    let mut d = n_minus_one.clone();
1595    let mut s = 0_u32;
1596    while (&d % &two).is_zero() {
1597        d >>= 1;
1598        s = s.saturating_add(1);
1599    }
1600
1601    const BASES: [u8; 12] = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37];
1602    for &base in &BASES {
1603        let a = BigUint::from(base);
1604        if a >= n_minus_one {
1605            continue;
1606        }
1607        if is_miller_rabin_witness(n, &d, s, &a) {
1608            return false;
1609        }
1610    }
1611
1612    let three = BigUint::from(3_u8);
1613    let n_minus_three = n - &three;
1614    for _ in 0..rounds {
1615        let Some(offset) = random_nonzero_biguint_below(&n_minus_three) else {
1616            return false;
1617        };
1618        let a = offset + &two;
1619        if is_miller_rabin_witness(n, &d, s, &a) {
1620            return false;
1621        }
1622    }
1623
1624    true
1625}
1626
1627fn is_miller_rabin_witness(n: &BigUint, d: &BigUint, s: u32, a: &BigUint) -> bool {
1628    let one = BigUint::one();
1629    let n_minus_one = n - &one;
1630    let mut x = mod_pow(a, d, n);
1631    if x == one || x == n_minus_one {
1632        return false;
1633    }
1634    for _ in 1..s {
1635        x = (&x * &x) % n;
1636        if x == n_minus_one {
1637            return false;
1638        }
1639    }
1640    true
1641}
1642
1643#[cfg(test)]
1644mod tests {
1645    use super::*;
1646
1647    struct StubOracle;
1648
1649    impl CompressionOracle for StubOracle {
1650        fn ncd_sym(&self, _left: &[u8], _right: &[u8]) -> Result<f64, CompressionError> {
1651            Ok(0.4)
1652        }
1653
1654        fn compression_distance(
1655            &self,
1656            _left: &[u8],
1657            _right: &[u8],
1658        ) -> Result<f64, CompressionError> {
1659            Ok(0.4)
1660        }
1661
1662        fn intrinsic_dependence(
1663            &self,
1664            _data: &[u8],
1665            _max_order: i64,
1666        ) -> Result<f64, CompressionError> {
1667            Ok(0.5)
1668        }
1669    }
1670
1671    fn now() -> CmrTimestamp {
1672        CmrTimestamp::parse("2030/01/01 00:00:10").expect("ts")
1673    }
1674
1675    #[test]
1676    fn accepts_minimal_message() {
1677        let policy = RoutingPolicy::default();
1678        let mut router = Router::new("http://bob".to_owned(), policy, StubOracle);
1679        let raw = b"0\r\n2029/12/31 23:59:59 http://alice\r\n\r\n5\r\nhello";
1680        let outcome = router.process_incoming(raw, TransportKind::Http, now());
1681        assert!(outcome.accepted);
1682        assert!(outcome.drop_reason.is_none());
1683    }
1684
1685    #[test]
1686    fn rejects_duplicate_id() {
1687        let policy = RoutingPolicy::default();
1688        let mut router = Router::new("http://bob".to_owned(), policy, StubOracle);
1689        let raw = b"0\r\n2029/12/31 23:59:59 http://alice\r\n\r\n5\r\nhello";
1690        let first = router.process_incoming(raw, TransportKind::Http, now());
1691        assert!(first.accepted);
1692        let second = router.process_incoming(raw, TransportKind::Http, now());
1693        assert!(!second.accepted);
1694        assert!(matches!(
1695            second.drop_reason,
1696            Some(ProcessError::DuplicateMessageId)
1697        ));
1698    }
1699
1700    #[test]
1701    fn cache_inserts_messages_in_unsigned_canonical_form() {
1702        let mut cache = MessageCache::new(16, 1024 * 1024);
1703        let mut message = CmrMessage {
1704            signature: Signature::Unsigned,
1705            header: vec![MessageId {
1706                timestamp: CmrTimestamp::parse("2029/12/31 23:59:59").expect("timestamp"),
1707                address: "http://alice".to_owned(),
1708            }],
1709            body: b"payload".to_vec(),
1710        };
1711        message.sign_with_key(b"shared-key");
1712        assert!(matches!(message.signature, Signature::Sha256(_)));
1713
1714        let key = cache_key(&message);
1715        cache.insert(message);
1716        let stored = cache.entries.get(&key).expect("cached entry");
1717        assert!(matches!(stored.message.signature, Signature::Unsigned));
1718        assert!(stored.message.to_bytes().starts_with(b"0\r\n"));
1719    }
1720}