Skip to main content

cmr_core/
router.rs

1//! Router core: validation, security policy, and spec-driven forwarding.
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::time::{Duration, Instant};
5
6use hkdf::Hkdf;
7use num_bigint::{BigInt, BigUint, ToBigInt};
8use num_traits::{One, Zero};
9use rand::RngCore;
10use serde::Serialize;
11use sha2::Sha256;
12use thiserror::Error;
13use url::Url;
14
15use crate::key_exchange::{KeyExchangeError, KeyExchangeMessage, mod_pow, parse_key_exchange};
16use crate::policy::{AutoKeyExchangeMode, RoutingPolicy};
17use crate::protocol::{
18    CmrMessage, CmrTimestamp, MessageId, ParseContext, ParseError, Signature, TransportKind,
19    parse_message,
20};
21
22/// Compression-oracle failures.
23#[derive(Debug, Error)]
24pub enum CompressionError {
25    /// Backing compressor unavailable.
26    #[error("compressor unavailable: {0}")]
27    Unavailable(String),
28    /// Backing compressor returned an error.
29    #[error("compressor failure: {0}")]
30    Failed(String),
31}
32
33/// Compression capability (intentionally abstracted from router process).
34pub trait CompressionOracle: Send + Sync {
35    /// CMR Section 3.2 compression distance from spec:
36    /// `C(XY)-C(X) + C(YX)-C(Y)`.
37    fn compression_distance(&self, left: &[u8], right: &[u8]) -> Result<f64, CompressionError>;
38    /// CMR Section 3.2 compression distance where each side can be provided
39    /// as a sequence of chunks without building contiguous buffers.
40    fn compression_distance_chain(
41        &self,
42        left_parts: &[&[u8]],
43        right_parts: &[&[u8]],
44    ) -> Result<f64, CompressionError> {
45        fn join(parts: &[&[u8]]) -> Vec<u8> {
46            let total = parts.iter().map(|p| p.len()).sum::<usize>();
47            let mut out = Vec::with_capacity(total);
48            for part in parts {
49                out.extend_from_slice(part);
50            }
51            out
52        }
53        let left = join(left_parts);
54        let right = join(right_parts);
55        self.compression_distance(&left, &right)
56    }
57    /// Intrinsic dependence.
58    fn intrinsic_dependence(&self, data: &[u8], max_order: i64) -> Result<f64, CompressionError>;
59    /// Batch CMR distance, defaulting to repeated scalar calls.
60    fn batch_compression_distance(
61        &self,
62        target: &[u8],
63        candidates: &[Vec<u8>],
64    ) -> Result<Vec<f64>, CompressionError> {
65        let mut out = Vec::with_capacity(candidates.len());
66        for candidate in candidates {
67            out.push(self.compression_distance(target, candidate)?);
68        }
69        Ok(out)
70    }
71}
72
73#[derive(Clone, Debug)]
74struct CacheEntry {
75    key: String,
76    message: CmrMessage,
77    encoded_size: usize,
78}
79
80#[derive(Debug)]
81struct MessageCache {
82    entries: HashMap<String, CacheEntry>,
83    order: VecDeque<String>,
84    id_counts: HashMap<String, usize>,
85    total_bytes: usize,
86    max_messages: usize,
87    max_bytes: usize,
88    total_evictions: u64,
89}
90
91impl MessageCache {
92    fn new(max_messages: usize, max_bytes: usize) -> Self {
93        Self {
94            entries: HashMap::new(),
95            order: VecDeque::new(),
96            id_counts: HashMap::new(),
97            total_bytes: 0,
98            max_messages,
99            max_bytes,
100            total_evictions: 0,
101        }
102    }
103
104    fn has_seen_any_id(&self, message: &CmrMessage) -> bool {
105        message
106            .header
107            .iter()
108            .any(|id| self.id_counts.contains_key(&id.to_string()))
109    }
110
111    fn insert(&mut self, mut message: CmrMessage) {
112        // Cache canonical form without per-hop signature bytes.
113        message.make_unsigned();
114        let key = cache_key(&message);
115        if self.entries.contains_key(&key) {
116            return;
117        }
118        let encoded_size = message.encoded_len();
119        let entry = CacheEntry {
120            key: key.clone(),
121            message,
122            encoded_size,
123        };
124        self.total_bytes = self.total_bytes.saturating_add(encoded_size);
125        self.order.push_back(key.clone());
126        self.add_message_ids(&entry.message);
127        self.entries.insert(key, entry);
128        self.evict_as_needed();
129    }
130
131    fn evict_as_needed(&mut self) {
132        while self.entries.len() > self.max_messages || self.total_bytes > self.max_bytes {
133            let Some(oldest_key) = self.order.pop_front() else {
134                break;
135            };
136            if let Some(entry) = self.entries.remove(&oldest_key) {
137                self.total_bytes = self.total_bytes.saturating_sub(entry.encoded_size);
138                self.remove_message_ids(&entry.message);
139                self.total_evictions = self.total_evictions.saturating_add(1);
140            }
141        }
142    }
143
144    fn add_message_ids(&mut self, message: &CmrMessage) {
145        for id in &message.header {
146            *self.id_counts.entry(id.to_string()).or_default() += 1;
147        }
148    }
149
150    fn remove_message_ids(&mut self, message: &CmrMessage) {
151        for id in &message.header {
152            let id_key = id.to_string();
153            let mut remove = false;
154            if let Some(count) = self.id_counts.get_mut(&id_key) {
155                if *count > 1 {
156                    *count -= 1;
157                } else {
158                    remove = true;
159                }
160            }
161            if remove {
162                self.id_counts.remove(&id_key);
163            }
164        }
165    }
166
167    fn remove_by_key(&mut self, key: &str) -> Option<CacheEntry> {
168        let entry = self.entries.remove(key)?;
169        self.total_bytes = self.total_bytes.saturating_sub(entry.encoded_size);
170        self.remove_message_ids(&entry.message);
171        self.order.retain(|existing| existing != key);
172        Some(entry)
173    }
174
175    fn remove_if(&mut self, mut predicate: impl FnMut(&CmrMessage) -> bool) {
176        let to_remove = self
177            .order
178            .iter()
179            .filter_map(|key| {
180                self.entries
181                    .get(key)
182                    .filter(|entry| predicate(&entry.message))
183                    .map(|_| key.clone())
184            })
185            .collect::<Vec<_>>();
186        for key in to_remove {
187            let _ = self.remove_by_key(&key);
188        }
189    }
190}
191
192/// Cache-level observability counters.
193#[derive(Clone, Debug, Serialize)]
194pub struct CacheStats {
195    /// Number of cache entries.
196    pub entry_count: usize,
197    /// Sum of encoded bytes currently in cache.
198    pub total_bytes: usize,
199    /// Configured maximum entries.
200    pub max_messages: usize,
201    /// Configured maximum cache bytes.
202    pub max_bytes: usize,
203    /// Number of evictions performed.
204    pub total_evictions: u64,
205}
206
207/// Read-only cache entry projection for dashboards and APIs.
208#[derive(Clone, Debug, Serialize)]
209pub struct CacheEntryView {
210    /// Stable cache key.
211    pub key: String,
212    /// Encoded message size.
213    pub encoded_size: usize,
214    /// Immediate sender address.
215    pub sender: String,
216    /// Origin timestamp text when available.
217    pub timestamp: String,
218    /// Short body preview, UTF-8 lossy.
219    pub body_preview: String,
220}
221
222#[derive(Clone, Debug)]
223struct PeerMetrics {
224    reputation: f64,
225    inbound_messages: u64,
226    inbound_bytes: u64,
227    outbound_messages: u64,
228    outbound_bytes: u64,
229    window: RateWindow,
230}
231
232/// Stable per-peer metrics projection.
233#[derive(Clone, Debug, Serialize)]
234pub struct PeerSnapshot {
235    /// Peer address.
236    pub peer: String,
237    /// Reputation score.
238    pub reputation: f64,
239    /// Inbound messages observed.
240    pub inbound_messages: u64,
241    /// Inbound bytes observed.
242    pub inbound_bytes: u64,
243    /// Outbound messages sent.
244    pub outbound_messages: u64,
245    /// Outbound bytes sent.
246    pub outbound_bytes: u64,
247    /// Sliding window message count.
248    pub current_window_messages: usize,
249    /// Sliding window bytes.
250    pub current_window_bytes: u64,
251    /// Whether a shared key is currently known for this peer.
252    pub has_shared_key: bool,
253    /// Whether key-exchange initiator state is pending for this peer.
254    pub pending_key_exchange: bool,
255}
256
257impl Default for PeerMetrics {
258    fn default() -> Self {
259        Self {
260            reputation: 0.0,
261            inbound_messages: 0,
262            inbound_bytes: 0,
263            outbound_messages: 0,
264            outbound_bytes: 0,
265            window: RateWindow::new(),
266        }
267    }
268}
269
/// Sliding 60-second window of accepted messages used for rate limiting.
#[derive(Clone, Debug)]
struct RateWindow {
    /// Accepted messages as `(arrival time, size in bytes)`, oldest first.
    window: VecDeque<(Instant, u64)>,
    /// Sum of the sizes currently held in `window`.
    bytes: u64,
}

impl RateWindow {
    /// Creates an empty window.
    fn new() -> Self {
        Self {
            window: VecDeque::new(),
            bytes: 0,
        }
    }

    /// Decides whether a message of `message_bytes` fits within the limits
    /// and, if so, records it.
    ///
    /// Entries that are 60 seconds old or older are expired first. The message
    /// is rejected (and nothing is recorded) when accepting it would push the
    /// window above `max_messages_per_minute` messages or
    /// `max_bytes_per_minute` bytes.
    fn allow_and_record(
        &mut self,
        message_bytes: usize,
        max_messages_per_minute: u32,
        max_bytes_per_minute: u64,
    ) -> bool {
        const WINDOW_LENGTH: Duration = Duration::from_secs(60);
        let now = Instant::now();

        // Expire everything that has aged out of the window.
        while let Some(&(recorded_at, expired_bytes)) = self.window.front() {
            if now.duration_since(recorded_at) < WINDOW_LENGTH {
                break;
            }
            self.window.pop_front();
            self.bytes = self.bytes.saturating_sub(expired_bytes);
        }

        // Conversions saturate so oversized values can only make the check stricter.
        let size = u64::try_from(message_bytes).unwrap_or(u64::MAX);
        let message_cap = usize::try_from(max_messages_per_minute).unwrap_or(usize::MAX);
        let projected_messages = self.window.len().saturating_add(1);
        let projected_bytes = self.bytes.saturating_add(size);

        let within_limits =
            projected_messages <= message_cap && projected_bytes <= max_bytes_per_minute;
        if within_limits {
            self.window.push_back((now, size));
            self.bytes = projected_bytes;
        }
        within_limits
    }

    /// Number of messages currently recorded in the window.
    fn current_messages(&self) -> usize {
        self.window.len()
    }

    /// Total bytes currently recorded in the window.
    fn current_bytes(&self) -> u64 {
        self.bytes
    }
}
322
323#[derive(Clone, Debug)]
324struct PendingRsaState {
325    n: BigUint,
326    d: BigUint,
327}
328
329#[derive(Clone, Debug)]
330struct PendingDhState {
331    p: BigUint,
332    a_secret: BigUint,
333}
334
/// Minimum RSA modulus size, in bits.
const MIN_RSA_MODULUS_BITS: u64 = 2_048;
/// Minimum DH modulus size, in bits.
const MIN_DH_MODULUS_BITS: u64 = 2_048;
337
/// Why an outbound message was produced.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ForwardReason {
    /// The incoming message `X` was forwarded to addresses taken from a matched message's header.
    MatchedForwardIncoming,
    /// A matched cached message `Y` was forwarded to addresses taken from the incoming header.
    MatchedForwardCached,
    /// Compensatory response, chosen when the best peer already sent X.
    CompensatoryReply,
    /// Start of a key-exchange protocol run.
    KeyExchangeInitiation,
    /// Reply within a key-exchange protocol run.
    KeyExchangeReply,
}
352
353#[derive(Clone, Debug, Default)]
354struct RoutingDecision {
355    best_peer: Option<String>,
356    best_distance_raw: Option<f64>,
357    best_distance_normalized: Option<f64>,
358    threshold_raw: f64,
359    matched_messages: Vec<CmrMessage>,
360    compensatory: Option<(String, CmrMessage)>,
361}
362
363#[derive(Clone, Debug, Default)]
364struct RoutingActions {
365    forwards: Vec<ForwardAction>,
366    client_plans: Vec<ClientMessagePlan>,
367}
368
369/// Prepared outbound action.
370#[derive(Clone, Debug)]
371pub struct ForwardAction {
372    /// Recipient peer address.
373    pub destination: String,
374    /// Wire bytes.
375    pub message_bytes: Vec<u8>,
376    /// Reason for forwarding.
377    pub reason: ForwardReason,
378}
379
380/// Client-side message creation plan emitted by router control logic.
381#[derive(Clone, Debug)]
382pub struct ClientMessagePlan {
383    /// Recipient peer address.
384    pub destination: String,
385    /// Message body to send in a fresh client-originated CMR message.
386    pub body: Vec<u8>,
387    /// Optional key for signing the created message.
388    pub signing_key: Option<Vec<u8>>,
389    /// Reason for message creation.
390    pub reason: ForwardReason,
391}
392
393#[derive(Clone, Debug, Default)]
394struct KeyExchangeControlOutcome {
395    forwards: Vec<ForwardAction>,
396    client_plans: Vec<ClientMessagePlan>,
397}
398
399/// Processing rejection reason.
400#[derive(Debug, Error)]
401pub enum ProcessError {
402    /// Parse failure.
403    #[error("parse error: {0}")]
404    Parse(#[from] ParseError),
405    /// Message duplicates cache by ID.
406    #[error("duplicate message id in cache")]
407    DuplicateMessageId,
408    /// Peer throttled by anti-flood controls.
409    #[error("peer exceeded flood limits")]
410    FloodLimited,
411    /// Global throttling triggered.
412    #[error("global flood limits exceeded")]
413    GlobalFloodLimited,
414    /// Peer reputation below threshold.
415    #[error("peer reputation below threshold")]
416    ReputationTooLow,
417    /// Unsigned message violates policy.
418    #[error("unsigned message violates trust policy")]
419    UnsignedRejected,
420    /// Signature cannot be verified.
421    #[error("signature verification failed")]
422    BadSignature,
423    /// Signed message from unknown key violates policy.
424    #[error("signed message without known key rejected")]
425    SignedWithoutKnownKey,
426    /// Message body exceeds policy.
427    #[error("message body exceeds content policy")]
428    BodyTooLarge,
429    /// Binary content blocked.
430    #[error("binary content blocked by policy")]
431    BinaryContentBlocked,
432    /// Executable payload blocked.
433    #[error("executable payload blocked by policy")]
434    ExecutableBlocked,
435    /// Intrinsic-dependence spam check failed.
436    #[error("message failed intrinsic dependence spam check")]
437    IntrinsicDependenceTooLow,
438    /// Intrinsic-dependence score was not finite.
439    #[error("message intrinsic dependence score was not finite")]
440    IntrinsicDependenceInvalid,
441    /// Compression oracle error.
442    #[error("compression oracle error: {0}")]
443    Compression(#[from] CompressionError),
444    /// Key-exchange parse error.
445    #[error("key exchange parse error: {0}")]
446    KeyExchange(#[from] KeyExchangeError),
447    /// Clear key exchange on insecure channel.
448    #[error("clear key exchange requires secure channel")]
449    ClearKeyOnInsecureChannel,
450    /// Malformed key-exchange state.
451    #[error("unexpected key exchange reply without pending state")]
452    MissingPendingKeyExchangeState,
453    /// Weak/unsafe key-exchange parameters.
454    #[error("weak key exchange parameters: {0}")]
455    WeakKeyExchangeParameters(&'static str),
456}
457
458/// Result of processing one inbound message.
459#[derive(Debug)]
460pub struct ProcessOutcome {
461    /// Whether message was accepted.
462    pub accepted: bool,
463    /// Drop reason when not accepted.
464    pub drop_reason: Option<ProcessError>,
465    /// Parsed message if available.
466    pub parsed_message: Option<CmrMessage>,
467    /// Intrinsic dependence score when computed.
468    pub intrinsic_dependence: Option<f64>,
469    /// Generated forwarding actions.
470    pub forwards: Vec<ForwardAction>,
471    /// Client-originated message creation plans.
472    pub client_plans: Vec<ClientMessagePlan>,
473    /// Matched cached messages returned to a locally originating client post.
474    pub local_matches: Vec<CmrMessage>,
475    /// Number of semantic matches found.
476    pub matched_count: usize,
477    /// Routing distance diagnostics.
478    pub routing_diagnostics: Option<RoutingDiagnostics>,
479    /// Whether this was a key exchange control message.
480    pub key_exchange_control: bool,
481}
482
/// Routing-distance diagnostics, useful for threshold tuning and observability.
#[derive(Clone, Debug)]
pub struct RoutingDiagnostics {
    /// Best candidate peer according to the raw distance ranking.
    pub best_peer: Option<String>,
    /// Best raw Section 3.2 distance.
    pub best_distance_raw: Option<f64>,
    /// Best normalized distance, when it could be computed.
    pub best_distance_normalized: Option<f64>,
    /// Raw threshold that was active.
    pub threshold_raw: f64,
}
495
496impl ProcessOutcome {
497    fn dropped(reason: ProcessError) -> Self {
498        Self {
499            accepted: false,
500            drop_reason: Some(reason),
501            parsed_message: None,
502            intrinsic_dependence: None,
503            forwards: Vec::new(),
504            client_plans: Vec::new(),
505            local_matches: Vec::new(),
506            matched_count: 0,
507            routing_diagnostics: None,
508            key_exchange_control: false,
509        }
510    }
511
512    fn accepted(message: CmrMessage) -> Self {
513        Self {
514            accepted: true,
515            drop_reason: None,
516            parsed_message: Some(message),
517            intrinsic_dependence: None,
518            forwards: Vec::new(),
519            client_plans: Vec::new(),
520            local_matches: Vec::new(),
521            matched_count: 0,
522            routing_diagnostics: None,
523            key_exchange_control: false,
524        }
525    }
526}
527
528/// In-memory CMR router.
529pub struct Router<O: CompressionOracle> {
530    local_address: String,
531    policy: RoutingPolicy,
532    oracle: O,
533    cache: MessageCache,
534    peers: HashMap<String, PeerMetrics>,
535    global_window: RateWindow,
536    shared_keys: HashMap<String, Vec<u8>>,
537    pending_rsa: HashMap<String, PendingRsaState>,
538    pending_dh: HashMap<String, PendingDhState>,
539    forward_counter: u64,
540}
541
542impl<O: CompressionOracle> Router<O> {
543    fn normalized_match_distance(&self, raw_distance: f64, incoming_len: usize) -> Option<f64> {
544        if !raw_distance.is_finite() {
545            return None;
546        }
547        let bounded = raw_distance.max(0.0);
548        let scale = incoming_len.max(1) as f64;
549        Some((bounded / scale).clamp(0.0, 1.0))
550    }
551
552    /// Creates a router instance.
553    #[must_use]
554    pub fn new(local_address: String, policy: RoutingPolicy, oracle: O) -> Self {
555        Self {
556            local_address,
557            cache: MessageCache::new(policy.cache_max_messages, policy.cache_max_bytes),
558            policy,
559            oracle,
560            peers: HashMap::new(),
561            global_window: RateWindow::new(),
562            shared_keys: HashMap::new(),
563            pending_rsa: HashMap::new(),
564            pending_dh: HashMap::new(),
565            forward_counter: 0,
566        }
567    }
568
569    /// Registers a pairwise shared key.
570    pub fn set_shared_key(&mut self, peer: impl Into<String>, key: Vec<u8>) {
571        self.shared_keys.insert(peer.into(), key);
572    }
573
574    /// Local peer address.
575    #[must_use]
576    pub fn local_address(&self) -> &str {
577        &self.local_address
578    }
579
580    /// Gets known shared key.
581    #[must_use]
582    pub fn shared_key(&self, peer: &str) -> Option<&[u8]> {
583        self.shared_keys.get(peer).map(Vec::as_slice)
584    }
585
586    /// Returns active routing policy.
587    #[must_use]
588    pub fn policy(&self) -> &RoutingPolicy {
589        &self.policy
590    }
591
592    /// Replaces active policy and immediately updates cache limits.
593    pub fn set_policy(&mut self, policy: RoutingPolicy) {
594        self.cache.max_messages = policy.cache_max_messages;
595        self.cache.max_bytes = policy.cache_max_bytes;
596        self.policy = policy;
597        self.cache.evict_as_needed();
598    }
599
600    /// Snapshot of peer metrics.
601    #[must_use]
602    pub fn peer_snapshots(&self) -> Vec<PeerSnapshot> {
603        let mut names = self.peers.keys().cloned().collect::<HashSet<_>>();
604        names.extend(self.shared_keys.keys().cloned());
605        names.extend(self.pending_rsa.keys().cloned());
606        names.extend(self.pending_dh.keys().cloned());
607        let mut peers = names
608            .into_iter()
609            .filter(|peer| !self.is_local_peer_alias(peer))
610            .map(|peer| {
611                let metrics = self.peers.get(&peer).cloned().unwrap_or_default();
612                PeerSnapshot {
613                    reputation: metrics.reputation,
614                    inbound_messages: metrics.inbound_messages,
615                    inbound_bytes: metrics.inbound_bytes,
616                    outbound_messages: metrics.outbound_messages,
617                    outbound_bytes: metrics.outbound_bytes,
618                    current_window_messages: metrics.window.current_messages(),
619                    current_window_bytes: metrics.window.current_bytes(),
620                    has_shared_key: self.shared_keys.contains_key(&peer),
621                    pending_key_exchange: self.pending_rsa.contains_key(&peer)
622                        || self.pending_dh.contains_key(&peer),
623                    peer,
624                }
625            })
626            .collect::<Vec<_>>();
627        peers.sort_by(|left, right| left.peer.cmp(&right.peer));
628        peers
629    }
630
631    /// Number of tracked peers.
632    #[must_use]
633    pub fn peer_count(&self) -> usize {
634        let mut names = self.peers.keys().cloned().collect::<HashSet<_>>();
635        names.extend(self.shared_keys.keys().cloned());
636        names.extend(self.pending_rsa.keys().cloned());
637        names.extend(self.pending_dh.keys().cloned());
638        names
639            .into_iter()
640            .filter(|peer| !self.is_local_peer_alias(peer))
641            .count()
642    }
643
644    /// Number of configured shared keys.
645    #[must_use]
646    pub fn known_keys_count(&self) -> usize {
647        self.shared_keys.len()
648    }
649
650    /// Number of pending key exchange initiator states.
651    #[must_use]
652    pub fn pending_key_exchange_count(&self) -> usize {
653        self.pending_rsa.len().saturating_add(self.pending_dh.len())
654    }
655
656    /// Adjusts peer reputation by delta.
657    pub fn adjust_reputation(&mut self, peer: &str, delta: f64) {
658        self.adjust_peer_reputation(peer, delta);
659    }
660
661    /// Removes a peer from local metrics and key state.
662    pub fn remove_peer(&mut self, peer: &str) -> bool {
663        let mut removed = false;
664        removed |= self.peers.remove(peer).is_some();
665        removed |= self.shared_keys.remove(peer).is_some();
666        removed |= self.pending_rsa.remove(peer).is_some();
667        removed |= self.pending_dh.remove(peer).is_some();
668        removed
669    }
670
671    /// Records a successfully delivered outbound message for peer-level accounting.
672    pub fn record_successful_outbound(&mut self, peer: &str, bytes: usize) {
673        self.record_peer_outbound(peer, bytes);
674    }
675
676    /// Inserts a local client-created message into cache without network-ingress checks.
677    pub fn cache_local_client_message(
678        &mut self,
679        raw_message: &[u8],
680        now: CmrTimestamp,
681    ) -> Result<(), ProcessError> {
682        let parse_ctx = ParseContext {
683            now,
684            recipient_address: None,
685            max_message_bytes: self.policy.content.max_message_bytes,
686            max_header_ids: self.policy.content.max_header_ids,
687        };
688        let parsed = parse_message(raw_message, &parse_ctx)?;
689        if parsed.body.len() > self.policy.content.max_body_bytes {
690            return Err(ProcessError::BodyTooLarge);
691        }
692        if !self.policy.content.allow_binary_payloads && is_probably_binary(&parsed.body) {
693            return Err(ProcessError::BinaryContentBlocked);
694        }
695        if self.policy.content.block_executable_magic && looks_like_executable(&parsed.body) {
696            return Err(ProcessError::ExecutableBlocked);
697        }
698        if self.cache.has_seen_any_id(&parsed) {
699            return Err(ProcessError::DuplicateMessageId);
700        }
701        self.cache.insert(parsed);
702        Ok(())
703    }
704
705    /// Current cache summary.
706    #[must_use]
707    pub fn cache_stats(&self) -> CacheStats {
708        CacheStats {
709            entry_count: self.cache.entries.len(),
710            total_bytes: self.cache.total_bytes,
711            max_messages: self.cache.max_messages,
712            max_bytes: self.cache.max_bytes,
713            total_evictions: self.cache.total_evictions,
714        }
715    }
716
717    /// Cache entries for observability.
718    #[must_use]
719    pub fn cache_entries(&self) -> Vec<CacheEntryView> {
720        self.cache
721            .order
722            .iter()
723            .filter_map(|key| self.cache.entries.get(key))
724            .map(|entry| {
725                let timestamp = entry
726                    .message
727                    .origin_id()
728                    .map_or_else(String::new, |id| id.timestamp.to_string());
729                let body_preview = String::from_utf8_lossy(&entry.message.body)
730                    .chars()
731                    .take(128)
732                    .collect::<String>();
733                CacheEntryView {
734                    key: entry.key.clone(),
735                    encoded_size: entry.encoded_size,
736                    sender: entry.message.immediate_sender().to_owned(),
737                    timestamp,
738                    body_preview,
739                }
740            })
741            .collect()
742    }
743
744    /// Computes Section 3.2 distance between two cached messages by cache keys.
745    pub fn cache_message_distance(
746        &self,
747        left_key: &str,
748        right_key: &str,
749    ) -> Result<Option<f64>, CompressionError> {
750        let Some(left) = self.cache.entries.get(left_key) else {
751            return Ok(None);
752        };
753        let Some(right) = self.cache.entries.get(right_key) else {
754            return Ok(None);
755        };
756        let distance = self
757            .oracle
758            .compression_distance(&left.message.to_bytes(), &right.message.to_bytes())?;
759        Ok(Some(distance))
760    }
761
762    /// Stores pending RSA initiator state for incoming replies.
763    pub fn register_pending_rsa_state(&mut self, peer: impl Into<String>, n: BigUint, d: BigUint) {
764        self.pending_rsa
765            .insert(peer.into(), PendingRsaState { n, d });
766    }
767
768    /// Stores pending DH initiator state for incoming replies.
769    pub fn register_pending_dh_state(
770        &mut self,
771        peer: impl Into<String>,
772        p: BigUint,
773        a_secret: BigUint,
774    ) {
775        self.pending_dh
776            .insert(peer.into(), PendingDhState { p, a_secret });
777    }
778
779    /// Builds an RSA key-exchange initiation for a destination peer.
780    pub fn initiate_rsa_key_exchange(
781        &mut self,
782        destination: &str,
783        now: &CmrTimestamp,
784    ) -> Option<ClientMessagePlan> {
785        self.build_rsa_initiation_plan(destination, now)
786    }
787
788    /// Builds a DH key-exchange initiation for a destination peer.
789    pub fn initiate_dh_key_exchange(
790        &mut self,
791        destination: &str,
792        now: &CmrTimestamp,
793    ) -> Option<ClientMessagePlan> {
794        self.build_dh_initiation_plan(destination, now)
795    }
796
797    /// Initiates clear key-exchange by sending shared key bytes over a secure channel.
798    pub fn initiate_clear_key_exchange(
799        &mut self,
800        destination: &str,
801        clear_key: Vec<u8>,
802        _now: &CmrTimestamp,
803    ) -> Option<ClientMessagePlan> {
804        if clear_key.is_empty() {
805            return None;
806        }
807        let signing_key = self.shared_keys.get(destination).cloned();
808        self.shared_keys.insert(
809            destination.to_owned(),
810            derive_exchange_key_from_bytes(&self.local_address, destination, b"clear", &clear_key),
811        );
812        self.purge_key_exchange_cache(destination);
813        Some(ClientMessagePlan {
814            destination: destination.to_owned(),
815            body: KeyExchangeMessage::ClearKey { key: clear_key }
816                .render()
817                .into_bytes(),
818            signing_key,
819            reason: ForwardReason::KeyExchangeInitiation,
820        })
821    }
822
823    /// Processes one inbound message.
824    #[must_use]
825    pub fn process_incoming(
826        &mut self,
827        raw_message: &[u8],
828        transport: TransportKind,
829        now: CmrTimestamp,
830    ) -> ProcessOutcome {
831        self.process_message(raw_message, transport, now, true)
832    }
833
834    /// Processes one local client-originated message.
835    ///
836    /// This path intentionally skips the recipient-address header guard used for
837    /// network ingress so local compose/injection can originate from the node's
838    /// own canonical address.
839    #[must_use]
840    pub fn process_local_client_message(
841        &mut self,
842        raw_message: &[u8],
843        transport: TransportKind,
844        now: CmrTimestamp,
845    ) -> ProcessOutcome {
846        self.process_message(raw_message, transport, now, false)
847    }
848
849    fn process_message(
850        &mut self,
851        raw_message: &[u8],
852        transport: TransportKind,
853        now: CmrTimestamp,
854        enforce_recipient_guard: bool,
855    ) -> ProcessOutcome {
856        let parse_ctx = ParseContext {
857            now: now.clone(),
858            recipient_address: enforce_recipient_guard.then_some(self.local_address.as_str()),
859            max_message_bytes: self.policy.content.max_message_bytes,
860            max_header_ids: self.policy.content.max_header_ids,
861        };
862
863        let parsed = match parse_message(raw_message, &parse_ctx) {
864            Ok(m) => m,
865            Err(err) => return ProcessOutcome::dropped(ProcessError::Parse(err)),
866        };
867
868        if parsed.body.len() > self.policy.content.max_body_bytes {
869            return self.drop_for_peer(&parsed, ProcessError::BodyTooLarge, -2.0);
870        }
871
872        let sender = parsed.immediate_sender().to_owned();
873        if !self.check_global_rate(raw_message.len()) {
874            return self.drop_for_peer(&parsed, ProcessError::GlobalFloodLimited, -1.5);
875        }
876        if !self.check_peer_rate(&sender, raw_message.len()) {
877            return self.drop_for_peer(&parsed, ProcessError::FloodLimited, -2.0);
878        }
879        if self.peer_reputation(&sender) < self.policy.trust.min_reputation_score {
880            return self.drop_for_peer(&parsed, ProcessError::ReputationTooLow, -0.5);
881        }
882        if let Err(err) = self.validate_signature_policy(&parsed, &sender) {
883            return self.drop_for_peer(&parsed, err, -4.0);
884        }
885        if self.cache.has_seen_any_id(&parsed) {
886            return self.drop_for_peer(&parsed, ProcessError::DuplicateMessageId, -0.1);
887        }
888        if !self.policy.content.allow_binary_payloads && is_probably_binary(&parsed.body) {
889            return self.drop_for_peer(&parsed, ProcessError::BinaryContentBlocked, -0.4);
890        }
891        if self.policy.content.block_executable_magic && looks_like_executable(&parsed.body) {
892            return self.drop_for_peer(&parsed, ProcessError::ExecutableBlocked, -2.5);
893        }
894
895        match self.handle_key_exchange_control(&parsed, &sender, &transport) {
896            Ok(Some(control)) => {
897                self.adjust_peer_reputation(&sender, 1.5);
898                self.record_peer_inbound(&sender, raw_message.len());
899                return ProcessOutcome {
900                    accepted: true,
901                    drop_reason: None,
902                    parsed_message: Some(parsed),
903                    intrinsic_dependence: None,
904                    forwards: control.forwards,
905                    client_plans: control.client_plans,
906                    local_matches: Vec::new(),
907                    matched_count: 0,
908                    routing_diagnostics: None,
909                    key_exchange_control: true,
910                };
911            }
912            Ok(None) => {}
913            Err(err) => {
914                let penalty = if matches!(err, ProcessError::MissingPendingKeyExchangeState) {
915                    0.0
916                } else {
917                    -3.0
918                };
919                return self.drop_for_peer(&parsed, err, penalty);
920            }
921        }
922
923        let id_score = match self
924            .oracle
925            .intrinsic_dependence(&parsed.body, self.policy.spam.intrinsic_dependence_order)
926        {
927            Ok(score) => score,
928            Err(err) => {
929                return self.drop_for_peer(
930                    &parsed,
931                    ProcessError::Compression(err),
932                    if self.policy.security_level == crate::policy::SecurityLevel::Trusted {
933                        -0.2
934                    } else {
935                        -1.0
936                    },
937                );
938            }
939        };
940        if !id_score.is_finite() {
941            return self.drop_for_peer(&parsed, ProcessError::IntrinsicDependenceInvalid, -1.5);
942        }
943        if id_score < self.policy.spam.min_intrinsic_dependence {
944            return self.drop_for_peer(&parsed, ProcessError::IntrinsicDependenceTooLow, -1.5);
945        }
946
947        let routing = match self.select_routing_decision(&parsed) {
948            Ok(decision) => decision,
949            Err(err) => return self.drop_for_peer(&parsed, err, -0.5),
950        };
951        let mut outcome = ProcessOutcome::accepted(parsed.clone());
952        outcome.intrinsic_dependence = Some(id_score);
953        outcome.matched_count = routing.matched_messages.len();
954        outcome.routing_diagnostics = Some(RoutingDiagnostics {
955            best_peer: routing.best_peer.clone(),
956            best_distance_raw: routing.best_distance_raw,
957            best_distance_normalized: routing.best_distance_normalized,
958            threshold_raw: routing.threshold_raw,
959        });
960        if !enforce_recipient_guard {
961            let mut local_matches = routing.matched_messages.clone();
962            local_matches.truncate(self.policy.throughput.max_forward_actions);
963            outcome.local_matches = local_matches;
964        }
965
966        self.cache.insert(parsed.clone());
967        self.record_peer_inbound(&sender, raw_message.len());
968        self.adjust_peer_reputation(&sender, 0.4);
969
970        let mut actions = self.build_routing_actions(&parsed, routing, &now);
971        actions
972            .forwards
973            .truncate(self.policy.throughput.max_forward_actions);
974        actions
975            .client_plans
976            .truncate(self.policy.throughput.max_forward_actions);
977        outcome.forwards = actions.forwards;
978        outcome.client_plans = actions.client_plans;
979        outcome
980    }
981
982    fn drop_for_peer(
983        &mut self,
984        parsed: &CmrMessage,
985        reason: ProcessError,
986        reputation_delta: f64,
987    ) -> ProcessOutcome {
988        let sender = parsed.immediate_sender().to_owned();
989        self.adjust_peer_reputation(&sender, reputation_delta);
990        ProcessOutcome {
991            accepted: false,
992            drop_reason: Some(reason),
993            parsed_message: Some(parsed.clone()),
994            intrinsic_dependence: None,
995            forwards: Vec::new(),
996            client_plans: Vec::new(),
997            local_matches: Vec::new(),
998            matched_count: 0,
999            routing_diagnostics: None,
1000            key_exchange_control: false,
1001        }
1002    }
1003
1004    fn check_peer_rate(&mut self, peer: &str, message_bytes: usize) -> bool {
1005        let metrics = self.peers.entry(peer.to_owned()).or_default();
1006        metrics.window.allow_and_record(
1007            message_bytes,
1008            self.policy.throughput.per_peer_messages_per_minute,
1009            self.policy.throughput.per_peer_bytes_per_minute,
1010        )
1011    }
1012
1013    fn check_global_rate(&mut self, message_bytes: usize) -> bool {
1014        self.global_window.allow_and_record(
1015            message_bytes,
1016            self.policy.throughput.global_messages_per_minute,
1017            self.policy.throughput.global_bytes_per_minute,
1018        )
1019    }
1020
1021    fn peer_reputation(&self, peer: &str) -> f64 {
1022        self.peers.get(peer).map_or(0.0, |p| p.reputation)
1023    }
1024
1025    fn adjust_peer_reputation(&mut self, peer: &str, delta: f64) {
1026        let metrics = self.peers.entry(peer.to_owned()).or_default();
1027        metrics.reputation = (metrics.reputation + delta).clamp(-100.0, 100.0);
1028    }
1029
1030    fn is_local_peer_alias(&self, peer: &str) -> bool {
1031        let local = self.local_address.trim_end_matches('/');
1032        let candidate = peer.trim_end_matches('/');
1033        if candidate == local {
1034            return true;
1035        }
1036
1037        if let (Ok(local_url), Ok(candidate_url)) = (Url::parse(local), Url::parse(candidate)) {
1038            if !same_origin(&local_url, &candidate_url) {
1039                return false;
1040            }
1041            return is_path_alias(local_url.path(), candidate_url.path());
1042        }
1043
1044        candidate.starts_with(&format!("{local}/"))
1045    }
1046
1047    fn record_peer_inbound(&mut self, peer: &str, bytes: usize) {
1048        let metrics = self.peers.entry(peer.to_owned()).or_default();
1049        metrics.inbound_messages = metrics.inbound_messages.saturating_add(1);
1050        metrics.inbound_bytes = metrics
1051            .inbound_bytes
1052            .saturating_add(u64::try_from(bytes).unwrap_or(u64::MAX));
1053    }
1054
1055    fn record_peer_outbound(&mut self, peer: &str, bytes: usize) {
1056        let metrics = self.peers.entry(peer.to_owned()).or_default();
1057        metrics.outbound_messages = metrics.outbound_messages.saturating_add(1);
1058        metrics.outbound_bytes = metrics
1059            .outbound_bytes
1060            .saturating_add(u64::try_from(bytes).unwrap_or(u64::MAX));
1061    }
1062
1063    fn can_forward_to_peer(&self, peer: &str) -> bool {
1064        let Some(metrics) = self.peers.get(peer) else {
1065            return true;
1066        };
1067        if metrics.inbound_bytes == 0 {
1068            // First-contact forwarding is allowed; ratio policy applies only
1069            // after inbound traffic has been observed from this peer.
1070            return true;
1071        }
1072        let ratio = metrics.outbound_bytes as f64 / metrics.inbound_bytes as f64;
1073        ratio <= self.policy.trust.max_outbound_inbound_ratio
1074    }
1075
1076    fn validate_signature_policy(
1077        &self,
1078        message: &CmrMessage,
1079        sender: &str,
1080    ) -> Result<(), ProcessError> {
1081        let known_key = self.shared_keys.get(sender);
1082        match (&message.signature, known_key) {
1083            (Signature::Unsigned, Some(_))
1084                if self.policy.trust.require_signatures_from_known_peers =>
1085            {
1086                Err(ProcessError::UnsignedRejected)
1087            }
1088            (Signature::Unsigned, None) if !self.policy.trust.allow_unsigned_from_unknown_peers => {
1089                Err(ProcessError::UnsignedRejected)
1090            }
1091            (Signature::Sha256(_), None) if self.policy.trust.reject_signed_without_known_key => {
1092                Err(ProcessError::SignedWithoutKnownKey)
1093            }
1094            (Signature::Sha256(_), Some(key)) => {
1095                if message
1096                    .signature
1097                    .verifies(&message.payload_without_signature_line(), Some(key))
1098                {
1099                    Ok(())
1100                } else {
1101                    Err(ProcessError::BadSignature)
1102                }
1103            }
1104            _ => Ok(()),
1105        }
1106    }
1107
1108    fn handle_key_exchange_control(
1109        &mut self,
1110        message: &CmrMessage,
1111        sender: &str,
1112        transport: &TransportKind,
1113    ) -> Result<Option<KeyExchangeControlOutcome>, ProcessError> {
1114        let Some(control) = parse_key_exchange(&message.body)? else {
1115            return Ok(None);
1116        };
1117
1118        if self.shared_keys.contains_key(sender) && matches!(message.signature, Signature::Unsigned)
1119        {
1120            return Err(ProcessError::UnsignedRejected);
1121        }
1122
1123        let old_key = self.shared_keys.get(sender).cloned();
1124        match control {
1125            KeyExchangeMessage::ClearKey { key } => {
1126                if !transport.is_secure_channel() {
1127                    return Err(ProcessError::ClearKeyOnInsecureChannel);
1128                }
1129                self.cache.insert(message.clone());
1130                let derived =
1131                    derive_exchange_key_from_bytes(&self.local_address, sender, b"clear", &key);
1132                self.shared_keys.insert(sender.to_owned(), derived);
1133                self.purge_key_exchange_cache(sender);
1134                Ok(Some(KeyExchangeControlOutcome::default()))
1135            }
1136            KeyExchangeMessage::RsaRequest { n, e } => {
1137                validate_rsa_request_params(&n, &e)?;
1138                let key = random_nonzero_biguint_below(&n).ok_or(
1139                    ProcessError::WeakKeyExchangeParameters("failed to generate RSA session key"),
1140                )?;
1141                self.cache.insert(message.clone());
1142                let c = mod_pow(&key, &e, &n);
1143                let reply_body = KeyExchangeMessage::RsaReply { c }.render().into_bytes();
1144                self.shared_keys.insert(
1145                    sender.to_owned(),
1146                    derive_exchange_key(&self.local_address, sender, b"rsa", &key),
1147                );
1148                self.purge_key_exchange_cache(sender);
1149                Ok(Some(KeyExchangeControlOutcome {
1150                    forwards: Vec::new(),
1151                    client_plans: vec![ClientMessagePlan {
1152                        destination: sender.to_owned(),
1153                        body: reply_body,
1154                        signing_key: old_key,
1155                        reason: ForwardReason::KeyExchangeReply,
1156                    }],
1157                }))
1158            }
1159            KeyExchangeMessage::RsaReply { c } => {
1160                let Some(state) = self.pending_rsa.remove(sender) else {
1161                    return Err(ProcessError::MissingPendingKeyExchangeState);
1162                };
1163                if c >= state.n {
1164                    return Err(ProcessError::WeakKeyExchangeParameters(
1165                        "RSA reply ciphertext out of range",
1166                    ));
1167                }
1168                let key = mod_pow(&c, &state.d, &state.n);
1169                if key.is_zero() {
1170                    return Err(ProcessError::WeakKeyExchangeParameters(
1171                        "RSA shared key reduced to zero",
1172                    ));
1173                }
1174                self.cache.insert(message.clone());
1175                self.shared_keys.insert(
1176                    sender.to_owned(),
1177                    derive_exchange_key(&self.local_address, sender, b"rsa", &key),
1178                );
1179                self.purge_key_exchange_cache(sender);
1180                Ok(Some(KeyExchangeControlOutcome::default()))
1181            }
1182            KeyExchangeMessage::DhRequest { g, p, a_pub } => {
1183                validate_dh_request_params(&g, &p, &a_pub)?;
1184                let b_secret =
1185                    random_dh_secret(&p).ok_or(ProcessError::WeakKeyExchangeParameters(
1186                        "failed to generate DH secret exponent",
1187                    ))?;
1188                let b_pub = mod_pow(&g, &b_secret, &p);
1189                let shared = mod_pow(&a_pub, &b_secret, &p);
1190                if shared <= BigUint::one() {
1191                    return Err(ProcessError::WeakKeyExchangeParameters(
1192                        "DH derived weak shared key",
1193                    ));
1194                }
1195                self.cache.insert(message.clone());
1196                let reply_body = KeyExchangeMessage::DhReply { b_pub }.render().into_bytes();
1197                self.shared_keys.insert(
1198                    sender.to_owned(),
1199                    derive_exchange_key(&self.local_address, sender, b"dh", &shared),
1200                );
1201                self.purge_key_exchange_cache(sender);
1202                Ok(Some(KeyExchangeControlOutcome {
1203                    forwards: Vec::new(),
1204                    client_plans: vec![ClientMessagePlan {
1205                        destination: sender.to_owned(),
1206                        body: reply_body,
1207                        signing_key: old_key,
1208                        reason: ForwardReason::KeyExchangeReply,
1209                    }],
1210                }))
1211            }
1212            KeyExchangeMessage::DhReply { b_pub } => {
1213                let Some(state) = self.pending_dh.remove(sender) else {
1214                    return Err(ProcessError::MissingPendingKeyExchangeState);
1215                };
1216                validate_dh_reply_params(&b_pub, &state.p)?;
1217                let shared = mod_pow(&b_pub, &state.a_secret, &state.p);
1218                if shared <= BigUint::one() {
1219                    return Err(ProcessError::WeakKeyExchangeParameters(
1220                        "DH derived weak shared key",
1221                    ));
1222                }
1223                self.cache.insert(message.clone());
1224                self.shared_keys.insert(
1225                    sender.to_owned(),
1226                    derive_exchange_key(&self.local_address, sender, b"dh", &shared),
1227                );
1228                self.purge_key_exchange_cache(sender);
1229                Ok(Some(KeyExchangeControlOutcome::default()))
1230            }
1231        }
1232    }
1233
1234    fn purge_key_exchange_cache(&mut self, peer: &str) {
1235        let local = self.local_address.clone();
1236        self.cache.remove_if(|message| {
1237            parse_key_exchange(&message.body).ok().flatten().is_some()
1238                && (message_contains_sender(message, peer)
1239                    || message_contains_sender(message, &local))
1240        });
1241    }
1242
1243    fn select_routing_decision(
1244        &self,
1245        incoming: &CmrMessage,
1246    ) -> Result<RoutingDecision, ProcessError> {
1247        let threshold_raw = self.policy.spam.max_match_distance;
1248        let mut decision = RoutingDecision {
1249            best_peer: None,
1250            best_distance_raw: None,
1251            best_distance_normalized: None,
1252            threshold_raw,
1253            matched_messages: Vec::new(),
1254            compensatory: None,
1255        };
1256
1257        let peer_corpora = self.collect_peer_corpora();
1258        if peer_corpora.is_empty() {
1259            return Ok(decision);
1260        }
1261
1262        let mut canonical_incoming = incoming.clone();
1263        canonical_incoming.make_unsigned();
1264        let incoming_bytes = canonical_incoming.to_bytes();
1265        let mut peers: Vec<(&String, &Vec<u8>)> = peer_corpora.iter().collect();
1266        peers.sort_by(|left, right| left.0.cmp(right.0));
1267        let peer_names: Vec<String> = peers.iter().map(|(peer, _)| (*peer).to_owned()).collect();
1268        let corpora: Vec<Vec<u8>> = peers.iter().map(|(_, corpus)| (*corpus).clone()).collect();
1269        let distances = self
1270            .oracle
1271            .batch_compression_distance(&incoming_bytes, &corpora)
1272            .map_err(ProcessError::Compression)?;
1273
1274        let mut best: Option<(String, f64)> = None;
1275        let mut matched_peers = Vec::<String>::new();
1276        let incoming_len = incoming_bytes.len();
1277        for ((peer, _corpus), distance) in peer_names
1278            .into_iter()
1279            .zip(corpora.iter())
1280            .zip(distances.into_iter())
1281        {
1282            if !distance.is_finite() {
1283                continue;
1284            }
1285            let passed_threshold = distance <= threshold_raw;
1286            if passed_threshold {
1287                matched_peers.push(peer.clone());
1288            }
1289            if best
1290                .as_ref()
1291                .is_none_or(|(_, best_distance)| distance < *best_distance)
1292            {
1293                best = Some((peer, distance));
1294            }
1295        }
1296
1297        let Some((best_peer, best_distance)) = best else {
1298            return Ok(decision);
1299        };
1300        let Some(best_normalized) = self.normalized_match_distance(best_distance, incoming_len)
1301        else {
1302            return Ok(decision);
1303        };
1304        decision.best_peer = Some(best_peer.clone());
1305        decision.best_distance_raw = Some(best_distance);
1306        decision.best_distance_normalized = Some(best_normalized);
1307
1308        let passes_best_threshold = best_distance <= threshold_raw;
1309        if !passes_best_threshold || matched_peers.is_empty() {
1310            return Ok(decision);
1311        }
1312
1313        let matched_messages =
1314            self.select_matched_messages(&incoming_bytes, &matched_peers, threshold_raw)?;
1315        if matched_messages.is_empty() {
1316            return Ok(decision);
1317        }
1318
1319        let compensatory = if message_contains_sender(incoming, &best_peer) {
1320            self.select_compensatory_message(incoming, &best_peer, &peer_corpora)?
1321                .map(|message| (best_peer.clone(), message))
1322        } else {
1323            None
1324        };
1325
1326        decision.matched_messages = matched_messages;
1327        decision.compensatory = compensatory;
1328        Ok(decision)
1329    }
1330
1331    fn collect_peer_corpora(&self) -> HashMap<String, Vec<u8>> {
1332        let mut peer_corpora = HashMap::<String, Vec<u8>>::new();
1333        for key in &self.cache.order {
1334            let Some(entry) = self.cache.entries.get(key) else {
1335                continue;
1336            };
1337            if is_key_exchange_control_message(&entry.message) {
1338                continue;
1339            }
1340            let sender = entry.message.immediate_sender();
1341            peer_corpora
1342                .entry(sender.to_owned())
1343                .or_default()
1344                .extend_from_slice(&entry.message.to_bytes());
1345        }
1346        peer_corpora
1347    }
1348
1349    fn select_matched_messages(
1350        &self,
1351        incoming_bytes: &[u8],
1352        matched_peers: &[String],
1353        threshold_raw: f64,
1354    ) -> Result<Vec<CmrMessage>, ProcessError> {
1355        let matched = matched_peers
1356            .iter()
1357            .map(String::as_str)
1358            .collect::<HashSet<_>>();
1359        let mut out = Vec::new();
1360        for key in &self.cache.order {
1361            let Some(entry) = self.cache.entries.get(key) else {
1362                continue;
1363            };
1364            if is_key_exchange_control_message(&entry.message) {
1365                continue;
1366            }
1367            if !matched.contains(entry.message.immediate_sender()) {
1368                continue;
1369            }
1370            let mut candidate = entry.message.clone();
1371            candidate.make_unsigned();
1372            let candidate_bytes = candidate.to_bytes();
1373            let distance = self
1374                .oracle
1375                .compression_distance(incoming_bytes, &candidate_bytes)
1376                .map_err(ProcessError::Compression)?;
1377            if distance.is_finite() && distance <= threshold_raw {
1378                out.push(entry.message.clone());
1379            }
1380        }
1381        Ok(out)
1382    }
1383
1384    fn select_compensatory_message(
1385        &self,
1386        incoming: &CmrMessage,
1387        best_peer: &str,
1388        peer_corpora: &HashMap<String, Vec<u8>>,
1389    ) -> Result<Option<CmrMessage>, ProcessError> {
1390        let ordered_entries = self
1391            .cache
1392            .order
1393            .iter()
1394            .filter_map(|key| self.cache.entries.get(key))
1395            .filter(|entry| !is_key_exchange_control_message(&entry.message))
1396            .collect::<Vec<_>>();
1397        if ordered_entries.len() <= 1 {
1398            return Ok(None);
1399        }
1400
1401        let encoded_entries = ordered_entries
1402            .iter()
1403            .map(|entry| (entry.message.clone(), entry.message.to_bytes()))
1404            .collect::<Vec<_>>();
1405        let total_bytes = encoded_entries
1406            .iter()
1407            .map(|(_, bytes)| bytes.len())
1408            .sum::<usize>();
1409        let mut cache_blob = Vec::with_capacity(total_bytes);
1410        let mut ranges = Vec::with_capacity(encoded_entries.len());
1411        for (_, bytes) in &encoded_entries {
1412            let start = cache_blob.len();
1413            cache_blob.extend_from_slice(bytes);
1414            let end = cache_blob.len();
1415            ranges.push((start, end));
1416        }
1417
1418        let mut canonical_incoming = incoming.clone();
1419        canonical_incoming.make_unsigned();
1420        let mut x_guess = Vec::with_capacity(
1421            canonical_incoming.encoded_len()
1422                + peer_corpora.get(best_peer).map_or(0, std::vec::Vec::len),
1423        );
1424        x_guess.extend_from_slice(&canonical_incoming.to_bytes());
1425        if let Some(known_from_best_peer) = peer_corpora.get(best_peer) {
1426            x_guess.extend_from_slice(known_from_best_peer);
1427        }
1428        let guess_distances = self
1429            .oracle
1430            .batch_compression_distance(
1431                &x_guess,
1432                &encoded_entries
1433                    .iter()
1434                    .map(|(_, bytes)| bytes.clone())
1435                    .collect::<Vec<_>>(),
1436            )
1437            .map_err(ProcessError::Compression)?;
1438
1439        let mut best_score = f64::NEG_INFINITY;
1440        let mut best_message = None;
1441        for (idx, (candidate_message, candidate_bytes)) in encoded_entries.iter().enumerate() {
1442            if message_contains_sender(candidate_message, best_peer) {
1443                continue;
1444            }
1445            if total_bytes <= candidate_bytes.len() {
1446                continue;
1447            }
1448            let (start, end) = ranges[idx];
1449            let mut right_parts: [&[u8]; 2] = [&[][..], &[][..]];
1450            let mut right_count = 0;
1451            if start > 0 {
1452                right_parts[right_count] = &cache_blob[..start];
1453                right_count += 1;
1454            }
1455            if end < cache_blob.len() {
1456                right_parts[right_count] = &cache_blob[end..];
1457                right_count += 1;
1458            }
1459            if right_count == 0 {
1460                continue;
1461            }
1462
1463            let d_cache = self
1464                .oracle
1465                .compression_distance_chain(
1466                    &[candidate_bytes.as_slice()],
1467                    &right_parts[..right_count],
1468                )
1469                .map_err(ProcessError::Compression)?;
1470            let Some(d_guess) = guess_distances.get(idx).copied() else {
1471                continue;
1472            };
1473            if !d_cache.is_finite() || !d_guess.is_finite() {
1474                continue;
1475            }
1476            let score = d_cache - d_guess;
1477            if score > best_score {
1478                best_score = score;
1479                best_message = Some(candidate_message.clone());
1480            }
1481        }
1482
1483        Ok(best_message)
1484    }
1485
1486    fn build_routing_actions(
1487        &mut self,
1488        incoming: &CmrMessage,
1489        decision: RoutingDecision,
1490        now: &CmrTimestamp,
1491    ) -> RoutingActions {
1492        let mut out = RoutingActions::default();
1493        let mut dedupe = HashSet::<(String, String)>::new();
1494        let incoming_key = cache_key(incoming);
1495        let incoming_destinations = sorted_unique_addresses(&incoming.header);
1496        let suppress_best = decision
1497            .best_peer
1498            .as_deref()
1499            .filter(|peer| message_contains_sender(incoming, peer));
1500
1501        if let Some((destination, message)) = decision.compensatory.clone() {
1502            let dedupe_key = (destination.clone(), cache_key(&message));
1503            if !dedupe.contains(&dedupe_key) {
1504                let actions = self.forward_with_optional_key_exchange(
1505                    &message,
1506                    &destination,
1507                    now,
1508                    ForwardReason::CompensatoryReply,
1509                );
1510                if !actions.forwards.is_empty() {
1511                    dedupe.insert(dedupe_key);
1512                    out.forwards.extend(actions.forwards);
1513                    out.client_plans.extend(actions.client_plans);
1514                }
1515            }
1516        }
1517
1518        for matched in &decision.matched_messages {
1519            for destination in sorted_unique_addresses(&matched.header) {
1520                if destination == self.local_address {
1521                    continue;
1522                }
1523                if suppress_best.is_some_and(|peer| peer == destination) {
1524                    continue;
1525                }
1526                let dedupe_key = (destination.clone(), incoming_key.clone());
1527                if dedupe.contains(&dedupe_key) {
1528                    continue;
1529                }
1530                let actions = self.forward_with_optional_key_exchange(
1531                    incoming,
1532                    &destination,
1533                    now,
1534                    ForwardReason::MatchedForwardIncoming,
1535                );
1536                if !actions.forwards.is_empty() {
1537                    dedupe.insert(dedupe_key);
1538                    out.forwards.extend(actions.forwards);
1539                    out.client_plans.extend(actions.client_plans);
1540                }
1541            }
1542
1543            let matched_key = cache_key(matched);
1544            for destination in &incoming_destinations {
1545                if destination == &self.local_address {
1546                    continue;
1547                }
1548                let dedupe_key = (destination.clone(), matched_key.clone());
1549                if dedupe.contains(&dedupe_key) {
1550                    continue;
1551                }
1552                let actions = self.forward_with_optional_key_exchange(
1553                    matched,
1554                    destination,
1555                    now,
1556                    ForwardReason::MatchedForwardCached,
1557                );
1558                if !actions.forwards.is_empty() {
1559                    dedupe.insert(dedupe_key);
1560                    out.forwards.extend(actions.forwards);
1561                    out.client_plans.extend(actions.client_plans);
1562                }
1563            }
1564        }
1565
1566        out
1567    }
1568
1569    fn forward_with_optional_key_exchange(
1570        &mut self,
1571        message: &CmrMessage,
1572        destination: &str,
1573        now: &CmrTimestamp,
1574        reason: ForwardReason,
1575    ) -> RoutingActions {
1576        if destination == self.local_address
1577            || !self.can_forward_to_peer(destination)
1578            || message_contains_sender(message, destination)
1579        {
1580            return RoutingActions::default();
1581        }
1582
1583        let mut out = RoutingActions::default();
1584        if !self.shared_keys.contains_key(destination)
1585            && !self.pending_rsa.contains_key(destination)
1586            && !self.pending_dh.contains_key(destination)
1587        {
1588            let kx = match self.policy.trust.auto_key_exchange_mode {
1589                AutoKeyExchangeMode::Rsa => self.build_rsa_initiation_plan(destination, now),
1590                AutoKeyExchangeMode::Dh => self.build_dh_initiation_plan(destination, now),
1591            };
1592            if let Some(plan) = kx {
1593                out.client_plans.push(plan);
1594            }
1595        }
1596        out.forwards
1597            .push(self.wrap_and_forward(message, destination, now, reason));
1598        out
1599    }
1600
1601    fn build_rsa_initiation_plan(
1602        &mut self,
1603        destination: &str,
1604        _now: &CmrTimestamp,
1605    ) -> Option<ClientMessagePlan> {
1606        let e = BigUint::from(65_537_u32);
1607        let bits_each = usize::try_from(MIN_RSA_MODULUS_BITS / 2).ok()?;
1608        let mut generated = None;
1609        for _ in 0..8 {
1610            let p = generate_probable_prime(bits_each, 12)?;
1611            let mut q = generate_probable_prime(bits_each, 12)?;
1612            if q == p {
1613                q = generate_probable_prime(bits_each, 12)?;
1614            }
1615            if q == p {
1616                continue;
1617            }
1618
1619            let n = &p * &q;
1620            if n.bits() < MIN_RSA_MODULUS_BITS {
1621                continue;
1622            }
1623            let p1 = &p - BigUint::one();
1624            let q1 = &q - BigUint::one();
1625            let lambda = lcm_biguint(&p1, &q1);
1626            if gcd_biguint(&e, &lambda) != BigUint::one() {
1627                continue;
1628            }
1629            let Some(d) = mod_inverse_biguint(&e, &lambda) else {
1630                continue;
1631            };
1632            generated = Some((n, d));
1633            break;
1634        }
1635        let (n, d) = generated?;
1636        self.pending_rsa
1637            .insert(destination.to_owned(), PendingRsaState { n: n.clone(), d });
1638
1639        let body = KeyExchangeMessage::RsaRequest { n, e }
1640            .render()
1641            .into_bytes();
1642        Some(ClientMessagePlan {
1643            destination: destination.to_owned(),
1644            body,
1645            signing_key: None,
1646            reason: ForwardReason::KeyExchangeInitiation,
1647        })
1648    }
1649
1650    fn build_dh_initiation_plan(
1651        &mut self,
1652        destination: &str,
1653        _now: &CmrTimestamp,
1654    ) -> Option<ClientMessagePlan> {
1655        let bits = usize::try_from(MIN_DH_MODULUS_BITS).ok()?;
1656        let p = generate_probable_safe_prime(bits, 10)?;
1657        let g = find_primitive_root_for_safe_prime(&p)?;
1658        let a_secret = random_dh_secret(&p)?;
1659        let a_pub = mod_pow(&g, &a_secret, &p);
1660        self.pending_dh.insert(
1661            destination.to_owned(),
1662            PendingDhState {
1663                p: p.clone(),
1664                a_secret,
1665            },
1666        );
1667
1668        let body = KeyExchangeMessage::DhRequest { g, p, a_pub }
1669            .render()
1670            .into_bytes();
1671        Some(ClientMessagePlan {
1672            destination: destination.to_owned(),
1673            body,
1674            signing_key: None,
1675            reason: ForwardReason::KeyExchangeInitiation,
1676        })
1677    }
1678
1679    fn wrap_and_forward(
1680        &mut self,
1681        message: &CmrMessage,
1682        destination: &str,
1683        now: &CmrTimestamp,
1684        reason: ForwardReason,
1685    ) -> ForwardAction {
1686        let mut forwarded = message.clone();
1687        forwarded.make_unsigned();
1688        forwarded.prepend_hop(MessageId {
1689            timestamp: self
1690                .next_forward_timestamp(now, message.header.first().map(|id| &id.timestamp)),
1691            address: self.local_address.clone(),
1692        });
1693        if let Some(key) = self.shared_keys.get(destination) {
1694            forwarded.sign_with_key(key);
1695        }
1696        ForwardAction {
1697            destination: destination.to_owned(),
1698            message_bytes: forwarded.to_bytes(),
1699            reason,
1700        }
1701    }
1702
1703    fn next_forward_timestamp(
1704        &mut self,
1705        now: &CmrTimestamp,
1706        newer_than: Option<&CmrTimestamp>,
1707    ) -> CmrTimestamp {
1708        self.forward_counter = self.forward_counter.saturating_add(1);
1709        let now_text = now.to_string();
1710        let (date_part, now_fraction) = split_timestamp_text(&now_text);
1711        let counter_suffix = format!("{:011}", self.forward_counter % 100_000_000_000);
1712        let mut fraction = if now_fraction.is_empty() {
1713            format!("{:09}", self.forward_counter % 1_000_000_000)
1714        } else {
1715            format!("{now_fraction}{counter_suffix}")
1716        };
1717        let mut candidate = parse_timestamp_with_fraction(date_part, &fraction)
1718            .unwrap_or_else(|| now.clone().with_fraction(fraction.clone()));
1719        if let Some(min_ts) = newer_than
1720            && candidate <= *min_ts
1721        {
1722            let min_text = min_ts.to_string();
1723            let (min_date, min_fraction) = split_timestamp_text(&min_text);
1724            fraction = format!("{min_fraction}1");
1725            candidate = parse_timestamp_with_fraction(min_date, &fraction)
1726                .unwrap_or_else(|| min_ts.clone().with_fraction(fraction));
1727        }
1728        candidate
1729    }
1730}
1731
1732fn same_origin(left: &Url, right: &Url) -> bool {
1733    left.scheme().eq_ignore_ascii_case(right.scheme())
1734        && left.host_str().map(|h| h.to_ascii_lowercase())
1735            == right.host_str().map(|h| h.to_ascii_lowercase())
1736        && left.port_or_known_default() == right.port_or_known_default()
1737}
1738
1739fn is_path_alias(local_path: &str, candidate_path: &str) -> bool {
1740    let local = normalize_alias_path(local_path);
1741    let candidate = normalize_alias_path(candidate_path);
1742    candidate == local || candidate.starts_with(&format!("{local}/"))
1743}
1744
/// Strips all trailing `/` characters from `path`.
///
/// A path that becomes empty (including an empty input) normalizes to `/`.
fn normalize_alias_path(path: &str) -> String {
    match path.trim_end_matches('/') {
        "" => String::from("/"),
        stripped => String::from(stripped),
    }
}
1753
/// Splits timestamp text at the first `.` into `(date, fraction)`.
///
/// The separator is not included in either part. Text without a `.` is
/// returned whole with an empty fraction.
fn split_timestamp_text(input: &str) -> (&str, &str) {
    input.split_once('.').unwrap_or((input, ""))
}
1761
1762fn parse_timestamp_with_fraction(date_part: &str, fraction: &str) -> Option<CmrTimestamp> {
1763    let text = if fraction.is_empty() {
1764        date_part.to_owned()
1765    } else {
1766        format!("{date_part}.{fraction}")
1767    };
1768    CmrTimestamp::parse(&text).ok()
1769}
1770
1771fn cache_key(message: &CmrMessage) -> String {
1772    message
1773        .origin_id()
1774        .map_or_else(|| message.header[0].to_string(), MessageId::to_string)
1775}
1776
1777fn message_contains_sender(message: &CmrMessage, sender: &str) -> bool {
1778    message.header.iter().any(|id| id.address == sender)
1779}
1780
1781fn is_key_exchange_control_message(message: &CmrMessage) -> bool {
1782    parse_key_exchange(&message.body).ok().flatten().is_some()
1783}
1784
1785fn sorted_unique_addresses(header: &[MessageId]) -> Vec<String> {
1786    let mut addresses = header
1787        .iter()
1788        .map(|id| id.address.clone())
1789        .collect::<Vec<_>>();
1790    addresses.sort();
1791    addresses.dedup();
1792    addresses
1793}
1794
/// Heuristically classifies `body` as binary.
///
/// A byte counts as text when it is tab, line feed, carriage return, or in the
/// printable ASCII range `0x20..=0x7E`. The body is considered binary when
/// strictly more than 30% of its bytes are not text. An empty body is never
/// binary.
fn is_probably_binary(body: &[u8]) -> bool {
    let is_text = |byte: u8| {
        matches!(byte, b'\t' | b'\n' | b'\r' | b' ') || byte.is_ascii_graphic()
    };
    let suspicious = body.iter().filter(|&&byte| !is_text(byte)).count();
    !body.is_empty() && suspicious * 10 > body.len() * 3
}
1806
/// Returns `true` when `body` starts with one of the recognized executable
/// magic numbers: ELF, `MZ`, or one of the four byte sequences
/// `FE ED FA CE`, `CE FA ED FE`, `CF FA ED FE`, `FE ED FA CF`.
fn looks_like_executable(body: &[u8]) -> bool {
    const MAGIC_PREFIXES: [&[u8]; 6] = [
        b"\x7fELF",
        b"MZ",
        b"\xfe\xed\xfa\xce",
        b"\xce\xfa\xed\xfe",
        b"\xcf\xfa\xed\xfe",
        b"\xfe\xed\xfa\xcf",
    ];
    MAGIC_PREFIXES.iter().any(|magic| body.starts_with(magic))
}
1815
1816fn validate_rsa_request_params(n: &BigUint, e: &BigUint) -> Result<(), ProcessError> {
1817    if n.bits() < MIN_RSA_MODULUS_BITS {
1818        return Err(ProcessError::WeakKeyExchangeParameters(
1819            "RSA modulus too small",
1820        ));
1821    }
1822    let two = BigUint::from(2_u8);
1823    if n <= &two || (n % &two).is_zero() {
1824        return Err(ProcessError::WeakKeyExchangeParameters(
1825            "RSA modulus must be odd and > 2",
1826        ));
1827    }
1828    if e <= &two || (e % &two).is_zero() {
1829        return Err(ProcessError::WeakKeyExchangeParameters(
1830            "RSA exponent must be odd and > 2",
1831        ));
1832    }
1833    if e >= n {
1834        return Err(ProcessError::WeakKeyExchangeParameters(
1835            "RSA exponent must be smaller than modulus",
1836        ));
1837    }
1838    if is_probably_prime(n, 10) {
1839        return Err(ProcessError::WeakKeyExchangeParameters(
1840            "RSA modulus must be composite",
1841        ));
1842    }
1843    Ok(())
1844}
1845
1846fn validate_dh_request_params(
1847    g: &BigUint,
1848    p: &BigUint,
1849    a_pub: &BigUint,
1850) -> Result<(), ProcessError> {
1851    if p.bits() < MIN_DH_MODULUS_BITS {
1852        return Err(ProcessError::WeakKeyExchangeParameters(
1853            "DH modulus too small",
1854        ));
1855    }
1856    let two = BigUint::from(2_u8);
1857    if p <= &two || (p % &two).is_zero() {
1858        return Err(ProcessError::WeakKeyExchangeParameters(
1859            "DH modulus must be odd and > 2",
1860        ));
1861    }
1862    if !is_probably_safe_prime(p, 10) {
1863        return Err(ProcessError::WeakKeyExchangeParameters(
1864            "DH modulus must be a safe prime",
1865        ));
1866    }
1867
1868    let p_minus_one = p - BigUint::one();
1869    if g <= &BigUint::one() || g >= &p_minus_one {
1870        return Err(ProcessError::WeakKeyExchangeParameters(
1871            "DH generator must be in range (1, p-1)",
1872        ));
1873    }
1874    if a_pub <= &BigUint::one() || a_pub >= &p_minus_one {
1875        return Err(ProcessError::WeakKeyExchangeParameters(
1876            "DH public value must be in range (1, p-1)",
1877        ));
1878    }
1879    if !is_primitive_root_for_safe_prime(g, p) {
1880        return Err(ProcessError::WeakKeyExchangeParameters(
1881            "DH generator must be a primitive root of p",
1882        ));
1883    }
1884    Ok(())
1885}
1886
1887fn validate_dh_reply_params(b_pub: &BigUint, p: &BigUint) -> Result<(), ProcessError> {
1888    if !is_probably_safe_prime(p, 10) {
1889        return Err(ProcessError::WeakKeyExchangeParameters(
1890            "DH modulus must be a safe prime",
1891        ));
1892    }
1893    let p_minus_one = p - BigUint::one();
1894    if b_pub <= &BigUint::one() || b_pub >= &p_minus_one {
1895        return Err(ProcessError::WeakKeyExchangeParameters(
1896            "DH reply value must be in range (1, p-1)",
1897        ));
1898    }
1899    Ok(())
1900}
1901
1902fn is_primitive_root_for_safe_prime(g: &BigUint, p: &BigUint) -> bool {
1903    if p <= &BigUint::from(3_u8) {
1904        return false;
1905    }
1906    let p_minus_one = p - BigUint::one();
1907    if g <= &BigUint::one() || g >= &p_minus_one {
1908        return false;
1909    }
1910    // For safe prime p = 2q+1, primitive root criterion:
1911    // g^2 != 1 (mod p) and g^q != 1 (mod p), where q=(p-1)/2.
1912    let q = &p_minus_one >> 1usize;
1913    let one = BigUint::one();
1914    let two = BigUint::from(2_u8);
1915    mod_pow(g, &two, p) != one && mod_pow(g, &q, p) != one
1916}
1917
1918fn find_primitive_root_for_safe_prime(p: &BigUint) -> Option<BigUint> {
1919    for candidate in 2_u32..=65_537_u32 {
1920        let g = BigUint::from(candidate);
1921        if is_primitive_root_for_safe_prime(&g, p) {
1922            return Some(g);
1923        }
1924    }
1925    None
1926}
1927
1928fn random_nonzero_biguint_below(modulus: &BigUint) -> Option<BigUint> {
1929    let modulus_bits = usize::try_from(modulus.bits()).ok()?;
1930    if modulus_bits == 0 {
1931        return None;
1932    }
1933    let byte_len = modulus_bits.div_ceil(8);
1934    let excess_bits = byte_len.saturating_mul(8).saturating_sub(modulus_bits);
1935    let mut rng = rand::rng();
1936    let mut raw = vec![0_u8; byte_len];
1937    for _ in 0..256 {
1938        rng.fill_bytes(&mut raw);
1939        if excess_bits > 0 {
1940            raw[0] &= 0xff_u8 >> excess_bits;
1941        }
1942        let value = BigUint::from_bytes_be(&raw);
1943        if !value.is_zero() && &value < modulus {
1944            return Some(value);
1945        }
1946    }
1947    None
1948}
1949
1950fn random_dh_secret(p: &BigUint) -> Option<BigUint> {
1951    if p <= &BigUint::one() {
1952        return None;
1953    }
1954    let upper_bound = p - BigUint::one();
1955    for _ in 0..256 {
1956        let candidate = random_nonzero_biguint_below(&upper_bound)?;
1957        if candidate > BigUint::one() {
1958            return Some(candidate);
1959        }
1960    }
1961    None
1962}
1963
1964fn generate_probable_prime(bits: usize, rounds: usize) -> Option<BigUint> {
1965    if bits < 2 {
1966        return None;
1967    }
1968    for _ in 0..4096 {
1969        let candidate = random_odd_biguint_with_bits(bits)?;
1970        if is_probably_prime(&candidate, rounds) {
1971            return Some(candidate);
1972        }
1973    }
1974    None
1975}
1976
1977fn generate_probable_safe_prime(bits: usize, rounds: usize) -> Option<BigUint> {
1978    if bits < 3 {
1979        return None;
1980    }
1981    for _ in 0..256 {
1982        let q = generate_probable_prime(bits.saturating_sub(1), rounds)?;
1983        let p: BigUint = (&q << 1usize) + BigUint::one();
1984        if p.bits() >= u64::try_from(bits).ok()? && is_probably_prime(&p, rounds) {
1985            return Some(p);
1986        }
1987    }
1988    None
1989}
1990
1991fn random_odd_biguint_with_bits(bits: usize) -> Option<BigUint> {
1992    if bits < 2 {
1993        return None;
1994    }
1995    let byte_len = bits.div_ceil(8);
1996    let excess_bits = byte_len.saturating_mul(8).saturating_sub(bits);
1997    let mut bytes = vec![0_u8; byte_len];
1998    rand::rng().fill_bytes(&mut bytes);
1999    if excess_bits > 0 {
2000        bytes[0] &= 0xff_u8 >> excess_bits;
2001    }
2002    let top_bit = 7_u8.saturating_sub(u8::try_from(excess_bits).ok()?);
2003    bytes[0] |= 1_u8 << top_bit;
2004    bytes[byte_len.saturating_sub(1)] |= 1;
2005    Some(BigUint::from_bytes_be(&bytes))
2006}
2007
2008fn gcd_biguint(left: &BigUint, right: &BigUint) -> BigUint {
2009    let mut a = left.clone();
2010    let mut b = right.clone();
2011    while !b.is_zero() {
2012        let r = &a % &b;
2013        a = b;
2014        b = r;
2015    }
2016    a
2017}
2018
2019fn lcm_biguint(left: &BigUint, right: &BigUint) -> BigUint {
2020    if left.is_zero() || right.is_zero() {
2021        return BigUint::zero();
2022    }
2023    (left / gcd_biguint(left, right)) * right
2024}
2025
2026fn mod_inverse_biguint(value: &BigUint, modulus: &BigUint) -> Option<BigUint> {
2027    let a = value.to_bigint()?;
2028    let m = modulus.to_bigint()?;
2029    let (g, x, _) = extended_gcd_bigint(a, m.clone());
2030    if g != BigInt::one() {
2031        return None;
2032    }
2033    let mut reduced = x % &m;
2034    if reduced < BigInt::zero() {
2035        reduced += &m;
2036    }
2037    reduced.try_into().ok()
2038}
2039
2040fn extended_gcd_bigint(a: BigInt, b: BigInt) -> (BigInt, BigInt, BigInt) {
2041    let mut old_r = a;
2042    let mut r = b;
2043    let mut old_s = BigInt::one();
2044    let mut s = BigInt::zero();
2045    let mut old_t = BigInt::zero();
2046    let mut t = BigInt::one();
2047
2048    while r != BigInt::zero() {
2049        let q = &old_r / &r;
2050
2051        let new_r = &old_r - &q * &r;
2052        old_r = r;
2053        r = new_r;
2054
2055        let new_s = &old_s - &q * &s;
2056        old_s = s;
2057        s = new_s;
2058
2059        let new_t = &old_t - &q * &t;
2060        old_t = t;
2061        t = new_t;
2062    }
2063    (old_r, old_s, old_t)
2064}
2065
2066fn derive_exchange_key(local: &str, peer: &str, label: &[u8], secret: &BigUint) -> Vec<u8> {
2067    let mut ikm = secret.to_bytes_be();
2068    if ikm.is_empty() {
2069        ikm.push(0);
2070    }
2071    derive_exchange_key_from_bytes(local, peer, label, &ikm)
2072}
2073
2074fn derive_exchange_key_from_bytes(local: &str, peer: &str, label: &[u8], secret: &[u8]) -> Vec<u8> {
2075    let (left, right) = if local <= peer {
2076        (local.as_bytes(), peer.as_bytes())
2077    } else {
2078        (peer.as_bytes(), local.as_bytes())
2079    };
2080
2081    let hk = Hkdf::<Sha256>::new(Some(b"cmr-v1-key-exchange"), secret);
2082    let mut info = Vec::with_capacity(3 + label.len() + left.len() + right.len());
2083    info.extend_from_slice(b"cmr");
2084    info.push(0);
2085    info.extend_from_slice(label);
2086    info.push(0);
2087    info.extend_from_slice(left);
2088    info.push(0);
2089    info.extend_from_slice(right);
2090
2091    let mut out = [0_u8; 32];
2092    hk.expand(&info, &mut out)
2093        .expect("HKDF expand length is fixed and valid");
2094    out.to_vec()
2095}
2096
2097fn is_probably_safe_prime(p: &BigUint, rounds: usize) -> bool {
2098    if !is_probably_prime(p, rounds) {
2099        return false;
2100    }
2101    let one = BigUint::one();
2102    let two = BigUint::from(2_u8);
2103    if p <= &two {
2104        return false;
2105    }
2106    let q = (p - &one) >> 1;
2107    is_probably_prime(&q, rounds)
2108}
2109
2110fn is_probably_prime(n: &BigUint, rounds: usize) -> bool {
2111    let two = BigUint::from(2_u8);
2112    let three = BigUint::from(3_u8);
2113    if n < &two {
2114        return false;
2115    }
2116    if n == &two || n == &three {
2117        return true;
2118    }
2119    if (n % &two).is_zero() {
2120        return false;
2121    }
2122
2123    let one = BigUint::one();
2124    let n_minus_one = n - &one;
2125    let mut d = n_minus_one.clone();
2126    let mut s = 0_u32;
2127    while (&d % &two).is_zero() {
2128        d >>= 1;
2129        s = s.saturating_add(1);
2130    }
2131
2132    const BASES: [u8; 12] = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37];
2133    for &base in &BASES {
2134        let a = BigUint::from(base);
2135        if a >= n_minus_one {
2136            continue;
2137        }
2138        if is_miller_rabin_witness(n, &d, s, &a) {
2139            return false;
2140        }
2141    }
2142
2143    let three = BigUint::from(3_u8);
2144    let n_minus_three = n - &three;
2145    for _ in 0..rounds {
2146        let Some(offset) = random_nonzero_biguint_below(&n_minus_three) else {
2147            return false;
2148        };
2149        let a = offset + &two;
2150        if is_miller_rabin_witness(n, &d, s, &a) {
2151            return false;
2152        }
2153    }
2154
2155    true
2156}
2157
2158fn is_miller_rabin_witness(n: &BigUint, d: &BigUint, s: u32, a: &BigUint) -> bool {
2159    let one = BigUint::one();
2160    let n_minus_one = n - &one;
2161    let mut x = mod_pow(a, d, n);
2162    if x == one || x == n_minus_one {
2163        return false;
2164    }
2165    for _ in 1..s {
2166        x = (&x * &x) % n;
2167        if x == n_minus_one {
2168            return false;
2169        }
2170    }
2171    true
2172}
2173
#[cfg(test)]
mod tests {
    use super::*;

    /// Unsigned wire message from `http://alice` carrying the five-byte body `hello`.
    const ALICE_HELLO: &[u8] = b"0\r\n2029/12/31 23:59:59 http://alice\r\n\r\n5\r\nhello";

    /// Compression oracle stub that answers every query with a fixed value.
    struct StubOracle;

    impl CompressionOracle for StubOracle {
        fn compression_distance(
            &self,
            _left: &[u8],
            _right: &[u8],
        ) -> Result<f64, CompressionError> {
            Ok(0.4)
        }

        fn intrinsic_dependence(
            &self,
            _data: &[u8],
            _max_order: i64,
        ) -> Result<f64, CompressionError> {
            Ok(0.5)
        }
    }

    /// Fixed "current time" used by the tests; later than the fixture timestamps.
    fn now() -> CmrTimestamp {
        CmrTimestamp::parse("2030/01/01 00:00:10").expect("ts")
    }

    #[test]
    fn accepts_minimal_message() {
        let mut router = Router::new(
            "http://bob".to_owned(),
            RoutingPolicy::default(),
            StubOracle,
        );

        let outcome = router.process_incoming(ALICE_HELLO, TransportKind::Http, now());

        assert!(outcome.accepted);
        assert!(outcome.drop_reason.is_none());
    }

    #[test]
    fn rejects_duplicate_id() {
        let mut router = Router::new(
            "http://bob".to_owned(),
            RoutingPolicy::default(),
            StubOracle,
        );

        let first = router.process_incoming(ALICE_HELLO, TransportKind::Http, now());
        assert!(first.accepted);

        // Replaying the identical message must be dropped as a duplicate.
        let replay = router.process_incoming(ALICE_HELLO, TransportKind::Http, now());
        assert!(!replay.accepted);
        assert!(matches!(
            replay.drop_reason,
            Some(ProcessError::DuplicateMessageId)
        ));
    }

    #[test]
    fn local_client_processing_allows_local_sender_while_network_ingress_rejects_it() {
        let mut router = Router::new(
            "http://bob/".to_owned(),
            RoutingPolicy::default(),
            StubOracle,
        );
        // The sender address equals the router's own address.
        let from_self = b"0\r\n2029/12/31 23:59:59 http://bob/\r\n\r\n2\r\nhi";

        let via_network = router.process_incoming(from_self, TransportKind::Http, now());
        assert!(!via_network.accepted);
        assert!(matches!(
            via_network.drop_reason,
            Some(ProcessError::Parse(
                crate::protocol::ParseError::RecipientAddressInHeader
            ))
        ));

        let via_local_client =
            router.process_local_client_message(from_self, TransportKind::Http, now());
        assert!(via_local_client.accepted);
        assert!(via_local_client.drop_reason.is_none());
    }

    #[test]
    fn cache_inserts_messages_in_unsigned_canonical_form() {
        let origin = MessageId {
            timestamp: CmrTimestamp::parse("2029/12/31 23:59:59").expect("timestamp"),
            address: "http://alice".to_owned(),
        };
        let mut signed = CmrMessage {
            signature: Signature::Unsigned,
            header: vec![origin],
            body: b"payload".to_vec(),
        };
        signed.sign_with_key(b"shared-key");
        assert!(matches!(signed.signature, Signature::Sha256(_)));

        let key = cache_key(&signed);
        let mut cache = MessageCache::new(16, 1024 * 1024);
        cache.insert(signed);

        let stored = cache.entries.get(&key).expect("cached entry");
        assert!(matches!(stored.message.signature, Signature::Unsigned));
        assert!(stored.message.to_bytes().starts_with(b"0\r\n"));
    }

    #[test]
    fn forward_timestamp_is_strictly_newer_than_existing_header() {
        let mut router = Router::new(
            "http://bob".to_owned(),
            RoutingPolicy::default(),
            StubOracle,
        );
        // The existing hop is later than "now" within the same second.
        let current = CmrTimestamp::parse("2030/01/01 00:00:10.000000001").expect("now");
        let newest_existing = CmrTimestamp::parse("2030/01/01 00:00:10.9").expect("existing");

        let forwarded = router.next_forward_timestamp(&current, Some(&newest_existing));

        assert!(forwarded > newest_existing);
    }

    #[test]
    fn local_peer_alias_rejects_prefix_collisions() {
        let router = Router::new(
            "http://peer.example/cmr".to_owned(),
            RoutingPolicy::default(),
            StubOracle,
        );
        assert!(!router.is_local_peer_alias("http://peer.example/cmr-admin"));
    }

    #[test]
    fn local_peer_alias_accepts_same_origin_subpath() {
        let router = Router::new(
            "http://peer.example/cmr".to_owned(),
            RoutingPolicy::default(),
            StubOracle,
        );
        assert!(router.is_local_peer_alias("http://peer.example/cmr/inbox"));
    }
}