voice_engine/media/
recorder.rs

1use crate::media::codecs::{Decoder, g729::G729Decoder};
2use crate::media::{AudioFrame, PcmBuf, Samples, codecs::samples_to_bytes};
3use anyhow::{Result, anyhow};
4use futures::StreamExt;
5use hound::{SampleFormat, WavSpec};
6use serde::{Deserialize, Serialize};
7use std::{
8    collections::HashMap,
9    path::{Path, PathBuf},
10    sync::{
11        Mutex,
12        atomic::{AtomicUsize, Ordering},
13    },
14    time::Duration,
15    u32,
16};
17use tokio::{
18    fs::File,
19    io::{AsyncSeekExt, AsyncWriteExt},
20    select,
21    sync::mpsc::UnboundedReceiver,
22};
23use tokio_stream::wrappers::IntervalStream;
24use tokio_util::sync::CancellationToken;
25use tracing::{info, warn};
26
27#[cfg(feature = "opus")]
28use opusic_sys::{
29    OPUS_APPLICATION_AUDIO, OPUS_OK, OpusEncoder as OpusEncoderRaw, opus_encode,
30    opus_encoder_create, opus_encoder_destroy, opus_strerror,
31};
32#[cfg(feature = "opus")]
33use std::{ffi::CStr, os::raw::c_int, ptr::NonNull};
34
35#[cfg(feature = "opus")]
36fn opus_error_message(code: c_int) -> String {
37    if code == OPUS_OK {
38        return "ok".to_string();
39    }
40
41    unsafe {
42        let ptr = opus_strerror(code);
43        if ptr.is_null() {
44            format!("error code {code}")
45        } else {
46            CStr::from_ptr(ptr).to_string_lossy().into_owned()
47        }
48    }
49}
50
/// Container format used when recording decoded PCM audio.
///
/// Serialized by serde in lowercase (`"wav"` / `"ogg"`).
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RecorderFormat {
    /// WAV container; `is_supported` always reports it as available.
    Wav,
    /// Ogg container; `is_supported` reports it only when the `opus` feature is compiled in.
    Ogg,
}
57
58#[cfg(feature = "opus")]
/// State needed to encode stereo PCM with libopus and emit it as Ogg pages.
struct OggStreamWriter {
    /// Raw libopus encoder handle; created in `new` and destroyed in `Drop`.
    encoder: NonNull<OpusEncoderRaw>,
    /// Stream serial number written into every page header.
    serial: u32,
    /// Page sequence number; incremented after each page is written.
    sequence: u32,
    /// Running granule position, advanced once per audio packet.
    granule_position: u64,
    /// Encoder sample rate after normalization in `new`.
    sample_rate: u32,
}
66
67#[cfg(feature = "opus")]
68impl OggStreamWriter {
69    fn new(sample_rate: u32) -> Result<Self> {
70        let normalized = match sample_rate {
71            8000 | 12000 | 16000 | 24000 | 48000 => sample_rate,
72            _ => 16000,
73        };
74
75        let encoder = {
76            let mut error: c_int = 0;
77            let ptr = unsafe {
78                opus_encoder_create(
79                    normalized as c_int,
80                    2,
81                    OPUS_APPLICATION_AUDIO,
82                    &mut error as *mut c_int,
83                )
84            };
85
86            if error != OPUS_OK {
87                unsafe {
88                    if !ptr.is_null() {
89                        opus_encoder_destroy(ptr);
90                    }
91                }
92                return Err(anyhow!(
93                    "Failed to create Opus encoder: {}",
94                    opus_error_message(error)
95                ));
96            }
97
98            NonNull::new(ptr)
99                .ok_or_else(|| anyhow!("Failed to create Opus encoder: null pointer returned"))?
100        };
101
102        let mut serial = rand::random::<u32>();
103        if serial == 0 {
104            serial = 1;
105        }
106
107        Ok(Self {
108            encoder,
109            serial,
110            sequence: 0,
111            granule_position: 0,
112            sample_rate: normalized,
113        })
114    }
115
116    fn sample_rate(&self) -> u32 {
117        self.sample_rate
118    }
119
120    fn granule_increment(&self, frame_samples: usize) -> u64 {
121        let factor = 48000 / self.sample_rate;
122        (frame_samples as u64) * (factor as u64)
123    }
124
125    fn encode_frame(&mut self, pcm: &[i16]) -> Result<Vec<u8>> {
126        if pcm.len() % 2 != 0 {
127            return Err(anyhow!(
128                "PCM frame must contain an even number of samples for stereo Opus encoding"
129            ));
130        }
131
132        let frame_size = (pcm.len() / 2) as c_int;
133        let mut buffer = vec![0u8; 4096];
134        let len = unsafe {
135            opus_encode(
136                self.encoder.as_ptr(),
137                pcm.as_ptr() as *const opusic_sys::opus_int16,
138                frame_size,
139                buffer.as_mut_ptr(),
140                buffer.len() as c_int,
141            )
142        };
143
144        if len < 0 {
145            return Err(anyhow!(
146                "Failed to encode Opus frame: {}",
147                opus_error_message(len)
148            ));
149        }
150
151        buffer.truncate(len as usize);
152        Ok(buffer)
153    }
154
155    async fn write_headers(&mut self, file: &mut File) -> Result<()> {
156        let head = Self::build_opus_head(self.sample_rate);
157        self.write_page(file, &head, 0, 0x02).await?;
158
159        let tags = Self::build_opus_tags();
160        self.write_page(file, &tags, 0, 0x00).await?;
161        Ok(())
162    }
163
164    async fn write_audio_packet(
165        &mut self,
166        file: &mut File,
167        packet: &[u8],
168        frame_samples: usize,
169    ) -> Result<()> {
170        let increment = self.granule_increment(frame_samples);
171        self.granule_position = self.granule_position.saturating_add(increment);
172        self.write_page(file, packet, self.granule_position, 0x00)
173            .await
174    }
175
176    async fn finalize(&mut self, file: &mut File) -> Result<()> {
177        self.write_page(file, &[], self.granule_position, 0x04)
178            .await
179    }
180
181    async fn write_page(
182        &mut self,
183        file: &mut File,
184        packet: &[u8],
185        granule_pos: u64,
186        header_type: u8,
187    ) -> Result<()> {
188        let mut segments = Vec::new();
189        if !packet.is_empty() {
190            let mut remaining = packet.len();
191            while remaining >= 255 {
192                segments.push(255u8);
193                remaining -= 255;
194            }
195            segments.push(remaining as u8);
196        }
197
198        let mut page = Vec::with_capacity(27 + segments.len() + packet.len());
199        page.extend_from_slice(b"OggS");
200        page.push(0); // version
201        page.push(header_type);
202        page.extend_from_slice(&granule_pos.to_le_bytes());
203        page.extend_from_slice(&self.serial.to_le_bytes());
204        page.extend_from_slice(&self.sequence.to_le_bytes());
205        page.extend_from_slice(&0u32.to_le_bytes()); // checksum placeholder
206        page.push(segments.len() as u8);
207        page.extend_from_slice(&segments);
208        page.extend_from_slice(packet);
209
210        let checksum = ogg_crc32(&page);
211        page[22..26].copy_from_slice(&checksum.to_le_bytes());
212
213        file.write_all(&page).await?;
214        self.sequence = self.sequence.wrapping_add(1);
215        Ok(())
216    }
217
218    fn build_opus_head(sample_rate: u32) -> Vec<u8> {
219        let mut head = Vec::with_capacity(19);
220        head.extend_from_slice(b"OpusHead");
221        head.push(1); // version
222        head.push(2); // channel count (stereo)
223        head.extend_from_slice(&0u16.to_le_bytes()); // pre-skip
224        head.extend_from_slice(&sample_rate.to_le_bytes());
225        head.extend_from_slice(&0i16.to_le_bytes()); // output gain
226        head.push(0); // channel mapping family
227        head
228    }
229
230    fn build_opus_tags() -> Vec<u8> {
231        const VENDOR: &str = "rustpbx";
232        let vendor_bytes = VENDOR.as_bytes();
233        let mut tags = Vec::with_capacity(8 + 4 + vendor_bytes.len() + 4);
234        tags.extend_from_slice(b"OpusTags");
235        tags.extend_from_slice(&(vendor_bytes.len() as u32).to_le_bytes());
236        tags.extend_from_slice(vendor_bytes);
237        tags.extend_from_slice(&0u32.to_le_bytes()); // user comment list length
238        tags
239    }
240}
241
242#[cfg(feature = "opus")]
243impl Drop for OggStreamWriter {
244    fn drop(&mut self) {
245        unsafe {
246            opus_encoder_destroy(self.encoder.as_ptr());
247        }
248    }
249}
250
251#[cfg(feature = "opus")]
// SAFETY: the writer keeps the encoder pointer in a private field (allocated in `new`,
// released in `Drop`), so moving the writer to another thread moves that ownership with it.
// NOTE(review): assumes the libopus encoder state has no thread affinity — confirm.
unsafe impl Send for OggStreamWriter {}
253
254#[cfg(feature = "opus")]
// SAFETY: the only method that hands the encoder to libopus for encoding (`encode_frame`)
// takes `&mut self`; the `&self` methods only read plain integer fields.
unsafe impl Sync for OggStreamWriter {}
256
257#[cfg(feature = "opus")]
/// Computes the page checksum used by the Ogg container.
///
/// This is a bitwise, MSB-first CRC-32 with polynomial `0x04C11DB7`, a zero initial value and
/// no final XOR. Returns 0 for empty input.
fn ogg_crc32(data: &[u8]) -> u32 {
    const POLY: u32 = 0x04C11DB7;
    const TOP_BIT: u32 = 0x8000_0000;

    data.iter().fold(0u32, |acc, &byte| {
        // Feed the byte into the top of the register, then shift it through bit by bit.
        let seeded = acc ^ ((byte as u32) << 24);
        (0..8).fold(seeded, |reg, _| {
            let shifted = reg << 1;
            if reg & TOP_BIT == 0 {
                shifted
            } else {
                shifted ^ POLY
            }
        })
    })
}
273
274impl RecorderFormat {
275    pub fn extension(&self) -> &'static str {
276        match self {
277            RecorderFormat::Wav => "wav",
278            RecorderFormat::Ogg => "ogg",
279        }
280    }
281
282    pub fn is_supported(&self) -> bool {
283        match self {
284            RecorderFormat::Wav => true,
285            RecorderFormat::Ogg => cfg!(feature = "opus"),
286        }
287    }
288
289    pub fn effective(&self) -> RecorderFormat {
290        if self.is_supported() {
291            *self
292        } else {
293            RecorderFormat::Wav
294        }
295    }
296}
297
298impl Default for RecorderFormat {
299    fn default() -> Self {
300        RecorderFormat::Wav
301    }
302}
303
304#[derive(Debug, Deserialize, Serialize, Clone)]
305#[serde(rename_all = "camelCase")]
306#[serde(default)]
307pub struct RecorderOption {
308    #[serde(default)]
309    pub recorder_file: String,
310    #[serde(default)]
311    pub samplerate: u32,
312    #[serde(default)]
313    pub ptime: u32,
314    #[serde(default, skip_serializing_if = "Option::is_none")]
315    pub format: Option<RecorderFormat>,
316}
317
318impl RecorderOption {
319    pub fn new(recorder_file: String) -> Self {
320        Self {
321            recorder_file,
322            ..Default::default()
323        }
324    }
325
326    pub fn resolved_format(&self, default: RecorderFormat) -> RecorderFormat {
327        self.format.unwrap_or(default).effective()
328    }
329
330    pub fn ensure_path_extension(&mut self, fallback_format: RecorderFormat) {
331        let effective_format = self.format.unwrap_or(fallback_format).effective();
332        self.format = Some(effective_format);
333
334        if self.recorder_file.is_empty() {
335            return;
336        }
337
338        let mut path = PathBuf::from(&self.recorder_file);
339        let has_desired_ext = path
340            .extension()
341            .and_then(|ext| ext.to_str())
342            .map(|ext| ext.eq_ignore_ascii_case(effective_format.extension()))
343            .unwrap_or(false);
344
345        if !has_desired_ext {
346            path.set_extension(effective_format.extension());
347            self.recorder_file = path.to_string_lossy().into_owned();
348        }
349    }
350}
351
352impl Default for RecorderOption {
353    fn default() -> Self {
354        Self {
355            recorder_file: "".to_string(),
356            samplerate: 16000,
357            ptime: 200,
358            format: None,
359        }
360    }
361}
362
/// Reads a file and decodes its contents with a `G729Decoder`.
pub struct G729WavReader {
    /// Decoder that receives the file contents.
    decoder: G729Decoder,
    /// Location of the file to read.
    file_path: PathBuf,
}
367
368impl G729WavReader {
369    pub fn new(path: PathBuf) -> Self {
370        Self {
371            decoder: G729Decoder::new(),
372            file_path: path,
373        }
374    }
375
376    pub async fn read_all(&mut self) -> Result<Vec<i16>> {
377        let data = tokio::fs::read(&self.file_path).await?;
378        Ok(self.decoder.decode(&data))
379    }
380}
381
/// Records the audio frames of one session to a file.
pub struct Recorder {
    /// Session identifier attached to log events.
    session_id: String,
    /// Recording options (file, sample rate, ptime, format).
    option: RecorderOption,
    /// Running counter used to size the WAV headers: the RTP path adds payload byte counts,
    /// the Ogg path adds encoded per-channel sample counts.
    samples_written: AtomicUsize,
    /// Cancelled by `stop_recording`; the WAV and Ogg loops flush and return when it fires.
    cancel_token: CancellationToken,
    /// Next index handed to a track that has not been seen before.
    channel_idx: AtomicUsize,
    /// Track id to assigned index; the index modulo 2 selects the channel.
    channels: Mutex<HashMap<String, usize>>,
    /// Pending samples of tracks mapped to channel 1.
    stereo_buf: Mutex<PcmBuf>,
    /// Pending samples of tracks mapped to channel 0.
    mono_buf: Mutex<PcmBuf>,
}
392
393impl Recorder {
394    pub fn new(
395        cancel_token: CancellationToken,
396        session_id: String,
397        option: RecorderOption,
398    ) -> Self {
399        Self {
400            session_id,
401            option,
402            samples_written: AtomicUsize::new(0),
403            cancel_token,
404            channel_idx: AtomicUsize::new(0),
405            channels: Mutex::new(HashMap::new()),
406            stereo_buf: Mutex::new(Vec::new()),
407            mono_buf: Mutex::new(Vec::new()),
408        }
409    }
410
411    async fn update_wav_header(&self, file: &mut File) -> Result<()> {
412        // Get total data size (in bytes)
413        let total_samples = self.samples_written.load(Ordering::SeqCst);
414        let data_size = total_samples * 4; // Stereo, 16-bit = 4 bytes per sample
415
416        // Create a WavSpec for the WAV header
417        let spec = WavSpec {
418            channels: 2,
419            sample_rate: self.option.samplerate,
420            bits_per_sample: 16,
421            sample_format: SampleFormat::Int,
422        };
423        // Create a memory buffer for the WAV header
424        let mut header_buf = Vec::new();
425
426        // Create a WAV header using standard structure
427        // RIFF header
428        header_buf.extend_from_slice(b"RIFF");
429        let file_size = data_size + 36; // 36 bytes for header - 8 + data bytes
430        header_buf.extend_from_slice(&(file_size as u32).to_le_bytes());
431        header_buf.extend_from_slice(b"WAVE");
432
433        // fmt subchunk - use values from WavSpec
434        header_buf.extend_from_slice(b"fmt ");
435        header_buf.extend_from_slice(&16u32.to_le_bytes()); // fmt chunk size
436        header_buf.extend_from_slice(&1u16.to_le_bytes()); // PCM format
437        header_buf.extend_from_slice(&(spec.channels as u16).to_le_bytes());
438        header_buf.extend_from_slice(&(spec.sample_rate).to_le_bytes());
439
440        // Bytes per second: sample_rate * num_channels * bytes_per_sample
441        let bytes_per_sec =
442            spec.sample_rate * (spec.channels as u32) * (spec.bits_per_sample as u32 / 8);
443        header_buf.extend_from_slice(&bytes_per_sec.to_le_bytes());
444
445        // Block align: num_channels * bytes_per_sample
446        let block_align = (spec.channels as u16) * (spec.bits_per_sample / 8);
447        header_buf.extend_from_slice(&block_align.to_le_bytes());
448        header_buf.extend_from_slice(&spec.bits_per_sample.to_le_bytes());
449
450        // Data subchunk
451        header_buf.extend_from_slice(b"data");
452        header_buf.extend_from_slice(&(data_size as u32).to_le_bytes());
453
454        // Seek to beginning of file and write header
455        file.seek(std::io::SeekFrom::Start(0)).await?;
456        file.write_all(&header_buf).await?;
457
458        // Seek back to end of file for further writing
459        file.seek(std::io::SeekFrom::End(0)).await?;
460
461        Ok(())
462    }
463
464    pub async fn process_recording(
465        &self,
466        file_path: &Path,
467        mut receiver: UnboundedReceiver<AudioFrame>,
468    ) -> Result<()> {
469        // Peek first frame to decide mode
470        let first_frame = match receiver.recv().await {
471            Some(f) => f,
472            None => return Ok(()),
473        };
474
475        if let Samples::RTP { .. } = first_frame.samples {
476            return self
477                .process_recording_rtp(file_path, receiver, first_frame)
478                .await;
479        }
480
481        let requested_format = self.option.format.unwrap_or(RecorderFormat::Wav);
482        let effective_format = requested_format.effective();
483
484        if requested_format != effective_format {
485            warn!(
486                session_id = self.session_id,
487                requested = requested_format.extension(),
488                "Recorder format requires unavailable feature; falling back to wav"
489            );
490        }
491
492        if effective_format == RecorderFormat::Ogg {
493            #[cfg(feature = "opus")]
494            {
495                return self
496                    .process_recording_ogg(file_path, receiver, first_frame)
497                    .await;
498            }
499            #[cfg(not(feature = "opus"))]
500            {
501                unreachable!(
502                    "RecorderFormat::effective() should prevent ogg when opus feature is disabled"
503                );
504            }
505        }
506
507        self.process_recording_wav(file_path, receiver, first_frame)
508            .await
509    }
510
511    fn ensure_parent_dir(&self, file_path: &Path) -> Result<()> {
512        if let Some(parent) = file_path.parent() {
513            if !parent.exists() {
514                if let Err(e) = std::fs::create_dir_all(parent) {
515                    warn!(
516                        "Failed to create recording file parent directory: {} {}",
517                        e,
518                        file_path.display()
519                    );
520                    return Err(anyhow!("Failed to create recording file parent directory"));
521                }
522            }
523        }
524        Ok(())
525    }
526
527    async fn create_output_file(&self, file_path: &Path) -> Result<File> {
528        self.ensure_parent_dir(file_path)?;
529        match File::create(file_path).await {
530            Ok(file) => {
531                info!(
532                    session_id = self.session_id,
533                    "recorder: created recording file: {}",
534                    file_path.display()
535                );
536                Ok(file)
537            }
538            Err(e) => {
539                warn!(
540                    "Failed to create recording file: {} {}",
541                    e,
542                    file_path.display()
543                );
544                Err(anyhow!("Failed to create recording file"))
545            }
546        }
547    }
548
549    async fn update_wav_header_rtp(&self, file: &mut File, payload_type: u8) -> Result<()> {
550        let total_bytes = self.samples_written.load(Ordering::SeqCst);
551        let data_size = total_bytes;
552
553        let (format_tag, sample_rate, channels): (u16, u32, u16) = match payload_type {
554            0 => (0x0007, 8000, 1),  // PCMU
555            8 => (0x0006, 8000, 1),  // PCMA
556            9 => (0x0064, 16000, 1), // G722
557            _ => return Ok(()),      // Should not happen for WAV
558        };
559
560        let mut header_buf = Vec::new();
561        header_buf.extend_from_slice(b"RIFF");
562        let file_size = data_size + 36;
563        header_buf.extend_from_slice(&(file_size as u32).to_le_bytes());
564        header_buf.extend_from_slice(b"WAVE");
565
566        header_buf.extend_from_slice(b"fmt ");
567        header_buf.extend_from_slice(&16u32.to_le_bytes());
568        header_buf.extend_from_slice(&format_tag.to_le_bytes());
569        header_buf.extend_from_slice(&(channels as u16).to_le_bytes());
570        header_buf.extend_from_slice(&sample_rate.to_le_bytes());
571
572        // Byte rate
573        let bytes_per_sec: u32 = match payload_type {
574            9 => 8000,                                // G.722 is 64kbps
575            _ => sample_rate * (channels as u32) * 1, // G.711 is 8-bit
576        };
577        header_buf.extend_from_slice(&bytes_per_sec.to_le_bytes());
578
579        // Block align
580        let block_align: u16 = match payload_type {
581            9 => 1,
582            _ => 1,
583        };
584        header_buf.extend_from_slice(&block_align.to_le_bytes());
585
586        // Bits per sample
587        let bits_per_sample: u16 = match payload_type {
588            9 => 4,
589            _ => 8,
590        };
591        header_buf.extend_from_slice(&bits_per_sample.to_le_bytes());
592
593        header_buf.extend_from_slice(b"data");
594        header_buf.extend_from_slice(&(data_size as u32).to_le_bytes());
595
596        file.seek(std::io::SeekFrom::Start(0)).await?;
597        file.write_all(&header_buf).await?;
598        file.seek(std::io::SeekFrom::End(0)).await?;
599
600        Ok(())
601    }
602
603    async fn process_recording_rtp(
604        &self,
605        file_path: &Path,
606        mut receiver: UnboundedReceiver<AudioFrame>,
607        first_frame: AudioFrame,
608    ) -> Result<()> {
609        let (payload_type, mut file) =
610            if let Samples::RTP { payload_type, .. } = &first_frame.samples {
611                let mut path = file_path.to_path_buf();
612                // Adjust extension if needed
613                if *payload_type == 18 {
614                    path.set_extension("g729");
615                } else if matches!(payload_type, 0 | 8 | 9) {
616                    path.set_extension("wav");
617                }
618
619                let file = self.create_output_file(&path).await?;
620                (*payload_type, file)
621            } else {
622                return Err(anyhow!("Invalid frame type for RTP recording"));
623            };
624
625        // Write header if needed
626        match payload_type {
627            0 | 8 | 9 => {
628                self.write_wav_header_rtp(&mut file, payload_type).await?;
629            }
630            _ => {}
631        }
632
633        if let Samples::RTP { payload, .. } = first_frame.samples {
634            file.write_all(&payload).await?;
635            self.samples_written
636                .fetch_add(payload.len(), Ordering::SeqCst);
637        }
638
639        loop {
640            match receiver.recv().await {
641                Some(frame) => {
642                    if let Samples::RTP { payload, .. } = frame.samples {
643                        file.write_all(&payload).await?;
644                        self.samples_written
645                            .fetch_add(payload.len(), Ordering::SeqCst);
646                    }
647                }
648                None => break,
649            }
650        }
651
652        // Finalize
653        match payload_type {
654            0 | 8 | 9 => {
655                self.update_wav_header_rtp(&mut file, payload_type).await?;
656            }
657            _ => {}
658        }
659
660        file.sync_all().await?;
661
662        Ok(())
663    }
664
    /// Writes the initial WAV header for an RTP recording.
    ///
    /// Delegates to `update_wav_header_rtp`, which builds the header from the current byte
    /// counter, so the sizes written here are placeholders until the header is updated again.
    async fn write_wav_header_rtp(&self, file: &mut File, payload_type: u8) -> Result<()> {
        self.update_wav_header_rtp(file, payload_type).await
    }
669
670    async fn process_recording_wav(
671        &self,
672        file_path: &Path,
673        mut receiver: UnboundedReceiver<AudioFrame>,
674        first_frame: AudioFrame,
675    ) -> Result<()> {
676        let mut file = self.create_output_file(file_path).await?;
677        self.update_wav_header(&mut file).await?;
678
679        self.append_frame(first_frame).await.ok();
680
681        let chunk_size = (self.option.samplerate / 1000 * self.option.ptime) as usize;
682        info!(
683            session_id = self.session_id,
684            format = "wav",
685            "Recording to {} ptime: {}ms chunk_size: {}",
686            file_path.display(),
687            self.option.ptime,
688            chunk_size
689        );
690
691        let mut interval = IntervalStream::new(tokio::time::interval(Duration::from_millis(
692            self.option.ptime as u64,
693        )));
694        loop {
695            select! {
696                Some(frame) = receiver.recv() => {
697                    self.append_frame(frame).await.ok();
698                }
699                _ = interval.next() => {
700                    let (mono_buf, stereo_buf) = self.pop(chunk_size).await;
701                    self.process_buffers(&mut file, mono_buf, stereo_buf).await?;
702                    self.update_wav_header(&mut file).await?;
703                }
704                _ = self.cancel_token.cancelled() => {
705                    self.flush_buffers(&mut file).await?;
706                    self.update_wav_header(&mut file).await?;
707                    return Ok(());
708                }
709            }
710        }
711    }
712
713    #[cfg(feature = "opus")]
714    async fn process_recording_ogg(
715        &self,
716        file_path: &Path,
717        mut receiver: UnboundedReceiver<AudioFrame>,
718        first_frame: AudioFrame,
719    ) -> Result<()> {
720        let mut file = self.create_output_file(file_path).await?;
721        let mut writer = OggStreamWriter::new(self.option.samplerate)?;
722        if writer.sample_rate() != self.option.samplerate {
723            warn!(
724                session_id = self.session_id,
725                requested = self.option.samplerate,
726                using = writer.sample_rate(),
727                "Adjusted recorder samplerate to Opus-compatible value"
728            );
729        }
730        writer.write_headers(&mut file).await?;
731
732        self.append_frame(first_frame).await.ok();
733
734        let chunk_size = (self.option.samplerate / 1000 * self.option.ptime) as usize;
735        info!(
736            session_id = self.session_id,
737            format = "ogg",
738            "Recording to {} ptime: {}ms chunk_size: {}",
739            file_path.display(),
740            self.option.ptime,
741            chunk_size
742        );
743
744        let frame_samples = std::cmp::max(1, (writer.sample_rate() / 50) as usize);
745        let frame_step = frame_samples * 2; // stereo samples
746        let mut pending: Vec<i16> = Vec::new();
747
748        let mut interval = IntervalStream::new(tokio::time::interval(Duration::from_millis(
749            self.option.ptime as u64,
750        )));
751
752        loop {
753            select! {
754                Some(frame) = receiver.recv() => {
755                    self.append_frame(frame).await.ok();
756                }
757                _ = interval.next() => {
758                    let (mono_buf, stereo_buf) = self.pop(chunk_size).await;
759                    if mono_buf.is_empty() && stereo_buf.is_empty() {
760                        continue;
761                    }
762
763                    let mix = Self::mix_buffers(&mono_buf, &stereo_buf);
764                    pending.extend_from_slice(&mix);
765
766                    let encoded_samples = self
767                        .encode_pending_frames(&mut pending, frame_step, &mut writer, &mut file, false)
768                        .await?;
769                    if encoded_samples > 0 {
770                        self.samples_written.fetch_add(encoded_samples, Ordering::SeqCst);
771                    }
772                }
773                _ = self.cancel_token.cancelled() => {
774                    let (mono_buf, stereo_buf) = self.pop(usize::MAX).await;
775                    if !mono_buf.is_empty() || !stereo_buf.is_empty() {
776                        let mix = Self::mix_buffers(&mono_buf, &stereo_buf);
777                        pending.extend_from_slice(&mix);
778                    }
779
780                    let encoded_samples = self
781                        .encode_pending_frames(&mut pending, frame_step, &mut writer, &mut file, true)
782                        .await?;
783                    if encoded_samples > 0 {
784                        self.samples_written.fetch_add(encoded_samples, Ordering::SeqCst);
785                    }
786
787                    writer.finalize(&mut file).await?;
788                    return Ok(());
789                }
790            }
791        }
792    }
793
794    #[cfg(feature = "opus")]
795    async fn encode_pending_frames(
796        &self,
797        pending: &mut Vec<i16>,
798        frame_step: usize,
799        writer: &mut OggStreamWriter,
800        file: &mut File,
801        pad_final: bool,
802    ) -> Result<usize> {
803        let mut total_samples = 0usize;
804        let samples_per_channel = frame_step / 2;
805        while pending.len() >= frame_step {
806            let frame: Vec<i16> = pending.drain(..frame_step).collect();
807            let packet = writer.encode_frame(&frame)?;
808            writer
809                .write_audio_packet(file, &packet, samples_per_channel)
810                .await?;
811            total_samples += samples_per_channel;
812        }
813
814        if pad_final && !pending.is_empty() {
815            let mut frame: Vec<i16> = pending.drain(..).collect();
816            frame.resize(frame_step, 0);
817            let packet = writer.encode_frame(&frame)?;
818            writer
819                .write_audio_packet(file, &packet, samples_per_channel)
820                .await?;
821            total_samples += samples_per_channel;
822        }
823
824        Ok(total_samples)
825    }
826
827    /// Get or assign channel index for a track
828    fn get_channel_index(&self, track_id: &str) -> usize {
829        let mut channels = self.channels.lock().unwrap();
830        if let Some(&channel_idx) = channels.get(track_id) {
831            channel_idx % 2
832        } else {
833            let new_idx = self.channel_idx.fetch_add(1, Ordering::SeqCst);
834            channels.insert(track_id.to_string(), new_idx);
835            info!(
836                session_id = self.session_id,
837                "Assigned channel {} to track: {}",
838                new_idx % 2,
839                track_id
840            );
841            new_idx % 2
842        }
843    }
844
845    async fn append_frame(&self, frame: AudioFrame) -> Result<()> {
846        let buffer = match frame.samples {
847            Samples::PCM { samples } => samples,
848            _ => return Ok(()), // ignore non-PCM frames
849        };
850
851        // Validate audio data
852        if buffer.is_empty() {
853            return Ok(());
854        }
855
856        // Get channel assignment
857        let channel_idx = self.get_channel_index(&frame.track_id);
858
859        // Add to appropriate buffer
860        match channel_idx {
861            0 => {
862                let mut mono_buf = self.mono_buf.lock().unwrap();
863                mono_buf.extend(buffer.iter());
864            }
865            1 => {
866                let mut stereo_buf = self.stereo_buf.lock().unwrap();
867                stereo_buf.extend(buffer.iter());
868            }
869            _ => {}
870        }
871
872        Ok(())
873    }
874
875    /// Extract samples from a buffer without padding
876    pub(crate) fn extract_samples(buffer: &mut PcmBuf, extract_size: usize) -> PcmBuf {
877        if extract_size > 0 && !buffer.is_empty() {
878            let take_size = extract_size.min(buffer.len());
879            buffer.drain(..take_size).collect()
880        } else {
881            Vec::new()
882        }
883    }
884
885    async fn pop(&self, chunk_size: usize) -> (PcmBuf, PcmBuf) {
886        let mut mono_buf = self.mono_buf.lock().unwrap();
887        let mut stereo_buf = self.stereo_buf.lock().unwrap();
888
889        // Limit chunk_size to prevent capacity overflow
890        let safe_chunk_size = chunk_size.min(16000 * 10); // Max 10 seconds at 16kHz
891
892        let mono_result = if mono_buf.len() >= safe_chunk_size {
893            // Sufficient data, extract complete chunk
894            Self::extract_samples(&mut mono_buf, safe_chunk_size)
895        } else if !mono_buf.is_empty() {
896            // Partial data, extract all and pad with silence
897            let available_len = mono_buf.len(); // Store length before mutable borrow
898            let mut result = Self::extract_samples(&mut mono_buf, available_len);
899            if chunk_size != usize::MAX {
900                // Don't pad when flushing
901                result.resize(safe_chunk_size, 0); // Pad with silence to chunk_size
902            }
903            result
904        } else {
905            // No data, output silence (only when not flushing)
906            if chunk_size != usize::MAX {
907                vec![0; safe_chunk_size]
908            } else {
909                Vec::new()
910            }
911        };
912
913        let stereo_result = if stereo_buf.len() >= safe_chunk_size {
914            // Sufficient data, extract complete chunk
915            Self::extract_samples(&mut stereo_buf, safe_chunk_size)
916        } else if !stereo_buf.is_empty() {
917            // Partial data, extract all and pad with silence
918            let available_len = stereo_buf.len(); // Store length before mutable borrow
919            let mut result = Self::extract_samples(&mut stereo_buf, available_len);
920            if chunk_size != usize::MAX {
921                // Don't pad when flushing
922                result.resize(safe_chunk_size, 0); // Pad with silence to chunk_size
923            }
924            result
925        } else {
926            // No data, output silence (only when not flushing)
927            if chunk_size != usize::MAX {
928                vec![0; safe_chunk_size]
929            } else {
930                Vec::new()
931            }
932        };
933
934        // Ensure buffers have equal length when flushing
935        if chunk_size == usize::MAX {
936            let max_len = mono_result.len().max(stereo_result.len());
937            let mut mono_final = mono_result;
938            let mut stereo_final = stereo_result;
939            mono_final.resize(max_len, 0);
940            stereo_final.resize(max_len, 0);
941            (mono_final, stereo_final)
942        } else {
943            (mono_result, stereo_result)
944        }
945    }
946
947    pub fn stop_recording(&self) -> Result<()> {
948        self.cancel_token.cancel();
949        Ok(())
950    }
951
952    /// Mix mono and stereo buffers into interleaved stereo output
953    pub(crate) fn mix_buffers(mono_buf: &PcmBuf, stereo_buf: &PcmBuf) -> Vec<i16> {
954        // Ensure both buffers have equal length (guaranteed by pop() method)
955        assert_eq!(
956            mono_buf.len(),
957            stereo_buf.len(),
958            "Buffer lengths must be equal after pop()"
959        );
960
961        let len = mono_buf.len();
962        let mut mix_buff = Vec::with_capacity(len * 2);
963
964        for i in 0..len {
965            mix_buff.push(mono_buf[i]); // Left channel
966            mix_buff.push(stereo_buf[i]); // Right channel
967        }
968
969        mix_buff
970    }
971
972    /// Write mixed audio data to file
973    async fn write_audio_data(
974        &self,
975        file: &mut File,
976        mono_buf: &PcmBuf,
977        stereo_buf: &PcmBuf,
978    ) -> Result<usize> {
979        let max_len = mono_buf.len().max(stereo_buf.len());
980        if max_len == 0 {
981            return Ok(0);
982        }
983
984        let mix_buff = Self::mix_buffers(mono_buf, stereo_buf);
985
986        file.seek(std::io::SeekFrom::End(0)).await?;
987        file.write_all(&samples_to_bytes(&mix_buff)).await?;
988
989        Ok(max_len)
990    }
991
992    /// Process buffers with quality checks and write to file
993    async fn process_buffers(
994        &self,
995        file: &mut File,
996        mono_buf: PcmBuf,
997        stereo_buf: PcmBuf,
998    ) -> Result<()> {
999        // Skip if no data
1000        if mono_buf.is_empty() && stereo_buf.is_empty() {
1001            return Ok(());
1002        }
1003        // Write audio data
1004        let samples_written = self.write_audio_data(file, &mono_buf, &stereo_buf).await?;
1005        if samples_written > 0 {
1006            self.samples_written
1007                .fetch_add(samples_written, Ordering::SeqCst);
1008        }
1009        Ok(())
1010    }
1011
1012    /// Flush all remaining buffer content
1013    async fn flush_buffers(&self, file: &mut File) -> Result<()> {
1014        loop {
1015            let (mono_buf, stereo_buf) = self.pop(usize::MAX).await;
1016
1017            if mono_buf.is_empty() && stereo_buf.is_empty() {
1018                break;
1019            }
1020
1021            let samples_written = self.write_audio_data(file, &mono_buf, &stereo_buf).await?;
1022            if samples_written > 0 {
1023                self.samples_written
1024                    .fetch_add(samples_written, Ordering::SeqCst);
1025            }
1026        }
1027
1028        Ok(())
1029    }
1030}