alpine/session/
mod.rs

1use std::sync::{Arc, Mutex};
2use std::time::{Duration, Instant};
3
4use async_trait::async_trait;
5use ed25519_dalek::Signature;
6
7use crate::crypto::{identity::NodeCredentials, KeyExchange, SessionKeys, X25519KeyExchange};
8use crate::handshake::{
9    client::ClientHandshake, server::ServerHandshake, ChallengeAuthenticator, HandshakeContext,
10    HandshakeError, HandshakeOutcome, HandshakeParticipant, HandshakeTransport,
11};
12use crate::messages::{CapabilitySet, DeviceIdentity, SessionEstablished};
13use crate::profile::{CompiledStreamProfile, StreamProfile};
14
15pub mod state;
16use state::{SessionState, SessionStateError};
17
18impl From<SessionStateError> for HandshakeError {
19    fn from(err: SessionStateError) -> Self {
20        HandshakeError::Protocol(err.to_string())
21    }
22}
23
/// Which side of the handshake a session was created for.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AlnpRole {
    /// Role assigned by `AlnpSession::connect`, which drives the client handshake.
    Controller,
    /// Role assigned by `AlnpSession::accept`, which drives the server handshake.
    Node,
}
29
/// Jitter-handling policy stored on an `AlnpSession`.
///
/// This file only stores and returns the selected variant; whatever behaviour
/// each variant triggers is implemented elsewhere.
// NOTE(review): the per-variant meanings below are inferred from the names only —
// confirm against the code that consumes `jitter_strategy()`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JitterStrategy {
    /// Presumably "keep using the last value". This is the value new sessions start with.
    HoldLast,
    /// Presumably "discard". Also what `jitter_strategy()` reports when its lock is poisoned.
    Drop,
    /// Presumably "linearly interpolate" — TODO confirm.
    Lerp,
}
36
/// Handle to the state of a single session.
///
/// Every mutable field sits behind its own `Arc<Mutex<_>>`, so the derived
/// `Clone` produces another handle to the *same* underlying values rather than
/// an independent copy; only `role` and `timeout` are copied by value.
#[derive(Debug, Clone)]
pub struct AlnpSession {
    /// Side of the handshake this session was created for.
    pub role: AlnpRole,
    /// Current position in the session state machine; starts at `SessionState::Init`.
    state: Arc<Mutex<SessionState>>,
    /// Instant last written by `update_keepalive`.
    // NOTE(review): nothing in this file reads this field — `check_timeouts`
    // consults `state` instead. Confirm whether that is intended.
    last_keepalive: Arc<Mutex<Instant>>,
    /// Selected jitter policy; starts as `JitterStrategy::HoldLast`.
    jitter: Arc<Mutex<JitterStrategy>>,
    /// Streaming on/off flag; starts as `true`.
    streaming_enabled: Arc<Mutex<bool>>,
    /// Duration handed to `SessionState::check_timeout`; fixed at 10 seconds by `new`.
    timeout: Duration,
    /// `established` part of the handshake outcome, once `apply_outcome` has stored it.
    session_established: Arc<Mutex<Option<SessionEstablished>>>,
    /// `keys` part of the handshake outcome, once `apply_outcome` has stored it.
    // NOTE(review): the derived `Debug` formats this field too — verify that
    // `SessionKeys`' `Debug` impl does not print key material.
    session_keys: Arc<Mutex<Option<SessionKeys>>>,
    /// Stream profile stored by `set_stream_profile`, if any.
    compiled_profile: Arc<Mutex<Option<CompiledStreamProfile>>>,
    /// Set to `true` by `mark_streaming`; once set, `set_stream_profile` refuses changes.
    profile_locked: Arc<Mutex<bool>>,
}
50
51impl AlnpSession {
52    pub fn new(role: AlnpRole) -> Self {
53        Self {
54            role,
55            state: Arc::new(Mutex::new(SessionState::Init)),
56            last_keepalive: Arc::new(Mutex::new(Instant::now())),
57            jitter: Arc::new(Mutex::new(JitterStrategy::HoldLast)),
58            streaming_enabled: Arc::new(Mutex::new(true)),
59            timeout: Duration::from_secs(10),
60            session_established: Arc::new(Mutex::new(None)),
61            session_keys: Arc::new(Mutex::new(None)),
62            compiled_profile: Arc::new(Mutex::new(None)),
63            profile_locked: Arc::new(Mutex::new(false)),
64        }
65    }
66
67    pub fn established(&self) -> Option<SessionEstablished> {
68        self.session_established.lock().ok().and_then(|s| s.clone())
69    }
70
71    pub fn keys(&self) -> Option<SessionKeys> {
72        self.session_keys.lock().ok().and_then(|k| k.clone())
73    }
74
75    pub fn state(&self) -> SessionState {
76        self.state
77            .lock()
78            .map(|g| g.clone())
79            .unwrap_or(SessionState::Failed("state poisoned".to_string()))
80    }
81
82    pub fn ensure_streaming_ready(&self) -> Result<SessionEstablished, HandshakeError> {
83        let state = self.state();
84        match state {
85            SessionState::Ready { .. } | SessionState::Streaming { .. } => {
86                self.established().ok_or_else(|| {
87                    HandshakeError::Authentication(
88                        "session missing even though state is ready".into(),
89                    )
90                })
91            }
92            SessionState::Failed(reason) => Err(HandshakeError::Authentication(reason)),
93            _ => Err(HandshakeError::Authentication(
94                "session not ready; streaming blocked".into(),
95            )),
96        }
97    }
98
99    pub fn update_keepalive(&self) {
100        if let Ok(mut k) = self.last_keepalive.lock() {
101            *k = Instant::now();
102        }
103    }
104
105    pub fn check_timeouts(&self) -> Result<(), HandshakeError> {
106        let now = Instant::now();
107        if let Ok(state) = self.state.lock() {
108            if state.check_timeout(self.timeout, now) {
109                self.fail("session timeout".into());
110                return Err(HandshakeError::Transport("session timeout".into()));
111            }
112        }
113        Ok(())
114    }
115
116    /// Sets the stream profile that determines runtime behavior.
117    ///
118    /// This method locks the profile until streaming begins to enforce immutability.
119    pub fn set_stream_profile(&self, profile: CompiledStreamProfile) -> Result<(), HandshakeError> {
120        let locked = self
121            .profile_locked
122            .lock()
123            .map_err(|_| HandshakeError::Protocol("profile lock poisoned".into()))?;
124        if *locked {
125            return Err(HandshakeError::Protocol(
126                "stream profile cannot be changed after streaming starts".into(),
127            ));
128        }
129        let mut compiled = self
130            .compiled_profile
131            .lock()
132            .map_err(|_| HandshakeError::Protocol("compiled profile lock poisoned".into()))?;
133        *compiled = Some(profile);
134        Ok(())
135    }
136
137    /// Returns the bound profile's config ID, if set.
138    ///
139    /// The `config_id` is computed from the normalized profile and never changes.
140    #[must_use]
141    pub fn profile_config_id(&self) -> Option<String> {
142        self.compiled_profile
143            .lock()
144            .ok()
145            .and_then(|guard| guard.clone().map(|profile| profile.config_id().to_string()))
146    }
147
148    /// Retrieves the compiled profile, if configured.
149    ///
150    /// Once streaming starts this returns the same object that controls runtime behavior.
151    #[must_use]
152    pub fn compiled_profile(&self) -> Option<CompiledStreamProfile> {
153        self.compiled_profile
154            .lock()
155            .ok()
156            .and_then(|guard| guard.clone())
157    }
158
159    #[cfg(test)]
160    pub(crate) fn set_locked_profile_for_testing(&self, profile: CompiledStreamProfile) {
161        let mut compiled = self.compiled_profile.lock().unwrap();
162        *compiled = Some(profile);
163        *self.profile_locked.lock().unwrap() = true;
164    }
165
166    pub fn set_jitter_strategy(&self, strat: JitterStrategy) {
167        if let Ok(mut j) = self.jitter.lock() {
168            *j = strat;
169        }
170    }
171
172    pub fn jitter_strategy(&self) -> JitterStrategy {
173        self.jitter
174            .lock()
175            .map(|j| *j)
176            .unwrap_or(JitterStrategy::Drop)
177    }
178
179    pub fn close(&self) {
180        if let Ok(mut state) = self.state.lock() {
181            *state = SessionState::Closed;
182        }
183    }
184
185    pub fn fail(&self, reason: String) {
186        if let Ok(mut state) = self.state.lock() {
187            *state = SessionState::Failed(reason);
188        }
189    }
190
191    fn transition(&self, next: SessionState) -> Result<(), SessionStateError> {
192        let mut state = self.state.lock().unwrap();
193        let current = state.clone();
194        *state = current.transition(next)?;
195        Ok(())
196    }
197
198    pub fn set_streaming_enabled(&self, enabled: bool) {
199        if let Ok(mut flag) = self.streaming_enabled.lock() {
200            *flag = enabled;
201        }
202    }
203
204    pub fn mark_streaming(&self) {
205        if let Ok(mut state) = self.state.lock() {
206            let current = state.clone();
207            if let SessionState::Ready { .. } = current {
208                let _ = current
209                    .transition(SessionState::Streaming {
210                        since: Instant::now(),
211                    })
212                    .map(|next| *state = next);
213            }
214        }
215        if let Ok(mut locked) = self.profile_locked.lock() {
216            *locked = true;
217        }
218    }
219
220    pub fn streaming_enabled(&self) -> bool {
221        self.streaming_enabled.lock().map(|f| *f).unwrap_or(false)
222    }
223
224    fn apply_outcome(&self, outcome: HandshakeOutcome) {
225        if let Ok(mut guard) = self.session_established.lock() {
226            *guard = Some(outcome.established);
227        }
228        if let Ok(mut guard) = self.session_keys.lock() {
229            *guard = Some(outcome.keys);
230        }
231    }
232
233    pub async fn connect<T, A, K>(
234        identity: DeviceIdentity,
235        capabilities: CapabilitySet,
236        authenticator: A,
237        key_exchange: K,
238        context: HandshakeContext,
239        transport: &mut T,
240    ) -> Result<Self, HandshakeError>
241    where
242        T: HandshakeTransport + Send,
243        A: ChallengeAuthenticator + Send + Sync,
244        K: KeyExchange + Send + Sync,
245    {
246        let session = Self::new(AlnpRole::Controller);
247        session.transition(SessionState::Handshake)?;
248        let driver = ClientHandshake {
249            identity,
250            capabilities,
251            authenticator,
252            key_exchange,
253            context,
254        };
255
256        let outcome = driver.run(transport).await?;
257        session.transition(SessionState::Authenticated {
258            since: Instant::now(),
259        })?;
260        session.transition(SessionState::Ready {
261            since: Instant::now(),
262        })?;
263        session.apply_outcome(outcome);
264        Ok(session)
265    }
266
267    pub async fn accept<T, A, K>(
268        identity: DeviceIdentity,
269        capabilities: CapabilitySet,
270        authenticator: A,
271        key_exchange: K,
272        context: HandshakeContext,
273        transport: &mut T,
274    ) -> Result<Self, HandshakeError>
275    where
276        T: HandshakeTransport + Send,
277        A: ChallengeAuthenticator + Send + Sync,
278        K: KeyExchange + Send + Sync,
279    {
280        let session = Self::new(AlnpRole::Node);
281        session.transition(SessionState::Handshake)?;
282        let driver = ServerHandshake {
283            identity,
284            capabilities,
285            authenticator,
286            key_exchange,
287            context,
288        };
289
290        let outcome = driver.run(transport).await?;
291        session.transition(SessionState::Authenticated {
292            since: Instant::now(),
293        })?;
294        session.transition(SessionState::Ready {
295            since: Instant::now(),
296        })?;
297        session.apply_outcome(outcome);
298        Ok(session)
299    }
300}
301
/// Shared-secret authenticator placeholder for signing and verification.
///
/// Its "signature" is simply the secret bytes followed by the nonce, so the
/// secret itself is carried inside every signature. This is not a
/// cryptographic signature scheme.
pub struct StaticKeyAuthenticator {
    /// Secret bytes both sides are expected to share.
    secret: Vec<u8>,
}
306
307impl StaticKeyAuthenticator {
308    pub fn new(secret: Vec<u8>) -> Self {
309        Self { secret }
310    }
311}
312
313impl Default for StaticKeyAuthenticator {
314    fn default() -> Self {
315        Self::new(b"default-alnp-secret".to_vec())
316    }
317}
318
319impl ChallengeAuthenticator for StaticKeyAuthenticator {
320    fn sign_challenge(&self, nonce: &[u8]) -> Vec<u8> {
321        let mut sig = Vec::with_capacity(self.secret.len() + nonce.len());
322        sig.extend_from_slice(&self.secret);
323        sig.extend_from_slice(nonce);
324        sig
325    }
326
327    fn verify_challenge(&self, nonce: &[u8], signature: &[u8]) -> bool {
328        signature.ends_with(nonce) && signature.starts_with(&self.secret)
329    }
330}
331
/// Ed25519-based authenticator using loaded credentials.
///
/// Signing and verification are both delegated to the wrapped `NodeCredentials`.
pub struct Ed25519Authenticator {
    /// Credentials whose `sign` / `verify` methods do the actual work.
    creds: NodeCredentials,
}
336
337impl Ed25519Authenticator {
338    pub fn new(creds: NodeCredentials) -> Self {
339        Self { creds }
340    }
341}
342
343impl ChallengeAuthenticator for Ed25519Authenticator {
344    fn sign_challenge(&self, nonce: &[u8]) -> Vec<u8> {
345        self.creds.sign(nonce).to_vec()
346    }
347
348    fn verify_challenge(&self, nonce: &[u8], signature: &[u8]) -> bool {
349        if let Ok(sig) = Signature::from_slice(signature) {
350            self.creds.verify(nonce, &sig)
351        } else {
352            false
353        }
354    }
355}
356
/// Simplified in-memory transport useful for unit tests and examples.
///
/// Messages passed to `send` are queued on this same value and handed back by
/// `recv` in the order they were sent.
pub struct LoopbackTransport {
    /// Pending messages, oldest first.
    inbox: Vec<crate::handshake::HandshakeMessage>,
}
361
362impl LoopbackTransport {
363    pub fn new() -> Self {
364        Self { inbox: Vec::new() }
365    }
366}
367
#[cfg(test)]
mod session_tests {
    use super::*;

    /// After `mark_streaming`, assigning a profile again must be rejected.
    #[test]
    fn profile_lock_prevents_profile_swaps() {
        let session = AlnpSession::new(AlnpRole::Controller);
        let profile = StreamProfile::auto().compile().unwrap();

        session.set_stream_profile(profile.clone()).unwrap();
        session.mark_streaming();

        let second_attempt = session.set_stream_profile(profile);
        assert!(second_attempt.is_err());
    }

    /// The session reports the config ID of the profile it was given.
    #[test]
    fn config_id_matches_profile() {
        let session = AlnpSession::new(AlnpRole::Controller);
        let profile = StreamProfile::realtime().compile().unwrap();

        session.set_stream_profile(profile.clone()).unwrap();

        let reported = session.profile_config_id().unwrap();
        assert_eq!(reported, profile.config_id());
    }

    /// Streaming neither changes the reported config ID nor allows a new profile.
    #[test]
    fn config_id_stays_locked_after_streaming() {
        let session = AlnpSession::new(AlnpRole::Controller);
        let profile = StreamProfile::install().compile().unwrap();
        session.set_stream_profile(profile.clone()).unwrap();

        let id_before = session.profile_config_id().unwrap();
        session.mark_streaming();
        let id_after = session.profile_config_id().unwrap();
        assert_eq!(id_after, id_before);

        let replacement = StreamProfile::default().compile().unwrap();
        assert!(session.set_stream_profile(replacement).is_err());
    }
}
402
403#[async_trait]
404impl HandshakeTransport for LoopbackTransport {
405    async fn send(
406        &mut self,
407        msg: crate::handshake::HandshakeMessage,
408    ) -> Result<(), HandshakeError> {
409        self.inbox.push(msg);
410        Ok(())
411    }
412
413    async fn recv(&mut self) -> Result<crate::handshake::HandshakeMessage, HandshakeError> {
414        if self.inbox.is_empty() {
415            return Err(HandshakeError::Transport("loopback queue empty".into()));
416        }
417        Ok(self.inbox.remove(0))
418    }
419}
420
421/// Helper builder to quickly create a controller-side session with defaults.
422pub async fn example_controller_session<T: HandshakeTransport + Send>(
423    identity: DeviceIdentity,
424    transport: &mut T,
425) -> Result<AlnpSession, HandshakeError> {
426    AlnpSession::connect(
427        identity,
428        CapabilitySet::default(),
429        StaticKeyAuthenticator::default(),
430        X25519KeyExchange::new(),
431        HandshakeContext::default(),
432        transport,
433    )
434    .await
435}
436
437/// Helper builder to quickly create a node-side session with defaults.
438pub async fn example_node_session<T: HandshakeTransport + Send>(
439    identity: DeviceIdentity,
440    transport: &mut T,
441) -> Result<AlnpSession, HandshakeError> {
442    AlnpSession::accept(
443        identity,
444        CapabilitySet::default(),
445        StaticKeyAuthenticator::default(),
446        X25519KeyExchange::new(),
447        HandshakeContext::default(),
448        transport,
449    )
450    .await
451}