1use std::collections::HashMap;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use thiserror::Error;
5
6use crate::messages::{ChannelFormat, FrameEnvelope, MessageType};
7use crate::session::{AlnpSession, JitterStrategy};
8
9pub trait FrameTransport: Send + Sync {
11 fn send_frame(&self, bytes: &[u8]) -> Result<(), String>;
12}
13
14#[derive(Debug)]
15pub struct AlnpStream<T: FrameTransport> {
16 session: AlnpSession,
17 transport: T,
18 last_frame: parking_lot::Mutex<Option<FrameEnvelope>>,
19}
20
21#[derive(Debug, Error)]
22pub enum StreamError {
23 #[error("sender not authenticated")]
24 NotAuthenticated,
25 #[error("transport error: {0}")]
26 Transport(String),
27 #[error("streaming disabled")]
28 StreamingDisabled,
29 #[error("no session available")]
30 MissingSession,
31}
32
33impl<T: FrameTransport> AlnpStream<T> {
34 pub fn new(session: AlnpSession, transport: T) -> Self {
35 Self {
36 session,
37 transport,
38 last_frame: parking_lot::Mutex::new(None),
39 }
40 }
41
42 pub fn set_jitter_strategy(&self, strat: JitterStrategy) {
43 self.session.set_jitter_strategy(strat);
44 }
45
46 pub fn send(
48 &self,
49 channel_format: ChannelFormat,
50 channels: Vec<u16>,
51 priority: u8,
52 groups: Option<HashMap<String, Vec<u16>>>,
53 metadata: Option<HashMap<String, serde_json::Value>>,
54 ) -> Result<(), StreamError> {
55 let established = self
56 .session
57 .ensure_streaming_ready()
58 .map_err(|_| StreamError::NotAuthenticated)?;
59 if !self.session.streaming_enabled() {
60 return Err(StreamError::StreamingDisabled);
61 }
62
63 let adjusted_channels = self.apply_jitter(&channels);
64
65 let envelope = FrameEnvelope {
66 message_type: MessageType::AlpineFrame,
67 session_id: established.session_id,
68 timestamp_us: Self::now_us(),
69 priority,
70 channel_format,
71 channels: adjusted_channels,
72 groups,
73 metadata,
74 };
75
76 let bytes = serde_cbor::to_vec(&envelope)
77 .map_err(|e| StreamError::Transport(format!("encode: {}", e)))?;
78 self.transport
79 .send_frame(&bytes)
80 .map_err(StreamError::Transport)?;
81 *self.last_frame.lock() = Some(envelope);
82 Ok(())
83 }
84
85 fn apply_jitter(&self, channels: &[u16]) -> Vec<u16> {
86 match self.session.jitter_strategy() {
87 JitterStrategy::HoldLast => {
88 if channels.is_empty() {
89 if let Some(last) = self.last_frame.lock().as_ref() {
90 return last.channels.clone();
91 }
92 }
93 channels.to_vec()
94 }
95 JitterStrategy::Drop => {
96 if channels.is_empty() {
97 Vec::new()
98 } else {
99 channels.to_vec()
100 }
101 }
102 JitterStrategy::Lerp => {
103 if let Some(last) = self.last_frame.lock().as_ref() {
104 let mut blended = Vec::with_capacity(channels.len());
105 for (idx, value) in channels.iter().enumerate() {
106 let prev = last.channels.get(idx).cloned().unwrap_or(0);
107 blended.push(((prev as u32 + *value as u32) / 2) as u16);
108 }
109 blended
110 } else {
111 channels.to_vec()
112 }
113 }
114 }
115 }
116
117 fn now_us() -> u64 {
118 SystemTime::now()
119 .duration_since(UNIX_EPOCH)
120 .unwrap_or_default()
121 .as_micros() as u64
122 }
123}