#![forbid(unsafe_code)]
#![allow(clippy::cast_possible_truncation)]
use async_trait::async_trait;
use md5::{Digest, Md5};
use oximedia_core::{CodecId, OxiError, OxiResult};
use oximedia_io::MediaSource;
use std::io::SeekFrom;
use crate::mux::traits::{Muxer, MuxerConfig};
use crate::{Packet, StreamInfo};
const FLAC_MARKER: &[u8; 4] = b"fLaC";
const BLOCK_TYPE_STREAMINFO: u8 = 0;
#[allow(dead_code)]
const BLOCK_TYPE_PADDING: u8 = 1;
const BLOCK_TYPE_VORBIS_COMMENT: u8 = 4;
const BLOCK_TYPE_SEEKTABLE: u8 = 3;
const LAST_BLOCK_FLAG: u8 = 0x80;
const STREAMINFO_SIZE: usize = 34;
const DEFAULT_SEEK_TABLE_SIZE: usize = 100;
const SEEKPOINT_PLACEHOLDER: u64 = 0xFFFF_FFFF_FFFF_FFFF;
#[derive(Clone, Debug)]
pub struct FlacStreamInfo {
pub min_block_size: u16,
pub max_block_size: u16,
pub min_frame_size: u32,
pub max_frame_size: u32,
pub sample_rate: u32,
pub channels: u8,
pub bits_per_sample: u8,
pub total_samples: u64,
pub md5_signature: [u8; 16],
}
impl FlacStreamInfo {
#[must_use]
pub fn new(sample_rate: u32, channels: u8, bits_per_sample: u8) -> Self {
Self {
min_block_size: 4096,
max_block_size: 4096,
min_frame_size: 0,
max_frame_size: 0,
sample_rate,
channels,
bits_per_sample,
total_samples: 0,
md5_signature: [0u8; 16],
}
}
#[must_use]
pub fn encode(&self) -> [u8; STREAMINFO_SIZE] {
let mut data = [0u8; STREAMINFO_SIZE];
data[0..2].copy_from_slice(&self.min_block_size.to_be_bytes());
data[2..4].copy_from_slice(&self.max_block_size.to_be_bytes());
data[4] = (self.min_frame_size >> 16) as u8;
data[5] = (self.min_frame_size >> 8) as u8;
data[6] = self.min_frame_size as u8;
data[7] = (self.max_frame_size >> 16) as u8;
data[8] = (self.max_frame_size >> 8) as u8;
data[9] = self.max_frame_size as u8;
let sample_rate_20 = self.sample_rate & 0xFFFFF;
let channels_3 = (self.channels - 1) & 0x7;
let bps_5 = (self.bits_per_sample - 1) & 0x1F;
let total_samples_36 = self.total_samples & 0xF_FFFF_FFFF;
data[10] = (sample_rate_20 >> 12) as u8;
data[11] = (sample_rate_20 >> 4) as u8;
data[12] =
((sample_rate_20 << 4) | (u32::from(channels_3) << 1) | (u32::from(bps_5) >> 4)) as u8;
data[13] = ((u32::from(bps_5) << 4) | ((total_samples_36 >> 32) as u32)) as u8;
data[14] = (total_samples_36 >> 24) as u8;
data[15] = (total_samples_36 >> 16) as u8;
data[16] = (total_samples_36 >> 8) as u8;
data[17] = total_samples_36 as u8;
data[18..34].copy_from_slice(&self.md5_signature);
data
}
}
impl Default for FlacStreamInfo {
fn default() -> Self {
Self::new(44100, 2, 16)
}
}
#[derive(Clone, Copy, Debug)]
pub struct SeekPoint {
pub sample_number: u64,
pub stream_offset: u64,
pub frame_samples: u16,
}
impl SeekPoint {
#[must_use]
pub const fn new(sample_number: u64, stream_offset: u64, frame_samples: u16) -> Self {
Self {
sample_number,
stream_offset,
frame_samples,
}
}
#[must_use]
pub const fn placeholder() -> Self {
Self {
sample_number: SEEKPOINT_PLACEHOLDER,
stream_offset: 0,
frame_samples: 0,
}
}
#[must_use]
#[allow(dead_code)]
pub const fn is_placeholder(&self) -> bool {
self.sample_number == SEEKPOINT_PLACEHOLDER
}
#[must_use]
pub fn encode(&self) -> [u8; 18] {
let mut data = [0u8; 18];
data[0..8].copy_from_slice(&self.sample_number.to_be_bytes());
data[8..16].copy_from_slice(&self.stream_offset.to_be_bytes());
data[16..18].copy_from_slice(&self.frame_samples.to_be_bytes());
data
}
}
pub struct FlacMuxer<W> {
sink: W,
config: MuxerConfig,
streams: Vec<StreamInfo>,
stream_info: Option<FlacStreamInfo>,
header_written: bool,
position: u64,
streaminfo_position: u64,
seektable_position: Option<u64>,
seek_points: Vec<SeekPoint>,
first_frame_position: u64,
total_samples: u64,
min_frame_size: u32,
max_frame_size: u32,
md5_hasher: Md5,
}
impl<W> FlacMuxer<W> {
#[must_use]
pub fn new(sink: W, config: MuxerConfig) -> Self {
Self {
sink,
config,
streams: Vec::new(),
stream_info: None,
header_written: false,
position: 0,
streaminfo_position: 0,
seektable_position: None,
seek_points: Vec::with_capacity(DEFAULT_SEEK_TABLE_SIZE),
first_frame_position: 0,
total_samples: 0,
min_frame_size: 0,
max_frame_size: 0,
md5_hasher: Md5::new(),
}
}
#[must_use]
pub const fn sink(&self) -> &W {
&self.sink
}
pub fn sink_mut(&mut self) -> &mut W {
&mut self.sink
}
#[must_use]
#[allow(dead_code)]
pub fn into_sink(self) -> W {
self.sink
}
#[must_use]
pub const fn total_samples(&self) -> u64 {
self.total_samples
}
#[allow(dead_code)]
pub fn set_md5_signature(&mut self, md5: [u8; 16]) {
if let Some(ref mut info) = self.stream_info {
info.md5_signature = md5;
}
}
}
impl<W: MediaSource> FlacMuxer<W> {
async fn write_bytes(&mut self, data: &[u8]) -> OxiResult<()> {
self.sink.write_all(data).await?;
self.position += data.len() as u64;
Ok(())
}
fn build_stream_info(stream: &StreamInfo) -> FlacStreamInfo {
let sample_rate = stream.codec_params.sample_rate.unwrap_or(44100);
let channels = stream.codec_params.channels.unwrap_or(2);
FlacStreamInfo::new(sample_rate, channels, 16)
}
async fn write_marker(&mut self) -> OxiResult<()> {
self.write_bytes(FLAC_MARKER).await
}
async fn write_block_header(
&mut self,
block_type: u8,
size: u32,
is_last: bool,
) -> OxiResult<()> {
let header_byte = if is_last {
block_type | LAST_BLOCK_FLAG
} else {
block_type
};
self.write_bytes(&[header_byte]).await?;
self.write_bytes(&size.to_be_bytes()[1..4]).await }
async fn write_streaminfo(&mut self, is_last: bool) -> OxiResult<()> {
let stream_info = self
.stream_info
.clone()
.ok_or_else(|| OxiError::InvalidData("Stream info not configured".into()))?;
self.streaminfo_position = self.position;
self.write_block_header(BLOCK_TYPE_STREAMINFO, STREAMINFO_SIZE as u32, is_last)
.await?;
self.write_bytes(&stream_info.encode()).await
}
async fn write_seektable(&mut self, is_last: bool) -> OxiResult<()> {
let seek_table_size = DEFAULT_SEEK_TABLE_SIZE * 18;
self.seektable_position = Some(self.position);
self.write_block_header(BLOCK_TYPE_SEEKTABLE, seek_table_size as u32, is_last)
.await?;
for _ in 0..DEFAULT_SEEK_TABLE_SIZE {
self.write_bytes(&SeekPoint::placeholder().encode()).await?;
}
Ok(())
}
async fn write_vorbis_comment(&mut self, is_last: bool) -> OxiResult<()> {
let mut content = Vec::new();
let vendor = self.config.muxing_app.as_deref().unwrap_or("OxiMedia");
content.extend_from_slice(&(vendor.len() as u32).to_le_bytes());
content.extend_from_slice(vendor.as_bytes());
let mut comments = Vec::new();
if let Some(ref title) = self.config.title {
comments.push(format!("TITLE={title}"));
}
content.extend_from_slice(&(comments.len() as u32).to_le_bytes());
for comment in comments {
content.extend_from_slice(&(comment.len() as u32).to_le_bytes());
content.extend_from_slice(comment.as_bytes());
}
self.write_block_header(BLOCK_TYPE_VORBIS_COMMENT, content.len() as u32, is_last)
.await?;
self.write_bytes(&content).await
}
async fn fixup_streaminfo(&mut self) -> OxiResult<()> {
if let Some(ref mut stream_info) = self.stream_info {
stream_info.total_samples = self.total_samples;
stream_info.min_frame_size = self.min_frame_size;
stream_info.max_frame_size = self.max_frame_size;
let md5_result = self.md5_hasher.clone().finalize();
stream_info.md5_signature.copy_from_slice(&md5_result);
let data = stream_info.encode();
let current_pos = self.position;
self.sink
.seek(SeekFrom::Start(self.streaminfo_position + 4))
.await?;
self.sink.write_all(&data).await?;
self.sink.seek(SeekFrom::Start(current_pos)).await?;
}
Ok(())
}
async fn fixup_seektable(&mut self) -> OxiResult<()> {
if let Some(seektable_pos) = self.seektable_position {
let current_pos = self.position;
self.sink.seek(SeekFrom::Start(seektable_pos + 4)).await?;
for i in 0..DEFAULT_SEEK_TABLE_SIZE {
let point = self
.seek_points
.get(i)
.copied()
.unwrap_or_else(SeekPoint::placeholder);
self.sink.write_all(&point.encode()).await?;
}
self.sink.seek(SeekFrom::Start(current_pos)).await?;
}
Ok(())
}
}
#[async_trait]
impl<W: MediaSource> Muxer for FlacMuxer<W> {
fn add_stream(&mut self, info: StreamInfo) -> OxiResult<usize> {
if self.header_written {
return Err(OxiError::InvalidData(
"Cannot add stream after header is written".into(),
));
}
if !self.streams.is_empty() {
return Err(OxiError::unsupported(
"FLAC format only supports a single audio stream",
));
}
if info.codec != CodecId::Flac {
return Err(OxiError::unsupported(format!(
"FLAC muxer only supports FLAC codec, got {:?}",
info.codec
)));
}
self.stream_info = Some(Self::build_stream_info(&info));
let index = self.streams.len();
self.streams.push(info);
Ok(index)
}
async fn write_header(&mut self) -> OxiResult<()> {
if self.header_written {
return Err(OxiError::InvalidData("Header already written".into()));
}
if self.streams.is_empty() {
return Err(OxiError::InvalidData("No streams configured".into()));
}
self.write_marker().await?;
self.write_streaminfo(false).await?;
if self.config.write_cues {
self.write_seektable(false).await?;
}
self.write_vorbis_comment(true).await?;
self.first_frame_position = self.position;
self.header_written = true;
Ok(())
}
async fn write_packet(&mut self, packet: &Packet) -> OxiResult<()> {
if !self.header_written {
return Err(OxiError::InvalidData("Header not written".into()));
}
if packet.stream_index >= self.streams.len() {
return Err(OxiError::InvalidData(format!(
"Invalid stream index: {}",
packet.stream_index
)));
}
let frame_size = packet.data.len() as u32;
if self.min_frame_size == 0 || frame_size < self.min_frame_size {
self.min_frame_size = frame_size;
}
if frame_size > self.max_frame_size {
self.max_frame_size = frame_size;
}
self.md5_hasher.update(&packet.data);
if self.config.write_cues {
let seek_interval = self.stream_info.as_ref().map_or(44100, |s| s.sample_rate);
let block_size = self
.stream_info
.as_ref()
.map_or(4096, |s| u64::from(s.max_block_size));
if self.total_samples % u64::from(seek_interval) < block_size {
let offset = self.position - self.first_frame_position;
let point = SeekPoint::new(self.total_samples, offset, block_size as u16);
if self.seek_points.len() < DEFAULT_SEEK_TABLE_SIZE {
self.seek_points.push(point);
}
}
}
self.write_bytes(&packet.data).await?;
#[allow(clippy::cast_sign_loss)]
if let Some(duration) = packet.duration() {
self.total_samples += duration as u64;
} else {
let block_size = self
.stream_info
.as_ref()
.map_or(4096, |s| u64::from(s.max_block_size));
self.total_samples += block_size;
}
Ok(())
}
async fn write_trailer(&mut self) -> OxiResult<()> {
if !self.header_written {
return Err(OxiError::InvalidData("Header not written".into()));
}
self.fixup_streaminfo().await?;
self.fixup_seektable().await?;
Ok(())
}
fn streams(&self) -> &[StreamInfo] {
&self.streams
}
fn config(&self) -> &MuxerConfig {
&self.config
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use oximedia_core::{Rational, Timestamp};
use oximedia_io::MemorySource;
fn create_flac_stream() -> StreamInfo {
let mut stream = StreamInfo::new(0, CodecId::Flac, Rational::new(1, 44100));
stream.codec_params.sample_rate = Some(44100);
stream.codec_params.channels = Some(2);
stream
}
#[test]
fn test_flac_stream_info_new() {
let info = FlacStreamInfo::new(48000, 2, 24);
assert_eq!(info.sample_rate, 48000);
assert_eq!(info.channels, 2);
assert_eq!(info.bits_per_sample, 24);
}
#[test]
fn test_flac_stream_info_encode() {
let info = FlacStreamInfo::new(44100, 2, 16);
let data = info.encode();
assert_eq!(data.len(), STREAMINFO_SIZE);
}
#[test]
fn test_seek_point_new() {
let point = SeekPoint::new(48000, 1000, 4096);
assert_eq!(point.sample_number, 48000);
assert_eq!(point.stream_offset, 1000);
assert_eq!(point.frame_samples, 4096);
}
#[test]
fn test_seek_point_placeholder() {
let point = SeekPoint::placeholder();
assert!(point.is_placeholder());
}
#[test]
fn test_seek_point_encode() {
let point = SeekPoint::new(48000, 1000, 4096);
let data = point.encode();
assert_eq!(data.len(), 18);
}
#[tokio::test]
async fn test_muxer_new() {
let sink = MemorySource::new_writable(4096);
let config = MuxerConfig::new();
let muxer = FlacMuxer::new(sink, config);
assert!(!muxer.header_written);
assert!(muxer.streams.is_empty());
}
#[tokio::test]
async fn test_muxer_add_stream() {
let sink = MemorySource::new_writable(4096);
let config = MuxerConfig::new();
let mut muxer = FlacMuxer::new(sink, config);
let flac = create_flac_stream();
let idx = muxer.add_stream(flac).expect("operation should succeed");
assert_eq!(idx, 0);
assert_eq!(muxer.streams.len(), 1);
}
#[tokio::test]
async fn test_muxer_add_multiple_streams_fails() {
let sink = MemorySource::new_writable(4096);
let config = MuxerConfig::new();
let mut muxer = FlacMuxer::new(sink, config);
let flac1 = create_flac_stream();
let flac2 = create_flac_stream();
muxer.add_stream(flac1).expect("operation should succeed");
let result = muxer.add_stream(flac2);
assert!(result.is_err());
}
#[tokio::test]
async fn test_muxer_add_non_flac_fails() {
let sink = MemorySource::new_writable(4096);
let config = MuxerConfig::new();
let mut muxer = FlacMuxer::new(sink, config);
let opus = StreamInfo::new(0, CodecId::Opus, Rational::new(1, 48000));
let result = muxer.add_stream(opus);
assert!(result.is_err());
}
#[tokio::test]
async fn test_muxer_write_header() {
let sink = MemorySource::new_writable(4096);
let config = MuxerConfig::new();
let mut muxer = FlacMuxer::new(sink, config);
let flac = create_flac_stream();
muxer.add_stream(flac).expect("operation should succeed");
let result = muxer.write_header().await;
assert!(result.is_ok());
assert!(muxer.header_written);
}
#[tokio::test]
async fn test_muxer_write_packet() {
let sink = MemorySource::new_writable(4096);
let config = MuxerConfig::new();
let mut muxer = FlacMuxer::new(sink, config);
let flac = create_flac_stream();
muxer.add_stream(flac).expect("operation should succeed");
muxer
.write_header()
.await
.expect("operation should succeed");
let packet = Packet::new(
0,
Bytes::from(vec![0xFF, 0xF8, 0x00, 0x00]), Timestamp::new(0, Rational::new(1, 44100)),
crate::PacketFlags::KEYFRAME,
);
let result = muxer.write_packet(&packet).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_muxer_write_trailer() {
let sink = MemorySource::new_writable(4096);
let config = MuxerConfig::new();
let mut muxer = FlacMuxer::new(sink, config);
let flac = create_flac_stream();
muxer.add_stream(flac).expect("operation should succeed");
muxer
.write_header()
.await
.expect("operation should succeed");
let result = muxer.write_trailer().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_muxer_with_title() {
let sink = MemorySource::new_writable(4096);
let config = MuxerConfig::new().with_title("Test Song");
let mut muxer = FlacMuxer::new(sink, config);
let flac = create_flac_stream();
muxer.add_stream(flac).expect("operation should succeed");
let result = muxer.write_header().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_muxer_md5_calculation() {
let sink = MemorySource::new_writable(4096);
let config = MuxerConfig::new();
let mut muxer = FlacMuxer::new(sink, config);
let flac = create_flac_stream();
muxer.add_stream(flac).expect("operation should succeed");
muxer
.write_header()
.await
.expect("operation should succeed");
for i in 0..10 {
let packet = Packet::new(
0,
Bytes::from(vec![0xFF, 0xF8, i as u8, i as u8]),
Timestamp::new(i64::from(i) * 4096, Rational::new(1, 44_100)),
crate::PacketFlags::KEYFRAME,
);
muxer
.write_packet(&packet)
.await
.expect("operation should succeed");
}
muxer
.write_trailer()
.await
.expect("operation should succeed");
if let Some(ref info) = muxer.stream_info {
assert_ne!(info.md5_signature, [0u8; 16]);
}
}
}