1use std::collections::HashMap;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use thiserror::Error;
5
6use crate::messages::{ChannelFormat, FrameEnvelope, MessageType};
7use crate::profile::CompiledStreamProfile;
8use crate::session::{AlnpSession, JitterStrategy};
9
/// Outbound sink for encoded frames.
///
/// `AlnpStream::send` hands every encoded frame to [`FrameTransport::send_frame`].
/// Implementors must be `Send + Sync`, as required by the supertrait bounds.
pub trait FrameTransport: Send + Sync {
    /// Sends one already-encoded frame.
    ///
    /// # Errors
    ///
    /// Returns a human-readable description of the failure; `AlnpStream::send`
    /// wraps that text in `StreamError::Transport`.
    fn send_frame(&self, bytes: &[u8]) -> Result<(), String>;
}
15
16#[derive(Debug)]
18pub struct AlnpStream<T: FrameTransport> {
19 session: AlnpSession,
20 transport: T,
21 last_frame: parking_lot::Mutex<Option<FrameEnvelope>>,
22 profile: CompiledStreamProfile,
23}
24
25#[derive(Debug, Error)]
27pub enum StreamError {
28 #[error("sender not authenticated")]
29 NotAuthenticated,
30 #[error("transport error: {0}")]
31 Transport(String),
32 #[error("streaming disabled")]
33 StreamingDisabled,
34 #[error("no session available")]
35 MissingSession,
36}
37
38impl<T: FrameTransport> AlnpStream<T> {
39 pub fn new(session: AlnpSession, transport: T, profile: CompiledStreamProfile) -> Self {
41 Self {
42 session,
43 transport,
44 last_frame: parking_lot::Mutex::new(None),
45 profile,
46 }
47 }
48
49 pub fn send(
56 &self,
57 channel_format: ChannelFormat,
58 channels: Vec<u16>,
59 priority: u8,
60 groups: Option<HashMap<String, Vec<u16>>>,
61 metadata: Option<HashMap<String, serde_json::Value>>,
62 ) -> Result<(), StreamError> {
63 let established = self
64 .session
65 .ensure_streaming_ready()
66 .map_err(|_| StreamError::NotAuthenticated)?;
67 if !self.session.streaming_enabled() {
68 return Err(StreamError::StreamingDisabled);
69 }
70
71 let adjusted_channels = self.apply_jitter(&channels);
72
73 let envelope = FrameEnvelope {
74 message_type: MessageType::AlpineFrame,
75 session_id: established.session_id,
76 timestamp_us: Self::now_us(),
77 priority,
78 channel_format,
79 channels: adjusted_channels,
80 groups,
81 metadata,
82 };
83
84 let bytes = serde_cbor::to_vec(&envelope)
85 .map_err(|e| StreamError::Transport(format!("encode: {}", e)))?;
86 self.transport
87 .send_frame(&bytes)
88 .map_err(StreamError::Transport)?;
89 *self.last_frame.lock() = Some(envelope);
90 Ok(())
91 }
92
93 fn apply_jitter(&self, channels: &[u16]) -> Vec<u16> {
94 match self.jitter_strategy_from_profile() {
95 JitterStrategy::HoldLast => {
96 if channels.is_empty() {
97 if let Some(last) = self.last_frame.lock().as_ref() {
98 return last.channels.clone();
99 }
100 }
101 channels.to_vec()
102 }
103 JitterStrategy::Drop => {
104 if channels.is_empty() {
105 Vec::new()
106 } else {
107 channels.to_vec()
108 }
109 }
110 JitterStrategy::Lerp => {
111 if let Some(last) = self.last_frame.lock().as_ref() {
112 let mut blended = Vec::with_capacity(channels.len());
113 for (idx, value) in channels.iter().enumerate() {
114 let prev = last.channels.get(idx).cloned().unwrap_or(0);
115 blended.push(((prev as u32 + *value as u32) / 2) as u16);
116 }
117 blended
118 } else {
119 channels.to_vec()
120 }
121 }
122 }
123 }
124
125 fn now_us() -> u64 {
126 SystemTime::now()
127 .duration_since(UNIX_EPOCH)
128 .unwrap_or_default()
129 .as_micros() as u64
130 }
131
132 fn jitter_strategy_from_profile(&self) -> JitterStrategy {
133 if self.profile.latency_weight() >= self.profile.resilience_weight() {
134 JitterStrategy::HoldLast
135 } else {
136 JitterStrategy::Lerp
137 }
138 }
139}