alpine/session/
mod.rs

1use std::sync::{Arc, Mutex};
2use std::time::{Duration, Instant};
3
4use async_trait::async_trait;
5use ed25519_dalek::Signature;
6
7use crate::crypto::{identity::NodeCredentials, KeyExchange, SessionKeys, X25519KeyExchange};
8use crate::handshake::{
9    client::ClientHandshake, server::ServerHandshake, ChallengeAuthenticator, HandshakeContext,
10    HandshakeError, HandshakeOutcome, HandshakeParticipant, HandshakeTransport,
11};
12use crate::messages::{CapabilitySet, DeviceIdentity, SessionEstablished};
13use crate::profile::CompiledStreamProfile;
14
15pub mod state;
16use state::{SessionState, SessionStateError};
17
18impl From<SessionStateError> for HandshakeError {
19    fn from(err: SessionStateError) -> Self {
20        HandshakeError::Protocol(err.to_string())
21    }
22}
23
24#[derive(Debug, Clone, Copy, PartialEq)]
25pub enum AlnpRole {
26    Controller,
27    Node,
28}
29
30#[derive(Debug, Clone, Copy, PartialEq)]
31pub enum JitterStrategy {
32    HoldLast,
33    Drop,
34    Lerp,
35}
36
37#[derive(Debug, Clone)]
38pub struct AlnpSession {
39    pub role: AlnpRole,
40    state: Arc<Mutex<SessionState>>,
41    last_keepalive: Arc<Mutex<Instant>>,
42    jitter: Arc<Mutex<JitterStrategy>>,
43    streaming_enabled: Arc<Mutex<bool>>,
44    timeout: Duration,
45    session_established: Arc<Mutex<Option<SessionEstablished>>>,
46    session_keys: Arc<Mutex<Option<SessionKeys>>>,
47    compiled_profile: Arc<Mutex<Option<CompiledStreamProfile>>>,
48    profile_locked: Arc<Mutex<bool>>,
49}
50
51impl AlnpSession {
52    pub fn new(role: AlnpRole) -> Self {
53        Self {
54            role,
55            state: Arc::new(Mutex::new(SessionState::Init)),
56            last_keepalive: Arc::new(Mutex::new(Instant::now())),
57            jitter: Arc::new(Mutex::new(JitterStrategy::HoldLast)),
58            streaming_enabled: Arc::new(Mutex::new(true)),
59            timeout: Duration::from_secs(10),
60            session_established: Arc::new(Mutex::new(None)),
61            session_keys: Arc::new(Mutex::new(None)),
62            compiled_profile: Arc::new(Mutex::new(None)),
63            profile_locked: Arc::new(Mutex::new(false)),
64        }
65    }
66
67    pub fn established(&self) -> Option<SessionEstablished> {
68        self.session_established.lock().ok().and_then(|s| s.clone())
69    }
70
71    pub fn keys(&self) -> Option<SessionKeys> {
72        self.session_keys.lock().ok().and_then(|k| k.clone())
73    }
74
75    pub fn state(&self) -> SessionState {
76        self.state
77            .lock()
78            .map(|g| g.clone())
79            .unwrap_or(SessionState::Failed("state poisoned".to_string()))
80    }
81
82    pub fn ensure_streaming_ready(&self) -> Result<SessionEstablished, HandshakeError> {
83        let state = self.state();
84        match state {
85            SessionState::Ready { .. } | SessionState::Streaming { .. } => {
86                self.established().ok_or_else(|| {
87                    HandshakeError::Authentication(
88                        "session missing even though state is ready".into(),
89                    )
90                })
91            }
92            SessionState::Failed(reason) => Err(HandshakeError::Authentication(reason)),
93            _ => Err(HandshakeError::Authentication(
94                "session not ready; streaming blocked".into(),
95            )),
96        }
97    }
98
99    pub fn update_keepalive(&self) {
100        if let Ok(mut k) = self.last_keepalive.lock() {
101            *k = Instant::now();
102        }
103    }
104
105    pub fn check_timeouts(&self) -> Result<(), HandshakeError> {
106        let now = Instant::now();
107        if let Ok(state) = self.state.lock() {
108            if state.check_timeout(self.timeout, now) {
109                self.fail("session timeout".into());
110                return Err(HandshakeError::Transport("session timeout".into()));
111            }
112        }
113        Ok(())
114    }
115
116    /// Sets the stream profile that determines runtime behavior.
117    ///
118    /// This method locks the profile until streaming begins to enforce immutability.
119    pub fn set_stream_profile(&self, profile: CompiledStreamProfile) -> Result<(), HandshakeError> {
120        let locked = self
121            .profile_locked
122            .lock()
123            .map_err(|_| HandshakeError::Protocol("profile lock poisoned".into()))?;
124        if *locked {
125            return Err(HandshakeError::Protocol(
126                "stream profile cannot be changed after streaming starts".into(),
127            ));
128        }
129        let mut compiled = self
130            .compiled_profile
131            .lock()
132            .map_err(|_| HandshakeError::Protocol("compiled profile lock poisoned".into()))?;
133        *compiled = Some(profile);
134        Ok(())
135    }
136
137    /// Returns the bound profile's config ID, if set.
138    ///
139    /// The `config_id` is computed from the normalized profile and never changes.
140    #[must_use]
141    pub fn profile_config_id(&self) -> Option<String> {
142        self.compiled_profile
143            .lock()
144            .ok()
145            .and_then(|guard| guard.clone().map(|profile| profile.config_id().to_string()))
146    }
147
148    /// Retrieves the compiled profile, if configured.
149    ///
150    /// Once streaming starts this returns the same object that controls runtime behavior.
151    #[must_use]
152    pub fn compiled_profile(&self) -> Option<CompiledStreamProfile> {
153        self.compiled_profile
154            .lock()
155            .ok()
156            .and_then(|guard| guard.clone())
157    }
158
159    #[cfg(test)]
160    pub(crate) fn set_locked_profile_for_testing(&self, profile: CompiledStreamProfile) {
161        let mut compiled = self.compiled_profile.lock().unwrap();
162        *compiled = Some(profile);
163        *self.profile_locked.lock().unwrap() = true;
164    }
165
166    pub fn set_jitter_strategy(&self, strat: JitterStrategy) {
167        if let Ok(mut j) = self.jitter.lock() {
168            *j = strat;
169        }
170    }
171
172    pub fn jitter_strategy(&self) -> JitterStrategy {
173        self.jitter
174            .lock()
175            .map(|j| *j)
176            .unwrap_or(JitterStrategy::Drop)
177    }
178
179    pub fn close(&self) {
180        if let Ok(mut state) = self.state.lock() {
181            *state = SessionState::Closed;
182        }
183    }
184
185    pub fn fail(&self, reason: String) {
186        if let Ok(mut state) = self.state.lock() {
187            *state = SessionState::Failed(reason);
188        }
189    }
190
191    fn transition(&self, next: SessionState) -> Result<(), SessionStateError> {
192        let mut state = self.state.lock().unwrap();
193        let current = state.clone();
194        *state = current.transition(next)?;
195        Ok(())
196    }
197
198    pub fn set_streaming_enabled(&self, enabled: bool) {
199        if let Ok(mut flag) = self.streaming_enabled.lock() {
200            *flag = enabled;
201        }
202    }
203
204    pub fn mark_streaming(&self) {
205        if let Ok(mut state) = self.state.lock() {
206            let current = state.clone();
207            if let SessionState::Ready { .. } = current {
208                let _ = current
209                    .transition(SessionState::Streaming {
210                        since: Instant::now(),
211                    })
212                    .map(|next| *state = next);
213            }
214        }
215        if let Ok(mut locked) = self.profile_locked.lock() {
216            *locked = true;
217        }
218    }
219
220    pub fn streaming_enabled(&self) -> bool {
221        self.streaming_enabled.lock().map(|f| *f).unwrap_or(false)
222    }
223
224    fn apply_outcome(&self, outcome: HandshakeOutcome) {
225        if let Ok(mut guard) = self.session_established.lock() {
226            *guard = Some(outcome.established);
227        }
228        if let Ok(mut guard) = self.session_keys.lock() {
229            *guard = Some(outcome.keys);
230        }
231    }
232
233    pub async fn connect<T, A, K>(
234        identity: DeviceIdentity,
235        capabilities: CapabilitySet,
236        authenticator: A,
237        key_exchange: K,
238        context: HandshakeContext,
239        transport: &mut T,
240    ) -> Result<Self, HandshakeError>
241    where
242        T: HandshakeTransport + Send,
243        A: ChallengeAuthenticator + Send + Sync,
244        K: KeyExchange + Send + Sync,
245    {
246        let session = Self::new(AlnpRole::Controller);
247        session.transition(SessionState::Handshake)?;
248        let driver = ClientHandshake {
249            identity,
250            capabilities,
251            authenticator,
252            key_exchange,
253            context,
254        };
255
256        let outcome = driver.run(transport).await?;
257        session.transition(SessionState::Authenticated {
258            since: Instant::now(),
259        })?;
260        session.transition(SessionState::Ready {
261            since: Instant::now(),
262        })?;
263        session.apply_outcome(outcome);
264        Ok(session)
265    }
266
267    pub async fn accept<T, A, K>(
268        identity: DeviceIdentity,
269        capabilities: CapabilitySet,
270        authenticator: A,
271        key_exchange: K,
272        context: HandshakeContext,
273        transport: &mut T,
274    ) -> Result<Self, HandshakeError>
275    where
276        T: HandshakeTransport + Send,
277        A: ChallengeAuthenticator + Send + Sync,
278        K: KeyExchange + Send + Sync,
279    {
280        let session = Self::new(AlnpRole::Node);
281        session.transition(SessionState::Handshake)?;
282        let driver = ServerHandshake {
283            identity,
284            capabilities,
285            authenticator,
286            key_exchange,
287            context,
288        };
289
290        let outcome = driver.run(transport).await?;
291        session.transition(SessionState::Authenticated {
292            since: Instant::now(),
293        })?;
294        session.transition(SessionState::Ready {
295            since: Instant::now(),
296        })?;
297        session.apply_outcome(outcome);
298        Ok(session)
299    }
300}
301
302/// Shared-secret authenticator placeholder for signing and verification.
303pub struct StaticKeyAuthenticator {
304    secret: Vec<u8>,
305}
306
307impl StaticKeyAuthenticator {
308    pub fn new(secret: Vec<u8>) -> Self {
309        Self { secret }
310    }
311}
312
313impl Default for StaticKeyAuthenticator {
314    fn default() -> Self {
315        Self::new(b"default-alnp-secret".to_vec())
316    }
317}
318
319impl ChallengeAuthenticator for StaticKeyAuthenticator {
320    fn sign_challenge(&self, nonce: &[u8]) -> Vec<u8> {
321        let mut sig = Vec::with_capacity(self.secret.len() + nonce.len());
322        sig.extend_from_slice(&self.secret);
323        sig.extend_from_slice(nonce);
324        sig
325    }
326
327    fn verify_challenge(&self, nonce: &[u8], signature: &[u8]) -> bool {
328        signature.ends_with(nonce) && signature.starts_with(&self.secret)
329    }
330}
331
332/// Ed25519-based authenticator using loaded credentials.
333pub struct Ed25519Authenticator {
334    creds: NodeCredentials,
335}
336
337impl Ed25519Authenticator {
338    pub fn new(creds: NodeCredentials) -> Self {
339        Self { creds }
340    }
341}
342
343impl ChallengeAuthenticator for Ed25519Authenticator {
344    fn sign_challenge(&self, nonce: &[u8]) -> Vec<u8> {
345        self.creds.sign(nonce).to_vec()
346    }
347
348    fn verify_challenge(&self, nonce: &[u8], signature: &[u8]) -> bool {
349        if let Ok(sig) = Signature::from_slice(signature) {
350            self.creds.verify(nonce, &sig)
351        } else {
352            false
353        }
354    }
355
356    fn identity_verifying_key(&self) -> Option<Vec<u8>> {
357        Some(self.creds.verifying.to_bytes().to_vec())
358    }
359}
360
361/// Simplified in-memory transport useful for unit tests and examples.
362pub struct LoopbackTransport {
363    inbox: Vec<crate::handshake::HandshakeMessage>,
364}
365
366impl LoopbackTransport {
367    pub fn new() -> Self {
368        Self { inbox: Vec::new() }
369    }
370}
371
372#[cfg(test)]
373mod session_tests {
374    use super::*;
375    use crate::StreamProfile;
376
377    #[test]
378    fn profile_lock_prevents_profile_swaps() {
379        let session = AlnpSession::new(AlnpRole::Controller);
380        let compiled = StreamProfile::auto().compile().unwrap();
381        session.set_stream_profile(compiled.clone()).unwrap();
382        session.mark_streaming();
383        assert!(session.set_stream_profile(compiled).is_err());
384    }
385
386    #[test]
387    fn config_id_matches_profile() {
388        let session = AlnpSession::new(AlnpRole::Controller);
389        let compiled = StreamProfile::realtime().compile().unwrap();
390        session.set_stream_profile(compiled.clone()).unwrap();
391        assert_eq!(session.profile_config_id().unwrap(), compiled.config_id());
392    }
393
394    #[test]
395    fn config_id_stays_locked_after_streaming() {
396        let session = AlnpSession::new(AlnpRole::Controller);
397        let compiled = StreamProfile::install().compile().unwrap();
398        session.set_stream_profile(compiled.clone()).unwrap();
399        let before_config = session.profile_config_id().unwrap();
400        session.mark_streaming();
401        assert_eq!(session.profile_config_id().unwrap(), before_config);
402        assert!(session
403            .set_stream_profile(StreamProfile::default().compile().unwrap())
404            .is_err());
405    }
406}
407
408#[async_trait]
409impl HandshakeTransport for LoopbackTransport {
410    async fn send(
411        &mut self,
412        msg: crate::handshake::HandshakeMessage,
413    ) -> Result<(), HandshakeError> {
414        self.inbox.push(msg);
415        Ok(())
416    }
417
418    async fn recv(&mut self) -> Result<crate::handshake::HandshakeMessage, HandshakeError> {
419        if self.inbox.is_empty() {
420            return Err(HandshakeError::Transport("loopback queue empty".into()));
421        }
422        Ok(self.inbox.remove(0))
423    }
424}
425
426/// Helper builder to quickly create a controller-side session with defaults.
427pub async fn example_controller_session<T: HandshakeTransport + Send>(
428    identity: DeviceIdentity,
429    transport: &mut T,
430) -> Result<AlnpSession, HandshakeError> {
431    AlnpSession::connect(
432        identity,
433        CapabilitySet::default(),
434        StaticKeyAuthenticator::default(),
435        X25519KeyExchange::new(),
436        HandshakeContext::default(),
437        transport,
438    )
439    .await
440}
441
442/// Helper builder to quickly create a node-side session with defaults.
443pub async fn example_node_session<T: HandshakeTransport + Send>(
444    identity: DeviceIdentity,
445    transport: &mut T,
446) -> Result<AlnpSession, HandshakeError> {
447    AlnpSession::accept(
448        identity,
449        CapabilitySet::default(),
450        StaticKeyAuthenticator::default(),
451        X25519KeyExchange::new(),
452        HandshakeContext::default(),
453        transport,
454    )
455    .await
456}