alpine/
stream.rs

1use serde_json::{json, Value};
2use std::collections::HashMap;
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use thiserror::Error;
6use tracing::{info, warn};
7
8use crate::messages::{ChannelFormat, FrameEnvelope, MessageType};
9use crate::profile::CompiledStreamProfile;
10use crate::session::{AlnpSession, JitterStrategy};
11use crate::stream::adaptive::{decide_next_state, AdaptationState};
12
/// Transport abstraction that puts one serialized ALPINE frame on the wire.
///
/// The concrete carrier (UDP, QUIC, ...) is left to the caller. Implementors must be
/// `Send + Sync`.
pub trait FrameTransport
where
    Self: Send + Sync,
{
    /// Sends the already-serialized frame held in `bytes`.
    ///
    /// # Errors
    /// Returns a textual description of the failure when the frame could not be sent.
    fn send_frame(&self, bytes: &[u8]) -> Result<(), String>;
}
18
19/// Stream state machine used by higher-level clients.
20#[derive(Debug)]
21pub struct AlnpStream<T: FrameTransport> {
22    session: AlnpSession,
23    transport: T,
24    last_frame: parking_lot::Mutex<Option<FrameEnvelope>>,
25    profile: CompiledStreamProfile,
26    recovery: parking_lot::Mutex<RecoveryMonitor>,
27    recovery_reason: parking_lot::Mutex<Option<RecoveryReason>>,
28    adaptation: parking_lot::Mutex<AdaptationState>,
29}
30
31/// Errors emitted from the streaming helper.
32#[derive(Debug, Error)]
33pub enum StreamError {
34    #[error("sender not authenticated")]
35    NotAuthenticated,
36    #[error("transport error: {0}")]
37    Transport(String),
38    #[error("streaming disabled")]
39    StreamingDisabled,
40    #[error("no session available")]
41    MissingSession,
42}
43
44mod network;
45
46pub use network::{NetworkConditions, NetworkMetrics};
47
48mod recovery;
49
50pub use recovery::{RecoveryEvent, RecoveryMonitor, RecoveryReason};
51
52mod adaptive;
53
54impl<T: FrameTransport> AlnpStream<T> {
55    /// Builds a new streaming helper bound to a compiled profile.
56    pub fn new(session: AlnpSession, transport: T, profile: CompiledStreamProfile) -> Self {
57        let intent = profile.intent();
58        Self {
59            session,
60            transport,
61            last_frame: parking_lot::Mutex::new(None),
62            profile,
63            recovery: parking_lot::Mutex::new(RecoveryMonitor::new()),
64            recovery_reason: parking_lot::Mutex::new(None),
65            adaptation: parking_lot::Mutex::new(AdaptationState::baseline(intent)),
66        }
67    }
68
69    /// Sends a streaming frame built from raw channel data.
70    ///
71    /// # Guarantees
72    /// * Only sends when the session is already authenticated and streaming-enabled.
73    /// * Applies jitter strategy derived from the compiled profile; no branching on
74    ///   user-facing preferences happens at this layer.
75    pub fn send(
76        &self,
77        channel_format: ChannelFormat,
78        channels: Vec<u16>,
79        priority: u8,
80        groups: Option<HashMap<String, Vec<u16>>>,
81        metadata: Option<HashMap<String, serde_json::Value>>,
82    ) -> Result<(), StreamError> {
83        let established = self
84            .session
85            .ensure_streaming_ready()
86            .map_err(|_| StreamError::NotAuthenticated)?;
87        if !self.session.streaming_enabled() {
88            return Err(StreamError::StreamingDisabled);
89        }
90
91        let adjusted_channels = self.apply_jitter(&channels);
92        let mut adaptation = self.adaptation.lock();
93        let should_force_keyframe = adaptation.should_emit_keyframe();
94        let adaptation_snapshot = adaptation.clone();
95        drop(adaptation);
96        let metadata =
97            self.annotate_metadata(metadata, should_force_keyframe, &adaptation_snapshot);
98
99        let envelope = FrameEnvelope {
100            message_type: MessageType::AlpineFrame,
101            session_id: established.session_id,
102            timestamp_us: Self::now_us(),
103            priority,
104            channel_format,
105            channels: adjusted_channels,
106            groups,
107            metadata,
108        };
109
110        let bytes = serde_cbor::to_vec(&envelope)
111            .map_err(|e| StreamError::Transport(format!("encode: {}", e)))?;
112        self.transport
113            .send_frame(&bytes)
114            .map_err(StreamError::Transport)?;
115        *self.last_frame.lock() = Some(envelope);
116        Ok(())
117    }
118
119    /// Updates recovery state based on observed network conditions.
120    pub fn observe_network_conditions(&self, conditions: &NetworkConditions) {
121        let mut monitor = self.recovery.lock();
122        if let Some(event) = monitor.feed(conditions) {
123            match event {
124                RecoveryEvent::RecoveryStarted(reason) => warn!(
125                    target: "alpine::recovery",
126                    reason = reason.as_str(),
127                    "recovery started due to {}",
128                    reason.as_str()
129                ),
130                RecoveryEvent::RecoveryComplete(reason) => info!(
131                    target: "alpine::recovery",
132                    reason = reason.as_str(),
133                    "recovery complete for {}",
134                    reason.as_str()
135                ),
136            }
137        }
138        let reason = monitor.active_reason();
139        {
140            let mut guard = self.recovery_reason.lock();
141            *guard = reason;
142        }
143        drop(monitor);
144
145        let mut adaptation = self.adaptation.lock();
146        let decision = decide_next_state(&adaptation, conditions, reason, self.profile.intent());
147        *adaptation = decision.state;
148    }
149
150    fn annotate_metadata(
151        &self,
152        metadata: Option<HashMap<String, Value>>,
153        force_keyframe: bool,
154        adaptation_snapshot: &AdaptationState,
155    ) -> Option<HashMap<String, Value>> {
156        let mut map = metadata.unwrap_or_default();
157        if let Some(reason) = *self.recovery_reason.lock() {
158            map.insert(
159                "alpine_recovery".to_string(),
160                json!({
161                    "phase": "recovery",
162                    "reason": reason.as_str(),
163                }),
164            );
165        }
166
167        let event_name = adaptation_snapshot
168            .last_event
169            .map(|event| event.as_str())
170            .unwrap_or("steady");
171        map.insert(
172            "alpine_adaptation".to_string(),
173            json!({
174                "keyframe_interval": adaptation_snapshot.keyframe_interval,
175                "delta_depth": adaptation_snapshot.delta_depth,
176                "deadline_offset_ms": adaptation_snapshot.deadline_offset_ms,
177                "degraded_safe": adaptation_snapshot.degraded_safe,
178                "frames_since_keyframe": adaptation_snapshot.frames_since_keyframe,
179                "force_keyframe": force_keyframe,
180                "event": event_name,
181            }),
182        );
183        Some(map)
184    }
185
186    fn apply_jitter(&self, channels: &[u16]) -> Vec<u16> {
187        match self.jitter_strategy_from_profile() {
188            JitterStrategy::HoldLast => {
189                if channels.is_empty() {
190                    if let Some(last) = self.last_frame.lock().as_ref() {
191                        return last.channels.clone();
192                    }
193                }
194                channels.to_vec()
195            }
196            JitterStrategy::Drop => {
197                if channels.is_empty() {
198                    Vec::new()
199                } else {
200                    channels.to_vec()
201                }
202            }
203            JitterStrategy::Lerp => {
204                if let Some(last) = self.last_frame.lock().as_ref() {
205                    let mut blended = Vec::with_capacity(channels.len());
206                    for (idx, value) in channels.iter().enumerate() {
207                        let prev = last.channels.get(idx).cloned().unwrap_or(0);
208                        blended.push(((prev as u32 + *value as u32) / 2) as u16);
209                    }
210                    blended
211                } else {
212                    channels.to_vec()
213                }
214            }
215        }
216    }
217
218    fn now_us() -> u64 {
219        SystemTime::now()
220            .duration_since(UNIX_EPOCH)
221            .unwrap_or_default()
222            .as_micros() as u64
223    }
224
225    fn jitter_strategy_from_profile(&self) -> JitterStrategy {
226        if self.profile.latency_weight() >= self.profile.resilience_weight() {
227            JitterStrategy::HoldLast
228        } else {
229            JitterStrategy::Lerp
230        }
231    }
232}