why2_chat/network/voice/client/
mod.rs1pub mod sfx;
21
22use std::
23{
24 thread,
25 net::UdpSocket,
26 collections::{ HashMap, VecDeque },
27 time::
28 {
29 SystemTime,
30 Duration,
31 UNIX_EPOCH,
32 },
33 sync::
34 {
35 Arc,
36 Mutex,
37 LazyLock,
38 mpsc::Sender,
39 atomic::{ AtomicUsize, Ordering },
40 },
41};
42
43use cpal::
44{
45 Device,
46 Stream,
47 StreamConfig,
48 SupportedStreamConfig,
49 SupportedStreamConfigRange,
50 traits::
51 {
52 DeviceTrait,
53 HostTrait,
54 StreamTrait,
55 },
56};
57
58use audiopus::
59{
60 Channels,
61 SampleRate,
62 Application,
63 TryFrom,
64 coder::{ Encoder, Decoder },
65};
66
67use ringbuf::
68{
69 HeapRb,
70 HeapCons,
71 HeapProd,
72 traits::
73 {
74 Split,
75 Producer,
76 Consumer,
77 },
78};
79
80use nnnoiseless::DenoiseState;
81
82use gag::Gag;
83
84use crate::
85{
86 options as chat_options,
87 network::
88 {
89 client::{ ClientEvent, VoiceUser },
90 voice::
91 {
92 self,
93 options,
94 VoiceCode,
95 VoicePacket,
96 client::sfx::SoundEffect,
97 },
98 },
99};
100
101struct LocalStream
103{
104 _input: Stream,
105 _output: Stream,
106}
107
/// Scope guard created by each `listen_server_voice` call.
///
/// `generation` is the value of `AUDIO_GENERATION` claimed by that call; the guard's `Drop`
/// impl only releases `LOCAL_STREAMS` while that generation is still the current one.
struct StreamGuard
{
    generation: usize,
}

113struct RemoteStream
114{
115 consumer: HeapCons<f32>, resample_pos: f32, current_sample: f32, next_sample: f32, activity_hold: usize, display_hold: usize, username: String, latencies: VecDeque<u128>, avg_latency: u128, }
125
126pub struct PeerData
127{
128 decoder: Decoder, producer: HeapProd<f32>, }
131
132static LOCAL_STREAMS: LazyLock<Mutex<Option<LocalStream>>> = LazyLock::new(|| Mutex::new(None));
134static CONSUMERS: LazyLock<Mutex<HashMap<usize, (RemoteStream, PeerData)>>> = LazyLock::new(|| Mutex::new(HashMap::new())); static LOCAL_DISPLAY_HOLD: AtomicUsize = AtomicUsize::new(0);
137static AUDIO_GENERATION: AtomicUsize = AtomicUsize::new(0);
138
139impl Drop for StreamGuard
141{
142 fn drop(&mut self)
144 {
145 if AUDIO_GENERATION.load(Ordering::Relaxed) == self.generation
146 {
147 if let Ok(mut streams) = LOCAL_STREAMS.lock()
148 {
149 *streams = None;
150 }
151 }
152 }
153}
154
155fn find_device(mut devices: impl Iterator<Item = Device>) -> Option<Device>
157{
158 devices.find(|d|
159 {
160 if let Ok(desc) = d.description()
161 {
162 let name = desc.to_string().to_lowercase();
163 name.contains("pipewire") || name.contains("pulse")
164 } else { false }
165 })
166}
167
168fn configure_device(supported_configs: impl Iterator<Item = SupportedStreamConfigRange>, default_config: SupportedStreamConfig) -> StreamConfig
169{
170 supported_configs
171 .filter(|c| c.min_sample_rate() <= options::SAMPLE_RATE && c.max_sample_rate() >= options::SAMPLE_RATE)
172 .next()
173 .map(|c| c.with_sample_rate(options::SAMPLE_RATE))
174 .unwrap_or(default_config)
175 .into()
176}
177
178fn transmit_audio(encoder: &Encoder, frame: &[f32], buffer: &mut [u8], id: usize, socket: &UdpSocket)
179{
180 if let Ok(len) = encoder.encode_float(&frame, buffer)
182 {
183 voice::send(socket, VoicePacket
185 {
186 voice: Some(buffer[..len].to_vec()),
187 id: Some(id),
188
189 ..Default::default()
190 }, &chat_options::get_keys().unwrap()).unwrap();
191 }
192}
193
194pub fn listen_server_voice(id: usize, username: String, tx: Sender<ClientEvent>)
196{
197 options::set_seq(0);
199 options::set_server_seq(0);
200
201 let current_generation = AUDIO_GENERATION.fetch_add(1, Ordering::Relaxed) + 1;
203 let _guard = StreamGuard { generation: current_generation };
204
205 let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").expect("Binding UDP failed"));
207 socket.connect(chat_options::get_server_address()).expect("Connecting to server UDP failed");
208
209 socket.set_read_timeout(Some(Duration::from_millis(200))).expect("Setting socket timeout failed");
211
212 let host = cpal::default_host();
214
215 let stderr_gag = Gag::stderr().unwrap();
217
218 let input_device = find_device(host.input_devices().expect("No input device found"))
220 .or_else(|| host.default_input_device()).unwrap();
221
222 let output_device = find_device(host.output_devices().expect("No output device found"))
224 .or_else(|| host.default_output_device()).unwrap();
225
226 drop(stderr_gag);
228
229 voice::send(&socket, VoicePacket
231 {
232 id: Some(id),
233 ..Default::default()
234 }, &chat_options::get_keys().unwrap()).unwrap();
235
236 let input_config = configure_device(input_device.supported_input_configs().unwrap(), input_device.default_input_config().unwrap());
238
239 let output_config = configure_device(output_device.supported_output_configs().unwrap(), output_device.default_output_config().unwrap());
241
242 let opus_encoder = Encoder::new
244 (
245 <SampleRate as TryFrom<i32>>::try_from(options::SAMPLE_RATE as i32).unwrap(),
246 Channels::Mono,
247 Application::Voip
248 ).unwrap();
249
250 let mut input_accum: Vec<f32> = Vec::with_capacity(options::FRAME_SIZE * 2);
252 let mut encoded_buffer = [0u8; 1500]; let input_channels = input_config.channels as usize;
256 let input_source_rate = input_config.sample_rate as f32;
257 let input_target_rate = options::SAMPLE_RATE as f32;
258
259 let input_resample_step = input_source_rate / input_target_rate;
261 let mut input_resample_pos = 0.;
262
263 let gate_open = Arc::new(Mutex::new(false)); let preroll_buffer = Arc::new(Mutex::new(VecDeque::<Vec<f32>>::with_capacity(3))); let hold_frames_remaining = Arc::new(Mutex::new(0usize)); let mut denoiser = DenoiseState::new();
270 let mut denoise_buffer = [0.0f32; options::SAMPLE_RATE as usize / 100];
271
272 let send_socket = socket.clone();
274 let input_stream = input_device.build_input_stream(&input_config, move |data: &[f32], _: &_|
275 {
276 if AUDIO_GENERATION.load(Ordering::Relaxed) != current_generation { return; }
278
279 let frames_in_buffer = data.len() / input_channels;
280
281 let current_hold = LOCAL_DISPLAY_HOLD.load(Ordering::Relaxed);
282 if current_hold > 0
283 {
284 LOCAL_DISPLAY_HOLD.store(current_hold.saturating_sub(frames_in_buffer), Ordering::Relaxed);
285 }
286
287 let get_mono_sample = |index: usize| -> f32
289 {
290 if index >= frames_in_buffer { return 0. }
291
292 let mut sum = 0.;
293 for c in 0..input_channels
294 {
295 sum += data[index * input_channels + c];
296 }
297
298 sum / input_channels as f32
299 };
300
301 while input_resample_pos < (frames_in_buffer as f32) - 1.
303 {
304 let idx = input_resample_pos.floor() as usize;
305 let frac = input_resample_pos - idx as f32;
306
307 let s0 = get_mono_sample(idx);
308 let s1 = get_mono_sample(idx + 1);
309
310 let interpolated = s0 + (s1 - s0) * frac;
311 input_accum.push(interpolated);
312
313 input_resample_pos += input_resample_step;
314 }
315
316 input_resample_pos -= frames_in_buffer as f32;
318
319 while input_accum.len() >= options::FRAME_SIZE
321 {
322 let mut frame: Vec<f32> = input_accum.drain(0..options::FRAME_SIZE).collect();
323
324 for chunk in frame.chunks_mut(options::SAMPLE_RATE as usize / 100)
326 {
327 if chunk.len() == options::SAMPLE_RATE as usize / 100
328 {
329 for sample in chunk.iter_mut()
331 {
332 *sample *= 32767.;
333 }
334
335 denoiser.process_frame(&mut denoise_buffer, chunk);
337
338 for (i, sample) in denoise_buffer.iter().enumerate()
340 {
341 chunk[i] = sample / 32767.;
342 }
343 }
344 }
345
346 let rms = (frame.iter().map(|&x| x * x).sum::<f32>() / frame.len() as f32 + 1e-10).sqrt(); let mut gate = gate_open.lock().unwrap();
349 let mut preroll = preroll_buffer.lock().unwrap();
350 let mut hold_frames = hold_frames_remaining.lock().unwrap();
351
352 if !*gate && rms > options::TRESHOLD_OPEN
354 {
355 *gate = true; for old_frame in preroll.iter()
359 {
360 transmit_audio(&opus_encoder, old_frame, &mut encoded_buffer, id, &send_socket);
361 }
362
363 preroll.clear();
364 *hold_frames = options::HOLD_FRAMES;
365 } else if *gate && rms < options::TRESHOLD_CLOSE
366 {
367 if *hold_frames > 0 {
369 *hold_frames -= 1;
370 } else {
372 *gate = false;
373 }
374 } else if *gate && rms >= options::TRESHOLD_CLOSE {
376 *hold_frames = options::HOLD_FRAMES;
377 }
378
379 if !*gate
381 {
382 preroll.push_back(frame.clone());
383 if preroll.len() > 3
384 {
385 preroll.pop_front();
386 }
387 }
388
389 if *gate
391 {
392 LOCAL_DISPLAY_HOLD.store((options::SAMPLE_RATE * options::DISPLAY_HOLD as u32 / 1000) as usize, Ordering::Relaxed);
393 transmit_audio(&opus_encoder, &frame, &mut encoded_buffer, id, &send_socket);
394 }
395 }
396 }, |_| {}, None).unwrap();
397
398 let output_channels = output_config.channels as usize;
400 let output_source_rate = options::SAMPLE_RATE as f32;
401 let output_target_rate = output_config.sample_rate as f32;
402
403 let output_resample_step = output_source_rate / output_target_rate;
405
406 let output_stream = output_device.build_output_stream(&output_config, move |data: &mut [f32], _: &_|
408 {
409 if AUDIO_GENERATION.load(Ordering::Relaxed) != current_generation { return; }
411
412 data.fill(0.);
414
415 let frames_to_write = data.len() / output_channels;
416 let mut consumers_guard = CONSUMERS.lock().unwrap();
417
418 for i in 0..frames_to_write
419 {
420 let mut mixed_sample = 0.;
421 let mut active_speakers = 0;
422
423 for (stream, _) in consumers_guard.values_mut()
424 {
425 while stream.resample_pos >= 1.
427 {
428 stream.current_sample = stream.next_sample;
429 stream.next_sample = stream.consumer.try_pop().unwrap_or(0.); stream.resample_pos -= 1.;
431 }
432
433 let interpolated = stream.current_sample + (stream.next_sample - stream.current_sample) * stream.resample_pos;
435 stream.resample_pos += output_resample_step; if interpolated.abs() > options::MIXING_TRESHOLD
439 {
440 stream.activity_hold = options::ACTIVITY_HOLD; stream.display_hold = (options::SAMPLE_RATE * options::DISPLAY_HOLD as u32 / 1000) as usize; }
443
444 if stream.activity_hold > 0
445 {
446 mixed_sample += interpolated;
448 active_speakers += 1;
449 stream.activity_hold -= 1;
450 }
451
452 if stream.display_hold > 0
454 {
455 stream.display_hold -= 1;
456 }
457 }
458
459 if active_speakers > 1
461 {
462 mixed_sample /= (active_speakers as f32).sqrt();
463 }
464
465 sfx::play_effects(&mut mixed_sample);
467
468 mixed_sample = mixed_sample.tanh();
470
471 for channel in 0..output_channels
473 {
474 data[i * output_channels + channel] = mixed_sample;
475 }
476 }
477 }, |_| {}, None).unwrap();
478
479 input_stream.play().unwrap(); output_stream.play().unwrap(); *LOCAL_STREAMS.lock().unwrap() = Some(LocalStream
485 {
486 _input: input_stream,
487 _output: output_stream,
488 });
489
490 sfx::clear_effects();
492 sfx::queue_effect(SoundEffect::Join);
493
494 let vad_socket = socket.clone();
496 thread::spawn(move ||
497 {
498 let mut iteration_counter = 0u8;
499
500 loop
501 {
502 if !options::get_use_voice()
504 {
505 tx.send(ClientEvent::VoiceActivity(Vec::new())).unwrap(); return;
507 }
508
509 iteration_counter += 1; display_active_speakers(&username, &tx);
513
514 if iteration_counter == 10
516 {
517 voice::send(&vad_socket, VoicePacket
518 {
519 id: Some(id),
520 code: Some(VoiceCode::PING),
521 timestamp: Some(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()),
522
523 ..Default::default()
524 }, &chat_options::get_keys().unwrap()).unwrap();
525
526 iteration_counter = 0;
528 }
529
530 thread::sleep(Duration::from_millis(100));
531 }
532 });
533
534 let mut decoded_buffer = [0.0f32; options::FRAME_SIZE];
536
537 loop
538 {
539 let (network_buffer, _) = match voice::receive(&socket)
541 {
542 Some(r) => r,
543 None => {
545 if AUDIO_GENERATION.load(Ordering::Relaxed) != current_generation { return; }
547
548 sfx::queue_effect(SoundEffect::Leave);
550
551 while sfx::is_playing()
553 {
554 if AUDIO_GENERATION.load(Ordering::Relaxed) != current_generation { return; }
556
557 thread::sleep(Duration::from_millis(50));
558 }
559
560 return;
561 }
562 };
563
564 if network_buffer.seq <= options::get_server_seq() { continue; } options::set_server_seq(network_buffer.seq); let sender_id = match network_buffer.id
570 {
571 Some(id) => id,
572 None => continue
573 };
574
575 if !CONSUMERS.lock().unwrap().contains_key(&sender_id)
577 {
578 add_consumer(sender_id, network_buffer.username.unwrap());
579 }
580
581 if let Some((stream, peer)) = CONSUMERS.lock().unwrap().get_mut(&sender_id)
582 {
583 if let Some(code) = network_buffer.code && let Some(timestamp) = network_buffer.timestamp
585 {
586 match code
587 {
588 VoiceCode::PING =>
590 {
591 voice::send(&socket, VoicePacket
593 {
594 id: Some(id),
595 target_id: Some(sender_id),
596 code: Some(VoiceCode::PONG),
597 timestamp: Some(timestamp),
598
599 ..Default::default()
600 }, &chat_options::get_keys().unwrap()).unwrap();
601 },
602
603 VoiceCode::PONG =>
605 {
606 let latency = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis().saturating_sub(timestamp);
608
609 stream.latencies.push_back(latency);
611 if stream.latencies.len() > 20 {
613 stream.latencies.pop_front();
614 }
615
616 let sum: u128 = stream.latencies.iter().sum();
618 if !stream.latencies.is_empty()
619 {
620 stream.avg_latency = sum / stream.latencies.len() as u128;
622 }
623 }
624 }
625 }
626
627 if network_buffer.voice.is_none() { continue; }
629
630 if let Ok(decoded_len) = peer.decoder.decode_float(network_buffer.voice.as_deref(), &mut decoded_buffer[..], false)
632 {
633 peer.producer.push_slice(&decoded_buffer[..decoded_len]);
635 }
636 }
637 }
638}
639
640pub fn remove_consumer(id: &usize)
641{
642 if CONSUMERS.lock().unwrap().remove(id).is_some()
643 {
644 sfx::queue_effect(SoundEffect::Leave);
646 }
647}
648
649pub fn remove_all_consumers()
650{
651 CONSUMERS.lock().unwrap().clear();
652
653 sfx::queue_effect(SoundEffect::Join);
655}
656
657pub fn add_consumer(id: usize, username: String)
658{
659 let decoder = Decoder::new
661 (
662 <SampleRate as TryFrom<i32>>::try_from(options::SAMPLE_RATE as i32).unwrap(),
663 Channels::Mono,
664 ).unwrap();
665
666 let rb = HeapRb::<f32>::new(options::FRAME_SIZE * options::JITTER_BUFFER_SIZE);
668 let (producer, mut consumer) = rb.split();
669
670 let first_sample = consumer.try_pop().unwrap_or(0.0);
671
672 CONSUMERS.lock().unwrap().insert(id, (RemoteStream
674 {
675 consumer: consumer,
676 resample_pos: 0.,
677 current_sample: 0.,
678 next_sample: first_sample,
679 activity_hold: 0,
680 display_hold: 0,
681 username: username,
682 latencies: VecDeque::with_capacity(20),
683 avg_latency: 0,
684 }, PeerData
685 {
686 decoder: decoder,
687 producer: producer,
688 }));
689
690 sfx::queue_effect(SoundEffect::Join);
692}
693
694fn display_active_speakers(local_username: &str, tx: &Sender<ClientEvent>)
695{
696 let mut users_to_display = Vec::new();
698
699 let local_speaking = LOCAL_DISPLAY_HOLD.load(Ordering::Relaxed) > 0;
701 users_to_display.push(VoiceUser
702 {
703 id: 0,
704 username: local_username.to_string(),
705 is_speaking: local_speaking,
706 latency: 0,
707 is_local: true,
708 });
709
710 if let Ok(consumers) = CONSUMERS.try_lock()
712 {
713 for (id, (stream, _)) in consumers.iter()
714 {
715 users_to_display.push(VoiceUser
716 {
717 id: *id,
718 username: stream.username.clone(),
719 is_speaking: stream.display_hold > 0, latency: stream.avg_latency,
721 is_local: false,
722 });
723 }
724 }
725
726 if users_to_display.len() > 1
728 {
729 users_to_display[1..].sort_by_key(|u| u.id);
730 }
731
732 tx.send(ClientEvent::VoiceActivity(users_to_display)).unwrap();
734}