Skip to main content

gosuto_libwebrtc/native/
audio_source.rs

// Copyright 2025 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use cxx::SharedPtr;
16use tokio::sync::oneshot;
17use gosuto_webrtc_sys::audio_track as sys_at;
18
19use crate::{audio_frame::AudioFrame, audio_source::AudioSourceOptions, RtcError, RtcErrorType};
20
21#[derive(Clone)]
22pub struct NativeAudioSource {
23    sys_handle: SharedPtr<sys_at::ffi::AudioTrackSource>,
24    sample_rate: u32,
25    num_channels: u32,
26    queue_size_samples: u32,
27}
28
29impl NativeAudioSource {
30    /// Creates a new [`NativeAudioSource`].
31    ///
32    /// # Arguments
33    /// * `options` – Configuration options for the source (e.g. echo cancellation, noise suppression).
34    /// * `sample_rate` – Sampling rate in Hz (for example, `48000`).
35    /// * `num_channels` – Number of audio channels (`1` for mono, `2` for stereo, etc.).
36    /// * `queue_size_ms` – Size of the internal buffering queue, in milliseconds.
37    ///
38    /// # Behavior
39    /// - If `queue_size_ms` is **zero**, buffering is **disabled** and audio frames are
40    ///   delivered directly to webrtc sinks. In this mode, the caller **must provide 10 ms frames**
41    ///   (i.e., `sample_rate / 100` samples per channel) when calling [`capture_frame`].
42    /// - If `queue_size_ms` is **non-zero**, buffering is enabled. The value must be a
43    ///   **multiple of 10**, representing the total buffering duration in milliseconds.
44    ///   Frames will be queued and flushed to sinks asynchronously once the buffer
45    ///   reaches the configured threshold.
46    ///
47    /// # Panics
48    /// assert if `queue_size_ms` is not a multiple of 10.
49    pub fn new(
50        options: AudioSourceOptions,
51        sample_rate: u32,
52        num_channels: u32,
53        queue_size_ms: u32,
54    ) -> NativeAudioSource {
55        assert!(queue_size_ms % 10 == 0, "queue_size_ms must be a multiple of 10");
56
57        let sys_handle = sys_at::ffi::new_audio_track_source(
58            options.into(),
59            sample_rate.try_into().unwrap(),
60            num_channels.try_into().unwrap(),
61            queue_size_ms.try_into().unwrap(),
62        );
63
64        let queue_size_samples = (queue_size_ms * sample_rate * num_channels) / 1000;
65        Self { sys_handle, sample_rate, num_channels, queue_size_samples }
66    }
67
68    pub fn sys_handle(&self) -> SharedPtr<sys_at::ffi::AudioTrackSource> {
69        self.sys_handle.clone()
70    }
71
72    pub fn set_audio_options(&self, options: AudioSourceOptions) {
73        self.sys_handle.set_audio_options(&sys_at::ffi::AudioSourceOptions::from(options))
74    }
75
76    pub fn audio_options(&self) -> AudioSourceOptions {
77        self.sys_handle.audio_options().into()
78    }
79
80    pub fn sample_rate(&self) -> u32 {
81        self.sample_rate
82    }
83
84    pub fn num_channels(&self) -> u32 {
85        self.num_channels
86    }
87
88    pub fn clear_buffer(&self) {
89        self.sys_handle.clear_buffer();
90    }
91
92    pub async fn capture_frame(&self, frame: &AudioFrame<'_>) -> Result<(), RtcError> {
93        if self.sample_rate != frame.sample_rate || self.num_channels != frame.num_channels {
94            return Err(RtcError {
95                error_type: RtcErrorType::InvalidState,
96                message: "sample_rate and num_channels don't match".to_owned(),
97            });
98        }
99
100        // Fast path: no buffering
101        if self.queue_size_samples == 0 {
102            // frame size must be 10ms for fast path
103            let expected_frames_per_ch = (self.sample_rate / 100) as usize;
104            if frame.data.len() % (self.num_channels as usize) != 0 {
105                return Err(RtcError {
106                    error_type: RtcErrorType::InvalidState,
107                    message: "frame.data length not divisible by channel count".to_owned(),
108                });
109            }
110            let nb_frames = frame.data.len() / (self.num_channels as usize);
111            if nb_frames != expected_frames_per_ch {
112                return Err(RtcError {
113                    error_type: RtcErrorType::InvalidState,
114                    message: format!(
115                        "direct capture requires 10ms frames: got {} frames, expected {}",
116                        nb_frames, expected_frames_per_ch
117                    ),
118                });
119            }
120
121            // Define a no-op callback for fast path (queue_size_ms=0)
122            // This is safer than passing null, which can cause UB in release mode optimizations
123            extern "C" fn noop_complete_callback(_ctx: *const sys_at::SourceContext) {
124                // No-op: fast path completes synchronously, no callback needed
125            }
126
127            unsafe {
128                let data: &[i16] = frame.data.as_ref();
129                // Use a valid no-op callback instead of null for safety
130                // In release mode, transmuting null pointers can cause UB
131                let noop_callback = sys_at::CompleteCallback(noop_complete_callback);
132                let ok = self.sys_handle.capture_frame(
133                    data,
134                    self.sample_rate,
135                    self.num_channels,
136                    nb_frames,
137                    std::ptr::null(), // Context is still null - callback won't use it
138                    noop_callback,
139                );
140                if !ok {
141                    return Err(RtcError {
142                        error_type: RtcErrorType::InvalidState,
143                        message: "failed to capture frame without buffering".to_owned(),
144                    });
145                }
146            }
147            return Ok(());
148        }
149
150        // Buffered path.
151        extern "C" fn lk_audio_source_complete(userdata: *const sys_at::SourceContext) {
152            let tx = unsafe { Box::from_raw(userdata as *mut oneshot::Sender<()>) };
153            let _ = tx.send(());
154        }
155
156        // iterate over chunks of self._queue_size_samples
157        for chunk in frame.data.chunks(self.queue_size_samples as usize) {
158            let nb_frames = chunk.len() / self.num_channels as usize;
159            let (tx, rx) = oneshot::channel::<()>();
160            let ctx = Box::new(tx);
161            let ctx_ptr = Box::into_raw(ctx) as *const sys_at::SourceContext;
162
163            unsafe {
164                // In the fast path, C++ never store / invoke on_complete / ctx.
165                if !self.sys_handle.capture_frame(
166                    chunk,
167                    self.sample_rate,
168                    self.num_channels,
169                    nb_frames,
170                    ctx_ptr,
171                    sys_at::CompleteCallback(lk_audio_source_complete),
172                ) {
173                    return Err(RtcError {
174                        error_type: RtcErrorType::InvalidState,
175                        message: "failed to capture frame".to_owned(),
176                    });
177                }
178            }
179
180            let _ = rx.await;
181        }
182
183        Ok(())
184    }
185}
186
187impl From<sys_at::ffi::AudioSourceOptions> for AudioSourceOptions {
188    fn from(options: sys_at::ffi::AudioSourceOptions) -> Self {
189        Self {
190            echo_cancellation: options.echo_cancellation,
191            noise_suppression: options.noise_suppression,
192            auto_gain_control: options.auto_gain_control,
193        }
194    }
195}
196
197impl From<AudioSourceOptions> for sys_at::ffi::AudioSourceOptions {
198    fn from(options: AudioSourceOptions) -> Self {
199        Self {
200            echo_cancellation: options.echo_cancellation,
201            noise_suppression: options.noise_suppression,
202            auto_gain_control: options.auto_gain_control,
203        }
204    }
205}