alpine/
stream.rs

1use std::collections::HashMap;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use thiserror::Error;
5
6use crate::messages::{ChannelFormat, FrameEnvelope, MessageType};
7use crate::profile::CompiledStreamProfile;
8use crate::session::{AlnpSession, JitterStrategy};
9
/// Minimal transport for sending serialized ALPINE frames (UDP/QUIC left to the caller).
///
/// Implementors must be `Send + Sync`, and `send_frame` takes `&self`, so any
/// mutable state an implementation needs has to use interior mutability.
pub trait FrameTransport: Send + Sync {
    /// Sends the provided serialized frame.
    ///
    /// `bytes` is one complete, already-encoded frame.
    ///
    /// # Errors
    /// Returns a human-readable description of the failure; `AlnpStream::send`
    /// forwards that string inside `StreamError::Transport`.
    fn send_frame(&self, bytes: &[u8]) -> Result<(), String>;
}
15
/// Stream state machine used by higher-level clients.
///
/// Combines a session, a frame transport and a compiled profile, and remembers
/// the most recently sent frame so jitter handling can refer back to it.
#[derive(Debug)]
pub struct AlnpStream<T: FrameTransport> {
    /// Session consulted before every send for readiness and for whether
    /// streaming is enabled; it also supplies the session id stamped on frames.
    session: AlnpSession,
    /// Destination for the CBOR-encoded frames.
    transport: T,
    /// Envelope of the last frame that was handed to the transport successfully
    /// (`None` until the first successful send). Behind a mutex because sending
    /// only takes `&self`.
    last_frame: parking_lot::Mutex<Option<FrameEnvelope>>,
    /// Compiled profile whose latency and resilience weights pick the jitter strategy.
    profile: CompiledStreamProfile,
}
24
/// Errors emitted from the streaming helper.
#[derive(Debug, Error)]
pub enum StreamError {
    /// The session's streaming-readiness check failed; the underlying cause is
    /// not carried along.
    #[error("sender not authenticated")]
    NotAuthenticated,
    /// The frame could not be CBOR-encoded (message prefixed with `encode: `)
    /// or the transport reported a failure; the string is the description.
    #[error("transport error: {0}")]
    Transport(String),
    /// The session is ready but reports that streaming is not enabled.
    #[error("streaming disabled")]
    StreamingDisabled,
    /// No session is available.
    // NOTE(review): not constructed anywhere in this module — presumably
    // produced by callers elsewhere; confirm before relying on it.
    #[error("no session available")]
    MissingSession,
}
37
38impl<T: FrameTransport> AlnpStream<T> {
39    /// Builds a new streaming helper bound to a compiled profile.
40    pub fn new(session: AlnpSession, transport: T, profile: CompiledStreamProfile) -> Self {
41        Self {
42            session,
43            transport,
44            last_frame: parking_lot::Mutex::new(None),
45            profile,
46        }
47    }
48
49    /// Sends a streaming frame built from raw channel data.
50    ///
51    /// # Guarantees
52    /// * Only sends when the session is already authenticated and streaming-enabled.
53    /// * Applies jitter strategy derived from the compiled profile; no branching on
54    ///   user-facing preferences happens at this layer.
55    pub fn send(
56        &self,
57        channel_format: ChannelFormat,
58        channels: Vec<u16>,
59        priority: u8,
60        groups: Option<HashMap<String, Vec<u16>>>,
61        metadata: Option<HashMap<String, serde_json::Value>>,
62    ) -> Result<(), StreamError> {
63        let established = self
64            .session
65            .ensure_streaming_ready()
66            .map_err(|_| StreamError::NotAuthenticated)?;
67        if !self.session.streaming_enabled() {
68            return Err(StreamError::StreamingDisabled);
69        }
70
71        let adjusted_channels = self.apply_jitter(&channels);
72
73        let envelope = FrameEnvelope {
74            message_type: MessageType::AlpineFrame,
75            session_id: established.session_id,
76            timestamp_us: Self::now_us(),
77            priority,
78            channel_format,
79            channels: adjusted_channels,
80            groups,
81            metadata,
82        };
83
84        let bytes = serde_cbor::to_vec(&envelope)
85            .map_err(|e| StreamError::Transport(format!("encode: {}", e)))?;
86        self.transport
87            .send_frame(&bytes)
88            .map_err(StreamError::Transport)?;
89        *self.last_frame.lock() = Some(envelope);
90        Ok(())
91    }
92
93    fn apply_jitter(&self, channels: &[u16]) -> Vec<u16> {
94        match self.jitter_strategy_from_profile() {
95            JitterStrategy::HoldLast => {
96                if channels.is_empty() {
97                    if let Some(last) = self.last_frame.lock().as_ref() {
98                        return last.channels.clone();
99                    }
100                }
101                channels.to_vec()
102            }
103            JitterStrategy::Drop => {
104                if channels.is_empty() {
105                    Vec::new()
106                } else {
107                    channels.to_vec()
108                }
109            }
110            JitterStrategy::Lerp => {
111                if let Some(last) = self.last_frame.lock().as_ref() {
112                    let mut blended = Vec::with_capacity(channels.len());
113                    for (idx, value) in channels.iter().enumerate() {
114                        let prev = last.channels.get(idx).cloned().unwrap_or(0);
115                        blended.push(((prev as u32 + *value as u32) / 2) as u16);
116                    }
117                    blended
118                } else {
119                    channels.to_vec()
120                }
121            }
122        }
123    }
124
125    fn now_us() -> u64 {
126        SystemTime::now()
127            .duration_since(UNIX_EPOCH)
128            .unwrap_or_default()
129            .as_micros() as u64
130    }
131
132    fn jitter_strategy_from_profile(&self) -> JitterStrategy {
133        if self.profile.latency_weight() >= self.profile.resilience_weight() {
134            JitterStrategy::HoldLast
135        } else {
136            JitterStrategy::Lerp
137        }
138    }
139}