use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use thiserror::Error;
use tracing::{info, warn};
use crate::messages::{ChannelFormat, FrameEnvelope, MessageType};
use crate::profile::CompiledStreamProfile;
use crate::session::{AlnpSession, JitterStrategy};
use crate::stream::adaptive::{decide_next_state, AdaptationState};
pub trait FrameTransport: Send + Sync {
fn send_frame(&self, bytes: &[u8]) -> Result<(), String>;
}
#[derive(Debug)]
pub struct AlnpStream<T: FrameTransport> {
session: AlnpSession,
transport: T,
last_frame: parking_lot::Mutex<Option<FrameEnvelope>>,
profile: CompiledStreamProfile,
recovery: parking_lot::Mutex<RecoveryMonitor>,
recovery_reason: parking_lot::Mutex<Option<RecoveryReason>>,
adaptation: parking_lot::Mutex<AdaptationState>,
}
#[derive(Debug, Error)]
pub enum StreamError {
#[error("sender not authenticated")]
NotAuthenticated,
#[error("transport error: {0}")]
Transport(String),
#[error("streaming disabled")]
StreamingDisabled,
#[error("no session available")]
MissingSession,
}
mod network;
pub use network::{NetworkConditions, NetworkMetrics};
mod recovery;
pub use recovery::{RecoveryEvent, RecoveryMonitor, RecoveryReason};
mod adaptive;
impl<T: FrameTransport> AlnpStream<T> {
pub fn new(session: AlnpSession, transport: T, profile: CompiledStreamProfile) -> Self {
let intent = profile.intent();
Self {
session,
transport,
last_frame: parking_lot::Mutex::new(None),
profile,
recovery: parking_lot::Mutex::new(RecoveryMonitor::new()),
recovery_reason: parking_lot::Mutex::new(None),
adaptation: parking_lot::Mutex::new(AdaptationState::baseline(intent)),
}
}
pub fn send(
&self,
channel_format: ChannelFormat,
channels: Vec<u16>,
priority: u8,
groups: Option<HashMap<String, Vec<u16>>>,
metadata: Option<HashMap<String, serde_json::Value>>,
) -> Result<(), StreamError> {
let established = self
.session
.ensure_streaming_ready()
.map_err(|_| StreamError::NotAuthenticated)?;
if !self.session.streaming_enabled() {
return Err(StreamError::StreamingDisabled);
}
let adjusted_channels = self.apply_jitter(&channels);
let mut adaptation = self.adaptation.lock();
let should_force_keyframe = adaptation.should_emit_keyframe();
let adaptation_snapshot = adaptation.clone();
drop(adaptation);
let metadata =
self.annotate_metadata(metadata, should_force_keyframe, &adaptation_snapshot);
let envelope = FrameEnvelope {
message_type: MessageType::AlpineFrame,
session_id: established.session_id,
timestamp_us: Self::now_us(),
priority,
channel_format,
channels: adjusted_channels,
groups,
metadata,
};
let bytes = serde_cbor::to_vec(&envelope)
.map_err(|e| StreamError::Transport(format!("encode: {}", e)))?;
self.transport
.send_frame(&bytes)
.map_err(StreamError::Transport)?;
*self.last_frame.lock() = Some(envelope);
Ok(())
}
pub fn observe_network_conditions(&self, conditions: &NetworkConditions) {
let mut monitor = self.recovery.lock();
if let Some(event) = monitor.feed(conditions) {
match event {
RecoveryEvent::RecoveryStarted(reason) => warn!(
target: "alpine::recovery",
reason = reason.as_str(),
"recovery started due to {}",
reason.as_str()
),
RecoveryEvent::RecoveryComplete(reason) => info!(
target: "alpine::recovery",
reason = reason.as_str(),
"recovery complete for {}",
reason.as_str()
),
}
}
let reason = monitor.active_reason();
{
let mut guard = self.recovery_reason.lock();
*guard = reason;
}
drop(monitor);
let mut adaptation = self.adaptation.lock();
let decision = decide_next_state(&adaptation, conditions, reason, self.profile.intent());
*adaptation = decision.state;
}
fn annotate_metadata(
&self,
metadata: Option<HashMap<String, Value>>,
force_keyframe: bool,
adaptation_snapshot: &AdaptationState,
) -> Option<HashMap<String, Value>> {
let mut map = metadata.unwrap_or_default();
if let Some(reason) = *self.recovery_reason.lock() {
map.insert(
"alpine_recovery".to_string(),
json!({
"phase": "recovery",
"reason": reason.as_str(),
}),
);
}
let event_name = adaptation_snapshot
.last_event
.map(|event| event.as_str())
.unwrap_or("steady");
map.insert(
"alpine_adaptation".to_string(),
json!({
"keyframe_interval": adaptation_snapshot.keyframe_interval,
"delta_depth": adaptation_snapshot.delta_depth,
"deadline_offset_ms": adaptation_snapshot.deadline_offset_ms,
"degraded_safe": adaptation_snapshot.degraded_safe,
"frames_since_keyframe": adaptation_snapshot.frames_since_keyframe,
"force_keyframe": force_keyframe,
"event": event_name,
}),
);
Some(map)
}
fn apply_jitter(&self, channels: &[u16]) -> Vec<u16> {
match self.jitter_strategy_from_profile() {
JitterStrategy::HoldLast => {
if channels.is_empty() {
if let Some(last) = self.last_frame.lock().as_ref() {
return last.channels.clone();
}
}
channels.to_vec()
}
JitterStrategy::Drop => {
if channels.is_empty() {
Vec::new()
} else {
channels.to_vec()
}
}
JitterStrategy::Lerp => {
if let Some(last) = self.last_frame.lock().as_ref() {
let mut blended = Vec::with_capacity(channels.len());
for (idx, value) in channels.iter().enumerate() {
let prev = last.channels.get(idx).cloned().unwrap_or(0);
blended.push(((prev as u32 + *value as u32) / 2) as u16);
}
blended
} else {
channels.to_vec()
}
}
}
}
fn now_us() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_micros() as u64
}
fn jitter_strategy_from_profile(&self) -> JitterStrategy {
if self.profile.latency_weight() >= self.profile.resilience_weight() {
JitterStrategy::HoldLast
} else {
JitterStrategy::Lerp
}
}
}