1use std::sync::Arc;
15
16use crate::client::policy::TransportPolicy;
17use crate::client::selfheal::SelfHealing;
18use crate::config::SrxConfig;
19use crate::crypto::{AeadPipeline, ReplayState};
20use crate::error::Result;
21use crate::pipeline::{Payload, SrxPipeline};
22use crate::replay_storage::{
23 ReplayStoreMetricsSnapshot, decode_replay_envelope, merge_and_persist_replay_state,
24 replay_store_metrics_snapshot, storage_from_config,
25};
26use crate::session::{Handshake, Session};
27use crate::signaling::inband::Signal;
28use crate::transport::{TransportKind, TransportManager};
29
30pub struct SrxNode {
34 config: SrxConfig,
35 pipe: SrxPipeline,
36 self_healing: SelfHealing,
37 policy: TransportPolicy,
38}
39
40impl SrxNode {
41 pub fn client_connect<F>(
51 config: SrxConfig,
52 transport_mgr: TransportManager,
53 exchange: F,
54 ) -> Result<Self>
55 where
56 F: FnOnce(&[u8]) -> Result<(Vec<u8>, Vec<u8>)>,
57 {
58 let mut hs = Handshake::new_initiator();
59 let ch = hs.client_hello()?;
60 let (sh, _ack) = exchange(&ch)?;
61 let cf = hs.finalize(&sh)?;
62 let _ = cf; let master = hs.master_secret().ok_or_else(|| {
68 crate::error::SrxError::Session(crate::error::SessionError::HandshakeFailed(
69 "master secret not available after finalize".into(),
70 ))
71 })?;
72
73 Self::from_master_secret(config, master, transport_mgr)
74 }
75
76 pub fn from_master_secret(
79 config: SrxConfig,
80 master: [u8; 32],
81 transport_mgr: TransportManager,
82 ) -> Result<Self> {
83 let timestamp = std::time::SystemTime::now()
84 .duration_since(std::time::UNIX_EPOCH)
85 .unwrap_or_default()
86 .as_secs();
87
88 let session = Session::from_master_secret(0, &master, timestamp, b"srx-node")?;
89 let aead = Arc::new(config.build_aead_pipeline(&session.data_key)?);
90 let pipe = SrxPipeline::from_config(&config, session, aead, transport_mgr);
91
92 let node = Self {
93 config,
94 pipe,
95 self_healing: SelfHealing::new(),
96 policy: TransportPolicy::default(),
97 };
98 node.restore_replay_state_from_disk()?;
99 Ok(node)
100 }
101
102 pub fn from_session(
104 config: SrxConfig,
105 session: Session,
106 aead: Arc<AeadPipeline>,
107 transport_mgr: TransportManager,
108 ) -> Result<Self> {
109 let pipe = SrxPipeline::from_config(&config, session, aead, transport_mgr);
110 let node = Self {
111 config,
112 pipe,
113 self_healing: SelfHealing::new(),
114 policy: TransportPolicy::default(),
115 };
116 node.restore_replay_state_from_disk()?;
117 Ok(node)
118 }
119
120 pub async fn send(&mut self, payload: &[u8]) -> Result<TransportKind> {
122 self.pipe.send(payload).await
123 }
124
125 pub async fn recv_from(&mut self, kind: TransportKind) -> Result<Vec<u8>> {
127 let payload = self.pipe.recv_from(kind).await?;
128 self.persist_replay_state_to_disk()?;
129 Ok(payload)
130 }
131
132 pub fn process_incoming(&self, envelope: &[u8]) -> Result<Vec<u8>> {
134 let payload = self.pipe.process_incoming(envelope)?;
135 self.persist_replay_state_to_disk()?;
136 Ok(payload)
137 }
138
139 pub fn prepare_outgoing(&mut self, payload: &[u8]) -> Result<Vec<u8>> {
141 self.pipe.prepare_outgoing(payload)
142 }
143
144 pub fn pipeline(&self) -> &SrxPipeline {
146 &self.pipe
147 }
148
149 pub fn pipeline_mut(&mut self) -> &mut SrxPipeline {
151 &mut self.pipe
152 }
153
154 pub fn config(&self) -> &SrxConfig {
156 &self.config
157 }
158
159 fn replay_persistence_enabled(&self) -> bool {
160 self.config.replay.persist_enabled
161 }
162
163 fn restore_replay_state_from_disk(&self) -> Result<()> {
164 if !self.replay_persistence_enabled() {
165 return Ok(());
166 }
167 let storage = storage_from_config(&self.config.replay)?;
168 let Some(raw) = storage.load_raw(&self.config.replay)? else {
169 return Ok(());
170 };
171 let Some(state) =
172 decode_replay_envelope(&self.config.replay, &self.replay_session_binding(), &raw)?
173 else {
174 return Ok(());
175 };
176 self.pipe.set_replay_state(&state)
177 }
178
179 fn replay_session_binding(&self) -> String {
180 use sha2::{Digest, Sha256};
181 let seed = self.pipe.session.rng.seed_bytes();
182 let digest = Sha256::digest(seed);
183 let mut out = String::with_capacity(24);
184 for b in digest.iter().take(12) {
185 out.push(char::from(b"0123456789abcdef"[(b >> 4) as usize]));
186 out.push(char::from(b"0123456789abcdef"[(b & 0x0f) as usize]));
187 }
188 out
189 }
190
191 fn persist_replay_state_to_disk(&self) -> Result<()> {
192 if !self.replay_persistence_enabled() {
193 return Ok(());
194 }
195 let state = self.replay_state();
196 let storage = storage_from_config(&self.config.replay)?;
197 merge_and_persist_replay_state(
198 &self.config.replay,
199 storage.as_ref(),
200 &self.replay_session_binding(),
201 state,
202 )
203 }
204
205 pub fn replay_state(&self) -> ReplayState {
207 self.pipe.replay_state()
208 }
209
210 pub fn set_replay_state(&self, state: &ReplayState) -> Result<()> {
212 self.pipe.set_replay_state(state)?;
213 self.persist_replay_state_to_disk()
214 }
215
216 pub fn send_signal(&mut self, signal: &Signal) -> Result<Vec<u8>> {
221 self.pipe.prepare_signal(signal)
222 }
223
224 pub fn process_incoming_dispatched(&self, envelope: &[u8]) -> Result<Payload> {
226 let payload = self.process_incoming(envelope)?;
227 match self.pipe.try_decode_signal(&payload) {
228 Some(sig) => Ok(Payload::Signal(sig)),
229 None => Ok(Payload::Data(payload)),
230 }
231 }
232
233 pub fn heal_if_needed(&mut self) -> Option<Vec<TransportKind>> {
240 if !self.self_healing.should_heal(self.pipe.transport_mgr()) {
241 return None;
242 }
243 let healthy = self.pipe.transport_mgr().healthy_kinds();
246 let active = self.pipe.transport_mgr().active_kinds();
247 let blocked: Vec<_> = active
248 .iter()
249 .filter(|k| !healthy.contains(k))
250 .copied()
251 .collect();
252
253 self.self_healing.reseed_only(&mut self.pipe.session.rng);
255
256 let mut order = self.policy.recommend(&healthy);
258 order.extend(self.policy.recommend(&blocked));
259 Some(order)
260 }
261
262 pub fn record_success(&mut self) {
264 self.self_healing.record_success();
265 }
266
267 pub fn heal_count(&self) -> u32 {
269 self.self_healing.heal_count
270 }
271
272 pub fn set_environment(&mut self, env: crate::client::policy::NetworkEnvironment) {
276 self.policy.set_environment(env);
277 }
278
279 pub fn policy(&self) -> &TransportPolicy {
281 &self.policy
282 }
283
284 pub fn replay_store_metrics() -> ReplayStoreMetricsSnapshot {
286 replay_store_metrics_snapshot()
287 }
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::policy::NetworkEnvironment;
    use crate::config::SrxConfig;
    use crate::pipeline::Payload;
    use crate::signaling::inband::Signal;
    use tempfile::TempDir;

    /// Master secret shared by the nodes produced by `make_pair`.
    const PAIR_MASTER: [u8; 32] = [0xBB; 32];

    /// Default configuration with replay persistence switched off.
    fn config_without_persistence() -> SrxConfig {
        let mut config = SrxConfig::default();
        config.replay.persist_enabled = false;
        config
    }

    /// Builds a node from `master` with a fresh transport manager.
    fn node_with(config: SrxConfig, master: [u8; 32]) -> SrxNode {
        SrxNode::from_master_secret(config, master, TransportManager::new()).unwrap()
    }

    /// Two nodes built from the same master secret and configuration.
    fn make_pair() -> (SrxNode, SrxNode) {
        let config = config_without_persistence();
        let sender = node_with(config.clone(), PAIR_MASTER);
        let receiver = node_with(config, PAIR_MASTER);
        (sender, receiver)
    }

    #[test]
    fn from_master_secret_builds_node() {
        let node = node_with(config_without_persistence(), [0xAAu8; 32]);
        assert!(node.pipeline().session.active);
    }

    #[test]
    fn node_prepare_process_roundtrip() {
        let (mut sender, receiver) = make_pair();
        let envelope = sender.prepare_outgoing(b"node-test").unwrap();
        let recovered = receiver.process_incoming(&envelope).unwrap();
        assert_eq!(recovered, b"node-test");
    }

    #[test]
    fn signal_roundtrip_through_node() {
        let (mut sender, receiver) = make_pair();
        let envelope = sender.send_signal(&Signal::SeedRotation).unwrap();
        let Payload::Signal(signal) = receiver.process_incoming_dispatched(&envelope).unwrap()
        else {
            panic!("expected signal");
        };
        assert_eq!(signal, Signal::SeedRotation);
    }

    #[test]
    fn data_dispatched_as_data() {
        let (mut sender, receiver) = make_pair();
        let envelope = sender.prepare_outgoing(b"app-data").unwrap();
        let Payload::Data(data) = receiver.process_incoming_dispatched(&envelope).unwrap() else {
            panic!("expected data");
        };
        assert_eq!(data, b"app-data");
    }

    #[test]
    fn heal_if_needed_returns_none_when_healthy() {
        let (mut node, _) = make_pair();
        let outcome = node.heal_if_needed();
        assert!(outcome.is_none());
        assert_eq!(node.heal_count(), 0);
    }

    #[test]
    fn record_success_resets_backoff() {
        let (mut node, _) = make_pair();
        node.record_success();
        assert_eq!(node.heal_count(), 0);
    }

    #[test]
    fn set_environment_updates_policy() {
        let (mut node, _) = make_pair();
        node.set_environment(NetworkEnvironment::Corporate);
        assert_eq!(node.policy().environment(), NetworkEnvironment::Corporate);
    }

    #[test]
    fn replay_state_snapshot_restore_on_node() {
        let (mut sender, receiver) = make_pair();

        let first = sender.prepare_outgoing(b"r1").unwrap();
        let second = sender.prepare_outgoing(b"r2").unwrap();

        assert_eq!(receiver.process_incoming(&first).unwrap(), b"r1");
        assert_eq!(receiver.process_incoming(&second).unwrap(), b"r2");

        // Carry the receiver's replay state over to a freshly built node.
        let snapshot = receiver.replay_state();
        let restored = node_with(config_without_persistence(), PAIR_MASTER);
        restored.set_replay_state(&snapshot).unwrap();

        let replay = restored.process_incoming(&second);
        assert!(replay.is_err(), "restored node must reject duplicate");
    }

    #[test]
    fn auto_persist_and_restore_replay_state() {
        let temp = TempDir::new().unwrap();
        let state_file = temp.path().join("replay_state.json");

        let mut cfg = SrxConfig::default();
        cfg.replay.persist_enabled = true;
        cfg.replay.state_file = state_file.clone();

        let master = [0xACu8; 32];
        let mut sender = node_with(cfg.clone(), master);
        let receiver_before = node_with(cfg.clone(), master);

        let envelope = sender.prepare_outgoing(b"persisted").unwrap();
        let first_delivery = receiver_before.process_incoming(&envelope).unwrap();
        assert_eq!(first_delivery, b"persisted");
        assert!(state_file.exists(), "replay state file must be created");

        // A node built afterwards from the same config loads the persisted state.
        let receiver_after = node_with(cfg, master);
        assert!(
            receiver_after.process_incoming(&envelope).is_err(),
            "restored node must reject duplicate from persisted state"
        );
    }
}
428}