1use anyhow::{Result, anyhow};
2use audio_codec::{Decoder, PcmBuf, g729::G729Decoder, samples_to_bytes};
3use futures::StreamExt;
4use hound::{SampleFormat, WavSpec};
5use serde::{Deserialize, Serialize};
6use std::{
7 collections::HashMap,
8 path::{Path, PathBuf},
9 sync::{
10 Mutex,
11 atomic::{AtomicUsize, Ordering},
12 },
13 time::Duration,
14 u32,
15};
16use tokio::{
17 fs::File,
18 io::{AsyncSeekExt, AsyncWriteExt},
19 select,
20 sync::mpsc::UnboundedReceiver,
21};
22use tokio_stream::wrappers::IntervalStream;
23use tokio_util::sync::CancellationToken;
24use tracing::{info, warn};
25
26#[cfg(feature = "opus")]
27use opusic_sys::{
28 OPUS_APPLICATION_AUDIO, OPUS_OK, OpusEncoder as OpusEncoderRaw, opus_encode,
29 opus_encoder_create, opus_encoder_destroy, opus_strerror,
30};
31#[cfg(feature = "opus")]
32use std::{ffi::CStr, os::raw::c_int, ptr::NonNull};
33
34use crate::media::{AudioFrame, Samples};
35
36#[cfg(feature = "opus")]
37fn opus_error_message(code: c_int) -> String {
38 if code == OPUS_OK {
39 return "ok".to_string();
40 }
41
42 unsafe {
43 let ptr = opus_strerror(code);
44 if ptr.is_null() {
45 format!("error code {code}")
46 } else {
47 CStr::from_ptr(ptr).to_string_lossy().into_owned()
48 }
49 }
50}
51
52#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
53#[serde(rename_all = "lowercase")]
54pub enum RecorderFormat {
55 Wav,
56 Ogg,
57}
58
59#[cfg(feature = "opus")]
60struct OggStreamWriter {
61 encoder: NonNull<OpusEncoderRaw>,
62 serial: u32,
63 sequence: u32,
64 granule_position: u64,
65 sample_rate: u32,
66}
67
68#[cfg(feature = "opus")]
69impl OggStreamWriter {
70 fn new(sample_rate: u32) -> Result<Self> {
71 let normalized = match sample_rate {
72 8000 | 12000 | 16000 | 24000 | 48000 => sample_rate,
73 _ => 16000,
74 };
75
76 let encoder = {
77 let mut error: c_int = 0;
78 let ptr = unsafe {
79 opus_encoder_create(
80 normalized as c_int,
81 2,
82 OPUS_APPLICATION_AUDIO,
83 &mut error as *mut c_int,
84 )
85 };
86
87 if error != OPUS_OK {
88 unsafe {
89 if !ptr.is_null() {
90 opus_encoder_destroy(ptr);
91 }
92 }
93 return Err(anyhow!(
94 "Failed to create Opus encoder: {}",
95 opus_error_message(error)
96 ));
97 }
98
99 NonNull::new(ptr)
100 .ok_or_else(|| anyhow!("Failed to create Opus encoder: null pointer returned"))?
101 };
102
103 let mut serial = rand::random::<u32>();
104 if serial == 0 {
105 serial = 1;
106 }
107
108 Ok(Self {
109 encoder,
110 serial,
111 sequence: 0,
112 granule_position: 0,
113 sample_rate: normalized,
114 })
115 }
116
117 fn sample_rate(&self) -> u32 {
118 self.sample_rate
119 }
120
121 fn granule_increment(&self, frame_samples: usize) -> u64 {
122 let factor = 48000 / self.sample_rate;
123 (frame_samples as u64) * (factor as u64)
124 }
125
126 fn encode_frame(&mut self, pcm: &[i16]) -> Result<Vec<u8>> {
127 if pcm.len() % 2 != 0 {
128 return Err(anyhow!(
129 "PCM frame must contain an even number of samples for stereo Opus encoding"
130 ));
131 }
132
133 let frame_size = (pcm.len() / 2) as c_int;
134 let mut buffer = vec![0u8; 4096];
135 let len = unsafe {
136 opus_encode(
137 self.encoder.as_ptr(),
138 pcm.as_ptr() as *const opusic_sys::opus_int16,
139 frame_size,
140 buffer.as_mut_ptr(),
141 buffer.len() as c_int,
142 )
143 };
144
145 if len < 0 {
146 return Err(anyhow!(
147 "Failed to encode Opus frame: {}",
148 opus_error_message(len)
149 ));
150 }
151
152 buffer.truncate(len as usize);
153 Ok(buffer)
154 }
155
156 async fn write_headers(&mut self, file: &mut File) -> Result<()> {
157 let head = Self::build_opus_head(self.sample_rate);
158 self.write_page(file, &head, 0, 0x02).await?;
159
160 let tags = Self::build_opus_tags();
161 self.write_page(file, &tags, 0, 0x00).await?;
162 Ok(())
163 }
164
165 async fn write_audio_packet(
166 &mut self,
167 file: &mut File,
168 packet: &[u8],
169 frame_samples: usize,
170 ) -> Result<()> {
171 let increment = self.granule_increment(frame_samples);
172 self.granule_position = self.granule_position.saturating_add(increment);
173 self.write_page(file, packet, self.granule_position, 0x00)
174 .await
175 }
176
177 async fn finalize(&mut self, file: &mut File) -> Result<()> {
178 self.write_page(file, &[], self.granule_position, 0x04)
179 .await
180 }
181
182 async fn write_page(
183 &mut self,
184 file: &mut File,
185 packet: &[u8],
186 granule_pos: u64,
187 header_type: u8,
188 ) -> Result<()> {
189 let mut segments = Vec::new();
190 if !packet.is_empty() {
191 let mut remaining = packet.len();
192 while remaining >= 255 {
193 segments.push(255u8);
194 remaining -= 255;
195 }
196 segments.push(remaining as u8);
197 }
198
199 let mut page = Vec::with_capacity(27 + segments.len() + packet.len());
200 page.extend_from_slice(b"OggS");
201 page.push(0); page.push(header_type);
203 page.extend_from_slice(&granule_pos.to_le_bytes());
204 page.extend_from_slice(&self.serial.to_le_bytes());
205 page.extend_from_slice(&self.sequence.to_le_bytes());
206 page.extend_from_slice(&0u32.to_le_bytes()); page.push(segments.len() as u8);
208 page.extend_from_slice(&segments);
209 page.extend_from_slice(packet);
210
211 let checksum = ogg_crc32(&page);
212 page[22..26].copy_from_slice(&checksum.to_le_bytes());
213
214 file.write_all(&page).await?;
215 self.sequence = self.sequence.wrapping_add(1);
216 Ok(())
217 }
218
219 fn build_opus_head(sample_rate: u32) -> Vec<u8> {
220 let mut head = Vec::with_capacity(19);
221 head.extend_from_slice(b"OpusHead");
222 head.push(1); head.push(2); head.extend_from_slice(&0u16.to_le_bytes()); head.extend_from_slice(&sample_rate.to_le_bytes());
226 head.extend_from_slice(&0i16.to_le_bytes()); head.push(0); head
229 }
230
231 fn build_opus_tags() -> Vec<u8> {
232 const VENDOR: &str = "rustpbx";
233 let vendor_bytes = VENDOR.as_bytes();
234 let mut tags = Vec::with_capacity(8 + 4 + vendor_bytes.len() + 4);
235 tags.extend_from_slice(b"OpusTags");
236 tags.extend_from_slice(&(vendor_bytes.len() as u32).to_le_bytes());
237 tags.extend_from_slice(vendor_bytes);
238 tags.extend_from_slice(&0u32.to_le_bytes()); tags
240 }
241}
242
243#[cfg(feature = "opus")]
244impl Drop for OggStreamWriter {
245 fn drop(&mut self) {
246 unsafe {
247 opus_encoder_destroy(self.encoder.as_ptr());
248 }
249 }
250}
251
252#[cfg(feature = "opus")]
253unsafe impl Send for OggStreamWriter {}
254
255#[cfg(feature = "opus")]
256unsafe impl Sync for OggStreamWriter {}
257
258#[cfg(feature = "opus")]
259fn ogg_crc32(data: &[u8]) -> u32 {
260 const POLY: u32 = 0x04C11DB7;
261 let mut crc: u32 = 0;
262 for &byte in data {
263 crc ^= (byte as u32) << 24;
264 for _ in 0..8 {
265 if (crc & 0x8000_0000) != 0 {
266 crc = (crc << 1) ^ POLY;
267 } else {
268 crc <<= 1;
269 }
270 }
271 }
272 crc
273}
274
275impl RecorderFormat {
276 pub fn extension(&self) -> &'static str {
277 match self {
278 RecorderFormat::Wav => "wav",
279 RecorderFormat::Ogg => "ogg",
280 }
281 }
282
283 pub fn is_supported(&self) -> bool {
284 match self {
285 RecorderFormat::Wav => true,
286 RecorderFormat::Ogg => cfg!(feature = "opus"),
287 }
288 }
289
290 pub fn effective(&self) -> RecorderFormat {
291 if self.is_supported() {
292 *self
293 } else {
294 RecorderFormat::Wav
295 }
296 }
297}
298
299impl Default for RecorderFormat {
300 fn default() -> Self {
301 RecorderFormat::Wav
302 }
303}
304
305#[derive(Debug, Deserialize, Serialize, Clone)]
306#[serde(rename_all = "camelCase")]
307#[serde(default)]
308pub struct RecorderOption {
309 #[serde(default)]
310 pub recorder_file: String,
311 #[serde(default)]
312 pub samplerate: u32,
313 #[serde(default)]
314 pub ptime: u32,
315 #[serde(default, skip_serializing_if = "Option::is_none")]
316 pub format: Option<RecorderFormat>,
317}
318
319impl RecorderOption {
320 pub fn new(recorder_file: String) -> Self {
321 Self {
322 recorder_file,
323 ..Default::default()
324 }
325 }
326
327 pub fn resolved_format(&self, default: RecorderFormat) -> RecorderFormat {
328 self.format.unwrap_or(default).effective()
329 }
330
331 pub fn ensure_path_extension(&mut self, fallback_format: RecorderFormat) {
332 let effective_format = self.format.unwrap_or(fallback_format).effective();
333 self.format = Some(effective_format);
334
335 if self.recorder_file.is_empty() {
336 return;
337 }
338
339 let mut path = PathBuf::from(&self.recorder_file);
340 let has_desired_ext = path
341 .extension()
342 .and_then(|ext| ext.to_str())
343 .map(|ext| ext.eq_ignore_ascii_case(effective_format.extension()))
344 .unwrap_or(false);
345
346 if !has_desired_ext {
347 path.set_extension(effective_format.extension());
348 self.recorder_file = path.to_string_lossy().into_owned();
349 }
350 }
351}
352
353impl Default for RecorderOption {
354 fn default() -> Self {
355 Self {
356 recorder_file: "".to_string(),
357 samplerate: 16000,
358 ptime: 200,
359 format: None,
360 }
361 }
362}
363
364pub struct G729WavReader {
365 decoder: G729Decoder,
366 file_path: PathBuf,
367}
368
369impl G729WavReader {
370 pub fn new(path: PathBuf) -> Self {
371 Self {
372 decoder: G729Decoder::new(),
373 file_path: path,
374 }
375 }
376
377 pub async fn read_all(&mut self) -> Result<Vec<i16>> {
378 let data = tokio::fs::read(&self.file_path).await?;
379 Ok(self.decoder.decode(&data))
380 }
381}
382
383pub struct Recorder {
384 session_id: String,
385 option: RecorderOption,
386 samples_written: AtomicUsize,
387 cancel_token: CancellationToken,
388 channel_idx: AtomicUsize,
389 channels: Mutex<HashMap<String, usize>>,
390 stereo_buf: Mutex<PcmBuf>,
391 mono_buf: Mutex<PcmBuf>,
392}
393
394impl Recorder {
395 pub fn new(
396 cancel_token: CancellationToken,
397 session_id: String,
398 option: RecorderOption,
399 ) -> Self {
400 Self {
401 session_id,
402 option,
403 samples_written: AtomicUsize::new(0),
404 cancel_token,
405 channel_idx: AtomicUsize::new(0),
406 channels: Mutex::new(HashMap::new()),
407 stereo_buf: Mutex::new(Vec::new()),
408 mono_buf: Mutex::new(Vec::new()),
409 }
410 }
411
412 async fn update_wav_header(&self, file: &mut File) -> Result<()> {
413 let total_samples = self.samples_written.load(Ordering::SeqCst);
415 let data_size = total_samples * 4; let spec = WavSpec {
419 channels: 2,
420 sample_rate: self.option.samplerate,
421 bits_per_sample: 16,
422 sample_format: SampleFormat::Int,
423 };
424 let mut header_buf = Vec::new();
426
427 header_buf.extend_from_slice(b"RIFF");
430 let file_size = data_size + 36; header_buf.extend_from_slice(&(file_size as u32).to_le_bytes());
432 header_buf.extend_from_slice(b"WAVE");
433
434 header_buf.extend_from_slice(b"fmt ");
436 header_buf.extend_from_slice(&16u32.to_le_bytes()); header_buf.extend_from_slice(&1u16.to_le_bytes()); header_buf.extend_from_slice(&(spec.channels as u16).to_le_bytes());
439 header_buf.extend_from_slice(&(spec.sample_rate).to_le_bytes());
440
441 let bytes_per_sec =
443 spec.sample_rate * (spec.channels as u32) * (spec.bits_per_sample as u32 / 8);
444 header_buf.extend_from_slice(&bytes_per_sec.to_le_bytes());
445
446 let block_align = (spec.channels as u16) * (spec.bits_per_sample / 8);
448 header_buf.extend_from_slice(&block_align.to_le_bytes());
449 header_buf.extend_from_slice(&spec.bits_per_sample.to_le_bytes());
450
451 header_buf.extend_from_slice(b"data");
453 header_buf.extend_from_slice(&(data_size as u32).to_le_bytes());
454
455 file.seek(std::io::SeekFrom::Start(0)).await?;
457 file.write_all(&header_buf).await?;
458
459 file.seek(std::io::SeekFrom::End(0)).await?;
461
462 Ok(())
463 }
464
465 pub async fn process_recording(
466 &self,
467 file_path: &Path,
468 mut receiver: UnboundedReceiver<AudioFrame>,
469 ) -> Result<()> {
470 let first_frame = match receiver.recv().await {
472 Some(f) => f,
473 None => return Ok(()),
474 };
475
476 if let Samples::RTP { .. } = first_frame.samples {
477 return self
478 .process_recording_rtp(file_path, receiver, first_frame)
479 .await;
480 }
481
482 let requested_format = self.option.format.unwrap_or(RecorderFormat::Wav);
483 let effective_format = requested_format.effective();
484
485 if requested_format != effective_format {
486 warn!(
487 session_id = self.session_id,
488 requested = requested_format.extension(),
489 "Recorder format requires unavailable feature; falling back to wav"
490 );
491 }
492
493 if effective_format == RecorderFormat::Ogg {
494 #[cfg(feature = "opus")]
495 {
496 return self
497 .process_recording_ogg(file_path, receiver, first_frame)
498 .await;
499 }
500 #[cfg(not(feature = "opus"))]
501 {
502 unreachable!(
503 "RecorderFormat::effective() should prevent ogg when opus feature is disabled"
504 );
505 }
506 }
507
508 self.process_recording_wav(file_path, receiver, first_frame)
509 .await
510 }
511
512 fn ensure_parent_dir(&self, file_path: &Path) -> Result<()> {
513 if let Some(parent) = file_path.parent() {
514 if !parent.exists() {
515 if let Err(e) = std::fs::create_dir_all(parent) {
516 warn!(
517 "Failed to create recording file parent directory: {} {}",
518 e,
519 file_path.display()
520 );
521 return Err(anyhow!("Failed to create recording file parent directory"));
522 }
523 }
524 }
525 Ok(())
526 }
527
528 async fn create_output_file(&self, file_path: &Path) -> Result<File> {
529 self.ensure_parent_dir(file_path)?;
530 match File::create(file_path).await {
531 Ok(file) => {
532 info!(
533 session_id = self.session_id,
534 "recorder: created recording file: {}",
535 file_path.display()
536 );
537 Ok(file)
538 }
539 Err(e) => {
540 warn!(
541 "Failed to create recording file: {} {}",
542 e,
543 file_path.display()
544 );
545 Err(anyhow!("Failed to create recording file"))
546 }
547 }
548 }
549
550 async fn update_wav_header_rtp(&self, file: &mut File, payload_type: u8) -> Result<()> {
551 let total_bytes = self.samples_written.load(Ordering::SeqCst);
552 let data_size = total_bytes;
553
554 let (format_tag, sample_rate, channels): (u16, u32, u16) = match payload_type {
555 0 => (0x0007, 8000, 1), 8 => (0x0006, 8000, 1), 9 => (0x0064, 16000, 1), _ => return Ok(()), };
560
561 let mut header_buf = Vec::new();
562 header_buf.extend_from_slice(b"RIFF");
563 let file_size = data_size + 36;
564 header_buf.extend_from_slice(&(file_size as u32).to_le_bytes());
565 header_buf.extend_from_slice(b"WAVE");
566
567 header_buf.extend_from_slice(b"fmt ");
568 header_buf.extend_from_slice(&16u32.to_le_bytes());
569 header_buf.extend_from_slice(&format_tag.to_le_bytes());
570 header_buf.extend_from_slice(&(channels as u16).to_le_bytes());
571 header_buf.extend_from_slice(&sample_rate.to_le_bytes());
572
573 let bytes_per_sec: u32 = match payload_type {
575 9 => 8000, _ => sample_rate * (channels as u32) * 1, };
578 header_buf.extend_from_slice(&bytes_per_sec.to_le_bytes());
579
580 let block_align: u16 = match payload_type {
582 9 => 1,
583 _ => 1,
584 };
585 header_buf.extend_from_slice(&block_align.to_le_bytes());
586
587 let bits_per_sample: u16 = match payload_type {
589 9 => 4,
590 _ => 8,
591 };
592 header_buf.extend_from_slice(&bits_per_sample.to_le_bytes());
593
594 header_buf.extend_from_slice(b"data");
595 header_buf.extend_from_slice(&(data_size as u32).to_le_bytes());
596
597 file.seek(std::io::SeekFrom::Start(0)).await?;
598 file.write_all(&header_buf).await?;
599 file.seek(std::io::SeekFrom::End(0)).await?;
600
601 Ok(())
602 }
603
604 async fn process_recording_rtp(
605 &self,
606 file_path: &Path,
607 mut receiver: UnboundedReceiver<AudioFrame>,
608 first_frame: AudioFrame,
609 ) -> Result<()> {
610 let (payload_type, mut file) =
611 if let Samples::RTP { payload_type, .. } = &first_frame.samples {
612 let mut path = file_path.to_path_buf();
613 if *payload_type == 18 {
615 path.set_extension("g729");
616 } else if matches!(payload_type, 0 | 8 | 9) {
617 path.set_extension("wav");
618 }
619
620 let file = self.create_output_file(&path).await?;
621 (*payload_type, file)
622 } else {
623 return Err(anyhow!("Invalid frame type for RTP recording"));
624 };
625
626 match payload_type {
628 0 | 8 | 9 => {
629 self.write_wav_header_rtp(&mut file, payload_type).await?;
630 }
631 _ => {}
632 }
633
634 if let Samples::RTP { payload, .. } = first_frame.samples {
635 file.write_all(&payload).await?;
636 self.samples_written
637 .fetch_add(payload.len(), Ordering::SeqCst);
638 }
639
640 loop {
641 match receiver.recv().await {
642 Some(frame) => {
643 if let Samples::RTP { payload, .. } = frame.samples {
644 file.write_all(&payload).await?;
645 self.samples_written
646 .fetch_add(payload.len(), Ordering::SeqCst);
647 }
648 }
649 None => break,
650 }
651 }
652
653 match payload_type {
655 0 | 8 | 9 => {
656 self.update_wav_header_rtp(&mut file, payload_type).await?;
657 }
658 _ => {}
659 }
660
661 file.sync_all().await?;
662
663 Ok(())
664 }
665
666 async fn write_wav_header_rtp(&self, file: &mut File, payload_type: u8) -> Result<()> {
668 self.update_wav_header_rtp(file, payload_type).await
669 }
670
671 async fn process_recording_wav(
672 &self,
673 file_path: &Path,
674 mut receiver: UnboundedReceiver<AudioFrame>,
675 first_frame: AudioFrame,
676 ) -> Result<()> {
677 let mut file = self.create_output_file(file_path).await?;
678 self.update_wav_header(&mut file).await?;
679
680 self.append_frame(first_frame).await.ok();
681
682 let chunk_size = (self.option.samplerate / 1000 * self.option.ptime) as usize;
683 info!(
684 session_id = self.session_id,
685 format = "wav",
686 "Recording to {} ptime: {}ms chunk_size: {}",
687 file_path.display(),
688 self.option.ptime,
689 chunk_size
690 );
691
692 let mut interval = IntervalStream::new(tokio::time::interval(Duration::from_millis(
693 self.option.ptime as u64,
694 )));
695 loop {
696 select! {
697 Some(frame) = receiver.recv() => {
698 self.append_frame(frame).await.ok();
699 }
700 _ = interval.next() => {
701 let (mono_buf, stereo_buf) = self.pop(chunk_size).await;
702 self.process_buffers(&mut file, mono_buf, stereo_buf).await?;
703 self.update_wav_header(&mut file).await?;
704 }
705 _ = self.cancel_token.cancelled() => {
706 self.flush_buffers(&mut file).await?;
707 self.update_wav_header(&mut file).await?;
708 return Ok(());
709 }
710 }
711 }
712 }
713
714 #[cfg(feature = "opus")]
715 async fn process_recording_ogg(
716 &self,
717 file_path: &Path,
718 mut receiver: UnboundedReceiver<AudioFrame>,
719 first_frame: AudioFrame,
720 ) -> Result<()> {
721 let mut file = self.create_output_file(file_path).await?;
722 let mut writer = OggStreamWriter::new(self.option.samplerate)?;
723 if writer.sample_rate() != self.option.samplerate {
724 warn!(
725 session_id = self.session_id,
726 requested = self.option.samplerate,
727 using = writer.sample_rate(),
728 "Adjusted recorder samplerate to Opus-compatible value"
729 );
730 }
731 writer.write_headers(&mut file).await?;
732
733 self.append_frame(first_frame).await.ok();
734
735 let chunk_size = (self.option.samplerate / 1000 * self.option.ptime) as usize;
736 info!(
737 session_id = self.session_id,
738 format = "ogg",
739 "Recording to {} ptime: {}ms chunk_size: {}",
740 file_path.display(),
741 self.option.ptime,
742 chunk_size
743 );
744
745 let frame_samples = std::cmp::max(1, (writer.sample_rate() / 50) as usize);
746 let frame_step = frame_samples * 2; let mut pending: Vec<i16> = Vec::new();
748
749 let mut interval = IntervalStream::new(tokio::time::interval(Duration::from_millis(
750 self.option.ptime as u64,
751 )));
752
753 loop {
754 select! {
755 Some(frame) = receiver.recv() => {
756 self.append_frame(frame).await.ok();
757 }
758 _ = interval.next() => {
759 let (mono_buf, stereo_buf) = self.pop(chunk_size).await;
760 if mono_buf.is_empty() && stereo_buf.is_empty() {
761 continue;
762 }
763
764 let mix = Self::mix_buffers(&mono_buf, &stereo_buf);
765 pending.extend_from_slice(&mix);
766
767 let encoded_samples = self
768 .encode_pending_frames(&mut pending, frame_step, &mut writer, &mut file, false)
769 .await?;
770 if encoded_samples > 0 {
771 self.samples_written.fetch_add(encoded_samples, Ordering::SeqCst);
772 }
773 }
774 _ = self.cancel_token.cancelled() => {
775 let (mono_buf, stereo_buf) = self.pop(usize::MAX).await;
776 if !mono_buf.is_empty() || !stereo_buf.is_empty() {
777 let mix = Self::mix_buffers(&mono_buf, &stereo_buf);
778 pending.extend_from_slice(&mix);
779 }
780
781 let encoded_samples = self
782 .encode_pending_frames(&mut pending, frame_step, &mut writer, &mut file, true)
783 .await?;
784 if encoded_samples > 0 {
785 self.samples_written.fetch_add(encoded_samples, Ordering::SeqCst);
786 }
787
788 writer.finalize(&mut file).await?;
789 return Ok(());
790 }
791 }
792 }
793 }
794
795 #[cfg(feature = "opus")]
796 async fn encode_pending_frames(
797 &self,
798 pending: &mut Vec<i16>,
799 frame_step: usize,
800 writer: &mut OggStreamWriter,
801 file: &mut File,
802 pad_final: bool,
803 ) -> Result<usize> {
804 let mut total_samples = 0usize;
805 let samples_per_channel = frame_step / 2;
806 while pending.len() >= frame_step {
807 let frame: Vec<i16> = pending.drain(..frame_step).collect();
808 let packet = writer.encode_frame(&frame)?;
809 writer
810 .write_audio_packet(file, &packet, samples_per_channel)
811 .await?;
812 total_samples += samples_per_channel;
813 }
814
815 if pad_final && !pending.is_empty() {
816 let mut frame: Vec<i16> = pending.drain(..).collect();
817 frame.resize(frame_step, 0);
818 let packet = writer.encode_frame(&frame)?;
819 writer
820 .write_audio_packet(file, &packet, samples_per_channel)
821 .await?;
822 total_samples += samples_per_channel;
823 }
824
825 Ok(total_samples)
826 }
827
828 fn get_channel_index(&self, track_id: &str) -> usize {
830 let mut channels = self.channels.lock().unwrap();
831 if let Some(&channel_idx) = channels.get(track_id) {
832 channel_idx % 2
833 } else {
834 let new_idx = self.channel_idx.fetch_add(1, Ordering::SeqCst);
835 channels.insert(track_id.to_string(), new_idx);
836 info!(
837 session_id = self.session_id,
838 "Assigned channel {} to track: {}",
839 new_idx % 2,
840 track_id
841 );
842 new_idx % 2
843 }
844 }
845
846 async fn append_frame(&self, frame: AudioFrame) -> Result<()> {
847 let buffer = match frame.samples {
848 Samples::PCM { samples } => samples,
849 _ => return Ok(()), };
851
852 if buffer.is_empty() {
854 return Ok(());
855 }
856
857 let channel_idx = self.get_channel_index(&frame.track_id);
859
860 match channel_idx {
862 0 => {
863 let mut mono_buf = self.mono_buf.lock().unwrap();
864 mono_buf.extend(buffer.iter());
865 }
866 1 => {
867 let mut stereo_buf = self.stereo_buf.lock().unwrap();
868 stereo_buf.extend(buffer.iter());
869 }
870 _ => {}
871 }
872
873 Ok(())
874 }
875
876 pub(crate) fn extract_samples(buffer: &mut PcmBuf, extract_size: usize) -> PcmBuf {
878 if extract_size > 0 && !buffer.is_empty() {
879 let take_size = extract_size.min(buffer.len());
880 buffer.drain(..take_size).collect()
881 } else {
882 Vec::new()
883 }
884 }
885
886 async fn pop(&self, chunk_size: usize) -> (PcmBuf, PcmBuf) {
887 let mut mono_buf = self.mono_buf.lock().unwrap();
888 let mut stereo_buf = self.stereo_buf.lock().unwrap();
889
890 let safe_chunk_size = chunk_size.min(16000 * 10); let mono_result = if mono_buf.len() >= safe_chunk_size {
894 Self::extract_samples(&mut mono_buf, safe_chunk_size)
896 } else if !mono_buf.is_empty() {
897 let available_len = mono_buf.len(); let mut result = Self::extract_samples(&mut mono_buf, available_len);
900 if chunk_size != usize::MAX {
901 result.resize(safe_chunk_size, 0); }
904 result
905 } else {
906 if chunk_size != usize::MAX {
908 vec![0; safe_chunk_size]
909 } else {
910 Vec::new()
911 }
912 };
913
914 let stereo_result = if stereo_buf.len() >= safe_chunk_size {
915 Self::extract_samples(&mut stereo_buf, safe_chunk_size)
917 } else if !stereo_buf.is_empty() {
918 let available_len = stereo_buf.len(); let mut result = Self::extract_samples(&mut stereo_buf, available_len);
921 if chunk_size != usize::MAX {
922 result.resize(safe_chunk_size, 0); }
925 result
926 } else {
927 if chunk_size != usize::MAX {
929 vec![0; safe_chunk_size]
930 } else {
931 Vec::new()
932 }
933 };
934
935 if chunk_size == usize::MAX {
937 let max_len = mono_result.len().max(stereo_result.len());
938 let mut mono_final = mono_result;
939 let mut stereo_final = stereo_result;
940 mono_final.resize(max_len, 0);
941 stereo_final.resize(max_len, 0);
942 (mono_final, stereo_final)
943 } else {
944 (mono_result, stereo_result)
945 }
946 }
947
948 pub fn stop_recording(&self) -> Result<()> {
949 self.cancel_token.cancel();
950 Ok(())
951 }
952
953 pub(crate) fn mix_buffers(mono_buf: &PcmBuf, stereo_buf: &PcmBuf) -> Vec<i16> {
955 assert_eq!(
957 mono_buf.len(),
958 stereo_buf.len(),
959 "Buffer lengths must be equal after pop()"
960 );
961
962 let len = mono_buf.len();
963 let mut mix_buff = Vec::with_capacity(len * 2);
964
965 for i in 0..len {
966 mix_buff.push(mono_buf[i]); mix_buff.push(stereo_buf[i]); }
969
970 mix_buff
971 }
972
973 async fn write_audio_data(
975 &self,
976 file: &mut File,
977 mono_buf: &PcmBuf,
978 stereo_buf: &PcmBuf,
979 ) -> Result<usize> {
980 let max_len = mono_buf.len().max(stereo_buf.len());
981 if max_len == 0 {
982 return Ok(0);
983 }
984
985 let mix_buff = Self::mix_buffers(mono_buf, stereo_buf);
986
987 file.seek(std::io::SeekFrom::End(0)).await?;
988 file.write_all(&samples_to_bytes(&mix_buff)).await?;
989
990 Ok(max_len)
991 }
992
993 async fn process_buffers(
995 &self,
996 file: &mut File,
997 mono_buf: PcmBuf,
998 stereo_buf: PcmBuf,
999 ) -> Result<()> {
1000 if mono_buf.is_empty() && stereo_buf.is_empty() {
1002 return Ok(());
1003 }
1004 let samples_written = self.write_audio_data(file, &mono_buf, &stereo_buf).await?;
1006 if samples_written > 0 {
1007 self.samples_written
1008 .fetch_add(samples_written, Ordering::SeqCst);
1009 }
1010 Ok(())
1011 }
1012
1013 async fn flush_buffers(&self, file: &mut File) -> Result<()> {
1015 loop {
1016 let (mono_buf, stereo_buf) = self.pop(usize::MAX).await;
1017
1018 if mono_buf.is_empty() && stereo_buf.is_empty() {
1019 break;
1020 }
1021
1022 let samples_written = self.write_audio_data(file, &mono_buf, &stereo_buf).await?;
1023 if samples_written > 0 {
1024 self.samples_written
1025 .fetch_add(samples_written, Ordering::SeqCst);
1026 }
1027 }
1028
1029 Ok(())
1030 }
1031}