use std::{
cell::RefCell,
collections::hash_map::DefaultHasher,
env,
hash::Hasher,
io::{self, Read, Seek, SeekFrom, Write},
num::ParseIntError,
sync::{Arc, Mutex},
time::UNIX_EPOCH
};
use getrandom::getrandom;
use granulator::granule_position_for_packet;
use indexmap::{map::Entry, IndexMap};
use log::info;
use ogg::{OggReadError, PacketReader, PacketWriteEndInfo, PacketWriter, PageParsingOptions};
#[doc(inline)]
pub use ogg_vorbis_stream_mangler::{OggVorbisStreamMangler, OggVorbisStreamPassthroughMangler};
use rand_xoshiro::{
rand_core::{RngCore, SeedableRng},
Xoshiro256PlusPlus
};
use thiserror::Error;
use super::Remuxer;
use crate::vorbis::optimizer::{VorbisOptimizer, VorbisOptimizerError, VorbisOptimizerSettings};
mod granulator;
mod ogg_vorbis_stream_mangler;
#[cfg(test)]
mod test;
/// A remuxer that reads an Ogg container and writes a new Ogg container, passing the
/// packets of every Vorbis logical bitstream it recognizes through a [`VorbisOptimizer`].
///
/// Remuxing reads the source twice (an analysis pass followed by a writing pass), which
/// is why the source handed to `remux` has to be seekable.
pub struct OggToOgg<M: OggVorbisStreamMangler> {
/// Remuxer behavior settings. Stored in a `RefCell` because `remux` only receives
/// `&self`, yet both passes take the settings by mutable reference.
remuxer_settings: RefCell<Settings<M>>,
/// Settings handed to every [`VorbisOptimizer`] created while analyzing the source.
optimizer_settings: VorbisOptimizerSettings,
/// Ogg page parsing options, shared by the packet readers of both passes.
ogg_page_parsing_options: Arc<PageParsingOptions>
}
/// Settings that control how [`OggToOgg`] remuxes its input.
pub struct Settings<M: OggVorbisStreamMangler> {
/// When `true`, the serial of the first output stream and the increment applied
/// between consecutive stream serials are chosen (pseudo-)randomly. When `false`, the
/// first serial is exactly `first_stream_serial_offset` and the increment is 1.
pub randomize_stream_serials: bool,
/// Added (with wrapping) to the randomly chosen first stream serial, or used directly
/// as the first stream serial when `randomize_stream_serials` is `false`.
pub first_stream_serial_offset: u32,
/// NOTE(review): this flag is not read in this file. The whole settings value is
/// passed to `granule_position_for_packet`, which is presumably where it takes
/// effect; confirm its exact meaning in the `granulator` module.
pub ignore_start_sample_offset: bool,
/// When `true`, remuxing fails with [`RemuxError::NoVorbisStreamFound`] if the
/// analysis pass did not find any Vorbis stream. When `false`, that situation is not
/// treated as an error.
pub error_on_no_vorbis_streams: bool,
/// Copied to the `verify_checksum` field of the Ogg page parsing options when the
/// remuxer is constructed.
pub verify_ogg_page_checksums: bool,
/// Hook consulted while remuxing to adjust the sampling frequency and bitrates of the
/// identification data and, for every written packet, its stream serial, page end
/// info and granule position.
pub vorbis_stream_mangler: M
}
impl Default for Settings<OggVorbisStreamPassthroughMangler> {
fn default() -> Self {
Self {
randomize_stream_serials: true,
first_stream_serial_offset: 0,
ignore_start_sample_offset: false,
error_on_no_vorbis_streams: true,
verify_ogg_page_checksums: true,
vorbis_stream_mangler: OggVorbisStreamPassthroughMangler
}
}
}
/// Per-stream bookkeeping gathered during the first pass and consumed during the second
/// pass, for one Vorbis logical bitstream.
struct VorbisStreamState<'settings> {
/// Optimizer created from the first packet of the stream. It analyzes packets during
/// the first pass and produces the optimized packets during the second pass.
optimizer: VorbisOptimizer<'settings>,
/// Set once during the first pass, for the first counted audio packet that is the last
/// packet of its page: the granule position of that page (cast to `i64`) and the value
/// of `analyzed_packet_count` at that moment. `None` until such a packet is seen.
original_last_audio_packet_in_first_audio_page_granule_position: Option<(i64, usize)>,
/// Initialized to `None` and not updated in this file.
/// NOTE(review): presumably maintained by `granule_position_for_packet`, which
/// receives this state; confirm in the `granulator` module.
last_written_packet_granule_position: Option<i64>,
/// Initialized to `None` and not updated in this file.
/// NOTE(review): presumably maintained by `granule_position_for_packet`; confirm.
last_written_packet_sample_block_size: Option<u16>,
/// Initialized to `None` and not updated in this file.
/// NOTE(review): presumably maintained by `granule_position_for_packet`; confirm.
start_granule_position_offset: Option<i64>,
/// Number of packets counted by the first pass. Starts at 1 for the first packet of
/// the stream; each later packet is counted if fewer than three packets have been
/// counted so far or if the optimizer reports a sample block size for it. The second
/// pass uses this total to recognize the last packet it will write.
analyzed_packet_count: usize,
/// Number of packets of this stream written so far by the second pass.
optimized_packet_count: usize,
/// XOR of the page checksums seen for every counted packet. Used to tweak the seed of
/// the fallback stream serial PRNG.
checksum: u32
}
/// Errors that can be returned while remuxing with [`OggToOgg`].
#[derive(Debug, Error)]
pub enum RemuxError {
/// The Ogg packet reader could not read or parse the source.
#[error("Ogg read error: {0}")]
OggError(#[from] OggReadError),
/// The Vorbis optimizer rejected a packet, other than the first-packet errors that
/// merely mark a logical bitstream as not being Vorbis.
#[error("Vorbis optimization error: {0}")]
OptimizerError(#[from] VorbisOptimizerError),
/// A new Vorbis stream started before the previous Vorbis stream had ended.
#[error("Remuxing Ogg bitstreams with grouped logical bitstreams is not supported")]
UnsupportedStreamMultiplexing,
/// No Vorbis stream was found and the settings ask for that to be an error.
#[error("No Vorbis bitstream found. Is this Ogg Vorbis data?")]
NoVorbisStreamFound,
/// The `SOURCE_DATE_EPOCH` environment variable is set but could not be parsed as an
/// integer. Only available with the `source-date-epoch` feature (or when building
/// documentation).
#[error("The SOURCE_DATE_EPOCH environment variable is set, but its value is invalid")]
#[cfg(any(doc, feature = "source-date-epoch"))]
InvalidSourceDateEpoch,
/// An I/O operation on the source or the sink failed.
#[error("I/O error: {0}")]
IoError(#[from] io::Error)
}
impl<M: OggVorbisStreamMangler> Remuxer for OggToOgg<M> {
    type RemuxError = RemuxError;
    type RemuxerSettings = Settings<M>;

    /// Builds a remuxer from its settings.
    ///
    /// The Ogg page parsing options are derived once here: checksum verification
    /// follows `remuxer_settings.verify_ogg_page_checksums`.
    fn new(remuxer_settings: Settings<M>, optimizer_settings: VorbisOptimizerSettings) -> Self {
        let mut page_parsing_options = PageParsingOptions::default();
        page_parsing_options.verify_checksum = remuxer_settings.verify_ogg_page_checksums;

        let ogg_page_parsing_options = Arc::new(page_parsing_options);
        let remuxer_settings = RefCell::new(remuxer_settings);

        Self {
            remuxer_settings,
            optimizer_settings,
            ogg_page_parsing_options
        }
    }

    /// Remuxes `source` into `sink` and returns the sink on success.
    ///
    /// The source is read twice, starting both times at the position it had when this
    /// method was called: a first pass analyzes the Vorbis streams, then the source is
    /// rewound and a second pass writes the optimized packets.
    ///
    /// # Errors
    ///
    /// Returns any error raised by either pass, by seeking the source, or by the stream
    /// serial generation.
    fn remux<R: Read + Seek, W: Write>(
        &self,
        mut source: R,
        mut sink: W
    ) -> Result<W, Self::RemuxError> {
        // Remember where the data starts so that the second pass can rewind to it
        let rewind_position = source.stream_position()?;

        let mut settings_guard = self.remuxer_settings.borrow_mut();
        let remuxer_settings = &mut *settings_guard;

        info!("Starting first Ogg to Ogg remux pass");
        let mut vorbis_streams = first_pass(
            &mut source,
            &self.optimizer_settings,
            remuxer_settings,
            &self.ogg_page_parsing_options
        )?;
        info!("First Ogg to Ogg remux pass completed");

        let serial_offset = remuxer_settings.first_stream_serial_offset;
        let (first_stream_serial, stream_serial_increment) =
            if remuxer_settings.randomize_stream_serials {
                // Mix the checksums of every analyzed stream into the PRNG seed tweak
                let seed_tweak = vorbis_streams
                    .values()
                    .map(|state| state.checksum)
                    .fold(0, |mixed, checksum| mixed ^ checksum);

                random_stream_serial_and_increment(serial_offset, seed_tweak)?
            } else {
                (serial_offset, 1)
            };

        source.seek(SeekFrom::Start(rewind_position))?;

        info!("Starting second Ogg to Ogg remux pass");
        second_pass(
            source,
            &mut sink,
            &mut vorbis_streams,
            remuxer_settings,
            &self.ogg_page_parsing_options,
            first_stream_serial,
            stream_serial_increment
        )?;
        info!("Second Ogg to Ogg remux pass completed");

        Ok(sink)
    }
}
/// Analysis pass: reads every packet of `source` and collects the state of each Vorbis
/// logical bitstream, keyed by its stream serial in order of appearance.
///
/// A logical bitstream is treated as Vorbis when a [`VorbisOptimizer`] can be created
/// from its first packet. Streams whose first packet is rejected as too small, of an
/// unexpected or invalid type, or lacking the expected pattern are logged and ignored.
/// The stream mangler gets the chance to replace the sampling frequency and bitrates of
/// the identification data before the stream state is stored.
///
/// # Errors
///
/// - [`RemuxError::UnsupportedStreamMultiplexing`] if a Vorbis stream starts while a
///   previous Vorbis stream has not seen its last packet yet.
/// - [`RemuxError::NoVorbisStreamFound`] if no Vorbis stream was found and
///   `remuxer_settings.error_on_no_vorbis_streams` is set.
/// - Any Ogg read error or any other optimizer error.
fn first_pass<'settings, R: Read + Seek, M: OggVorbisStreamMangler>(
    source: R,
    optimizer_settings: &'settings VorbisOptimizerSettings,
    remuxer_settings: &mut Settings<M>,
    ogg_page_parsing_options: &Arc<PageParsingOptions>
) -> Result<IndexMap<u32, VorbisStreamState<'settings>>, RemuxError> {
    let mut reader =
        PacketReader::new_with_page_parse_opts(source, Arc::clone(ogg_page_parsing_options));
    let mut streams: IndexMap<u32, VorbisStreamState<'settings>> = IndexMap::with_capacity(1);
    // Whether a Vorbis stream has begun and its last packet has not been read yet
    let mut inside_vorbis_stream = false;

    while let Some(packet) = reader.read_packet()? {
        let serial = packet.stream_serial();
        let page_checksum = packet.checksum_page();

        if packet.first_in_stream() {
            // A stream is considered Vorbis if an optimizer accepts its first packet
            let mut optimizer = match VorbisOptimizer::new(optimizer_settings, packet.data) {
                Ok(optimizer) => optimizer,
                Err(
                    VorbisOptimizerError::TooSmallPacket(_)
                    | VorbisOptimizerError::UnexpectedPacketType { .. }
                    | VorbisOptimizerError::InvalidPacketType(_)
                    | VorbisOptimizerError::InvalidPattern
                ) => {
                    info!("Ignoring non-Vorbis logical bitstream with serial {}", serial);
                    continue;
                }
                Err(error) => return Err(error.into())
            };

            if inside_vorbis_stream {
                return Err(RemuxError::UnsupportedStreamMultiplexing);
            }

            info!("Analyzing Ogg Vorbis bitstream with serial {}", serial);

            // Let the mangler replace the identification data fields
            optimizer.identification_data.sampling_frequency = remuxer_settings
                .vorbis_stream_mangler
                .mangle_sampling_frequency(optimizer.identification_data.sampling_frequency);

            let (mangled_minimum, mangled_nominal, mangled_maximum) =
                remuxer_settings.vorbis_stream_mangler.mangle_bitrates(
                    optimizer.identification_data.minimum_bitrate,
                    optimizer.identification_data.nominal_bitrate,
                    optimizer.identification_data.maximum_bitrate
                );
            optimizer.identification_data.minimum_bitrate = mangled_minimum;
            optimizer.identification_data.nominal_bitrate = mangled_nominal;
            optimizer.identification_data.maximum_bitrate = mangled_maximum;

            streams.insert(
                serial,
                VorbisStreamState {
                    optimizer,
                    original_last_audio_packet_in_first_audio_page_granule_position: None,
                    last_written_packet_granule_position: None,
                    last_written_packet_sample_block_size: None,
                    start_granule_position_offset: None,
                    // The packet that was just consumed is already counted
                    analyzed_packet_count: 1,
                    optimized_packet_count: 0,
                    checksum: page_checksum
                }
            );
            inside_vorbis_stream = true;
        } else if let Some(state) = streams.get_mut(&serial) {
            inside_vorbis_stream = !packet.last_in_stream();

            let sample_block_size = state.optimizer.analyze_packet(&packet.data)?;
            let is_kept_audio_packet = sample_block_size.is_some();
            let still_in_first_three_packets = state.analyzed_packet_count < 3;

            // Only count the first three packets and the audio packets for which the
            // optimizer reported a sample block size
            if still_in_first_three_packets || is_kept_audio_packet {
                let granule_position_not_recorded = state
                    .original_last_audio_packet_in_first_audio_page_granule_position
                    .is_none();

                if granule_position_not_recorded && is_kept_audio_packet && packet.last_in_page()
                {
                    state.original_last_audio_packet_in_first_audio_page_granule_position =
                        Some((packet.absgp_page() as i64, state.analyzed_packet_count));
                }

                state.analyzed_packet_count = state.analyzed_packet_count.saturating_add(1);
                state.checksum ^= page_checksum;
            }
        }
    }

    if remuxer_settings.error_on_no_vorbis_streams && streams.is_empty() {
        return Err(RemuxError::NoVorbisStreamFound);
    }

    Ok(streams)
}
/// Writing pass: reads every packet of `source` again and writes the optimized packets of
/// the Vorbis streams found by the first pass to `sink`.
///
/// Packets that belong to streams absent from `vorbis_streams` are not written, and
/// neither are packets for which the optimizer returns nothing. The output serial of a
/// stream is `first_stream_serial + stream_serial_increment * index` (wrapping), where
/// `index` is the position of the stream in `vorbis_streams`. The serial, page end info
/// and granule position of each packet go through the stream mangler before being
/// written.
///
/// # Errors
///
/// Returns any Ogg read error, optimizer error or I/O error raised while writing.
fn second_pass<R: Read + Seek, W: Write, M: OggVorbisStreamMangler>(
    source: R,
    sink: W,
    vorbis_streams: &mut IndexMap<u32, VorbisStreamState<'_>>,
    remuxer_settings: &mut Settings<M>,
    ogg_page_parsing_options: &Arc<PageParsingOptions>,
    first_stream_serial: u32,
    stream_serial_increment: u32
) -> Result<(), RemuxError> {
    let mut reader =
        PacketReader::new_with_page_parse_opts(source, Arc::clone(ogg_page_parsing_options));
    let mut writer = PacketWriter::new(sink);
    let mut previous_vorbis_serial = None;

    while let Some(packet) = reader.read_packet()? {
        let source_serial = packet.stream_serial();

        // Only streams recognized as Vorbis by the first pass are written
        let mut stream_entry = match vorbis_streams.entry(source_serial) {
            Entry::Occupied(occupied) => occupied,
            Entry::Vacant(_) => continue
        };

        // Log once every time the stream being optimized changes
        if previous_vorbis_serial.replace(source_serial) != Some(source_serial) {
            info!("Optimizing Ogg Vorbis bitstream with serial {}", source_serial);
        }

        let stream_index = stream_entry.index() as u32;
        let state = stream_entry.get_mut();
        let source_page_granule_position = packet.absgp_page();

        let (optimized_packet, sample_block_size) =
            match state.optimizer.optimize_packet(packet.data)? {
                Some(optimized) => optimized,
                None => continue
            };

        let packet_number = state.optimized_packet_count;
        let is_header_packet = packet_number < 3;
        let is_last_stream_packet = packet_number == state.analyzed_packet_count - 1;

        // The first and third packets end their pages; the last packet ends the stream
        let default_page_end_info = match packet_number {
            _ if is_last_stream_packet => PacketWriteEndInfo::EndStream,
            0 | 2 => PacketWriteEndInfo::EndPage,
            _ => PacketWriteEndInfo::NormalPacket
        };

        let computed_granule_position = granule_position_for_packet(
            sample_block_size,
            packet_number,
            source_page_granule_position,
            is_last_stream_packet,
            remuxer_settings,
            state
        );

        let default_stream_serial =
            first_stream_serial.wrapping_add(stream_serial_increment.wrapping_mul(stream_index));

        // Give the mangler the last word on everything that gets written
        let output_stream_serial = remuxer_settings
            .vorbis_stream_mangler
            .mangle_packet_stream_serial(
                default_stream_serial,
                packet_number,
                is_last_stream_packet
            );
        let output_page_end_info = remuxer_settings
            .vorbis_stream_mangler
            .mangle_packet_page_end_info(
                default_page_end_info,
                packet_number,
                is_last_stream_packet
            );
        let output_granule_position = remuxer_settings
            .vorbis_stream_mangler
            .mangle_granule_position(
                computed_granule_position,
                packet_number,
                is_header_packet,
                is_last_stream_packet
            );

        writer.write_packet(
            optimized_packet,
            output_stream_serial,
            output_page_end_info,
            output_granule_position as u64
        )?;

        state.optimized_packet_count = state.optimized_packet_count.saturating_add(1);
    }

    Ok(())
}
/// Generates the serial of the first output stream and the increment to apply between the
/// serials of consecutive streams, returned as `(first_stream_serial, increment)`.
///
/// Five random bytes are needed: four for the serial, to which
/// `first_stream_serial_offset` is added with wrapping, and one for the increment. The
/// increment is one plus an even number, so it is always odd, and lies in `[1, 31]`.
///
/// The bytes come from `getrandom`, unless the `SOURCE_DATE_EPOCH` environment variable is
/// set (only looked up when the `source-date-epoch` feature is enabled) or `getrandom`
/// fails. In those cases a process-wide Xoshiro256++ PRNG is used instead. That PRNG is
/// created on first use and kept for later calls; its seed is the `SOURCE_DATE_EPOCH`
/// value, or the nanoseconds between now and the Unix epoch when the variable is not
/// available, XORed with a hash of `stream_serial_prng_seed_tweak`.
///
/// # Errors
///
/// With the `source-date-epoch` feature enabled, returns
/// `RemuxError::InvalidSourceDateEpoch` if the value of `SOURCE_DATE_EPOCH` could not be
/// parsed as an integer when the PRNG was created.
fn random_stream_serial_and_increment(
    first_stream_serial_offset: u32,
    stream_serial_prng_seed_tweak: u32
) -> Result<(u32, u32), RemuxError> {
    let mut random_bytes = [0u8; 5];

    let source_date_epoch = if cfg!(feature = "source-date-epoch") {
        env::var_os("SOURCE_DATE_EPOCH")
    } else {
        None
    };

    // Prefer OS randomness, but honor SOURCE_DATE_EPOCH and cope with getrandom failures
    let needs_seeded_prng =
        source_date_epoch.is_some() || getrandom(&mut random_bytes[..]).is_err();

    if needs_seeded_prng {
        static STREAM_SERIAL_PRNG: Mutex<Option<Result<Xoshiro256PlusPlus, ParseIntError>>> =
            Mutex::new(None);

        let mut prng_slot = STREAM_SERIAL_PRNG.lock().unwrap();
        let stream_serial_prng = prng_slot
            .get_or_insert_with(|| {
                let base_seed = match source_date_epoch {
                    Some(timestamp) => timestamp
                        .to_str()
                        .unwrap_or_default()
                        .parse::<i128>()
                        .map(|timestamp| timestamp as u64),
                    None => Ok(UNIX_EPOCH
                        .elapsed()
                        .unwrap_or_else(|err| err.duration())
                        .as_nanos() as u64)
                };

                base_seed.map(|seed| {
                    let mut tweak_hasher = DefaultHasher::new();
                    tweak_hasher.write_u32(stream_serial_prng_seed_tweak);

                    Xoshiro256PlusPlus::seed_from_u64(seed ^ tweak_hasher.finish())
                })
            })
            .as_mut();

        #[cfg(feature = "source-date-epoch")]
        let stream_serial_prng =
            stream_serial_prng.map_err(|_| RemuxError::InvalidSourceDateEpoch)?;
        // Without the feature the seed never comes from parsing, so this cannot be an error
        #[cfg(not(feature = "source-date-epoch"))]
        let stream_serial_prng = stream_serial_prng.unwrap();

        stream_serial_prng.fill_bytes(&mut random_bytes);
    }

    let [serial_0, serial_1, serial_2, serial_3, increment_byte] = random_bytes;

    let first_stream_serial = u32::from_ne_bytes([serial_0, serial_1, serial_2, serial_3])
        .wrapping_add(first_stream_serial_offset);
    // Doubling before the modulo keeps the remainder even, so the increment is always odd
    let stream_serial_increment = 1 + (2 * u32::from(increment_byte)) % 32;

    Ok((first_stream_serial, stream_serial_increment))
}