use crate::call::domain::LegId;
use crate::media::conference_mixer::{AudioFrame, ConferenceAudioMixer};
use anyhow::{Result, anyhow};
use audio_codec::CodecType;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, mpsc};
use tracing::info;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConferenceId(pub String);
impl From<String> for ConferenceId {
fn from(s: String) -> Self {
Self(s)
}
}
impl From<&str> for ConferenceId {
fn from(s: &str) -> Self {
Self(s.to_string())
}
}
#[derive(Debug, Clone)]
pub struct ConferenceParticipant {
pub leg_id: LegId,
pub muted: bool,
pub joined_at: std::time::Instant,
}
impl ConferenceParticipant {
pub fn new(leg_id: LegId) -> Self {
Self {
leg_id,
muted: false,
joined_at: std::time::Instant::now(),
}
}
}
#[derive(Debug, Clone)]
pub struct ConferenceRoom {
pub id: ConferenceId,
pub participants: HashMap<LegId, ConferenceParticipant>,
pub created_at: std::time::Instant,
pub max_participants: Option<usize>,
pub locked: bool,
}
impl ConferenceRoom {
pub fn new(id: ConferenceId, max_participants: Option<usize>) -> Self {
Self {
id,
participants: HashMap::new(),
created_at: std::time::Instant::now(),
max_participants,
locked: false,
}
}
pub fn add_participant(&mut self, leg_id: LegId) -> Result<()> {
if let Some(max) = self.max_participants
&& self.participants.len() >= max
{
return Err(anyhow!("Conference is at maximum capacity"));
}
if self.participants.contains_key(&leg_id) {
return Err(anyhow!("Leg {} already in conference", leg_id));
}
let participant = ConferenceParticipant::new(leg_id.clone());
self.participants.insert(leg_id.clone(), participant);
info!(conf_id = %self.id.0, leg_id = %leg_id, "Participant added to conference");
Ok(())
}
pub fn remove_participant(&mut self, leg_id: &LegId) -> Result<()> {
if self.participants.remove(leg_id).is_none() {
return Err(anyhow!("Leg {} is not in conference", leg_id));
}
info!(conf_id = %self.id.0, leg_id = %leg_id, "Participant removed from conference");
Ok(())
}
pub fn mute_participant(&mut self, leg_id: &LegId) -> Result<()> {
let participant = self
.participants
.get_mut(leg_id)
.ok_or_else(|| anyhow!("Leg {} is not in conference", leg_id))?;
participant.muted = true;
info!(conf_id = %self.id.0, leg_id = %leg_id, "Participant muted");
Ok(())
}
pub fn unmute_participant(&mut self, leg_id: &LegId) -> Result<()> {
let participant = self
.participants
.get_mut(leg_id)
.ok_or_else(|| anyhow!("Leg {} is not in conference", leg_id))?;
participant.muted = false;
info!(conf_id = %self.id.0, leg_id = %leg_id, "Participant unmuted");
Ok(())
}
pub fn participant_count(&self) -> usize {
self.participants.len()
}
pub fn is_empty(&self) -> bool {
self.participants.is_empty()
}
pub fn participant_ids(&self) -> Vec<LegId> {
self.participants.keys().cloned().collect()
}
pub fn lock(&mut self) {
self.locked = true;
}
pub fn unlock(&mut self) {
self.locked = false;
}
}
#[derive(Clone)]
pub struct ParticipantChannels {
pub input_tx: mpsc::Sender<AudioFrame>,
}
impl ParticipantChannels {
pub fn new(input_tx: mpsc::Sender<AudioFrame>) -> Self {
Self { input_tx }
}
}
#[derive(Clone)]
pub struct ConferenceManager {
conferences: Arc<RwLock<HashMap<ConferenceId, ConferenceRoom>>>,
leg_to_conference: Arc<RwLock<HashMap<LegId, ConferenceId>>>,
audio_mixers: Arc<RwLock<HashMap<ConferenceId, Arc<ConferenceAudioMixer>>>>,
participant_channels: Arc<RwLock<HashMap<LegId, ParticipantChannels>>>,
participant_output_rxs: Arc<RwLock<HashMap<LegId, mpsc::Receiver<AudioFrame>>>>,
}
impl ConferenceManager {
pub fn new() -> Self {
Self {
conferences: Arc::new(RwLock::new(HashMap::new())),
leg_to_conference: Arc::new(RwLock::new(HashMap::new())),
audio_mixers: Arc::new(RwLock::new(HashMap::new())),
participant_channels: Arc::new(RwLock::new(HashMap::new())),
participant_output_rxs: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn create_conference(
&self,
conf_id: ConferenceId,
max_participants: Option<usize>,
) -> Result<ConferenceRoom> {
let mut conferences = self.conferences.write().await;
if conferences.contains_key(&conf_id) {
return Err(anyhow!("Conference {} already exists", conf_id.0));
}
let conference = ConferenceRoom::new(conf_id.clone(), max_participants);
conferences.insert(conf_id.clone(), conference.clone());
let mut audio_mixers = self.audio_mixers.write().await;
let mixer = Arc::new(ConferenceAudioMixer::new(conf_id.0.clone(), 8000));
mixer.start();
audio_mixers.insert(conf_id.clone(), mixer);
info!(conf_id = %conf_id.0, "Conference created with local audio mixing");
Ok(conference)
}
pub async fn get_conference(&self, conf_id: &ConferenceId) -> Option<ConferenceRoom> {
let conferences = self.conferences.read().await;
conferences.get(conf_id).cloned()
}
pub async fn destroy_conference(&self, conf_id: &ConferenceId) -> Result<()> {
let mut audio_mixers = self.audio_mixers.write().await;
if let Some(mixer) = audio_mixers.remove(conf_id) {
mixer.stop().await;
}
let mut conferences = self.conferences.write().await;
let mut leg_map = self.leg_to_conference.write().await;
let mut participant_channels = self.participant_channels.write().await;
let mut participant_output_rxs = self.participant_output_rxs.write().await;
if let Some(conf) = conferences.get(conf_id) {
for leg_id in conf.participant_ids() {
leg_map.remove(&leg_id);
participant_channels.remove(&leg_id);
participant_output_rxs.remove(&leg_id);
}
}
conferences
.remove(conf_id)
.ok_or_else(|| anyhow!("Conference {} not found", conf_id.0))?;
info!(conf_id = %conf_id.0, "Conference destroyed");
Ok(())
}
pub async fn add_participant(
&self,
conf_id: &ConferenceId,
leg_id: LegId,
) -> Result<ParticipantChannels> {
{
let leg_map = self.leg_to_conference.read().await;
if let Some(existing_conf) = leg_map.get(&leg_id)
&& existing_conf != conf_id
{
return Err(anyhow!(
"Leg {} is already in conference {}",
leg_id,
existing_conf.0
));
}
}
{
let mut conferences = self.conferences.write().await;
let conference = conferences
.get_mut(conf_id)
.ok_or_else(|| anyhow!("Conference {} not found", conf_id.0))?;
conference.add_participant(leg_id.clone())?;
}
let (input_tx, output_rx) = {
let audio_mixers = self.audio_mixers.read().await;
let mixer = audio_mixers
.get(conf_id)
.ok_or_else(|| anyhow!("Audio mixer not found for conference {}", conf_id.0))?;
mixer
.add_participant(leg_id.clone(), CodecType::PCMU)
.await?
};
let channels = ParticipantChannels::new(input_tx);
{
let mut participant_channels = self.participant_channels.write().await;
participant_channels.insert(leg_id.clone(), channels.clone());
let mut leg_map = self.leg_to_conference.write().await;
leg_map.insert(leg_id.clone(), conf_id.clone());
}
{
let mut output_rxs = self.participant_output_rxs.write().await;
output_rxs.insert(leg_id.clone(), output_rx);
}
Ok(channels)
}
pub async fn remove_participant(&self, conf_id: &ConferenceId, leg_id: &LegId) -> Result<()> {
{
let mut conferences = self.conferences.write().await;
let conference = conferences
.get_mut(conf_id)
.ok_or_else(|| anyhow!("Conference {} not found", conf_id.0))?;
conference.remove_participant(leg_id)?;
}
let audio_mixers = self.audio_mixers.read().await;
if let Some(mixer) = audio_mixers.get(conf_id) {
mixer.remove_participant(leg_id).await?;
}
{
let mut participant_channels = self.participant_channels.write().await;
participant_channels.remove(leg_id);
let mut participant_output_rxs = self.participant_output_rxs.write().await;
participant_output_rxs.remove(leg_id);
let mut leg_map = self.leg_to_conference.write().await;
leg_map.remove(leg_id);
}
Ok(())
}
pub async fn mute_participant(&self, conf_id: &ConferenceId, leg_id: &LegId) -> Result<()> {
{
let mut conferences = self.conferences.write().await;
let conference = conferences
.get_mut(conf_id)
.ok_or_else(|| anyhow!("Conference {} not found", conf_id.0))?;
conference.mute_participant(leg_id)?;
}
let audio_mixers = self.audio_mixers.read().await;
if let Some(mixer) = audio_mixers.get(conf_id) {
mixer.set_muted(leg_id, true).await?;
}
Ok(())
}
pub async fn unmute_participant(&self, conf_id: &ConferenceId, leg_id: &LegId) -> Result<()> {
{
let mut conferences = self.conferences.write().await;
let conference = conferences
.get_mut(conf_id)
.ok_or_else(|| anyhow!("Conference {} not found", conf_id.0))?;
conference.unmute_participant(leg_id)?;
}
let audio_mixers = self.audio_mixers.read().await;
if let Some(mixer) = audio_mixers.get(conf_id) {
mixer.set_muted(leg_id, false).await?;
}
Ok(())
}
pub async fn get_conference_id_for_leg(&self, leg_id: &LegId) -> Option<ConferenceId> {
let leg_map = self.leg_to_conference.read().await;
leg_map.get(leg_id).cloned()
}
pub async fn get_participant_channels(&self, leg_id: &LegId) -> Option<ParticipantChannels> {
let participant_channels = self.participant_channels.read().await;
participant_channels.get(leg_id).cloned()
}
pub async fn take_participant_output_rx(
&self,
leg_id: &LegId,
) -> Option<mpsc::Receiver<AudioFrame>> {
let mut participant_output_rxs = self.participant_output_rxs.write().await;
participant_output_rxs.remove(leg_id)
}
pub async fn list_conferences(&self) -> Vec<ConferenceId> {
let conferences = self.conferences.read().await;
conferences.keys().cloned().collect()
}
pub async fn get_conference_stats(&self, conf_id: &ConferenceId) -> Result<ConferenceStats> {
let conferences = self.conferences.read().await;
let conference = conferences
.get(conf_id)
.ok_or_else(|| anyhow!("Conference {} not found", conf_id.0))?;
Ok(ConferenceStats {
conference_id: conf_id.0.clone(),
participant_count: conference.participant_count(),
muted_count: conference.participants.values().filter(|p| p.muted).count(),
duration: conference.created_at.elapsed(),
})
}
pub async fn remove_leg_from_all(&self, leg_id: &LegId) -> Result<()> {
let conf_id = {
let leg_map = self.leg_to_conference.read().await;
leg_map.get(leg_id).cloned()
};
if let Some(conf_id) = conf_id {
let _ = self.remove_participant(&conf_id, leg_id).await;
}
Ok(())
}
}
impl Default for ConferenceManager {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct ConferenceStats {
pub conference_id: String,
pub participant_count: usize,
pub muted_count: usize,
pub duration: std::time::Duration,
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_conference_manager_creation() {
let manager = ConferenceManager::new();
let conferences = manager.list_conferences().await;
assert!(conferences.is_empty());
}
#[tokio::test]
async fn test_create_destroy_conference() {
let manager = ConferenceManager::new();
let conf_id = ConferenceId::from("test-conf");
let conf = manager
.create_conference(conf_id.clone(), Some(10))
.await
.unwrap();
assert_eq!(conf.participant_count(), 0);
manager.destroy_conference(&conf_id).await.unwrap();
assert!(manager.get_conference(&conf_id).await.is_none());
}
#[tokio::test]
async fn test_add_remove_participant_with_audio() {
let manager = ConferenceManager::new();
let conf_id = ConferenceId::from("test-conf");
manager
.create_conference(conf_id.clone(), None)
.await
.unwrap();
let leg_id = LegId::new("leg1");
let channels = manager
.add_participant(&conf_id, leg_id.clone())
.await
.unwrap();
let frame = crate::media::conference_mixer::AudioFrame::new(vec![1000i16; 160], 8000);
channels.input_tx.send(frame).await.unwrap();
manager.remove_participant(&conf_id, &leg_id).await.unwrap();
let conf = manager.get_conference(&conf_id).await.unwrap();
assert_eq!(conf.participant_count(), 0);
manager.destroy_conference(&conf_id).await.unwrap();
}
#[tokio::test]
async fn test_mute_unmute() {
let manager = ConferenceManager::new();
let conf_id = ConferenceId::from("test-conf");
manager
.create_conference(conf_id.clone(), None)
.await
.unwrap();
let leg_id = LegId::new("leg1");
manager
.add_participant(&conf_id, leg_id.clone())
.await
.unwrap();
manager.mute_participant(&conf_id, &leg_id).await.unwrap();
let conf = manager.get_conference(&conf_id).await.unwrap();
let participant = conf.participants.get(&leg_id).unwrap();
assert!(participant.muted);
manager.unmute_participant(&conf_id, &leg_id).await.unwrap();
let conf = manager.get_conference(&conf_id).await.unwrap();
let participant = conf.participants.get(&leg_id).unwrap();
assert!(!participant.muted);
manager.destroy_conference(&conf_id).await.unwrap();
}
#[tokio::test]
async fn test_max_participants_limit() {
let manager = ConferenceManager::new();
let conf_id = ConferenceId::from("test-conf-max");
manager
.create_conference(conf_id.clone(), Some(2))
.await
.unwrap();
let leg1 = LegId::new("leg1");
let leg2 = LegId::new("leg2");
let leg3 = LegId::new("leg3");
manager.add_participant(&conf_id, leg1).await.unwrap();
manager.add_participant(&conf_id, leg2).await.unwrap();
let result = manager.add_participant(&conf_id, leg3).await;
assert!(
result.is_err(),
"Should fail when exceeding max participants"
);
manager.destroy_conference(&conf_id).await.unwrap();
}
#[tokio::test]
async fn test_duplicate_participant() {
let manager = ConferenceManager::new();
let conf_id = ConferenceId::from("test-conf-dup");
manager
.create_conference(conf_id.clone(), None)
.await
.unwrap();
let leg_id = LegId::new("leg1");
manager
.add_participant(&conf_id, leg_id.clone())
.await
.unwrap();
assert!(
manager
.add_participant(&conf_id, leg_id.clone())
.await
.is_err()
);
let conf = manager.get_conference(&conf_id).await.unwrap();
assert_eq!(conf.participant_count(), 1);
manager.destroy_conference(&conf_id).await.unwrap();
}
#[tokio::test]
async fn test_conference_stats() {
let manager = ConferenceManager::new();
let conf_id = ConferenceId::from("test-conf-stats");
manager
.create_conference(conf_id.clone(), None)
.await
.unwrap();
let leg1 = LegId::new("leg1");
let leg2 = LegId::new("leg2");
manager
.add_participant(&conf_id, leg1.clone())
.await
.unwrap();
manager
.add_participant(&conf_id, leg2.clone())
.await
.unwrap();
manager.mute_participant(&conf_id, &leg1).await.unwrap();
let stats = manager.get_conference_stats(&conf_id).await.unwrap();
assert_eq!(stats.participant_count, 2);
assert_eq!(stats.muted_count, 1);
assert_eq!(stats.conference_id, "test-conf-stats");
manager.destroy_conference(&conf_id).await.unwrap();
}
#[tokio::test]
async fn test_list_conferences() {
let manager = ConferenceManager::new();
let conf1 = ConferenceId::from("conf1");
let conf2 = ConferenceId::from("conf2");
manager
.create_conference(conf1.clone(), None)
.await
.unwrap();
manager
.create_conference(conf2.clone(), None)
.await
.unwrap();
let conferences = manager.list_conferences().await;
assert_eq!(conferences.len(), 2);
assert!(conferences.contains(&conf1));
assert!(conferences.contains(&conf2));
manager.destroy_conference(&conf1).await.unwrap();
manager.destroy_conference(&conf2).await.unwrap();
}
#[tokio::test]
async fn test_remove_leg_from_all() {
let manager = ConferenceManager::new();
let conf_id = ConferenceId::from("test-conf-remove-all");
manager
.create_conference(conf_id.clone(), None)
.await
.unwrap();
let leg_id = LegId::new("leg1");
manager
.add_participant(&conf_id, leg_id.clone())
.await
.unwrap();
manager.remove_leg_from_all(&leg_id).await.unwrap();
let conf = manager.get_conference(&conf_id).await.unwrap();
assert_eq!(conf.participant_count(), 0);
manager.destroy_conference(&conf_id).await.unwrap();
}
#[tokio::test]
async fn test_concurrent_participants() {
let manager = ConferenceManager::new();
let conf_id = ConferenceId::from("test-conf-concurrent");
manager
.create_conference(conf_id.clone(), None)
.await
.unwrap();
let mut handles = vec![];
for i in 0..5 {
let manager = manager.clone();
let conf_id = conf_id.clone();
let handle = tokio::spawn(async move {
let leg_id = LegId::new(format!("leg{}", i));
manager.add_participant(&conf_id, leg_id).await.unwrap();
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
let conf = manager.get_conference(&conf_id).await.unwrap();
assert_eq!(conf.participant_count(), 5);
manager.destroy_conference(&conf_id).await.unwrap();
}
#[tokio::test]
async fn test_get_conference_for_leg() {
let manager = ConferenceManager::new();
let conf_id = ConferenceId::from("test-conf-lookup");
manager
.create_conference(conf_id.clone(), None)
.await
.unwrap();
let leg_id = LegId::new("leg1");
manager
.add_participant(&conf_id, leg_id.clone())
.await
.unwrap();
let found_conf = manager.get_conference_id_for_leg(&leg_id).await;
assert!(found_conf.is_some());
assert_eq!(found_conf.unwrap().0, "test-conf-lookup");
let not_found = manager
.get_conference_id_for_leg(&LegId::new("nonexistent"))
.await;
assert!(not_found.is_none());
manager.destroy_conference(&conf_id).await.unwrap();
}
#[tokio::test]
async fn test_cross_conference_isolation() {
let manager = ConferenceManager::new();
let conf1 = ConferenceId::from("conf1");
let conf2 = ConferenceId::from("conf2");
manager
.create_conference(conf1.clone(), None)
.await
.unwrap();
manager
.create_conference(conf2.clone(), None)
.await
.unwrap();
let leg = LegId::new("shared-leg");
manager.add_participant(&conf1, leg.clone()).await.unwrap();
let result = manager.add_participant(&conf2, leg.clone()).await;
assert!(
result.is_err(),
"Leg should not be able to join multiple conferences"
);
manager.destroy_conference(&conf1).await.unwrap();
manager.destroy_conference(&conf2).await.unwrap();
}
}