why2_chat/network/voice/client/
mod.rs

1/*
2This is part of WHY2
3Copyright (C) 2022-2026 Václav Šmejkal
4
5This program is free software: you can redistribute it and/or modify
6it under the terms of the GNU General Public License as published by
7the Free Software Foundation, either version 3 of the License, or
8(at your option) any later version.
9
10This program is distributed in the hope that it will be useful,
11but WITHOUT ANY WARRANTY; without even the implied warranty of
12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13GNU General Public License for more details.
14
15You should have received a copy of the GNU General Public License
16along with this program.  If not, see <https://www.gnu.org/licenses/>.
17*/
18
19//MODULES
20pub mod sfx;
21
22use std::
23{
24    thread,
25    net::UdpSocket,
26    collections::{ HashMap, VecDeque },
27    time::
28    {
29        SystemTime,
30        Duration,
31        UNIX_EPOCH,
32    },
33    sync::
34    {
35        Arc,
36        Mutex,
37        LazyLock,
38        mpsc::Sender,
39        atomic::{ AtomicUsize, Ordering },
40    },
41};
42
43use cpal::
44{
45    Device,
46    Stream,
47    StreamConfig,
48    SupportedStreamConfig,
49    SupportedStreamConfigRange,
50    traits::
51    {
52        DeviceTrait,
53        HostTrait,
54        StreamTrait,
55    },
56};
57
58use audiopus::
59{
60    Channels,
61    SampleRate,
62    Application,
63    TryFrom,
64    coder::{ Encoder, Decoder },
65};
66
67use ringbuf::
68{
69    HeapRb,
70    HeapCons,
71    HeapProd,
72    traits::
73    {
74        Split,
75        Producer,
76        Consumer,
77    },
78};
79
80use nnnoiseless::DenoiseState;
81
82use gag::Gag;
83
84use crate::
85{
86    options as chat_options,
87    network::
88    {
89        client::{ ClientEvent, VoiceUser },
90        voice::
91        {
92            self,
93            options,
94            VoiceCode,
95            VoicePacket,
96            client::sfx::SoundEffect,
97        },
98    },
99};
100
101//STRUCTS
/// Owns the cpal capture and playback streams of the current voice session.
///
/// The fields are never read; they only exist so the streams stay alive while stored in
/// `LOCAL_STREAMS`. NOTE(review): relies on cpal stopping a `Stream` when it is dropped.
/// Field order is also drop order (input first, then output).
struct LocalStream
{
    _input: Stream,
    _output: Stream,
}
107
/// Scope guard for one `listen_server_voice` session.
///
/// Remembers the `AUDIO_GENERATION` value the session started with; its `Drop` impl clears
/// `LOCAL_STREAMS` only if no newer session has been started in the meantime.
struct StreamGuard
{
    /// Value of `AUDIO_GENERATION` assigned to the owning session.
    generation: usize,
}
112
/// Playback-side state of one remote peer.
///
/// Advanced per output frame by the output stream callback (resampling, activity detection),
/// updated by the receive loop (latency bookkeeping) and read by the activity display.
struct RemoteStream
{
    /// Read end of the peer's jitter ring buffer; the receive loop pushes decoded PCM into it.
    consumer: HeapCons<f32>,
    /// Fractional position between `current_sample` and `next_sample`; advanced by
    /// `options::SAMPLE_RATE / device rate` for every output frame.
    resample_pos: f32,
    /// Left endpoint of the linear interpolation.
    current_sample: f32,
    /// Right endpoint of the linear interpolation (0.0 when the ring buffer underruns).
    next_sample: f32,
    /// Output frames this peer keeps being mixed in after its last sample above
    /// `options::MIXING_TRESHOLD`.
    activity_hold: usize,
    /// Output frames this peer keeps being displayed as speaking.
    display_hold: usize,
    /// Username shown in the voice activity window.
    username: String,
    /// Most recent (at most 20) ping round-trip latencies to this peer, in milliseconds.
    latencies: VecDeque<u128>,
    /// Mean of `latencies` in milliseconds, shown in the voice activity window.
    avg_latency: u128,
}
125
/// Receive-side state of one remote peer, used by the receive loop in `listen_server_voice`.
pub struct PeerData
{
    /// Opus decoder for this peer's mono stream.
    decoder: Decoder,
    /// Write end of the peer's jitter ring buffer, drained by the output stream callback.
    producer: HeapProd<f32>,
}
131
132//GLOBAL VARIABLES
/// Streams of the current voice session; stored by `listen_server_voice`, cleared by `StreamGuard`.
static LOCAL_STREAMS: LazyLock<Mutex<Option<LocalStream>>> = LazyLock::new(|| Mutex::new(None));
/// Remote peers keyed by the sender id carried in their voice packets.
/// Locked by the receive loop, the output stream callback and the activity display.
static CONSUMERS: LazyLock<Mutex<HashMap<usize, (RemoteStream, PeerData)>>> = LazyLock::new(|| Mutex::new(HashMap::new())); //OTHER CLIENTS

/// Remaining time the local user is shown as speaking, in sample frames: set from
/// `options::SAMPLE_RATE` when a frame is transmitted, decremented by input-device frames.
static LOCAL_DISPLAY_HOLD: AtomicUsize = AtomicUsize::new(0);
/// Session counter bumped by every `listen_server_voice` call; stream callbacks, the receive
/// loop and `StreamGuard` of older sessions compare against it and stop acting once it moved on.
static AUDIO_GENERATION: AtomicUsize = AtomicUsize::new(0);
138
139//IMPLEMENTATIONS
140impl Drop for StreamGuard
141{
142    //CLEAR STREAMS
143    fn drop(&mut self)
144    {
145        if AUDIO_GENERATION.load(Ordering::Relaxed) == self.generation
146        {
147            if let Ok(mut streams) = LOCAL_STREAMS.lock()
148            {
149                *streams = None;
150            }
151        }
152    }
153}
154
155//PRIVATE
156fn find_device(mut devices: impl Iterator<Item = Device>) -> Option<Device>
157{
158    devices.find(|d|
159    {
160        if let Ok(desc) = d.description()
161        {
162            let name = desc.to_string().to_lowercase();
163            name.contains("pipewire") || name.contains("pulse")
164        } else { false }
165    })
166}
167
168fn configure_device(supported_configs: impl Iterator<Item = SupportedStreamConfigRange>, default_config: SupportedStreamConfig) -> StreamConfig
169{
170    supported_configs
171        .filter(|c| c.min_sample_rate() <= options::SAMPLE_RATE && c.max_sample_rate() >= options::SAMPLE_RATE)
172        .next()
173        .map(|c| c.with_sample_rate(options::SAMPLE_RATE))
174        .unwrap_or(default_config)
175        .into()
176}
177
178fn transmit_audio(encoder: &Encoder, frame: &[f32], buffer: &mut [u8], id: usize, socket: &UdpSocket)
179{
180    //ENCODE (IGNORE ERRORS)
181    if let Ok(len) = encoder.encode_float(&frame, buffer)
182    {
183        //TRANSMIT
184        voice::send(socket, VoicePacket
185        {
186            voice: Some(buffer[..len].to_vec()),
187            id: Some(id),
188
189            ..Default::default()
190        }, &chat_options::get_keys().unwrap()).unwrap();
191    }
192}
193
194//PUBLIC
195pub fn listen_server_voice(id: usize, username: String, tx: Sender<ClientEvent>)
196{
197    //RESET SEQs
198    options::set_seq(0);
199    options::set_server_seq(0);
200
201    //DUPLICATE STREAM GUARDS
202    let current_generation = AUDIO_GENERATION.fetch_add(1, Ordering::Relaxed) + 1;
203    let _guard = StreamGuard { generation: current_generation };
204
205    //CONNECT
206    let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").expect("Binding UDP failed"));
207    socket.connect(chat_options::get_server_address()).expect("Connecting to server UDP failed");
208
209    //SET SOCKET TIMEOUT
210    socket.set_read_timeout(Some(Duration::from_millis(200))).expect("Setting socket timeout failed");
211
212    //INIT AUDIO HOST
213    let host = cpal::default_host();
214
215    //SUPPRESS STDERR (AVOID ALSA ERRORS)
216    let stderr_gag = Gag::stderr().unwrap();
217
218    //FIND INPUT DEVICE
219    let input_device = find_device(host.input_devices().expect("No input device found"))
220        .or_else(|| host.default_input_device()).unwrap();
221
222    //FIND OUTPUT DEVICE
223    let output_device = find_device(host.output_devices().expect("No output device found"))
224        .or_else(|| host.default_output_device()).unwrap();
225
226    //DISABLE SUPPRESSION
227    drop(stderr_gag);
228
229    //SEND HELLO PACKET
230    voice::send(&socket, VoicePacket
231    {
232        id: Some(id),
233        ..Default::default()
234    }, &chat_options::get_keys().unwrap()).unwrap();
235
236    //CONFIGURE CPAL INPUT
237    let input_config = configure_device(input_device.supported_input_configs().unwrap(), input_device.default_input_config().unwrap());
238
239    //CONFIGURE CPAL OUTPUT
240    let output_config = configure_device(output_device.supported_output_configs().unwrap(), output_device.default_output_config().unwrap());
241
242    //PREPARE OPUS ENCODER
243    let opus_encoder = Encoder::new
244    (
245        <SampleRate as TryFrom<i32>>::try_from(options::SAMPLE_RATE as i32).unwrap(),
246        Channels::Mono,
247        Application::Voip
248    ).unwrap();
249
250    //INPUT BUFFERS
251    let mut input_accum: Vec<f32> = Vec::with_capacity(options::FRAME_SIZE * 2);
252    let mut encoded_buffer = [0u8; 1500]; //ALLOCATE BUFFER TO STANDARD MTU
253
254    //INPUT RESAMPLING
255    let input_channels = input_config.channels as usize;
256    let input_source_rate = input_config.sample_rate as f32;
257    let input_target_rate = options::SAMPLE_RATE as f32;
258
259    //INPUT INTERPOLATION
260    let input_resample_step = input_source_rate / input_target_rate;
261    let mut input_resample_pos = 0.;
262
263    //VAD
264    let gate_open = Arc::new(Mutex::new(false)); //NOISE GATE
265    let preroll_buffer = Arc::new(Mutex::new(VecDeque::<Vec<f32>>::with_capacity(3))); //PRE-ROLL BUFFER
266    let hold_frames_remaining = Arc::new(Mutex::new(0usize)); //HOLD TIME
267
268    //NOISE REDUCTION
269    let mut denoiser = DenoiseState::new();
270    let mut denoise_buffer = [0.0f32; options::SAMPLE_RATE as usize / 100];
271
272    //CONFIGURE INPUT STREAM
273    let send_socket = socket.clone();
274    let input_stream = input_device.build_input_stream(&input_config, move |data: &[f32], _: &_|
275    {
276        //CHECK GENERATION
277        if AUDIO_GENERATION.load(Ordering::Relaxed) != current_generation { return; }
278
279        let frames_in_buffer = data.len() / input_channels;
280
281        let current_hold = LOCAL_DISPLAY_HOLD.load(Ordering::Relaxed);
282        if current_hold > 0
283        {
284             LOCAL_DISPLAY_HOLD.store(current_hold.saturating_sub(frames_in_buffer), Ordering::Relaxed);
285        }
286
287        //MONO DOWNMIX CLOSURE
288        let get_mono_sample = |index: usize| -> f32
289        {
290            if index >= frames_in_buffer { return 0. }
291
292            let mut sum = 0.;
293            for c in 0..input_channels
294            {
295                sum += data[index * input_channels + c];
296            }
297
298            sum / input_channels as f32
299        };
300
301        //RESAMPLE LOOP
302        while input_resample_pos < (frames_in_buffer as f32) - 1.
303        {
304            let idx = input_resample_pos.floor() as usize;
305            let frac = input_resample_pos - idx as f32;
306
307            let s0 = get_mono_sample(idx);
308            let s1 = get_mono_sample(idx + 1);
309
310            let interpolated = s0 + (s1 - s0) * frac;
311            input_accum.push(interpolated);
312
313            input_resample_pos += input_resample_step;
314        }
315
316        //ADJUST POSITION FOR NEXT BUFFER
317        input_resample_pos -= frames_in_buffer as f32;
318
319        //PROCESS
320        while input_accum.len() >= options::FRAME_SIZE
321        {
322            let mut frame: Vec<f32> = input_accum.drain(0..options::FRAME_SIZE).collect();
323
324            //NOISE REDUCTION
325            for chunk in frame.chunks_mut(options::SAMPLE_RATE as usize / 100)
326            {
327                if chunk.len() == options::SAMPLE_RATE as usize / 100
328                {
329                    //SCALE UP
330                    for sample in chunk.iter_mut()
331                    {
332                        *sample *= 32767.;
333                    }
334
335                    //PROCESS NOISE
336                    denoiser.process_frame(&mut denoise_buffer, chunk);
337
338                    //SCALE DOWN & COPY
339                    for (i, sample) in denoise_buffer.iter().enumerate()
340                    {
341                        chunk[i] = sample / 32767.;
342                    }
343                }
344            }
345
346            //VAD
347            let rms = (frame.iter().map(|&x| x * x).sum::<f32>() / frame.len() as f32 + 1e-10).sqrt(); //RMS CALCULATION (+ SMALL BIAS)
348            let mut gate = gate_open.lock().unwrap();
349            let mut preroll = preroll_buffer.lock().unwrap();
350            let mut hold_frames = hold_frames_remaining.lock().unwrap();
351
352            //HYSTERESIS
353            if !*gate && rms > options::TRESHOLD_OPEN
354            {
355                *gate = true; //SPEAKING
356
357                //SEND STORED FRAMES
358                for old_frame in preroll.iter()
359                {
360                    transmit_audio(&opus_encoder, old_frame, &mut encoded_buffer, id, &send_socket);
361                }
362
363                preroll.clear();
364                *hold_frames = options::HOLD_FRAMES;
365            } else if *gate && rms < options::TRESHOLD_CLOSE
366            {
367                if *hold_frames > 0 //SILENT FRAME, DECREMENT
368                {
369                    *hold_frames -= 1;
370                } else //HOLD TIME EXPIRED, CLOSE GATE
371                {
372                    *gate = false;
373                }
374            } else if *gate && rms >= options::TRESHOLD_CLOSE //SPEAKING CONTINUES, RESET HOLD TIMER
375            {
376                *hold_frames = options::HOLD_FRAMES;
377            }
378
379            //STORE TO PRE-ROLL BUFFER (MAX 3 FRAMES)
380            if !*gate
381            {
382                preroll.push_back(frame.clone());
383                if preroll.len() > 3
384                {
385                    preroll.pop_front();
386                }
387            }
388
389            //TRANSMIT ONLY IF GATE IS OPEN
390            if *gate
391            {
392                LOCAL_DISPLAY_HOLD.store((options::SAMPLE_RATE * options::DISPLAY_HOLD as u32 / 1000) as usize, Ordering::Relaxed);
393                transmit_audio(&opus_encoder, &frame, &mut encoded_buffer, id, &send_socket);
394            }
395        }
396    }, |_| {}, None).unwrap();
397
398    //OUTPUT RESAMPLING
399    let output_channels = output_config.channels as usize;
400    let output_source_rate = options::SAMPLE_RATE as f32;
401    let output_target_rate = output_config.sample_rate as f32;
402
403    //OUTPUT INTERPOLATION
404    let output_resample_step = output_source_rate / output_target_rate;
405
406    //CONFIGURE OUTPUT STREAM
407    let output_stream = output_device.build_output_stream(&output_config, move |data: &mut [f32], _: &_|
408    {
409        //CHECK GENERATION
410        if AUDIO_GENERATION.load(Ordering::Relaxed) != current_generation { return; }
411
412        //CLEAR OUTPUT BUFFER
413        data.fill(0.);
414
415        let frames_to_write = data.len() / output_channels;
416        let mut consumers_guard = CONSUMERS.lock().unwrap();
417
418        for i in 0..frames_to_write
419        {
420            let mut mixed_sample = 0.;
421            let mut active_speakers = 0;
422
423            for (stream, _) in consumers_guard.values_mut()
424            {
425                //RESAMPLE LOOP
426                while stream.resample_pos >= 1.
427                {
428                    stream.current_sample = stream.next_sample;
429                    stream.next_sample = stream.consumer.try_pop().unwrap_or(0.); //SILENCE ON UNDERRUN
430                    stream.resample_pos -= 1.;
431                }
432
433                //LINEAR INTERPOLATION
434                let interpolated = stream.current_sample + (stream.next_sample - stream.current_sample) * stream.resample_pos;
435                stream.resample_pos += output_resample_step; //MOVE RESAMPLER POSITION FOR THIS CLIENT
436
437                //ACTIVE SPEAKER DETECTION
438                if interpolated.abs() > options::MIXING_TRESHOLD
439                {
440                    stream.activity_hold = options::ACTIVITY_HOLD; //SET TIMER TO ~100ms
441                    stream.display_hold = (options::SAMPLE_RATE * options::DISPLAY_HOLD as u32 / 1000) as usize; //SET DISPLAY FOR ~1000ms
442                }
443
444                if stream.activity_hold > 0
445                {
446                    //MIX
447                    mixed_sample += interpolated;
448                    active_speakers += 1;
449                    stream.activity_hold -= 1;
450                }
451
452                //DECREMENT DISPLAY TIMER
453                if stream.display_hold > 0
454                {
455                    stream.display_hold -= 1;
456                }
457            }
458
459            //NORMALIZATION
460            if active_speakers > 1
461            {
462                mixed_sample /= (active_speakers as f32).sqrt();
463            }
464
465            //MIX EFFECTS
466            sfx::play_effects(&mut mixed_sample);
467
468            //SOFT CLIPPING (HYPERBOLIC TANGENT)
469            mixed_sample = mixed_sample.tanh();
470
471            //WRITE SAMPLE TO ALL CHANNELS
472            for channel in 0..output_channels
473            {
474                data[i * output_channels + channel] = mixed_sample;
475            }
476        }
477    }, |_| {}, None).unwrap();
478
479    //RUN STREAMS
480    input_stream.play().unwrap();  //INPUT
481    output_stream.play().unwrap(); //OUTPUT
482
483    //MOVE STREAMS TO GLOBAL STORAGE
484    *LOCAL_STREAMS.lock().unwrap() = Some(LocalStream
485    {
486        _input: input_stream,
487        _output: output_stream,
488    });
489
490    //PLAY JOIN SOUND
491    sfx::clear_effects();
492    sfx::queue_effect(SoundEffect::Join);
493
494    //START VOICE ACTIVITY DISPLAY & PING THREAD
495    let vad_socket = socket.clone();
496    thread::spawn(move ||
497    {
498        let mut iteration_counter = 0u8;
499
500        loop
501        {
502            //QUIT ON /leave
503            if !options::get_use_voice()
504            {
505                tx.send(ClientEvent::VoiceActivity(Vec::new())).unwrap(); //CLEAR WINDOW
506                return;
507            }
508
509            iteration_counter += 1; //INCREMENT
510
511            //SHOW VOICE ACTIVITY
512            display_active_speakers(&username, &tx);
513
514            //SEND PING PACKET
515            if iteration_counter == 10
516            {
517                voice::send(&vad_socket, VoicePacket
518                {
519                    id: Some(id),
520                    code: Some(VoiceCode::PING),
521                    timestamp: Some(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()),
522
523                    ..Default::default()
524                }, &chat_options::get_keys().unwrap()).unwrap();
525
526                //RESET COUNTER
527                iteration_counter = 0;
528            }
529
530            thread::sleep(Duration::from_millis(100));
531        }
532    });
533
534    //OUTPUT BUFFERS
535    let mut decoded_buffer = [0.0f32; options::FRAME_SIZE];
536
537    loop
538    {
539        //READ
540        let (network_buffer, _) = match voice::receive(&socket)
541        {
542            Some(r) => r,
543            None => //READING FAILED, TIMEOUT OR CRASH PROBABLY
544            {
545                //CHECK GENERATION
546                if AUDIO_GENERATION.load(Ordering::Relaxed) != current_generation { return; }
547
548                //PLAY LEAVE SOUND EFFECT
549                sfx::queue_effect(SoundEffect::Leave);
550
551                //FINISH SOUND EFFECTS
552                while sfx::is_playing()
553                {
554                    //CHECK GENERATION AGAIN AHAHAHHAHAAH
555                    if AUDIO_GENERATION.load(Ordering::Relaxed) != current_generation { return; }
556
557                    thread::sleep(Duration::from_millis(50));
558                }
559
560                return;
561            }
562        };
563
564        //VERIFY SERVER SEQ
565        if network_buffer.seq <= options::get_server_seq() { continue; } //INGORE INVALID SEQs
566        options::set_server_seq(network_buffer.seq); //SET SERVER SEQ
567
568        //GET ID OF SENDER
569        let sender_id = match network_buffer.id
570        {
571            Some(id) => id,
572            None => continue
573        };
574
575        //CREATE NEW CLIENT CONTEXT ON UNKNOWN CLIENT
576        if !CONSUMERS.lock().unwrap().contains_key(&sender_id)
577        {
578            add_consumer(sender_id, network_buffer.username.unwrap());
579        }
580
581        if let Some((stream, peer)) = CONSUMERS.lock().unwrap().get_mut(&sender_id)
582        {
583            //PING/PONG
584            if let Some(code) = network_buffer.code && let Some(timestamp) = network_buffer.timestamp
585            {
586                match code
587                {
588                    //PING RECEIVED, SEND BACK
589                    VoiceCode::PING =>
590                    {
591                        //SEND PONG PACKET
592                        voice::send(&socket, VoicePacket
593                        {
594                            id: Some(id),
595                            target_id: Some(sender_id),
596                            code: Some(VoiceCode::PONG),
597                            timestamp: Some(timestamp),
598
599                            ..Default::default()
600                        }, &chat_options::get_keys().unwrap()).unwrap();
601                    },
602
603                    //PING FORWARDED BACK, CALCULATE LATENCY
604                    VoiceCode::PONG =>
605                    {
606                        //CALCULATE LATENCY
607                        let latency = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis().saturating_sub(timestamp);
608
609                        //STORE LATENCY TO BUFFER
610                        stream.latencies.push_back(latency);
611                        if stream.latencies.len() > 20 //STORE ONLY LATEST 20 LATENCIES
612                        {
613                            stream.latencies.pop_front();
614                        }
615
616                        //CALCULATE AVERAGE LATENCY
617                        let sum: u128 = stream.latencies.iter().sum();
618                        if !stream.latencies.is_empty()
619                        {
620                            //STORE IN AVG_LATENCY
621                            stream.avg_latency = sum / stream.latencies.len() as u128;
622                        }
623                    }
624                }
625            }
626
627            //CHECK FOR VOICE IN PACKET
628            if network_buffer.voice.is_none() { continue; }
629
630            //DECODE
631            if let Ok(decoded_len) = peer.decoder.decode_float(network_buffer.voice.as_deref(), &mut decoded_buffer[..], false)
632            {
633                //PUSH TO RINGBUFFER
634                peer.producer.push_slice(&decoded_buffer[..decoded_len]);
635            }
636        }
637    }
638}
639
640pub fn remove_consumer(id: &usize)
641{
642    if CONSUMERS.lock().unwrap().remove(id).is_some()
643    {
644        //PLAY LEAVE SOUND EFFECT
645        sfx::queue_effect(SoundEffect::Leave);
646    }
647}
648
649pub fn remove_all_consumers()
650{
651    CONSUMERS.lock().unwrap().clear();
652
653    //PLAY JOIN SOUND EFFECT (THIS IS CALLED ON CHANNEL CHANGE)
654    sfx::queue_effect(SoundEffect::Join);
655}
656
657pub fn add_consumer(id: usize, username: String)
658{
659    //OPUS DECODER
660    let decoder = Decoder::new
661    (
662        <SampleRate as TryFrom<i32>>::try_from(options::SAMPLE_RATE as i32).unwrap(),
663        Channels::Mono,
664    ).unwrap();
665
666    //JITTER BUFFER
667    let rb = HeapRb::<f32>::new(options::FRAME_SIZE * options::JITTER_BUFFER_SIZE);
668    let (producer, mut consumer) = rb.split();
669
670    let first_sample = consumer.try_pop().unwrap_or(0.0);
671
672    //INSERT TO SHARED AUDIO THREAD MAP
673    CONSUMERS.lock().unwrap().insert(id, (RemoteStream
674    {
675        consumer: consumer,
676        resample_pos: 0.,
677        current_sample: 0.,
678        next_sample: first_sample,
679        activity_hold: 0,
680        display_hold: 0,
681        username: username,
682        latencies: VecDeque::with_capacity(20),
683        avg_latency: 0,
684    }, PeerData
685    {
686        decoder: decoder,
687        producer: producer,
688    }));
689
690    //PLAY JOIN SOUND EFFECT
691    sfx::queue_effect(SoundEffect::Join);
692}
693
694fn display_active_speakers(local_username: &str, tx: &Sender<ClientEvent>)
695{
696    //ALL USERS
697    let mut users_to_display = Vec::new();
698
699    //ADD LOCAL CLIENT
700    let local_speaking = LOCAL_DISPLAY_HOLD.load(Ordering::Relaxed) > 0;
701    users_to_display.push(VoiceUser
702    {
703        id: 0,
704        username: local_username.to_string(),
705        is_speaking: local_speaking,
706        latency: 0,
707        is_local: true,
708    });
709
710    //COLLECT OTHER USERS
711    if let Ok(consumers) = CONSUMERS.try_lock()
712    {
713        for (id, (stream, _)) in consumers.iter()
714        {
715            users_to_display.push(VoiceUser
716            {
717                id: *id,
718                username: stream.username.clone(),
719                is_speaking: stream.display_hold > 0, //SPEAKING
720                latency: stream.avg_latency,
721                is_local: false,
722            });
723        }
724    }
725
726    //SORT
727    if users_to_display.len() > 1
728    {
729        users_to_display[1..].sort_by_key(|u| u.id);
730    }
731
732    //DISPLAY
733    tx.send(ClientEvent::VoiceActivity(users_to_display)).unwrap();
734}