use std::collections::HashSet;
use std::net::IpAddr;
use std::sync::{mpsc, Arc};
use std::thread::{self, JoinHandle};
use parking_lot::RwLock;
use sonos_api::Service;
use sonos_event_manager::SonosEventManager;
use sonos_stream::events::EventData;
use crate::decoder::{decode_event, decode_topology_event, PropertyChange, TopologyChanges};
use crate::model::SpeakerId;
use crate::property::{GroupMembership, Property};
use crate::state::{ChangeEvent, StateStore};
/// Spawns the background thread that feeds events from the event manager into the state store.
///
/// The thread iterates `event_manager.iter()` until that iterator ends. For each event:
///
/// * `EventData::ZoneGroupTopology` payloads are decoded with `decode_topology_event` and
///   applied via `apply_topology_changes`; no speaker lookup is performed for them.
/// * Any other payload is attributed to a speaker by looking up `event.speaker_ip` in
///   `ip_to_speaker`. Events from an IP that is not in the map are logged at `warn` level
///   and skipped. Otherwise the event is decoded with `decode_event` and every resulting
///   change is applied via `apply_property_change`.
///
/// `store`, `watched` and `event_tx` are forwarded unchanged to the apply functions.
///
/// Returns the `JoinHandle` of the spawned thread.
pub(crate) fn spawn_state_event_worker(
    event_manager: Arc<SonosEventManager>,
    store: Arc<RwLock<StateStore>>,
    watched: Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>,
    event_tx: mpsc::Sender<ChangeEvent>,
    ip_to_speaker: Arc<RwLock<std::collections::HashMap<IpAddr, SpeakerId>>>,
) -> JoinHandle<()> {
    thread::spawn(move || {
        tracing::info!("State event worker started, waiting for events...");

        for event in event_manager.iter() {
            tracing::debug!(
                "Received event from {} for service {:?}",
                event.speaker_ip,
                event.service
            );

            // Topology events are handled before (and without) the IP -> speaker lookup.
            if let EventData::ZoneGroupTopology(zgt_event) = &event.event_data {
                tracing::debug!("Processing ZoneGroupTopology event");
                let topology_changes = decode_topology_event(zgt_event);
                apply_topology_changes(&store, &watched, &event_tx, topology_changes);
                continue;
            }

            // Resolve the sender inside a short scope so the read guard is released
            // before the event is decoded and applied.
            let resolved = {
                let known_ips = ip_to_speaker.read();
                tracing::debug!(
                    "ip_to_speaker map has {} entries: {:?}",
                    known_ips.len(),
                    known_ips.keys().collect::<Vec<_>>()
                );
                known_ips.get(&event.speaker_ip).cloned()
            };
            let speaker_id = match resolved {
                Some(id) => id,
                None => {
                    tracing::warn!(
                        "Received event from unknown speaker IP: {} (not in ip_to_speaker map)",
                        event.speaker_ip
                    );
                    continue;
                }
            };
            tracing::debug!(
                "Mapped IP {} to speaker_id {}",
                event.speaker_ip,
                speaker_id.as_str()
            );

            let decoded = decode_event(&event, speaker_id.clone());
            tracing::debug!(
                "Decoded {} property changes from event",
                decoded.changes.len()
            );
            decoded.changes.into_iter().for_each(|change| {
                tracing::debug!("Applying change: {:?}", change);
                apply_property_change(&store, &watched, &event_tx, &speaker_id, change);
            });
        }

        tracing::info!("State event worker stopped");
    })
}
fn apply_topology_changes(
store: &Arc<RwLock<StateStore>>,
watched: &Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>,
event_tx: &mpsc::Sender<ChangeEvent>,
changes: TopologyChanges,
) {
tracing::debug!(
"Applying topology changes: {} groups, {} memberships",
changes.groups.len(),
changes.memberships.len()
);
let membership_changes: Vec<(SpeakerId, bool)> = {
let mut store = store.write();
store.clear_groups();
for group in changes.groups {
tracing::debug!(
"Adding group {} with {} members",
group.id.as_str(),
group.member_ids.len()
);
store.add_group(group);
}
let mut changed_memberships = Vec::new();
for (speaker_id, membership) in changes.memberships {
let changed = store.set(&speaker_id, membership);
changed_memberships.push((speaker_id, changed));
}
for (speaker_id, boot_seq) in changes.boot_seqs {
if let Some(speaker) = store.speakers.get_mut(&speaker_id) {
speaker.boot_seq = boot_seq;
}
}
changed_memberships
};
let watched_set = watched.read();
for (speaker_id, changed) in membership_changes {
if changed && watched_set.contains(&(speaker_id.clone(), GroupMembership::KEY)) {
tracing::debug!(
"GroupMembership changed for {}, emitting event",
speaker_id.as_str()
);
let _ = event_tx.send(ChangeEvent::new(
speaker_id,
GroupMembership::KEY,
Service::ZoneGroupTopology,
));
}
}
}
/// Applies one decoded property change to the store and notifies watchers.
///
/// `change.apply` runs under a write lock on `store`, which is released before anything
/// else happens. If it reports that the stored value changed and
/// `(speaker_id, change.key())` is in `watched`, a `ChangeEvent` carrying the change's key
/// and service is sent on `event_tx`. The result of `send` is discarded.
fn apply_property_change(
    store: &Arc<RwLock<StateStore>>,
    watched: &Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>,
    event_tx: &mpsc::Sender<ChangeEvent>,
    speaker_id: &SpeakerId,
    change: PropertyChange,
) {
    // Captured up front because `apply` consumes `change`.
    let (key, service) = (change.key(), change.service());

    let changed = {
        let mut guard = store.write();
        change.apply(&mut guard, speaker_id)
    };

    // `&&` short-circuits, so the watch set is only locked when something changed;
    // the read guard is a temporary that is dropped at the end of this statement.
    let is_watched = changed && watched.read().contains(&(speaker_id.clone(), key));
    if !is_watched {
        return;
    }

    tracing::debug!(
        "Property {} changed for {}, emitting event",
        key,
        speaker_id.as_str()
    );
    let _ = event_tx.send(ChangeEvent::new(speaker_id.clone(), key, service));
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{GroupId, SpeakerInfo};
    use crate::property::{GroupInfo, GroupVolume, Property, Volume};
    use sonos_api::Service;

    type SharedStore = Arc<RwLock<StateStore>>;
    type WatchSet = Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>;

    /// Creates an empty store, an empty watch set, and a fresh change-event channel.
    fn fixture() -> (
        SharedStore,
        WatchSet,
        mpsc::Sender<ChangeEvent>,
        mpsc::Receiver<ChangeEvent>,
    ) {
        let (tx, rx) = mpsc::channel();
        (
            Arc::new(RwLock::new(StateStore::new())),
            Arc::new(RwLock::new(HashSet::new())),
            tx,
            rx,
        )
    }

    /// Builds a `SpeakerInfo` whose `name` and `room_name` are both `name`; the remaining
    /// fields use fixed test values.
    fn make_speaker_info(id: &str, name: &str, ip: &str) -> SpeakerInfo {
        SpeakerInfo {
            id: SpeakerId::new(id),
            name: name.to_string(),
            room_name: name.to_string(),
            ip_address: ip.parse().unwrap(),
            port: 1400,
            model_name: "Test".to_string(),
            software_version: "1.0".to_string(),
            boot_seq: 0,
            satellites: vec![],
        }
    }

    /// Registers the two speakers (`RINCON_111`, `RINCON_222`) used by the topology tests.
    fn add_living_room_and_kitchen(store: &SharedStore) {
        let mut s = store.write();
        s.add_speaker(make_speaker_info(
            "RINCON_111",
            "Living Room",
            "192.168.1.101",
        ));
        s.add_speaker(make_speaker_info("RINCON_222", "Kitchen", "192.168.1.102"));
    }

    /// Builds a `TopologyChanges` with one group containing `members` (in the given order),
    /// one membership entry per member flagged as coordinator only for `coordinator`, and
    /// no boot sequences.
    fn single_group_topology(
        group_id: &GroupId,
        coordinator: &SpeakerId,
        members: &[SpeakerId],
    ) -> TopologyChanges {
        TopologyChanges {
            groups: vec![GroupInfo::new(
                group_id.clone(),
                coordinator.clone(),
                members.to_vec(),
            )],
            memberships: members
                .iter()
                .map(|member| {
                    (
                        member.clone(),
                        GroupMembership::new(group_id.clone(), member == coordinator),
                    )
                })
                .collect(),
            boot_seqs: vec![],
        }
    }

    /// An unwatched volume change is stored but produces no event.
    #[test]
    fn test_apply_property_change_volume() {
        let (store, watched, tx, rx) = fixture();
        let speaker_id = SpeakerId::new("test-speaker");
        store
            .write()
            .add_speaker(make_speaker_info("test-speaker", "Test", "192.168.1.100"));

        apply_property_change(
            &store,
            &watched,
            &tx,
            &speaker_id,
            PropertyChange::Volume(Volume(50)),
        );

        assert!(rx.try_recv().is_err());
        let stored: Option<Volume> = store.read().get(&speaker_id);
        assert_eq!(stored, Some(Volume(50)));
    }

    /// A watched volume change emits an event describing the speaker, key and service.
    #[test]
    fn test_apply_property_change_with_watch() {
        let (store, watched, tx, rx) = fixture();
        let speaker_id = SpeakerId::new("test-speaker");
        store
            .write()
            .add_speaker(make_speaker_info("test-speaker", "Test", "192.168.1.100"));
        watched.write().insert((speaker_id.clone(), Volume::KEY));

        apply_property_change(
            &store,
            &watched,
            &tx,
            &speaker_id,
            PropertyChange::Volume(Volume(75)),
        );

        let event = rx.try_recv().unwrap();
        assert_eq!(event.speaker_id, speaker_id);
        assert_eq!(event.property_key, Volume::KEY);
        assert_eq!(event.service, Service::RenderingControl);
    }

    /// A group-volume change for a speaker that belongs to a group is stored on that group.
    #[test]
    fn test_apply_property_change_group_volume() {
        let (store, watched, tx, _rx) = fixture();
        let speaker_id = SpeakerId::new("RINCON_111");
        let group_id = GroupId::new("RINCON_111:1");
        {
            let mut s = store.write();
            s.add_speaker(make_speaker_info(
                "RINCON_111",
                "Living Room",
                "192.168.1.101",
            ));
            s.add_group(GroupInfo::new(
                group_id.clone(),
                speaker_id.clone(),
                vec![speaker_id.clone()],
            ));
        }

        apply_property_change(
            &store,
            &watched,
            &tx,
            &speaker_id,
            PropertyChange::GroupVolume(GroupVolume(75)),
        );

        let s = store.read();
        let stored: Option<GroupVolume> = s.get_group(&group_id);
        assert_eq!(stored, Some(GroupVolume(75)));
    }

    /// A group-volume change for a speaker without a group leaves `group_props` empty.
    #[test]
    fn test_apply_property_change_group_volume_no_group() {
        let (store, watched, tx, _rx) = fixture();
        let speaker_id = SpeakerId::new("RINCON_111");
        store.write().add_speaker(make_speaker_info(
            "RINCON_111",
            "Living Room",
            "192.168.1.101",
        ));

        apply_property_change(
            &store,
            &watched,
            &tx,
            &speaker_id,
            PropertyChange::GroupVolume(GroupVolume(50)),
        );

        let s = store.read();
        assert!(s.group_props.is_empty());
    }

    /// Applying a topology records the group with its coordinator and members.
    #[test]
    fn test_apply_topology_changes_updates_groups() {
        let (store, watched, tx, _rx) = fixture();
        add_living_room_and_kitchen(&store);
        let group_id = GroupId::new("RINCON_111:1");
        let speaker1 = SpeakerId::new("RINCON_111");
        let speaker2 = SpeakerId::new("RINCON_222");
        let changes =
            single_group_topology(&group_id, &speaker1, &[speaker1.clone(), speaker2.clone()]);

        apply_topology_changes(&store, &watched, &tx, changes);

        let s = store.read();
        assert_eq!(s.groups.len(), 1);
        let group = s.groups.get(&group_id).unwrap();
        assert_eq!(group.coordinator_id, speaker1);
        assert_eq!(group.member_ids.len(), 2);
        assert!(group.member_ids.contains(&speaker1));
        assert!(group.member_ids.contains(&speaker2));
    }

    /// Applying a topology stores a `GroupMembership` for every listed speaker.
    #[test]
    fn test_apply_topology_changes_updates_group_membership() {
        let (store, watched, tx, _rx) = fixture();
        add_living_room_and_kitchen(&store);
        let group_id = GroupId::new("RINCON_111:1");
        let speaker1 = SpeakerId::new("RINCON_111");
        let speaker2 = SpeakerId::new("RINCON_222");
        let changes =
            single_group_topology(&group_id, &speaker1, &[speaker1.clone(), speaker2.clone()]);

        apply_topology_changes(&store, &watched, &tx, changes);

        let s = store.read();
        let membership1: Option<GroupMembership> = s.get(&speaker1);
        assert!(membership1.is_some());
        let m1 = membership1.unwrap();
        assert_eq!(m1.group_id, group_id);
        assert!(m1.is_coordinator);
        let membership2: Option<GroupMembership> = s.get(&speaker2);
        assert!(membership2.is_some());
        let m2 = membership2.unwrap();
        assert_eq!(m2.group_id, group_id);
        assert!(!m2.is_coordinator);
    }

    /// Only the watched speaker's membership change produces an event.
    #[test]
    fn test_apply_topology_changes_emits_events_for_watched_properties() {
        let (store, watched, tx, rx) = fixture();
        let speaker1 = SpeakerId::new("RINCON_111");
        let speaker2 = SpeakerId::new("RINCON_222");
        add_living_room_and_kitchen(&store);
        watched
            .write()
            .insert((speaker1.clone(), GroupMembership::KEY));
        let group_id = GroupId::new("RINCON_111:1");
        let changes =
            single_group_topology(&group_id, &speaker1, &[speaker1.clone(), speaker2.clone()]);

        apply_topology_changes(&store, &watched, &tx, changes);

        let event = rx.try_recv().unwrap();
        assert_eq!(event.speaker_id, speaker1);
        assert_eq!(event.property_key, GroupMembership::KEY);
        assert_eq!(event.service, Service::ZoneGroupTopology);
        assert!(rx.try_recv().is_err());
    }

    /// Groups present before a topology update are removed when they are not in the update.
    #[test]
    fn test_apply_topology_changes_clears_old_groups() {
        let (store, watched, tx, _rx) = fixture();
        let speaker1 = SpeakerId::new("RINCON_111");
        let speaker2 = SpeakerId::new("RINCON_222");
        add_living_room_and_kitchen(&store);
        store.write().add_group(GroupInfo::new(
            GroupId::new("OLD_GROUP:1"),
            speaker1.clone(),
            vec![speaker1.clone()],
        ));
        {
            // Sanity-check the precondition before applying the new topology.
            let s = store.read();
            assert_eq!(s.groups.len(), 1);
            assert!(s.groups.contains_key(&GroupId::new("OLD_GROUP:1")));
        }
        let new_group_id = GroupId::new("NEW_GROUP:1");
        let changes = single_group_topology(
            &new_group_id,
            &speaker2,
            &[speaker1.clone(), speaker2.clone()],
        );

        apply_topology_changes(&store, &watched, &tx, changes);

        let s = store.read();
        assert_eq!(s.groups.len(), 1);
        assert!(!s.groups.contains_key(&GroupId::new("OLD_GROUP:1")));
        assert!(s.groups.contains_key(&new_group_id));
    }

    /// After a topology update both members map to the new group in `speaker_to_group`.
    #[test]
    fn test_apply_topology_changes_updates_speaker_to_group_mapping() {
        let (store, watched, tx, _rx) = fixture();
        let speaker1 = SpeakerId::new("RINCON_111");
        let speaker2 = SpeakerId::new("RINCON_222");
        add_living_room_and_kitchen(&store);
        let group_id = GroupId::new("RINCON_111:1");
        let changes =
            single_group_topology(&group_id, &speaker1, &[speaker1.clone(), speaker2.clone()]);

        apply_topology_changes(&store, &watched, &tx, changes);

        let s = store.read();
        assert_eq!(s.speaker_to_group.get(&speaker1), Some(&group_id));
        assert_eq!(s.speaker_to_group.get(&speaker2), Some(&group_id));
    }

    /// Re-applying an identical membership for a watched speaker emits nothing.
    #[test]
    fn test_apply_topology_changes_no_event_when_membership_unchanged() {
        let (store, watched, tx, rx) = fixture();
        let speaker1 = SpeakerId::new("RINCON_111");
        let group_id = GroupId::new("RINCON_111:1");
        {
            let mut s = store.write();
            s.add_speaker(make_speaker_info(
                "RINCON_111",
                "Living Room",
                "192.168.1.101",
            ));
            s.set(&speaker1, GroupMembership::new(group_id.clone(), true));
        }
        watched
            .write()
            .insert((speaker1.clone(), GroupMembership::KEY));
        let changes = single_group_topology(&group_id, &speaker1, &[speaker1.clone()]);

        apply_topology_changes(&store, &watched, &tx, changes);

        assert!(rx.try_recv().is_err());
    }
}