use crate::limits::ParseLimits;
use crate::util::{insert_bytes, resize_bytes};
use crate::{AudexError, Result, StreamInfo};
use std::collections::HashMap;
use std::fs::File;
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::time::Duration;
#[cfg(feature = "async")]
use crate::util::{insert_bytes_async, resize_bytes_async};
#[cfg(feature = "async")]
use tokio::fs::File as TokioFile;
#[cfg(feature = "async")]
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
/// Safety cap on the number of Ogg pages scanned in one pass, bounding
/// work on corrupt or adversarial files.
const MAX_OGG_PAGES: u32 = 500_000;
/// Lookup table for the Ogg page CRC-32 (polynomial 0x04C11DB7,
/// MSB-first, no reflection, no final XOR), built at compile time.
const OGG_CRC_TABLE: [u32; 256] = {
    let mut table = [0u32; 256];
    let mut i = 0u32;
    while i < 256 {
        // Feed the byte in at the top of the 32-bit register.
        let mut crc = i << 24;
        let mut j = 0;
        while j < 8 {
            if crc & 0x80000000 != 0 {
                crc = (crc << 1) ^ 0x04C11DB7;
            } else {
                crc <<= 1;
            }
            j += 1;
        }
        table[i as usize] = crc;
        i += 1;
    }
    table
};
/// Computes the Ogg page checksum: table-driven CRC-32 with polynomial
/// 0x04C11DB7, initial value 0, no bit reflection and no final XOR.
fn ogg_crc32(data: &[u8]) -> u32 {
    data.iter().fold(0u32, |crc, &byte| {
        let idx = ((crc >> 24) as u8 ^ byte) as usize;
        (crc << 8) ^ OGG_CRC_TABLE[idx]
    })
}
/// Small positioning helpers for seekable streams.
mod file_utils {
    use crate::Result;
    use std::io::{Read, Seek, SeekFrom};

    /// Seeks to `offset` bytes before the end of `fileobj`, clamping to
    /// the start when the stream is shorter than `offset`.
    pub fn seek_end<F: Read + Seek>(fileobj: &mut F, offset: u64) -> Result<()> {
        // `seek(End(0))` returns the new position, i.e. the stream size.
        let filesize = fileobj.seek(SeekFrom::End(0))?;
        let target = filesize.saturating_sub(offset);
        fileobj.seek(SeekFrom::Start(target))?;
        Ok(())
    }
}
/// A single Ogg page: the 27-byte header fields plus the packet
/// payloads reassembled from the page's segments.
#[derive(Debug, Clone)]
pub struct OggPage {
    /// Stream-structure version from the header; only 0 is supported.
    pub version: u8,
    /// Header-type flags: 0x01 = continued packet, 0x02 = first page
    /// (BOS), 0x04 = last page (EOS).
    pub header_type: u8,
    /// Granule position (codec-defined timestamp); -1 marks a page whose
    /// single packet does not finish on it.
    pub position: i64,
    /// Serial number of the logical bitstream this page belongs to.
    pub serial: u32,
    /// Page sequence number within the stream.
    pub sequence: u32,
    /// CRC-32 checksum as stored in the page header.
    pub checksum: u32,
    /// Raw lacing values (segment table) as read from the page.
    pub segments: Vec<u8>,
    /// Reassembled packet data; the last entry may be a partial packet
    /// when `complete` is false.
    pub packets: Vec<Vec<u8>>,
    /// Byte offset of this page in the source stream, when known.
    pub offset: Option<i64>,
    /// Whether the final packet on this page ends on this page.
    pub complete: bool,
}
impl OggPage {
pub fn first(&self) -> bool {
(self.header_type & 0x02) != 0
}
pub fn last(&self) -> bool {
(self.header_type & 0x04) != 0
}
pub fn continued(&self) -> bool {
(self.header_type & 0x01) != 0
}
pub fn new() -> Self {
Self {
version: 0,
header_type: 0,
position: 0,
serial: 0,
sequence: 0,
checksum: 0,
segments: Vec::new(),
packets: Vec::new(),
offset: None,
complete: true,
}
}
pub fn new_from_fileobj<R: Read + Seek>(fileobj: Option<&mut R>) -> Result<Self> {
match fileobj {
Some(reader) => Self::from_reader(reader),
None => Ok(Self::new()),
}
}
pub fn from_fileobj<R: Read + Seek>(fileobj: &mut R) -> Result<Self> {
Self::from_reader(fileobj)
}
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
let mut file = File::open(path.as_ref()).map_err(|e| {
AudexError::Io(std::io::Error::new(
e.kind(),
format!("Failed to open file '{}': {}", path.as_ref().display(), e),
))
})?;
Self::from_reader(&mut file)
}
pub fn from_bytes(data: &[u8]) -> Result<Self> {
let mut cursor = Cursor::new(data);
Self::from_reader(&mut cursor)
}
pub fn from_vec(data: Vec<u8>) -> Result<Self> {
let mut cursor = Cursor::new(data);
Self::from_reader(&mut cursor)
}
    /// Parses one Ogg page starting at the reader's current position.
    ///
    /// Reads the fixed 27-byte header, validates the "OggS" capture
    /// pattern and version 0, reads the segment (lacing) table,
    /// reassembles the segments into packets, and verifies the stored
    /// CRC-32 against a recomputed value.
    ///
    /// # Errors
    /// `UnexpectedEof` I/O errors at end of stream, `InvalidData` for a
    /// bad signature or CRC mismatch, `UnsupportedFormat` for a non-zero
    /// version, plus any other underlying I/O error.
    pub fn from_reader<R: Read + Seek>(reader: &mut R) -> Result<Self> {
        // Remember where this page started, when the reader can tell us.
        let offset = reader.stream_position().ok().map(|o| o as i64);
        let mut header = [0u8; 27];
        match reader.read_exact(&mut header) {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                return Err(AudexError::from(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "End of file",
                )));
            }
            Err(e) => return Err(AudexError::from(e)),
        }
        if &header[0..4] != b"OggS" {
            error_event!("invalid OGG page signature");
            return Err(AudexError::InvalidData(format!(
                "Invalid OGG signature: {:?}",
                &header[0..4]
            )));
        }
        let version = header[4];
        if version != 0 {
            warn_event!(version = version, "unsupported OGG version");
            return Err(AudexError::UnsupportedFormat(format!(
                "Unsupported OGG version: {}",
                version
            )));
        }
        let header_type = header[5];
        // The granule position is 64 bits little-endian; the all-ones
        // value reads back as -1 ("no packet finishes on this page").
        let position_u64 = u64::from_le_bytes([
            header[6], header[7], header[8], header[9], header[10], header[11], header[12],
            header[13],
        ]);
        let position = position_u64 as i64;
        let serial = u32::from_le_bytes([header[14], header[15], header[16], header[17]]);
        let sequence = u32::from_le_bytes([header[18], header[19], header[20], header[21]]);
        let checksum = u32::from_le_bytes([header[22], header[23], header[24], header[25]]);
        let segment_count = header[26];
        let mut segments = vec![0u8; segment_count as usize];
        reader.read_exact(&mut segments)?;
        // Reassemble packets: a lacing value < 255 terminates the current
        // packet; 255 means it continues into the next segment.
        let mut packets = Vec::new();
        let mut current_packet = Vec::new();
        for &segment_size in &segments {
            let mut segment_data = vec![0u8; segment_size as usize];
            reader.read_exact(&mut segment_data)?;
            current_packet.extend_from_slice(&segment_data);
            if segment_size < 255 {
                packets.push(current_packet);
                current_packet = Vec::new();
            }
        }
        // A trailing unterminated run of 255-lacing segments is a partial
        // packet that continues on the next page.
        let complete = if !current_packet.is_empty() {
            packets.push(current_packet);
            false
        } else {
            segments.last().is_none_or(|&s| s < 255)
        };
        let page = Self {
            version,
            header_type,
            position,
            serial,
            sequence,
            checksum,
            segments,
            packets,
            offset,
            complete,
        };
        // Integrity check: the CRC is computed over the whole page with
        // the checksum field zeroed.
        let computed_crc = page.compute_read_crc()?;
        if computed_crc != checksum {
            return Err(AudexError::InvalidData(format!(
                "OGG page CRC32 mismatch: stored={:#010X}, computed={:#010X}",
                checksum, computed_crc
            )));
        }
        Ok(page)
    }
    /// Recomputes the CRC-32 of the page exactly as read: header with
    /// the checksum field zeroed, the original segment table, then the
    /// packet payloads. Used by `from_reader` to validate `checksum`.
    fn compute_read_crc(&self) -> Result<u32> {
        let mut raw = Vec::new();
        raw.extend_from_slice(b"OggS");
        raw.push(self.version);
        raw.push(self.header_type);
        let position_u64 = self.position as u64;
        raw.extend_from_slice(&position_u64.to_le_bytes());
        raw.extend_from_slice(&self.serial.to_le_bytes());
        raw.extend_from_slice(&self.sequence.to_le_bytes());
        // CRC field must be zero while computing the checksum.
        raw.extend_from_slice(&[0u8; 4]);
        // The segment count is a single byte on the wire.
        if self.segments.len() > 255 {
            return Err(AudexError::InvalidData(format!(
                "Ogg page has {} segments, exceeding the 255 segment limit",
                self.segments.len()
            )));
        }
        raw.push(self.segments.len() as u8);
        raw.extend_from_slice(&self.segments);
        for packet in &self.packets {
            raw.extend_from_slice(packet);
        }
        Ok(self.calculate_ogg_crc32(&raw))
    }
    /// Serializes the page to `writer`, rebuilding the lacing table from
    /// the packet lengths and patching in a freshly computed CRC-32.
    /// Returns the exact bytes written.
    ///
    /// # Errors
    /// `InvalidData` when the packets require more than 255 lacing
    /// segments; otherwise propagates I/O errors from `writer`.
    pub fn write_to<W: Write>(&self, writer: &mut W) -> Result<Vec<u8>> {
        let mut data = Vec::new();
        data.extend_from_slice(b"OggS");
        data.push(self.version);
        data.push(self.header_type);
        data.extend_from_slice(&(self.position as u64).to_le_bytes());
        data.extend_from_slice(&self.serial.to_le_bytes());
        data.extend_from_slice(&self.sequence.to_le_bytes());
        // Placeholder CRC; patched below once the full page is built.
        data.extend_from_slice(&[0, 0, 0, 0]);
        // Each packet becomes len/255 lacing values of 255 plus a final
        // value of len%255 (possibly 0) that terminates the packet.
        let mut lacing_data = Vec::new();
        for packet in &self.packets {
            let packet_len = packet.len();
            let (full_segments, remainder) = (packet_len / 255, packet_len % 255);
            lacing_data.extend(std::iter::repeat_n(255u8, full_segments));
            lacing_data.push(remainder as u8);
        }
        // An incomplete final packet must not be terminated: drop the
        // trailing 0 lacing value so it continues on the next page.
        if !self.complete && lacing_data.last() == Some(&0) {
            lacing_data.pop();
        }
        if lacing_data.len() > 255 {
            return Err(AudexError::InvalidData(format!(
                "Ogg page has {} lacing segments, exceeding the 255 segment limit",
                lacing_data.len()
            )));
        }
        data.push(lacing_data.len() as u8);
        data.extend_from_slice(&lacing_data);
        for packet in &self.packets {
            data.extend_from_slice(packet);
        }
        // CRC is computed with the checksum bytes zeroed, then written
        // into bytes 22..26 of the header.
        let crc = self.calculate_ogg_crc32(&data);
        data[22..26].copy_from_slice(&crc.to_le_bytes());
        writer.write_all(&data)?;
        Ok(data)
    }
fn calculate_ogg_crc32(&self, data: &[u8]) -> u32 {
ogg_crc32(data)
}
pub fn write(&self) -> Result<Vec<u8>> {
let mut buffer = Vec::new();
self.write_to(&mut buffer)?;
Ok(buffer)
}
pub fn calculate_crc(&self) -> Result<u32> {
let mut data = Vec::new();
self.write_to(&mut data)?;
if data.len() >= 26 {
data[22..26].copy_from_slice(&[0, 0, 0, 0]);
}
Ok(self.calculate_ogg_crc32(&data))
}
    /// Returns the serialized size in bytes: 27-byte header + one byte
    /// per lacing segment + packet payload bytes. Mirrors the lacing
    /// construction in `write_to`, including the dropped terminator for
    /// an incomplete final packet.
    pub fn size(&self) -> usize {
        let mut size = 27;
        let mut segment_count = 0;
        for packet in &self.packets {
            let packet_len = packet.len();
            // len/255 full segments plus one terminating segment.
            let full_segments = packet_len / 255;
            segment_count += full_segments + 1;
        }
        if !self.packets.is_empty() && !self.complete {
            let last_packet_len = self
                .packets
                .last()
                .expect("packets confirmed non-empty")
                .len();
            // write_to drops the trailing 0-lacing byte of an incomplete
            // packet whose length is a multiple of 255; account for it.
            if last_packet_len % 255 == 0 && segment_count > 0 {
                segment_count -= 1;
            }
        }
        size += segment_count;
        size += self.packets.iter().map(|p| p.len()).sum::<usize>();
        size
    }
pub fn validate_sync(data: &[u8]) -> bool {
data.len() >= 4 && &data[0..4] == b"OggS"
}
pub fn is_first(&self) -> bool {
(self.header_type & 0x02) != 0
}
pub fn is_last(&self) -> bool {
(self.header_type & 0x04) != 0
}
pub fn is_continued(&self) -> bool {
(self.header_type & 0x01) != 0
}
pub fn set_first(&mut self, first: bool) {
if first {
self.header_type |= 0x02;
} else {
self.header_type &= !0x02;
}
}
pub fn set_last(&mut self, last: bool) {
if last {
self.header_type |= 0x04;
} else {
self.header_type &= !0x04;
}
}
pub fn set_continued(&mut self, continued: bool) {
if continued {
self.header_type |= 0x01;
} else {
self.header_type &= !0x01;
}
}
pub fn is_complete(&self) -> bool {
self.complete
}
pub fn set_complete(&mut self, complete: bool) {
self.complete = complete;
}
pub fn find_last<R: Read + Seek>(
reader: &mut R,
serial: u32,
finishing: bool,
) -> Result<Option<Self>> {
Self::find_last_with_finishing(reader, serial, finishing)
}
#[doc(hidden)]
pub fn accumulate_page_bytes_with_limit(
limits: ParseLimits,
cumulative_bytes: &mut u64,
page: &OggPage,
context: &str,
) -> Result<()> {
let page_bytes: u64 = page.packets.iter().map(|pkt| pkt.len() as u64).sum();
*cumulative_bytes = cumulative_bytes.saturating_add(page_bytes);
limits.check_tag_size(*cumulative_bytes, context)
}
pub fn find_last_u32<R: Read + Seek>(reader: &mut R, serial: u32) -> Result<Option<Self>> {
Self::find_last(reader, serial, false)
}
    /// Locates the last page of the logical stream `serial`.
    ///
    /// Fast path: scans the final 64 KiB for the last "OggS" marker and
    /// accepts that page when it parses, matches `serial`, carries the
    /// EOS flag, and (with `finishing`) has a real granule position.
    /// Otherwise falls back to a bounded sequential scan from the start,
    /// remembering the most recent matching page.
    ///
    /// With `finishing` set, pages whose granule position is -1 (no
    /// packet ends on them) are not accepted as candidates.
    pub fn find_last_with_finishing<R: Read + Seek>(
        reader: &mut R,
        serial: u32,
        finishing: bool,
    ) -> Result<Option<Self>> {
        // Fast path: inspect only the tail of the stream.
        file_utils::seek_end(reader, 256 * 256)?;
        let mut buffer = Vec::new();
        reader.read_to_end(&mut buffer)?;
        if let Some(index) = buffer.windows(4).rposition(|w| w == b"OggS") {
            let mut cursor = Cursor::new(&buffer[index..]);
            if let Ok(page) = Self::from_reader(&mut cursor) {
                if page.serial == serial {
                    let is_valid = !finishing || page.position != -1;
                    if is_valid && page.last() {
                        return Ok(Some(page));
                    }
                }
            }
        }
        // Slow path: full scan, bounded both in page count and in
        // cumulative payload bytes.
        const MAX_SLOW_PATH_PAGES: u32 = 50_000;
        reader.seek(SeekFrom::Start(0))?;
        let mut best_page = None;
        let mut pages_scanned = 0u32;
        let mut cumulative_bytes: u64 = 0;
        let limits = ParseLimits::default();
        loop {
            if pages_scanned >= MAX_SLOW_PATH_PAGES {
                break;
            }
            match Self::from_reader(reader) {
                Ok(page) => {
                    pages_scanned += 1;
                    Self::accumulate_page_bytes_with_limit(
                        limits,
                        &mut cumulative_bytes,
                        &page,
                        "OGG cumulative page data",
                    )?;
                    if page.serial == serial {
                        let is_valid = !finishing || page.position != -1;
                        if is_valid {
                            best_page = Some(page.clone());
                        }
                        // EOS page of this stream: nothing later matters.
                        if page.last() {
                            break;
                        }
                    }
                }
                Err(e) => {
                    // Clean EOF ends the scan; any other parse error also
                    // stops it but keeps the best match found so far.
                    if let AudexError::Io(io_err) = &e {
                        if io_err.kind() == std::io::ErrorKind::UnexpectedEof {
                            break;
                        }
                    }
                    return Ok(best_page);
                }
            }
        }
        Ok(best_page)
    }
pub fn to_packets(pages: &[OggPage], strict: bool) -> Result<Vec<Vec<u8>>> {
Self::to_packets_strict(pages, strict)
}
    /// Reassembles the logical packets carried by `pages`, which must
    /// all share one serial number and have consecutive sequence
    /// numbers.
    ///
    /// In `strict` mode the first page must not continue an earlier
    /// packet and the last page must complete its final packet. In
    /// lenient mode a leading continuation is absorbed into an empty
    /// placeholder packet. Total reassembled bytes are capped at 256 MB.
    pub fn to_packets_strict(pages: &[OggPage], strict: bool) -> Result<Vec<Vec<u8>>> {
        const MAX_TOTAL_PACKET_BYTES: usize = 256 * 1024 * 1024;
        if pages.is_empty() {
            return Ok(Vec::new());
        }
        let serial = pages[0].serial;
        let mut packets: Vec<Vec<u8>> = Vec::new();
        let mut total_bytes: usize = 0;
        if strict {
            if pages[0].continued() {
                return Err(AudexError::InvalidData(
                    "first packet is continued".to_string(),
                ));
            }
            if !pages
                .last()
                .expect("pages confirmed non-empty")
                .is_complete()
            {
                return Err(AudexError::InvalidData(
                    "last packet does not complete".to_string(),
                ));
            }
        } else if !pages.is_empty() && pages[0].continued() {
            // Lenient mode: give the leading continuation something to
            // append to.
            packets.push(vec![]);
        }
        let mut expected_sequence = pages[0].sequence;
        for page in pages.iter() {
            if page.serial != serial {
                return Err(AudexError::InvalidData(format!(
                    "invalid serial number in page: expected {}, got {}",
                    serial, page.serial
                )));
            }
            if page.sequence != expected_sequence {
                warn_event!(
                    expected = expected_sequence,
                    actual = page.sequence,
                    "OGG page sequence number mismatch"
                );
                return Err(AudexError::InvalidData(format!(
                    "bad sequence number in page: expected {}, got {}",
                    expected_sequence, page.sequence
                )));
            }
            expected_sequence = expected_sequence.wrapping_add(1);
            if !page.packets.is_empty() {
                if page.continued() {
                    // The first fragment extends the previous page's
                    // unfinished packet.
                    if let Some(last_packet) = packets.last_mut() {
                        total_bytes += page.packets[0].len();
                        if total_bytes > MAX_TOTAL_PACKET_BYTES {
                            return Err(AudexError::InvalidData(
                                "cumulative packet data exceeds 256 MB limit".to_string(),
                            ));
                        }
                        last_packet.extend_from_slice(&page.packets[0]);
                    } else {
                        return Err(AudexError::InvalidData(
                            "Continued packet with no previous packet".to_string(),
                        ));
                    }
                    // Remaining packets on the page start fresh.
                    for packet in &page.packets[1..] {
                        total_bytes += packet.len();
                        if total_bytes > MAX_TOTAL_PACKET_BYTES {
                            return Err(AudexError::InvalidData(
                                "cumulative packet data exceeds 256 MB limit".to_string(),
                            ));
                        }
                        packets.push(packet.clone());
                    }
                } else {
                    for packet in &page.packets {
                        total_bytes += packet.len();
                        if total_bytes > MAX_TOTAL_PACKET_BYTES {
                            return Err(AudexError::InvalidData(
                                "cumulative packet data exceeds 256 MB limit".to_string(),
                            ));
                        }
                        packets.push(packet.clone());
                    }
                }
            }
        }
        Ok(packets)
    }
pub fn from_packets_simple(packets: Vec<Vec<u8>>) -> Vec<OggPage> {
Self::from_packets(packets, 0, 4096, 2048)
}
pub fn from_packets_sequence(packets: Vec<Vec<u8>>, sequence: u32) -> Vec<OggPage> {
Self::from_packets(packets, sequence, 4096, 2048)
}
    /// Like lenient `to_packets_strict`, but ignores sequence numbers
    /// entirely and tolerates a continuation with no predecessor (the
    /// orphan fragment is logged and dropped). Used when attempting to
    /// preserve an existing page layout. Reassembled bytes are capped at
    /// 256 MB.
    fn to_packets_no_sequence_validation(pages: &[OggPage]) -> Result<Vec<Vec<u8>>> {
        const MAX_TOTAL_PACKET_BYTES: usize = 256 * 1024 * 1024;
        if pages.is_empty() {
            return Ok(Vec::new());
        }
        let serial = pages[0].serial;
        let mut packets: Vec<Vec<u8>> = Vec::new();
        let mut total_bytes: usize = 0;
        for page in pages.iter() {
            if page.serial != serial {
                return Err(AudexError::InvalidData(format!(
                    "invalid serial number in page: expected {}, got {}",
                    serial, page.serial
                )));
            }
            if !page.packets.is_empty() {
                if page.continued() {
                    // First fragment extends the previous unfinished
                    // packet, when one exists.
                    if let Some(last_packet) = packets.last_mut() {
                        total_bytes += page.packets[0].len();
                        if total_bytes > MAX_TOTAL_PACKET_BYTES {
                            return Err(AudexError::InvalidData(
                                "cumulative packet data exceeds 256 MB limit".to_string(),
                            ));
                        }
                        last_packet.extend_from_slice(&page.packets[0]);
                    } else {
                        // No predecessor: drop the orphan fragment but
                        // keep going (best effort).
                        warn_event!(
                            serial = serial,
                            sequence = page.sequence,
                            dropped_bytes = page.packets[0].len(),
                            "OGG continued packet has no predecessor; dropping first segment"
                        );
                    }
                    for packet in &page.packets[1..] {
                        total_bytes += packet.len();
                        if total_bytes > MAX_TOTAL_PACKET_BYTES {
                            return Err(AudexError::InvalidData(
                                "cumulative packet data exceeds 256 MB limit".to_string(),
                            ));
                        }
                        packets.push(packet.clone());
                    }
                } else {
                    for packet in &page.packets {
                        total_bytes += packet.len();
                        if total_bytes > MAX_TOTAL_PACKET_BYTES {
                            return Err(AudexError::InvalidData(
                                "cumulative packet data exceeds 256 MB limit".to_string(),
                            ));
                        }
                        packets.push(packet.clone());
                    }
                }
            }
        }
        Ok(packets)
    }
    /// Repaginates `packets` while trying to keep the exact layout of
    /// `old_pages` (same per-page packet boundaries, sequences, serials,
    /// flags and granule positions), so an in-place rewrite moves no
    /// bytes. Falls back to a fresh `from_packets` pagination when the
    /// old layout cannot be reproduced.
    pub fn from_packets_try_preserve(packets: Vec<Vec<u8>>, old_pages: &[OggPage]) -> Vec<OggPage> {
        if old_pages.is_empty() {
            return Vec::new();
        }
        let old_packets = match Self::to_packets_no_sequence_validation(old_pages) {
            Ok(packets) => packets,
            Err(_) => return Self::from_packets(packets, old_pages[0].sequence, 4096, 2048),
        };
        // The layout can only be preserved when every packet size
        // matches the old stream exactly.
        let new_sizes: Vec<usize> = packets.iter().map(|p| p.len()).collect();
        let old_sizes: Vec<usize> = old_packets.iter().map(|p| p.len()).collect();
        if new_sizes != old_sizes {
            return Self::from_packets(packets, old_pages[0].sequence, 4096, 2048);
        }
        // Keep a copy for the fallback, then flatten the new packets and
        // re-slice them along the old pages' packet boundaries.
        let packets_backup = packets.clone();
        let mut new_data: Vec<u8> = packets.into_iter().flatten().collect();
        let mut new_pages = Vec::new();
        let sequence = old_pages[0].sequence;
        for old_page in old_pages {
            let mut new_page = OggPage::new();
            new_page.sequence = old_page.sequence;
            new_page.serial = old_page.serial;
            new_page.complete = old_page.complete;
            new_page.set_continued(old_page.continued());
            new_page.position = old_page.position;
            for old_packet in &old_page.packets {
                let packet_len = old_packet.len();
                if new_data.len() >= packet_len {
                    let packet_data = new_data.drain(..packet_len).collect::<Vec<u8>>();
                    new_page.packets.push(packet_data);
                } else {
                    // Ran out of data: the layouts diverged after all.
                    return Self::from_packets(packets_backup, sequence, 4096, 2048);
                }
            }
            new_pages.push(new_page);
        }
        if !new_data.is_empty() {
            // Leftover bytes also mean the layouts diverged.
            return Self::from_packets(packets_backup, sequence, 4096, 2048);
        }
        new_pages
    }
pub fn from_packets(
packets: Vec<Vec<u8>>,
sequence: u32,
default_size: usize,
wiggle_room: usize,
) -> Vec<OggPage> {
Self::from_packets_with_options(packets, sequence, default_size, wiggle_room, 0)
.unwrap_or_default()
}
    /// Splits `packets` into pages of roughly `default_size` bytes,
    /// assigning consecutive sequence numbers starting at `sequence` and
    /// placing `granule_position` on the final page.
    ///
    /// Packets are consumed in lacing-aligned chunks (multiples of 255
    /// bytes, capped at 254*255 so a chunk always fits one page's
    /// segment table). A leftover tail smaller than `wiggle_room` is
    /// squeezed onto the current page instead of starting a new one.
    /// Pages that end mid-packet are marked incomplete and, when the
    /// unfinished packet is the page's only one, get granule position -1.
    ///
    /// # Errors
    /// Sequence-counter overflow, or `granule_position` > `i64::MAX`.
    pub fn from_packets_with_options(
        packets: Vec<Vec<u8>>,
        sequence: u32,
        default_size: usize,
        wiggle_room: usize,
        granule_position: u64,
    ) -> Result<Vec<OggPage>> {
        // Largest lacing-aligned chunk that fits one segment table.
        const MAX_CHUNK: usize = 254 * 255;
        let chunk_size = ((default_size / 255) * 255).min(MAX_CHUNK);
        let mut pages = Vec::new();
        let mut page = OggPage::new();
        page.sequence = sequence;
        let mut page_segment_count: usize = 0;
        for packet in packets {
            // Each packet starts as a fresh (initially empty) entry on
            // the current page.
            page.packets.push(Vec::new());
            let mut remaining_packet = packet;
            while !remaining_packet.is_empty() {
                let data_len = chunk_size.min(remaining_packet.len());
                let data = remaining_packet.drain(..data_len).collect::<Vec<u8>>();
                let packet_len = remaining_packet.len();
                // Cost in extra lacing segments of appending this chunk
                // to the page's in-progress packet.
                let last_pkt_len = page.packets.last().map_or(0, |p| p.len());
                let new_pkt_len = last_pkt_len + data.len();
                let segments_before = (last_pkt_len / 255) + if last_pkt_len > 0 { 1 } else { 0 };
                let segments_after = (new_pkt_len / 255) + 1;
                let extra_segments = segments_after.saturating_sub(segments_before);
                if page.size() < default_size && page_segment_count + extra_segments <= 255 {
                    // Chunk fits: append it to the in-progress packet.
                    if let Some(last_packet) = page.packets.last_mut() {
                        last_packet.extend(data);
                    }
                    page_segment_count += extra_segments;
                } else {
                    // Page is full: finalize it and open a new page that
                    // continues the unfinished packet (if any).
                    if let Some(last_packet) = page.packets.last() {
                        if !last_packet.is_empty() {
                            page.complete = false;
                            if page.packets.len() == 1 {
                                // Lone unfinished packet: no granule.
                                page.position = -1;
                            }
                        } else {
                            // Drop the empty placeholder entry.
                            page.packets.pop();
                        }
                    }
                    pages.push(page);
                    page = OggPage::new();
                    let prev = pages.last().ok_or_else(|| {
                        AudexError::InternalError("page list empty after push".to_string())
                    })?;
                    page.set_continued(!prev.complete);
                    page.sequence = prev.sequence.checked_add(1).ok_or_else(|| {
                        AudexError::InvalidData(
                            "Ogg page sequence counter overflow while building pages".to_string(),
                        )
                    })?;
                    let new_pkt_segments = (data.len() / 255) + 1;
                    page_segment_count = new_pkt_segments;
                    page.packets.push(data);
                }
                // Small leftover: squeeze it onto this page rather than
                // spilling onto yet another page.
                if packet_len < wiggle_room {
                    if let Some(last_packet) = page.packets.last_mut() {
                        let before = (last_packet.len() / 255) + 1;
                        let projected_len = last_packet.len() + remaining_packet.len();
                        let after = (projected_len / 255) + 1;
                        let extra = after.saturating_sub(before);
                        if page_segment_count + extra <= 255 {
                            last_packet.extend_from_slice(&remaining_packet);
                            page_segment_count += extra;
                            remaining_packet.clear();
                        }
                    }
                }
            }
        }
        if !page.packets.is_empty() {
            pages.push(page);
        }
        // The granule position goes on the stream's final page.
        let signed_position = i64::try_from(granule_position).map_err(|_| {
            AudexError::InvalidData(format!(
                "Granule position {} exceeds i64::MAX and cannot be represented",
                granule_position
            ))
        })?;
        if let Some(last_page) = pages.last_mut() {
            last_page.position = signed_position;
        }
        Ok(pages)
    }
    /// Rewrites the sequence numbers of every page with `serial`,
    /// starting at `start_sequence` and incrementing per matching page.
    /// Pages are overwritten in place; re-serializing via `write`
    /// recomputes each page's CRC. Non-matching pages are untouched.
    ///
    /// # Errors
    /// Fails when more than `MAX_OGG_PAGES` pages are scanned, on
    /// sequence-counter overflow, or on any non-EOF parse/I/O error.
    pub fn renumber<R: Read + Write + Seek>(
        reader: &mut R,
        serial: u32,
        start_sequence: u32,
    ) -> Result<()> {
        let mut sequence = start_sequence;
        let mut pages_scanned: u32 = 0;
        loop {
            if pages_scanned >= MAX_OGG_PAGES {
                return Err(AudexError::ParseError(format!(
                    "Ogg renumber exceeded maximum page count ({})",
                    MAX_OGG_PAGES
                )));
            }
            let page_offset = reader.stream_position()?;
            match OggPage::from_reader(reader) {
                Ok(mut page) => {
                    if page.serial == serial {
                        page.sequence = sequence;
                        // Rewind, overwrite the page in place, then seek
                        // past the freshly written bytes to continue.
                        reader.seek(SeekFrom::Start(page_offset))?;
                        let page_data = page.write()?;
                        reader.write_all(&page_data)?;
                        reader.seek(SeekFrom::Start(page_offset + page_data.len() as u64))?;
                        sequence = sequence.checked_add(1).ok_or_else(|| {
                            AudexError::InvalidData(
                                "Ogg page sequence counter overflow".to_string(),
                            )
                        })?;
                    }
                    pages_scanned += 1;
                }
                Err(e) => {
                    // Clean EOF ends the scan; other errors propagate.
                    if let AudexError::Io(io_err) = &e {
                        if io_err.kind() == std::io::ErrorKind::UnexpectedEof {
                            break;
                        }
                    }
                    return Err(e);
                }
            }
        }
        Ok(())
    }
pub fn replace<R: Read + Write + Seek + 'static>(
reader: &mut R,
old_pages: &[OggPage],
new_pages: Vec<OggPage>,
) -> Result<()> {
if old_pages.is_empty() || new_pages.is_empty() {
return Err(AudexError::InvalidData(
"empty pages list not allowed".to_string(),
));
}
let mut updated_pages = new_pages;
let first_sequence = old_pages[0].sequence;
for (i, page) in updated_pages.iter_mut().enumerate() {
page.sequence = first_sequence.checked_add(i as u32).ok_or_else(|| {
AudexError::ParseError("Ogg page sequence number overflow".to_string())
})?;
page.serial = old_pages[0].serial;
}
updated_pages[0].set_first(old_pages[0].first());
updated_pages[0].set_last(old_pages[0].last());
updated_pages[0].set_continued(old_pages[0].continued());
let old_last = old_pages
.last()
.ok_or_else(|| AudexError::InternalError("old_pages empty in replace".to_string()))?;
let old_last_first = old_last.first();
let old_last_last = old_last.last();
let old_last_complete = old_last.is_complete();
if updated_pages.len() == 1 {
updated_pages
.last_mut()
.ok_or_else(|| AudexError::InternalError("updated_pages empty".to_string()))?
.set_first(old_last_first);
} else {
updated_pages
.last_mut()
.ok_or_else(|| AudexError::InternalError("updated_pages empty".to_string()))?
.set_first(false);
}
updated_pages
.last_mut()
.ok_or_else(|| AudexError::InternalError("updated_pages empty".to_string()))?
.set_last(old_last_last);
updated_pages
.last_mut()
.ok_or_else(|| AudexError::InternalError("updated_pages empty".to_string()))?
.set_complete(old_last_complete);
let last_page = updated_pages
.last()
.ok_or_else(|| AudexError::InternalError("updated_pages empty".to_string()))?;
if !last_page.is_complete() && last_page.packets.len() == 1 {
updated_pages
.last_mut()
.ok_or_else(|| AudexError::InternalError("updated_pages empty".to_string()))?
.position = -1;
}
let new_data: Result<Vec<Vec<u8>>> = updated_pages.iter().map(|p| p.write()).collect();
let new_data = new_data?;
let mut final_data = new_data;
let pages_diff = old_pages.len() as i64 - final_data.len() as i64;
if pages_diff > 0 {
for _ in 0..pages_diff {
final_data.push(Vec::new());
}
} else if pages_diff < 0 {
let split_idx = old_pages.len().saturating_sub(1);
if split_idx < final_data.len() {
let merged: Vec<u8> = final_data.drain(split_idx..).flatten().collect();
final_data.push(merged);
}
}
#[allow(unused_assignments)]
let mut offset_adjust: i64 = 0;
let mut new_data_end: Option<u64> = None;
let min_pages = old_pages.len().min(final_data.len());
for i in 0..min_pages {
let old_page = &old_pages[i];
let data = &final_data[i];
if let Some(offset) = old_page.offset {
let adjusted = offset.checked_add(offset_adjust).ok_or_else(|| {
AudexError::InvalidData("Page offset arithmetic overflow".to_string())
})?;
if adjusted < 0 {
return Err(AudexError::InvalidData(format!(
"Adjusted page offset is negative: {}",
adjusted
)));
}
let adjusted_offset = adjusted as u64;
let data_size = data.len() as u64;
let old_size = old_page.size() as u64;
resize_bytes(reader, old_size, data_size, adjusted_offset)?;
reader.seek(SeekFrom::Start(adjusted_offset))?;
reader.write_all(data)?;
new_data_end = Some(adjusted_offset + data_size);
let data_i64 = i64::try_from(data_size).map_err(|_| {
AudexError::InvalidData("Page data size exceeds i64 range".to_string())
})?;
let old_i64 = i64::try_from(old_size).map_err(|_| {
AudexError::InvalidData("Old page size exceeds i64 range".to_string())
})?;
let delta = data_i64.checked_sub(old_i64).ok_or_else(|| {
AudexError::InvalidData(
"page size delta overflow during offset calculation".to_string(),
)
})?;
offset_adjust = offset_adjust.checked_add(delta).ok_or_else(|| {
AudexError::InvalidData(
"cumulative page offset adjustment overflow".to_string(),
)
})?;
}
}
if final_data.len() > old_pages.len() {
if let Some(insert_offset) = new_data_end {
for data in &final_data[min_pages..] {
let data_size = data.len() as u64;
if !data.is_empty() {
insert_bytes(reader, data_size, insert_offset, None)?;
reader.seek(SeekFrom::Start(insert_offset))?;
reader.write_all(data)?;
new_data_end = Some(insert_offset + data_size);
}
}
}
}
if old_pages.len() != updated_pages.len() {
if let Some(end_offset) = new_data_end {
reader.seek(SeekFrom::Start(end_offset))?;
let last_updated = updated_pages.last().ok_or_else(|| {
AudexError::InternalError("updated_pages empty during renumber".to_string())
})?;
let serial = last_updated.serial;
let next_sequence = last_updated.sequence.checked_add(1).ok_or_else(|| {
AudexError::InvalidData(
"Ogg page sequence counter overflow before renumber".to_string(),
)
})?;
Self::renumber(reader, serial, next_sequence)?;
}
}
Ok(())
}
}
/// A fully loaded Ogg container: every page in file order plus the
/// packets grouped per logical stream.
#[derive(Debug)]
pub struct OggFile {
    /// All pages in file order, across all logical streams.
    pub pages: Vec<OggPage>,
    /// Per-stream packet collections, keyed by stream serial number.
    pub streams: HashMap<u32, OggStream>,
}
impl OggFile {
    /// Creates an empty container with no pages or streams.
    pub fn new() -> Self {
        Self {
            pages: Vec::new(),
            streams: HashMap::new(),
        }
    }

    /// Reads every Ogg page from the file at `path`, collecting all
    /// pages in order and routing each page's packets into a per-serial
    /// `OggStream`, then detects each stream's codec.
    ///
    /// Scanning stops at a clean EOF and is bounded by `MAX_OGG_PAGES`
    /// pages and the default `ParseLimits` cumulative-byte limit.
    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self> {
        use std::fs::File;
        use std::io::BufReader;
        let file = File::open(path)?;
        let mut reader = BufReader::new(file);
        let mut ogg_file = Self::new();
        let mut pages_scanned = 0u32;
        let limits = ParseLimits::default();
        let mut cumulative_bytes: u64 = 0;
        loop {
            if pages_scanned >= MAX_OGG_PAGES {
                break;
            }
            match OggPage::from_reader(&mut reader) {
                Ok(page) => {
                    pages_scanned += 1;
                    // Enforce the cumulative payload cap across pages.
                    let page_bytes: u64 = page.packets.iter().map(|pkt| pkt.len() as u64).sum();
                    cumulative_bytes = cumulative_bytes.saturating_add(page_bytes);
                    if cumulative_bytes > limits.max_tag_size {
                        return Err(AudexError::InvalidData(format!(
                            "OGG cumulative page data ({} bytes) exceeds configured limit ({} bytes)",
                            cumulative_bytes, limits.max_tag_size
                        )));
                    }
                    // Route the page's packets to its stream bucket,
                    // creating the stream on first sight of the serial.
                    let serial = page.serial;
                    let stream = ogg_file
                        .streams
                        .entry(serial)
                        .or_insert_with(|| OggStream::new(serial));
                    for packet in &page.packets {
                        stream.packets.push(packet.clone());
                    }
                    ogg_file.pages.push(page);
                }
                Err(e) => {
                    // Clean EOF ends the scan; other errors propagate.
                    if let AudexError::Io(io_err) = &e {
                        if io_err.kind() == std::io::ErrorKind::UnexpectedEof {
                            break;
                        }
                    }
                    return Err(e);
                }
            }
        }
        for stream in ogg_file.streams.values_mut() {
            stream.detect_codec();
        }
        Ok(ogg_file)
    }

    /// Returns the first stream whose detected codec name equals
    /// `codec` (iteration order over the map is unspecified).
    pub fn get_stream_by_codec(&self, codec: &str) -> Option<&OggStream> {
        self.streams.values().find(|stream| stream.codec == codec)
    }

    /// Returns references to every page of stream `serial`, in file
    /// order.
    pub fn get_pages_for_stream(&self, serial: u32) -> Vec<&OggPage> {
        self.pages
            .iter()
            .filter(|page| page.serial == serial)
            .collect()
    }

    /// Reassembles the packets of stream `serial` from its pages, in
    /// lenient (non-strict) mode.
    pub fn get_packets(&self, serial: u32) -> Result<Vec<Vec<u8>>> {
        let pages: Vec<OggPage> = self
            .pages
            .iter()
            .filter(|page| page.serial == serial)
            .cloned()
            .collect();
        OggPage::to_packets(&pages, false)
    }
}
/// The packets of one logical Ogg bitstream, plus the codec detected
/// from its identification packet.
#[derive(Debug, Clone)]
pub struct OggStream {
    /// Serial number identifying this logical stream.
    pub serial_number: u32,
    /// Detected codec name ("vorbis", "opus", "theora" or "flac"), or
    /// empty when unknown / not yet detected.
    pub codec: String,
    /// All packets of this stream, in order.
    pub packets: Vec<Vec<u8>>,
}

impl OggStream {
    /// Creates an empty stream for `serial` with no detected codec.
    pub fn new(serial: u32) -> Self {
        OggStream {
            serial_number: serial,
            codec: String::new(),
            packets: Vec::new(),
        }
    }

    /// Inspects the first packet's magic bytes and records the codec
    /// name; leaves `codec` untouched when nothing matches or there are
    /// no packets yet.
    pub fn detect_codec(&mut self) {
        let first = match self.packets.first() {
            Some(packet) => packet,
            None => return,
        };
        // Length guards intentionally mirror the original checks
        // (>= 8 for vorbis/opus/theora, >= 5 for FLAC).
        let detected = if first.len() >= 8 && first.starts_with(b"\x01vorbis") {
            Some("vorbis")
        } else if first.len() >= 8 && first.starts_with(b"OpusHead") {
            Some("opus")
        } else if first.len() >= 8 && first.starts_with(b"\x80theora") {
            Some("theora")
        } else if first.len() >= 5 && first.starts_with(b"\x7FFLAC") {
            Some("flac")
        } else {
            None
        };
        if let Some(name) = detected {
            self.codec = name.to_string();
        }
    }

    /// First packet (codec identification header), if any.
    pub fn identification_packet(&self) -> Option<&Vec<u8>> {
        self.packets.first()
    }

    /// Second packet (comment / metadata header), if present.
    pub fn comment_packet(&self) -> Option<&Vec<u8>> {
        self.packets.get(1)
    }

    /// Third packet (codec setup header), if present.
    pub fn setup_packet(&self) -> Option<&Vec<u8>> {
        self.packets.get(2)
    }
}
/// Decoded stream parameters for one Ogg logical stream.
#[derive(Debug, Clone, Default)]
pub struct OggStreamInfo {
    /// Total duration, when it could be computed.
    pub length: Option<Duration>,
    /// Average bitrate in bits per second, when known.
    pub bitrate: Option<u32>,
    /// Sample rate in Hz; 0 means unknown.
    pub sample_rate: u32,
    /// Channel count; 0 means unknown.
    pub channels: u16,
    /// Serial number of the stream these values describe.
    pub serial: u32,
}
impl StreamInfo for OggStreamInfo {
    /// Total stream duration, when it could be determined.
    fn length(&self) -> Option<Duration> {
        self.length
    }

    /// Average bitrate in bits per second, when known.
    fn bitrate(&self) -> Option<u32> {
        self.bitrate
    }

    /// Sample rate in Hz; zero is treated as "unknown".
    fn sample_rate(&self) -> Option<u32> {
        (self.sample_rate > 0).then_some(self.sample_rate)
    }

    /// Channel count; zero is treated as "unknown".
    fn channels(&self) -> Option<u16> {
        (self.channels > 0).then_some(self.channels)
    }

    /// No fixed bits-per-sample is tracked for Ogg streams.
    fn bits_per_sample(&self) -> Option<u16> {
        None
    }
}
impl Default for OggFile {
fn default() -> Self {
Self::new()
}
}
impl Default for OggPage {
fn default() -> Self {
Self::new()
}
}
impl PartialEq for OggPage {
    /// Two pages are equal when they serialize to identical bytes;
    /// pages that fail to serialize compare unequal to everything.
    fn eq(&self, other: &Self) -> bool {
        if let (Ok(lhs), Ok(rhs)) = (self.write(), other.write()) {
            lhs == rhs
        } else {
            false
        }
    }
}
/// Async counterpart of `file_utils::seek_end`: positions the stream
/// `offset` bytes before its end, clamping to the start when the stream
/// is shorter than `offset`.
#[cfg(feature = "async")]
pub async fn seek_end_async<F: AsyncRead + AsyncSeek + Unpin>(
    fileobj: &mut F,
    offset: u64,
) -> Result<()> {
    // `seek(End(0))` resolves to the new position, i.e. the stream size.
    let filesize = fileobj.seek(SeekFrom::End(0)).await?;
    fileobj
        .seek(SeekFrom::Start(filesize.saturating_sub(offset)))
        .await?;
    Ok(())
}
#[cfg(feature = "async")]
impl OggPage {
    /// Async mirror of [`Self::from_reader`]: parses one Ogg page from
    /// the reader's current position, validating signature, version and
    /// CRC, and reassembling the lacing segments into packets.
    ///
    /// # Errors
    /// Same error cases as the sync variant: `UnexpectedEof` at end of
    /// stream, `InvalidData` for a bad signature or CRC mismatch, and
    /// `UnsupportedFormat` for a non-zero version.
    pub async fn from_reader_async<R: AsyncRead + AsyncSeek + Unpin>(
        reader: &mut R,
    ) -> Result<Self> {
        // Remember where this page started, when the reader can tell us.
        let offset = reader.stream_position().await.ok().map(|o| o as i64);
        let mut header = [0u8; 27];
        match reader.read_exact(&mut header).await {
            Ok(_) => {}
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                return Err(AudexError::from(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "End of file",
                )));
            }
            Err(e) => return Err(AudexError::from(e)),
        }
        if &header[0..4] != b"OggS" {
            return Err(AudexError::InvalidData(format!(
                "Invalid OGG signature: {:?}",
                &header[0..4]
            )));
        }
        let version = header[4];
        if version != 0 {
            return Err(AudexError::UnsupportedFormat(format!(
                "Unsupported OGG version: {}",
                version
            )));
        }
        let header_type = header[5];
        // Granule position: 64 bits little-endian; all-ones reads as -1.
        let position_u64 = u64::from_le_bytes([
            header[6], header[7], header[8], header[9], header[10], header[11], header[12],
            header[13],
        ]);
        let position = position_u64 as i64;
        let serial = u32::from_le_bytes([header[14], header[15], header[16], header[17]]);
        let sequence = u32::from_le_bytes([header[18], header[19], header[20], header[21]]);
        let checksum = u32::from_le_bytes([header[22], header[23], header[24], header[25]]);
        let segment_count = header[26];
        let mut segments = vec![0u8; segment_count as usize];
        reader.read_exact(&mut segments).await?;
        // Reassemble packets: a lacing value < 255 terminates a packet;
        // 255 means the packet continues into the next segment.
        let mut packets = Vec::new();
        let mut current_packet = Vec::new();
        for &segment_size in &segments {
            let mut segment_data = vec![0u8; segment_size as usize];
            reader.read_exact(&mut segment_data).await?;
            current_packet.extend_from_slice(&segment_data);
            if segment_size < 255 {
                packets.push(current_packet);
                current_packet = Vec::new();
            }
        }
        // A trailing unterminated run of 255-segments is a partial packet
        // continuing on the next page.
        let complete = if !current_packet.is_empty() {
            packets.push(current_packet);
            false
        } else {
            segments.last().is_none_or(|&s| s < 255)
        };
        let page = Self {
            version,
            header_type,
            position,
            serial,
            sequence,
            checksum,
            segments,
            packets,
            offset,
            complete,
        };
        // Integrity check against the stored checksum.
        let computed_crc = page.compute_read_crc()?;
        if computed_crc != checksum {
            return Err(AudexError::InvalidData(format!(
                "OGG page CRC32 mismatch: stored={:#010X}, computed={:#010X}",
                checksum, computed_crc
            )));
        }
        Ok(page)
    }
pub async fn write_to_async<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> Result<Vec<u8>> {
let data = self.write()?;
writer.write_all(&data).await?;
Ok(data)
}
    /// Async counterpart of [`Self::find_last_with_finishing`]: first
    /// tries the last "OggS" marker within the final 64 KiB, then falls
    /// back to a bounded full scan from the start, returning the most
    /// recent matching page of `serial`. With `finishing` set, pages
    /// whose granule position is -1 are not accepted.
    pub async fn find_last_async<R: AsyncRead + AsyncSeek + Unpin>(
        reader: &mut R,
        serial: u32,
        finishing: bool,
    ) -> Result<Option<Self>> {
        // Fast path: tail of the stream, parsed with the sync reader
        // since the buffer is already in memory.
        seek_end_async(reader, 256 * 256).await?;
        let mut buffer = Vec::new();
        reader.read_to_end(&mut buffer).await?;
        if let Some(index) = buffer.windows(4).rposition(|w| w == b"OggS") {
            let mut cursor = std::io::Cursor::new(&buffer[index..]);
            if let Ok(page) = Self::from_reader(&mut cursor) {
                if page.serial == serial {
                    let is_valid = !finishing || page.position != -1;
                    if is_valid && page.last() {
                        return Ok(Some(page));
                    }
                }
            }
        }
        // Slow path: sequential scan bounded in pages and bytes.
        reader.seek(SeekFrom::Start(0)).await?;
        let mut best_page = None;
        let mut pages_scanned = 0u32;
        let mut cumulative_bytes: u64 = 0;
        let limits = ParseLimits::default();
        loop {
            if pages_scanned >= MAX_OGG_PAGES {
                break;
            }
            match Self::from_reader_async(reader).await {
                Ok(page) => {
                    pages_scanned += 1;
                    Self::accumulate_page_bytes_with_limit(
                        limits,
                        &mut cumulative_bytes,
                        &page,
                        "OGG cumulative page data",
                    )?;
                    if page.serial == serial {
                        let is_valid = !finishing || page.position != -1;
                        if is_valid {
                            best_page = Some(page.clone());
                        }
                        // EOS page: nothing later can be a better match.
                        if page.last() {
                            break;
                        }
                    }
                }
                Err(e) => {
                    // Clean EOF ends the scan; any other parse error also
                    // stops it but keeps the best match found so far.
                    if let AudexError::Io(io_err) = &e {
                        if io_err.kind() == std::io::ErrorKind::UnexpectedEof {
                            break;
                        }
                    }
                    return Ok(best_page);
                }
            }
        }
        Ok(best_page)
    }
/// Rewrite, in place, the sequence numbers of all pages matching `serial`,
/// numbering them consecutively from `start_sequence`.
///
/// Scans from the current file position to EOF; pages with a different
/// serial are left untouched. The in-place overwrite assumes the
/// re-encoded page is the same size as the original (only fixed-width
/// header fields change) — TODO confirm `write()` preserves the segment
/// layout.
pub async fn renumber_async(
    reader: &mut TokioFile,
    serial: u32,
    start_sequence: u32,
) -> Result<()> {
    let mut sequence = start_sequence;
    loop {
        // Remember where this page starts so it can be overwritten.
        let page_offset = reader.stream_position().await?;
        match OggPage::from_reader_async(reader).await {
            Ok(mut page) => {
                if page.serial == serial {
                    page.sequence = sequence;
                    // Seek back and overwrite the page with its
                    // re-serialized bytes.
                    reader.seek(SeekFrom::Start(page_offset)).await?;
                    let page_data = page.write()?;
                    reader.write_all(&page_data).await?;
                    // Re-position explicitly past the rewritten page
                    // before parsing the next one.
                    reader
                        .seek(SeekFrom::Start(page_offset + page_data.len() as u64))
                        .await?;
                    sequence = sequence.checked_add(1).ok_or_else(|| {
                        AudexError::InvalidData(
                            "Ogg page sequence counter overflow".to_string(),
                        )
                    })?;
                }
            }
            Err(e) => {
                // A clean EOF ends the scan; any other error is a real
                // failure and is propagated.
                if let AudexError::Io(io_err) = &e {
                    if io_err.kind() == std::io::ErrorKind::UnexpectedEof {
                        break; }
                }
                return Err(e);
            }
        }
    }
    Ok(())
}
/// Replace `old_pages` in the file with `new_pages`, rewriting in place.
///
/// The replacement pages inherit the old stream's serial, continuous
/// sequence numbering from the first old page, and the first/last/
/// continued header flags of the pages they replace. When the serialized
/// sizes differ, the file is grown or shrunk via `resize_bytes_async`;
/// when the page count differs, the remainder of the stream is renumbered
/// via `renumber_async`.
///
/// # Errors
/// Returns `InvalidData` for empty input slices and for any overflow in
/// offset/sequence arithmetic; propagates all I/O failures.
pub async fn replace_async(
    reader: &mut TokioFile,
    old_pages: &[OggPage],
    new_pages: Vec<OggPage>,
) -> Result<()> {
    if old_pages.is_empty() || new_pages.is_empty() {
        return Err(AudexError::InvalidData(
            "empty pages list not allowed".to_string(),
        ));
    }
    let mut updated_pages = new_pages;
    // Renumber the replacement pages continuously from the first old page
    // and stamp them with the old stream's serial.
    let first_sequence = old_pages[0].sequence;
    for (i, page) in updated_pages.iter_mut().enumerate() {
        page.sequence = first_sequence.checked_add(i as u32).ok_or_else(|| {
            AudexError::ParseError("Ogg page sequence number overflow".to_string())
        })?;
        page.serial = old_pages[0].serial;
    }
    // First replacement page inherits the first old page's flags.
    updated_pages[0].set_first(old_pages[0].first());
    updated_pages[0].set_last(old_pages[0].last());
    updated_pages[0].set_continued(old_pages[0].continued());
    // Last replacement page inherits the last old page's flags.
    let old_last = old_pages.last().ok_or_else(|| {
        AudexError::InternalError("old_pages empty in replace_async".to_string())
    })?;
    let old_last_first = old_last.first();
    let old_last_last = old_last.last();
    let old_last_complete = old_last.is_complete();
    if updated_pages.len() == 1 {
        // A single replacement page is simultaneously first and last, so
        // it also carries the trailing old page's "first" flag.
        updated_pages
            .last_mut()
            .ok_or_else(|| AudexError::InternalError("updated_pages empty".to_string()))?
            .set_first(old_last_first);
    } else {
        updated_pages
            .last_mut()
            .ok_or_else(|| AudexError::InternalError("updated_pages empty".to_string()))?
            .set_first(false);
    }
    updated_pages
        .last_mut()
        .ok_or_else(|| AudexError::InternalError("updated_pages empty".to_string()))?
        .set_last(old_last_last);
    updated_pages
        .last_mut()
        .ok_or_else(|| AudexError::InternalError("updated_pages empty".to_string()))?
        .set_complete(old_last_complete);
    // An incomplete final page holding exactly one (unfinished) packet gets
    // the -1 granule-position sentinel (matches the `finishing` check in
    // `find_last_async`).
    let last_page = updated_pages
        .last()
        .ok_or_else(|| AudexError::InternalError("updated_pages empty".to_string()))?;
    if !last_page.is_complete() && last_page.packets.len() == 1 {
        updated_pages
            .last_mut()
            .ok_or_else(|| AudexError::InternalError("updated_pages empty".to_string()))?
            .position = -1;
    }
    // Serialize every replacement page up front.
    let new_data: Result<Vec<Vec<u8>>> = updated_pages.iter().map(|p| p.write()).collect();
    let mut final_data = new_data?;
    // Align the serialized-chunk count with the old-page count:
    //  - fewer new pages: pad with empty chunks so surplus old pages are
    //    shrunk to zero bytes by resize_bytes_async below;
    //  - more new pages: merge the overflow into one trailing blob so at
    //    most one region follows the old pages.
    let pages_diff = old_pages.len() as i64 - final_data.len() as i64;
    if pages_diff > 0 {
        for _ in 0..pages_diff {
            final_data.push(Vec::new());
        }
    } else if pages_diff < 0 {
        let split_idx = old_pages.len().saturating_sub(1);
        if split_idx < final_data.len() {
            let merged: Vec<u8> = final_data.drain(split_idx..).flatten().collect();
            final_data.push(merged);
        }
    }
    // Overwrite each old page in place, resizing the file where the new
    // chunk differs in size. `offset_adjust` tracks how far later old-page
    // offsets have shifted as a result.
    let mut offset_adjust: i64 = 0;
    let mut new_data_end: Option<u64> = None;
    let min_pages = old_pages.len().min(final_data.len());
    for i in 0..min_pages {
        let old_page = &old_pages[i];
        let data = &final_data[i];
        if let Some(offset) = old_page.offset {
            let adjusted = offset.checked_add(offset_adjust).ok_or_else(|| {
                AudexError::InvalidData("Page offset arithmetic overflow".to_string())
            })?;
            if adjusted < 0 {
                return Err(AudexError::InvalidData(format!(
                    "Adjusted page offset is negative: {}",
                    adjusted
                )));
            }
            let adjusted_offset = adjusted as u64;
            let data_size = data.len() as u64;
            let old_size = old_page.size() as u64;
            resize_bytes_async(reader, old_size, data_size, adjusted_offset).await?;
            reader.seek(SeekFrom::Start(adjusted_offset)).await?;
            reader.write_all(data).await?;
            new_data_end = Some(adjusted_offset + data_size);
            let data_i64 = i64::try_from(data_size).map_err(|_| {
                AudexError::InvalidData("Page data size exceeds i64 range".to_string())
            })?;
            let old_i64 = i64::try_from(old_size).map_err(|_| {
                AudexError::InvalidData("Old page size exceeds i64 range".to_string())
            })?;
            let delta = data_i64.checked_sub(old_i64).ok_or_else(|| {
                AudexError::InvalidData(
                    "page size delta overflow during offset calculation".to_string(),
                )
            })?;
            offset_adjust = offset_adjust.checked_add(delta).ok_or_else(|| {
                AudexError::InvalidData(
                    "cumulative page offset adjustment overflow".to_string(),
                )
            })?;
        }
    }
    // Insert any serialized chunks that extend past the old-page region.
    // (Normally unreachable: the pages_diff < 0 branch above merges the
    // overflow so final_data.len() == old_pages.len(); kept as defensive
    // code.)
    if final_data.len() > old_pages.len() {
        if let Some(mut insert_offset) = new_data_end {
            for data in &final_data[min_pages..] {
                let data_size = data.len() as u64;
                if !data.is_empty() {
                    insert_bytes_async(reader, data_size, insert_offset, None).await?;
                    reader.seek(SeekFrom::Start(insert_offset)).await?;
                    reader.write_all(data).await?;
                    // BUGFIX: advance the insertion point past the bytes
                    // just written. Previously every extra chunk was
                    // inserted at the same fixed offset, which reversed
                    // chunk order and corrupted earlier insertions.
                    insert_offset = insert_offset.checked_add(data_size).ok_or_else(|| {
                        AudexError::InvalidData(
                            "insertion offset overflow in replace_async".to_string(),
                        )
                    })?;
                    new_data_end = Some(insert_offset);
                }
            }
        }
    }
    // If the page count changed, renumber the rest of the logical stream so
    // its sequence numbers stay continuous after the replaced region.
    if old_pages.len() != updated_pages.len() {
        if let Some(end_offset) = new_data_end {
            reader.seek(SeekFrom::Start(end_offset)).await?;
            let last_updated = updated_pages.last().ok_or_else(|| {
                AudexError::InternalError("updated_pages empty during renumber".to_string())
            })?;
            let serial = last_updated.serial;
            let next_sequence = last_updated.sequence.checked_add(1).ok_or_else(|| {
                AudexError::InvalidData(
                    "Ogg page sequence counter overflow before renumber".to_string(),
                )
            })?;
            Self::renumber_async(reader, serial, next_sequence).await?;
        }
    }
    reader.flush().await?;
    Ok(())
}
}