1use crate::media::codecs::{Decoder, g729::G729Decoder};
2use crate::media::{AudioFrame, PcmBuf, Samples, codecs::samples_to_bytes};
3use anyhow::{Result, anyhow};
4use futures::StreamExt;
5use hound::{SampleFormat, WavSpec};
6use serde::{Deserialize, Serialize};
7use std::{
8 collections::HashMap,
9 path::{Path, PathBuf},
10 sync::{
11 Mutex,
12 atomic::{AtomicUsize, Ordering},
13 },
14 time::Duration,
15 u32,
16};
17use tokio::{
18 fs::File,
19 io::{AsyncSeekExt, AsyncWriteExt},
20 select,
21 sync::mpsc::UnboundedReceiver,
22};
23use tokio_stream::wrappers::IntervalStream;
24use tokio_util::sync::CancellationToken;
25use tracing::{info, warn};
26
27#[cfg(feature = "opus")]
28use opusic_sys::{
29 OPUS_APPLICATION_AUDIO, OPUS_OK, OpusEncoder as OpusEncoderRaw, opus_encode,
30 opus_encoder_create, opus_encoder_destroy, opus_strerror,
31};
32#[cfg(feature = "opus")]
33use std::{ffi::CStr, os::raw::c_int, ptr::NonNull};
34
35#[cfg(feature = "opus")]
36fn opus_error_message(code: c_int) -> String {
37 if code == OPUS_OK {
38 return "ok".to_string();
39 }
40
41 unsafe {
42 let ptr = opus_strerror(code);
43 if ptr.is_null() {
44 format!("error code {code}")
45 } else {
46 CStr::from_ptr(ptr).to_string_lossy().into_owned()
47 }
48 }
49}
50
51#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
52#[serde(rename_all = "lowercase")]
53pub enum RecorderFormat {
54 Wav,
55 Ogg,
56}
57
58#[cfg(feature = "opus")]
59struct OggStreamWriter {
60 encoder: NonNull<OpusEncoderRaw>,
61 serial: u32,
62 sequence: u32,
63 granule_position: u64,
64 sample_rate: u32,
65}
66
67#[cfg(feature = "opus")]
68impl OggStreamWriter {
69 fn new(sample_rate: u32) -> Result<Self> {
70 let normalized = match sample_rate {
71 8000 | 12000 | 16000 | 24000 | 48000 => sample_rate,
72 _ => 16000,
73 };
74
75 let encoder = {
76 let mut error: c_int = 0;
77 let ptr = unsafe {
78 opus_encoder_create(
79 normalized as c_int,
80 2,
81 OPUS_APPLICATION_AUDIO,
82 &mut error as *mut c_int,
83 )
84 };
85
86 if error != OPUS_OK {
87 unsafe {
88 if !ptr.is_null() {
89 opus_encoder_destroy(ptr);
90 }
91 }
92 return Err(anyhow!(
93 "Failed to create Opus encoder: {}",
94 opus_error_message(error)
95 ));
96 }
97
98 NonNull::new(ptr)
99 .ok_or_else(|| anyhow!("Failed to create Opus encoder: null pointer returned"))?
100 };
101
102 let mut serial = rand::random::<u32>();
103 if serial == 0 {
104 serial = 1;
105 }
106
107 Ok(Self {
108 encoder,
109 serial,
110 sequence: 0,
111 granule_position: 0,
112 sample_rate: normalized,
113 })
114 }
115
116 fn sample_rate(&self) -> u32 {
117 self.sample_rate
118 }
119
120 fn granule_increment(&self, frame_samples: usize) -> u64 {
121 let factor = 48000 / self.sample_rate;
122 (frame_samples as u64) * (factor as u64)
123 }
124
125 fn encode_frame(&mut self, pcm: &[i16]) -> Result<Vec<u8>> {
126 if pcm.len() % 2 != 0 {
127 return Err(anyhow!(
128 "PCM frame must contain an even number of samples for stereo Opus encoding"
129 ));
130 }
131
132 let frame_size = (pcm.len() / 2) as c_int;
133 let mut buffer = vec![0u8; 4096];
134 let len = unsafe {
135 opus_encode(
136 self.encoder.as_ptr(),
137 pcm.as_ptr() as *const opusic_sys::opus_int16,
138 frame_size,
139 buffer.as_mut_ptr(),
140 buffer.len() as c_int,
141 )
142 };
143
144 if len < 0 {
145 return Err(anyhow!(
146 "Failed to encode Opus frame: {}",
147 opus_error_message(len)
148 ));
149 }
150
151 buffer.truncate(len as usize);
152 Ok(buffer)
153 }
154
155 async fn write_headers(&mut self, file: &mut File) -> Result<()> {
156 let head = Self::build_opus_head(self.sample_rate);
157 self.write_page(file, &head, 0, 0x02).await?;
158
159 let tags = Self::build_opus_tags();
160 self.write_page(file, &tags, 0, 0x00).await?;
161 Ok(())
162 }
163
164 async fn write_audio_packet(
165 &mut self,
166 file: &mut File,
167 packet: &[u8],
168 frame_samples: usize,
169 ) -> Result<()> {
170 let increment = self.granule_increment(frame_samples);
171 self.granule_position = self.granule_position.saturating_add(increment);
172 self.write_page(file, packet, self.granule_position, 0x00)
173 .await
174 }
175
176 async fn finalize(&mut self, file: &mut File) -> Result<()> {
177 self.write_page(file, &[], self.granule_position, 0x04)
178 .await
179 }
180
181 async fn write_page(
182 &mut self,
183 file: &mut File,
184 packet: &[u8],
185 granule_pos: u64,
186 header_type: u8,
187 ) -> Result<()> {
188 let mut segments = Vec::new();
189 if !packet.is_empty() {
190 let mut remaining = packet.len();
191 while remaining >= 255 {
192 segments.push(255u8);
193 remaining -= 255;
194 }
195 segments.push(remaining as u8);
196 }
197
198 let mut page = Vec::with_capacity(27 + segments.len() + packet.len());
199 page.extend_from_slice(b"OggS");
200 page.push(0); page.push(header_type);
202 page.extend_from_slice(&granule_pos.to_le_bytes());
203 page.extend_from_slice(&self.serial.to_le_bytes());
204 page.extend_from_slice(&self.sequence.to_le_bytes());
205 page.extend_from_slice(&0u32.to_le_bytes()); page.push(segments.len() as u8);
207 page.extend_from_slice(&segments);
208 page.extend_from_slice(packet);
209
210 let checksum = ogg_crc32(&page);
211 page[22..26].copy_from_slice(&checksum.to_le_bytes());
212
213 file.write_all(&page).await?;
214 self.sequence = self.sequence.wrapping_add(1);
215 Ok(())
216 }
217
218 fn build_opus_head(sample_rate: u32) -> Vec<u8> {
219 let mut head = Vec::with_capacity(19);
220 head.extend_from_slice(b"OpusHead");
221 head.push(1); head.push(2); head.extend_from_slice(&0u16.to_le_bytes()); head.extend_from_slice(&sample_rate.to_le_bytes());
225 head.extend_from_slice(&0i16.to_le_bytes()); head.push(0); head
228 }
229
230 fn build_opus_tags() -> Vec<u8> {
231 const VENDOR: &str = "rustpbx";
232 let vendor_bytes = VENDOR.as_bytes();
233 let mut tags = Vec::with_capacity(8 + 4 + vendor_bytes.len() + 4);
234 tags.extend_from_slice(b"OpusTags");
235 tags.extend_from_slice(&(vendor_bytes.len() as u32).to_le_bytes());
236 tags.extend_from_slice(vendor_bytes);
237 tags.extend_from_slice(&0u32.to_le_bytes()); tags
239 }
240}
241
242#[cfg(feature = "opus")]
243impl Drop for OggStreamWriter {
244 fn drop(&mut self) {
245 unsafe {
246 opus_encoder_destroy(self.encoder.as_ptr());
247 }
248 }
249}
250
251#[cfg(feature = "opus")]
252unsafe impl Send for OggStreamWriter {}
253
254#[cfg(feature = "opus")]
255unsafe impl Sync for OggStreamWriter {}
256
257#[cfg(feature = "opus")]
258fn ogg_crc32(data: &[u8]) -> u32 {
259 const POLY: u32 = 0x04C11DB7;
260 let mut crc: u32 = 0;
261 for &byte in data {
262 crc ^= (byte as u32) << 24;
263 for _ in 0..8 {
264 if (crc & 0x8000_0000) != 0 {
265 crc = (crc << 1) ^ POLY;
266 } else {
267 crc <<= 1;
268 }
269 }
270 }
271 crc
272}
273
274impl RecorderFormat {
275 pub fn extension(&self) -> &'static str {
276 match self {
277 RecorderFormat::Wav => "wav",
278 RecorderFormat::Ogg => "ogg",
279 }
280 }
281
282 pub fn is_supported(&self) -> bool {
283 match self {
284 RecorderFormat::Wav => true,
285 RecorderFormat::Ogg => cfg!(feature = "opus"),
286 }
287 }
288
289 pub fn effective(&self) -> RecorderFormat {
290 if self.is_supported() {
291 *self
292 } else {
293 RecorderFormat::Wav
294 }
295 }
296}
297
298impl Default for RecorderFormat {
299 fn default() -> Self {
300 RecorderFormat::Wav
301 }
302}
303
304#[derive(Debug, Deserialize, Serialize, Clone)]
305#[serde(rename_all = "camelCase")]
306#[serde(default)]
307pub struct RecorderOption {
308 #[serde(default)]
309 pub recorder_file: String,
310 #[serde(default)]
311 pub samplerate: u32,
312 #[serde(default)]
313 pub ptime: u32,
314 #[serde(default, skip_serializing_if = "Option::is_none")]
315 pub format: Option<RecorderFormat>,
316}
317
318impl RecorderOption {
319 pub fn new(recorder_file: String) -> Self {
320 Self {
321 recorder_file,
322 ..Default::default()
323 }
324 }
325
326 pub fn resolved_format(&self, default: RecorderFormat) -> RecorderFormat {
327 self.format.unwrap_or(default).effective()
328 }
329
330 pub fn ensure_path_extension(&mut self, fallback_format: RecorderFormat) {
331 let effective_format = self.format.unwrap_or(fallback_format).effective();
332 self.format = Some(effective_format);
333
334 if self.recorder_file.is_empty() {
335 return;
336 }
337
338 let mut path = PathBuf::from(&self.recorder_file);
339 let has_desired_ext = path
340 .extension()
341 .and_then(|ext| ext.to_str())
342 .map(|ext| ext.eq_ignore_ascii_case(effective_format.extension()))
343 .unwrap_or(false);
344
345 if !has_desired_ext {
346 path.set_extension(effective_format.extension());
347 self.recorder_file = path.to_string_lossy().into_owned();
348 }
349 }
350}
351
352impl Default for RecorderOption {
353 fn default() -> Self {
354 Self {
355 recorder_file: "".to_string(),
356 samplerate: 16000,
357 ptime: 200,
358 format: None,
359 }
360 }
361}
362
363pub struct G729WavReader {
364 decoder: G729Decoder,
365 file_path: PathBuf,
366}
367
368impl G729WavReader {
369 pub fn new(path: PathBuf) -> Self {
370 Self {
371 decoder: G729Decoder::new(),
372 file_path: path,
373 }
374 }
375
376 pub async fn read_all(&mut self) -> Result<Vec<i16>> {
377 let data = tokio::fs::read(&self.file_path).await?;
378 Ok(self.decoder.decode(&data))
379 }
380}
381
382pub struct Recorder {
383 session_id: String,
384 option: RecorderOption,
385 samples_written: AtomicUsize,
386 cancel_token: CancellationToken,
387 channel_idx: AtomicUsize,
388 channels: Mutex<HashMap<String, usize>>,
389 stereo_buf: Mutex<PcmBuf>,
390 mono_buf: Mutex<PcmBuf>,
391}
392
393impl Recorder {
394 pub fn new(
395 cancel_token: CancellationToken,
396 session_id: String,
397 option: RecorderOption,
398 ) -> Self {
399 Self {
400 session_id,
401 option,
402 samples_written: AtomicUsize::new(0),
403 cancel_token,
404 channel_idx: AtomicUsize::new(0),
405 channels: Mutex::new(HashMap::new()),
406 stereo_buf: Mutex::new(Vec::new()),
407 mono_buf: Mutex::new(Vec::new()),
408 }
409 }
410
411 async fn update_wav_header(&self, file: &mut File) -> Result<()> {
412 let total_samples = self.samples_written.load(Ordering::SeqCst);
414 let data_size = total_samples * 4; let spec = WavSpec {
418 channels: 2,
419 sample_rate: self.option.samplerate,
420 bits_per_sample: 16,
421 sample_format: SampleFormat::Int,
422 };
423 let mut header_buf = Vec::new();
425
426 header_buf.extend_from_slice(b"RIFF");
429 let file_size = data_size + 36; header_buf.extend_from_slice(&(file_size as u32).to_le_bytes());
431 header_buf.extend_from_slice(b"WAVE");
432
433 header_buf.extend_from_slice(b"fmt ");
435 header_buf.extend_from_slice(&16u32.to_le_bytes()); header_buf.extend_from_slice(&1u16.to_le_bytes()); header_buf.extend_from_slice(&(spec.channels as u16).to_le_bytes());
438 header_buf.extend_from_slice(&(spec.sample_rate).to_le_bytes());
439
440 let bytes_per_sec =
442 spec.sample_rate * (spec.channels as u32) * (spec.bits_per_sample as u32 / 8);
443 header_buf.extend_from_slice(&bytes_per_sec.to_le_bytes());
444
445 let block_align = (spec.channels as u16) * (spec.bits_per_sample / 8);
447 header_buf.extend_from_slice(&block_align.to_le_bytes());
448 header_buf.extend_from_slice(&spec.bits_per_sample.to_le_bytes());
449
450 header_buf.extend_from_slice(b"data");
452 header_buf.extend_from_slice(&(data_size as u32).to_le_bytes());
453
454 file.seek(std::io::SeekFrom::Start(0)).await?;
456 file.write_all(&header_buf).await?;
457
458 file.seek(std::io::SeekFrom::End(0)).await?;
460
461 Ok(())
462 }
463
464 pub async fn process_recording(
465 &self,
466 file_path: &Path,
467 mut receiver: UnboundedReceiver<AudioFrame>,
468 ) -> Result<()> {
469 let first_frame = match receiver.recv().await {
471 Some(f) => f,
472 None => return Ok(()),
473 };
474
475 if let Samples::RTP { .. } = first_frame.samples {
476 return self
477 .process_recording_rtp(file_path, receiver, first_frame)
478 .await;
479 }
480
481 let requested_format = self.option.format.unwrap_or(RecorderFormat::Wav);
482 let effective_format = requested_format.effective();
483
484 if requested_format != effective_format {
485 warn!(
486 session_id = self.session_id,
487 requested = requested_format.extension(),
488 "Recorder format requires unavailable feature; falling back to wav"
489 );
490 }
491
492 if effective_format == RecorderFormat::Ogg {
493 #[cfg(feature = "opus")]
494 {
495 return self
496 .process_recording_ogg(file_path, receiver, first_frame)
497 .await;
498 }
499 #[cfg(not(feature = "opus"))]
500 {
501 unreachable!(
502 "RecorderFormat::effective() should prevent ogg when opus feature is disabled"
503 );
504 }
505 }
506
507 self.process_recording_wav(file_path, receiver, first_frame)
508 .await
509 }
510
511 fn ensure_parent_dir(&self, file_path: &Path) -> Result<()> {
512 if let Some(parent) = file_path.parent() {
513 if !parent.exists() {
514 if let Err(e) = std::fs::create_dir_all(parent) {
515 warn!(
516 "Failed to create recording file parent directory: {} {}",
517 e,
518 file_path.display()
519 );
520 return Err(anyhow!("Failed to create recording file parent directory"));
521 }
522 }
523 }
524 Ok(())
525 }
526
527 async fn create_output_file(&self, file_path: &Path) -> Result<File> {
528 self.ensure_parent_dir(file_path)?;
529 match File::create(file_path).await {
530 Ok(file) => {
531 info!(
532 session_id = self.session_id,
533 "recorder: created recording file: {}",
534 file_path.display()
535 );
536 Ok(file)
537 }
538 Err(e) => {
539 warn!(
540 "Failed to create recording file: {} {}",
541 e,
542 file_path.display()
543 );
544 Err(anyhow!("Failed to create recording file"))
545 }
546 }
547 }
548
549 async fn update_wav_header_rtp(&self, file: &mut File, payload_type: u8) -> Result<()> {
550 let total_bytes = self.samples_written.load(Ordering::SeqCst);
551 let data_size = total_bytes;
552
553 let (format_tag, sample_rate, channels): (u16, u32, u16) = match payload_type {
554 0 => (0x0007, 8000, 1), 8 => (0x0006, 8000, 1), 9 => (0x0064, 16000, 1), _ => return Ok(()), };
559
560 let mut header_buf = Vec::new();
561 header_buf.extend_from_slice(b"RIFF");
562 let file_size = data_size + 36;
563 header_buf.extend_from_slice(&(file_size as u32).to_le_bytes());
564 header_buf.extend_from_slice(b"WAVE");
565
566 header_buf.extend_from_slice(b"fmt ");
567 header_buf.extend_from_slice(&16u32.to_le_bytes());
568 header_buf.extend_from_slice(&format_tag.to_le_bytes());
569 header_buf.extend_from_slice(&(channels as u16).to_le_bytes());
570 header_buf.extend_from_slice(&sample_rate.to_le_bytes());
571
572 let bytes_per_sec: u32 = match payload_type {
574 9 => 8000, _ => sample_rate * (channels as u32) * 1, };
577 header_buf.extend_from_slice(&bytes_per_sec.to_le_bytes());
578
579 let block_align: u16 = match payload_type {
581 9 => 1,
582 _ => 1,
583 };
584 header_buf.extend_from_slice(&block_align.to_le_bytes());
585
586 let bits_per_sample: u16 = match payload_type {
588 9 => 4,
589 _ => 8,
590 };
591 header_buf.extend_from_slice(&bits_per_sample.to_le_bytes());
592
593 header_buf.extend_from_slice(b"data");
594 header_buf.extend_from_slice(&(data_size as u32).to_le_bytes());
595
596 file.seek(std::io::SeekFrom::Start(0)).await?;
597 file.write_all(&header_buf).await?;
598 file.seek(std::io::SeekFrom::End(0)).await?;
599
600 Ok(())
601 }
602
603 async fn process_recording_rtp(
604 &self,
605 file_path: &Path,
606 mut receiver: UnboundedReceiver<AudioFrame>,
607 first_frame: AudioFrame,
608 ) -> Result<()> {
609 let (payload_type, mut file) =
610 if let Samples::RTP { payload_type, .. } = &first_frame.samples {
611 let mut path = file_path.to_path_buf();
612 if *payload_type == 18 {
614 path.set_extension("g729");
615 } else if matches!(payload_type, 0 | 8 | 9) {
616 path.set_extension("wav");
617 }
618
619 let file = self.create_output_file(&path).await?;
620 (*payload_type, file)
621 } else {
622 return Err(anyhow!("Invalid frame type for RTP recording"));
623 };
624
625 match payload_type {
627 0 | 8 | 9 => {
628 self.write_wav_header_rtp(&mut file, payload_type).await?;
629 }
630 _ => {}
631 }
632
633 if let Samples::RTP { payload, .. } = first_frame.samples {
634 file.write_all(&payload).await?;
635 self.samples_written
636 .fetch_add(payload.len(), Ordering::SeqCst);
637 }
638
639 loop {
640 match receiver.recv().await {
641 Some(frame) => {
642 if let Samples::RTP { payload, .. } = frame.samples {
643 file.write_all(&payload).await?;
644 self.samples_written
645 .fetch_add(payload.len(), Ordering::SeqCst);
646 }
647 }
648 None => break,
649 }
650 }
651
652 match payload_type {
654 0 | 8 | 9 => {
655 self.update_wav_header_rtp(&mut file, payload_type).await?;
656 }
657 _ => {}
658 }
659
660 file.sync_all().await?;
661
662 Ok(())
663 }
664
665 async fn write_wav_header_rtp(&self, file: &mut File, payload_type: u8) -> Result<()> {
667 self.update_wav_header_rtp(file, payload_type).await
668 }
669
670 async fn process_recording_wav(
671 &self,
672 file_path: &Path,
673 mut receiver: UnboundedReceiver<AudioFrame>,
674 first_frame: AudioFrame,
675 ) -> Result<()> {
676 let mut file = self.create_output_file(file_path).await?;
677 self.update_wav_header(&mut file).await?;
678
679 self.append_frame(first_frame).await.ok();
680
681 let chunk_size = (self.option.samplerate / 1000 * self.option.ptime) as usize;
682 info!(
683 session_id = self.session_id,
684 format = "wav",
685 "Recording to {} ptime: {}ms chunk_size: {}",
686 file_path.display(),
687 self.option.ptime,
688 chunk_size
689 );
690
691 let mut interval = IntervalStream::new(tokio::time::interval(Duration::from_millis(
692 self.option.ptime as u64,
693 )));
694 loop {
695 select! {
696 Some(frame) = receiver.recv() => {
697 self.append_frame(frame).await.ok();
698 }
699 _ = interval.next() => {
700 let (mono_buf, stereo_buf) = self.pop(chunk_size).await;
701 self.process_buffers(&mut file, mono_buf, stereo_buf).await?;
702 self.update_wav_header(&mut file).await?;
703 }
704 _ = self.cancel_token.cancelled() => {
705 self.flush_buffers(&mut file).await?;
706 self.update_wav_header(&mut file).await?;
707 return Ok(());
708 }
709 }
710 }
711 }
712
713 #[cfg(feature = "opus")]
714 async fn process_recording_ogg(
715 &self,
716 file_path: &Path,
717 mut receiver: UnboundedReceiver<AudioFrame>,
718 first_frame: AudioFrame,
719 ) -> Result<()> {
720 let mut file = self.create_output_file(file_path).await?;
721 let mut writer = OggStreamWriter::new(self.option.samplerate)?;
722 if writer.sample_rate() != self.option.samplerate {
723 warn!(
724 session_id = self.session_id,
725 requested = self.option.samplerate,
726 using = writer.sample_rate(),
727 "Adjusted recorder samplerate to Opus-compatible value"
728 );
729 }
730 writer.write_headers(&mut file).await?;
731
732 self.append_frame(first_frame).await.ok();
733
734 let chunk_size = (self.option.samplerate / 1000 * self.option.ptime) as usize;
735 info!(
736 session_id = self.session_id,
737 format = "ogg",
738 "Recording to {} ptime: {}ms chunk_size: {}",
739 file_path.display(),
740 self.option.ptime,
741 chunk_size
742 );
743
744 let frame_samples = std::cmp::max(1, (writer.sample_rate() / 50) as usize);
745 let frame_step = frame_samples * 2; let mut pending: Vec<i16> = Vec::new();
747
748 let mut interval = IntervalStream::new(tokio::time::interval(Duration::from_millis(
749 self.option.ptime as u64,
750 )));
751
752 loop {
753 select! {
754 Some(frame) = receiver.recv() => {
755 self.append_frame(frame).await.ok();
756 }
757 _ = interval.next() => {
758 let (mono_buf, stereo_buf) = self.pop(chunk_size).await;
759 if mono_buf.is_empty() && stereo_buf.is_empty() {
760 continue;
761 }
762
763 let mix = Self::mix_buffers(&mono_buf, &stereo_buf);
764 pending.extend_from_slice(&mix);
765
766 let encoded_samples = self
767 .encode_pending_frames(&mut pending, frame_step, &mut writer, &mut file, false)
768 .await?;
769 if encoded_samples > 0 {
770 self.samples_written.fetch_add(encoded_samples, Ordering::SeqCst);
771 }
772 }
773 _ = self.cancel_token.cancelled() => {
774 let (mono_buf, stereo_buf) = self.pop(usize::MAX).await;
775 if !mono_buf.is_empty() || !stereo_buf.is_empty() {
776 let mix = Self::mix_buffers(&mono_buf, &stereo_buf);
777 pending.extend_from_slice(&mix);
778 }
779
780 let encoded_samples = self
781 .encode_pending_frames(&mut pending, frame_step, &mut writer, &mut file, true)
782 .await?;
783 if encoded_samples > 0 {
784 self.samples_written.fetch_add(encoded_samples, Ordering::SeqCst);
785 }
786
787 writer.finalize(&mut file).await?;
788 return Ok(());
789 }
790 }
791 }
792 }
793
794 #[cfg(feature = "opus")]
795 async fn encode_pending_frames(
796 &self,
797 pending: &mut Vec<i16>,
798 frame_step: usize,
799 writer: &mut OggStreamWriter,
800 file: &mut File,
801 pad_final: bool,
802 ) -> Result<usize> {
803 let mut total_samples = 0usize;
804 let samples_per_channel = frame_step / 2;
805 while pending.len() >= frame_step {
806 let frame: Vec<i16> = pending.drain(..frame_step).collect();
807 let packet = writer.encode_frame(&frame)?;
808 writer
809 .write_audio_packet(file, &packet, samples_per_channel)
810 .await?;
811 total_samples += samples_per_channel;
812 }
813
814 if pad_final && !pending.is_empty() {
815 let mut frame: Vec<i16> = pending.drain(..).collect();
816 frame.resize(frame_step, 0);
817 let packet = writer.encode_frame(&frame)?;
818 writer
819 .write_audio_packet(file, &packet, samples_per_channel)
820 .await?;
821 total_samples += samples_per_channel;
822 }
823
824 Ok(total_samples)
825 }
826
827 fn get_channel_index(&self, track_id: &str) -> usize {
829 let mut channels = self.channels.lock().unwrap();
830 if let Some(&channel_idx) = channels.get(track_id) {
831 channel_idx % 2
832 } else {
833 let new_idx = self.channel_idx.fetch_add(1, Ordering::SeqCst);
834 channels.insert(track_id.to_string(), new_idx);
835 info!(
836 session_id = self.session_id,
837 "Assigned channel {} to track: {}",
838 new_idx % 2,
839 track_id
840 );
841 new_idx % 2
842 }
843 }
844
845 async fn append_frame(&self, frame: AudioFrame) -> Result<()> {
846 let buffer = match frame.samples {
847 Samples::PCM { samples } => samples,
848 _ => return Ok(()), };
850
851 if buffer.is_empty() {
853 return Ok(());
854 }
855
856 let channel_idx = self.get_channel_index(&frame.track_id);
858
859 match channel_idx {
861 0 => {
862 let mut mono_buf = self.mono_buf.lock().unwrap();
863 mono_buf.extend(buffer.iter());
864 }
865 1 => {
866 let mut stereo_buf = self.stereo_buf.lock().unwrap();
867 stereo_buf.extend(buffer.iter());
868 }
869 _ => {}
870 }
871
872 Ok(())
873 }
874
875 pub(crate) fn extract_samples(buffer: &mut PcmBuf, extract_size: usize) -> PcmBuf {
877 if extract_size > 0 && !buffer.is_empty() {
878 let take_size = extract_size.min(buffer.len());
879 buffer.drain(..take_size).collect()
880 } else {
881 Vec::new()
882 }
883 }
884
885 async fn pop(&self, chunk_size: usize) -> (PcmBuf, PcmBuf) {
886 let mut mono_buf = self.mono_buf.lock().unwrap();
887 let mut stereo_buf = self.stereo_buf.lock().unwrap();
888
889 let safe_chunk_size = chunk_size.min(16000 * 10); let mono_result = if mono_buf.len() >= safe_chunk_size {
893 Self::extract_samples(&mut mono_buf, safe_chunk_size)
895 } else if !mono_buf.is_empty() {
896 let available_len = mono_buf.len(); let mut result = Self::extract_samples(&mut mono_buf, available_len);
899 if chunk_size != usize::MAX {
900 result.resize(safe_chunk_size, 0); }
903 result
904 } else {
905 if chunk_size != usize::MAX {
907 vec![0; safe_chunk_size]
908 } else {
909 Vec::new()
910 }
911 };
912
913 let stereo_result = if stereo_buf.len() >= safe_chunk_size {
914 Self::extract_samples(&mut stereo_buf, safe_chunk_size)
916 } else if !stereo_buf.is_empty() {
917 let available_len = stereo_buf.len(); let mut result = Self::extract_samples(&mut stereo_buf, available_len);
920 if chunk_size != usize::MAX {
921 result.resize(safe_chunk_size, 0); }
924 result
925 } else {
926 if chunk_size != usize::MAX {
928 vec![0; safe_chunk_size]
929 } else {
930 Vec::new()
931 }
932 };
933
934 if chunk_size == usize::MAX {
936 let max_len = mono_result.len().max(stereo_result.len());
937 let mut mono_final = mono_result;
938 let mut stereo_final = stereo_result;
939 mono_final.resize(max_len, 0);
940 stereo_final.resize(max_len, 0);
941 (mono_final, stereo_final)
942 } else {
943 (mono_result, stereo_result)
944 }
945 }
946
947 pub fn stop_recording(&self) -> Result<()> {
948 self.cancel_token.cancel();
949 Ok(())
950 }
951
952 pub(crate) fn mix_buffers(mono_buf: &PcmBuf, stereo_buf: &PcmBuf) -> Vec<i16> {
954 assert_eq!(
956 mono_buf.len(),
957 stereo_buf.len(),
958 "Buffer lengths must be equal after pop()"
959 );
960
961 let len = mono_buf.len();
962 let mut mix_buff = Vec::with_capacity(len * 2);
963
964 for i in 0..len {
965 mix_buff.push(mono_buf[i]); mix_buff.push(stereo_buf[i]); }
968
969 mix_buff
970 }
971
972 async fn write_audio_data(
974 &self,
975 file: &mut File,
976 mono_buf: &PcmBuf,
977 stereo_buf: &PcmBuf,
978 ) -> Result<usize> {
979 let max_len = mono_buf.len().max(stereo_buf.len());
980 if max_len == 0 {
981 return Ok(0);
982 }
983
984 let mix_buff = Self::mix_buffers(mono_buf, stereo_buf);
985
986 file.seek(std::io::SeekFrom::End(0)).await?;
987 file.write_all(&samples_to_bytes(&mix_buff)).await?;
988
989 Ok(max_len)
990 }
991
992 async fn process_buffers(
994 &self,
995 file: &mut File,
996 mono_buf: PcmBuf,
997 stereo_buf: PcmBuf,
998 ) -> Result<()> {
999 if mono_buf.is_empty() && stereo_buf.is_empty() {
1001 return Ok(());
1002 }
1003 let samples_written = self.write_audio_data(file, &mono_buf, &stereo_buf).await?;
1005 if samples_written > 0 {
1006 self.samples_written
1007 .fetch_add(samples_written, Ordering::SeqCst);
1008 }
1009 Ok(())
1010 }
1011
1012 async fn flush_buffers(&self, file: &mut File) -> Result<()> {
1014 loop {
1015 let (mono_buf, stereo_buf) = self.pop(usize::MAX).await;
1016
1017 if mono_buf.is_empty() && stereo_buf.is_empty() {
1018 break;
1019 }
1020
1021 let samples_written = self.write_audio_data(file, &mono_buf, &stereo_buf).await?;
1022 if samples_written > 0 {
1023 self.samples_written
1024 .fetch_add(samples_written, Ordering::SeqCst);
1025 }
1026 }
1027
1028 Ok(())
1029 }
1030}