1use std::sync::{Arc, Mutex};
2use std::time::{Duration, Instant};
3
4use async_trait::async_trait;
5use ed25519_dalek::Signature;
6
7use crate::crypto::{identity::NodeCredentials, KeyExchange, SessionKeys, X25519KeyExchange};
8use crate::handshake::{
9 client::ClientHandshake, server::ServerHandshake, ChallengeAuthenticator, HandshakeContext,
10 HandshakeError, HandshakeOutcome, HandshakeParticipant, HandshakeTransport,
11};
12use crate::messages::{CapabilitySet, DeviceIdentity, SessionEstablished};
13use crate::profile::CompiledStreamProfile;
14
15pub mod state;
16use state::{SessionState, SessionStateError};
17
18impl From<SessionStateError> for HandshakeError {
19 fn from(err: SessionStateError) -> Self {
20 HandshakeError::Protocol(err.to_string())
21 }
22}
23
/// Which side of the handshake a session was created for.
///
/// Equality on this fieldless enum is total, so `Eq` and `Hash` are derived
/// alongside `PartialEq` to allow use as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AlnpRole {
    /// Role assigned by `AlnpSession::connect`, which runs the client handshake.
    Controller,
    /// Role assigned by `AlnpSession::accept`, which runs the server handshake.
    Node,
}
29
/// Jitter-handling policy stored on a session.
///
/// This file only stores and returns the value; the behaviour of each variant
/// is implemented elsewhere.
///
/// Equality on this fieldless enum is total, so `Eq` and `Hash` are derived
/// alongside `PartialEq` to allow use as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JitterStrategy {
    /// Default strategy of a freshly created session.
    /// NOTE(review): presumably repeats the last good frame — confirm.
    HoldLast,
    /// Value reported by `AlnpSession::jitter_strategy` when the lock is poisoned.
    /// NOTE(review): presumably discards late frames — confirm.
    Drop,
    /// NOTE(review): presumably linear interpolation between frames — confirm.
    Lerp,
}
36
37#[derive(Debug, Clone)]
38pub struct AlnpSession {
39 pub role: AlnpRole,
40 state: Arc<Mutex<SessionState>>,
41 last_keepalive: Arc<Mutex<Instant>>,
42 jitter: Arc<Mutex<JitterStrategy>>,
43 streaming_enabled: Arc<Mutex<bool>>,
44 timeout: Duration,
45 session_established: Arc<Mutex<Option<SessionEstablished>>>,
46 session_keys: Arc<Mutex<Option<SessionKeys>>>,
47 compiled_profile: Arc<Mutex<Option<CompiledStreamProfile>>>,
48 profile_locked: Arc<Mutex<bool>>,
49}
50
51impl AlnpSession {
52 pub fn new(role: AlnpRole) -> Self {
53 Self {
54 role,
55 state: Arc::new(Mutex::new(SessionState::Init)),
56 last_keepalive: Arc::new(Mutex::new(Instant::now())),
57 jitter: Arc::new(Mutex::new(JitterStrategy::HoldLast)),
58 streaming_enabled: Arc::new(Mutex::new(true)),
59 timeout: Duration::from_secs(10),
60 session_established: Arc::new(Mutex::new(None)),
61 session_keys: Arc::new(Mutex::new(None)),
62 compiled_profile: Arc::new(Mutex::new(None)),
63 profile_locked: Arc::new(Mutex::new(false)),
64 }
65 }
66
67 pub fn established(&self) -> Option<SessionEstablished> {
68 self.session_established.lock().ok().and_then(|s| s.clone())
69 }
70
71 pub fn keys(&self) -> Option<SessionKeys> {
72 self.session_keys.lock().ok().and_then(|k| k.clone())
73 }
74
75 pub fn state(&self) -> SessionState {
76 self.state
77 .lock()
78 .map(|g| g.clone())
79 .unwrap_or(SessionState::Failed("state poisoned".to_string()))
80 }
81
82 pub fn ensure_streaming_ready(&self) -> Result<SessionEstablished, HandshakeError> {
83 let state = self.state();
84 match state {
85 SessionState::Ready { .. } | SessionState::Streaming { .. } => {
86 self.established().ok_or_else(|| {
87 HandshakeError::Authentication(
88 "session missing even though state is ready".into(),
89 )
90 })
91 }
92 SessionState::Failed(reason) => Err(HandshakeError::Authentication(reason)),
93 _ => Err(HandshakeError::Authentication(
94 "session not ready; streaming blocked".into(),
95 )),
96 }
97 }
98
99 pub fn update_keepalive(&self) {
100 if let Ok(mut k) = self.last_keepalive.lock() {
101 *k = Instant::now();
102 }
103 }
104
105 pub fn check_timeouts(&self) -> Result<(), HandshakeError> {
106 let now = Instant::now();
107 if let Ok(state) = self.state.lock() {
108 if state.check_timeout(self.timeout, now) {
109 self.fail("session timeout".into());
110 return Err(HandshakeError::Transport("session timeout".into()));
111 }
112 }
113 Ok(())
114 }
115
116 pub fn set_stream_profile(&self, profile: CompiledStreamProfile) -> Result<(), HandshakeError> {
120 let locked = self
121 .profile_locked
122 .lock()
123 .map_err(|_| HandshakeError::Protocol("profile lock poisoned".into()))?;
124 if *locked {
125 return Err(HandshakeError::Protocol(
126 "stream profile cannot be changed after streaming starts".into(),
127 ));
128 }
129 let mut compiled = self
130 .compiled_profile
131 .lock()
132 .map_err(|_| HandshakeError::Protocol("compiled profile lock poisoned".into()))?;
133 *compiled = Some(profile);
134 Ok(())
135 }
136
137 #[must_use]
141 pub fn profile_config_id(&self) -> Option<String> {
142 self.compiled_profile
143 .lock()
144 .ok()
145 .and_then(|guard| guard.clone().map(|profile| profile.config_id().to_string()))
146 }
147
148 #[must_use]
152 pub fn compiled_profile(&self) -> Option<CompiledStreamProfile> {
153 self.compiled_profile
154 .lock()
155 .ok()
156 .and_then(|guard| guard.clone())
157 }
158
159 #[cfg(test)]
160 pub(crate) fn set_locked_profile_for_testing(&self, profile: CompiledStreamProfile) {
161 let mut compiled = self.compiled_profile.lock().unwrap();
162 *compiled = Some(profile);
163 *self.profile_locked.lock().unwrap() = true;
164 }
165
166 pub fn set_jitter_strategy(&self, strat: JitterStrategy) {
167 if let Ok(mut j) = self.jitter.lock() {
168 *j = strat;
169 }
170 }
171
172 pub fn jitter_strategy(&self) -> JitterStrategy {
173 self.jitter
174 .lock()
175 .map(|j| *j)
176 .unwrap_or(JitterStrategy::Drop)
177 }
178
179 pub fn close(&self) {
180 if let Ok(mut state) = self.state.lock() {
181 *state = SessionState::Closed;
182 }
183 }
184
185 pub fn fail(&self, reason: String) {
186 if let Ok(mut state) = self.state.lock() {
187 *state = SessionState::Failed(reason);
188 }
189 }
190
191 fn transition(&self, next: SessionState) -> Result<(), SessionStateError> {
192 let mut state = self.state.lock().unwrap();
193 let current = state.clone();
194 *state = current.transition(next)?;
195 Ok(())
196 }
197
198 pub fn set_streaming_enabled(&self, enabled: bool) {
199 if let Ok(mut flag) = self.streaming_enabled.lock() {
200 *flag = enabled;
201 }
202 }
203
204 pub fn mark_streaming(&self) {
205 if let Ok(mut state) = self.state.lock() {
206 let current = state.clone();
207 if let SessionState::Ready { .. } = current {
208 let _ = current
209 .transition(SessionState::Streaming {
210 since: Instant::now(),
211 })
212 .map(|next| *state = next);
213 }
214 }
215 if let Ok(mut locked) = self.profile_locked.lock() {
216 *locked = true;
217 }
218 }
219
220 pub fn streaming_enabled(&self) -> bool {
221 self.streaming_enabled.lock().map(|f| *f).unwrap_or(false)
222 }
223
224 fn apply_outcome(&self, outcome: HandshakeOutcome) {
225 if let Ok(mut guard) = self.session_established.lock() {
226 *guard = Some(outcome.established);
227 }
228 if let Ok(mut guard) = self.session_keys.lock() {
229 *guard = Some(outcome.keys);
230 }
231 }
232
233 pub async fn connect<T, A, K>(
234 identity: DeviceIdentity,
235 capabilities: CapabilitySet,
236 authenticator: A,
237 key_exchange: K,
238 context: HandshakeContext,
239 transport: &mut T,
240 ) -> Result<Self, HandshakeError>
241 where
242 T: HandshakeTransport + Send,
243 A: ChallengeAuthenticator + Send + Sync,
244 K: KeyExchange + Send + Sync,
245 {
246 let session = Self::new(AlnpRole::Controller);
247 session.transition(SessionState::Handshake)?;
248 let driver = ClientHandshake {
249 identity,
250 capabilities,
251 authenticator,
252 key_exchange,
253 context,
254 };
255
256 let outcome = driver.run(transport).await?;
257 session.transition(SessionState::Authenticated {
258 since: Instant::now(),
259 })?;
260 session.transition(SessionState::Ready {
261 since: Instant::now(),
262 })?;
263 session.apply_outcome(outcome);
264 Ok(session)
265 }
266
267 pub async fn accept<T, A, K>(
268 identity: DeviceIdentity,
269 capabilities: CapabilitySet,
270 authenticator: A,
271 key_exchange: K,
272 context: HandshakeContext,
273 transport: &mut T,
274 ) -> Result<Self, HandshakeError>
275 where
276 T: HandshakeTransport + Send,
277 A: ChallengeAuthenticator + Send + Sync,
278 K: KeyExchange + Send + Sync,
279 {
280 let session = Self::new(AlnpRole::Node);
281 session.transition(SessionState::Handshake)?;
282 let driver = ServerHandshake {
283 identity,
284 capabilities,
285 authenticator,
286 key_exchange,
287 context,
288 };
289
290 let outcome = driver.run(transport).await?;
291 session.transition(SessionState::Authenticated {
292 since: Instant::now(),
293 })?;
294 session.transition(SessionState::Ready {
295 since: Instant::now(),
296 })?;
297 session.apply_outcome(outcome);
298 Ok(session)
299 }
300}
301
/// Challenge authenticator built on a shared static secret.
///
/// `Debug` is intentionally not derived, so the secret cannot end up in
/// formatted output by accident.
pub struct StaticKeyAuthenticator {
    /// Shared secret bytes that prefix every response produced by
    /// `sign_challenge`.
    secret: Vec<u8>,
}
306
307impl StaticKeyAuthenticator {
308 pub fn new(secret: Vec<u8>) -> Self {
309 Self { secret }
310 }
311}
312
313impl Default for StaticKeyAuthenticator {
314 fn default() -> Self {
315 Self::new(b"default-alnp-secret".to_vec())
316 }
317}
318
319impl ChallengeAuthenticator for StaticKeyAuthenticator {
320 fn sign_challenge(&self, nonce: &[u8]) -> Vec<u8> {
321 let mut sig = Vec::with_capacity(self.secret.len() + nonce.len());
322 sig.extend_from_slice(&self.secret);
323 sig.extend_from_slice(nonce);
324 sig
325 }
326
327 fn verify_challenge(&self, nonce: &[u8], signature: &[u8]) -> bool {
328 signature.ends_with(nonce) && signature.starts_with(&self.secret)
329 }
330}
331
332pub struct Ed25519Authenticator {
334 creds: NodeCredentials,
335}
336
337impl Ed25519Authenticator {
338 pub fn new(creds: NodeCredentials) -> Self {
339 Self { creds }
340 }
341}
342
343impl ChallengeAuthenticator for Ed25519Authenticator {
344 fn sign_challenge(&self, nonce: &[u8]) -> Vec<u8> {
345 self.creds.sign(nonce).to_vec()
346 }
347
348 fn verify_challenge(&self, nonce: &[u8], signature: &[u8]) -> bool {
349 if let Ok(sig) = Signature::from_slice(signature) {
350 self.creds.verify(nonce, &sig)
351 } else {
352 false
353 }
354 }
355
356 fn identity_verifying_key(&self) -> Option<Vec<u8>> {
357 Some(self.creds.verifying.to_bytes().to_vec())
358 }
359}
360
361pub struct LoopbackTransport {
363 inbox: Vec<crate::handshake::HandshakeMessage>,
364}
365
366impl LoopbackTransport {
367 pub fn new() -> Self {
368 Self { inbox: Vec::new() }
369 }
370}
371
#[cfg(test)]
mod session_tests {
    use super::*;
    use crate::StreamProfile;

    /// Builds a fresh controller-role session for the profile tests.
    fn controller_session() -> AlnpSession {
        AlnpSession::new(AlnpRole::Controller)
    }

    /// Once streaming is marked, a second profile assignment must fail.
    #[test]
    fn profile_lock_prevents_profile_swaps() {
        let session = controller_session();
        let profile = StreamProfile::auto().compile().unwrap();
        session.set_stream_profile(profile.clone()).unwrap();

        session.mark_streaming();

        let second_attempt = session.set_stream_profile(profile);
        assert!(second_attempt.is_err());
    }

    /// The session reports the config id of the profile it was given.
    #[test]
    fn config_id_matches_profile() {
        let session = controller_session();
        let profile = StreamProfile::realtime().compile().unwrap();
        session.set_stream_profile(profile.clone()).unwrap();

        assert_eq!(session.profile_config_id().unwrap(), profile.config_id());
    }

    /// Marking streaming keeps the config id and rejects a replacement profile.
    #[test]
    fn config_id_stays_locked_after_streaming() {
        let session = controller_session();
        let profile = StreamProfile::install().compile().unwrap();
        session.set_stream_profile(profile.clone()).unwrap();
        let id_before = session.profile_config_id().unwrap();

        session.mark_streaming();

        assert_eq!(session.profile_config_id().unwrap(), id_before);
        let replacement = StreamProfile::default().compile().unwrap();
        assert!(session.set_stream_profile(replacement).is_err());
    }
}
407
408#[async_trait]
409impl HandshakeTransport for LoopbackTransport {
410 async fn send(
411 &mut self,
412 msg: crate::handshake::HandshakeMessage,
413 ) -> Result<(), HandshakeError> {
414 self.inbox.push(msg);
415 Ok(())
416 }
417
418 async fn recv(&mut self) -> Result<crate::handshake::HandshakeMessage, HandshakeError> {
419 if self.inbox.is_empty() {
420 return Err(HandshakeError::Transport("loopback queue empty".into()));
421 }
422 Ok(self.inbox.remove(0))
423 }
424}
425
426pub async fn example_controller_session<T: HandshakeTransport + Send>(
428 identity: DeviceIdentity,
429 transport: &mut T,
430) -> Result<AlnpSession, HandshakeError> {
431 AlnpSession::connect(
432 identity,
433 CapabilitySet::default(),
434 StaticKeyAuthenticator::default(),
435 X25519KeyExchange::new(),
436 HandshakeContext::default(),
437 transport,
438 )
439 .await
440}
441
442pub async fn example_node_session<T: HandshakeTransport + Send>(
444 identity: DeviceIdentity,
445 transport: &mut T,
446) -> Result<AlnpSession, HandshakeError> {
447 AlnpSession::accept(
448 identity,
449 CapabilitySet::default(),
450 StaticKeyAuthenticator::default(),
451 X25519KeyExchange::new(),
452 HandshakeContext::default(),
453 transport,
454 )
455 .await
456}