flacx 0.11.0

Convert supported PCM containers to FLAC, decode FLAC back to PCM containers, and recompress existing FLAC streams.
Documentation
use std::{
    fs::{self, File, OpenOptions},
    io::{Seek, Write},
    path::{Path, PathBuf},
    sync::atomic::{AtomicUsize, Ordering},
};

use crate::{
    config::DecodeConfig,
    decode::DecodeSummary,
    error::{Error, Result},
    md5::{StreaminfoMd5, verify_streaminfo_digest},
    progress::ProgressSink,
    read::DecodePcmStream,
    stream_info::StreamInfo,
    wav_output::{StreamingPcmWriter, WavMetadataWriteOptions, ensure_output_container_enabled},
};

#[cfg(feature = "progress")]
use crate::progress::emit_progress;

static TEMP_OUTPUT_COUNTER: AtomicUsize = AtomicUsize::new(0);
const DECODE_CHUNK_FRAME_MULTIPLIER: usize = 1_024;

pub(crate) fn decode_stream_to_container<S, W, P>(
    mut stream: S,
    output: &mut W,
    metadata: crate::metadata::Metadata,
    config: DecodeConfig,
    progress: &mut P,
) -> Result<DecodeSummary>
where
    S: DecodePcmStream,
    W: Write + Seek,
    P: ProgressSink,
{
    #[cfg(not(feature = "progress"))]
    let _ = progress;
    ensure_output_container_enabled(config.output_container())?;
    let spec = stream.spec();
    let source_info = stream.stream_info();
    #[cfg(feature = "progress")]
    let total_frames = stream.total_input_frames();
    let _profile_cleanup = DecodeProfileCleanupGuard;
    let mut streaminfo_md5 = StreaminfoMd5::new(spec);
    let mut writer = StreamingPcmWriter::new(
        output,
        spec,
        &metadata,
        WavMetadataWriteOptions {
            emit_fxmd: config.emit_fxmd(),
            container: config.output_container(),
        },
    )?;

    #[cfg(feature = "progress")]
    let mut processed_samples = 0u64;
    let mut chunk = Vec::new();
    let chunk_frames = chunk_frames_for_stream(source_info);
    loop {
        chunk.clear();
        let frames = stream.read_chunk(chunk_frames, &mut chunk)?;
        if frames == 0 {
            break;
        }
        crate::read::hand_out_decode_output_pcm_frames_for_current_thread(frames);
        writer.write_samples_and_update_md5(&chunk, &mut streaminfo_md5)?;
        crate::read::release_ordered_decode_output_for_current_thread();
        #[cfg(feature = "progress")]
        {
            processed_samples += frames as u64;
        }
        #[cfg(feature = "progress")]
        emit_progress!(
            progress,
            crate::progress::ProgressSnapshot {
                processed_samples,
                total_samples: spec.total_samples,
                completed_frames: stream.completed_input_frames(),
                total_frames,
                input_bytes_read: crate::read::DecodePcmStream::input_bytes_processed(&stream),
                output_bytes_written: writer.bytes_written(),
            }
        )?;
    }

    let output = writer.finish(Some(&mut streaminfo_md5))?;
    #[cfg(not(feature = "progress"))]
    let _ = &output;
    #[cfg(feature = "progress")]
    emit_progress!(
        progress,
        crate::progress::ProgressSnapshot {
            processed_samples,
            total_samples: spec.total_samples,
            completed_frames: stream.completed_input_frames(),
            total_frames,
            input_bytes_read: crate::read::DecodePcmStream::input_bytes_processed(&stream),
            output_bytes_written: output.stream_position()?,
        }
    )?;
    verify_streaminfo_digest(streaminfo_md5.finalize()?, source_info.md5)?;
    crate::read::finish_successful_decode_profile_for_current_thread();
    Ok(summary_from_stream_info(
        source_info,
        stream.completed_input_frames(),
    ))
}

struct DecodeProfileCleanupGuard;

impl Drop for DecodeProfileCleanupGuard {
    fn drop(&mut self) {
        crate::read::clear_decode_profile_session_for_current_thread();
    }
}

fn chunk_frames_for_stream(source_info: StreamInfo) -> usize {
    usize::from(source_info.max_block_size.max(1)).saturating_mul(DECODE_CHUNK_FRAME_MULTIPLIER)
}

pub(crate) fn open_temp_output(output_path: &Path) -> Result<(PathBuf, File)> {
    let parent = output_path.parent().unwrap_or_else(|| Path::new("."));
    let pid = std::process::id();

    for _ in 0..1_024 {
        let suffix = TEMP_OUTPUT_COUNTER.fetch_add(1, Ordering::Relaxed);
        let temp_name = format!(".flacx-{pid}-{suffix}.tmp");
        let temp_path = parent.join(temp_name);
        match OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&temp_path)
        {
            Ok(file) => return Ok((temp_path, file)),
            Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => continue,
            Err(error) => return Err(error.into()),
        }
    }

    Err(Error::Thread(
        "failed to allocate a temporary output path".into(),
    ))
}

pub(crate) fn commit_temp_output(temp_path: &Path, output_path: &Path) -> Result<()> {
    match fs::rename(temp_path, output_path) {
        Ok(()) => Ok(()),
        Err(error)
            if matches!(
                error.kind(),
                std::io::ErrorKind::AlreadyExists | std::io::ErrorKind::PermissionDenied
            ) =>
        {
            let _ = fs::remove_file(output_path);
            fs::rename(temp_path, output_path)?;
            Ok(())
        }
        Err(error) => Err(error.into()),
    }
}

pub(crate) fn summary_from_stream_info(
    stream_info: StreamInfo,
    frame_count: usize,
) -> DecodeSummary {
    DecodeSummary {
        frame_count,
        total_samples: stream_info.total_samples,
        block_size: stream_info.max_block_size,
        min_frame_size: stream_info.min_frame_size,
        max_frame_size: stream_info.max_frame_size,
        min_block_size: stream_info.min_block_size,
        max_block_size: stream_info.max_block_size,
        sample_rate: stream_info.sample_rate,
        channels: stream_info.channels,
        bits_per_sample: stream_info.bits_per_sample,
    }
}