alpine/
stream.rs

1use std::collections::HashMap;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use thiserror::Error;
5
6use crate::messages::{ChannelFormat, FrameEnvelope, MessageType};
7use crate::session::{AlnpSession, JitterStrategy};
8
/// Minimal transport for sending serialized ALPINE frames (UDP/QUIC left to the caller).
///
/// Implementations must be `Send + Sync`. `send_frame` takes `&self`, so any
/// state a transport mutates while sending needs interior mutability.
pub trait FrameTransport: Send + Sync {
    /// Transmits one already-serialized frame.
    ///
    /// Returns `Err` carrying a textual description when the frame could not be
    /// sent; `AlnpStream::send` surfaces that text as `StreamError::Transport`.
    fn send_frame(&self, bytes: &[u8]) -> Result<(), String>;
}
13
/// Sends ALPINE frames for a single session over a `FrameTransport`.
///
/// Besides the session and the transport, the stream keeps the envelope of the
/// most recent frame that was sent successfully; the jitter handling uses it as
/// the "previous" frame.
#[derive(Debug)]
pub struct AlnpStream<T: FrameTransport> {
    /// Session consulted for readiness, the streaming flag, the session id and
    /// the active jitter strategy.
    session: AlnpSession,
    /// Transport every encoded frame is handed to.
    transport: T,
    /// Envelope of the last frame the transport accepted; `None` until the
    /// first successful send.
    last_frame: parking_lot::Mutex<Option<FrameEnvelope>>,
}
20
/// Errors reported when sending a frame through an `AlnpStream`.
#[derive(Debug, Error)]
pub enum StreamError {
    /// The session's `ensure_streaming_ready` check failed; the underlying
    /// error is discarded.
    #[error("sender not authenticated")]
    NotAuthenticated,
    /// The frame could not be encoded (message prefixed with `encode: `) or the
    /// transport rejected it (message is the transport's own error text).
    #[error("transport error: {0}")]
    Transport(String),
    /// The session reports that streaming is not enabled.
    #[error("streaming disabled")]
    StreamingDisabled,
    /// No session is available. Not constructed by the stream code visible
    /// alongside this enum.
    #[error("no session available")]
    MissingSession,
}
32
33impl<T: FrameTransport> AlnpStream<T> {
34    pub fn new(session: AlnpSession, transport: T) -> Self {
35        Self {
36            session,
37            transport,
38            last_frame: parking_lot::Mutex::new(None),
39        }
40    }
41
42    pub fn set_jitter_strategy(&self, strat: JitterStrategy) {
43        self.session.set_jitter_strategy(strat);
44    }
45
46    /// Sends a streaming frame built from raw channel data.
47    pub fn send(
48        &self,
49        channel_format: ChannelFormat,
50        channels: Vec<u16>,
51        priority: u8,
52        groups: Option<HashMap<String, Vec<u16>>>,
53        metadata: Option<HashMap<String, serde_json::Value>>,
54    ) -> Result<(), StreamError> {
55        let established = self
56            .session
57            .ensure_streaming_ready()
58            .map_err(|_| StreamError::NotAuthenticated)?;
59        if !self.session.streaming_enabled() {
60            return Err(StreamError::StreamingDisabled);
61        }
62
63        let adjusted_channels = self.apply_jitter(&channels);
64
65        let envelope = FrameEnvelope {
66            message_type: MessageType::AlpineFrame,
67            session_id: established.session_id,
68            timestamp_us: Self::now_us(),
69            priority,
70            channel_format,
71            channels: adjusted_channels,
72            groups,
73            metadata,
74        };
75
76        let bytes = serde_cbor::to_vec(&envelope)
77            .map_err(|e| StreamError::Transport(format!("encode: {}", e)))?;
78        self.transport
79            .send_frame(&bytes)
80            .map_err(StreamError::Transport)?;
81        *self.last_frame.lock() = Some(envelope);
82        Ok(())
83    }
84
85    fn apply_jitter(&self, channels: &[u16]) -> Vec<u16> {
86        match self.session.jitter_strategy() {
87            JitterStrategy::HoldLast => {
88                if channels.is_empty() {
89                    if let Some(last) = self.last_frame.lock().as_ref() {
90                        return last.channels.clone();
91                    }
92                }
93                channels.to_vec()
94            }
95            JitterStrategy::Drop => {
96                if channels.is_empty() {
97                    Vec::new()
98                } else {
99                    channels.to_vec()
100                }
101            }
102            JitterStrategy::Lerp => {
103                if let Some(last) = self.last_frame.lock().as_ref() {
104                    let mut blended = Vec::with_capacity(channels.len());
105                    for (idx, value) in channels.iter().enumerate() {
106                        let prev = last.channels.get(idx).cloned().unwrap_or(0);
107                        blended.push(((prev as u32 + *value as u32) / 2) as u16);
108                    }
109                    blended
110                } else {
111                    channels.to_vec()
112                }
113            }
114        }
115    }
116
117    fn now_us() -> u64 {
118        SystemTime::now()
119            .duration_since(UNIX_EPOCH)
120            .unwrap_or_default()
121            .as_micros() as u64
122    }
123}