Skip to main content

sonos_state/
state.rs

1//! Sync-first State Management for Sonos devices
2//!
3//! Provides a synchronous API for managing Sonos device state with
4//! background event processing.
5//!
6//! # Example
7//!
8//! ```rust,ignore
9//! use sonos_state::{StateManager, Volume};
10//! use sonos_discovery;
11//!
12//! // Create state manager (sync)
13//! let manager = StateManager::new()?;
14//! let devices = sonos_discovery::get();
15//! manager.add_devices(devices)?;
16//!
17//! // Get speakers
18//! for info in manager.speaker_infos() {
19//!     println!("{}: {}", info.name, info.ip_address);
20//! }
21//!
22//! // Blocking iteration over changes
23//! for event in manager.iter() {
24//!     println!("Change: {:?}", event);
25//! }
26//! ```
27
28use std::any::{Any, TypeId};
29use std::collections::{HashMap, HashSet};
30use std::net::IpAddr;
31use std::sync::{mpsc, Arc, Mutex, OnceLock};
32use std::thread::JoinHandle;
33use std::time::{Duration, Instant};
34
35use parking_lot::RwLock;
36
37use sonos_api::{Service, ServiceScope};
38use sonos_discovery::Device;
39use sonos_event_manager::{SonosEventManager, WatchRegistry};
40use tracing::info;
41
42use crate::event_worker::spawn_state_event_worker;
43use crate::iter::ChangeIterator;
44use crate::model::{GroupId, SpeakerId, SpeakerInfo};
45use crate::property::{GroupInfo, Property, Scope, SonosProperty, Topology};
46use crate::{Result, StateError};
47
48/// Closure type for lazy event manager initialization.
49///
50/// Stored on `StateManager` as the single source of truth. Called by
51/// `PropertyHandle::watch()` to trigger event manager creation on first use.
52/// Uses `Box<dyn Error>` to avoid circular dependency on `sonos-sdk` error types.
53pub type EventInitFn = Arc<
54    dyn Fn() -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
55>;
56
57// ============================================================================
58// ChangeEvent - for iter()
59// ============================================================================
60
61/// A change event emitted when a watched property changes
62#[derive(Debug, Clone)]
63pub struct ChangeEvent {
64    /// Speaker or entity that changed
65    pub speaker_id: SpeakerId,
66    /// Property key that changed
67    pub property_key: &'static str,
68    /// Service the property belongs to
69    pub service: Service,
70    /// When the change occurred
71    pub timestamp: Instant,
72}
73
74impl ChangeEvent {
75    pub fn new(speaker_id: SpeakerId, property_key: &'static str, service: Service) -> Self {
76        Self {
77            speaker_id,
78            property_key,
79            service,
80            timestamp: Instant::now(),
81        }
82    }
83}
84
85// ============================================================================
86// Internal StateStore
87// ============================================================================
88
89/// Internal state storage
90pub struct StateStore {
91    /// Speaker metadata
92    pub(crate) speakers: HashMap<SpeakerId, SpeakerInfo>,
93    /// IP to speaker ID mapping
94    pub(crate) ip_to_speaker: HashMap<IpAddr, SpeakerId>,
95    /// Property values: (speaker_id, property_key) -> type-erased value
96    pub(crate) speaker_props: HashMap<SpeakerId, PropertyBag>,
97    /// Group metadata
98    pub(crate) groups: HashMap<GroupId, GroupInfo>,
99    /// Group properties
100    pub(crate) group_props: HashMap<GroupId, PropertyBag>,
101    /// System properties
102    pub(crate) system_props: PropertyBag,
103    /// Speaker to group mapping for quick lookups
104    pub(crate) speaker_to_group: HashMap<SpeakerId, GroupId>,
105    /// Satellite speaker IDs (Invisible="1") from topology
106    pub(crate) satellite_ids: HashSet<SpeakerId>,
107}
108
109impl StateStore {
110    pub(crate) fn new() -> Self {
111        Self {
112            speakers: HashMap::new(),
113            ip_to_speaker: HashMap::new(),
114            speaker_props: HashMap::new(),
115            groups: HashMap::new(),
116            group_props: HashMap::new(),
117            system_props: PropertyBag::new(),
118            speaker_to_group: HashMap::new(),
119            satellite_ids: HashSet::new(),
120        }
121    }
122
123    pub(crate) fn add_speaker(&mut self, speaker: SpeakerInfo) {
124        let id = speaker.id.clone();
125        let ip = speaker.ip_address;
126        self.ip_to_speaker.insert(ip, id.clone());
127        self.speakers.insert(id.clone(), speaker);
128        self.speaker_props
129            .entry(id)
130            .or_insert_with(PropertyBag::new);
131    }
132
133    fn speaker(&self, id: &SpeakerId) -> Option<&SpeakerInfo> {
134        self.speakers.get(id)
135    }
136
137    fn speakers(&self) -> Vec<SpeakerInfo> {
138        self.speakers.values().cloned().collect()
139    }
140
141    pub(crate) fn add_group(&mut self, group: GroupInfo) {
142        let id = group.id.clone();
143        // Update speaker_to_group mapping for all members
144        for member_id in &group.member_ids {
145            self.speaker_to_group.insert(member_id.clone(), id.clone());
146        }
147        self.groups.insert(id.clone(), group);
148        self.group_props.entry(id).or_insert_with(PropertyBag::new);
149    }
150
151    /// Get the group a speaker belongs to
152    #[allow(dead_code)]
153    pub(crate) fn get_group_for_speaker(&self, speaker_id: &SpeakerId) -> Option<&GroupInfo> {
154        let group_id = self.speaker_to_group.get(speaker_id)?;
155        self.groups.get(group_id)
156    }
157
158    /// Clear all groups and speaker_to_group mappings
159    ///
160    /// Used when processing topology updates to replace all group data
161    pub(crate) fn clear_groups(&mut self) {
162        self.groups.clear();
163        self.group_props.clear();
164        self.speaker_to_group.clear();
165    }
166
167    /// Resolve the coordinator speaker for the given speaker.
168    ///
169    /// Looks up `speaker_to_group → groups → coordinator_id`.
170    /// Returns the speaker's own ID if no group info exists (safe default).
171    pub(crate) fn resolve_coordinator(&self, speaker_id: &SpeakerId) -> SpeakerId {
172        self.speaker_to_group
173            .get(speaker_id)
174            .and_then(|gid| self.groups.get(gid))
175            .map(|group| group.coordinator_id.clone())
176            .unwrap_or_else(|| speaker_id.clone())
177    }
178
179    /// Get a property value with coordinator resolution for PerCoordinator services.
180    ///
181    /// If the property's service is PerCoordinator AND the property scope is Speaker,
182    /// reads from the coordinator's speaker_props. Otherwise reads from the
183    /// speaker's own props.
184    ///
185    /// Group-scoped properties (e.g. GroupVolume) come from a PerCoordinator service
186    /// but are stored in `group_props`, not `speaker_props`, so they are not resolved
187    /// through the coordinator's speaker_props.
188    pub(crate) fn get_resolved<P: SonosProperty>(&self, speaker_id: &SpeakerId) -> Option<P> {
189        if P::SERVICE.scope() == ServiceScope::PerCoordinator && P::SCOPE == Scope::Speaker {
190            let coordinator_id = self.resolve_coordinator(speaker_id);
191            self.speaker_props.get(&coordinator_id)?.get::<P>()
192        } else {
193            self.speaker_props.get(speaker_id)?.get::<P>()
194        }
195    }
196
197    #[cfg_attr(not(test), allow(dead_code))]
198    pub(crate) fn get<P: Property>(&self, speaker_id: &SpeakerId) -> Option<P> {
199        self.speaker_props.get(speaker_id)?.get::<P>()
200    }
201
202    pub(crate) fn set<P: Property>(&mut self, speaker_id: &SpeakerId, value: P) -> bool {
203        let bag = self
204            .speaker_props
205            .entry(speaker_id.clone())
206            .or_insert_with(PropertyBag::new);
207        bag.set(value)
208    }
209
210    pub(crate) fn get_group<P: Property>(&self, group_id: &GroupId) -> Option<P> {
211        self.group_props.get(group_id)?.get::<P>()
212    }
213
214    pub(crate) fn set_group<P: Property>(&mut self, group_id: &GroupId, value: P) -> bool {
215        let bag = self
216            .group_props
217            .entry(group_id.clone())
218            .or_insert_with(PropertyBag::new);
219        bag.set(value)
220    }
221
222    fn set_system<P: Property>(&mut self, value: P) -> bool {
223        self.system_props.set(value)
224    }
225
226    /// Update a speaker's IP address in the store. Returns the old IP if changed.
227    pub(crate) fn update_speaker_ip_address(
228        &mut self,
229        speaker_id: &SpeakerId,
230        new_ip: IpAddr,
231    ) -> Option<IpAddr> {
232        if let Some(info) = self.speakers.get_mut(speaker_id) {
233            let old_ip = info.ip_address;
234            if old_ip != new_ip {
235                info.ip_address = new_ip;
236                return Some(old_ip);
237            }
238        }
239        None
240    }
241
242    fn is_empty(&self) -> bool {
243        self.speakers.is_empty()
244    }
245
246    fn speaker_count(&self) -> usize {
247        self.speakers.len()
248    }
249
250    fn group_count(&self) -> usize {
251        self.groups.len()
252    }
253}
254
255// ============================================================================
256// PropertyBag - type-erased property storage
257// ============================================================================
258
259pub(crate) struct PropertyBag {
260    /// Map<TypeId, Box<dyn Any>> where Any is the property value
261    values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
262}
263
264impl PropertyBag {
265    pub(crate) fn new() -> Self {
266        Self {
267            values: HashMap::new(),
268        }
269    }
270
271    fn get<P: Property>(&self) -> Option<P> {
272        let type_id = TypeId::of::<P>();
273        self.values
274            .get(&type_id)
275            .and_then(|boxed| boxed.downcast_ref::<P>())
276            .cloned()
277    }
278
279    fn set<P: Property>(&mut self, value: P) -> bool {
280        let type_id = TypeId::of::<P>();
281        let current = self
282            .values
283            .get(&type_id)
284            .and_then(|boxed| boxed.downcast_ref::<P>());
285
286        if current != Some(&value) {
287            self.values.insert(type_id, Box::new(value));
288            true
289        } else {
290            false
291        }
292    }
293}
294
295// ============================================================================
296// StateManager - main entry point
297// ============================================================================
298
299/// Core state manager with sync-first API
300///
301/// All public methods are synchronous. Background event processing
302/// happens in a dedicated thread.
303pub struct StateManager {
304    /// Property values storage
305    store: Arc<RwLock<StateStore>>,
306
307    /// Watched properties for iter() filtering
308    watched: Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>,
309
310    /// IP to speaker ID mapping (for event worker)
311    ip_to_speaker: Arc<RwLock<HashMap<IpAddr, SpeakerId>>>,
312
313    /// Event manager (set-once via OnceLock — enables live events)
314    event_manager: OnceLock<Arc<SonosEventManager>>,
315
316    /// Channel for sending change events to iter()
317    event_tx: mpsc::Sender<ChangeEvent>,
318
319    /// Receiver for iter() - wrapped in `Arc<Mutex>` for cloning
320    event_rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>,
321
322    /// Background event processor handle (lazily spawned)
323    _worker: Mutex<Option<JoinHandle<()>>>,
324
325    /// Cleanup timeout for subscriptions
326    cleanup_timeout: Duration,
327
328    /// Maps property key → Service for WatchRegistry's unregister_watches_for_service.
329    /// Shared with StateWatchRegistry via Arc.
330    key_to_service: Arc<RwLock<HashMap<&'static str, Service>>>,
331
332    /// Lazy event manager initialization closure (set-once).
333    /// Called by watch() to trigger event manager creation on first use.
334    event_init: OnceLock<EventInitFn>,
335}
336
337// ============================================================================
338// StateWatchRegistry - WatchRegistry impl for SonosEventManager
339// ============================================================================
340
341/// Lightweight WatchRegistry implementation wired into the event manager.
342///
343/// Separated from StateManager because `mpsc::Sender` is `!Sync`,
344/// preventing StateManager itself from satisfying `WatchRegistry: Sync`.
345/// This struct holds only the Arc-wrapped fields needed for watch management.
346struct StateWatchRegistry {
347    watched: Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>,
348    ip_to_speaker: Arc<RwLock<HashMap<IpAddr, SpeakerId>>>,
349    key_to_service: Arc<RwLock<HashMap<&'static str, Service>>>,
350}
351
352impl WatchRegistry for StateWatchRegistry {
353    fn register_watch(&self, speaker_id: &SpeakerId, key: &'static str, service: Service) {
354        self.watched.write().insert((speaker_id.clone(), key));
355        self.key_to_service.write().insert(key, service);
356    }
357
358    fn unregister_watches_for_service(&self, ip: IpAddr, service: Service) {
359        // 1. Resolve IP → SpeakerId
360        let speaker_id = match self.ip_to_speaker.read().get(&ip).cloned() {
361            Some(id) => id,
362            None => {
363                tracing::warn!(
364                    "unregister_watches_for_service: no speaker found for IP {}",
365                    ip
366                );
367                return;
368            }
369        };
370
371        // 2. Find property keys belonging to this service
372        let service_keys: Vec<&'static str> = self
373            .key_to_service
374            .read()
375            .iter()
376            .filter(|(_, &svc)| svc == service)
377            .map(|(&key, _)| key)
378            .collect();
379
380        // 3. Remove matching (speaker_id, key) entries from watched set
381        let mut watched = self.watched.write();
382        for key in service_keys {
383            watched.remove(&(speaker_id.clone(), key));
384        }
385    }
386}
387
388impl StateManager {
389    /// Create a new StateManager with default settings (sync)
390    ///
391    /// # Example
392    ///
393    /// ```rust,ignore
394    /// let manager = StateManager::new()?;
395    /// ```
396    pub fn new() -> Result<Self> {
397        Self::builder().build()
398    }
399
400    /// Create a StateManager builder for custom configuration
401    pub fn builder() -> StateManagerBuilder {
402        StateManagerBuilder::default()
403    }
404
405    /// Add discovered devices (sync)
406    ///
407    /// # Example
408    ///
409    /// ```rust,ignore
410    /// let devices = sonos_discovery::get();
411    /// manager.add_devices(devices)?;
412    /// ```
413    pub fn add_devices(&self, devices: Vec<Device>) -> Result<()> {
414        let mut store = self.store.write();
415        let mut ip_map = self.ip_to_speaker.write();
416
417        for device in devices {
418            let speaker_id = SpeakerId::new(&device.id);
419            let ip: IpAddr = device
420                .ip_address
421                .parse()
422                .map_err(|_| StateError::InvalidIpAddress(device.ip_address.clone()))?;
423
424            let friendly_name = if device.room_name.is_empty() || device.room_name == "Unknown" {
425                device.name.clone()
426            } else {
427                device.room_name.clone()
428            };
429
430            let info = SpeakerInfo {
431                id: speaker_id.clone(),
432                name: friendly_name,
433                room_name: device.room_name.clone(),
434                ip_address: ip,
435                port: device.port,
436                model_name: device.model_name.clone(),
437                software_version: "unknown".to_string(),
438                boot_seq: 0,
439                satellites: vec![],
440            };
441
442            // Update ip_to_speaker mapping
443            ip_map.insert(ip, speaker_id.clone());
444            tracing::debug!(
445                "Added speaker {} at IP {} to ip_to_speaker map",
446                speaker_id.as_str(),
447                ip
448            );
449
450            store.add_speaker(info);
451        }
452
453        // Also add devices to event manager if present
454        drop(store);
455        drop(ip_map);
456
457        if let Some(em) = self.event_manager.get() {
458            let devices_for_em: Vec<_> = self
459                .speaker_infos()
460                .iter()
461                .map(|info| sonos_discovery::Device {
462                    id: info.id.as_str().to_string(),
463                    name: info.name.clone(),
464                    room_name: info.room_name.clone(),
465                    ip_address: info.ip_address.to_string(),
466                    port: info.port,
467                    model_name: info.model_name.clone(),
468                })
469                .collect();
470
471            if let Err(e) = em.add_devices(devices_for_em) {
472                tracing::warn!("Failed to add devices to event manager: {}", e);
473            }
474        }
475
476        Ok(())
477    }
478
479    /// Get all speaker info
480    pub fn speaker_infos(&self) -> Vec<SpeakerInfo> {
481        self.store.read().speakers()
482    }
483
484    /// Get a specific speaker info by ID
485    pub fn speaker_info(&self, speaker_id: &SpeakerId) -> Option<SpeakerInfo> {
486        self.store.read().speaker(speaker_id).cloned()
487    }
488
489    /// Get speaker IP by ID
490    pub fn get_speaker_ip(&self, speaker_id: &SpeakerId) -> Option<IpAddr> {
491        self.store.read().speaker(speaker_id).map(|s| s.ip_address)
492    }
493
494    /// Get boot_seq for a speaker (used by GroupManagement AddMember)
495    pub fn get_boot_seq(&self, speaker_id: &SpeakerId) -> Option<u32> {
496        self.store.read().speaker(speaker_id).map(|s| s.boot_seq)
497    }
498
499    /// Update a speaker's IP address in both the store and the reverse map.
500    pub fn update_speaker_ip(&self, speaker_id: &SpeakerId, new_ip: IpAddr) {
501        let old_ip = {
502            let mut store = self.store.write();
503            store.update_speaker_ip_address(speaker_id, new_ip)
504        };
505        if let Some(old_ip) = old_ip {
506            let mut map = self.ip_to_speaker.write();
507            map.remove(&old_ip);
508            map.insert(new_ip, speaker_id.clone());
509        }
510    }
511
512    /// Get all satellite speaker IDs from topology data.
513    pub fn get_satellite_ids(&self) -> Vec<SpeakerId> {
514        self.store.read().satellite_ids.iter().cloned().collect()
515    }
516
517    /// Store satellite speaker IDs from topology data.
518    pub fn set_satellite_ids(&self, ids: Vec<SpeakerId>) {
519        self.store.write().satellite_ids = ids.into_iter().collect();
520    }
521
522    /// Create a blocking iterator over change events
523    ///
524    /// Only emits events for properties that have been watched.
525    ///
526    /// # Example
527    ///
528    /// ```rust,ignore
529    /// // First, watch some properties
530    /// speaker.volume.watch()?;
531    ///
532    /// // Then iterate over changes
533    /// for event in manager.iter() {
534    ///     println!("Changed: {} on {}", event.property_key, event.speaker_id);
535    /// }
536    /// ```
537    pub fn iter(&self) -> ChangeIterator {
538        ChangeIterator::new(Arc::clone(&self.event_rx))
539    }
540
541    /// Get current property value (sync, no subscription)
542    ///
543    /// For PerCoordinator speaker-scoped properties, this transparently reads
544    /// from the coordinator's store, so group members see the coordinator's value.
545    pub fn get_property<P: SonosProperty>(&self, speaker_id: &SpeakerId) -> Option<P> {
546        self.store.read().get_resolved::<P>(speaker_id)
547    }
548
549    /// Get current group property value (sync, no subscription)
550    pub fn get_group_property<P: Property>(&self, group_id: &GroupId) -> Option<P> {
551        self.store.read().get_group::<P>(group_id)
552    }
553
554    /// Set a property value
555    ///
556    /// Updates the property value in the store and emits a change event
557    /// if the property is being watched.
558    pub fn set_property<P: SonosProperty>(&self, speaker_id: &SpeakerId, value: P) {
559        let changed = {
560            let mut store = self.store.write();
561            store.set::<P>(speaker_id, value)
562        };
563
564        if changed {
565            self.maybe_emit_change(speaker_id, P::KEY, P::SERVICE);
566        }
567    }
568
569    /// Set a group property value
570    ///
571    /// Updates the group property value in the store and emits a change event
572    /// if the property is being watched (keyed on the coordinator's speaker ID).
573    /// Used by the SDK layer to store group-scoped values fetched via API calls.
574    pub fn set_group_property<P: SonosProperty>(&self, group_id: &GroupId, value: P) {
575        let coordinator_id = {
576            let mut store = self.store.write();
577            let changed = store.set_group::<P>(group_id, value);
578            if !changed {
579                return;
580            }
581            store.groups.get(group_id).map(|g| g.coordinator_id.clone())
582        };
583
584        if let Some(coordinator_id) = coordinator_id {
585            self.maybe_emit_change(&coordinator_id, P::KEY, P::SERVICE);
586        }
587    }
588
589    /// Register a property as watched (called by PropertyHandle::watch)
590    pub fn register_watch(&self, speaker_id: &SpeakerId, property_key: &'static str) {
591        self.watched
592            .write()
593            .insert((speaker_id.clone(), property_key));
594    }
595
596    /// Unregister a property watch
597    pub fn unregister_watch(&self, speaker_id: &SpeakerId, property_key: &'static str) {
598        self.watched
599            .write()
600            .remove(&(speaker_id.clone(), property_key));
601    }
602
603    /// Watch a property with automatic UPnP subscription (recommended API)
604    ///
605    /// This is the preferred method for watching properties as it:
606    /// 1. Registers the property for change notifications
607    /// 2. Subscribes to the UPnP service via the event manager
608    ///
609    /// Returns the current cached value if available.
610    pub fn watch_property_with_subscription<P: SonosProperty>(
611        &self,
612        speaker_id: &SpeakerId,
613    ) -> Result<Option<P>> {
614        // Register for change notifications
615        self.register_watch(speaker_id, P::KEY);
616
617        // Subscribe via event manager if available
618        if let Some(em) = self.event_manager.get() {
619            // Get speaker IP from store
620            if let Some(ip) = self.get_speaker_ip(speaker_id) {
621                if let Err(e) = em.ensure_service_subscribed(ip, P::SERVICE) {
622                    tracing::warn!(
623                        "Failed to subscribe to {:?} for {}: {}",
624                        P::SERVICE,
625                        speaker_id.as_str(),
626                        e
627                    );
628                }
629            }
630        }
631
632        Ok(self.get_property::<P>(speaker_id))
633    }
634
635    /// Unwatch a property and release UPnP subscription
636    pub fn unwatch_property_with_subscription<P: SonosProperty>(&self, speaker_id: &SpeakerId) {
637        // Unregister from change notifications
638        self.unregister_watch(speaker_id, P::KEY);
639
640        // Release subscription via event manager if available
641        if let Some(em) = self.event_manager.get() {
642            if let Some(ip) = self.get_speaker_ip(speaker_id) {
643                if let Err(e) = em.release_service_subscription(ip, P::SERVICE) {
644                    tracing::warn!(
645                        "Failed to unsubscribe from {:?} for {}: {}",
646                        P::SERVICE,
647                        speaker_id.as_str(),
648                        e
649                    );
650                }
651            }
652        }
653    }
654
655    /// Check if a property is being watched
656    pub fn is_watched(&self, speaker_id: &SpeakerId, property_key: &'static str) -> bool {
657        self.watched
658            .read()
659            .contains(&(speaker_id.clone(), property_key))
660    }
661
662    /// Emit a change event if the property is being watched
663    fn maybe_emit_change(
664        &self,
665        speaker_id: &SpeakerId,
666        property_key: &'static str,
667        service: Service,
668    ) {
669        let is_watched = self
670            .watched
671            .read()
672            .contains(&(speaker_id.clone(), property_key));
673
674        if is_watched {
675            let event = ChangeEvent::new(speaker_id.clone(), property_key, service);
676            let _ = self.event_tx.send(event);
677        }
678    }
679
680    /// Initialize from topology data
681    pub fn initialize(&self, topology: Topology) {
682        let mut store = self.store.write();
683        for speaker in &topology.speakers {
684            store.add_speaker(speaker.clone());
685        }
686        for group in &topology.groups {
687            store.add_group(group.clone());
688        }
689        store.set_system(topology);
690    }
691
692    /// Check if initialized with any speakers
693    pub fn is_initialized(&self) -> bool {
694        !self.store.read().is_empty()
695    }
696
697    /// Get number of speakers
698    pub fn speaker_count(&self) -> usize {
699        self.store.read().speaker_count()
700    }
701
702    /// Get number of groups
703    pub fn group_count(&self) -> usize {
704        self.store.read().group_count()
705    }
706
707    /// Get all current groups
708    ///
709    /// Returns all groups in the system. Every speaker is always in a group,
710    /// so a single speaker forms a group of one.
711    pub fn groups(&self) -> Vec<GroupInfo> {
712        self.store.read().groups.values().cloned().collect()
713    }
714
715    /// Get a specific group by ID
716    pub fn get_group(&self, group_id: &GroupId) -> Option<GroupInfo> {
717        self.store.read().groups.get(group_id).cloned()
718    }
719
720    /// Get the group a speaker belongs to
721    ///
722    /// Uses the speaker_to_group mapping for quick lookup.
723    pub fn get_group_for_speaker(&self, speaker_id: &SpeakerId) -> Option<GroupInfo> {
724        let store = self.store.read();
725        let group_id = store.speaker_to_group.get(speaker_id)?;
726        store.groups.get(group_id).cloned()
727    }
728
729    /// Resolve the subscription target for a PerCoordinator service.
730    ///
731    /// For PerCoordinator services, returns the coordinator's `(SpeakerId, IpAddr)`
732    /// so the SDK can route UPnP subscriptions to the coordinator speaker.
733    /// Falls back to the speaker itself if no group data exists.
734    ///
735    /// For non-PerCoordinator services, returns the speaker's own identity.
736    pub fn resolve_subscription_target(
737        &self,
738        speaker_id: &SpeakerId,
739        speaker_ip: IpAddr,
740        service: Service,
741    ) -> (SpeakerId, IpAddr) {
742        if service.scope() == ServiceScope::PerCoordinator {
743            let store = self.store.read();
744            let coordinator_id = store.resolve_coordinator(speaker_id);
745            if coordinator_id == *speaker_id {
746                (speaker_id.clone(), speaker_ip)
747            } else {
748                let coord_ip = store
749                    .speaker(&coordinator_id)
750                    .map(|s| s.ip_address)
751                    .unwrap_or(speaker_ip);
752                (coordinator_id, coord_ip)
753            }
754        } else {
755            (speaker_id.clone(), speaker_ip)
756        }
757    }
758
759    /// Get access to the event manager (if configured)
760    ///
761    /// This allows PropertyHandle::watch() to trigger UPnP subscriptions
762    /// via the event manager's ensure_service_subscribed() method.
763    pub fn event_manager(&self) -> Option<&Arc<SonosEventManager>> {
764        self.event_manager.get()
765    }
766
767    /// Wire an event manager into this StateManager after construction.
768    ///
769    /// Spawns the event worker thread and registers all known devices.
770    /// Can only be called once — subsequent calls are no-ops.
771    pub fn set_event_manager(&self, em: Arc<SonosEventManager>) -> Result<()> {
772        tracing::debug!("StateManager::set_event_manager called");
773        if self.event_manager.set(Arc::clone(&em)).is_err() {
774            tracing::debug!("Event manager already set — no-op");
775            return Ok(()); // Already set — no-op
776        }
777
778        // Wire this StateManager as the WatchRegistry
779        em.set_watch_registry(Arc::new(StateWatchRegistry {
780            watched: Arc::clone(&self.watched),
781            ip_to_speaker: Arc::clone(&self.ip_to_speaker),
782            key_to_service: Arc::clone(&self.key_to_service),
783        }));
784
785        // Register all known devices with the event manager
786        let devices_for_em: Vec<_> = self
787            .speaker_infos()
788            .iter()
789            .map(|info| sonos_discovery::Device {
790                id: info.id.as_str().to_string(),
791                name: info.name.clone(),
792                room_name: info.room_name.clone(),
793                ip_address: info.ip_address.to_string(),
794                port: info.port,
795                model_name: info.model_name.clone(),
796            })
797            .collect();
798
799        if let Err(e) = em.add_devices(devices_for_em) {
800            tracing::warn!(
801                "Failed to add devices to event manager during lazy init: {}",
802                e
803            );
804        }
805
806        // Spawn event worker thread
807        let worker = spawn_state_event_worker(
808            em,
809            Arc::clone(&self.store),
810            Arc::clone(&self.watched),
811            self.event_tx.clone(),
812            Arc::clone(&self.ip_to_speaker),
813        );
814        info!("StateManager event worker started (lazy init)");
815
816        if let Ok(mut w) = self._worker.lock() {
817            *w = Some(worker);
818        }
819
820        Ok(())
821    }
822
823    /// Set the lazy event manager initialization closure.
824    ///
825    /// Called once by `SonosSystem::from_devices_inner()` after construction.
826    /// Subsequent calls are no-ops (OnceLock semantics).
827    pub fn set_event_init(&self, f: EventInitFn) {
828        let _ = self.event_init.set(f);
829    }
830
831    /// Get the event init closure (if set).
832    ///
833    /// Used by `PropertyHandle::watch()` and `GroupPropertyHandle::watch()`
834    /// to trigger lazy event manager creation on first use.
835    pub fn event_init(&self) -> Option<&EventInitFn> {
836        self.event_init.get()
837    }
838}
839
840impl Clone for StateManager {
841    fn clone(&self) -> Self {
842        let event_manager = OnceLock::new();
843        if let Some(em) = self.event_manager.get() {
844            let _ = event_manager.set(Arc::clone(em));
845        }
846        let event_init = OnceLock::new();
847        if let Some(f) = self.event_init.get() {
848            let _ = event_init.set(Arc::clone(f));
849        }
850        Self {
851            store: Arc::clone(&self.store),
852            watched: Arc::clone(&self.watched),
853            ip_to_speaker: Arc::clone(&self.ip_to_speaker),
854            event_manager,
855            event_tx: self.event_tx.clone(),
856            event_rx: Arc::clone(&self.event_rx),
857            _worker: Mutex::new(None),
858            cleanup_timeout: self.cleanup_timeout,
859            key_to_service: Arc::clone(&self.key_to_service),
860            event_init,
861        }
862    }
863}
864
865// ============================================================================
866// StateManagerBuilder
867// ============================================================================
868
869/// Builder for StateManager configuration
870pub struct StateManagerBuilder {
871    cleanup_timeout: Duration,
872    event_manager: Option<Arc<SonosEventManager>>,
873}
874
875impl Default for StateManagerBuilder {
876    fn default() -> Self {
877        Self {
878            cleanup_timeout: Duration::from_secs(5),
879            event_manager: None,
880        }
881    }
882}
883
884impl StateManagerBuilder {
885    /// Set the cleanup timeout for subscriptions
886    pub fn cleanup_timeout(mut self, timeout: Duration) -> Self {
887        self.cleanup_timeout = timeout;
888        self
889    }
890
891    /// Set the event manager for live event processing
892    ///
893    /// When an event manager is provided, the StateManager will:
894    /// - Spawn a background worker to process events
895    /// - Automatically subscribe/unsubscribe via `watch()`/`unwatch()` on properties
896    /// - Update state from incoming events
897    pub fn with_event_manager(mut self, em: Arc<SonosEventManager>) -> Self {
898        self.event_manager = Some(em);
899        self
900    }
901
902    /// Build the StateManager
903    pub fn build(self) -> Result<StateManager> {
904        let (event_tx, event_rx) = mpsc::channel();
905
906        let store = Arc::new(RwLock::new(StateStore::new()));
907        let watched = Arc::new(RwLock::new(HashSet::new()));
908        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
909        let key_to_service = Arc::new(RwLock::new(HashMap::new()));
910
911        let event_manager_lock = OnceLock::new();
912        let mut worker = None;
913
914        // If event_manager provided at build time, wire it up eagerly
915        if let Some(em) = self.event_manager {
916            let _ = event_manager_lock.set(Arc::clone(&em));
917
918            // Wire WatchRegistry
919            em.set_watch_registry(Arc::new(StateWatchRegistry {
920                watched: Arc::clone(&watched),
921                ip_to_speaker: Arc::clone(&ip_to_speaker),
922                key_to_service: Arc::clone(&key_to_service),
923            }));
924
925            let worker_handle = spawn_state_event_worker(
926                em,
927                Arc::clone(&store),
928                Arc::clone(&watched),
929                event_tx.clone(),
930                Arc::clone(&ip_to_speaker),
931            );
932            info!("StateManager event worker started");
933            worker = Some(worker_handle);
934        }
935
936        let manager = StateManager {
937            store,
938            watched,
939            ip_to_speaker,
940            event_manager: event_manager_lock,
941            event_tx,
942            event_rx: Arc::new(Mutex::new(event_rx)),
943            _worker: Mutex::new(worker),
944            cleanup_timeout: self.cleanup_timeout,
945            key_to_service,
946            event_init: OnceLock::new(),
947        };
948
949        info!("StateManager created (sync-first mode)");
950        Ok(manager)
951    }
952}
953
954#[cfg(test)]
955mod tests {
956    use super::*;
957    use crate::property::{GroupVolume, PlaybackState, Volume};
958    use sonos_api::Service;
959
960    #[test]
961    fn test_state_manager_creation() {
962        let manager = StateManager::new().unwrap();
963        assert!(!manager.is_initialized());
964        assert_eq!(manager.speaker_count(), 0);
965    }
966
967    #[test]
968    fn test_add_devices() {
969        let manager = StateManager::new().unwrap();
970
971        let devices = vec![Device {
972            id: "RINCON_123".to_string(),
973            name: "Living Room".to_string(),
974            room_name: "Living Room".to_string(),
975            ip_address: "192.168.1.100".to_string(),
976            port: 1400,
977            model_name: "Sonos One".to_string(),
978        }];
979
980        manager.add_devices(devices).unwrap();
981        assert_eq!(manager.speaker_count(), 1);
982    }
983
984    #[test]
985    fn test_property_storage() {
986        let manager = StateManager::new().unwrap();
987
988        let devices = vec![Device {
989            id: "RINCON_123".to_string(),
990            name: "Living Room".to_string(),
991            room_name: "Living Room".to_string(),
992            ip_address: "192.168.1.100".to_string(),
993            port: 1400,
994            model_name: "Sonos One".to_string(),
995        }];
996        manager.add_devices(devices).unwrap();
997
998        let speaker_id = SpeakerId::new("RINCON_123");
999
1000        // Initially None
1001        assert!(manager.get_property::<Volume>(&speaker_id).is_none());
1002
1003        // Set value
1004        manager.set_property(&speaker_id, Volume::new(50));
1005        assert_eq!(
1006            manager.get_property::<Volume>(&speaker_id),
1007            Some(Volume::new(50))
1008        );
1009    }
1010
1011    #[test]
1012    fn test_watch_registration() {
1013        let manager = StateManager::new().unwrap();
1014
1015        let devices = vec![Device {
1016            id: "RINCON_123".to_string(),
1017            name: "Living Room".to_string(),
1018            room_name: "Living Room".to_string(),
1019            ip_address: "192.168.1.100".to_string(),
1020            port: 1400,
1021            model_name: "Sonos One".to_string(),
1022        }];
1023        manager.add_devices(devices).unwrap();
1024
1025        let speaker_id = SpeakerId::new("RINCON_123");
1026
1027        // Not watched initially
1028        assert!(!manager.is_watched(&speaker_id, "volume"));
1029
1030        // Register watch
1031        manager.register_watch(&speaker_id, "volume");
1032        assert!(manager.is_watched(&speaker_id, "volume"));
1033
1034        // Unregister watch
1035        manager.unregister_watch(&speaker_id, "volume");
1036        assert!(!manager.is_watched(&speaker_id, "volume"));
1037    }
1038
1039    #[test]
1040    fn test_change_event_emission() {
1041        let manager = StateManager::new().unwrap();
1042
1043        let devices = vec![Device {
1044            id: "RINCON_123".to_string(),
1045            name: "Living Room".to_string(),
1046            room_name: "Living Room".to_string(),
1047            ip_address: "192.168.1.100".to_string(),
1048            port: 1400,
1049            model_name: "Sonos One".to_string(),
1050        }];
1051        manager.add_devices(devices).unwrap();
1052
1053        let speaker_id = SpeakerId::new("RINCON_123");
1054
1055        // Register watch
1056        manager.register_watch(&speaker_id, "volume");
1057
1058        // Set property (should emit event)
1059        manager.set_property(&speaker_id, Volume::new(75));
1060
1061        // Get event via iter
1062        let iter = manager.iter();
1063        let event = iter.recv_timeout(std::time::Duration::from_millis(100));
1064        assert!(event.is_some());
1065
1066        let event = event.unwrap();
1067        assert_eq!(event.speaker_id.as_str(), "RINCON_123");
1068        assert_eq!(event.property_key, "volume");
1069    }
1070
1071    #[test]
1072    fn test_set_group_property_emits_change_event() {
1073        let manager = StateManager::new().unwrap();
1074
1075        let devices = vec![Device {
1076            id: "RINCON_123".to_string(),
1077            name: "Living Room".to_string(),
1078            room_name: "Living Room".to_string(),
1079            ip_address: "192.168.1.100".to_string(),
1080            port: 1400,
1081            model_name: "Sonos One".to_string(),
1082        }];
1083        manager.add_devices(devices).unwrap();
1084
1085        let speaker_id = SpeakerId::new("RINCON_123");
1086        let group_id = GroupId::new("RINCON_123:1");
1087
1088        // Add group so coordinator lookup works
1089        {
1090            let mut store = manager.store.write();
1091            store.add_group(GroupInfo::new(
1092                group_id.clone(),
1093                speaker_id.clone(),
1094                vec![speaker_id.clone()],
1095            ));
1096        }
1097
1098        // Register watch on coordinator for group_volume
1099        manager.register_watch(&speaker_id, "group_volume");
1100
1101        // Set group property (should emit event via coordinator)
1102        manager.set_group_property(&group_id, GroupVolume::new(80));
1103
1104        // Verify event was emitted
1105        let iter = manager.iter();
1106        let event = iter.recv_timeout(std::time::Duration::from_millis(100));
1107        assert!(event.is_some());
1108
1109        let event = event.unwrap();
1110        assert_eq!(event.speaker_id.as_str(), "RINCON_123");
1111        assert_eq!(event.property_key, "group_volume");
1112        assert_eq!(event.service, Service::GroupRenderingControl);
1113    }
1114
1115    #[test]
1116    fn test_set_group_property_no_event_when_unwatched() {
1117        let manager = StateManager::new().unwrap();
1118
1119        let devices = vec![Device {
1120            id: "RINCON_123".to_string(),
1121            name: "Living Room".to_string(),
1122            room_name: "Living Room".to_string(),
1123            ip_address: "192.168.1.100".to_string(),
1124            port: 1400,
1125            model_name: "Sonos One".to_string(),
1126        }];
1127        manager.add_devices(devices).unwrap();
1128
1129        let speaker_id = SpeakerId::new("RINCON_123");
1130        let group_id = GroupId::new("RINCON_123:1");
1131
1132        {
1133            let mut store = manager.store.write();
1134            store.add_group(GroupInfo::new(
1135                group_id.clone(),
1136                speaker_id.clone(),
1137                vec![speaker_id.clone()],
1138            ));
1139        }
1140
1141        // Don't register any watch
1142        manager.set_group_property(&group_id, GroupVolume::new(50));
1143
1144        let iter = manager.iter();
1145        let event = iter.recv_timeout(std::time::Duration::from_millis(100));
1146        assert!(event.is_none());
1147    }
1148
1149    // ========================================================================
1150    // StateStore Group Operations Tests
1151    // ========================================================================
1152
1153    #[test]
1154    fn test_add_group_updates_speaker_to_group() {
1155        let mut store = StateStore::new();
1156
1157        let speaker1 = SpeakerId::new("RINCON_111");
1158        let speaker2 = SpeakerId::new("RINCON_222");
1159        let group_id = GroupId::new("RINCON_111:1");
1160
1161        let group = GroupInfo::new(
1162            group_id.clone(),
1163            speaker1.clone(),
1164            vec![speaker1.clone(), speaker2.clone()],
1165        );
1166
1167        store.add_group(group);
1168
1169        // Verify speaker_to_group mapping is updated for all members
1170        assert_eq!(store.speaker_to_group.get(&speaker1), Some(&group_id));
1171        assert_eq!(store.speaker_to_group.get(&speaker2), Some(&group_id));
1172    }
1173
1174    #[test]
1175    fn test_add_group_single_speaker() {
1176        let mut store = StateStore::new();
1177
1178        let speaker = SpeakerId::new("RINCON_333");
1179        let group_id = GroupId::new("RINCON_333:1");
1180
1181        let group = GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()]);
1182
1183        store.add_group(group.clone());
1184
1185        // Verify speaker_to_group mapping
1186        assert_eq!(store.speaker_to_group.get(&speaker), Some(&group_id));
1187
1188        // Verify group is stored
1189        assert_eq!(store.groups.get(&group_id), Some(&group));
1190    }
1191
1192    #[test]
1193    fn test_get_group_for_speaker_returns_correct_group() {
1194        let mut store = StateStore::new();
1195
1196        let speaker1 = SpeakerId::new("RINCON_111");
1197        let speaker2 = SpeakerId::new("RINCON_222");
1198        let speaker3 = SpeakerId::new("RINCON_333");
1199        let group1_id = GroupId::new("RINCON_111:1");
1200        let group2_id = GroupId::new("RINCON_333:1");
1201
1202        // Group 1: speaker1 (coordinator) + speaker2
1203        let group1 = GroupInfo::new(
1204            group1_id.clone(),
1205            speaker1.clone(),
1206            vec![speaker1.clone(), speaker2.clone()],
1207        );
1208
1209        // Group 2: speaker3 alone
1210        let group2 = GroupInfo::new(group2_id.clone(), speaker3.clone(), vec![speaker3.clone()]);
1211
1212        store.add_group(group1.clone());
1213        store.add_group(group2.clone());
1214
1215        // Verify get_group_for_speaker returns correct groups
1216        assert_eq!(store.get_group_for_speaker(&speaker1), Some(&group1));
1217        assert_eq!(store.get_group_for_speaker(&speaker2), Some(&group1));
1218        assert_eq!(store.get_group_for_speaker(&speaker3), Some(&group2));
1219    }
1220
1221    #[test]
1222    fn test_get_group_for_speaker_returns_none_for_unknown() {
1223        let store = StateStore::new();
1224
1225        let unknown_speaker = SpeakerId::new("RINCON_UNKNOWN");
1226
1227        assert!(store.get_group_for_speaker(&unknown_speaker).is_none());
1228    }
1229
1230    #[test]
1231    fn test_clear_groups_removes_all_group_data() {
1232        let mut store = StateStore::new();
1233
1234        let speaker1 = SpeakerId::new("RINCON_111");
1235        let speaker2 = SpeakerId::new("RINCON_222");
1236        let group_id = GroupId::new("RINCON_111:1");
1237
1238        let group = GroupInfo::new(
1239            group_id.clone(),
1240            speaker1.clone(),
1241            vec![speaker1.clone(), speaker2.clone()],
1242        );
1243
1244        store.add_group(group);
1245
1246        // Verify data exists
1247        assert!(!store.groups.is_empty());
1248        assert!(!store.speaker_to_group.is_empty());
1249
1250        // Clear groups
1251        store.clear_groups();
1252
1253        // Verify all group data is cleared
1254        assert!(store.groups.is_empty());
1255        assert!(store.group_props.is_empty());
1256        assert!(store.speaker_to_group.is_empty());
1257    }
1258
1259    #[test]
1260    fn test_clear_groups_then_add_new_groups() {
1261        let mut store = StateStore::new();
1262
1263        // Add initial group
1264        let speaker1 = SpeakerId::new("RINCON_111");
1265        let group1_id = GroupId::new("RINCON_111:1");
1266        let group1 = GroupInfo::new(group1_id.clone(), speaker1.clone(), vec![speaker1.clone()]);
1267        store.add_group(group1);
1268
1269        // Clear and add new group
1270        store.clear_groups();
1271
1272        let speaker2 = SpeakerId::new("RINCON_222");
1273        let group2_id = GroupId::new("RINCON_222:1");
1274        let group2 = GroupInfo::new(group2_id.clone(), speaker2.clone(), vec![speaker2.clone()]);
1275        store.add_group(group2.clone());
1276
1277        // Verify old group is gone, new group exists
1278        assert!(!store.groups.contains_key(&group1_id));
1279        assert_eq!(store.groups.get(&group2_id), Some(&group2));
1280
1281        // Verify speaker_to_group is updated correctly
1282        assert!(!store.speaker_to_group.contains_key(&speaker1));
1283        assert_eq!(store.speaker_to_group.get(&speaker2), Some(&group2_id));
1284    }
1285
1286    // ========================================================================
1287    // StateManager Group Methods Tests
1288    // ========================================================================
1289
1290    #[test]
1291    fn test_state_manager_groups_returns_all_groups() {
1292        let manager = StateManager::new().unwrap();
1293
1294        // Add devices
1295        let devices = vec![
1296            Device {
1297                id: "RINCON_111".to_string(),
1298                name: "Living Room".to_string(),
1299                room_name: "Living Room".to_string(),
1300                ip_address: "192.168.1.100".to_string(),
1301                port: 1400,
1302                model_name: "Sonos One".to_string(),
1303            },
1304            Device {
1305                id: "RINCON_222".to_string(),
1306                name: "Kitchen".to_string(),
1307                room_name: "Kitchen".to_string(),
1308                ip_address: "192.168.1.101".to_string(),
1309                port: 1400,
1310                model_name: "Sonos One".to_string(),
1311            },
1312        ];
1313        manager.add_devices(devices).unwrap();
1314
1315        // Create groups via initialize
1316        let speaker1 = SpeakerId::new("RINCON_111");
1317        let speaker2 = SpeakerId::new("RINCON_222");
1318        let group1 = GroupInfo::new(
1319            GroupId::new("RINCON_111:1"),
1320            speaker1.clone(),
1321            vec![speaker1.clone()],
1322        );
1323        let group2 = GroupInfo::new(
1324            GroupId::new("RINCON_222:1"),
1325            speaker2.clone(),
1326            vec![speaker2.clone()],
1327        );
1328
1329        let topology = Topology::new(
1330            manager.speaker_infos(),
1331            vec![group1.clone(), group2.clone()],
1332        );
1333        manager.initialize(topology);
1334
1335        // Verify groups() returns all groups
1336        let groups = manager.groups();
1337        assert_eq!(groups.len(), 2);
1338
1339        // Verify both groups are present (order may vary)
1340        let group_ids: Vec<_> = groups.iter().map(|g| g.id.clone()).collect();
1341        assert!(group_ids.contains(&GroupId::new("RINCON_111:1")));
1342        assert!(group_ids.contains(&GroupId::new("RINCON_222:1")));
1343    }
1344
1345    #[test]
1346    fn test_state_manager_groups_returns_empty_when_no_groups() {
1347        let manager = StateManager::new().unwrap();
1348
1349        // No groups added
1350        let groups = manager.groups();
1351        assert!(groups.is_empty());
1352    }
1353
1354    #[test]
1355    fn test_state_manager_get_group_returns_correct_group() {
1356        let manager = StateManager::new().unwrap();
1357
1358        // Add device
1359        let devices = vec![Device {
1360            id: "RINCON_111".to_string(),
1361            name: "Living Room".to_string(),
1362            room_name: "Living Room".to_string(),
1363            ip_address: "192.168.1.100".to_string(),
1364            port: 1400,
1365            model_name: "Sonos One".to_string(),
1366        }];
1367        manager.add_devices(devices).unwrap();
1368
1369        // Create group via initialize
1370        let speaker = SpeakerId::new("RINCON_111");
1371        let group_id = GroupId::new("RINCON_111:1");
1372        let group = GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()]);
1373
1374        let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
1375        manager.initialize(topology);
1376
1377        // Verify get_group returns the correct group
1378        let found = manager.get_group(&group_id);
1379        assert!(found.is_some());
1380        assert_eq!(found.unwrap(), group);
1381    }
1382
1383    #[test]
1384    fn test_state_manager_get_group_returns_none_for_unknown() {
1385        let manager = StateManager::new().unwrap();
1386
1387        // No groups added
1388        let unknown_id = GroupId::new("RINCON_UNKNOWN:1");
1389        let found = manager.get_group(&unknown_id);
1390        assert!(found.is_none());
1391    }
1392
1393    #[test]
1394    fn test_state_manager_get_group_for_speaker_returns_correct_group() {
1395        let manager = StateManager::new().unwrap();
1396
1397        // Add devices
1398        let devices = vec![
1399            Device {
1400                id: "RINCON_111".to_string(),
1401                name: "Living Room".to_string(),
1402                room_name: "Living Room".to_string(),
1403                ip_address: "192.168.1.100".to_string(),
1404                port: 1400,
1405                model_name: "Sonos One".to_string(),
1406            },
1407            Device {
1408                id: "RINCON_222".to_string(),
1409                name: "Kitchen".to_string(),
1410                room_name: "Kitchen".to_string(),
1411                ip_address: "192.168.1.101".to_string(),
1412                port: 1400,
1413                model_name: "Sonos One".to_string(),
1414            },
1415        ];
1416        manager.add_devices(devices).unwrap();
1417
1418        // Create a group with both speakers
1419        let speaker1 = SpeakerId::new("RINCON_111");
1420        let speaker2 = SpeakerId::new("RINCON_222");
1421        let group_id = GroupId::new("RINCON_111:1");
1422        let group = GroupInfo::new(
1423            group_id.clone(),
1424            speaker1.clone(),
1425            vec![speaker1.clone(), speaker2.clone()],
1426        );
1427
1428        let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
1429        manager.initialize(topology);
1430
1431        // Verify get_group_for_speaker returns the correct group for both speakers
1432        let found1 = manager.get_group_for_speaker(&speaker1);
1433        assert!(found1.is_some());
1434        assert_eq!(found1.unwrap(), group);
1435
1436        let found2 = manager.get_group_for_speaker(&speaker2);
1437        assert!(found2.is_some());
1438        assert_eq!(found2.unwrap(), group);
1439    }
1440
1441    #[test]
1442    fn test_state_manager_get_group_for_speaker_returns_none_for_unknown() {
1443        let manager = StateManager::new().unwrap();
1444
1445        // No groups added
1446        let unknown_speaker = SpeakerId::new("RINCON_UNKNOWN");
1447        let found = manager.get_group_for_speaker(&unknown_speaker);
1448        assert!(found.is_none());
1449    }
1450
1451    #[test]
1452    fn test_state_manager_group_methods_consistency() {
1453        let manager = StateManager::new().unwrap();
1454
1455        // Add device
1456        let devices = vec![Device {
1457            id: "RINCON_111".to_string(),
1458            name: "Living Room".to_string(),
1459            room_name: "Living Room".to_string(),
1460            ip_address: "192.168.1.100".to_string(),
1461            port: 1400,
1462            model_name: "Sonos One".to_string(),
1463        }];
1464        manager.add_devices(devices).unwrap();
1465
1466        // Create group via initialize
1467        let speaker = SpeakerId::new("RINCON_111");
1468        let group_id = GroupId::new("RINCON_111:1");
1469        let group = GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()]);
1470
1471        let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
1472        manager.initialize(topology);
1473
1474        // Verify all three methods return consistent data
1475        let groups = manager.groups();
1476        assert_eq!(groups.len(), 1);
1477        assert_eq!(groups[0], group);
1478
1479        let by_id = manager.get_group(&group_id);
1480        assert_eq!(by_id, Some(group.clone()));
1481
1482        let by_speaker = manager.get_group_for_speaker(&speaker);
1483        assert_eq!(by_speaker, Some(group.clone()));
1484
1485        // All should return the same group
1486        assert_eq!(groups[0], by_id.unwrap());
1487        assert_eq!(groups[0], by_speaker.unwrap());
1488    }
1489
1490    // ========================================================================
1491    // boot_seq Tests
1492    // ========================================================================
1493
1494    #[test]
1495    fn test_get_boot_seq_returns_none_for_unknown_speaker() {
1496        let manager = StateManager::new().unwrap();
1497        let unknown = SpeakerId::new("RINCON_UNKNOWN");
1498        assert!(manager.get_boot_seq(&unknown).is_none());
1499    }
1500
1501    #[test]
1502    fn test_boot_seq_defaults_to_zero_for_new_speaker() {
1503        let manager = StateManager::new().unwrap();
1504
1505        let devices = vec![Device {
1506            id: "RINCON_123".to_string(),
1507            name: "Living Room".to_string(),
1508            room_name: "Living Room".to_string(),
1509            ip_address: "192.168.1.100".to_string(),
1510            port: 1400,
1511            model_name: "Sonos One".to_string(),
1512        }];
1513        manager.add_devices(devices).unwrap();
1514
1515        let speaker_id = SpeakerId::new("RINCON_123");
1516
1517        // Before any topology event, boot_seq should be 0
1518        assert_eq!(manager.get_boot_seq(&speaker_id), Some(0));
1519    }
1520
1521    // ========================================================================
1522    // StateWatchRegistry Tests
1523    // ========================================================================
1524
1525    #[test]
1526    fn test_state_watch_registry_register_and_unregister() {
1527        let watched = Arc::new(RwLock::new(HashSet::new()));
1528        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
1529        let key_to_service = Arc::new(RwLock::new(HashMap::new()));
1530
1531        let ip: IpAddr = "192.168.1.100".parse().unwrap();
1532        let speaker_id = SpeakerId::new("RINCON_123");
1533        ip_to_speaker.write().insert(ip, speaker_id.clone());
1534
1535        let registry = StateWatchRegistry {
1536            watched: Arc::clone(&watched),
1537            ip_to_speaker: Arc::clone(&ip_to_speaker),
1538            key_to_service: Arc::clone(&key_to_service),
1539        };
1540
1541        // Register watches on two services
1542        registry.register_watch(&speaker_id, "volume", Service::RenderingControl);
1543        registry.register_watch(&speaker_id, "mute", Service::RenderingControl);
1544        registry.register_watch(&speaker_id, "playback_state", Service::AVTransport);
1545
1546        assert_eq!(watched.read().len(), 3);
1547
1548        // Unregister RenderingControl — should remove volume + mute, keep playback_state
1549        registry.unregister_watches_for_service(ip, Service::RenderingControl);
1550
1551        let w = watched.read();
1552        assert_eq!(w.len(), 1);
1553        assert!(w.contains(&(speaker_id.clone(), "playback_state")));
1554        assert!(!w.contains(&(speaker_id.clone(), "volume")));
1555        assert!(!w.contains(&(speaker_id.clone(), "mute")));
1556    }
1557
1558    #[test]
1559    fn test_state_watch_registry_unknown_ip_is_noop() {
1560        let watched = Arc::new(RwLock::new(HashSet::new()));
1561        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
1562        let key_to_service = Arc::new(RwLock::new(HashMap::new()));
1563
1564        let speaker_id = SpeakerId::new("RINCON_123");
1565
1566        let registry = StateWatchRegistry {
1567            watched: Arc::clone(&watched),
1568            ip_to_speaker,
1569            key_to_service: Arc::clone(&key_to_service),
1570        };
1571
1572        // Register a watch (simulating direct add to shared set)
1573        watched.write().insert((speaker_id.clone(), "volume"));
1574        key_to_service
1575            .write()
1576            .insert("volume", Service::RenderingControl);
1577
1578        // Unregister for an unknown IP — should be a no-op
1579        let unknown_ip: IpAddr = "10.0.0.1".parse().unwrap();
1580        registry.unregister_watches_for_service(unknown_ip, Service::RenderingControl);
1581
1582        // Watch should still be there
1583        assert_eq!(watched.read().len(), 1);
1584    }
1585
1586    #[test]
1587    fn test_state_watch_registry_only_removes_matching_speaker() {
1588        let watched = Arc::new(RwLock::new(HashSet::new()));
1589        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
1590        let key_to_service = Arc::new(RwLock::new(HashMap::new()));
1591
1592        let ip1: IpAddr = "192.168.1.100".parse().unwrap();
1593        let ip2: IpAddr = "192.168.1.101".parse().unwrap();
1594        let speaker1 = SpeakerId::new("RINCON_111");
1595        let speaker2 = SpeakerId::new("RINCON_222");
1596
1597        ip_to_speaker.write().insert(ip1, speaker1.clone());
1598        ip_to_speaker.write().insert(ip2, speaker2.clone());
1599
1600        let registry = StateWatchRegistry {
1601            watched: Arc::clone(&watched),
1602            ip_to_speaker,
1603            key_to_service: Arc::clone(&key_to_service),
1604        };
1605
1606        // Both speakers watch volume
1607        registry.register_watch(&speaker1, "volume", Service::RenderingControl);
1608        registry.register_watch(&speaker2, "volume", Service::RenderingControl);
1609        assert_eq!(watched.read().len(), 2);
1610
1611        // Unregister only speaker1's IP
1612        registry.unregister_watches_for_service(ip1, Service::RenderingControl);
1613
1614        let w = watched.read();
1615        assert_eq!(w.len(), 1);
1616        assert!(w.contains(&(speaker2.clone(), "volume")));
1617        assert!(!w.contains(&(speaker1.clone(), "volume")));
1618    }
1619
1620    // ========================================================================
1621    // resolve_coordinator Tests
1622    // ========================================================================
1623
1624    #[test]
1625    fn test_resolve_coordinator_for_standalone_speaker() {
1626        let mut store = StateStore::new();
1627
1628        let speaker = SpeakerId::new("RINCON_111");
1629        let group_id = GroupId::new("RINCON_111:1");
1630
1631        store.add_speaker(SpeakerInfo {
1632            id: speaker.clone(),
1633            name: "Living Room".to_string(),
1634            room_name: "Living Room".to_string(),
1635            ip_address: "192.168.1.100".parse().unwrap(),
1636            port: 1400,
1637            model_name: "Test".to_string(),
1638            software_version: "1.0".to_string(),
1639            boot_seq: 0,
1640            satellites: vec![],
1641        });
1642        store.add_group(GroupInfo::new(
1643            group_id,
1644            speaker.clone(),
1645            vec![speaker.clone()],
1646        ));
1647
1648        // Standalone speaker is its own coordinator
1649        assert_eq!(store.resolve_coordinator(&speaker), speaker);
1650    }
1651
1652    #[test]
1653    fn test_resolve_coordinator_for_group_member() {
1654        let mut store = StateStore::new();
1655
1656        let coordinator = SpeakerId::new("RINCON_COORD");
1657        let member = SpeakerId::new("RINCON_MEMBER");
1658        let group_id = GroupId::new("RINCON_COORD:1");
1659
1660        store.add_group(GroupInfo::new(
1661            group_id,
1662            coordinator.clone(),
1663            vec![coordinator.clone(), member.clone()],
1664        ));
1665
1666        // Member resolves to the coordinator
1667        assert_eq!(store.resolve_coordinator(&member), coordinator);
1668        // Coordinator resolves to itself
1669        assert_eq!(store.resolve_coordinator(&coordinator), coordinator);
1670    }
1671
1672    #[test]
1673    fn test_resolve_coordinator_no_group_data() {
1674        let store = StateStore::new();
1675
1676        let speaker = SpeakerId::new("RINCON_UNKNOWN");
1677
1678        // No group data — falls back to speaker's own ID
1679        assert_eq!(store.resolve_coordinator(&speaker), speaker);
1680    }
1681
1682    // ========================================================================
1683    // get_resolved Tests
1684    // ========================================================================
1685
1686    #[test]
1687    fn test_get_resolved_per_coordinator_reads_from_coordinator() {
1688        let mut store = StateStore::new();
1689
1690        let coordinator = SpeakerId::new("RINCON_COORD");
1691        let member = SpeakerId::new("RINCON_MEMBER");
1692        let group_id = GroupId::new("RINCON_COORD:1");
1693
1694        store.add_speaker(SpeakerInfo {
1695            id: coordinator.clone(),
1696            name: "Coord".to_string(),
1697            room_name: "Coord".to_string(),
1698            ip_address: "192.168.1.100".parse().unwrap(),
1699            port: 1400,
1700            model_name: "Test".to_string(),
1701            software_version: "1.0".to_string(),
1702            boot_seq: 0,
1703            satellites: vec![],
1704        });
1705        store.add_speaker(SpeakerInfo {
1706            id: member.clone(),
1707            name: "Member".to_string(),
1708            room_name: "Member".to_string(),
1709            ip_address: "192.168.1.101".parse().unwrap(),
1710            port: 1400,
1711            model_name: "Test".to_string(),
1712            software_version: "1.0".to_string(),
1713            boot_seq: 0,
1714            satellites: vec![],
1715        });
1716        store.add_group(GroupInfo::new(
1717            group_id,
1718            coordinator.clone(),
1719            vec![coordinator.clone(), member.clone()],
1720        ));
1721
1722        // Set PlaybackState only on coordinator
1723        store.set(&coordinator, PlaybackState::Playing);
1724
1725        // get_resolved on member should return coordinator's value (PerCoordinator + Speaker scope)
1726        let resolved: Option<PlaybackState> = store.get_resolved(&member);
1727        assert_eq!(resolved, Some(PlaybackState::Playing));
1728
1729        // Direct get on member should return None (no data copied)
1730        let direct: Option<PlaybackState> = store.get(&member);
1731        assert_eq!(direct, None);
1732    }
1733
1734    #[test]
1735    fn test_get_resolved_per_speaker_reads_own_props() {
1736        let mut store = StateStore::new();
1737
1738        let coordinator = SpeakerId::new("RINCON_COORD");
1739        let member = SpeakerId::new("RINCON_MEMBER");
1740        let group_id = GroupId::new("RINCON_COORD:1");
1741
1742        store.add_speaker(SpeakerInfo {
1743            id: coordinator.clone(),
1744            name: "Coord".to_string(),
1745            room_name: "Coord".to_string(),
1746            ip_address: "192.168.1.100".parse().unwrap(),
1747            port: 1400,
1748            model_name: "Test".to_string(),
1749            software_version: "1.0".to_string(),
1750            boot_seq: 0,
1751            satellites: vec![],
1752        });
1753        store.add_speaker(SpeakerInfo {
1754            id: member.clone(),
1755            name: "Member".to_string(),
1756            room_name: "Member".to_string(),
1757            ip_address: "192.168.1.101".parse().unwrap(),
1758            port: 1400,
1759            model_name: "Test".to_string(),
1760            software_version: "1.0".to_string(),
1761            boot_seq: 0,
1762            satellites: vec![],
1763        });
1764        store.add_group(GroupInfo::new(
1765            group_id,
1766            coordinator.clone(),
1767            vec![coordinator.clone(), member.clone()],
1768        ));
1769
1770        // Set Volume on coordinator only (PerSpeaker service)
1771        store.set(&coordinator, Volume::new(80));
1772
1773        // get_resolved on member should NOT resolve to coordinator for PerSpeaker
1774        let resolved: Option<Volume> = store.get_resolved(&member);
1775        assert_eq!(resolved, None);
1776
1777        // get_resolved on coordinator returns its own value
1778        let coord_resolved: Option<Volume> = store.get_resolved(&coordinator);
1779        assert_eq!(coord_resolved, Some(Volume::new(80)));
1780    }
1781
1782    #[test]
1783    fn test_update_speaker_ip() {
1784        let manager = StateManager::new().unwrap();
1785
1786        let devices = vec![Device {
1787            id: "RINCON_111".to_string(),
1788            name: "Office".to_string(),
1789            room_name: "Office".to_string(),
1790            ip_address: "192.168.4.198".to_string(),
1791            port: 1400,
1792            model_name: "Roam 2".to_string(),
1793        }];
1794        manager.add_devices(devices).unwrap();
1795
1796        let speaker_id = SpeakerId::new("RINCON_111");
1797        let old_ip: IpAddr = "192.168.4.198".parse().unwrap();
1798        let new_ip: IpAddr = "192.168.4.200".parse().unwrap();
1799
1800        // Verify initial state
1801        assert_eq!(manager.get_speaker_ip(&speaker_id), Some(old_ip));
1802
1803        // Update IP
1804        manager.update_speaker_ip(&speaker_id, new_ip);
1805
1806        // Verify forward map updated
1807        assert_eq!(manager.get_speaker_ip(&speaker_id), Some(new_ip));
1808
1809        // Verify reverse map updated (old IP removed, new IP present)
1810        let ip_map = manager.ip_to_speaker.read();
1811        assert!(!ip_map.contains_key(&old_ip));
1812        assert_eq!(ip_map.get(&new_ip), Some(&speaker_id));
1813    }
1814
1815    #[test]
1816    fn test_update_speaker_ip_no_change() {
1817        let manager = StateManager::new().unwrap();
1818
1819        let devices = vec![Device {
1820            id: "RINCON_111".to_string(),
1821            name: "Office".to_string(),
1822            room_name: "Office".to_string(),
1823            ip_address: "192.168.4.198".to_string(),
1824            port: 1400,
1825            model_name: "Roam 2".to_string(),
1826        }];
1827        manager.add_devices(devices).unwrap();
1828
1829        let speaker_id = SpeakerId::new("RINCON_111");
1830        let same_ip: IpAddr = "192.168.4.198".parse().unwrap();
1831
1832        // Update with same IP — should be a no-op
1833        manager.update_speaker_ip(&speaker_id, same_ip);
1834        assert_eq!(manager.get_speaker_ip(&speaker_id), Some(same_ip));
1835    }
1836
1837    #[test]
1838    fn test_satellite_ids() {
1839        let manager = StateManager::new().unwrap();
1840
1841        assert!(manager.get_satellite_ids().is_empty());
1842
1843        let ids = vec![SpeakerId::new("RINCON_SAT1"), SpeakerId::new("RINCON_SAT2")];
1844        manager.set_satellite_ids(ids.clone());
1845
1846        let stored = manager.get_satellite_ids();
1847        assert_eq!(stored.len(), 2);
1848        assert!(stored.contains(&SpeakerId::new("RINCON_SAT1")));
1849        assert!(stored.contains(&SpeakerId::new("RINCON_SAT2")));
1850    }
1851}