1use serde_json::{json, Value};
2use std::collections::HashMap;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use thiserror::Error;
6use tracing::{info, warn};
7
8use crate::messages::{ChannelFormat, FrameEnvelope, MessageType};
9use crate::profile::CompiledStreamProfile;
10use crate::session::{AlnpSession, JitterStrategy};
11use crate::stream::adaptive::{decide_next_state, AdaptationState};
12
/// Sink that [`AlnpStream`] hands serialized frames to.
///
/// Implementors must be `Send + Sync`.
pub trait FrameTransport: Send + Sync {
    /// Sends one already-encoded frame.
    ///
    /// On failure the returned `String` describes the problem; `AlnpStream::send`
    /// wraps it in `StreamError::Transport`.
    fn send_frame(&self, bytes: &[u8]) -> Result<(), String>;
}
18
/// Frame sender that ties together a session, a transport and a compiled stream profile.
///
/// Mutable state is kept behind `parking_lot::Mutex`es, so the methods take `&self`.
#[derive(Debug)]
pub struct AlnpStream<T: FrameTransport> {
    /// Session checked for streaming readiness before each send; also supplies the session id.
    session: AlnpSession,
    /// Destination for the CBOR-encoded frames.
    transport: T,
    /// Last envelope the transport accepted; `None` until a send has succeeded.
    last_frame: parking_lot::Mutex<Option<FrameEnvelope>>,
    /// Profile providing the intent and the latency/resilience weights.
    profile: CompiledStreamProfile,
    /// Monitor fed with every observed set of network conditions.
    recovery: parking_lot::Mutex<RecoveryMonitor>,
    /// Reason reported by the monitor after the latest observation, if any.
    recovery_reason: parking_lot::Mutex<Option<RecoveryReason>>,
    /// Current adaptation state; replaced after each network observation.
    adaptation: parking_lot::Mutex<AdaptationState>,
}
30
/// Errors reported by the streaming layer.
#[derive(Debug, Error)]
pub enum StreamError {
    /// The session's streaming-readiness check failed.
    #[error("sender not authenticated")]
    NotAuthenticated,
    /// Frame encoding failed or the transport rejected the frame; carries the description.
    #[error("transport error: {0}")]
    Transport(String),
    /// The session reports that streaming is not enabled.
    #[error("streaming disabled")]
    StreamingDisabled,
    /// NOTE(review): not produced anywhere in this part of the file; presumably used by callers
    /// that have no session yet. Confirm before relying on it.
    #[error("no session available")]
    MissingSession,
}
43
44mod network;
45
46pub use network::{NetworkConditions, NetworkMetrics};
47
48mod recovery;
49
50pub use recovery::{RecoveryEvent, RecoveryMonitor, RecoveryReason};
51
52mod adaptive;
53
54impl<T: FrameTransport> AlnpStream<T> {
55 pub fn new(session: AlnpSession, transport: T, profile: CompiledStreamProfile) -> Self {
57 let intent = profile.intent();
58 Self {
59 session,
60 transport,
61 last_frame: parking_lot::Mutex::new(None),
62 profile,
63 recovery: parking_lot::Mutex::new(RecoveryMonitor::new()),
64 recovery_reason: parking_lot::Mutex::new(None),
65 adaptation: parking_lot::Mutex::new(AdaptationState::baseline(intent)),
66 }
67 }
68
69 pub fn send(
76 &self,
77 channel_format: ChannelFormat,
78 channels: Vec<u16>,
79 priority: u8,
80 groups: Option<HashMap<String, Vec<u16>>>,
81 metadata: Option<HashMap<String, serde_json::Value>>,
82 ) -> Result<(), StreamError> {
83 let established = self
84 .session
85 .ensure_streaming_ready()
86 .map_err(|_| StreamError::NotAuthenticated)?;
87 if !self.session.streaming_enabled() {
88 return Err(StreamError::StreamingDisabled);
89 }
90
91 let adjusted_channels = self.apply_jitter(&channels);
92 let mut adaptation = self.adaptation.lock();
93 let should_force_keyframe = adaptation.should_emit_keyframe();
94 let adaptation_snapshot = adaptation.clone();
95 drop(adaptation);
96 let metadata =
97 self.annotate_metadata(metadata, should_force_keyframe, &adaptation_snapshot);
98
99 let envelope = FrameEnvelope {
100 message_type: MessageType::AlpineFrame,
101 session_id: established.session_id,
102 timestamp_us: Self::now_us(),
103 priority,
104 channel_format,
105 channels: adjusted_channels,
106 groups,
107 metadata,
108 };
109
110 let bytes = serde_cbor::to_vec(&envelope)
111 .map_err(|e| StreamError::Transport(format!("encode: {}", e)))?;
112 self.transport
113 .send_frame(&bytes)
114 .map_err(StreamError::Transport)?;
115 *self.last_frame.lock() = Some(envelope);
116 Ok(())
117 }
118
119 pub fn observe_network_conditions(&self, conditions: &NetworkConditions) {
121 let mut monitor = self.recovery.lock();
122 if let Some(event) = monitor.feed(conditions) {
123 match event {
124 RecoveryEvent::RecoveryStarted(reason) => warn!(
125 target: "alpine::recovery",
126 reason = reason.as_str(),
127 "recovery started due to {}",
128 reason.as_str()
129 ),
130 RecoveryEvent::RecoveryComplete(reason) => info!(
131 target: "alpine::recovery",
132 reason = reason.as_str(),
133 "recovery complete for {}",
134 reason.as_str()
135 ),
136 }
137 }
138 let reason = monitor.active_reason();
139 {
140 let mut guard = self.recovery_reason.lock();
141 *guard = reason;
142 }
143 drop(monitor);
144
145 let mut adaptation = self.adaptation.lock();
146 let decision = decide_next_state(&adaptation, conditions, reason, self.profile.intent());
147 *adaptation = decision.state;
148 }
149
150 fn annotate_metadata(
151 &self,
152 metadata: Option<HashMap<String, Value>>,
153 force_keyframe: bool,
154 adaptation_snapshot: &AdaptationState,
155 ) -> Option<HashMap<String, Value>> {
156 let mut map = metadata.unwrap_or_default();
157 if let Some(reason) = *self.recovery_reason.lock() {
158 map.insert(
159 "alpine_recovery".to_string(),
160 json!({
161 "phase": "recovery",
162 "reason": reason.as_str(),
163 }),
164 );
165 }
166
167 let event_name = adaptation_snapshot
168 .last_event
169 .map(|event| event.as_str())
170 .unwrap_or("steady");
171 map.insert(
172 "alpine_adaptation".to_string(),
173 json!({
174 "keyframe_interval": adaptation_snapshot.keyframe_interval,
175 "delta_depth": adaptation_snapshot.delta_depth,
176 "deadline_offset_ms": adaptation_snapshot.deadline_offset_ms,
177 "degraded_safe": adaptation_snapshot.degraded_safe,
178 "frames_since_keyframe": adaptation_snapshot.frames_since_keyframe,
179 "force_keyframe": force_keyframe,
180 "event": event_name,
181 }),
182 );
183 Some(map)
184 }
185
186 fn apply_jitter(&self, channels: &[u16]) -> Vec<u16> {
187 match self.jitter_strategy_from_profile() {
188 JitterStrategy::HoldLast => {
189 if channels.is_empty() {
190 if let Some(last) = self.last_frame.lock().as_ref() {
191 return last.channels.clone();
192 }
193 }
194 channels.to_vec()
195 }
196 JitterStrategy::Drop => {
197 if channels.is_empty() {
198 Vec::new()
199 } else {
200 channels.to_vec()
201 }
202 }
203 JitterStrategy::Lerp => {
204 if let Some(last) = self.last_frame.lock().as_ref() {
205 let mut blended = Vec::with_capacity(channels.len());
206 for (idx, value) in channels.iter().enumerate() {
207 let prev = last.channels.get(idx).cloned().unwrap_or(0);
208 blended.push(((prev as u32 + *value as u32) / 2) as u16);
209 }
210 blended
211 } else {
212 channels.to_vec()
213 }
214 }
215 }
216 }
217
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_us() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_micros() as u64)
}
224
225 fn jitter_strategy_from_profile(&self) -> JitterStrategy {
226 if self.profile.latency_weight() >= self.profile.resilience_weight() {
227 JitterStrategy::HoldLast
228 } else {
229 JitterStrategy::Lerp
230 }
231 }
232}