voice_engine/media/
recorder.rs

1use crate::media::{AudioFrame, PcmBuf, Samples, codecs::samples_to_bytes};
2use anyhow::{Result, anyhow};
3use futures::StreamExt;
4use hound::{SampleFormat, WavSpec};
5use serde::{Deserialize, Serialize};
6use std::{
7    collections::HashMap,
8    path::{Path, PathBuf},
9    sync::{
10        Mutex,
11        atomic::{AtomicUsize, Ordering},
12    },
13    time::Duration,
14    u32,
15};
16use tokio::{
17    fs::File,
18    io::{AsyncSeekExt, AsyncWriteExt},
19    select,
20    sync::mpsc::UnboundedReceiver,
21};
22use tokio_stream::wrappers::IntervalStream;
23use tokio_util::sync::CancellationToken;
24use tracing::{info, warn};
25
26#[cfg(feature = "opus")]
27use opusic_sys::{
28    OPUS_APPLICATION_AUDIO, OPUS_OK, OpusEncoder as OpusEncoderRaw, opus_encode,
29    opus_encoder_create, opus_encoder_destroy, opus_strerror,
30};
31#[cfg(feature = "opus")]
32use std::{ffi::CStr, os::raw::c_int, ptr::NonNull};
33
/// Turns a libopus status code into a printable message.
///
/// `OPUS_OK` maps to `"ok"`. Any other code is looked up with `opus_strerror`; if that
/// lookup yields a null pointer the result is the generic text `error code <code>`.
#[cfg(feature = "opus")]
fn opus_error_message(code: c_int) -> String {
    if code == OPUS_OK {
        return String::from("ok");
    }

    // SAFETY: `opus_strerror` takes a plain integer and has no pointer preconditions.
    let message = unsafe { opus_strerror(code) };
    if message.is_null() {
        return format!("error code {code}");
    }

    // SAFETY: `message` is non-null here. NOTE(review): assumes libopus returns a
    // NUL-terminated string that outlives this call — confirm against the libopus docs.
    let text = unsafe { CStr::from_ptr(message) };
    text.to_string_lossy().into_owned()
}
49
/// Output container the recorder can be asked to produce.
///
/// Serialized and deserialized in lowercase (`"wav"` / `"ogg"`).
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RecorderFormat {
    /// WAV file; always supported (see `is_supported`).
    Wav,
    /// Ogg file; only supported when the crate is built with the `opus` feature.
    Ogg,
}
56
/// Opus encoder handle together with the per-stream counters used when emitting Ogg pages.
#[cfg(feature = "opus")]
struct OggStreamWriter {
    /// Raw libopus encoder; created in `new` and destroyed in the `Drop` impl.
    encoder: NonNull<OpusEncoderRaw>,
    /// Stream serial number copied into every page header.
    serial: u32,
    /// Page sequence number; bumped after each page is written.
    sequence: u32,
    /// Running granule position, advanced for every audio packet.
    granule_position: u64,
    /// Sample rate the encoder was created with (after normalization in `new`).
    sample_rate: u32,
}
65
66#[cfg(feature = "opus")]
67impl OggStreamWriter {
    /// Creates a stereo Opus encoder and fresh Ogg stream state.
    ///
    /// `sample_rate` is used as-is when it is one of 8000, 12000, 16000, 24000 or 48000;
    /// any other value is replaced by 16000. The caller can detect the substitution via
    /// `sample_rate()`.
    ///
    /// # Errors
    /// Returns an error when `opus_encoder_create` reports a non-OK status or hands back
    /// a null pointer.
    fn new(sample_rate: u32) -> Result<Self> {
        let normalized = match sample_rate {
            8000 | 12000 | 16000 | 24000 | 48000 => sample_rate,
            _ => 16000,
        };

        let encoder = {
            let mut error: c_int = 0;
            // SAFETY: `error` is a live local, so the out-pointer is valid for the call.
            let ptr = unsafe {
                opus_encoder_create(
                    normalized as c_int,
                    2,
                    OPUS_APPLICATION_AUDIO,
                    &mut error as *mut c_int,
                )
            };

            if error != OPUS_OK {
                // Release a partially created encoder, if any, before bailing out.
                unsafe {
                    if !ptr.is_null() {
                        opus_encoder_destroy(ptr);
                    }
                }
                return Err(anyhow!(
                    "Failed to create Opus encoder: {}",
                    opus_error_message(error)
                ));
            }

            NonNull::new(ptr)
                .ok_or_else(|| anyhow!("Failed to create Opus encoder: null pointer returned"))?
        };

        // Random stream serial; zero is remapped to 1 so the serial is never 0.
        let mut serial = rand::random::<u32>();
        if serial == 0 {
            serial = 1;
        }

        Ok(Self {
            encoder,
            serial,
            sequence: 0,
            granule_position: 0,
            sample_rate: normalized,
        })
    }
114
    /// Sample rate the encoder actually runs at (may differ from the rate passed to `new`).
    fn sample_rate(&self) -> u32 {
        self.sample_rate
    }
118
119    fn granule_increment(&self, frame_samples: usize) -> u64 {
120        let factor = 48000 / self.sample_rate;
121        (frame_samples as u64) * (factor as u64)
122    }
123
    /// Encodes one interleaved stereo PCM frame into a single Opus packet.
    ///
    /// `pcm` holds left/right samples interleaved, so its length must be even; the
    /// per-channel frame size handed to libopus is `pcm.len() / 2`.
    ///
    /// # Errors
    /// Fails when `pcm` has an odd length or when `opus_encode` returns a negative status.
    fn encode_frame(&mut self, pcm: &[i16]) -> Result<Vec<u8>> {
        if pcm.len() % 2 != 0 {
            return Err(anyhow!(
                "PCM frame must contain an even number of samples for stereo Opus encoding"
            ));
        }

        let frame_size = (pcm.len() / 2) as c_int;
        // Output scratch buffer; the encoder is told its size and reports bytes used.
        let mut buffer = vec![0u8; 4096];
        // SAFETY: `pcm` provides `frame_size * 2` readable samples and `buffer` provides
        // `buffer.len()` writable bytes; the encoder pointer is owned by `self`.
        let len = unsafe {
            opus_encode(
                self.encoder.as_ptr(),
                pcm.as_ptr() as *const opusic_sys::opus_int16,
                frame_size,
                buffer.as_mut_ptr(),
                buffer.len() as c_int,
            )
        };

        // A negative return value is a libopus error code.
        if len < 0 {
            return Err(anyhow!(
                "Failed to encode Opus frame: {}",
                opus_error_message(len)
            ));
        }

        buffer.truncate(len as usize);
        Ok(buffer)
    }
153
154    async fn write_headers(&mut self, file: &mut File) -> Result<()> {
155        let head = Self::build_opus_head(self.sample_rate);
156        self.write_page(file, &head, 0, 0x02).await?;
157
158        let tags = Self::build_opus_tags();
159        self.write_page(file, &tags, 0, 0x00).await?;
160        Ok(())
161    }
162
163    async fn write_audio_packet(
164        &mut self,
165        file: &mut File,
166        packet: &[u8],
167        frame_samples: usize,
168    ) -> Result<()> {
169        let increment = self.granule_increment(frame_samples);
170        self.granule_position = self.granule_position.saturating_add(increment);
171        self.write_page(file, packet, self.granule_position, 0x00)
172            .await
173    }
174
175    async fn finalize(&mut self, file: &mut File) -> Result<()> {
176        self.write_page(file, &[], self.granule_position, 0x04)
177            .await
178    }
179
180    async fn write_page(
181        &mut self,
182        file: &mut File,
183        packet: &[u8],
184        granule_pos: u64,
185        header_type: u8,
186    ) -> Result<()> {
187        let mut segments = Vec::new();
188        if !packet.is_empty() {
189            let mut remaining = packet.len();
190            while remaining >= 255 {
191                segments.push(255u8);
192                remaining -= 255;
193            }
194            segments.push(remaining as u8);
195        }
196
197        let mut page = Vec::with_capacity(27 + segments.len() + packet.len());
198        page.extend_from_slice(b"OggS");
199        page.push(0); // version
200        page.push(header_type);
201        page.extend_from_slice(&granule_pos.to_le_bytes());
202        page.extend_from_slice(&self.serial.to_le_bytes());
203        page.extend_from_slice(&self.sequence.to_le_bytes());
204        page.extend_from_slice(&0u32.to_le_bytes()); // checksum placeholder
205        page.push(segments.len() as u8);
206        page.extend_from_slice(&segments);
207        page.extend_from_slice(packet);
208
209        let checksum = ogg_crc32(&page);
210        page[22..26].copy_from_slice(&checksum.to_le_bytes());
211
212        file.write_all(&page).await?;
213        self.sequence = self.sequence.wrapping_add(1);
214        Ok(())
215    }
216
217    fn build_opus_head(sample_rate: u32) -> Vec<u8> {
218        let mut head = Vec::with_capacity(19);
219        head.extend_from_slice(b"OpusHead");
220        head.push(1); // version
221        head.push(2); // channel count (stereo)
222        head.extend_from_slice(&0u16.to_le_bytes()); // pre-skip
223        head.extend_from_slice(&sample_rate.to_le_bytes());
224        head.extend_from_slice(&0i16.to_le_bytes()); // output gain
225        head.push(0); // channel mapping family
226        head
227    }
228
229    fn build_opus_tags() -> Vec<u8> {
230        const VENDOR: &str = "rustpbx";
231        let vendor_bytes = VENDOR.as_bytes();
232        let mut tags = Vec::with_capacity(8 + 4 + vendor_bytes.len() + 4);
233        tags.extend_from_slice(b"OpusTags");
234        tags.extend_from_slice(&(vendor_bytes.len() as u32).to_le_bytes());
235        tags.extend_from_slice(vendor_bytes);
236        tags.extend_from_slice(&0u32.to_le_bytes()); // user comment list length
237        tags
238    }
239}
240
/// Releases the libopus encoder when the writer goes away.
#[cfg(feature = "opus")]
impl Drop for OggStreamWriter {
    fn drop(&mut self) {
        // SAFETY: `encoder` is the non-null pointer obtained from `opus_encoder_create`
        // in `new`, and this is the only place that destroys it.
        unsafe {
            opus_encoder_destroy(self.encoder.as_ptr());
        }
    }
}
249
// SAFETY: NOTE(review): `NonNull` is neither `Send` nor `Sync` by default; these impls
// assume the libopus encoder has no thread affinity — confirm against the libopus docs.
// The encoder is only passed to libopus from `&mut self` methods and from `Drop`.
#[cfg(feature = "opus")]
unsafe impl Send for OggStreamWriter {}

#[cfg(feature = "opus")]
unsafe impl Sync for OggStreamWriter {}
255
256#[cfg(feature = "opus")]
/// Computes the page checksum: a bitwise, MSB-first CRC-32 over `data` using polynomial
/// `0x04C11DB7`, an initial value of 0 and no final XOR.
fn ogg_crc32(data: &[u8]) -> u32 {
    const POLY: u32 = 0x04C11DB7;
    const TOP_BIT: u32 = 0x8000_0000;

    data.iter().fold(0u32, |crc, &byte| {
        // Feed the byte into the top of the register, then clock out eight bits.
        let seeded = crc ^ (u32::from(byte) << 24);
        (0..8).fold(seeded, |reg, _| {
            let shifted = reg << 1;
            if reg & TOP_BIT == 0 {
                shifted
            } else {
                shifted ^ POLY
            }
        })
    })
}
272
273impl RecorderFormat {
274    pub fn extension(&self) -> &'static str {
275        match self {
276            RecorderFormat::Wav => "wav",
277            RecorderFormat::Ogg => "ogg",
278        }
279    }
280
281    pub fn is_supported(&self) -> bool {
282        match self {
283            RecorderFormat::Wav => true,
284            RecorderFormat::Ogg => cfg!(feature = "opus"),
285        }
286    }
287
288    pub fn effective(&self) -> RecorderFormat {
289        if self.is_supported() {
290            *self
291        } else {
292            RecorderFormat::Wav
293        }
294    }
295}
296
297impl Default for RecorderFormat {
298    fn default() -> Self {
299        RecorderFormat::Wav
300    }
301}
302
303#[derive(Debug, Deserialize, Serialize, Clone)]
304#[serde(rename_all = "camelCase")]
305#[serde(default)]
306pub struct RecorderOption {
307    #[serde(default)]
308    pub recorder_file: String,
309    #[serde(default)]
310    pub samplerate: u32,
311    #[serde(default)]
312    pub ptime: u32,
313    #[serde(default, skip_serializing_if = "Option::is_none")]
314    pub format: Option<RecorderFormat>,
315}
316
317impl RecorderOption {
318    pub fn new(recorder_file: String) -> Self {
319        Self {
320            recorder_file,
321            ..Default::default()
322        }
323    }
324
325    pub fn resolved_format(&self, default: RecorderFormat) -> RecorderFormat {
326        self.format.unwrap_or(default).effective()
327    }
328
329    pub fn ensure_path_extension(&mut self, fallback_format: RecorderFormat) {
330        let effective_format = self.format.unwrap_or(fallback_format).effective();
331        self.format = Some(effective_format);
332
333        if self.recorder_file.is_empty() {
334            return;
335        }
336
337        let mut path = PathBuf::from(&self.recorder_file);
338        let has_desired_ext = path
339            .extension()
340            .and_then(|ext| ext.to_str())
341            .map(|ext| ext.eq_ignore_ascii_case(effective_format.extension()))
342            .unwrap_or(false);
343
344        if !has_desired_ext {
345            path.set_extension(effective_format.extension());
346            self.recorder_file = path.to_string_lossy().into_owned();
347        }
348    }
349}
350
351impl Default for RecorderOption {
352    fn default() -> Self {
353        Self {
354            recorder_file: "".to_string(),
355            samplerate: 16000,
356            ptime: 200,
357            format: None,
358        }
359    }
360}
361
/// Buffers PCM frames from up to two channel slots and writes them to a stereo recording.
pub struct Recorder {
    /// Identifier attached to log lines.
    session_id: String,
    /// Recording configuration (sample rate, write interval, format).
    option: RecorderOption,
    /// Number of per-channel samples written so far.
    samples_written: AtomicUsize,
    /// Cancelling this token flushes pending audio and ends the recording loop.
    cancel_token: CancellationToken,
    /// Next index handed out to a previously unseen track.
    channel_idx: AtomicUsize,
    /// Track id -> assigned index; the index modulo 2 selects the channel.
    channels: Mutex<HashMap<String, usize>>,
    /// Pending samples for channel 1 (written as the right channel).
    stereo_buf: Mutex<PcmBuf>,
    /// Pending samples for channel 0 (written as the left channel).
    mono_buf: Mutex<PcmBuf>,
}
372
373impl Recorder {
374    pub fn new(
375        cancel_token: CancellationToken,
376        session_id: String,
377        option: RecorderOption,
378    ) -> Self {
379        Self {
380            session_id,
381            option,
382            samples_written: AtomicUsize::new(0),
383            cancel_token,
384            channel_idx: AtomicUsize::new(0),
385            channels: Mutex::new(HashMap::new()),
386            stereo_buf: Mutex::new(Vec::new()),
387            mono_buf: Mutex::new(Vec::new()),
388        }
389    }
390
391    async fn update_wav_header(&self, file: &mut File) -> Result<()> {
392        // Get total data size (in bytes)
393        let total_samples = self.samples_written.load(Ordering::SeqCst);
394        let data_size = total_samples * 4; // Stereo, 16-bit = 4 bytes per sample
395
396        // Create a WavSpec for the WAV header
397        let spec = WavSpec {
398            channels: 2,
399            sample_rate: self.option.samplerate,
400            bits_per_sample: 16,
401            sample_format: SampleFormat::Int,
402        };
403        // Create a memory buffer for the WAV header
404        let mut header_buf = Vec::new();
405
406        // Create a WAV header using standard structure
407        // RIFF header
408        header_buf.extend_from_slice(b"RIFF");
409        let file_size = data_size + 36; // 36 bytes for header - 8 + data bytes
410        header_buf.extend_from_slice(&(file_size as u32).to_le_bytes());
411        header_buf.extend_from_slice(b"WAVE");
412
413        // fmt subchunk - use values from WavSpec
414        header_buf.extend_from_slice(b"fmt ");
415        header_buf.extend_from_slice(&16u32.to_le_bytes()); // fmt chunk size
416        header_buf.extend_from_slice(&1u16.to_le_bytes()); // PCM format
417        header_buf.extend_from_slice(&(spec.channels as u16).to_le_bytes());
418        header_buf.extend_from_slice(&(spec.sample_rate).to_le_bytes());
419
420        // Bytes per second: sample_rate * num_channels * bytes_per_sample
421        let bytes_per_sec =
422            spec.sample_rate * (spec.channels as u32) * (spec.bits_per_sample as u32 / 8);
423        header_buf.extend_from_slice(&bytes_per_sec.to_le_bytes());
424
425        // Block align: num_channels * bytes_per_sample
426        let block_align = (spec.channels as u16) * (spec.bits_per_sample / 8);
427        header_buf.extend_from_slice(&block_align.to_le_bytes());
428        header_buf.extend_from_slice(&spec.bits_per_sample.to_le_bytes());
429
430        // Data subchunk
431        header_buf.extend_from_slice(b"data");
432        header_buf.extend_from_slice(&(data_size as u32).to_le_bytes());
433
434        // Seek to beginning of file and write header
435        file.seek(std::io::SeekFrom::Start(0)).await?;
436        file.write_all(&header_buf).await?;
437
438        // Seek back to end of file for further writing
439        file.seek(std::io::SeekFrom::End(0)).await?;
440
441        Ok(())
442    }
443
    /// Runs the recording loop for `file_path` until the cancel token fires.
    ///
    /// The configured format (WAV when none is set) is first reduced to what this build
    /// supports; a warning is logged when that changes the format. Ogg is handled by
    /// `process_recording_ogg`, everything else by `process_recording_wav`.
    pub async fn process_recording(
        &self,
        file_path: &Path,
        receiver: UnboundedReceiver<AudioFrame>,
    ) -> Result<()> {
        let requested_format = self.option.format.unwrap_or(RecorderFormat::Wav);
        let effective_format = requested_format.effective();

        if requested_format != effective_format {
            warn!(
                session_id = self.session_id,
                requested = requested_format.extension(),
                "Recorder format requires unavailable feature; falling back to wav"
            );
        }

        if effective_format == RecorderFormat::Ogg {
            // Exactly one of the two cfg-gated blocks below is compiled in.
            #[cfg(feature = "opus")]
            {
                return self.process_recording_ogg(file_path, receiver).await;
            }
            #[cfg(not(feature = "opus"))]
            {
                unreachable!(
                    "RecorderFormat::effective() should prevent ogg when opus feature is disabled"
                );
            }
        }

        self.process_recording_wav(file_path, receiver).await
    }
475
476    fn ensure_parent_dir(&self, file_path: &Path) -> Result<()> {
477        if let Some(parent) = file_path.parent() {
478            if !parent.exists() {
479                if let Err(e) = std::fs::create_dir_all(parent) {
480                    warn!(
481                        "Failed to create recording file parent directory: {} {}",
482                        e,
483                        file_path.display()
484                    );
485                    return Err(anyhow!("Failed to create recording file parent directory"));
486                }
487            }
488        }
489        Ok(())
490    }
491
492    async fn create_output_file(&self, file_path: &Path) -> Result<File> {
493        self.ensure_parent_dir(file_path)?;
494        match File::create(file_path).await {
495            Ok(file) => {
496                info!(
497                    session_id = self.session_id,
498                    "recorder: created recording file: {}",
499                    file_path.display()
500                );
501                Ok(file)
502            }
503            Err(e) => {
504                warn!(
505                    "Failed to create recording file: {} {}",
506                    e,
507                    file_path.display()
508                );
509                Err(anyhow!("Failed to create recording file"))
510            }
511        }
512    }
513
514    async fn process_recording_wav(
515        &self,
516        file_path: &Path,
517        mut receiver: UnboundedReceiver<AudioFrame>,
518    ) -> Result<()> {
519        let mut file = self.create_output_file(file_path).await?;
520        self.update_wav_header(&mut file).await?;
521        let chunk_size = (self.option.samplerate / 1000 * self.option.ptime) as usize;
522        info!(
523            session_id = self.session_id,
524            format = "wav",
525            "Recording to {} ptime: {}ms chunk_size: {}",
526            file_path.display(),
527            self.option.ptime,
528            chunk_size
529        );
530
531        let mut interval = IntervalStream::new(tokio::time::interval(Duration::from_millis(
532            self.option.ptime as u64,
533        )));
534        loop {
535            select! {
536                Some(frame) = receiver.recv() => {
537                    self.append_frame(frame).await.ok();
538                }
539                _ = interval.next() => {
540                    let (mono_buf, stereo_buf) = self.pop(chunk_size).await;
541                    self.process_buffers(&mut file, mono_buf, stereo_buf).await?;
542                    self.update_wav_header(&mut file).await?;
543                }
544                _ = self.cancel_token.cancelled() => {
545                    self.flush_buffers(&mut file).await?;
546                    self.update_wav_header(&mut file).await?;
547                    return Ok(());
548                }
549            }
550        }
551    }
552
553    #[cfg(feature = "opus")]
554    async fn process_recording_ogg(
555        &self,
556        file_path: &Path,
557        mut receiver: UnboundedReceiver<AudioFrame>,
558    ) -> Result<()> {
559        let mut file = self.create_output_file(file_path).await?;
560        let mut writer = OggStreamWriter::new(self.option.samplerate)?;
561        if writer.sample_rate() != self.option.samplerate {
562            warn!(
563                session_id = self.session_id,
564                requested = self.option.samplerate,
565                using = writer.sample_rate(),
566                "Adjusted recorder samplerate to Opus-compatible value"
567            );
568        }
569        writer.write_headers(&mut file).await?;
570
571        let chunk_size = (self.option.samplerate / 1000 * self.option.ptime) as usize;
572        info!(
573            session_id = self.session_id,
574            format = "ogg",
575            "Recording to {} ptime: {}ms chunk_size: {}",
576            file_path.display(),
577            self.option.ptime,
578            chunk_size
579        );
580
581        let frame_samples = std::cmp::max(1, (writer.sample_rate() / 50) as usize);
582        let frame_step = frame_samples * 2; // stereo samples
583        let mut pending: Vec<i16> = Vec::new();
584
585        let mut interval = IntervalStream::new(tokio::time::interval(Duration::from_millis(
586            self.option.ptime as u64,
587        )));
588
589        loop {
590            select! {
591                Some(frame) = receiver.recv() => {
592                    self.append_frame(frame).await.ok();
593                }
594                _ = interval.next() => {
595                    let (mono_buf, stereo_buf) = self.pop(chunk_size).await;
596                    if mono_buf.is_empty() && stereo_buf.is_empty() {
597                        continue;
598                    }
599
600                    let mix = Self::mix_buffers(&mono_buf, &stereo_buf);
601                    pending.extend_from_slice(&mix);
602
603                    let encoded_samples = self
604                        .encode_pending_frames(&mut pending, frame_step, &mut writer, &mut file, false)
605                        .await?;
606                    if encoded_samples > 0 {
607                        self.samples_written.fetch_add(encoded_samples, Ordering::SeqCst);
608                    }
609                }
610                _ = self.cancel_token.cancelled() => {
611                    let (mono_buf, stereo_buf) = self.pop(usize::MAX).await;
612                    if !mono_buf.is_empty() || !stereo_buf.is_empty() {
613                        let mix = Self::mix_buffers(&mono_buf, &stereo_buf);
614                        pending.extend_from_slice(&mix);
615                    }
616
617                    let encoded_samples = self
618                        .encode_pending_frames(&mut pending, frame_step, &mut writer, &mut file, true)
619                        .await?;
620                    if encoded_samples > 0 {
621                        self.samples_written.fetch_add(encoded_samples, Ordering::SeqCst);
622                    }
623
624                    writer.finalize(&mut file).await?;
625                    return Ok(());
626                }
627            }
628        }
629    }
630
631    #[cfg(feature = "opus")]
632    async fn encode_pending_frames(
633        &self,
634        pending: &mut Vec<i16>,
635        frame_step: usize,
636        writer: &mut OggStreamWriter,
637        file: &mut File,
638        pad_final: bool,
639    ) -> Result<usize> {
640        let mut total_samples = 0usize;
641        let samples_per_channel = frame_step / 2;
642        while pending.len() >= frame_step {
643            let frame: Vec<i16> = pending.drain(..frame_step).collect();
644            let packet = writer.encode_frame(&frame)?;
645            writer
646                .write_audio_packet(file, &packet, samples_per_channel)
647                .await?;
648            total_samples += samples_per_channel;
649        }
650
651        if pad_final && !pending.is_empty() {
652            let mut frame: Vec<i16> = pending.drain(..).collect();
653            frame.resize(frame_step, 0);
654            let packet = writer.encode_frame(&frame)?;
655            writer
656                .write_audio_packet(file, &packet, samples_per_channel)
657                .await?;
658            total_samples += samples_per_channel;
659        }
660
661        Ok(total_samples)
662    }
663
664    /// Get or assign channel index for a track
665    fn get_channel_index(&self, track_id: &str) -> usize {
666        let mut channels = self.channels.lock().unwrap();
667        if let Some(&channel_idx) = channels.get(track_id) {
668            channel_idx % 2
669        } else {
670            let new_idx = self.channel_idx.fetch_add(1, Ordering::SeqCst);
671            channels.insert(track_id.to_string(), new_idx);
672            info!(
673                session_id = self.session_id,
674                "Assigned channel {} to track: {}",
675                new_idx % 2,
676                track_id
677            );
678            new_idx % 2
679        }
680    }
681
682    async fn append_frame(&self, frame: AudioFrame) -> Result<()> {
683        let buffer = match frame.samples {
684            Samples::PCM { samples } => samples,
685            _ => return Ok(()), // ignore non-PCM frames
686        };
687
688        // Validate audio data
689        if buffer.is_empty() {
690            return Ok(());
691        }
692
693        // Get channel assignment
694        let channel_idx = self.get_channel_index(&frame.track_id);
695
696        // Add to appropriate buffer
697        match channel_idx {
698            0 => {
699                let mut mono_buf = self.mono_buf.lock().unwrap();
700                mono_buf.extend(buffer.iter());
701            }
702            1 => {
703                let mut stereo_buf = self.stereo_buf.lock().unwrap();
704                stereo_buf.extend(buffer.iter());
705            }
706            _ => {}
707        }
708
709        Ok(())
710    }
711
712    /// Extract samples from a buffer without padding
713    pub(crate) fn extract_samples(buffer: &mut PcmBuf, extract_size: usize) -> PcmBuf {
714        if extract_size > 0 && !buffer.is_empty() {
715            let take_size = extract_size.min(buffer.len());
716            buffer.drain(..take_size).collect()
717        } else {
718            Vec::new()
719        }
720    }
721
722    async fn pop(&self, chunk_size: usize) -> (PcmBuf, PcmBuf) {
723        let mut mono_buf = self.mono_buf.lock().unwrap();
724        let mut stereo_buf = self.stereo_buf.lock().unwrap();
725
726        // Limit chunk_size to prevent capacity overflow
727        let safe_chunk_size = chunk_size.min(16000 * 10); // Max 10 seconds at 16kHz
728
729        let mono_result = if mono_buf.len() >= safe_chunk_size {
730            // Sufficient data, extract complete chunk
731            Self::extract_samples(&mut mono_buf, safe_chunk_size)
732        } else if !mono_buf.is_empty() {
733            // Partial data, extract all and pad with silence
734            let available_len = mono_buf.len(); // Store length before mutable borrow
735            let mut result = Self::extract_samples(&mut mono_buf, available_len);
736            if chunk_size != usize::MAX {
737                // Don't pad when flushing
738                result.resize(safe_chunk_size, 0); // Pad with silence to chunk_size
739            }
740            result
741        } else {
742            // No data, output silence (only when not flushing)
743            if chunk_size != usize::MAX {
744                vec![0; safe_chunk_size]
745            } else {
746                Vec::new()
747            }
748        };
749
750        let stereo_result = if stereo_buf.len() >= safe_chunk_size {
751            // Sufficient data, extract complete chunk
752            Self::extract_samples(&mut stereo_buf, safe_chunk_size)
753        } else if !stereo_buf.is_empty() {
754            // Partial data, extract all and pad with silence
755            let available_len = stereo_buf.len(); // Store length before mutable borrow
756            let mut result = Self::extract_samples(&mut stereo_buf, available_len);
757            if chunk_size != usize::MAX {
758                // Don't pad when flushing
759                result.resize(safe_chunk_size, 0); // Pad with silence to chunk_size
760            }
761            result
762        } else {
763            // No data, output silence (only when not flushing)
764            if chunk_size != usize::MAX {
765                vec![0; safe_chunk_size]
766            } else {
767                Vec::new()
768            }
769        };
770
771        // Ensure buffers have equal length when flushing
772        if chunk_size == usize::MAX {
773            let max_len = mono_result.len().max(stereo_result.len());
774            let mut mono_final = mono_result;
775            let mut stereo_final = stereo_result;
776            mono_final.resize(max_len, 0);
777            stereo_final.resize(max_len, 0);
778            (mono_final, stereo_final)
779        } else {
780            (mono_result, stereo_result)
781        }
782    }
783
    /// Requests the recording loop to stop by cancelling the token. Always returns `Ok(())`.
    pub fn stop_recording(&self) -> Result<()> {
        self.cancel_token.cancel();
        Ok(())
    }
788
789    /// Mix mono and stereo buffers into interleaved stereo output
790    pub(crate) fn mix_buffers(mono_buf: &PcmBuf, stereo_buf: &PcmBuf) -> Vec<i16> {
791        // Ensure both buffers have equal length (guaranteed by pop() method)
792        assert_eq!(
793            mono_buf.len(),
794            stereo_buf.len(),
795            "Buffer lengths must be equal after pop()"
796        );
797
798        let len = mono_buf.len();
799        let mut mix_buff = Vec::with_capacity(len * 2);
800
801        for i in 0..len {
802            mix_buff.push(mono_buf[i]); // Left channel
803            mix_buff.push(stereo_buf[i]); // Right channel
804        }
805
806        mix_buff
807    }
808
809    /// Write mixed audio data to file
810    async fn write_audio_data(
811        &self,
812        file: &mut File,
813        mono_buf: &PcmBuf,
814        stereo_buf: &PcmBuf,
815    ) -> Result<usize> {
816        let max_len = mono_buf.len().max(stereo_buf.len());
817        if max_len == 0 {
818            return Ok(0);
819        }
820
821        let mix_buff = Self::mix_buffers(mono_buf, stereo_buf);
822
823        file.seek(std::io::SeekFrom::End(0)).await?;
824        file.write_all(&samples_to_bytes(&mix_buff)).await?;
825
826        Ok(max_len)
827    }
828
829    /// Process buffers with quality checks and write to file
830    async fn process_buffers(
831        &self,
832        file: &mut File,
833        mono_buf: PcmBuf,
834        stereo_buf: PcmBuf,
835    ) -> Result<()> {
836        // Skip if no data
837        if mono_buf.is_empty() && stereo_buf.is_empty() {
838            return Ok(());
839        }
840        // Write audio data
841        let samples_written = self.write_audio_data(file, &mono_buf, &stereo_buf).await?;
842        if samples_written > 0 {
843            self.samples_written
844                .fetch_add(samples_written, Ordering::SeqCst);
845        }
846        Ok(())
847    }
848
849    /// Flush all remaining buffer content
850    async fn flush_buffers(&self, file: &mut File) -> Result<()> {
851        loop {
852            let (mono_buf, stereo_buf) = self.pop(usize::MAX).await;
853
854            if mono_buf.is_empty() && stereo_buf.is_empty() {
855                break;
856            }
857
858            let samples_written = self.write_audio_data(file, &mono_buf, &stereo_buf).await?;
859            if samples_written > 0 {
860                self.samples_written
861                    .fetch_add(samples_written, Ordering::SeqCst);
862            }
863        }
864
865        Ok(())
866    }
867}