Skip to main content

srx/
node.rs

1//! High-level entry point: ties handshake, session, and pipeline into a single API.
2//!
3//! [`SrxNode`] is the primary type an application uses to establish a connection
4//! and send/receive data through the full SRX protocol stack.
5//!
6//! ```text
7//! Application
8//!   └─ SrxNode
9//!        ├─ Handshake  (Kyber + X25519 + optional Ed25519)
10//!        ├─ Session    (seed, data key, packet counter)
11//!        └─ SrxPipeline (pad → encrypt → frame → mimicry → jitter → transport)
12//! ```
13
14use std::sync::Arc;
15
16use crate::client::policy::TransportPolicy;
17use crate::client::selfheal::SelfHealing;
18use crate::config::SrxConfig;
19use crate::crypto::{AeadPipeline, ReplayState};
20use crate::error::Result;
21use crate::pipeline::{Payload, SrxPipeline};
22use crate::replay_storage::{
23    ReplayStoreMetricsSnapshot, decode_replay_envelope, merge_and_persist_replay_state,
24    replay_store_metrics_snapshot, storage_from_config,
25};
26use crate::session::{Handshake, Session};
27use crate::signaling::inband::Signal;
28use crate::transport::{TransportKind, TransportManager};
29
30/// High-level SRX protocol node (client or server).
31///
32/// Holds the active [`SrxPipeline`] and exposes simple `send` / `recv_from` methods.
33pub struct SrxNode {
34    config: SrxConfig,
35    pipe: SrxPipeline,
36    self_healing: SelfHealing,
37    policy: TransportPolicy,
38}
39
40impl SrxNode {
41    /// Perform a client-side handshake and build the pipeline.
42    ///
43    /// `exchange` is a closure that carries the three handshake messages over the
44    /// network (transport-agnostic so the caller can use any channel).
45    ///
46    /// ```text
47    /// exchange(ClientHello) → ServerHello
48    /// exchange(ClientFinished) → ()
49    /// ```
50    pub fn client_connect<F>(
51        config: SrxConfig,
52        transport_mgr: TransportManager,
53        exchange: F,
54    ) -> Result<Self>
55    where
56        F: FnOnce(&[u8]) -> Result<(Vec<u8>, Vec<u8>)>,
57    {
58        let mut hs = Handshake::new_initiator();
59        let ch = hs.client_hello()?;
60        let (sh, _ack) = exchange(&ch)?;
61        let cf = hs.finalize(&sh)?;
62        // The caller is expected to send `cf` and receive an ack via `exchange`,
63        // but the second leg returns `_ack` which we ignore (server derives master
64        // secret from cf internally).
65        let _ = cf; // cf was already computed; drop silently
66
67        let master = hs.master_secret().ok_or_else(|| {
68            crate::error::SrxError::Session(crate::error::SessionError::HandshakeFailed(
69                "master secret not available after finalize".into(),
70            ))
71        })?;
72
73        Self::from_master_secret(config, master, transport_mgr)
74    }
75
76    /// Build a node directly from a shared master secret (e.g. after an external
77    /// handshake or for testing).
78    pub fn from_master_secret(
79        config: SrxConfig,
80        master: [u8; 32],
81        transport_mgr: TransportManager,
82    ) -> Result<Self> {
83        let timestamp = std::time::SystemTime::now()
84            .duration_since(std::time::UNIX_EPOCH)
85            .unwrap_or_default()
86            .as_secs();
87
88        let session = Session::from_master_secret(0, &master, timestamp, b"srx-node")?;
89        let aead = Arc::new(config.build_aead_pipeline(&session.data_key)?);
90        let pipe = SrxPipeline::from_config(&config, session, aead, transport_mgr);
91
92        let node = Self {
93            config,
94            pipe,
95            self_healing: SelfHealing::new(),
96            policy: TransportPolicy::default(),
97        };
98        node.restore_replay_state_from_disk()?;
99        Ok(node)
100    }
101
102    /// Build a node from a pre-established [`Session`] and [`AeadPipeline`].
103    pub fn from_session(
104        config: SrxConfig,
105        session: Session,
106        aead: Arc<AeadPipeline>,
107        transport_mgr: TransportManager,
108    ) -> Result<Self> {
109        let pipe = SrxPipeline::from_config(&config, session, aead, transport_mgr);
110        let node = Self {
111            config,
112            pipe,
113            self_healing: SelfHealing::new(),
114            policy: TransportPolicy::default(),
115        };
116        node.restore_replay_state_from_disk()?;
117        Ok(node)
118    }
119
120    /// Send application payload through the full protocol stack.
121    pub async fn send(&mut self, payload: &[u8]) -> Result<TransportKind> {
122        self.pipe.send(payload).await
123    }
124
125    /// Receive and decrypt from a specific transport.
126    pub async fn recv_from(&mut self, kind: TransportKind) -> Result<Vec<u8>> {
127        let payload = self.pipe.recv_from(kind).await?;
128        self.persist_replay_state_to_disk()?;
129        Ok(payload)
130    }
131
132    /// Process raw bytes received externally (e.g. from a worker queue).
133    pub fn process_incoming(&self, envelope: &[u8]) -> Result<Vec<u8>> {
134        let payload = self.pipe.process_incoming(envelope)?;
135        self.persist_replay_state_to_disk()?;
136        Ok(payload)
137    }
138
139    /// Prepare outgoing bytes without dispatching (for custom transport handling).
140    pub fn prepare_outgoing(&mut self, payload: &[u8]) -> Result<Vec<u8>> {
141        self.pipe.prepare_outgoing(payload)
142    }
143
144    /// Access the underlying pipeline.
145    pub fn pipeline(&self) -> &SrxPipeline {
146        &self.pipe
147    }
148
149    /// Mutable access to the underlying pipeline.
150    pub fn pipeline_mut(&mut self) -> &mut SrxPipeline {
151        &mut self.pipe
152    }
153
154    /// Access the node configuration.
155    pub fn config(&self) -> &SrxConfig {
156        &self.config
157    }
158
159    fn replay_persistence_enabled(&self) -> bool {
160        self.config.replay.persist_enabled
161    }
162
163    fn restore_replay_state_from_disk(&self) -> Result<()> {
164        if !self.replay_persistence_enabled() {
165            return Ok(());
166        }
167        let storage = storage_from_config(&self.config.replay)?;
168        let Some(raw) = storage.load_raw(&self.config.replay)? else {
169            return Ok(());
170        };
171        let Some(state) =
172            decode_replay_envelope(&self.config.replay, &self.replay_session_binding(), &raw)?
173        else {
174            return Ok(());
175        };
176        self.pipe.set_replay_state(&state)
177    }
178
179    fn replay_session_binding(&self) -> String {
180        use sha2::{Digest, Sha256};
181        let seed = self.pipe.session.rng.seed_bytes();
182        let digest = Sha256::digest(seed);
183        let mut out = String::with_capacity(24);
184        for b in digest.iter().take(12) {
185            out.push(char::from(b"0123456789abcdef"[(b >> 4) as usize]));
186            out.push(char::from(b"0123456789abcdef"[(b & 0x0f) as usize]));
187        }
188        out
189    }
190
191    fn persist_replay_state_to_disk(&self) -> Result<()> {
192        if !self.replay_persistence_enabled() {
193            return Ok(());
194        }
195        let state = self.replay_state();
196        let storage = storage_from_config(&self.config.replay)?;
197        merge_and_persist_replay_state(
198            &self.config.replay,
199            storage.as_ref(),
200            &self.replay_session_binding(),
201            state,
202        )
203    }
204
205    /// Snapshot anti-replay state so it can be restored after a restart.
206    pub fn replay_state(&self) -> ReplayState {
207        self.pipe.replay_state()
208    }
209
210    /// Restore anti-replay state captured from a previous process instance.
211    pub fn set_replay_state(&self, state: &ReplayState) -> Result<()> {
212        self.pipe.set_replay_state(state)?;
213        self.persist_replay_state_to_disk()
214    }
215
216    // ── In-band signaling ───────────────────────────────────────────────
217
218    /// Send a control signal through the full pipeline (encrypted, framed,
219    /// mimicry-wrapped — indistinguishable from data on the wire).
220    pub fn send_signal(&mut self, signal: &Signal) -> Result<Vec<u8>> {
221        self.pipe.prepare_signal(signal)
222    }
223
224    /// Process raw bytes and dispatch into [`Payload::Data`] or [`Payload::Signal`].
225    pub fn process_incoming_dispatched(&self, envelope: &[u8]) -> Result<Payload> {
226        let payload = self.process_incoming(envelope)?;
227        match self.pipe.try_decode_signal(&payload) {
228            Some(sig) => Ok(Payload::Signal(sig)),
229            None => Ok(Payload::Data(payload)),
230        }
231    }
232
233    // ── Self-healing ────────────────────────────────────────────────────
234
235    /// Check if self-healing should trigger and, if so, reseed and return
236    /// a new transport order.
237    ///
238    /// Returns `Some(order)` when healing was performed, `None` otherwise.
239    pub fn heal_if_needed(&mut self) -> Option<Vec<TransportKind>> {
240        if !self.self_healing.should_heal(self.pipe.transport_mgr()) {
241            return None;
242        }
243        // Snapshot health data before taking &mut session.rng (avoids
244        // overlapping borrows on `self.pipe`).
245        let healthy = self.pipe.transport_mgr().healthy_kinds();
246        let active = self.pipe.transport_mgr().active_kinds();
247        let blocked: Vec<_> = active
248            .iter()
249            .filter(|k| !healthy.contains(k))
250            .copied()
251            .collect();
252
253        // Reseed RNG + bump backoff (separate from TransportManager borrow).
254        self.self_healing.reseed_only(&mut self.pipe.session.rng);
255
256        // Build the order: healthy first (policy-sorted), then blocked.
257        let mut order = self.policy.recommend(&healthy);
258        order.extend(self.policy.recommend(&blocked));
259        Some(order)
260    }
261
262    /// Notify the self-healing controller that traffic succeeded.
263    pub fn record_success(&mut self) {
264        self.self_healing.record_success();
265    }
266
267    /// Number of self-healing events triggered so far.
268    pub fn heal_count(&self) -> u32 {
269        self.self_healing.heal_count
270    }
271
272    // ── Transport policy ────────────────────────────────────────────────
273
274    /// Set the network environment for transport policy.
275    pub fn set_environment(&mut self, env: crate::client::policy::NetworkEnvironment) {
276        self.policy.set_environment(env);
277    }
278
279    /// Access the transport policy.
280    pub fn policy(&self) -> &TransportPolicy {
281        &self.policy
282    }
283
284    /// Snapshot CAS contention metrics for replay-store persistence.
285    pub fn replay_store_metrics() -> ReplayStoreMetricsSnapshot {
286        replay_store_metrics_snapshot()
287    }
288}
289
290#[cfg(test)]
291mod tests {
292    use super::*;
293    use crate::client::policy::NetworkEnvironment;
294    use crate::config::SrxConfig;
295    use crate::pipeline::Payload;
296    use crate::signaling::inband::Signal;
297    use tempfile::TempDir;
298
299    fn make_pair() -> (SrxNode, SrxNode) {
300        let master = [0xBBu8; 32];
301        let mut config = SrxConfig::default();
302        config.replay.persist_enabled = false;
303        let sender =
304            SrxNode::from_master_secret(config.clone(), master, TransportManager::new()).unwrap();
305        let receiver =
306            SrxNode::from_master_secret(config, master, TransportManager::new()).unwrap();
307        (sender, receiver)
308    }
309
310    #[test]
311    fn from_master_secret_builds_node() {
312        let mut config = SrxConfig::default();
313        config.replay.persist_enabled = false;
314        let master = [0xAAu8; 32];
315        let node = SrxNode::from_master_secret(config, master, TransportManager::new()).unwrap();
316        assert!(node.pipeline().session.active);
317    }
318
319    #[test]
320    fn node_prepare_process_roundtrip() {
321        let (mut sender, receiver) = make_pair();
322        let envelope = sender.prepare_outgoing(b"node-test").unwrap();
323        let recovered = receiver.process_incoming(&envelope).unwrap();
324        assert_eq!(recovered, b"node-test");
325    }
326
327    #[test]
328    fn signal_roundtrip_through_node() {
329        let (mut sender, receiver) = make_pair();
330        let envelope = sender.send_signal(&Signal::SeedRotation).unwrap();
331        let payload = receiver.process_incoming_dispatched(&envelope).unwrap();
332        match payload {
333            Payload::Signal(sig) => assert_eq!(sig, Signal::SeedRotation),
334            Payload::Data(_) => panic!("expected signal"),
335        }
336    }
337
338    #[test]
339    fn data_dispatched_as_data() {
340        let (mut sender, receiver) = make_pair();
341        let envelope = sender.prepare_outgoing(b"app-data").unwrap();
342        let payload = receiver.process_incoming_dispatched(&envelope).unwrap();
343        match payload {
344            Payload::Data(d) => assert_eq!(d, b"app-data"),
345            Payload::Signal(_) => panic!("expected data"),
346        }
347    }
348
349    #[test]
350    fn heal_if_needed_returns_none_when_healthy() {
351        let (mut node, _) = make_pair();
352        // No blocked transports → no healing needed.
353        assert!(node.heal_if_needed().is_none());
354        assert_eq!(node.heal_count(), 0);
355    }
356
357    #[test]
358    fn record_success_resets_backoff() {
359        let (mut node, _) = make_pair();
360        node.record_success();
361        // Just ensure it doesn't panic; backoff is internal.
362        assert_eq!(node.heal_count(), 0);
363    }
364
365    #[test]
366    fn set_environment_updates_policy() {
367        let (mut node, _) = make_pair();
368        node.set_environment(NetworkEnvironment::Corporate);
369        assert_eq!(node.policy().environment(), NetworkEnvironment::Corporate);
370    }
371
372    #[test]
373    fn replay_state_snapshot_restore_on_node() {
374        let (mut sender, receiver) = make_pair();
375
376        let env1 = sender.prepare_outgoing(b"r1").unwrap();
377        let env2 = sender.prepare_outgoing(b"r2").unwrap();
378
379        assert_eq!(receiver.process_incoming(&env1).unwrap(), b"r1");
380        assert_eq!(receiver.process_incoming(&env2).unwrap(), b"r2");
381
382        let state = receiver.replay_state();
383        let restored = SrxNode::from_master_secret(
384            {
385                let mut cfg = SrxConfig::default();
386                cfg.replay.persist_enabled = false;
387                cfg
388            },
389            [0xBBu8; 32],
390            TransportManager::new(),
391        )
392        .unwrap();
393        restored.set_replay_state(&state).unwrap();
394
395        let replay = restored.process_incoming(&env2);
396        assert!(replay.is_err(), "restored node must reject duplicate");
397    }
398
399    #[test]
400    fn auto_persist_and_restore_replay_state() {
401        let temp = TempDir::new().unwrap();
402        let state_file = temp.path().join("replay_state.json");
403
404        let mut cfg = SrxConfig::default();
405        cfg.replay.persist_enabled = true;
406        cfg.replay.state_file = state_file.clone();
407
408        let master = [0xACu8; 32];
409        let mut sender =
410            SrxNode::from_master_secret(cfg.clone(), master, TransportManager::new()).unwrap();
411        let receiver_before =
412            SrxNode::from_master_secret(cfg.clone(), master, TransportManager::new()).unwrap();
413
414        let envelope = sender.prepare_outgoing(b"persisted").unwrap();
415        assert_eq!(
416            receiver_before.process_incoming(&envelope).unwrap(),
417            b"persisted"
418        );
419        assert!(state_file.exists(), "replay state file must be created");
420
421        let receiver_after =
422            SrxNode::from_master_secret(cfg, master, TransportManager::new()).unwrap();
423        assert!(
424            receiver_after.process_incoming(&envelope).is_err(),
425            "restored node must reject duplicate from persisted state"
426        );
427    }
428}