Skip to main content

sonos_state/
state.rs

1//! Sync-first State Management for Sonos devices
2//!
3//! Provides a synchronous API for managing Sonos device state with
4//! background event processing.
5//!
6//! # Example
7//!
8//! ```rust,ignore
9//! use sonos_state::{StateManager, Volume};
10//! use sonos_discovery;
11//!
12//! // Create state manager (sync)
13//! let manager = StateManager::new()?;
14//! let devices = sonos_discovery::get();
15//! manager.add_devices(devices)?;
16//!
17//! // Get speakers
18//! for info in manager.speaker_infos() {
19//!     println!("{}: {}", info.name, info.ip_address);
20//! }
21//!
22//! // Blocking iteration over changes
23//! for event in manager.iter() {
24//!     println!("Change: {:?}", event);
25//! }
26//! ```
27
28use std::any::{Any, TypeId};
29use std::collections::{HashMap, HashSet};
30use std::net::IpAddr;
31use std::sync::{mpsc, Arc, Mutex, OnceLock};
32use std::thread::JoinHandle;
33use std::time::{Duration, Instant};
34
35use parking_lot::RwLock;
36
37use sonos_api::Service;
38use sonos_discovery::Device;
39use sonos_event_manager::{SonosEventManager, WatchRegistry};
40use tracing::info;
41
42use crate::event_worker::spawn_state_event_worker;
43use crate::iter::ChangeIterator;
44use crate::model::{GroupId, SpeakerId, SpeakerInfo};
45use crate::property::{GroupInfo, Property, SonosProperty, Topology};
46use crate::{Result, StateError};
47
/// Closure type for lazy event manager initialization.
///
/// Stored on `StateManager` as the single source of truth. Called by
/// `PropertyHandle::watch()` to trigger event manager creation on first use.
/// Uses `Box<dyn Error>` to avoid circular dependency on `sonos-sdk` error types.
///
/// The closure is wrapped in an `Arc` and bounded by `Send + Sync`, so one
/// instance can be cloned cheaply and shared between threads. The return type
/// spells out `std::result::Result` because this module imports the crate's
/// own `Result` alias under the plain name.
pub type EventInitFn = Arc<
    dyn Fn() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
>;
56
57// ============================================================================
58// ChangeEvent - for iter()
59// ============================================================================
60
61/// A change event emitted when a watched property changes
62#[derive(Debug, Clone)]
63pub struct ChangeEvent {
64    /// Speaker or entity that changed
65    pub speaker_id: SpeakerId,
66    /// Property key that changed
67    pub property_key: &'static str,
68    /// Service the property belongs to
69    pub service: Service,
70    /// When the change occurred
71    pub timestamp: Instant,
72}
73
74impl ChangeEvent {
75    pub fn new(speaker_id: SpeakerId, property_key: &'static str, service: Service) -> Self {
76        Self {
77            speaker_id,
78            property_key,
79            service,
80            timestamp: Instant::now(),
81        }
82    }
83}
84
85// ============================================================================
86// Internal StateStore
87// ============================================================================
88
89/// Internal state storage
90pub struct StateStore {
91    /// Speaker metadata
92    pub(crate) speakers: HashMap<SpeakerId, SpeakerInfo>,
93    /// IP to speaker ID mapping
94    pub(crate) ip_to_speaker: HashMap<IpAddr, SpeakerId>,
95    /// Property values: (speaker_id, property_key) -> type-erased value
96    pub(crate) speaker_props: HashMap<SpeakerId, PropertyBag>,
97    /// Group metadata
98    pub(crate) groups: HashMap<GroupId, GroupInfo>,
99    /// Group properties
100    pub(crate) group_props: HashMap<GroupId, PropertyBag>,
101    /// System properties
102    pub(crate) system_props: PropertyBag,
103    /// Speaker to group mapping for quick lookups
104    pub(crate) speaker_to_group: HashMap<SpeakerId, GroupId>,
105}
106
107impl StateStore {
108    pub(crate) fn new() -> Self {
109        Self {
110            speakers: HashMap::new(),
111            ip_to_speaker: HashMap::new(),
112            speaker_props: HashMap::new(),
113            groups: HashMap::new(),
114            group_props: HashMap::new(),
115            system_props: PropertyBag::new(),
116            speaker_to_group: HashMap::new(),
117        }
118    }
119
120    pub(crate) fn add_speaker(&mut self, speaker: SpeakerInfo) {
121        let id = speaker.id.clone();
122        let ip = speaker.ip_address;
123        self.ip_to_speaker.insert(ip, id.clone());
124        self.speakers.insert(id.clone(), speaker);
125        self.speaker_props
126            .entry(id)
127            .or_insert_with(PropertyBag::new);
128    }
129
130    fn speaker(&self, id: &SpeakerId) -> Option<&SpeakerInfo> {
131        self.speakers.get(id)
132    }
133
134    fn speakers(&self) -> Vec<SpeakerInfo> {
135        self.speakers.values().cloned().collect()
136    }
137
138    pub(crate) fn add_group(&mut self, group: GroupInfo) {
139        let id = group.id.clone();
140        // Update speaker_to_group mapping for all members
141        for member_id in &group.member_ids {
142            self.speaker_to_group.insert(member_id.clone(), id.clone());
143        }
144        self.groups.insert(id.clone(), group);
145        self.group_props.entry(id).or_insert_with(PropertyBag::new);
146    }
147
148    /// Get the group a speaker belongs to
149    #[allow(dead_code)]
150    pub(crate) fn get_group_for_speaker(&self, speaker_id: &SpeakerId) -> Option<&GroupInfo> {
151        let group_id = self.speaker_to_group.get(speaker_id)?;
152        self.groups.get(group_id)
153    }
154
155    /// Clear all groups and speaker_to_group mappings
156    ///
157    /// Used when processing topology updates to replace all group data
158    pub(crate) fn clear_groups(&mut self) {
159        self.groups.clear();
160        self.group_props.clear();
161        self.speaker_to_group.clear();
162    }
163
164    pub(crate) fn get<P: Property>(&self, speaker_id: &SpeakerId) -> Option<P> {
165        self.speaker_props.get(speaker_id)?.get::<P>()
166    }
167
168    pub(crate) fn set<P: Property>(&mut self, speaker_id: &SpeakerId, value: P) -> bool {
169        let bag = self
170            .speaker_props
171            .entry(speaker_id.clone())
172            .or_insert_with(PropertyBag::new);
173        bag.set(value)
174    }
175
176    pub(crate) fn get_group<P: Property>(&self, group_id: &GroupId) -> Option<P> {
177        self.group_props.get(group_id)?.get::<P>()
178    }
179
180    pub(crate) fn set_group<P: Property>(&mut self, group_id: &GroupId, value: P) -> bool {
181        let bag = self
182            .group_props
183            .entry(group_id.clone())
184            .or_insert_with(PropertyBag::new);
185        bag.set(value)
186    }
187
188    fn set_system<P: Property>(&mut self, value: P) -> bool {
189        self.system_props.set(value)
190    }
191
192    fn is_empty(&self) -> bool {
193        self.speakers.is_empty()
194    }
195
196    fn speaker_count(&self) -> usize {
197        self.speakers.len()
198    }
199
200    fn group_count(&self) -> usize {
201        self.groups.len()
202    }
203}
204
205// ============================================================================
206// PropertyBag - type-erased property storage
207// ============================================================================
208
209pub(crate) struct PropertyBag {
210    /// Map<TypeId, Box<dyn Any>> where Any is the property value
211    values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
212}
213
214impl PropertyBag {
215    pub(crate) fn new() -> Self {
216        Self {
217            values: HashMap::new(),
218        }
219    }
220
221    fn get<P: Property>(&self) -> Option<P> {
222        let type_id = TypeId::of::<P>();
223        self.values
224            .get(&type_id)
225            .and_then(|boxed| boxed.downcast_ref::<P>())
226            .cloned()
227    }
228
229    fn set<P: Property>(&mut self, value: P) -> bool {
230        let type_id = TypeId::of::<P>();
231        let current = self
232            .values
233            .get(&type_id)
234            .and_then(|boxed| boxed.downcast_ref::<P>());
235
236        if current != Some(&value) {
237            self.values.insert(type_id, Box::new(value));
238            true
239        } else {
240            false
241        }
242    }
243}
244
245// ============================================================================
246// StateManager - main entry point
247// ============================================================================
248
/// Core state manager with sync-first API
///
/// All public methods are synchronous. Background event processing
/// happens in a dedicated thread.
///
/// Most fields are `Arc`-wrapped so that clones of the manager, the event
/// worker and the watch registry all observe the same underlying state.
pub struct StateManager {
    /// Property values storage
    store: Arc<RwLock<StateStore>>,

    /// Watched properties for iter() filtering: the set of
    /// `(speaker, property key)` pairs for which change events are emitted
    watched: Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>,

    /// IP to speaker ID mapping (for event worker)
    ip_to_speaker: Arc<RwLock<HashMap<IpAddr, SpeakerId>>>,

    /// Event manager (set-once via OnceLock — enables live events)
    event_manager: OnceLock<Arc<SonosEventManager>>,

    /// Channel for sending change events to iter()
    event_tx: mpsc::Sender<ChangeEvent>,

    /// Receiver for iter() - wrapped in `Arc<Mutex>` for cloning
    event_rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>,

    /// Background event processor handle. Populated when an event manager is
    /// wired in (at build time or later via `set_event_manager`); `None`
    /// until then.
    _worker: Mutex<Option<JoinHandle<()>>>,

    /// Cleanup timeout for subscriptions
    // NOTE(review): only stored and copied in the code visible here —
    // confirm where (or whether) it is consumed.
    cleanup_timeout: Duration,

    /// Maps property key → Service for WatchRegistry's unregister_watches_for_service.
    /// Shared with StateWatchRegistry via Arc.
    key_to_service: Arc<RwLock<HashMap<&'static str, Service>>>,

    /// Lazy event manager initialization closure (set-once).
    /// Called by watch() to trigger event manager creation on first use.
    event_init: OnceLock<EventInitFn>,
}
286
287// ============================================================================
288// StateWatchRegistry - WatchRegistry impl for SonosEventManager
289// ============================================================================
290
291/// Lightweight WatchRegistry implementation wired into the event manager.
292///
293/// Separated from StateManager because `mpsc::Sender` is `!Sync`,
294/// preventing StateManager itself from satisfying `WatchRegistry: Sync`.
295/// This struct holds only the Arc-wrapped fields needed for watch management.
296struct StateWatchRegistry {
297    watched: Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>,
298    ip_to_speaker: Arc<RwLock<HashMap<IpAddr, SpeakerId>>>,
299    key_to_service: Arc<RwLock<HashMap<&'static str, Service>>>,
300}
301
302impl WatchRegistry for StateWatchRegistry {
303    fn register_watch(&self, speaker_id: &SpeakerId, key: &'static str, service: Service) {
304        self.watched.write().insert((speaker_id.clone(), key));
305        self.key_to_service.write().insert(key, service);
306    }
307
308    fn unregister_watches_for_service(&self, ip: IpAddr, service: Service) {
309        // 1. Resolve IP → SpeakerId
310        let speaker_id = match self.ip_to_speaker.read().get(&ip).cloned() {
311            Some(id) => id,
312            None => {
313                tracing::warn!(
314                    "unregister_watches_for_service: no speaker found for IP {}",
315                    ip
316                );
317                return;
318            }
319        };
320
321        // 2. Find property keys belonging to this service
322        let service_keys: Vec<&'static str> = self
323            .key_to_service
324            .read()
325            .iter()
326            .filter(|(_, &svc)| svc == service)
327            .map(|(&key, _)| key)
328            .collect();
329
330        // 3. Remove matching (speaker_id, key) entries from watched set
331        let mut watched = self.watched.write();
332        for key in service_keys {
333            watched.remove(&(speaker_id.clone(), key));
334        }
335    }
336}
337
338impl StateManager {
339    /// Create a new StateManager with default settings (sync)
340    ///
341    /// # Example
342    ///
343    /// ```rust,ignore
344    /// let manager = StateManager::new()?;
345    /// ```
346    pub fn new() -> Result<Self> {
347        Self::builder().build()
348    }
349
350    /// Create a StateManager builder for custom configuration
351    pub fn builder() -> StateManagerBuilder {
352        StateManagerBuilder::default()
353    }
354
355    /// Add discovered devices (sync)
356    ///
357    /// # Example
358    ///
359    /// ```rust,ignore
360    /// let devices = sonos_discovery::get();
361    /// manager.add_devices(devices)?;
362    /// ```
363    pub fn add_devices(&self, devices: Vec<Device>) -> Result<()> {
364        let mut store = self.store.write();
365        let mut ip_map = self.ip_to_speaker.write();
366
367        for device in devices {
368            let speaker_id = SpeakerId::new(&device.id);
369            let ip: IpAddr = device
370                .ip_address
371                .parse()
372                .map_err(|_| StateError::InvalidIpAddress(device.ip_address.clone()))?;
373
374            let friendly_name = if device.room_name.is_empty() || device.room_name == "Unknown" {
375                device.name.clone()
376            } else {
377                device.room_name.clone()
378            };
379
380            let info = SpeakerInfo {
381                id: speaker_id.clone(),
382                name: friendly_name,
383                room_name: device.room_name.clone(),
384                ip_address: ip,
385                port: device.port,
386                model_name: device.model_name.clone(),
387                software_version: "unknown".to_string(),
388                boot_seq: 0,
389                satellites: vec![],
390            };
391
392            // Update ip_to_speaker mapping
393            ip_map.insert(ip, speaker_id.clone());
394            tracing::debug!(
395                "Added speaker {} at IP {} to ip_to_speaker map",
396                speaker_id.as_str(),
397                ip
398            );
399
400            store.add_speaker(info);
401        }
402
403        // Also add devices to event manager if present
404        drop(store);
405        drop(ip_map);
406
407        if let Some(em) = self.event_manager.get() {
408            let devices_for_em: Vec<_> = self
409                .speaker_infos()
410                .iter()
411                .map(|info| sonos_discovery::Device {
412                    id: info.id.as_str().to_string(),
413                    name: info.name.clone(),
414                    room_name: info.room_name.clone(),
415                    ip_address: info.ip_address.to_string(),
416                    port: info.port,
417                    model_name: info.model_name.clone(),
418                })
419                .collect();
420
421            if let Err(e) = em.add_devices(devices_for_em) {
422                tracing::warn!("Failed to add devices to event manager: {}", e);
423            }
424        }
425
426        Ok(())
427    }
428
429    /// Get all speaker info
430    pub fn speaker_infos(&self) -> Vec<SpeakerInfo> {
431        self.store.read().speakers()
432    }
433
434    /// Get a specific speaker info by ID
435    pub fn speaker_info(&self, speaker_id: &SpeakerId) -> Option<SpeakerInfo> {
436        self.store.read().speaker(speaker_id).cloned()
437    }
438
439    /// Get speaker IP by ID
440    pub fn get_speaker_ip(&self, speaker_id: &SpeakerId) -> Option<IpAddr> {
441        self.store.read().speaker(speaker_id).map(|s| s.ip_address)
442    }
443
444    /// Get boot_seq for a speaker (used by GroupManagement AddMember)
445    pub fn get_boot_seq(&self, speaker_id: &SpeakerId) -> Option<u32> {
446        self.store.read().speaker(speaker_id).map(|s| s.boot_seq)
447    }
448
449    /// Create a blocking iterator over change events
450    ///
451    /// Only emits events for properties that have been watched.
452    ///
453    /// # Example
454    ///
455    /// ```rust,ignore
456    /// // First, watch some properties
457    /// speaker.volume.watch()?;
458    ///
459    /// // Then iterate over changes
460    /// for event in manager.iter() {
461    ///     println!("Changed: {} on {}", event.property_key, event.speaker_id);
462    /// }
463    /// ```
464    pub fn iter(&self) -> ChangeIterator {
465        ChangeIterator::new(Arc::clone(&self.event_rx))
466    }
467
468    /// Get current property value (sync, no subscription)
469    pub fn get_property<P: Property>(&self, speaker_id: &SpeakerId) -> Option<P> {
470        self.store.read().get::<P>(speaker_id)
471    }
472
473    /// Get current group property value (sync, no subscription)
474    pub fn get_group_property<P: Property>(&self, group_id: &GroupId) -> Option<P> {
475        self.store.read().get_group::<P>(group_id)
476    }
477
478    /// Set a property value
479    ///
480    /// Updates the property value in the store and emits a change event
481    /// if the property is being watched.
482    pub fn set_property<P: SonosProperty>(&self, speaker_id: &SpeakerId, value: P) {
483        let changed = {
484            let mut store = self.store.write();
485            store.set::<P>(speaker_id, value)
486        };
487
488        if changed {
489            self.maybe_emit_change(speaker_id, P::KEY, P::SERVICE);
490        }
491    }
492
493    /// Set a group property value
494    ///
495    /// Updates the group property value in the store and emits a change event
496    /// if the property is being watched (keyed on the coordinator's speaker ID).
497    /// Used by the SDK layer to store group-scoped values fetched via API calls.
498    pub fn set_group_property<P: SonosProperty>(&self, group_id: &GroupId, value: P) {
499        let coordinator_id = {
500            let mut store = self.store.write();
501            let changed = store.set_group::<P>(group_id, value);
502            if !changed {
503                return;
504            }
505            store.groups.get(group_id).map(|g| g.coordinator_id.clone())
506        };
507
508        if let Some(coordinator_id) = coordinator_id {
509            self.maybe_emit_change(&coordinator_id, P::KEY, P::SERVICE);
510        }
511    }
512
513    /// Register a property as watched (called by PropertyHandle::watch)
514    pub fn register_watch(&self, speaker_id: &SpeakerId, property_key: &'static str) {
515        self.watched
516            .write()
517            .insert((speaker_id.clone(), property_key));
518    }
519
520    /// Unregister a property watch
521    pub fn unregister_watch(&self, speaker_id: &SpeakerId, property_key: &'static str) {
522        self.watched
523            .write()
524            .remove(&(speaker_id.clone(), property_key));
525    }
526
527    /// Watch a property with automatic UPnP subscription (recommended API)
528    ///
529    /// This is the preferred method for watching properties as it:
530    /// 1. Registers the property for change notifications
531    /// 2. Subscribes to the UPnP service via the event manager
532    ///
533    /// Returns the current cached value if available.
534    pub fn watch_property_with_subscription<P: SonosProperty>(
535        &self,
536        speaker_id: &SpeakerId,
537    ) -> Result<Option<P>> {
538        // Register for change notifications
539        self.register_watch(speaker_id, P::KEY);
540
541        // Subscribe via event manager if available
542        if let Some(em) = self.event_manager.get() {
543            // Get speaker IP from store
544            if let Some(ip) = self.get_speaker_ip(speaker_id) {
545                if let Err(e) = em.ensure_service_subscribed(ip, P::SERVICE) {
546                    tracing::warn!(
547                        "Failed to subscribe to {:?} for {}: {}",
548                        P::SERVICE,
549                        speaker_id.as_str(),
550                        e
551                    );
552                }
553            }
554        }
555
556        Ok(self.get_property::<P>(speaker_id))
557    }
558
559    /// Unwatch a property and release UPnP subscription
560    pub fn unwatch_property_with_subscription<P: SonosProperty>(&self, speaker_id: &SpeakerId) {
561        // Unregister from change notifications
562        self.unregister_watch(speaker_id, P::KEY);
563
564        // Release subscription via event manager if available
565        if let Some(em) = self.event_manager.get() {
566            if let Some(ip) = self.get_speaker_ip(speaker_id) {
567                if let Err(e) = em.release_service_subscription(ip, P::SERVICE) {
568                    tracing::warn!(
569                        "Failed to unsubscribe from {:?} for {}: {}",
570                        P::SERVICE,
571                        speaker_id.as_str(),
572                        e
573                    );
574                }
575            }
576        }
577    }
578
579    /// Check if a property is being watched
580    pub fn is_watched(&self, speaker_id: &SpeakerId, property_key: &'static str) -> bool {
581        self.watched
582            .read()
583            .contains(&(speaker_id.clone(), property_key))
584    }
585
586    /// Emit a change event if the property is being watched
587    fn maybe_emit_change(
588        &self,
589        speaker_id: &SpeakerId,
590        property_key: &'static str,
591        service: Service,
592    ) {
593        let is_watched = self
594            .watched
595            .read()
596            .contains(&(speaker_id.clone(), property_key));
597
598        if is_watched {
599            let event = ChangeEvent::new(speaker_id.clone(), property_key, service);
600            let _ = self.event_tx.send(event);
601        }
602    }
603
604    /// Initialize from topology data
605    pub fn initialize(&self, topology: Topology) {
606        let mut store = self.store.write();
607        for speaker in &topology.speakers {
608            store.add_speaker(speaker.clone());
609        }
610        for group in &topology.groups {
611            store.add_group(group.clone());
612        }
613        store.set_system(topology);
614    }
615
616    /// Check if initialized with any speakers
617    pub fn is_initialized(&self) -> bool {
618        !self.store.read().is_empty()
619    }
620
621    /// Get number of speakers
622    pub fn speaker_count(&self) -> usize {
623        self.store.read().speaker_count()
624    }
625
626    /// Get number of groups
627    pub fn group_count(&self) -> usize {
628        self.store.read().group_count()
629    }
630
631    /// Get all current groups
632    ///
633    /// Returns all groups in the system. Every speaker is always in a group,
634    /// so a single speaker forms a group of one.
635    pub fn groups(&self) -> Vec<GroupInfo> {
636        self.store.read().groups.values().cloned().collect()
637    }
638
639    /// Get a specific group by ID
640    pub fn get_group(&self, group_id: &GroupId) -> Option<GroupInfo> {
641        self.store.read().groups.get(group_id).cloned()
642    }
643
644    /// Get the group a speaker belongs to
645    ///
646    /// Uses the speaker_to_group mapping for quick lookup.
647    pub fn get_group_for_speaker(&self, speaker_id: &SpeakerId) -> Option<GroupInfo> {
648        let store = self.store.read();
649        let group_id = store.speaker_to_group.get(speaker_id)?;
650        store.groups.get(group_id).cloned()
651    }
652
653    /// Get access to the event manager (if configured)
654    ///
655    /// This allows PropertyHandle::watch() to trigger UPnP subscriptions
656    /// via the event manager's ensure_service_subscribed() method.
657    pub fn event_manager(&self) -> Option<&Arc<SonosEventManager>> {
658        self.event_manager.get()
659    }
660
661    /// Wire an event manager into this StateManager after construction.
662    ///
663    /// Spawns the event worker thread and registers all known devices.
664    /// Can only be called once — subsequent calls are no-ops.
665    pub fn set_event_manager(&self, em: Arc<SonosEventManager>) -> Result<()> {
666        tracing::debug!("StateManager::set_event_manager called");
667        if self.event_manager.set(Arc::clone(&em)).is_err() {
668            tracing::debug!("Event manager already set — no-op");
669            return Ok(()); // Already set — no-op
670        }
671
672        // Wire this StateManager as the WatchRegistry
673        em.set_watch_registry(Arc::new(StateWatchRegistry {
674            watched: Arc::clone(&self.watched),
675            ip_to_speaker: Arc::clone(&self.ip_to_speaker),
676            key_to_service: Arc::clone(&self.key_to_service),
677        }));
678
679        // Register all known devices with the event manager
680        let devices_for_em: Vec<_> = self
681            .speaker_infos()
682            .iter()
683            .map(|info| sonos_discovery::Device {
684                id: info.id.as_str().to_string(),
685                name: info.name.clone(),
686                room_name: info.room_name.clone(),
687                ip_address: info.ip_address.to_string(),
688                port: info.port,
689                model_name: info.model_name.clone(),
690            })
691            .collect();
692
693        if let Err(e) = em.add_devices(devices_for_em) {
694            tracing::warn!(
695                "Failed to add devices to event manager during lazy init: {}",
696                e
697            );
698        }
699
700        // Spawn event worker thread
701        let worker = spawn_state_event_worker(
702            em,
703            Arc::clone(&self.store),
704            Arc::clone(&self.watched),
705            self.event_tx.clone(),
706            Arc::clone(&self.ip_to_speaker),
707        );
708        info!("StateManager event worker started (lazy init)");
709
710        if let Ok(mut w) = self._worker.lock() {
711            *w = Some(worker);
712        }
713
714        Ok(())
715    }
716
717    /// Set the lazy event manager initialization closure.
718    ///
719    /// Called once by `SonosSystem::from_devices_inner()` after construction.
720    /// Subsequent calls are no-ops (OnceLock semantics).
721    pub fn set_event_init(&self, f: EventInitFn) {
722        let _ = self.event_init.set(f);
723    }
724
725    /// Get the event init closure (if set).
726    ///
727    /// Used by `PropertyHandle::watch()` and `GroupPropertyHandle::watch()`
728    /// to trigger lazy event manager creation on first use.
729    pub fn event_init(&self) -> Option<&EventInitFn> {
730        self.event_init.get()
731    }
732}
733
734impl Clone for StateManager {
735    fn clone(&self) -> Self {
736        let event_manager = OnceLock::new();
737        if let Some(em) = self.event_manager.get() {
738            let _ = event_manager.set(Arc::clone(em));
739        }
740        let event_init = OnceLock::new();
741        if let Some(f) = self.event_init.get() {
742            let _ = event_init.set(Arc::clone(f));
743        }
744        Self {
745            store: Arc::clone(&self.store),
746            watched: Arc::clone(&self.watched),
747            ip_to_speaker: Arc::clone(&self.ip_to_speaker),
748            event_manager,
749            event_tx: self.event_tx.clone(),
750            event_rx: Arc::clone(&self.event_rx),
751            _worker: Mutex::new(None),
752            cleanup_timeout: self.cleanup_timeout,
753            key_to_service: Arc::clone(&self.key_to_service),
754            event_init,
755        }
756    }
757}
758
759// ============================================================================
760// StateManagerBuilder
761// ============================================================================
762
763/// Builder for StateManager configuration
764pub struct StateManagerBuilder {
765    cleanup_timeout: Duration,
766    event_manager: Option<Arc<SonosEventManager>>,
767}
768
769impl Default for StateManagerBuilder {
770    fn default() -> Self {
771        Self {
772            cleanup_timeout: Duration::from_secs(5),
773            event_manager: None,
774        }
775    }
776}
777
778impl StateManagerBuilder {
779    /// Set the cleanup timeout for subscriptions
780    pub fn cleanup_timeout(mut self, timeout: Duration) -> Self {
781        self.cleanup_timeout = timeout;
782        self
783    }
784
785    /// Set the event manager for live event processing
786    ///
787    /// When an event manager is provided, the StateManager will:
788    /// - Spawn a background worker to process events
789    /// - Automatically subscribe/unsubscribe via `watch()`/`unwatch()` on properties
790    /// - Update state from incoming events
791    pub fn with_event_manager(mut self, em: Arc<SonosEventManager>) -> Self {
792        self.event_manager = Some(em);
793        self
794    }
795
796    /// Build the StateManager
797    pub fn build(self) -> Result<StateManager> {
798        let (event_tx, event_rx) = mpsc::channel();
799
800        let store = Arc::new(RwLock::new(StateStore::new()));
801        let watched = Arc::new(RwLock::new(HashSet::new()));
802        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
803        let key_to_service = Arc::new(RwLock::new(HashMap::new()));
804
805        let event_manager_lock = OnceLock::new();
806        let mut worker = None;
807
808        // If event_manager provided at build time, wire it up eagerly
809        if let Some(em) = self.event_manager {
810            let _ = event_manager_lock.set(Arc::clone(&em));
811
812            // Wire WatchRegistry
813            em.set_watch_registry(Arc::new(StateWatchRegistry {
814                watched: Arc::clone(&watched),
815                ip_to_speaker: Arc::clone(&ip_to_speaker),
816                key_to_service: Arc::clone(&key_to_service),
817            }));
818
819            let worker_handle = spawn_state_event_worker(
820                em,
821                Arc::clone(&store),
822                Arc::clone(&watched),
823                event_tx.clone(),
824                Arc::clone(&ip_to_speaker),
825            );
826            info!("StateManager event worker started");
827            worker = Some(worker_handle);
828        }
829
830        let manager = StateManager {
831            store,
832            watched,
833            ip_to_speaker,
834            event_manager: event_manager_lock,
835            event_tx,
836            event_rx: Arc::new(Mutex::new(event_rx)),
837            _worker: Mutex::new(worker),
838            cleanup_timeout: self.cleanup_timeout,
839            key_to_service,
840            event_init: OnceLock::new(),
841        };
842
843        info!("StateManager created (sync-first mode)");
844        Ok(manager)
845    }
846}
847
#[cfg(test)]
mod tests {
    //! Unit tests for `StateManager`, `StateStore` group bookkeeping and
    //! `StateWatchRegistry`.
    //!
    //! Shared fixtures live in the small helpers at the top of the module so
    //! each test only spells out what is specific to it.

    use super::*;
    use crate::property::{GroupVolume, Volume};
    use sonos_api::Service;

    /// How long the iterator-based tests wait for a change event.
    const EVENT_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);

    // ========================================================================
    // Fixtures
    // ========================================================================

    /// Builds a "Sonos One" `Device` on port 1400 whose `name` and `room_name`
    /// are both `room`.
    fn device(id: &str, room: &str, ip: &str) -> Device {
        Device {
            id: id.to_string(),
            name: room.to_string(),
            room_name: room.to_string(),
            ip_address: ip.to_string(),
            port: 1400,
            model_name: "Sonos One".to_string(),
        }
    }

    /// The "Living Room" fixture device at 192.168.1.100.
    fn living_room(id: &str) -> Device {
        device(id, "Living Room", "192.168.1.100")
    }

    /// The "Kitchen" fixture device at 192.168.1.101.
    fn kitchen(id: &str) -> Device {
        device(id, "Kitchen", "192.168.1.101")
    }

    /// Builds a group whose coordinator is also its only member.
    fn solo_group(group_id: &GroupId, speaker: &SpeakerId) -> GroupInfo {
        GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()])
    }

    // ========================================================================
    // StateManager Basics
    // ========================================================================

    #[test]
    fn test_state_manager_creation() {
        let manager = StateManager::new().unwrap();
        assert!(!manager.is_initialized());
        assert_eq!(manager.speaker_count(), 0);
    }

    #[test]
    fn test_add_devices() {
        let manager = StateManager::new().unwrap();

        manager
            .add_devices(vec![living_room("RINCON_123")])
            .unwrap();
        assert_eq!(manager.speaker_count(), 1);
    }

    #[test]
    fn test_property_storage() {
        let manager = StateManager::new().unwrap();
        manager
            .add_devices(vec![living_room("RINCON_123")])
            .unwrap();

        let speaker_id = SpeakerId::new("RINCON_123");

        // Initially None
        assert!(manager.get_property::<Volume>(&speaker_id).is_none());

        // Set value
        manager.set_property(&speaker_id, Volume::new(50));
        assert_eq!(
            manager.get_property::<Volume>(&speaker_id),
            Some(Volume::new(50))
        );
    }

    #[test]
    fn test_watch_registration() {
        let manager = StateManager::new().unwrap();
        manager
            .add_devices(vec![living_room("RINCON_123")])
            .unwrap();

        let speaker_id = SpeakerId::new("RINCON_123");

        // Not watched initially
        assert!(!manager.is_watched(&speaker_id, "volume"));

        // Register watch
        manager.register_watch(&speaker_id, "volume");
        assert!(manager.is_watched(&speaker_id, "volume"));

        // Unregister watch
        manager.unregister_watch(&speaker_id, "volume");
        assert!(!manager.is_watched(&speaker_id, "volume"));
    }

    #[test]
    fn test_change_event_emission() {
        let manager = StateManager::new().unwrap();
        manager
            .add_devices(vec![living_room("RINCON_123")])
            .unwrap();

        let speaker_id = SpeakerId::new("RINCON_123");

        // Register watch
        manager.register_watch(&speaker_id, "volume");

        // Set property (should emit event)
        manager.set_property(&speaker_id, Volume::new(75));

        // Get event via iter
        let iter = manager.iter();
        let event = iter
            .recv_timeout(EVENT_TIMEOUT)
            .expect("setting a watched property should emit a change event");

        assert_eq!(event.speaker_id.as_str(), "RINCON_123");
        assert_eq!(event.property_key, "volume");
    }

    #[test]
    fn test_set_group_property_emits_change_event() {
        let manager = StateManager::new().unwrap();
        manager
            .add_devices(vec![living_room("RINCON_123")])
            .unwrap();

        let speaker_id = SpeakerId::new("RINCON_123");
        let group_id = GroupId::new("RINCON_123:1");

        // Add group so coordinator lookup works. The write guard is a
        // temporary and is released at the end of this statement.
        manager
            .store
            .write()
            .add_group(solo_group(&group_id, &speaker_id));

        // Register watch on coordinator for group_volume
        manager.register_watch(&speaker_id, "group_volume");

        // Set group property (should emit event via coordinator)
        manager.set_group_property(&group_id, GroupVolume::new(80));

        // Verify event was emitted
        let iter = manager.iter();
        let event = iter
            .recv_timeout(EVENT_TIMEOUT)
            .expect("setting a watched group property should emit a change event");

        assert_eq!(event.speaker_id.as_str(), "RINCON_123");
        assert_eq!(event.property_key, "group_volume");
        assert_eq!(event.service, Service::GroupRenderingControl);
    }

    #[test]
    fn test_set_group_property_no_event_when_unwatched() {
        let manager = StateManager::new().unwrap();
        manager
            .add_devices(vec![living_room("RINCON_123")])
            .unwrap();

        let speaker_id = SpeakerId::new("RINCON_123");
        let group_id = GroupId::new("RINCON_123:1");

        manager
            .store
            .write()
            .add_group(solo_group(&group_id, &speaker_id));

        // Don't register any watch
        manager.set_group_property(&group_id, GroupVolume::new(50));

        let iter = manager.iter();
        assert!(iter.recv_timeout(EVENT_TIMEOUT).is_none());
    }

    // ========================================================================
    // StateStore Group Operations Tests
    // ========================================================================

    #[test]
    fn test_add_group_updates_speaker_to_group() {
        let mut store = StateStore::new();

        let speaker1 = SpeakerId::new("RINCON_111");
        let speaker2 = SpeakerId::new("RINCON_222");
        let group_id = GroupId::new("RINCON_111:1");

        store.add_group(GroupInfo::new(
            group_id.clone(),
            speaker1.clone(),
            vec![speaker1.clone(), speaker2.clone()],
        ));

        // Verify speaker_to_group mapping is updated for all members
        assert_eq!(store.speaker_to_group.get(&speaker1), Some(&group_id));
        assert_eq!(store.speaker_to_group.get(&speaker2), Some(&group_id));
    }

    #[test]
    fn test_add_group_single_speaker() {
        let mut store = StateStore::new();

        let speaker = SpeakerId::new("RINCON_333");
        let group_id = GroupId::new("RINCON_333:1");
        let group = solo_group(&group_id, &speaker);

        store.add_group(group.clone());

        // Verify speaker_to_group mapping
        assert_eq!(store.speaker_to_group.get(&speaker), Some(&group_id));

        // Verify group is stored
        assert_eq!(store.groups.get(&group_id), Some(&group));
    }

    #[test]
    fn test_get_group_for_speaker_returns_correct_group() {
        let mut store = StateStore::new();

        let speaker1 = SpeakerId::new("RINCON_111");
        let speaker2 = SpeakerId::new("RINCON_222");
        let speaker3 = SpeakerId::new("RINCON_333");
        let group1_id = GroupId::new("RINCON_111:1");
        let group2_id = GroupId::new("RINCON_333:1");

        // Group 1: speaker1 (coordinator) + speaker2
        let group1 = GroupInfo::new(
            group1_id.clone(),
            speaker1.clone(),
            vec![speaker1.clone(), speaker2.clone()],
        );

        // Group 2: speaker3 alone
        let group2 = solo_group(&group2_id, &speaker3);

        store.add_group(group1.clone());
        store.add_group(group2.clone());

        // Verify get_group_for_speaker returns correct groups
        assert_eq!(store.get_group_for_speaker(&speaker1), Some(&group1));
        assert_eq!(store.get_group_for_speaker(&speaker2), Some(&group1));
        assert_eq!(store.get_group_for_speaker(&speaker3), Some(&group2));
    }

    #[test]
    fn test_get_group_for_speaker_returns_none_for_unknown() {
        let store = StateStore::new();

        let unknown_speaker = SpeakerId::new("RINCON_UNKNOWN");

        assert!(store.get_group_for_speaker(&unknown_speaker).is_none());
    }

    #[test]
    fn test_clear_groups_removes_all_group_data() {
        let mut store = StateStore::new();

        let speaker1 = SpeakerId::new("RINCON_111");
        let speaker2 = SpeakerId::new("RINCON_222");
        let group_id = GroupId::new("RINCON_111:1");

        store.add_group(GroupInfo::new(
            group_id.clone(),
            speaker1.clone(),
            vec![speaker1.clone(), speaker2.clone()],
        ));

        // Verify data exists
        assert!(!store.groups.is_empty());
        assert!(!store.speaker_to_group.is_empty());

        // Clear groups
        store.clear_groups();

        // Verify all group data is cleared
        assert!(store.groups.is_empty());
        assert!(store.group_props.is_empty());
        assert!(store.speaker_to_group.is_empty());
    }

    #[test]
    fn test_clear_groups_then_add_new_groups() {
        let mut store = StateStore::new();

        // Add initial group
        let speaker1 = SpeakerId::new("RINCON_111");
        let group1_id = GroupId::new("RINCON_111:1");
        store.add_group(solo_group(&group1_id, &speaker1));

        // Clear and add new group
        store.clear_groups();

        let speaker2 = SpeakerId::new("RINCON_222");
        let group2_id = GroupId::new("RINCON_222:1");
        let group2 = solo_group(&group2_id, &speaker2);
        store.add_group(group2.clone());

        // Verify old group is gone, new group exists
        assert!(!store.groups.contains_key(&group1_id));
        assert_eq!(store.groups.get(&group2_id), Some(&group2));

        // Verify speaker_to_group is updated correctly
        assert!(!store.speaker_to_group.contains_key(&speaker1));
        assert_eq!(store.speaker_to_group.get(&speaker2), Some(&group2_id));
    }

    // ========================================================================
    // StateManager Group Methods Tests
    // ========================================================================

    #[test]
    fn test_state_manager_groups_returns_all_groups() {
        let manager = StateManager::new().unwrap();

        // Add devices
        manager
            .add_devices(vec![living_room("RINCON_111"), kitchen("RINCON_222")])
            .unwrap();

        // Create groups via initialize
        let speaker1 = SpeakerId::new("RINCON_111");
        let speaker2 = SpeakerId::new("RINCON_222");
        let group1 = solo_group(&GroupId::new("RINCON_111:1"), &speaker1);
        let group2 = solo_group(&GroupId::new("RINCON_222:1"), &speaker2);

        let topology = Topology::new(manager.speaker_infos(), vec![group1, group2]);
        manager.initialize(topology);

        // Verify groups() returns all groups
        let groups = manager.groups();
        assert_eq!(groups.len(), 2);

        // Verify both groups are present (order may vary)
        let group_ids: Vec<_> = groups.iter().map(|g| g.id.clone()).collect();
        assert!(group_ids.contains(&GroupId::new("RINCON_111:1")));
        assert!(group_ids.contains(&GroupId::new("RINCON_222:1")));
    }

    #[test]
    fn test_state_manager_groups_returns_empty_when_no_groups() {
        let manager = StateManager::new().unwrap();

        // No groups added
        assert!(manager.groups().is_empty());
    }

    #[test]
    fn test_state_manager_get_group_returns_correct_group() {
        let manager = StateManager::new().unwrap();

        // Add device
        manager
            .add_devices(vec![living_room("RINCON_111")])
            .unwrap();

        // Create group via initialize
        let speaker = SpeakerId::new("RINCON_111");
        let group_id = GroupId::new("RINCON_111:1");
        let group = solo_group(&group_id, &speaker);

        let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
        manager.initialize(topology);

        // Verify get_group returns the correct group
        assert_eq!(manager.get_group(&group_id), Some(group));
    }

    #[test]
    fn test_state_manager_get_group_returns_none_for_unknown() {
        let manager = StateManager::new().unwrap();

        // No groups added
        let unknown_id = GroupId::new("RINCON_UNKNOWN:1");
        assert!(manager.get_group(&unknown_id).is_none());
    }

    #[test]
    fn test_state_manager_get_group_for_speaker_returns_correct_group() {
        let manager = StateManager::new().unwrap();

        // Add devices
        manager
            .add_devices(vec![living_room("RINCON_111"), kitchen("RINCON_222")])
            .unwrap();

        // Create a group with both speakers
        let speaker1 = SpeakerId::new("RINCON_111");
        let speaker2 = SpeakerId::new("RINCON_222");
        let group_id = GroupId::new("RINCON_111:1");
        let group = GroupInfo::new(
            group_id.clone(),
            speaker1.clone(),
            vec![speaker1.clone(), speaker2.clone()],
        );

        let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
        manager.initialize(topology);

        // Verify get_group_for_speaker returns the correct group for both speakers
        assert_eq!(
            manager.get_group_for_speaker(&speaker1),
            Some(group.clone())
        );
        assert_eq!(manager.get_group_for_speaker(&speaker2), Some(group));
    }

    #[test]
    fn test_state_manager_get_group_for_speaker_returns_none_for_unknown() {
        let manager = StateManager::new().unwrap();

        // No groups added
        let unknown_speaker = SpeakerId::new("RINCON_UNKNOWN");
        assert!(manager.get_group_for_speaker(&unknown_speaker).is_none());
    }

    #[test]
    fn test_state_manager_group_methods_consistency() {
        let manager = StateManager::new().unwrap();

        // Add device
        manager
            .add_devices(vec![living_room("RINCON_111")])
            .unwrap();

        // Create group via initialize
        let speaker = SpeakerId::new("RINCON_111");
        let group_id = GroupId::new("RINCON_111:1");
        let group = solo_group(&group_id, &speaker);

        let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
        manager.initialize(topology);

        // Verify all three methods return consistent data
        let groups = manager.groups();
        assert_eq!(groups.len(), 1);
        assert_eq!(groups[0], group);

        let by_id = manager.get_group(&group_id);
        assert_eq!(by_id, Some(group.clone()));

        let by_speaker = manager.get_group_for_speaker(&speaker);
        assert_eq!(by_speaker, Some(group));

        // All should return the same group
        assert_eq!(groups[0], by_id.unwrap());
        assert_eq!(groups[0], by_speaker.unwrap());
    }

    // ========================================================================
    // boot_seq Tests
    // ========================================================================

    #[test]
    fn test_get_boot_seq_returns_none_for_unknown_speaker() {
        let manager = StateManager::new().unwrap();
        let unknown = SpeakerId::new("RINCON_UNKNOWN");
        assert!(manager.get_boot_seq(&unknown).is_none());
    }

    #[test]
    fn test_boot_seq_defaults_to_zero_for_new_speaker() {
        let manager = StateManager::new().unwrap();
        manager
            .add_devices(vec![living_room("RINCON_123")])
            .unwrap();

        let speaker_id = SpeakerId::new("RINCON_123");

        // Before any topology event, boot_seq should be 0
        assert_eq!(manager.get_boot_seq(&speaker_id), Some(0));
    }

    // ========================================================================
    // StateWatchRegistry Tests
    // ========================================================================
    //
    // The shared maps below are built inline (not through a helper) so their
    // element types are inferred from the `StateWatchRegistry` fields.

    #[test]
    fn test_state_watch_registry_register_and_unregister() {
        let watched = Arc::new(RwLock::new(HashSet::new()));
        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
        let key_to_service = Arc::new(RwLock::new(HashMap::new()));

        let ip: IpAddr = "192.168.1.100".parse().unwrap();
        let speaker_id = SpeakerId::new("RINCON_123");
        ip_to_speaker.write().insert(ip, speaker_id.clone());

        let registry = StateWatchRegistry {
            watched: Arc::clone(&watched),
            ip_to_speaker: Arc::clone(&ip_to_speaker),
            key_to_service: Arc::clone(&key_to_service),
        };

        // Register three watches across two services
        registry.register_watch(&speaker_id, "volume", Service::RenderingControl);
        registry.register_watch(&speaker_id, "mute", Service::RenderingControl);
        registry.register_watch(&speaker_id, "playback_state", Service::AVTransport);

        assert_eq!(watched.read().len(), 3);

        // Unregister RenderingControl — should remove volume + mute, keep playback_state
        registry.unregister_watches_for_service(ip, Service::RenderingControl);

        let w = watched.read();
        assert_eq!(w.len(), 1);
        assert!(w.contains(&(speaker_id.clone(), "playback_state")));
        assert!(!w.contains(&(speaker_id.clone(), "volume")));
        assert!(!w.contains(&(speaker_id.clone(), "mute")));
    }

    #[test]
    fn test_state_watch_registry_unknown_ip_is_noop() {
        let watched = Arc::new(RwLock::new(HashSet::new()));
        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
        let key_to_service = Arc::new(RwLock::new(HashMap::new()));

        let speaker_id = SpeakerId::new("RINCON_123");

        let registry = StateWatchRegistry {
            watched: Arc::clone(&watched),
            ip_to_speaker,
            key_to_service: Arc::clone(&key_to_service),
        };

        // Register a watch (simulating direct add to shared set)
        watched.write().insert((speaker_id.clone(), "volume"));
        key_to_service
            .write()
            .insert("volume", Service::RenderingControl);

        // Unregister for an unknown IP — should be a no-op
        let unknown_ip: IpAddr = "10.0.0.1".parse().unwrap();
        registry.unregister_watches_for_service(unknown_ip, Service::RenderingControl);

        // Watch should still be there
        assert_eq!(watched.read().len(), 1);
    }

    #[test]
    fn test_state_watch_registry_only_removes_matching_speaker() {
        let watched = Arc::new(RwLock::new(HashSet::new()));
        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
        let key_to_service = Arc::new(RwLock::new(HashMap::new()));

        let ip1: IpAddr = "192.168.1.100".parse().unwrap();
        let ip2: IpAddr = "192.168.1.101".parse().unwrap();
        let speaker1 = SpeakerId::new("RINCON_111");
        let speaker2 = SpeakerId::new("RINCON_222");

        ip_to_speaker.write().insert(ip1, speaker1.clone());
        ip_to_speaker.write().insert(ip2, speaker2.clone());

        let registry = StateWatchRegistry {
            watched: Arc::clone(&watched),
            ip_to_speaker,
            key_to_service: Arc::clone(&key_to_service),
        };

        // Both speakers watch volume
        registry.register_watch(&speaker1, "volume", Service::RenderingControl);
        registry.register_watch(&speaker2, "volume", Service::RenderingControl);
        assert_eq!(watched.read().len(), 2);

        // Unregister only speaker1's IP
        registry.unregister_watches_for_service(ip1, Service::RenderingControl);

        let w = watched.read();
        assert_eq!(w.len(), 1);
        assert!(w.contains(&(speaker2.clone(), "volume")));
        assert!(!w.contains(&(speaker1.clone(), "volume")));
    }
}