active_call/media/
recorder.rs

1use anyhow::{Result, anyhow};
2use audio_codec::{Decoder, PcmBuf, g729::G729Decoder, samples_to_bytes};
3use futures::StreamExt;
4use hound::{SampleFormat, WavSpec};
5use serde::{Deserialize, Serialize};
6use std::{
7    collections::HashMap,
8    path::{Path, PathBuf},
9    sync::{
10        Mutex,
11        atomic::{AtomicUsize, Ordering},
12    },
13    time::Duration,
14    u32,
15};
16use tokio::{
17    fs::File,
18    io::{AsyncSeekExt, AsyncWriteExt},
19    select,
20    sync::mpsc::UnboundedReceiver,
21};
22use tokio_stream::wrappers::IntervalStream;
23use tokio_util::sync::CancellationToken;
24use tracing::{info, warn};
25
26#[cfg(feature = "opus")]
27use opusic_sys::{
28    OPUS_APPLICATION_AUDIO, OPUS_OK, OpusEncoder as OpusEncoderRaw, opus_encode,
29    opus_encoder_create, opus_encoder_destroy, opus_strerror,
30};
31#[cfg(feature = "opus")]
32use std::{ffi::CStr, os::raw::c_int, ptr::NonNull};
33
34use crate::media::{AudioFrame, Samples};
35
/// Turns a libopus status code into a human-readable message.
///
/// `OPUS_OK` maps to `"ok"`. Any other code is looked up with `opus_strerror`, falling back
/// to `"error code <n>"` if libopus hands back a null pointer.
#[cfg(feature = "opus")]
fn opus_error_message(code: c_int) -> String {
    if code == OPUS_OK {
        return String::from("ok");
    }

    // SAFETY: `opus_strerror` accepts any integer and returns either null or a pointer to a
    // NUL-terminated string owned by libopus.
    let message = unsafe { opus_strerror(code) };
    if message.is_null() {
        return format!("error code {code}");
    }

    // SAFETY: `message` is non-null and points to a NUL-terminated C string (see above).
    unsafe { CStr::from_ptr(message) }
        .to_string_lossy()
        .into_owned()
}
51
/// Container format used when recording decoded PCM audio.
///
/// Serialized in lowercase (`"wav"` / `"ogg"`). `Ogg` can only be produced when the crate is
/// built with the `opus` feature; `RecorderFormat::effective` falls back to `Wav` otherwise.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RecorderFormat {
    /// RIFF/WAV file (the PCM path writes 2-channel, 16-bit samples).
    Wav,
    /// Opus-encoded audio in an Ogg container.
    Ogg,
}
58
/// Incremental writer for a stereo Ogg Opus stream that owns a raw libopus encoder.
///
/// The encoder is created in `OggStreamWriter::new` and destroyed in its `Drop` impl.
#[cfg(feature = "opus")]
struct OggStreamWriter {
    /// Encoder handle returned by `opus_encoder_create`; freed in `Drop`.
    encoder: NonNull<OpusEncoderRaw>,
    /// Ogg bitstream serial number written into every page header (never 0).
    serial: u32,
    /// Sequence number to stamp on the next page written.
    sequence: u32,
    /// Granule position (counted in 48 kHz samples) after the last audio packet written.
    granule_position: u64,
    /// Encoder input sample rate, restricted to a rate libopus accepts.
    sample_rate: u32,
}
67
#[cfg(feature = "opus")]
impl OggStreamWriter {
    /// Creates a stereo libopus encoder (`OPUS_APPLICATION_AUDIO`) and a new Ogg logical
    /// stream with a random, non-zero serial number.
    ///
    /// `sample_rate` is used as the encoder input rate when libopus accepts it directly
    /// (8, 12, 16, 24 or 48 kHz). Any other value silently falls back to 16 kHz. Callers can
    /// detect the substitution by comparing with [`OggStreamWriter::sample_rate`].
    ///
    /// # Errors
    /// Returns an error if `opus_encoder_create` reports a failure or returns null.
    fn new(sample_rate: u32) -> Result<Self> {
        // Input rates libopus accepts; anything else falls back to 16 kHz.
        let normalized = match sample_rate {
            8000 | 12000 | 16000 | 24000 | 48000 => sample_rate,
            _ => 16000,
        };

        let encoder = {
            let mut error: c_int = 0;
            // SAFETY: every argument is a plain value except `error`, which points to a live,
            // writable local for the duration of the call.
            let ptr = unsafe {
                opus_encoder_create(
                    normalized as c_int,
                    2,
                    OPUS_APPLICATION_AUDIO,
                    &mut error as *mut c_int,
                )
            };

            if error != OPUS_OK {
                // SAFETY: a non-null pointer returned by `opus_encoder_create` is owned here,
                // has not been freed yet, and is destroyed exactly once.
                unsafe {
                    if !ptr.is_null() {
                        opus_encoder_destroy(ptr);
                    }
                }
                return Err(anyhow!(
                    "Failed to create Opus encoder: {}",
                    opus_error_message(error)
                ));
            }

            NonNull::new(ptr)
                .ok_or_else(|| anyhow!("Failed to create Opus encoder: null pointer returned"))?
        };

        // Random stream serial; a zero draw is replaced by 1 so the serial is never 0.
        let mut serial = rand::random::<u32>();
        if serial == 0 {
            serial = 1;
        }

        Ok(Self {
            encoder,
            serial,
            sequence: 0,
            granule_position: 0,
            sample_rate: normalized,
        })
    }

    /// Encoder input sample rate actually in use (after normalization in `new`).
    fn sample_rate(&self) -> u32 {
        self.sample_rate
    }

    /// Converts a per-channel sample count at the encoder rate into Ogg Opus granule units,
    /// which always count samples at 48 kHz.
    fn granule_increment(&self, frame_samples: usize) -> u64 {
        let factor = 48000 / self.sample_rate;
        (frame_samples as u64) * (factor as u64)
    }

    /// Encodes one frame of interleaved stereo PCM (L, R, L, R, ...) into a single Opus
    /// packet.
    ///
    /// `pcm.len() / 2` is passed to libopus as the per-channel frame size, so it must be a
    /// frame duration libopus supports. The caller in this file uses 20 ms frames.
    ///
    /// # Errors
    /// Fails if `pcm` has an odd length or libopus returns a negative (error) code.
    fn encode_frame(&mut self, pcm: &[i16]) -> Result<Vec<u8>> {
        if pcm.len() % 2 != 0 {
            return Err(anyhow!(
                "PCM frame must contain an even number of samples for stereo Opus encoding"
            ));
        }

        let frame_size = (pcm.len() / 2) as c_int;
        // Output scratch buffer; libopus writes at most `buffer.len()` bytes into it.
        let mut buffer = vec![0u8; 4096];
        // SAFETY: `pcm` holds `frame_size * 2` readable i16 samples, `buffer` is writable for
        // `buffer.len()` bytes, and the encoder pointer stays valid for the lifetime of `self`.
        let len = unsafe {
            opus_encode(
                self.encoder.as_ptr(),
                pcm.as_ptr() as *const opusic_sys::opus_int16,
                frame_size,
                buffer.as_mut_ptr(),
                buffer.len() as c_int,
            )
        };

        if len < 0 {
            return Err(anyhow!(
                "Failed to encode Opus frame: {}",
                opus_error_message(len)
            ));
        }

        buffer.truncate(len as usize);
        Ok(buffer)
    }

    /// Writes the two Ogg Opus header pages.
    ///
    /// `OpusHead` goes on the first page, flagged beginning-of-stream (0x02). `OpusTags`
    /// follows on its own page. Both pages carry granule position 0.
    async fn write_headers(&mut self, file: &mut File) -> Result<()> {
        let head = Self::build_opus_head(self.sample_rate);
        self.write_page(file, &head, 0, 0x02).await?;

        let tags = Self::build_opus_tags();
        self.write_page(file, &tags, 0, 0x00).await?;
        Ok(())
    }

    /// Advances the granule position by `frame_samples` (per-channel samples at the encoder
    /// rate) and writes `packet` on its own page stamped with the new position.
    async fn write_audio_packet(
        &mut self,
        file: &mut File,
        packet: &[u8],
        frame_samples: usize,
    ) -> Result<()> {
        let increment = self.granule_increment(frame_samples);
        self.granule_position = self.granule_position.saturating_add(increment);
        self.write_page(file, packet, self.granule_position, 0x00)
            .await
    }

    /// Ends the stream with an empty page flagged end-of-stream (0x04) that carries the
    /// final granule position.
    async fn finalize(&mut self, file: &mut File) -> Result<()> {
        self.write_page(file, &[], self.granule_position, 0x04)
            .await
    }

    /// Serializes `packet` as exactly one Ogg page and appends it to `file`.
    ///
    /// Layout: `"OggS"`, version 0, `header_type` flags, `granule_pos`, stream serial, page
    /// sequence, CRC, lacing table, payload. The CRC is computed with the checksum field
    /// zeroed and then patched in. The page sequence advances only after a successful write.
    ///
    /// # Errors
    /// Fails if `packet` needs more than 255 lacing values (i.e. is 65 025 bytes or longer),
    /// or on I/O errors.
    async fn write_page(
        &mut self,
        file: &mut File,
        packet: &[u8],
        granule_pos: u64,
        header_type: u8,
    ) -> Result<()> {
        // Lacing: a run of 255s followed by a terminating value < 255 (possibly 0).
        let mut segments = Vec::new();
        if !packet.is_empty() {
            let mut remaining = packet.len();
            while remaining >= 255 {
                segments.push(255u8);
                remaining -= 255;
            }
            segments.push(remaining as u8);
        }

        // The page header stores the lacing-table length in one byte. Refuse oversized packets
        // instead of letting `as u8` wrap and emitting a corrupt page.
        let segment_count = u8::try_from(segments.len()).map_err(|_| {
            anyhow!(
                "Ogg packet of {} bytes does not fit in a single page",
                packet.len()
            )
        })?;

        let mut page = Vec::with_capacity(27 + segments.len() + packet.len());
        page.extend_from_slice(b"OggS");
        page.push(0); // version
        page.push(header_type);
        page.extend_from_slice(&granule_pos.to_le_bytes());
        page.extend_from_slice(&self.serial.to_le_bytes());
        page.extend_from_slice(&self.sequence.to_le_bytes());
        page.extend_from_slice(&0u32.to_le_bytes()); // checksum placeholder
        page.push(segment_count);
        page.extend_from_slice(&segments);
        page.extend_from_slice(packet);

        let checksum = ogg_crc32(&page);
        page[22..26].copy_from_slice(&checksum.to_le_bytes());

        file.write_all(&page).await?;
        self.sequence = self.sequence.wrapping_add(1);
        Ok(())
    }

    /// Builds the 19-byte `OpusHead` identification header.
    ///
    /// Fields: version 1, 2 channels, pre-skip 0, original input rate, output gain 0,
    /// channel mapping family 0.
    // NOTE(review): pre-skip is 0, so the encoder's lookahead is played back as leading
    // samples; consider querying the encoder lookahead and storing it here (RFC 7845 §4.2).
    fn build_opus_head(sample_rate: u32) -> Vec<u8> {
        let mut head = Vec::with_capacity(19);
        head.extend_from_slice(b"OpusHead");
        head.push(1); // version
        head.push(2); // channel count (stereo)
        head.extend_from_slice(&0u16.to_le_bytes()); // pre-skip
        head.extend_from_slice(&sample_rate.to_le_bytes());
        head.extend_from_slice(&0i16.to_le_bytes()); // output gain
        head.push(0); // channel mapping family
        head
    }

    /// Builds the `OpusTags` comment header with vendor string `"rustpbx"` and no user
    /// comments.
    fn build_opus_tags() -> Vec<u8> {
        const VENDOR: &str = "rustpbx";
        let vendor_bytes = VENDOR.as_bytes();
        let mut tags = Vec::with_capacity(8 + 4 + vendor_bytes.len() + 4);
        tags.extend_from_slice(b"OpusTags");
        tags.extend_from_slice(&(vendor_bytes.len() as u32).to_le_bytes());
        tags.extend_from_slice(vendor_bytes);
        tags.extend_from_slice(&0u32.to_le_bytes()); // user comment list length
        tags
    }
}
242
#[cfg(feature = "opus")]
impl Drop for OggStreamWriter {
    /// Releases the libopus encoder owned by this writer.
    fn drop(&mut self) {
        let raw = self.encoder.as_ptr();
        // SAFETY: `raw` came from a successful `opus_encoder_create` in `new`, is owned
        // exclusively by this writer, and is destroyed exactly once, here.
        unsafe { opus_encoder_destroy(raw) };
    }
}
251
// SAFETY: the encoder pointer is owned exclusively by this writer (created in `new`, freed
// only in `Drop`), so moving the writer moves sole ownership of the encoder state.
// NOTE(review): assumes libopus encoder state has no thread affinity — confirm.
#[cfg(feature = "opus")]
unsafe impl Send for OggStreamWriter {}

// SAFETY: every method that touches the encoder takes `&mut self`. The `&self` methods
// (`sample_rate`, `granule_increment`) only read plain integer fields, so shared references
// cannot race on the encoder.
#[cfg(feature = "opus")]
unsafe impl Sync for OggStreamWriter {}
257
/// Computes the Ogg page checksum.
///
/// This is an MSB-first CRC-32 with generator polynomial 0x04C11DB7, initial value 0, and no
/// bit reflection or final XOR. It is evaluated over the whole page with the checksum field
/// set to zero.
#[cfg(feature = "opus")]
fn ogg_crc32(data: &[u8]) -> u32 {
    const GENERATOR: u32 = 0x04C1_1DB7;
    data.iter().fold(0u32, |acc, &byte| {
        (0..8).fold(acc ^ (u32::from(byte) << 24), |reg, _| {
            let top_bit_set = (reg & 0x8000_0000) != 0;
            let shifted = reg << 1;
            if top_bit_set {
                shifted ^ GENERATOR
            } else {
                shifted
            }
        })
    })
}
274
275impl RecorderFormat {
276    pub fn extension(&self) -> &'static str {
277        match self {
278            RecorderFormat::Wav => "wav",
279            RecorderFormat::Ogg => "ogg",
280        }
281    }
282
283    pub fn is_supported(&self) -> bool {
284        match self {
285            RecorderFormat::Wav => true,
286            RecorderFormat::Ogg => cfg!(feature = "opus"),
287        }
288    }
289
290    pub fn effective(&self) -> RecorderFormat {
291        if self.is_supported() {
292            *self
293        } else {
294            RecorderFormat::Wav
295        }
296    }
297}
298
299impl Default for RecorderFormat {
300    fn default() -> Self {
301        RecorderFormat::Wav
302    }
303}
304
305#[derive(Debug, Deserialize, Serialize, Clone)]
306#[serde(rename_all = "camelCase")]
307#[serde(default)]
308pub struct RecorderOption {
309    #[serde(default)]
310    pub recorder_file: String,
311    #[serde(default)]
312    pub samplerate: u32,
313    #[serde(default)]
314    pub ptime: u32,
315    #[serde(default, skip_serializing_if = "Option::is_none")]
316    pub format: Option<RecorderFormat>,
317}
318
319impl RecorderOption {
320    pub fn new(recorder_file: String) -> Self {
321        Self {
322            recorder_file,
323            ..Default::default()
324        }
325    }
326
327    pub fn resolved_format(&self, default: RecorderFormat) -> RecorderFormat {
328        self.format.unwrap_or(default).effective()
329    }
330
331    pub fn ensure_path_extension(&mut self, fallback_format: RecorderFormat) {
332        let effective_format = self.format.unwrap_or(fallback_format).effective();
333        self.format = Some(effective_format);
334
335        if self.recorder_file.is_empty() {
336            return;
337        }
338
339        let mut path = PathBuf::from(&self.recorder_file);
340        let has_desired_ext = path
341            .extension()
342            .and_then(|ext| ext.to_str())
343            .map(|ext| ext.eq_ignore_ascii_case(effective_format.extension()))
344            .unwrap_or(false);
345
346        if !has_desired_ext {
347            path.set_extension(effective_format.extension());
348            self.recorder_file = path.to_string_lossy().into_owned();
349        }
350    }
351}
352
353impl Default for RecorderOption {
354    fn default() -> Self {
355        Self {
356            recorder_file: "".to_string(),
357            samplerate: 16000,
358            ptime: 200,
359            format: None,
360        }
361    }
362}
363
/// Reads a file of G.729 payload bytes and decodes it to PCM.
///
/// NOTE(review): despite the name, no WAV header is parsed — `read_all` hands the entire
/// file content to the decoder.
pub struct G729WavReader {
    /// Decoder used by `read_all`.
    decoder: G729Decoder,
    /// Path of the file to read.
    file_path: PathBuf,
}
368
369impl G729WavReader {
370    pub fn new(path: PathBuf) -> Self {
371        Self {
372            decoder: G729Decoder::new(),
373            file_path: path,
374        }
375    }
376
377    pub async fn read_all(&mut self) -> Result<Vec<i16>> {
378        let data = tokio::fs::read(&self.file_path).await?;
379        Ok(self.decoder.decode(&data))
380    }
381}
382
/// Writes the audio of a session to disk.
///
/// PCM frames are routed into two per-channel buffers by track (see `get_channel_index`) and
/// written periodically as WAV or Ogg Opus. RTP payload frames are written as-is, wrapped in a
/// WAV header for PCMU/PCMA/G.722. The PCM recording loops finish when `cancel_token` fires.
pub struct Recorder {
    /// Session identifier attached to log records.
    session_id: String,
    /// Output settings (path, sample rate, ptime, format).
    option: RecorderOption,
    /// Progress counter used when sizing headers. Its unit depends on the mode: payload bytes
    /// on the RTP path, per-channel samples on the Ogg path, and 4-byte stereo frames as
    /// interpreted by `update_wav_header`.
    samples_written: AtomicUsize,
    /// Cancelled by `stop_recording` to end the recording.
    cancel_token: CancellationToken,
    /// Counter that hands out a slot number to each newly seen track.
    channel_idx: AtomicUsize,
    /// Track id -> slot number from `channel_idx` (reduced modulo 2 when used).
    channels: Mutex<HashMap<String, usize>>,
    /// Pending samples for channel index 1.
    stereo_buf: Mutex<PcmBuf>,
    /// Pending samples for channel index 0.
    mono_buf: Mutex<PcmBuf>,
}
393
394impl Recorder {
395    pub fn new(
396        cancel_token: CancellationToken,
397        session_id: String,
398        option: RecorderOption,
399    ) -> Self {
400        Self {
401            session_id,
402            option,
403            samples_written: AtomicUsize::new(0),
404            cancel_token,
405            channel_idx: AtomicUsize::new(0),
406            channels: Mutex::new(HashMap::new()),
407            stereo_buf: Mutex::new(Vec::new()),
408            mono_buf: Mutex::new(Vec::new()),
409        }
410    }
411
412    async fn update_wav_header(&self, file: &mut File) -> Result<()> {
413        // Get total data size (in bytes)
414        let total_samples = self.samples_written.load(Ordering::SeqCst);
415        let data_size = total_samples * 4; // Stereo, 16-bit = 4 bytes per sample
416
417        // Create a WavSpec for the WAV header
418        let spec = WavSpec {
419            channels: 2,
420            sample_rate: self.option.samplerate,
421            bits_per_sample: 16,
422            sample_format: SampleFormat::Int,
423        };
424        // Create a memory buffer for the WAV header
425        let mut header_buf = Vec::new();
426
427        // Create a WAV header using standard structure
428        // RIFF header
429        header_buf.extend_from_slice(b"RIFF");
430        let file_size = data_size + 36; // 36 bytes for header - 8 + data bytes
431        header_buf.extend_from_slice(&(file_size as u32).to_le_bytes());
432        header_buf.extend_from_slice(b"WAVE");
433
434        // fmt subchunk - use values from WavSpec
435        header_buf.extend_from_slice(b"fmt ");
436        header_buf.extend_from_slice(&16u32.to_le_bytes()); // fmt chunk size
437        header_buf.extend_from_slice(&1u16.to_le_bytes()); // PCM format
438        header_buf.extend_from_slice(&(spec.channels as u16).to_le_bytes());
439        header_buf.extend_from_slice(&(spec.sample_rate).to_le_bytes());
440
441        // Bytes per second: sample_rate * num_channels * bytes_per_sample
442        let bytes_per_sec =
443            spec.sample_rate * (spec.channels as u32) * (spec.bits_per_sample as u32 / 8);
444        header_buf.extend_from_slice(&bytes_per_sec.to_le_bytes());
445
446        // Block align: num_channels * bytes_per_sample
447        let block_align = (spec.channels as u16) * (spec.bits_per_sample / 8);
448        header_buf.extend_from_slice(&block_align.to_le_bytes());
449        header_buf.extend_from_slice(&spec.bits_per_sample.to_le_bytes());
450
451        // Data subchunk
452        header_buf.extend_from_slice(b"data");
453        header_buf.extend_from_slice(&(data_size as u32).to_le_bytes());
454
455        // Seek to beginning of file and write header
456        file.seek(std::io::SeekFrom::Start(0)).await?;
457        file.write_all(&header_buf).await?;
458
459        // Seek back to end of file for further writing
460        file.seek(std::io::SeekFrom::End(0)).await?;
461
462        Ok(())
463    }
464
465    pub async fn process_recording(
466        &self,
467        file_path: &Path,
468        mut receiver: UnboundedReceiver<AudioFrame>,
469    ) -> Result<()> {
470        // Peek first frame to decide mode
471        let first_frame = match receiver.recv().await {
472            Some(f) => f,
473            None => return Ok(()),
474        };
475
476        if let Samples::RTP { .. } = first_frame.samples {
477            return self
478                .process_recording_rtp(file_path, receiver, first_frame)
479                .await;
480        }
481
482        let requested_format = self.option.format.unwrap_or(RecorderFormat::Wav);
483        let effective_format = requested_format.effective();
484
485        if requested_format != effective_format {
486            warn!(
487                session_id = self.session_id,
488                requested = requested_format.extension(),
489                "Recorder format requires unavailable feature; falling back to wav"
490            );
491        }
492
493        if effective_format == RecorderFormat::Ogg {
494            #[cfg(feature = "opus")]
495            {
496                return self
497                    .process_recording_ogg(file_path, receiver, first_frame)
498                    .await;
499            }
500            #[cfg(not(feature = "opus"))]
501            {
502                unreachable!(
503                    "RecorderFormat::effective() should prevent ogg when opus feature is disabled"
504                );
505            }
506        }
507
508        self.process_recording_wav(file_path, receiver, first_frame)
509            .await
510    }
511
512    fn ensure_parent_dir(&self, file_path: &Path) -> Result<()> {
513        if let Some(parent) = file_path.parent() {
514            if !parent.exists() {
515                if let Err(e) = std::fs::create_dir_all(parent) {
516                    warn!(
517                        "Failed to create recording file parent directory: {} {}",
518                        e,
519                        file_path.display()
520                    );
521                    return Err(anyhow!("Failed to create recording file parent directory"));
522                }
523            }
524        }
525        Ok(())
526    }
527
528    async fn create_output_file(&self, file_path: &Path) -> Result<File> {
529        self.ensure_parent_dir(file_path)?;
530        match File::create(file_path).await {
531            Ok(file) => {
532                info!(
533                    session_id = self.session_id,
534                    "recorder: created recording file: {}",
535                    file_path.display()
536                );
537                Ok(file)
538            }
539            Err(e) => {
540                warn!(
541                    "Failed to create recording file: {} {}",
542                    e,
543                    file_path.display()
544                );
545                Err(anyhow!("Failed to create recording file"))
546            }
547        }
548    }
549
550    async fn update_wav_header_rtp(&self, file: &mut File, payload_type: u8) -> Result<()> {
551        let total_bytes = self.samples_written.load(Ordering::SeqCst);
552        let data_size = total_bytes;
553
554        let (format_tag, sample_rate, channels): (u16, u32, u16) = match payload_type {
555            0 => (0x0007, 8000, 1),  // PCMU
556            8 => (0x0006, 8000, 1),  // PCMA
557            9 => (0x0064, 16000, 1), // G722
558            _ => return Ok(()),      // Should not happen for WAV
559        };
560
561        let mut header_buf = Vec::new();
562        header_buf.extend_from_slice(b"RIFF");
563        let file_size = data_size + 36;
564        header_buf.extend_from_slice(&(file_size as u32).to_le_bytes());
565        header_buf.extend_from_slice(b"WAVE");
566
567        header_buf.extend_from_slice(b"fmt ");
568        header_buf.extend_from_slice(&16u32.to_le_bytes());
569        header_buf.extend_from_slice(&format_tag.to_le_bytes());
570        header_buf.extend_from_slice(&(channels as u16).to_le_bytes());
571        header_buf.extend_from_slice(&sample_rate.to_le_bytes());
572
573        // Byte rate
574        let bytes_per_sec: u32 = match payload_type {
575            9 => 8000,                                // G.722 is 64kbps
576            _ => sample_rate * (channels as u32) * 1, // G.711 is 8-bit
577        };
578        header_buf.extend_from_slice(&bytes_per_sec.to_le_bytes());
579
580        // Block align
581        let block_align: u16 = match payload_type {
582            9 => 1,
583            _ => 1,
584        };
585        header_buf.extend_from_slice(&block_align.to_le_bytes());
586
587        // Bits per sample
588        let bits_per_sample: u16 = match payload_type {
589            9 => 4,
590            _ => 8,
591        };
592        header_buf.extend_from_slice(&bits_per_sample.to_le_bytes());
593
594        header_buf.extend_from_slice(b"data");
595        header_buf.extend_from_slice(&(data_size as u32).to_le_bytes());
596
597        file.seek(std::io::SeekFrom::Start(0)).await?;
598        file.write_all(&header_buf).await?;
599        file.seek(std::io::SeekFrom::End(0)).await?;
600
601        Ok(())
602    }
603
604    async fn process_recording_rtp(
605        &self,
606        file_path: &Path,
607        mut receiver: UnboundedReceiver<AudioFrame>,
608        first_frame: AudioFrame,
609    ) -> Result<()> {
610        let (payload_type, mut file) =
611            if let Samples::RTP { payload_type, .. } = &first_frame.samples {
612                let mut path = file_path.to_path_buf();
613                // Adjust extension if needed
614                if *payload_type == 18 {
615                    path.set_extension("g729");
616                } else if matches!(payload_type, 0 | 8 | 9) {
617                    path.set_extension("wav");
618                }
619
620                let file = self.create_output_file(&path).await?;
621                (*payload_type, file)
622            } else {
623                return Err(anyhow!("Invalid frame type for RTP recording"));
624            };
625
626        // Write header if needed
627        match payload_type {
628            0 | 8 | 9 => {
629                self.write_wav_header_rtp(&mut file, payload_type).await?;
630            }
631            _ => {}
632        }
633
634        if let Samples::RTP { payload, .. } = first_frame.samples {
635            file.write_all(&payload).await?;
636            self.samples_written
637                .fetch_add(payload.len(), Ordering::SeqCst);
638        }
639
640        loop {
641            match receiver.recv().await {
642                Some(frame) => {
643                    if let Samples::RTP { payload, .. } = frame.samples {
644                        file.write_all(&payload).await?;
645                        self.samples_written
646                            .fetch_add(payload.len(), Ordering::SeqCst);
647                    }
648                }
649                None => break,
650            }
651        }
652
653        // Finalize
654        match payload_type {
655            0 | 8 | 9 => {
656                self.update_wav_header_rtp(&mut file, payload_type).await?;
657            }
658            _ => {}
659        }
660
661        file.sync_all().await?;
662
663        Ok(())
664    }
665
666    // Helper for initial header write (just placeholders)
667    async fn write_wav_header_rtp(&self, file: &mut File, payload_type: u8) -> Result<()> {
668        self.update_wav_header_rtp(file, payload_type).await
669    }
670
671    async fn process_recording_wav(
672        &self,
673        file_path: &Path,
674        mut receiver: UnboundedReceiver<AudioFrame>,
675        first_frame: AudioFrame,
676    ) -> Result<()> {
677        let mut file = self.create_output_file(file_path).await?;
678        self.update_wav_header(&mut file).await?;
679
680        self.append_frame(first_frame).await.ok();
681
682        let chunk_size = (self.option.samplerate / 1000 * self.option.ptime) as usize;
683        info!(
684            session_id = self.session_id,
685            format = "wav",
686            "Recording to {} ptime: {}ms chunk_size: {}",
687            file_path.display(),
688            self.option.ptime,
689            chunk_size
690        );
691
692        let mut interval = IntervalStream::new(tokio::time::interval(Duration::from_millis(
693            self.option.ptime as u64,
694        )));
695        loop {
696            select! {
697                Some(frame) = receiver.recv() => {
698                    self.append_frame(frame).await.ok();
699                }
700                _ = interval.next() => {
701                    let (mono_buf, stereo_buf) = self.pop(chunk_size).await;
702                    self.process_buffers(&mut file, mono_buf, stereo_buf).await?;
703                    self.update_wav_header(&mut file).await?;
704                }
705                _ = self.cancel_token.cancelled() => {
706                    self.flush_buffers(&mut file).await?;
707                    self.update_wav_header(&mut file).await?;
708                    return Ok(());
709                }
710            }
711        }
712    }
713
714    #[cfg(feature = "opus")]
715    async fn process_recording_ogg(
716        &self,
717        file_path: &Path,
718        mut receiver: UnboundedReceiver<AudioFrame>,
719        first_frame: AudioFrame,
720    ) -> Result<()> {
721        let mut file = self.create_output_file(file_path).await?;
722        let mut writer = OggStreamWriter::new(self.option.samplerate)?;
723        if writer.sample_rate() != self.option.samplerate {
724            warn!(
725                session_id = self.session_id,
726                requested = self.option.samplerate,
727                using = writer.sample_rate(),
728                "Adjusted recorder samplerate to Opus-compatible value"
729            );
730        }
731        writer.write_headers(&mut file).await?;
732
733        self.append_frame(first_frame).await.ok();
734
735        let chunk_size = (self.option.samplerate / 1000 * self.option.ptime) as usize;
736        info!(
737            session_id = self.session_id,
738            format = "ogg",
739            "Recording to {} ptime: {}ms chunk_size: {}",
740            file_path.display(),
741            self.option.ptime,
742            chunk_size
743        );
744
745        let frame_samples = std::cmp::max(1, (writer.sample_rate() / 50) as usize);
746        let frame_step = frame_samples * 2; // stereo samples
747        let mut pending: Vec<i16> = Vec::new();
748
749        let mut interval = IntervalStream::new(tokio::time::interval(Duration::from_millis(
750            self.option.ptime as u64,
751        )));
752
753        loop {
754            select! {
755                Some(frame) = receiver.recv() => {
756                    self.append_frame(frame).await.ok();
757                }
758                _ = interval.next() => {
759                    let (mono_buf, stereo_buf) = self.pop(chunk_size).await;
760                    if mono_buf.is_empty() && stereo_buf.is_empty() {
761                        continue;
762                    }
763
764                    let mix = Self::mix_buffers(&mono_buf, &stereo_buf);
765                    pending.extend_from_slice(&mix);
766
767                    let encoded_samples = self
768                        .encode_pending_frames(&mut pending, frame_step, &mut writer, &mut file, false)
769                        .await?;
770                    if encoded_samples > 0 {
771                        self.samples_written.fetch_add(encoded_samples, Ordering::SeqCst);
772                    }
773                }
774                _ = self.cancel_token.cancelled() => {
775                    let (mono_buf, stereo_buf) = self.pop(usize::MAX).await;
776                    if !mono_buf.is_empty() || !stereo_buf.is_empty() {
777                        let mix = Self::mix_buffers(&mono_buf, &stereo_buf);
778                        pending.extend_from_slice(&mix);
779                    }
780
781                    let encoded_samples = self
782                        .encode_pending_frames(&mut pending, frame_step, &mut writer, &mut file, true)
783                        .await?;
784                    if encoded_samples > 0 {
785                        self.samples_written.fetch_add(encoded_samples, Ordering::SeqCst);
786                    }
787
788                    writer.finalize(&mut file).await?;
789                    return Ok(());
790                }
791            }
792        }
793    }
794
795    #[cfg(feature = "opus")]
796    async fn encode_pending_frames(
797        &self,
798        pending: &mut Vec<i16>,
799        frame_step: usize,
800        writer: &mut OggStreamWriter,
801        file: &mut File,
802        pad_final: bool,
803    ) -> Result<usize> {
804        let mut total_samples = 0usize;
805        let samples_per_channel = frame_step / 2;
806        while pending.len() >= frame_step {
807            let frame: Vec<i16> = pending.drain(..frame_step).collect();
808            let packet = writer.encode_frame(&frame)?;
809            writer
810                .write_audio_packet(file, &packet, samples_per_channel)
811                .await?;
812            total_samples += samples_per_channel;
813        }
814
815        if pad_final && !pending.is_empty() {
816            let mut frame: Vec<i16> = pending.drain(..).collect();
817            frame.resize(frame_step, 0);
818            let packet = writer.encode_frame(&frame)?;
819            writer
820                .write_audio_packet(file, &packet, samples_per_channel)
821                .await?;
822            total_samples += samples_per_channel;
823        }
824
825        Ok(total_samples)
826    }
827
828    /// Get or assign channel index for a track
829    fn get_channel_index(&self, track_id: &str) -> usize {
830        let mut channels = self.channels.lock().unwrap();
831        if let Some(&channel_idx) = channels.get(track_id) {
832            channel_idx % 2
833        } else {
834            let new_idx = self.channel_idx.fetch_add(1, Ordering::SeqCst);
835            channels.insert(track_id.to_string(), new_idx);
836            info!(
837                session_id = self.session_id,
838                "Assigned channel {} to track: {}",
839                new_idx % 2,
840                track_id
841            );
842            new_idx % 2
843        }
844    }
845
846    async fn append_frame(&self, frame: AudioFrame) -> Result<()> {
847        let buffer = match frame.samples {
848            Samples::PCM { samples } => samples,
849            _ => return Ok(()), // ignore non-PCM frames
850        };
851
852        // Validate audio data
853        if buffer.is_empty() {
854            return Ok(());
855        }
856
857        // Get channel assignment
858        let channel_idx = self.get_channel_index(&frame.track_id);
859
860        // Add to appropriate buffer
861        match channel_idx {
862            0 => {
863                let mut mono_buf = self.mono_buf.lock().unwrap();
864                mono_buf.extend(buffer.iter());
865            }
866            1 => {
867                let mut stereo_buf = self.stereo_buf.lock().unwrap();
868                stereo_buf.extend(buffer.iter());
869            }
870            _ => {}
871        }
872
873        Ok(())
874    }
875
876    /// Extract samples from a buffer without padding
877    pub(crate) fn extract_samples(buffer: &mut PcmBuf, extract_size: usize) -> PcmBuf {
878        if extract_size > 0 && !buffer.is_empty() {
879            let take_size = extract_size.min(buffer.len());
880            buffer.drain(..take_size).collect()
881        } else {
882            Vec::new()
883        }
884    }
885
886    async fn pop(&self, chunk_size: usize) -> (PcmBuf, PcmBuf) {
887        let mut mono_buf = self.mono_buf.lock().unwrap();
888        let mut stereo_buf = self.stereo_buf.lock().unwrap();
889
890        // Limit chunk_size to prevent capacity overflow
891        let safe_chunk_size = chunk_size.min(16000 * 10); // Max 10 seconds at 16kHz
892
893        let mono_result = if mono_buf.len() >= safe_chunk_size {
894            // Sufficient data, extract complete chunk
895            Self::extract_samples(&mut mono_buf, safe_chunk_size)
896        } else if !mono_buf.is_empty() {
897            // Partial data, extract all and pad with silence
898            let available_len = mono_buf.len(); // Store length before mutable borrow
899            let mut result = Self::extract_samples(&mut mono_buf, available_len);
900            if chunk_size != usize::MAX {
901                // Don't pad when flushing
902                result.resize(safe_chunk_size, 0); // Pad with silence to chunk_size
903            }
904            result
905        } else {
906            // No data, output silence (only when not flushing)
907            if chunk_size != usize::MAX {
908                vec![0; safe_chunk_size]
909            } else {
910                Vec::new()
911            }
912        };
913
914        let stereo_result = if stereo_buf.len() >= safe_chunk_size {
915            // Sufficient data, extract complete chunk
916            Self::extract_samples(&mut stereo_buf, safe_chunk_size)
917        } else if !stereo_buf.is_empty() {
918            // Partial data, extract all and pad with silence
919            let available_len = stereo_buf.len(); // Store length before mutable borrow
920            let mut result = Self::extract_samples(&mut stereo_buf, available_len);
921            if chunk_size != usize::MAX {
922                // Don't pad when flushing
923                result.resize(safe_chunk_size, 0); // Pad with silence to chunk_size
924            }
925            result
926        } else {
927            // No data, output silence (only when not flushing)
928            if chunk_size != usize::MAX {
929                vec![0; safe_chunk_size]
930            } else {
931                Vec::new()
932            }
933        };
934
935        // Ensure buffers have equal length when flushing
936        if chunk_size == usize::MAX {
937            let max_len = mono_result.len().max(stereo_result.len());
938            let mut mono_final = mono_result;
939            let mut stereo_final = stereo_result;
940            mono_final.resize(max_len, 0);
941            stereo_final.resize(max_len, 0);
942            (mono_final, stereo_final)
943        } else {
944            (mono_result, stereo_result)
945        }
946    }
947
948    pub fn stop_recording(&self) -> Result<()> {
949        self.cancel_token.cancel();
950        Ok(())
951    }
952
953    /// Mix mono and stereo buffers into interleaved stereo output
954    pub(crate) fn mix_buffers(mono_buf: &PcmBuf, stereo_buf: &PcmBuf) -> Vec<i16> {
955        // Ensure both buffers have equal length (guaranteed by pop() method)
956        assert_eq!(
957            mono_buf.len(),
958            stereo_buf.len(),
959            "Buffer lengths must be equal after pop()"
960        );
961
962        let len = mono_buf.len();
963        let mut mix_buff = Vec::with_capacity(len * 2);
964
965        for i in 0..len {
966            mix_buff.push(mono_buf[i]); // Left channel
967            mix_buff.push(stereo_buf[i]); // Right channel
968        }
969
970        mix_buff
971    }
972
973    /// Write mixed audio data to file
974    async fn write_audio_data(
975        &self,
976        file: &mut File,
977        mono_buf: &PcmBuf,
978        stereo_buf: &PcmBuf,
979    ) -> Result<usize> {
980        let max_len = mono_buf.len().max(stereo_buf.len());
981        if max_len == 0 {
982            return Ok(0);
983        }
984
985        let mix_buff = Self::mix_buffers(mono_buf, stereo_buf);
986
987        file.seek(std::io::SeekFrom::End(0)).await?;
988        file.write_all(&samples_to_bytes(&mix_buff)).await?;
989
990        Ok(max_len)
991    }
992
993    /// Process buffers with quality checks and write to file
994    async fn process_buffers(
995        &self,
996        file: &mut File,
997        mono_buf: PcmBuf,
998        stereo_buf: PcmBuf,
999    ) -> Result<()> {
1000        // Skip if no data
1001        if mono_buf.is_empty() && stereo_buf.is_empty() {
1002            return Ok(());
1003        }
1004        // Write audio data
1005        let samples_written = self.write_audio_data(file, &mono_buf, &stereo_buf).await?;
1006        if samples_written > 0 {
1007            self.samples_written
1008                .fetch_add(samples_written, Ordering::SeqCst);
1009        }
1010        Ok(())
1011    }
1012
1013    /// Flush all remaining buffer content
1014    async fn flush_buffers(&self, file: &mut File) -> Result<()> {
1015        loop {
1016            let (mono_buf, stereo_buf) = self.pop(usize::MAX).await;
1017
1018            if mono_buf.is_empty() && stereo_buf.is_empty() {
1019                break;
1020            }
1021
1022            let samples_written = self.write_audio_data(file, &mono_buf, &stereo_buf).await?;
1023            if samples_written > 0 {
1024                self.samples_written
1025                    .fetch_add(samples_written, Ordering::SeqCst);
1026            }
1027        }
1028
1029        Ok(())
1030    }
1031}