1use crate::media::{AudioFrame, PcmBuf, Samples, codecs::samples_to_bytes};
2use anyhow::{Result, anyhow};
3use futures::StreamExt;
4use hound::{SampleFormat, WavSpec};
5use serde::{Deserialize, Serialize};
6use std::{
7 collections::HashMap,
8 path::{Path, PathBuf},
9 sync::{
10 Mutex,
11 atomic::{AtomicUsize, Ordering},
12 },
13 time::Duration,
14 u32,
15};
16use tokio::{
17 fs::File,
18 io::{AsyncSeekExt, AsyncWriteExt},
19 select,
20 sync::mpsc::UnboundedReceiver,
21};
22use tokio_stream::wrappers::IntervalStream;
23use tokio_util::sync::CancellationToken;
24use tracing::{info, warn};
25
26#[cfg(feature = "opus")]
27use opusic_sys::{
28 OPUS_APPLICATION_AUDIO, OPUS_OK, OpusEncoder as OpusEncoderRaw, opus_encode,
29 opus_encoder_create, opus_encoder_destroy, opus_strerror,
30};
31#[cfg(feature = "opus")]
32use std::{ffi::CStr, os::raw::c_int, ptr::NonNull};
33
34#[cfg(feature = "opus")]
35fn opus_error_message(code: c_int) -> String {
36 if code == OPUS_OK {
37 return "ok".to_string();
38 }
39
40 unsafe {
41 let ptr = opus_strerror(code);
42 if ptr.is_null() {
43 format!("error code {code}")
44 } else {
45 CStr::from_ptr(ptr).to_string_lossy().into_owned()
46 }
47 }
48}
49
/// Container format the recorder writes.
///
/// Serialized by serde in lowercase (`"wav"` / `"ogg"`).
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum RecorderFormat {
    /// WAV output; always reported as supported by `is_supported`.
    Wav,
    /// Ogg output; reported as supported only when the `opus` feature is enabled.
    Ogg,
}
56
/// Encodes interleaved stereo PCM with libopus and emits one Ogg page per packet.
#[cfg(feature = "opus")]
struct OggStreamWriter {
    /// Raw libopus encoder created in `new` and destroyed in `Drop`.
    encoder: NonNull<OpusEncoderRaw>,
    /// Stream serial number written into every page; chosen randomly and never zero.
    serial: u32,
    /// Page sequence number; incremented (wrapping) after each page is written.
    sequence: u32,
    /// Running granule position, advanced by `granule_increment` for each audio packet.
    granule_position: u64,
    /// Encoder sample rate after normalization to a rate accepted by `new`.
    sample_rate: u32,
}
65
66#[cfg(feature = "opus")]
67impl OggStreamWriter {
68 fn new(sample_rate: u32) -> Result<Self> {
69 let normalized = match sample_rate {
70 8000 | 12000 | 16000 | 24000 | 48000 => sample_rate,
71 _ => 16000,
72 };
73
74 let encoder = {
75 let mut error: c_int = 0;
76 let ptr = unsafe {
77 opus_encoder_create(
78 normalized as c_int,
79 2,
80 OPUS_APPLICATION_AUDIO,
81 &mut error as *mut c_int,
82 )
83 };
84
85 if error != OPUS_OK {
86 unsafe {
87 if !ptr.is_null() {
88 opus_encoder_destroy(ptr);
89 }
90 }
91 return Err(anyhow!(
92 "Failed to create Opus encoder: {}",
93 opus_error_message(error)
94 ));
95 }
96
97 NonNull::new(ptr)
98 .ok_or_else(|| anyhow!("Failed to create Opus encoder: null pointer returned"))?
99 };
100
101 let mut serial = rand::random::<u32>();
102 if serial == 0 {
103 serial = 1;
104 }
105
106 Ok(Self {
107 encoder,
108 serial,
109 sequence: 0,
110 granule_position: 0,
111 sample_rate: normalized,
112 })
113 }
114
115 fn sample_rate(&self) -> u32 {
116 self.sample_rate
117 }
118
119 fn granule_increment(&self, frame_samples: usize) -> u64 {
120 let factor = 48000 / self.sample_rate;
121 (frame_samples as u64) * (factor as u64)
122 }
123
124 fn encode_frame(&mut self, pcm: &[i16]) -> Result<Vec<u8>> {
125 if pcm.len() % 2 != 0 {
126 return Err(anyhow!(
127 "PCM frame must contain an even number of samples for stereo Opus encoding"
128 ));
129 }
130
131 let frame_size = (pcm.len() / 2) as c_int;
132 let mut buffer = vec![0u8; 4096];
133 let len = unsafe {
134 opus_encode(
135 self.encoder.as_ptr(),
136 pcm.as_ptr() as *const opusic_sys::opus_int16,
137 frame_size,
138 buffer.as_mut_ptr(),
139 buffer.len() as c_int,
140 )
141 };
142
143 if len < 0 {
144 return Err(anyhow!(
145 "Failed to encode Opus frame: {}",
146 opus_error_message(len)
147 ));
148 }
149
150 buffer.truncate(len as usize);
151 Ok(buffer)
152 }
153
154 async fn write_headers(&mut self, file: &mut File) -> Result<()> {
155 let head = Self::build_opus_head(self.sample_rate);
156 self.write_page(file, &head, 0, 0x02).await?;
157
158 let tags = Self::build_opus_tags();
159 self.write_page(file, &tags, 0, 0x00).await?;
160 Ok(())
161 }
162
163 async fn write_audio_packet(
164 &mut self,
165 file: &mut File,
166 packet: &[u8],
167 frame_samples: usize,
168 ) -> Result<()> {
169 let increment = self.granule_increment(frame_samples);
170 self.granule_position = self.granule_position.saturating_add(increment);
171 self.write_page(file, packet, self.granule_position, 0x00)
172 .await
173 }
174
175 async fn finalize(&mut self, file: &mut File) -> Result<()> {
176 self.write_page(file, &[], self.granule_position, 0x04)
177 .await
178 }
179
180 async fn write_page(
181 &mut self,
182 file: &mut File,
183 packet: &[u8],
184 granule_pos: u64,
185 header_type: u8,
186 ) -> Result<()> {
187 let mut segments = Vec::new();
188 if !packet.is_empty() {
189 let mut remaining = packet.len();
190 while remaining >= 255 {
191 segments.push(255u8);
192 remaining -= 255;
193 }
194 segments.push(remaining as u8);
195 }
196
197 let mut page = Vec::with_capacity(27 + segments.len() + packet.len());
198 page.extend_from_slice(b"OggS");
199 page.push(0); page.push(header_type);
201 page.extend_from_slice(&granule_pos.to_le_bytes());
202 page.extend_from_slice(&self.serial.to_le_bytes());
203 page.extend_from_slice(&self.sequence.to_le_bytes());
204 page.extend_from_slice(&0u32.to_le_bytes()); page.push(segments.len() as u8);
206 page.extend_from_slice(&segments);
207 page.extend_from_slice(packet);
208
209 let checksum = ogg_crc32(&page);
210 page[22..26].copy_from_slice(&checksum.to_le_bytes());
211
212 file.write_all(&page).await?;
213 self.sequence = self.sequence.wrapping_add(1);
214 Ok(())
215 }
216
217 fn build_opus_head(sample_rate: u32) -> Vec<u8> {
218 let mut head = Vec::with_capacity(19);
219 head.extend_from_slice(b"OpusHead");
220 head.push(1); head.push(2); head.extend_from_slice(&0u16.to_le_bytes()); head.extend_from_slice(&sample_rate.to_le_bytes());
224 head.extend_from_slice(&0i16.to_le_bytes()); head.push(0); head
227 }
228
229 fn build_opus_tags() -> Vec<u8> {
230 const VENDOR: &str = "rustpbx";
231 let vendor_bytes = VENDOR.as_bytes();
232 let mut tags = Vec::with_capacity(8 + 4 + vendor_bytes.len() + 4);
233 tags.extend_from_slice(b"OpusTags");
234 tags.extend_from_slice(&(vendor_bytes.len() as u32).to_le_bytes());
235 tags.extend_from_slice(vendor_bytes);
236 tags.extend_from_slice(&0u32.to_le_bytes()); tags
238 }
239}
240
#[cfg(feature = "opus")]
impl Drop for OggStreamWriter {
    /// Destroys the libopus encoder owned by this writer.
    fn drop(&mut self) {
        let raw = self.encoder.as_ptr();
        // SAFETY: `raw` is the non-null encoder obtained from `opus_encoder_create` in
        // `new`; `drop` runs once, so it is destroyed exactly once.
        unsafe { opus_encoder_destroy(raw) };
    }
}
249
// The raw encoder pointer makes `OggStreamWriter` neither `Send` nor `Sync` by default;
// these impls opt in manually.
// NOTE(review): soundness presumably relies on the encoder only being touched through
// `&mut self` methods and on libopus encoders not being tied to the creating thread —
// confirm against the libopus documentation.
#[cfg(feature = "opus")]
unsafe impl Send for OggStreamWriter {}

#[cfg(feature = "opus")]
unsafe impl Sync for OggStreamWriter {}
255
256#[cfg(feature = "opus")]
/// Computes the page checksum used by `write_page`.
///
/// This is a bitwise, MSB-first CRC-32 with polynomial `0x04C11DB7`, an initial value
/// of zero and no final XOR.
fn ogg_crc32(data: &[u8]) -> u32 {
    const POLY: u32 = 0x04C11DB7;
    const TOP_BIT: u32 = 0x8000_0000;

    data.iter().fold(0u32, |state, &byte| {
        // Feed the next byte into the top of the register, then shift out 8 bits.
        (0..8).fold(state ^ (u32::from(byte) << 24), |reg, _| {
            let shifted = reg << 1;
            if reg & TOP_BIT != 0 {
                shifted ^ POLY
            } else {
                shifted
            }
        })
    })
}
272
273impl RecorderFormat {
274 pub fn extension(&self) -> &'static str {
275 match self {
276 RecorderFormat::Wav => "wav",
277 RecorderFormat::Ogg => "ogg",
278 }
279 }
280
281 pub fn is_supported(&self) -> bool {
282 match self {
283 RecorderFormat::Wav => true,
284 RecorderFormat::Ogg => cfg!(feature = "opus"),
285 }
286 }
287
288 pub fn effective(&self) -> RecorderFormat {
289 if self.is_supported() {
290 *self
291 } else {
292 RecorderFormat::Wav
293 }
294 }
295}
296
297impl Default for RecorderFormat {
298 fn default() -> Self {
299 RecorderFormat::Wav
300 }
301}
302
/// Recorder configuration; (de)serialized with camelCase field names.
///
/// `Default` yields an empty file name, 16000 Hz, a 200 ms ptime and no explicit format.
///
/// NOTE(review): each field also carries its own `#[serde(default)]`. In serde a
/// field-level default appears to take precedence over the container-level one, so a
/// missing `samplerate` or `ptime` would deserialize as 0 rather than 16000 / 200 —
/// confirm whether that is intended.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
#[serde(default)]
pub struct RecorderOption {
    /// Output file path; may be empty, in which case `ensure_path_extension` leaves it
    /// untouched.
    #[serde(default)]
    pub recorder_file: String,
    /// Sample rate in Hz, used for the WAV header and for sizing per-tick chunks.
    #[serde(default)]
    pub samplerate: u32,
    /// Flush interval in milliseconds.
    #[serde(default)]
    pub ptime: u32,
    /// Requested output format; omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub format: Option<RecorderFormat>,
}
316
317impl RecorderOption {
318 pub fn new(recorder_file: String) -> Self {
319 Self {
320 recorder_file,
321 ..Default::default()
322 }
323 }
324
325 pub fn resolved_format(&self, default: RecorderFormat) -> RecorderFormat {
326 self.format.unwrap_or(default).effective()
327 }
328
329 pub fn ensure_path_extension(&mut self, fallback_format: RecorderFormat) {
330 let effective_format = self.format.unwrap_or(fallback_format).effective();
331 self.format = Some(effective_format);
332
333 if self.recorder_file.is_empty() {
334 return;
335 }
336
337 let mut path = PathBuf::from(&self.recorder_file);
338 let has_desired_ext = path
339 .extension()
340 .and_then(|ext| ext.to_str())
341 .map(|ext| ext.eq_ignore_ascii_case(effective_format.extension()))
342 .unwrap_or(false);
343
344 if !has_desired_ext {
345 path.set_extension(effective_format.extension());
346 self.recorder_file = path.to_string_lossy().into_owned();
347 }
348 }
349}
350
351impl Default for RecorderOption {
352 fn default() -> Self {
353 Self {
354 recorder_file: "".to_string(),
355 samplerate: 16000,
356 ptime: 200,
357 format: None,
358 }
359 }
360}
361
/// Buffers PCM frames from up to two channels and writes them to a stereo recording.
pub struct Recorder {
    /// Session identifier attached to log records.
    session_id: String,
    /// Recording configuration (file, sample rate, ptime, format).
    option: RecorderOption,
    /// Per-channel samples written so far; the WAV header uses 4 bytes per sample.
    samples_written: AtomicUsize,
    /// Cancelling this token makes the recording loops flush and return.
    cancel_token: CancellationToken,
    /// Counter handing out the next channel index to a previously unseen track.
    channel_idx: AtomicUsize,
    /// Track id to assigned channel index; the index is used modulo 2.
    channels: Mutex<HashMap<String, usize>>,
    /// Pending samples for channel index 1 (second sample of each interleaved pair).
    stereo_buf: Mutex<PcmBuf>,
    /// Pending samples for channel index 0 (first sample of each interleaved pair).
    mono_buf: Mutex<PcmBuf>,
}
372
373impl Recorder {
374 pub fn new(
375 cancel_token: CancellationToken,
376 session_id: String,
377 option: RecorderOption,
378 ) -> Self {
379 Self {
380 session_id,
381 option,
382 samples_written: AtomicUsize::new(0),
383 cancel_token,
384 channel_idx: AtomicUsize::new(0),
385 channels: Mutex::new(HashMap::new()),
386 stereo_buf: Mutex::new(Vec::new()),
387 mono_buf: Mutex::new(Vec::new()),
388 }
389 }
390
391 async fn update_wav_header(&self, file: &mut File) -> Result<()> {
392 let total_samples = self.samples_written.load(Ordering::SeqCst);
394 let data_size = total_samples * 4; let spec = WavSpec {
398 channels: 2,
399 sample_rate: self.option.samplerate,
400 bits_per_sample: 16,
401 sample_format: SampleFormat::Int,
402 };
403 let mut header_buf = Vec::new();
405
406 header_buf.extend_from_slice(b"RIFF");
409 let file_size = data_size + 36; header_buf.extend_from_slice(&(file_size as u32).to_le_bytes());
411 header_buf.extend_from_slice(b"WAVE");
412
413 header_buf.extend_from_slice(b"fmt ");
415 header_buf.extend_from_slice(&16u32.to_le_bytes()); header_buf.extend_from_slice(&1u16.to_le_bytes()); header_buf.extend_from_slice(&(spec.channels as u16).to_le_bytes());
418 header_buf.extend_from_slice(&(spec.sample_rate).to_le_bytes());
419
420 let bytes_per_sec =
422 spec.sample_rate * (spec.channels as u32) * (spec.bits_per_sample as u32 / 8);
423 header_buf.extend_from_slice(&bytes_per_sec.to_le_bytes());
424
425 let block_align = (spec.channels as u16) * (spec.bits_per_sample / 8);
427 header_buf.extend_from_slice(&block_align.to_le_bytes());
428 header_buf.extend_from_slice(&spec.bits_per_sample.to_le_bytes());
429
430 header_buf.extend_from_slice(b"data");
432 header_buf.extend_from_slice(&(data_size as u32).to_le_bytes());
433
434 file.seek(std::io::SeekFrom::Start(0)).await?;
436 file.write_all(&header_buf).await?;
437
438 file.seek(std::io::SeekFrom::End(0)).await?;
440
441 Ok(())
442 }
443
444 pub async fn process_recording(
445 &self,
446 file_path: &Path,
447 receiver: UnboundedReceiver<AudioFrame>,
448 ) -> Result<()> {
449 let requested_format = self.option.format.unwrap_or(RecorderFormat::Wav);
450 let effective_format = requested_format.effective();
451
452 if requested_format != effective_format {
453 warn!(
454 session_id = self.session_id,
455 requested = requested_format.extension(),
456 "Recorder format requires unavailable feature; falling back to wav"
457 );
458 }
459
460 if effective_format == RecorderFormat::Ogg {
461 #[cfg(feature = "opus")]
462 {
463 return self.process_recording_ogg(file_path, receiver).await;
464 }
465 #[cfg(not(feature = "opus"))]
466 {
467 unreachable!(
468 "RecorderFormat::effective() should prevent ogg when opus feature is disabled"
469 );
470 }
471 }
472
473 self.process_recording_wav(file_path, receiver).await
474 }
475
476 fn ensure_parent_dir(&self, file_path: &Path) -> Result<()> {
477 if let Some(parent) = file_path.parent() {
478 if !parent.exists() {
479 if let Err(e) = std::fs::create_dir_all(parent) {
480 warn!(
481 "Failed to create recording file parent directory: {} {}",
482 e,
483 file_path.display()
484 );
485 return Err(anyhow!("Failed to create recording file parent directory"));
486 }
487 }
488 }
489 Ok(())
490 }
491
492 async fn create_output_file(&self, file_path: &Path) -> Result<File> {
493 self.ensure_parent_dir(file_path)?;
494 match File::create(file_path).await {
495 Ok(file) => {
496 info!(
497 session_id = self.session_id,
498 "recorder: created recording file: {}",
499 file_path.display()
500 );
501 Ok(file)
502 }
503 Err(e) => {
504 warn!(
505 "Failed to create recording file: {} {}",
506 e,
507 file_path.display()
508 );
509 Err(anyhow!("Failed to create recording file"))
510 }
511 }
512 }
513
514 async fn process_recording_wav(
515 &self,
516 file_path: &Path,
517 mut receiver: UnboundedReceiver<AudioFrame>,
518 ) -> Result<()> {
519 let mut file = self.create_output_file(file_path).await?;
520 self.update_wav_header(&mut file).await?;
521 let chunk_size = (self.option.samplerate / 1000 * self.option.ptime) as usize;
522 info!(
523 session_id = self.session_id,
524 format = "wav",
525 "Recording to {} ptime: {}ms chunk_size: {}",
526 file_path.display(),
527 self.option.ptime,
528 chunk_size
529 );
530
531 let mut interval = IntervalStream::new(tokio::time::interval(Duration::from_millis(
532 self.option.ptime as u64,
533 )));
534 loop {
535 select! {
536 Some(frame) = receiver.recv() => {
537 self.append_frame(frame).await.ok();
538 }
539 _ = interval.next() => {
540 let (mono_buf, stereo_buf) = self.pop(chunk_size).await;
541 self.process_buffers(&mut file, mono_buf, stereo_buf).await?;
542 self.update_wav_header(&mut file).await?;
543 }
544 _ = self.cancel_token.cancelled() => {
545 self.flush_buffers(&mut file).await?;
546 self.update_wav_header(&mut file).await?;
547 return Ok(());
548 }
549 }
550 }
551 }
552
553 #[cfg(feature = "opus")]
554 async fn process_recording_ogg(
555 &self,
556 file_path: &Path,
557 mut receiver: UnboundedReceiver<AudioFrame>,
558 ) -> Result<()> {
559 let mut file = self.create_output_file(file_path).await?;
560 let mut writer = OggStreamWriter::new(self.option.samplerate)?;
561 if writer.sample_rate() != self.option.samplerate {
562 warn!(
563 session_id = self.session_id,
564 requested = self.option.samplerate,
565 using = writer.sample_rate(),
566 "Adjusted recorder samplerate to Opus-compatible value"
567 );
568 }
569 writer.write_headers(&mut file).await?;
570
571 let chunk_size = (self.option.samplerate / 1000 * self.option.ptime) as usize;
572 info!(
573 session_id = self.session_id,
574 format = "ogg",
575 "Recording to {} ptime: {}ms chunk_size: {}",
576 file_path.display(),
577 self.option.ptime,
578 chunk_size
579 );
580
581 let frame_samples = std::cmp::max(1, (writer.sample_rate() / 50) as usize);
582 let frame_step = frame_samples * 2; let mut pending: Vec<i16> = Vec::new();
584
585 let mut interval = IntervalStream::new(tokio::time::interval(Duration::from_millis(
586 self.option.ptime as u64,
587 )));
588
589 loop {
590 select! {
591 Some(frame) = receiver.recv() => {
592 self.append_frame(frame).await.ok();
593 }
594 _ = interval.next() => {
595 let (mono_buf, stereo_buf) = self.pop(chunk_size).await;
596 if mono_buf.is_empty() && stereo_buf.is_empty() {
597 continue;
598 }
599
600 let mix = Self::mix_buffers(&mono_buf, &stereo_buf);
601 pending.extend_from_slice(&mix);
602
603 let encoded_samples = self
604 .encode_pending_frames(&mut pending, frame_step, &mut writer, &mut file, false)
605 .await?;
606 if encoded_samples > 0 {
607 self.samples_written.fetch_add(encoded_samples, Ordering::SeqCst);
608 }
609 }
610 _ = self.cancel_token.cancelled() => {
611 let (mono_buf, stereo_buf) = self.pop(usize::MAX).await;
612 if !mono_buf.is_empty() || !stereo_buf.is_empty() {
613 let mix = Self::mix_buffers(&mono_buf, &stereo_buf);
614 pending.extend_from_slice(&mix);
615 }
616
617 let encoded_samples = self
618 .encode_pending_frames(&mut pending, frame_step, &mut writer, &mut file, true)
619 .await?;
620 if encoded_samples > 0 {
621 self.samples_written.fetch_add(encoded_samples, Ordering::SeqCst);
622 }
623
624 writer.finalize(&mut file).await?;
625 return Ok(());
626 }
627 }
628 }
629 }
630
631 #[cfg(feature = "opus")]
632 async fn encode_pending_frames(
633 &self,
634 pending: &mut Vec<i16>,
635 frame_step: usize,
636 writer: &mut OggStreamWriter,
637 file: &mut File,
638 pad_final: bool,
639 ) -> Result<usize> {
640 let mut total_samples = 0usize;
641 let samples_per_channel = frame_step / 2;
642 while pending.len() >= frame_step {
643 let frame: Vec<i16> = pending.drain(..frame_step).collect();
644 let packet = writer.encode_frame(&frame)?;
645 writer
646 .write_audio_packet(file, &packet, samples_per_channel)
647 .await?;
648 total_samples += samples_per_channel;
649 }
650
651 if pad_final && !pending.is_empty() {
652 let mut frame: Vec<i16> = pending.drain(..).collect();
653 frame.resize(frame_step, 0);
654 let packet = writer.encode_frame(&frame)?;
655 writer
656 .write_audio_packet(file, &packet, samples_per_channel)
657 .await?;
658 total_samples += samples_per_channel;
659 }
660
661 Ok(total_samples)
662 }
663
664 fn get_channel_index(&self, track_id: &str) -> usize {
666 let mut channels = self.channels.lock().unwrap();
667 if let Some(&channel_idx) = channels.get(track_id) {
668 channel_idx % 2
669 } else {
670 let new_idx = self.channel_idx.fetch_add(1, Ordering::SeqCst);
671 channels.insert(track_id.to_string(), new_idx);
672 info!(
673 session_id = self.session_id,
674 "Assigned channel {} to track: {}",
675 new_idx % 2,
676 track_id
677 );
678 new_idx % 2
679 }
680 }
681
682 async fn append_frame(&self, frame: AudioFrame) -> Result<()> {
683 let buffer = match frame.samples {
684 Samples::PCM { samples } => samples,
685 _ => return Ok(()), };
687
688 if buffer.is_empty() {
690 return Ok(());
691 }
692
693 let channel_idx = self.get_channel_index(&frame.track_id);
695
696 match channel_idx {
698 0 => {
699 let mut mono_buf = self.mono_buf.lock().unwrap();
700 mono_buf.extend(buffer.iter());
701 }
702 1 => {
703 let mut stereo_buf = self.stereo_buf.lock().unwrap();
704 stereo_buf.extend(buffer.iter());
705 }
706 _ => {}
707 }
708
709 Ok(())
710 }
711
712 pub(crate) fn extract_samples(buffer: &mut PcmBuf, extract_size: usize) -> PcmBuf {
714 if extract_size > 0 && !buffer.is_empty() {
715 let take_size = extract_size.min(buffer.len());
716 buffer.drain(..take_size).collect()
717 } else {
718 Vec::new()
719 }
720 }
721
722 async fn pop(&self, chunk_size: usize) -> (PcmBuf, PcmBuf) {
723 let mut mono_buf = self.mono_buf.lock().unwrap();
724 let mut stereo_buf = self.stereo_buf.lock().unwrap();
725
726 let safe_chunk_size = chunk_size.min(16000 * 10); let mono_result = if mono_buf.len() >= safe_chunk_size {
730 Self::extract_samples(&mut mono_buf, safe_chunk_size)
732 } else if !mono_buf.is_empty() {
733 let available_len = mono_buf.len(); let mut result = Self::extract_samples(&mut mono_buf, available_len);
736 if chunk_size != usize::MAX {
737 result.resize(safe_chunk_size, 0); }
740 result
741 } else {
742 if chunk_size != usize::MAX {
744 vec![0; safe_chunk_size]
745 } else {
746 Vec::new()
747 }
748 };
749
750 let stereo_result = if stereo_buf.len() >= safe_chunk_size {
751 Self::extract_samples(&mut stereo_buf, safe_chunk_size)
753 } else if !stereo_buf.is_empty() {
754 let available_len = stereo_buf.len(); let mut result = Self::extract_samples(&mut stereo_buf, available_len);
757 if chunk_size != usize::MAX {
758 result.resize(safe_chunk_size, 0); }
761 result
762 } else {
763 if chunk_size != usize::MAX {
765 vec![0; safe_chunk_size]
766 } else {
767 Vec::new()
768 }
769 };
770
771 if chunk_size == usize::MAX {
773 let max_len = mono_result.len().max(stereo_result.len());
774 let mut mono_final = mono_result;
775 let mut stereo_final = stereo_result;
776 mono_final.resize(max_len, 0);
777 stereo_final.resize(max_len, 0);
778 (mono_final, stereo_final)
779 } else {
780 (mono_result, stereo_result)
781 }
782 }
783
784 pub fn stop_recording(&self) -> Result<()> {
785 self.cancel_token.cancel();
786 Ok(())
787 }
788
789 pub(crate) fn mix_buffers(mono_buf: &PcmBuf, stereo_buf: &PcmBuf) -> Vec<i16> {
791 assert_eq!(
793 mono_buf.len(),
794 stereo_buf.len(),
795 "Buffer lengths must be equal after pop()"
796 );
797
798 let len = mono_buf.len();
799 let mut mix_buff = Vec::with_capacity(len * 2);
800
801 for i in 0..len {
802 mix_buff.push(mono_buf[i]); mix_buff.push(stereo_buf[i]); }
805
806 mix_buff
807 }
808
809 async fn write_audio_data(
811 &self,
812 file: &mut File,
813 mono_buf: &PcmBuf,
814 stereo_buf: &PcmBuf,
815 ) -> Result<usize> {
816 let max_len = mono_buf.len().max(stereo_buf.len());
817 if max_len == 0 {
818 return Ok(0);
819 }
820
821 let mix_buff = Self::mix_buffers(mono_buf, stereo_buf);
822
823 file.seek(std::io::SeekFrom::End(0)).await?;
824 file.write_all(&samples_to_bytes(&mix_buff)).await?;
825
826 Ok(max_len)
827 }
828
829 async fn process_buffers(
831 &self,
832 file: &mut File,
833 mono_buf: PcmBuf,
834 stereo_buf: PcmBuf,
835 ) -> Result<()> {
836 if mono_buf.is_empty() && stereo_buf.is_empty() {
838 return Ok(());
839 }
840 let samples_written = self.write_audio_data(file, &mono_buf, &stereo_buf).await?;
842 if samples_written > 0 {
843 self.samples_written
844 .fetch_add(samples_written, Ordering::SeqCst);
845 }
846 Ok(())
847 }
848
849 async fn flush_buffers(&self, file: &mut File) -> Result<()> {
851 loop {
852 let (mono_buf, stereo_buf) = self.pop(usize::MAX).await;
853
854 if mono_buf.is_empty() && stereo_buf.is_empty() {
855 break;
856 }
857
858 let samples_written = self.write_audio_data(file, &mono_buf, &stereo_buf).await?;
859 if samples_written > 0 {
860 self.samples_written
861 .fetch_add(samples_written, Ordering::SeqCst);
862 }
863 }
864
865 Ok(())
866 }
867}