use crate::{
audio::{Interval, PCMSlice},
collections::{
linked_list::{LLNodeOps, LLOps, LinkedList},
segment_tree::{index_types::GlobalIndex, CircularSegmentTree, TreeIterState},
Ptr,
},
math::Vec4,
math::FP64,
mem,
};
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
fmt::Debug,
};
pub mod protocol;
pub mod standard;
pub mod streams;
pub mod time;
use self::protocol::{
AddTrackErr, LocalRequestQueue, MixerEventKind, MixerRequest, MixerResponse, OffsetKind,
RemoveTrackErr, RequestQueuePtr, ResponseQueuePtr, TrackID, TrackMutatedErr,
};
pub use self::time::SampleTime;
pub type MutatedResult<T> = Result<T, TrackMutatedErr>;
#[derive(Clone)]
pub struct MixerProtocol {
requests: RequestQueuePtr,
responses: ResponseQueuePtr,
}
impl MixerProtocol {
pub fn submit_requests(&self, queue: &mut LocalRequestQueue) {
self.requests.submit_requests(queue);
}
pub fn recieve_responses(&self) -> impl Iterator<Item = MixerResponse> + '_ {
self.responses.recieve_responses()
}
}
#[derive(Copy, Clone)]
struct MixerCursor {
t0: SampleTime,
delta: SampleTime,
}
impl MixerCursor {
pub fn new(t0: SampleTime, delta: SampleTime) -> Self {
Self {
t0,
delta,
}
}
pub fn to_interval_ms(self) -> Interval {
let lo = self.t0.elapsed_in_ms_fp();
let hi = self.t0.sum(&self.delta).elapsed_in_ms_fp();
Interval { lo, hi }
}
#[allow(dead_code)]
pub fn to_interval_tuple_ms_f32(self) -> (f32, f32) {
let lo = self.t0.elapsed_in_ms_f32();
let hi = self.t0.sum(&self.delta).elapsed_in_ms_f32();
(lo, hi)
}
}
#[derive(Copy, Clone, Debug)]
pub struct StreamState {
pub local_time: SampleTime,
pub global_interval: Interval,
pub attack_time: u32,
pub release_time: u32,
pub gain: f32,
pub pan: f32,
pub frequency: u32,
pub channels: u32,
}
#[derive(Copy, Clone)]
pub struct PullInfo {
pub samples_read: usize,
pub samples_read_per_channel: usize,
pub elapsed_audio_in_ms: FP64,
}
pub trait HasAudioStream: Send + Debug {
fn stream_state(&self) -> &StreamState;
fn stream_state_mut(&mut self) -> &mut StreamState;
fn interval(&self) -> &Interval {
&self.stream_state().global_interval
}
fn gain(&self) -> f32 {
self.stream_state().gain
}
fn set_gain(&mut self, new_gain: f32) {
self.stream_state_mut().gain = new_gain.abs().max(0.0);
}
fn frequency(&self) -> u32 {
self.stream_state().frequency
}
fn interval_mut(&mut self) -> &mut Interval {
&mut self.stream_state_mut().global_interval
}
fn calculate_samples_needed(&self, dt: u32) -> u32 {
const NUM_MILLISECONDS_IN_ONE_SECOND: u32 = 1000;
(self.frequency() * dt) / NUM_MILLISECONDS_IN_ONE_SECOND
}
fn calculate_samples_needed_per_channel_fp(&self, dt: FP64) -> FP64 {
let f = self.frequency();
super::calculate_samples_needed_per_channel_fp(f, dt)
}
fn calculate_samples_needed_per_channel_f32(&self, dt: f32) -> f32 {
const NUM_MILLISECONDS_IN_ONE_SECOND: f32 = 1000.0;
(self.frequency() as f32 * dt) / NUM_MILLISECONDS_IN_ONE_SECOND
}
fn time_remaining_in_ms(&self) -> FP64 {
let state = self.stream_state();
let local_t = state.local_time.elapsed_in_ms_fp();
let interval = state.global_interval;
interval.distance() - local_t
}
fn is_dead(&self) -> bool {
let state = self.stream_state();
let local_t = state.local_time.elapsed_in_ms_fp();
let interval = state.global_interval;
local_t > interval.distance()
}
fn pull_samples<'a>(
&mut self,
scratch_space: &mut [f32],
audio_pcm: PCMSlice<'a, f32>,
) -> PullInfo;
fn seek(&mut self, global_time: SampleTime);
}
pub struct Mixer {
request_queue: RequestQueuePtr,
response_queue: ResponseQueuePtr,
local_response_queue: VecDeque<MixerResponse>,
speed_factor: FP64,
global_t: SampleTime,
stream_scratch_space: Vec<f32>,
sample_scratch_space: Vec<f32>,
track_chart: CircularSegmentTree<Box<dyn HasAudioStream>>,
track_id_table: HashMap<TrackID, GlobalIndex>,
running_streams_table: HashMap<GlobalIndex, Ptr>,
running_streams_on_intersection: Vec<GlobalIndex>,
running_streams: LinkedList<GlobalIndex>,
track_removal_stack: Vec<Ptr>,
}
impl Mixer {
pub fn new(sample_rate: u32, _channels: u32) -> Self {
Self {
request_queue: RequestQueuePtr::new(),
response_queue: ResponseQueuePtr::new(),
global_t: SampleTime::new().with_sample_rate(sample_rate),
running_streams_on_intersection: Vec::new(),
running_streams: LinkedList::new(),
running_streams_table: HashMap::new(),
track_chart: CircularSegmentTree::new(30, 1 << 30),
track_removal_stack: vec![],
track_id_table: HashMap::new(),
stream_scratch_space: vec![0.0f32; 8192 * 2],
sample_scratch_space: vec![0.0f32; 8192 * 2],
local_response_queue: VecDeque::new(),
speed_factor: FP64::from(1),
}
}
fn mix_audio(&mut self, mut output_buffer: PCMSlice<'_, f32>) {
let new_samples_per_channel =
FP64::from(output_buffer.samples_per_channel()) * self.speed_factor;
let cursor = MixerCursor::new(
self.global_t,
self.global_t
.with_sample_count(new_samples_per_channel.as_i64().max(0) as u64),
);
output_buffer.set_zero();
self.search_for_active_tracks(cursor);
self.mix_active_tracks(cursor, output_buffer);
self.remove_irrelevent_tracks(cursor);
self.handle_user_requests(cursor);
self.forward_local_responses_back_to_client();
self.global_t.increment(cursor.delta.samps());
}
fn mix_active_tracks(&mut self, cursor: MixerCursor, output_buffer: PCMSlice<f32>) {
self.handle_intersecting_tracks_not_first_time(cursor, output_buffer);
self.handle_intersecting_tracks(cursor, output_buffer);
}
fn handle_intersecting_tracks(
&mut self,
cursor: MixerCursor,
mut output_buffer: PCMSlice<f32>,
) {
let track_chart = &mut self.track_chart;
let running_streams_table = &mut self.running_streams_table;
let running_streams_on_intersection = &mut self.running_streams_on_intersection;
let running_streams_on_intersection_ptr =
running_streams_on_intersection as *const Vec<GlobalIndex>;
let running_streams = &mut self.running_streams;
let stream_scratch_space = self.stream_scratch_space.as_mut_slice();
let sample_scratch_space = &mut self.sample_scratch_space;
let iter = running_streams_on_intersection.iter().enumerate().rev();
let delta = cursor.delta.elapsed_in_ms_fp();
for (stream_vec_idx, &gi) in iter {
let current_track = &mut track_chart[gi];
let samples_needed_per_channel = current_track
.calculate_samples_needed_per_channel_fp(delta)
.ceil()
.as_i64() as usize;
let samples_needed = samples_needed_per_channel * 2;
let PullInfo { samples_read, .. } = current_track.pull_samples(
stream_scratch_space,
output_buffer.with_slice(&sample_scratch_space[0..samples_needed]),
);
resample_and_mix_assumed_2_channels(
&sample_scratch_space[0..samples_read],
&mut output_buffer[..],
);
let stream_idx = unsafe {
let on_intersection_ref =
mem::force_ptr_to_ref_mut(running_streams_on_intersection_ptr);
on_intersection_ref.swap_remove(stream_vec_idx)
};
running_streams.push_rear(stream_idx);
let stream_ptr = running_streams.get_rear();
if let Some(ptr) = running_streams_table.get_mut(&stream_idx) {
*ptr = stream_ptr;
}
}
}
fn handle_intersecting_tracks_not_first_time(
&mut self,
cursor: MixerCursor,
mut output_buffer: PCMSlice<f32>,
) {
let track_chart = &mut self.track_chart;
let running_streams = &mut self.running_streams;
let stream_scratch_space = self.stream_scratch_space.as_mut_slice();
let sample_scratch_space = &mut self.sample_scratch_space;
let elapsed_time = cursor.delta.elapsed_in_ms_fp();
running_streams
.iter()
.filter_map(|e| e.get_data())
.for_each(|&gi| {
let current_track = &mut track_chart[gi];
let elapsed_time_in_ms = elapsed_time;
let samples_required_to_pull_from_track =
current_track.calculate_samples_needed_per_channel_fp(elapsed_time_in_ms) * 2;
let samples_required_to_pull_from_track_truncated =
samples_required_to_pull_from_track.ceil().as_i64() as usize;
let PullInfo { samples_read, .. } = current_track.pull_samples(
stream_scratch_space,
output_buffer.with_slice(
&sample_scratch_space[0..samples_required_to_pull_from_track_truncated],
),
);
resample_and_mix_assumed_2_channels(
&sample_scratch_space[0..samples_read],
&mut output_buffer[..],
);
});
}
fn search_for_active_tracks(&mut self, cursor: MixerCursor) {
let track_chart = &mut self.track_chart;
let running_streams_on_intersection = &mut self.running_streams_on_intersection;
let running_streams_table = &mut self.running_streams_table;
let local_response_queue = &mut self.local_response_queue;
let track_id_table = &mut self.track_id_table;
track_chart
.search_interval(&mut TreeIterState::new(), cursor.to_interval_ms())
.for_each(|(gi, _)| {
if let Entry::Vacant(vacant_entry) = running_streams_table.entry(gi) {
running_streams_on_intersection.push(gi);
vacant_entry.insert(Ptr::null());
let track_id = track_id_table
.iter()
.find(|&(_, &v)| v == gi)
.map(|(&k, _)| k)
.expect("track_id should exist");
local_response_queue.push_back(MixerResponse::MixerEvent(
MixerEventKind::TrackStarted(track_id),
));
}
});
}
fn remove_irrelevent_tracks(&mut self, _cursor: MixerCursor) {
self.remove_irrelevent_tracks_predicate(|track| {
track.time_remaining_in_ms() < FP64::from(1)
})
}
fn remove_irrelevent_tracks_predicate<Predicate>(&mut self, can_be_removed: Predicate)
where
Predicate: Fn(&Box<dyn HasAudioStream>) -> bool,
{
let track_chart = &mut self.track_chart;
let running_streams = &mut self.running_streams;
let running_streams_table = &mut self.running_streams_table;
let track_removal_stack = &mut self.track_removal_stack;
let track_id_table = &mut self.track_id_table;
let local_response_queue = &mut self.local_response_queue;
for node_ptr in running_streams.node_index_iter() {
let &gi = running_streams[node_ptr]
.get_data()
.expect("should be available");
if can_be_removed(&track_chart[gi]) {
track_removal_stack.push(node_ptr);
running_streams_table.remove(&gi);
let track_id = track_id_table
.iter()
.find(|&(_, &v)| v == gi)
.map(|(&k, _)| k)
.expect("track_id should exist");
local_response_queue.push_back(MixerResponse::MixerEvent(
MixerEventKind::TrackStopped(track_id),
));
}
}
while let Some(node_ptr) = track_removal_stack.pop() {
running_streams.remove(node_ptr);
}
}
fn forward_local_responses_back_to_client(&mut self) -> Option<()> {
let local_response_queue = &mut self.local_response_queue;
let mut response_queue = self.response_queue.lock()?;
while let Some(local_response) = local_response_queue.pop_front() {
response_queue.push_back(local_response);
}
Some(())
}
fn handle_user_requests(&mut self, cursor: MixerCursor) -> Option<()> {
let mixer_ref = unsafe { mem::force_static_mut(self) };
let track_chart = &mut self.track_chart;
let track_id_table = &mut self.track_id_table;
let running_streams_table = &mut self.running_streams_table;
let global_t = &mut self.global_t;
let mut request_queue = self.request_queue.lock()?;
let mut response_queue = self.response_queue.lock()?;
let current_time = cursor.t0.elapsed_in_ms_fp();
while let Some(req) = request_queue.pop_front() {
match req {
MixerRequest::FetchMixerTime => {
response_queue.push_back(MixerResponse::MixerTime(*global_t))
}
MixerRequest::AddTrack(tid, off, track) => {
response_queue.push_back(MixerResponse::AddTrackStatus(
tid,
Self::request_operation_add_track(
track_chart,
track_id_table,
current_time,
tid,
off,
track,
),
));
}
MixerRequest::MutateMixer(tid, callback) => response_queue.push_back(
MixerResponse::MixerMutatedStatus(tid, callback(tid, mixer_ref)),
),
MixerRequest::RemoveTrack(tid) => {
response_queue.push_back(MixerResponse::RemoveTrackStatus(
tid,
Self::request_operation_remove_track(
tid,
track_id_table,
running_streams_table,
track_chart,
),
));
}
MixerRequest::Seek(offset_kind) => {
Self::request_operation_seek(track_chart, global_t, offset_kind);
mixer_ref.remove_irrelevent_tracks_predicate(|track| {
!track.interval().is_within(global_t.elapsed_in_ms_fp())
});
}
}
}
Some(())
}
fn request_operation_seek(
track_chart: &mut CircularSegmentTree<Box<dyn HasAudioStream>>,
global_t: &mut SampleTime,
offset_kind: OffsetKind,
) {
let global_t_in_ms = global_t.elapsed_in_ms_fp();
let new_global_t = match offset_kind {
OffsetKind::Current { offset } => {
let offset = FP64::from(offset);
global_t.from_time_in_ms_fp((global_t_in_ms + offset).max(FP64::zero()))
}
OffsetKind::Start { offset } => {
let offset = FP64::from(offset);
global_t.from_time_in_ms_fp((offset).max(FP64::zero()))
}
};
*global_t = new_global_t;
for track in track_chart.values_mut() {
track.seek(new_global_t);
}
}
fn request_operation_add_track(
track_chart: &mut CircularSegmentTree<Box<dyn HasAudioStream>>,
track_id_table: &mut HashMap<TrackID, GlobalIndex>,
current_time: FP64,
tid: TrackID,
off: OffsetKind,
mut track: Box<dyn HasAudioStream>,
) -> Result<(), AddTrackErr> {
if track_id_table.contains_key(&tid) {
return Err(AddTrackErr::TrackIdAlreadyExists(track));
}
let offset_interval = match off {
OffsetKind::Current { offset } => {
*track.interval() + (current_time + FP64::from(offset))
}
OffsetKind::Start { offset } => {
let interval_length = track.interval().distance();
Interval::from_point_and_length(FP64::from(offset), interval_length)
}
};
*track.interval_mut() = offset_interval;
let global_idx = track_chart.insert(offset_interval, track);
track_id_table.insert(tid, global_idx);
Ok(())
}
fn request_operation_remove_track(
tid: TrackID,
track_id_table: &mut HashMap<TrackID, GlobalIndex>,
running_streams_table: &mut HashMap<GlobalIndex, Ptr>,
track_chart: &mut CircularSegmentTree<Box<dyn HasAudioStream>>,
) -> Result<Box<dyn HasAudioStream>, RemoveTrackErr> {
let &global_idx = track_id_table
.get(&tid)
.ok_or(RemoveTrackErr::TrackNotFound)?;
let track_is_currently_playing = running_streams_table.contains_key(&global_idx);
if track_is_currently_playing {
return Err(RemoveTrackErr::TrackCurrentlyPlaying);
}
track_id_table
.remove(&tid)
.expect("tid should already exist");
let item = track_chart
.remove_by_global_idx(global_idx)
.expect("item should exist");
Ok(item)
}
fn protocol(&self) -> MixerProtocol {
MixerProtocol {
requests: self.request_queue.clone(),
responses: self.response_queue.clone(),
}
}
pub fn set_mixer_speed(&mut self, speed: FP64) -> MutatedResult<()> {
self.speed_factor = speed.max(FP64::zero());
Ok(())
}
pub fn print_tree(&self) -> MutatedResult<()> {
self.track_chart.print_tree(".");
Ok(())
}
pub fn track_get_interval(&self, tid: TrackID) -> MutatedResult<Interval> {
let track_id_table = &self.track_id_table;
let track_chart = &self.track_chart;
let &gid = track_id_table
.get(&tid)
.ok_or(TrackMutatedErr::TrackNotFound)?;
let current_track = &track_chart[gid];
let &interval = current_track.interval();
Ok(interval)
}
pub fn track_set_interval(
&mut self,
tid: TrackID,
new_interval: Interval,
) -> MutatedResult<()> {
let ¤t_track_gid = self
.track_id_table
.get(&tid)
.ok_or(TrackMutatedErr::TrackNotFound)?;
let was_in_intersection = self
.remove_track_references_from_the_track_intersection_phase(current_track_gid)
.is_some();
let was_in_running = self
.remove_track_references_from_the_running_track_phase(current_track_gid)
.is_some();
let track_chart = &mut self.track_chart;
let mut track = track_chart
.remove_by_global_idx(current_track_gid)
.expect("track should exist, since it also exists in the track_id_table");
*track.interval_mut() = new_interval;
let new_gid = track_chart.insert(new_interval, track);
let track_id_table = &mut self.track_id_table;
*track_id_table
.get_mut(&tid)
.expect("tid should already exist in table") = new_gid;
if was_in_intersection != was_in_running {
self.running_streams_table.insert(new_gid, Ptr::null());
if was_in_intersection {
self.running_streams_on_intersection.push(new_gid);
} else {
self.running_streams.push_rear(new_gid);
let ptr = self.running_streams.get_rear();
*self
.running_streams_table
.get_mut(&new_gid)
.expect("key should exist") = ptr;
}
}
Ok(())
}
fn remove_track_references_from_the_track_intersection_phase(
&mut self,
current_track_gid: GlobalIndex,
) -> Option<()> {
let running_streams_on_intersection = &mut self.running_streams_on_intersection;
let running_streams_table = &mut self.running_streams_table;
let item_that_match_gid = running_streams_on_intersection
.iter_mut()
.enumerate()
.map(|(a, &mut b)| (a, b))
.rev()
.find(|&(_, gi)| gi == current_track_gid)
.map(|(k, _)| k)?;
running_streams_on_intersection.swap_remove(item_that_match_gid);
running_streams_table.remove(¤t_track_gid);
Some(())
}
fn remove_track_references_from_the_running_track_phase(
&mut self,
current_track_gid: GlobalIndex,
) -> Option<()> {
let running_streams = &mut self.running_streams;
let running_streams_table = &mut self.running_streams_table;
let ptr = running_streams_table.get(¤t_track_gid).copied()?;
if ptr.is_null() {
return None;
}
running_streams
.remove(ptr)
.expect("pointer registered in table, so should exist here as well");
running_streams_table.remove(¤t_track_gid);
Some(())
}
pub fn get_time(&self) -> SampleTime {
self.global_t
}
}
fn resample_and_mix_assumed_2_channels(src: &[f32], dst: &mut [f32]) {
mix_resample_audio_both_2_channels_iterator_version_vectorized(src, dst)
}
#[allow(dead_code, clippy::identity_op)]
fn mix_resample_audio_both_2_channels_slow_reference(src: &[f32], dst: &mut [f32]) {
const NUM_CHANNELS: usize = 2;
let src_sample_count = src.len() / NUM_CHANNELS;
let dst_sample_count = dst.len() / NUM_CHANNELS;
if src_sample_count == 0 || dst_sample_count == 0 {
return;
}
let scale_ratio = src_sample_count as f32 / dst_sample_count as f32;
for dst_i in 0..dst_sample_count {
let src_i_estimate = dst_i as f32 * scale_ratio;
let src_i = src_i_estimate as usize;
let lerp_t = src_i_estimate.fract();
for k in 0..NUM_CHANNELS {
let dst_index_sub_sample = NUM_CHANNELS * dst_i + k;
let cur_block = (src_i + 0).max(0);
let nxt_block = (src_i + 1).min(src_sample_count - 1);
let cur = src[NUM_CHANNELS * cur_block + k];
let nxt = src[NUM_CHANNELS * nxt_block + k];
let old_value = dst[dst_index_sub_sample];
let new_value = (nxt - cur) * lerp_t + cur;
let mixed_value = old_value + new_value;
dst[dst_index_sub_sample] = mixed_value;
}
}
}
#[allow(dead_code,clippy::identity_op)]
fn mix_resample_audio_both_2_channels_iterator_version_vectorized(src: &[f32], dst: &mut [f32]) {
const NUM_CHANNELS: usize = 2;
let src_sample_count = src.len() / NUM_CHANNELS;
let dst_sample_count = dst.len() / NUM_CHANNELS;
if src_sample_count == 0 || dst_sample_count == 0 {
return;
}
let scale_ratio = src_sample_count as f32 / dst_sample_count as f32;
dst.chunks_mut(2)
.enumerate()
.flat_map(|(dst_i, dst_chunk)| {
let src_i_estimate = dst_i as f32 * scale_ratio;
let src_i = src_i_estimate as usize;
let lerp_t = src_i_estimate.fract();
let cur_block = (src_i + 0).max(0);
let nxt_block = (src_i + 1).min(src_sample_count - 1);
dst_chunk.iter_mut().enumerate().map(move |(k, dst)| {
let (cur, nxt) = unsafe {
(
src.get_unchecked(NUM_CHANNELS * cur_block + k),
src.get_unchecked(NUM_CHANNELS * nxt_block + k),
)
};
let new_value = (nxt - cur) * lerp_t + cur;
(dst, new_value)
})
})
.for_each(|(dst, new_value)| *dst += new_value);
}
#[allow(dead_code)]
pub fn mix_resample_audio_test(src: &[f32], dst: &mut [f32]) {
const NUM_CHANNELS: usize = 2;
let src_sample_count = src.len() / NUM_CHANNELS;
let dst_sample_count = dst.len() / NUM_CHANNELS;
if src_sample_count == 0 || dst_sample_count == 0 {
return;
}
let delta = src_sample_count as f32 / dst_sample_count as f32;
let mut src_full = Vec4::from([0., 1., 2., 3.]) * delta;
let step = Vec4::from([2.0; 4]) * delta;
dst.chunks_mut(4).for_each(|chunks| {
chunks.iter_mut().enumerate().for_each(|(k, dst)| unsafe {
let src_floor = *src_full.get_unchecked(k / 2) as usize;
*dst += *src.get_unchecked(NUM_CHANNELS * src_floor);
});
src_full += step;
});
}