hls_client 1.1.0

Library to generate a single stream from HLS segments
Documentation
use std::sync::{Arc, RwLock};

use bytes::Bytes;
use futures::{future, Stream};
use hls_m3u8::{types::PlaylistType, MediaPlaylist};
use mediatype::MediaTypeBuf;
use reqwest::{header, Response};
use url::Url;

use crate::errors::HLSDecoderError;

pub type ReqwestStreamItem = Result<Bytes, reqwest::Error>;
pub type ReqwestStream = Box<dyn Stream<Item = ReqwestStreamItem> + Unpin + Send + Sync>;

pub(crate) async fn fetch_playlist(url: &Url) -> Result<String, HLSDecoderError> {
    let response = reqwest::get(url.as_str()).await?;
    let text = response.text().await?;
    Ok(text)
}

pub(crate) async fn parse_media_playlist(
    playlist_text: &str,
) -> Result<MediaPlaylist, HLSDecoderError> {
    handle_media_playlist(playlist_text).await
}

pub(crate) fn is_infinite_stream(media_playlist: &MediaPlaylist) -> bool {
    media_playlist.playlist_type.unwrap_or(PlaylistType::Event) == PlaylistType::Event
}

pub(crate) fn resolve_segment_urls(media_playlist: &MediaPlaylist, base_url: &Url) -> Vec<String> {
    media_playlist
        .segments
        .iter()
        .map(|(_, segment)| {
            if segment.uri().starts_with("http") {
                segment.uri().to_string()
            } else {
                let base = base_url.as_str().trim_end_matches(".m3u8");
                format!("{}/{}", base, segment.uri())
            }
        })
        .collect()
}

pub(crate) async fn fetch_segment_responses(segment_urls: &[String]) -> Vec<Response> {
    let futures = segment_urls.iter().map(reqwest::get);
    let responses = future::join_all(futures).await;
    responses.into_iter().filter_map(Result::ok).collect()
}

pub(crate) fn compute_content_lengths(responses: &[Response]) -> Vec<u64> {
    responses
        .iter()
        .map(|resp| resp.content_length().unwrap_or(0))
        .collect()
}

pub(crate) fn compute_overall_content_length(responses: &[Response]) -> Option<u64> {
    responses
        .iter()
        .filter_map(|resp| resp.content_length())
        .reduce(|acc, len| acc + len)
}

pub(crate) fn extract_content_type(response: Option<&Response>) -> Option<String> {
    response
        .and_then(|r| {
            r.headers()
                .get(header::CONTENT_TYPE)
                .and_then(|val| val.to_str().ok())
        })
        .and_then(|ct| match ct.parse::<MediaTypeBuf>() {
            Ok(parsed) => Some(parsed.to_string()),
            Err(_) => None,
        })
}

pub(crate) fn extract_headers(responses: &[Response]) -> Vec<reqwest::header::HeaderMap> {
    responses.iter().map(|r| r.headers().clone()).collect()
}

pub(crate) fn build_streams(responses: Vec<Response>) -> Vec<Arc<RwLock<ReqwestStream>>> {
    responses
        .into_iter()
        .map(|resp| Arc::new(RwLock::new(Box::new(resp.bytes_stream()) as ReqwestStream)))
        .collect()
}

pub(crate) async fn handle_media_playlist(
    playlist: &str,
) -> Result<MediaPlaylist, HLSDecoderError> {
    let playlist = hls_m3u8::MediaPlaylist::try_from(playlist)?.into_owned();

    #[cfg(feature = "tracing")]
    tracing::info!("Media playlist detected");

    Ok(playlist)
}