use crate::core::types::{DeviceId, Group, Message, MessagePriority};
use crate::router::selector::Router;
use futures::stream::{FuturesUnordered, StreamExt};
use log::{debug, info, warn};
use std::collections::HashSet;
use std::sync::Arc;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BroadcastStrategy {
Direct,
FanOut,
PowerEfficient,
}
pub struct DispatchResult {
pub successful_devices: HashSet<DeviceId>,
pub failed_devices: HashSet<DeviceId>,
}
pub struct BroadcastExecutor {
router: Arc<Router>,
}
impl BroadcastExecutor {
pub fn new(router: Arc<Router>) -> Self {
Self { router }
}
pub async fn execute_broadcast(
&self,
group: &Group,
message: Message,
strategy: BroadcastStrategy,
) -> DispatchResult {
match strategy {
BroadcastStrategy::Direct => self.execute_direct(group, message).await,
BroadcastStrategy::FanOut => self.execute_fan_out(group, message).await,
BroadcastStrategy::PowerEfficient => self.execute_power_efficient(group, message).await,
}
}
async fn execute_direct(&self, group: &Group, message: Message) -> DispatchResult {
let mut successful_devices = HashSet::new();
let mut failed_devices = HashSet::new();
let mut futures = FuturesUnordered::new();
for &member_id in group.members.keys() {
if member_id == message.sender {
continue;
}
let router = self.router.clone();
let mut msg_to_send = message.clone();
msg_to_send.recipient = member_id;
futures.push(async move {
match router.select_channel(&msg_to_send).await {
Ok(channel) => {
if let Err(e) = channel.send(msg_to_send).await {
warn!("Failed to send message to member {}: {}", member_id, e);
(member_id, false)
} else {
debug!("Successfully sent broadcast chunk to {}", member_id);
(member_id, true)
}
}
Err(e) => {
warn!("No channel available for member {}: {}", member_id, e);
(member_id, false)
}
}
});
}
while let Some((member_id, success)) = futures.next().await {
if success {
successful_devices.insert(member_id);
} else {
failed_devices.insert(member_id);
}
}
DispatchResult {
successful_devices,
failed_devices,
}
}
async fn execute_fan_out(&self, group: &Group, message: Message) -> DispatchResult {
info!(
"Fan-out strategy requested for group {}, falling back to Direct",
group.id
);
self.execute_direct(group, message).await
}
async fn execute_power_efficient(&self, group: &Group, mut message: Message) -> DispatchResult {
message.priority = MessagePriority::Low;
self.execute_direct(group, message).await
}
}