Skip to main content

sonos_state/
state.rs

1//! Sync-first State Management for Sonos devices
2//!
3//! Provides a synchronous API for managing Sonos device state with
4//! background event processing.
5//!
6//! # Example
7//!
8//! ```rust,ignore
9//! use sonos_state::{StateManager, Volume};
10//! use sonos_discovery;
11//!
12//! // Create state manager (sync)
13//! let manager = StateManager::new()?;
14//! let devices = sonos_discovery::get();
15//! manager.add_devices(devices)?;
16//!
17//! // Get speakers
18//! for info in manager.speaker_infos() {
19//!     println!("{}: {}", info.name, info.ip_address);
20//! }
21//!
22//! // Blocking iteration over changes
23//! for event in manager.iter() {
24//!     println!("Change: {:?}", event);
25//! }
26//! ```
27
28use std::any::{Any, TypeId};
29use std::collections::{HashMap, HashSet};
30use std::net::IpAddr;
31use std::sync::{mpsc, Arc, Mutex, OnceLock, RwLock};
32use std::thread::JoinHandle;
33use std::time::{Duration, Instant};
34
35use sonos_api::Service;
36use sonos_discovery::Device;
37use sonos_event_manager::SonosEventManager;
38use tracing::info;
39
40use crate::event_worker::spawn_state_event_worker;
41use crate::iter::ChangeIterator;
42use crate::model::{GroupId, SpeakerId, SpeakerInfo};
43use crate::property::{GroupInfo, Property, SonosProperty, Topology};
44use crate::{Result, StateError};
45
46// ============================================================================
47// ChangeEvent - for iter()
48// ============================================================================
49
50/// A change event emitted when a watched property changes
51#[derive(Debug, Clone)]
52pub struct ChangeEvent {
53    /// Speaker or entity that changed
54    pub speaker_id: SpeakerId,
55    /// Property key that changed
56    pub property_key: &'static str,
57    /// Service the property belongs to
58    pub service: Service,
59    /// When the change occurred
60    pub timestamp: Instant,
61}
62
63impl ChangeEvent {
64    pub fn new(speaker_id: SpeakerId, property_key: &'static str, service: Service) -> Self {
65        Self {
66            speaker_id,
67            property_key,
68            service,
69            timestamp: Instant::now(),
70        }
71    }
72}
73
74// ============================================================================
75// Internal StateStore
76// ============================================================================
77
78/// Internal state storage
79pub struct StateStore {
80    /// Speaker metadata
81    pub(crate) speakers: HashMap<SpeakerId, SpeakerInfo>,
82    /// IP to speaker ID mapping
83    pub(crate) ip_to_speaker: HashMap<IpAddr, SpeakerId>,
84    /// Property values: (speaker_id, property_key) -> type-erased value
85    pub(crate) speaker_props: HashMap<SpeakerId, PropertyBag>,
86    /// Group metadata
87    pub(crate) groups: HashMap<GroupId, GroupInfo>,
88    /// Group properties
89    pub(crate) group_props: HashMap<GroupId, PropertyBag>,
90    /// System properties
91    pub(crate) system_props: PropertyBag,
92    /// Speaker to group mapping for quick lookups
93    pub(crate) speaker_to_group: HashMap<SpeakerId, GroupId>,
94}
95
96impl StateStore {
97    pub(crate) fn new() -> Self {
98        Self {
99            speakers: HashMap::new(),
100            ip_to_speaker: HashMap::new(),
101            speaker_props: HashMap::new(),
102            groups: HashMap::new(),
103            group_props: HashMap::new(),
104            system_props: PropertyBag::new(),
105            speaker_to_group: HashMap::new(),
106        }
107    }
108
109    pub(crate) fn add_speaker(&mut self, speaker: SpeakerInfo) {
110        let id = speaker.id.clone();
111        let ip = speaker.ip_address;
112        self.ip_to_speaker.insert(ip, id.clone());
113        self.speakers.insert(id.clone(), speaker);
114        self.speaker_props
115            .entry(id)
116            .or_insert_with(PropertyBag::new);
117    }
118
119    fn speaker(&self, id: &SpeakerId) -> Option<&SpeakerInfo> {
120        self.speakers.get(id)
121    }
122
123    fn speakers(&self) -> Vec<SpeakerInfo> {
124        self.speakers.values().cloned().collect()
125    }
126
127    pub(crate) fn add_group(&mut self, group: GroupInfo) {
128        let id = group.id.clone();
129        // Update speaker_to_group mapping for all members
130        for member_id in &group.member_ids {
131            self.speaker_to_group.insert(member_id.clone(), id.clone());
132        }
133        self.groups.insert(id.clone(), group);
134        self.group_props.entry(id).or_insert_with(PropertyBag::new);
135    }
136
137    /// Get the group a speaker belongs to
138    #[allow(dead_code)]
139    pub(crate) fn get_group_for_speaker(&self, speaker_id: &SpeakerId) -> Option<&GroupInfo> {
140        let group_id = self.speaker_to_group.get(speaker_id)?;
141        self.groups.get(group_id)
142    }
143
144    /// Clear all groups and speaker_to_group mappings
145    ///
146    /// Used when processing topology updates to replace all group data
147    pub(crate) fn clear_groups(&mut self) {
148        self.groups.clear();
149        self.group_props.clear();
150        self.speaker_to_group.clear();
151    }
152
153    pub(crate) fn get<P: Property>(&self, speaker_id: &SpeakerId) -> Option<P> {
154        self.speaker_props.get(speaker_id)?.get::<P>()
155    }
156
157    pub(crate) fn set<P: Property>(&mut self, speaker_id: &SpeakerId, value: P) -> bool {
158        let bag = self
159            .speaker_props
160            .entry(speaker_id.clone())
161            .or_insert_with(PropertyBag::new);
162        bag.set(value)
163    }
164
165    pub(crate) fn get_group<P: Property>(&self, group_id: &GroupId) -> Option<P> {
166        self.group_props.get(group_id)?.get::<P>()
167    }
168
169    pub(crate) fn set_group<P: Property>(&mut self, group_id: &GroupId, value: P) -> bool {
170        let bag = self
171            .group_props
172            .entry(group_id.clone())
173            .or_insert_with(PropertyBag::new);
174        bag.set(value)
175    }
176
177    fn set_system<P: Property>(&mut self, value: P) -> bool {
178        self.system_props.set(value)
179    }
180
181    fn is_empty(&self) -> bool {
182        self.speakers.is_empty()
183    }
184
185    fn speaker_count(&self) -> usize {
186        self.speakers.len()
187    }
188
189    fn group_count(&self) -> usize {
190        self.groups.len()
191    }
192}
193
194// ============================================================================
195// PropertyBag - type-erased property storage
196// ============================================================================
197
198pub(crate) struct PropertyBag {
199    /// Map<TypeId, Box<dyn Any>> where Any is the property value
200    values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
201}
202
203impl PropertyBag {
204    pub(crate) fn new() -> Self {
205        Self {
206            values: HashMap::new(),
207        }
208    }
209
210    fn get<P: Property>(&self) -> Option<P> {
211        let type_id = TypeId::of::<P>();
212        self.values
213            .get(&type_id)
214            .and_then(|boxed| boxed.downcast_ref::<P>())
215            .cloned()
216    }
217
218    fn set<P: Property>(&mut self, value: P) -> bool {
219        let type_id = TypeId::of::<P>();
220        let current = self
221            .values
222            .get(&type_id)
223            .and_then(|boxed| boxed.downcast_ref::<P>());
224
225        if current != Some(&value) {
226            self.values.insert(type_id, Box::new(value));
227            true
228        } else {
229            false
230        }
231    }
232}
233
234// ============================================================================
235// StateManager - main entry point
236// ============================================================================
237
238/// Core state manager with sync-first API
239///
240/// All public methods are synchronous. Background event processing
241/// happens in a dedicated thread.
242pub struct StateManager {
243    /// Property values storage
244    store: Arc<RwLock<StateStore>>,
245
246    /// Watched properties for iter() filtering
247    watched: Arc<RwLock<HashSet<(SpeakerId, &'static str)>>>,
248
249    /// Service subscription ref counts: (speaker_ip, service) -> count
250    subscriptions: Arc<RwLock<HashMap<(IpAddr, Service), usize>>>,
251
252    /// IP to speaker ID mapping (for event worker)
253    ip_to_speaker: Arc<RwLock<HashMap<IpAddr, SpeakerId>>>,
254
255    /// Event manager (set-once via OnceLock — enables live events)
256    event_manager: OnceLock<Arc<SonosEventManager>>,
257
258    /// Channel for sending change events to iter()
259    event_tx: mpsc::Sender<ChangeEvent>,
260
261    /// Receiver for iter() - wrapped in `Arc<Mutex>` for cloning
262    event_rx: Arc<Mutex<mpsc::Receiver<ChangeEvent>>>,
263
264    /// Background event processor handle (lazily spawned)
265    _worker: Mutex<Option<JoinHandle<()>>>,
266
267    /// Cleanup timeout for subscriptions
268    cleanup_timeout: Duration,
269}
270
271impl StateManager {
272    /// Create a new StateManager with default settings (sync)
273    ///
274    /// # Example
275    ///
276    /// ```rust,ignore
277    /// let manager = StateManager::new()?;
278    /// ```
279    pub fn new() -> Result<Self> {
280        Self::builder().build()
281    }
282
283    /// Create a StateManager builder for custom configuration
284    pub fn builder() -> StateManagerBuilder {
285        StateManagerBuilder::default()
286    }
287
288    /// Add discovered devices (sync)
289    ///
290    /// # Example
291    ///
292    /// ```rust,ignore
293    /// let devices = sonos_discovery::get();
294    /// manager.add_devices(devices)?;
295    /// ```
296    pub fn add_devices(&self, devices: Vec<Device>) -> Result<()> {
297        let mut store = self.store.write().map_err(|_| StateError::LockPoisoned)?;
298        let mut ip_map = self
299            .ip_to_speaker
300            .write()
301            .map_err(|_| StateError::LockPoisoned)?;
302
303        for device in devices {
304            let speaker_id = SpeakerId::new(&device.id);
305            let ip: IpAddr = device
306                .ip_address
307                .parse()
308                .map_err(|_| StateError::InvalidIpAddress(device.ip_address.clone()))?;
309
310            let friendly_name = if device.room_name.is_empty() || device.room_name == "Unknown" {
311                device.name.clone()
312            } else {
313                device.room_name.clone()
314            };
315
316            let info = SpeakerInfo {
317                id: speaker_id.clone(),
318                name: friendly_name,
319                room_name: device.room_name.clone(),
320                ip_address: ip,
321                port: device.port,
322                model_name: device.model_name.clone(),
323                software_version: "unknown".to_string(),
324                boot_seq: 0,
325                satellites: vec![],
326            };
327
328            // Update ip_to_speaker mapping
329            ip_map.insert(ip, speaker_id.clone());
330            tracing::debug!(
331                "Added speaker {} at IP {} to ip_to_speaker map",
332                speaker_id.as_str(),
333                ip
334            );
335
336            store.add_speaker(info);
337        }
338
339        // Also add devices to event manager if present
340        drop(store);
341        drop(ip_map);
342
343        if let Some(em) = self.event_manager.get() {
344            let devices_for_em: Vec<_> = self
345                .speaker_infos()
346                .iter()
347                .map(|info| sonos_discovery::Device {
348                    id: info.id.as_str().to_string(),
349                    name: info.name.clone(),
350                    room_name: info.room_name.clone(),
351                    ip_address: info.ip_address.to_string(),
352                    port: info.port,
353                    model_name: info.model_name.clone(),
354                })
355                .collect();
356
357            if let Err(e) = em.add_devices(devices_for_em) {
358                tracing::warn!("Failed to add devices to event manager: {}", e);
359            }
360        }
361
362        Ok(())
363    }
364
365    /// Get all speaker info
366    pub fn speaker_infos(&self) -> Vec<SpeakerInfo> {
367        let store = match self.store.read() {
368            Ok(s) => s,
369            Err(_) => return vec![],
370        };
371        store.speakers()
372    }
373
374    /// Get a specific speaker info by ID
375    pub fn speaker_info(&self, speaker_id: &SpeakerId) -> Option<SpeakerInfo> {
376        let store = self.store.read().ok()?;
377        store.speaker(speaker_id).cloned()
378    }
379
380    /// Get speaker IP by ID
381    pub fn get_speaker_ip(&self, speaker_id: &SpeakerId) -> Option<IpAddr> {
382        let store = self.store.read().ok()?;
383        store.speaker(speaker_id).map(|s| s.ip_address)
384    }
385
386    /// Get boot_seq for a speaker (used by GroupManagement AddMember)
387    pub fn get_boot_seq(&self, speaker_id: &SpeakerId) -> Option<u32> {
388        let store = self.store.read().ok()?;
389        store.speaker(speaker_id).map(|s| s.boot_seq)
390    }
391
392    /// Create a blocking iterator over change events
393    ///
394    /// Only emits events for properties that have been watched.
395    ///
396    /// # Example
397    ///
398    /// ```rust,ignore
399    /// // First, watch some properties
400    /// speaker.volume.watch()?;
401    ///
402    /// // Then iterate over changes
403    /// for event in manager.iter() {
404    ///     println!("Changed: {} on {}", event.property_key, event.speaker_id);
405    /// }
406    /// ```
407    pub fn iter(&self) -> ChangeIterator {
408        ChangeIterator::new(Arc::clone(&self.event_rx))
409    }
410
411    /// Get current property value (sync, no subscription)
412    pub fn get_property<P: Property>(&self, speaker_id: &SpeakerId) -> Option<P> {
413        let store = self.store.read().ok()?;
414        store.get::<P>(speaker_id)
415    }
416
417    /// Get current group property value (sync, no subscription)
418    pub fn get_group_property<P: Property>(&self, group_id: &GroupId) -> Option<P> {
419        let store = self.store.read().ok()?;
420        store.get_group::<P>(group_id)
421    }
422
423    /// Set a property value
424    ///
425    /// Updates the property value in the store and emits a change event
426    /// if the property is being watched.
427    pub fn set_property<P: SonosProperty>(&self, speaker_id: &SpeakerId, value: P) {
428        let changed = {
429            let mut store = match self.store.write() {
430                Ok(s) => s,
431                Err(_) => return,
432            };
433            store.set::<P>(speaker_id, value)
434        };
435
436        if changed {
437            self.maybe_emit_change(speaker_id, P::KEY, P::SERVICE);
438        }
439    }
440
441    /// Set a group property value
442    ///
443    /// Updates the group property value in the store and emits a change event
444    /// if the property is being watched (keyed on the coordinator's speaker ID).
445    /// Used by the SDK layer to store group-scoped values fetched via API calls.
446    pub fn set_group_property<P: SonosProperty>(&self, group_id: &GroupId, value: P) {
447        let coordinator_id = {
448            let mut store = match self.store.write() {
449                Ok(s) => s,
450                Err(_) => return,
451            };
452            let changed = store.set_group::<P>(group_id, value);
453            if !changed {
454                return;
455            }
456            store.groups.get(group_id).map(|g| g.coordinator_id.clone())
457        };
458
459        if let Some(coordinator_id) = coordinator_id {
460            self.maybe_emit_change(&coordinator_id, P::KEY, P::SERVICE);
461        }
462    }
463
464    /// Register a property as watched (called by PropertyHandle::watch)
465    pub fn register_watch(&self, speaker_id: &SpeakerId, property_key: &'static str) {
466        if let Ok(mut watched) = self.watched.write() {
467            watched.insert((speaker_id.clone(), property_key));
468        }
469    }
470
471    /// Unregister a property watch
472    pub fn unregister_watch(&self, speaker_id: &SpeakerId, property_key: &'static str) {
473        if let Ok(mut watched) = self.watched.write() {
474            watched.remove(&(speaker_id.clone(), property_key));
475        }
476    }
477
478    /// Watch a property with automatic UPnP subscription (recommended API)
479    ///
480    /// This is the preferred method for watching properties as it:
481    /// 1. Registers the property for change notifications
482    /// 2. Subscribes to the UPnP service via the event manager
483    ///
484    /// Returns the current cached value if available.
485    pub fn watch_property_with_subscription<P: SonosProperty>(
486        &self,
487        speaker_id: &SpeakerId,
488    ) -> Result<Option<P>> {
489        // Register for change notifications
490        self.register_watch(speaker_id, P::KEY);
491
492        // Subscribe via event manager if available
493        if let Some(em) = self.event_manager.get() {
494            // Get speaker IP from store
495            if let Some(ip) = self.get_speaker_ip(speaker_id) {
496                if let Err(e) = em.ensure_service_subscribed(ip, P::SERVICE) {
497                    tracing::warn!(
498                        "Failed to subscribe to {:?} for {}: {}",
499                        P::SERVICE,
500                        speaker_id.as_str(),
501                        e
502                    );
503                }
504            }
505        }
506
507        Ok(self.get_property::<P>(speaker_id))
508    }
509
510    /// Unwatch a property and release UPnP subscription
511    pub fn unwatch_property_with_subscription<P: SonosProperty>(&self, speaker_id: &SpeakerId) {
512        // Unregister from change notifications
513        self.unregister_watch(speaker_id, P::KEY);
514
515        // Release subscription via event manager if available
516        if let Some(em) = self.event_manager.get() {
517            if let Some(ip) = self.get_speaker_ip(speaker_id) {
518                if let Err(e) = em.release_service_subscription(ip, P::SERVICE) {
519                    tracing::warn!(
520                        "Failed to unsubscribe from {:?} for {}: {}",
521                        P::SERVICE,
522                        speaker_id.as_str(),
523                        e
524                    );
525                }
526            }
527        }
528    }
529
530    /// Check if a property is being watched
531    pub fn is_watched(&self, speaker_id: &SpeakerId, property_key: &'static str) -> bool {
532        self.watched
533            .read()
534            .map(|w| w.contains(&(speaker_id.clone(), property_key)))
535            .unwrap_or(false)
536    }
537
538    /// Emit a change event if the property is being watched
539    fn maybe_emit_change(
540        &self,
541        speaker_id: &SpeakerId,
542        property_key: &'static str,
543        service: Service,
544    ) {
545        let is_watched = self
546            .watched
547            .read()
548            .map(|w| w.contains(&(speaker_id.clone(), property_key)))
549            .unwrap_or(false);
550
551        if is_watched {
552            let event = ChangeEvent::new(speaker_id.clone(), property_key, service);
553            let _ = self.event_tx.send(event);
554        }
555    }
556
557    /// Initialize from topology data
558    pub fn initialize(&self, topology: Topology) {
559        if let Ok(mut store) = self.store.write() {
560            for speaker in &topology.speakers {
561                store.add_speaker(speaker.clone());
562            }
563            for group in &topology.groups {
564                store.add_group(group.clone());
565            }
566            store.set_system(topology);
567        }
568    }
569
570    /// Check if initialized with any speakers
571    pub fn is_initialized(&self) -> bool {
572        self.store.read().map(|s| !s.is_empty()).unwrap_or(false)
573    }
574
575    /// Get number of speakers
576    pub fn speaker_count(&self) -> usize {
577        self.store.read().map(|s| s.speaker_count()).unwrap_or(0)
578    }
579
580    /// Get number of groups
581    pub fn group_count(&self) -> usize {
582        self.store.read().map(|s| s.group_count()).unwrap_or(0)
583    }
584
585    /// Get all current groups
586    ///
587    /// Returns all groups in the system. Every speaker is always in a group,
588    /// so a single speaker forms a group of one.
589    pub fn groups(&self) -> Vec<GroupInfo> {
590        self.store
591            .read()
592            .map(|s| s.groups.values().cloned().collect())
593            .unwrap_or_default()
594    }
595
596    /// Get a specific group by ID
597    pub fn get_group(&self, group_id: &GroupId) -> Option<GroupInfo> {
598        self.store.read().ok()?.groups.get(group_id).cloned()
599    }
600
601    /// Get the group a speaker belongs to
602    ///
603    /// Uses the speaker_to_group mapping for quick lookup.
604    pub fn get_group_for_speaker(&self, speaker_id: &SpeakerId) -> Option<GroupInfo> {
605        let store = self.store.read().ok()?;
606        let group_id = store.speaker_to_group.get(speaker_id)?;
607        store.groups.get(group_id).cloned()
608    }
609
610    /// Get access to the event manager (if configured)
611    ///
612    /// This allows PropertyHandle::watch() to trigger UPnP subscriptions
613    /// via the event manager's ensure_service_subscribed() method.
614    pub fn event_manager(&self) -> Option<&Arc<SonosEventManager>> {
615        self.event_manager.get()
616    }
617
618    /// Wire an event manager into this StateManager after construction.
619    ///
620    /// Spawns the event worker thread and registers all known devices.
621    /// Can only be called once — subsequent calls are no-ops.
622    pub fn set_event_manager(&self, em: Arc<SonosEventManager>) -> Result<()> {
623        if self.event_manager.set(Arc::clone(&em)).is_err() {
624            return Ok(()); // Already set — no-op
625        }
626
627        // Register all known devices with the event manager
628        let devices_for_em: Vec<_> = self
629            .speaker_infos()
630            .iter()
631            .map(|info| sonos_discovery::Device {
632                id: info.id.as_str().to_string(),
633                name: info.name.clone(),
634                room_name: info.room_name.clone(),
635                ip_address: info.ip_address.to_string(),
636                port: info.port,
637                model_name: info.model_name.clone(),
638            })
639            .collect();
640
641        if let Err(e) = em.add_devices(devices_for_em) {
642            tracing::warn!(
643                "Failed to add devices to event manager during lazy init: {}",
644                e
645            );
646        }
647
648        // Spawn event worker thread
649        let worker = spawn_state_event_worker(
650            em,
651            Arc::clone(&self.store),
652            Arc::clone(&self.watched),
653            self.event_tx.clone(),
654            Arc::clone(&self.ip_to_speaker),
655        );
656        info!("StateManager event worker started (lazy init)");
657
658        if let Ok(mut w) = self._worker.lock() {
659            *w = Some(worker);
660        }
661
662        Ok(())
663    }
664}
665
666impl Clone for StateManager {
667    fn clone(&self) -> Self {
668        let event_manager = OnceLock::new();
669        if let Some(em) = self.event_manager.get() {
670            let _ = event_manager.set(Arc::clone(em));
671        }
672        Self {
673            store: Arc::clone(&self.store),
674            watched: Arc::clone(&self.watched),
675            subscriptions: Arc::clone(&self.subscriptions),
676            ip_to_speaker: Arc::clone(&self.ip_to_speaker),
677            event_manager,
678            event_tx: self.event_tx.clone(),
679            event_rx: Arc::clone(&self.event_rx),
680            _worker: Mutex::new(None),
681            cleanup_timeout: self.cleanup_timeout,
682        }
683    }
684}
685
686// ============================================================================
687// StateManagerBuilder
688// ============================================================================
689
690/// Builder for StateManager configuration
691pub struct StateManagerBuilder {
692    cleanup_timeout: Duration,
693    event_manager: Option<Arc<SonosEventManager>>,
694}
695
696impl Default for StateManagerBuilder {
697    fn default() -> Self {
698        Self {
699            cleanup_timeout: Duration::from_secs(5),
700            event_manager: None,
701        }
702    }
703}
704
705impl StateManagerBuilder {
706    /// Set the cleanup timeout for subscriptions
707    pub fn cleanup_timeout(mut self, timeout: Duration) -> Self {
708        self.cleanup_timeout = timeout;
709        self
710    }
711
712    /// Set the event manager for live event processing
713    ///
714    /// When an event manager is provided, the StateManager will:
715    /// - Spawn a background worker to process events
716    /// - Automatically subscribe/unsubscribe via `watch()`/`unwatch()` on properties
717    /// - Update state from incoming events
718    pub fn with_event_manager(mut self, em: Arc<SonosEventManager>) -> Self {
719        self.event_manager = Some(em);
720        self
721    }
722
723    /// Build the StateManager
724    pub fn build(self) -> Result<StateManager> {
725        let (event_tx, event_rx) = mpsc::channel();
726
727        let store = Arc::new(RwLock::new(StateStore::new()));
728        let watched = Arc::new(RwLock::new(HashSet::new()));
729        let ip_to_speaker = Arc::new(RwLock::new(HashMap::new()));
730
731        let event_manager_lock = OnceLock::new();
732        let mut worker = None;
733
734        // If event_manager provided at build time, wire it up eagerly
735        if let Some(em) = self.event_manager {
736            let _ = event_manager_lock.set(Arc::clone(&em));
737            let worker_handle = spawn_state_event_worker(
738                em,
739                Arc::clone(&store),
740                Arc::clone(&watched),
741                event_tx.clone(),
742                Arc::clone(&ip_to_speaker),
743            );
744            info!("StateManager event worker started");
745            worker = Some(worker_handle);
746        }
747
748        let manager = StateManager {
749            store,
750            watched,
751            subscriptions: Arc::new(RwLock::new(HashMap::new())),
752            ip_to_speaker,
753            event_manager: event_manager_lock,
754            event_tx,
755            event_rx: Arc::new(Mutex::new(event_rx)),
756            _worker: Mutex::new(worker),
757            cleanup_timeout: self.cleanup_timeout,
758        };
759
760        info!("StateManager created (sync-first mode)");
761        Ok(manager)
762    }
763}
764
765#[cfg(test)]
766mod tests {
767    use super::*;
768    use crate::property::{GroupVolume, Volume};
769    use sonos_api::Service;
770
771    #[test]
772    fn test_state_manager_creation() {
773        let manager = StateManager::new().unwrap();
774        assert!(!manager.is_initialized());
775        assert_eq!(manager.speaker_count(), 0);
776    }
777
778    #[test]
779    fn test_add_devices() {
780        let manager = StateManager::new().unwrap();
781
782        let devices = vec![Device {
783            id: "RINCON_123".to_string(),
784            name: "Living Room".to_string(),
785            room_name: "Living Room".to_string(),
786            ip_address: "192.168.1.100".to_string(),
787            port: 1400,
788            model_name: "Sonos One".to_string(),
789        }];
790
791        manager.add_devices(devices).unwrap();
792        assert_eq!(manager.speaker_count(), 1);
793    }
794
795    #[test]
796    fn test_property_storage() {
797        let manager = StateManager::new().unwrap();
798
799        let devices = vec![Device {
800            id: "RINCON_123".to_string(),
801            name: "Living Room".to_string(),
802            room_name: "Living Room".to_string(),
803            ip_address: "192.168.1.100".to_string(),
804            port: 1400,
805            model_name: "Sonos One".to_string(),
806        }];
807        manager.add_devices(devices).unwrap();
808
809        let speaker_id = SpeakerId::new("RINCON_123");
810
811        // Initially None
812        assert!(manager.get_property::<Volume>(&speaker_id).is_none());
813
814        // Set value
815        manager.set_property(&speaker_id, Volume::new(50));
816        assert_eq!(
817            manager.get_property::<Volume>(&speaker_id),
818            Some(Volume::new(50))
819        );
820    }
821
822    #[test]
823    fn test_watch_registration() {
824        let manager = StateManager::new().unwrap();
825
826        let devices = vec![Device {
827            id: "RINCON_123".to_string(),
828            name: "Living Room".to_string(),
829            room_name: "Living Room".to_string(),
830            ip_address: "192.168.1.100".to_string(),
831            port: 1400,
832            model_name: "Sonos One".to_string(),
833        }];
834        manager.add_devices(devices).unwrap();
835
836        let speaker_id = SpeakerId::new("RINCON_123");
837
838        // Not watched initially
839        assert!(!manager.is_watched(&speaker_id, "volume"));
840
841        // Register watch
842        manager.register_watch(&speaker_id, "volume");
843        assert!(manager.is_watched(&speaker_id, "volume"));
844
845        // Unregister watch
846        manager.unregister_watch(&speaker_id, "volume");
847        assert!(!manager.is_watched(&speaker_id, "volume"));
848    }
849
850    #[test]
851    fn test_change_event_emission() {
852        let manager = StateManager::new().unwrap();
853
854        let devices = vec![Device {
855            id: "RINCON_123".to_string(),
856            name: "Living Room".to_string(),
857            room_name: "Living Room".to_string(),
858            ip_address: "192.168.1.100".to_string(),
859            port: 1400,
860            model_name: "Sonos One".to_string(),
861        }];
862        manager.add_devices(devices).unwrap();
863
864        let speaker_id = SpeakerId::new("RINCON_123");
865
866        // Register watch
867        manager.register_watch(&speaker_id, "volume");
868
869        // Set property (should emit event)
870        manager.set_property(&speaker_id, Volume::new(75));
871
872        // Get event via iter
873        let iter = manager.iter();
874        let event = iter.recv_timeout(std::time::Duration::from_millis(100));
875        assert!(event.is_some());
876
877        let event = event.unwrap();
878        assert_eq!(event.speaker_id.as_str(), "RINCON_123");
879        assert_eq!(event.property_key, "volume");
880    }
881
882    #[test]
883    fn test_set_group_property_emits_change_event() {
884        let manager = StateManager::new().unwrap();
885
886        let devices = vec![Device {
887            id: "RINCON_123".to_string(),
888            name: "Living Room".to_string(),
889            room_name: "Living Room".to_string(),
890            ip_address: "192.168.1.100".to_string(),
891            port: 1400,
892            model_name: "Sonos One".to_string(),
893        }];
894        manager.add_devices(devices).unwrap();
895
896        let speaker_id = SpeakerId::new("RINCON_123");
897        let group_id = GroupId::new("RINCON_123:1");
898
899        // Add group so coordinator lookup works
900        if let Ok(mut store) = manager.store.write() {
901            store.add_group(GroupInfo::new(
902                group_id.clone(),
903                speaker_id.clone(),
904                vec![speaker_id.clone()],
905            ));
906        }
907
908        // Register watch on coordinator for group_volume
909        manager.register_watch(&speaker_id, "group_volume");
910
911        // Set group property (should emit event via coordinator)
912        manager.set_group_property(&group_id, GroupVolume::new(80));
913
914        // Verify event was emitted
915        let iter = manager.iter();
916        let event = iter.recv_timeout(std::time::Duration::from_millis(100));
917        assert!(event.is_some());
918
919        let event = event.unwrap();
920        assert_eq!(event.speaker_id.as_str(), "RINCON_123");
921        assert_eq!(event.property_key, "group_volume");
922        assert_eq!(event.service, Service::GroupRenderingControl);
923    }
924
925    #[test]
926    fn test_set_group_property_no_event_when_unwatched() {
927        let manager = StateManager::new().unwrap();
928
929        let devices = vec![Device {
930            id: "RINCON_123".to_string(),
931            name: "Living Room".to_string(),
932            room_name: "Living Room".to_string(),
933            ip_address: "192.168.1.100".to_string(),
934            port: 1400,
935            model_name: "Sonos One".to_string(),
936        }];
937        manager.add_devices(devices).unwrap();
938
939        let speaker_id = SpeakerId::new("RINCON_123");
940        let group_id = GroupId::new("RINCON_123:1");
941
942        if let Ok(mut store) = manager.store.write() {
943            store.add_group(GroupInfo::new(
944                group_id.clone(),
945                speaker_id.clone(),
946                vec![speaker_id.clone()],
947            ));
948        }
949
950        // Don't register any watch
951        manager.set_group_property(&group_id, GroupVolume::new(50));
952
953        let iter = manager.iter();
954        let event = iter.recv_timeout(std::time::Duration::from_millis(100));
955        assert!(event.is_none());
956    }
957
958    // ========================================================================
959    // StateStore Group Operations Tests
960    // ========================================================================
961
962    #[test]
963    fn test_add_group_updates_speaker_to_group() {
964        let mut store = StateStore::new();
965
966        let speaker1 = SpeakerId::new("RINCON_111");
967        let speaker2 = SpeakerId::new("RINCON_222");
968        let group_id = GroupId::new("RINCON_111:1");
969
970        let group = GroupInfo::new(
971            group_id.clone(),
972            speaker1.clone(),
973            vec![speaker1.clone(), speaker2.clone()],
974        );
975
976        store.add_group(group);
977
978        // Verify speaker_to_group mapping is updated for all members
979        assert_eq!(store.speaker_to_group.get(&speaker1), Some(&group_id));
980        assert_eq!(store.speaker_to_group.get(&speaker2), Some(&group_id));
981    }
982
983    #[test]
984    fn test_add_group_single_speaker() {
985        let mut store = StateStore::new();
986
987        let speaker = SpeakerId::new("RINCON_333");
988        let group_id = GroupId::new("RINCON_333:1");
989
990        let group = GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()]);
991
992        store.add_group(group.clone());
993
994        // Verify speaker_to_group mapping
995        assert_eq!(store.speaker_to_group.get(&speaker), Some(&group_id));
996
997        // Verify group is stored
998        assert_eq!(store.groups.get(&group_id), Some(&group));
999    }
1000
1001    #[test]
1002    fn test_get_group_for_speaker_returns_correct_group() {
1003        let mut store = StateStore::new();
1004
1005        let speaker1 = SpeakerId::new("RINCON_111");
1006        let speaker2 = SpeakerId::new("RINCON_222");
1007        let speaker3 = SpeakerId::new("RINCON_333");
1008        let group1_id = GroupId::new("RINCON_111:1");
1009        let group2_id = GroupId::new("RINCON_333:1");
1010
1011        // Group 1: speaker1 (coordinator) + speaker2
1012        let group1 = GroupInfo::new(
1013            group1_id.clone(),
1014            speaker1.clone(),
1015            vec![speaker1.clone(), speaker2.clone()],
1016        );
1017
1018        // Group 2: speaker3 alone
1019        let group2 = GroupInfo::new(group2_id.clone(), speaker3.clone(), vec![speaker3.clone()]);
1020
1021        store.add_group(group1.clone());
1022        store.add_group(group2.clone());
1023
1024        // Verify get_group_for_speaker returns correct groups
1025        assert_eq!(store.get_group_for_speaker(&speaker1), Some(&group1));
1026        assert_eq!(store.get_group_for_speaker(&speaker2), Some(&group1));
1027        assert_eq!(store.get_group_for_speaker(&speaker3), Some(&group2));
1028    }
1029
1030    #[test]
1031    fn test_get_group_for_speaker_returns_none_for_unknown() {
1032        let store = StateStore::new();
1033
1034        let unknown_speaker = SpeakerId::new("RINCON_UNKNOWN");
1035
1036        assert!(store.get_group_for_speaker(&unknown_speaker).is_none());
1037    }
1038
1039    #[test]
1040    fn test_clear_groups_removes_all_group_data() {
1041        let mut store = StateStore::new();
1042
1043        let speaker1 = SpeakerId::new("RINCON_111");
1044        let speaker2 = SpeakerId::new("RINCON_222");
1045        let group_id = GroupId::new("RINCON_111:1");
1046
1047        let group = GroupInfo::new(
1048            group_id.clone(),
1049            speaker1.clone(),
1050            vec![speaker1.clone(), speaker2.clone()],
1051        );
1052
1053        store.add_group(group);
1054
1055        // Verify data exists
1056        assert!(!store.groups.is_empty());
1057        assert!(!store.speaker_to_group.is_empty());
1058
1059        // Clear groups
1060        store.clear_groups();
1061
1062        // Verify all group data is cleared
1063        assert!(store.groups.is_empty());
1064        assert!(store.group_props.is_empty());
1065        assert!(store.speaker_to_group.is_empty());
1066    }
1067
1068    #[test]
1069    fn test_clear_groups_then_add_new_groups() {
1070        let mut store = StateStore::new();
1071
1072        // Add initial group
1073        let speaker1 = SpeakerId::new("RINCON_111");
1074        let group1_id = GroupId::new("RINCON_111:1");
1075        let group1 = GroupInfo::new(group1_id.clone(), speaker1.clone(), vec![speaker1.clone()]);
1076        store.add_group(group1);
1077
1078        // Clear and add new group
1079        store.clear_groups();
1080
1081        let speaker2 = SpeakerId::new("RINCON_222");
1082        let group2_id = GroupId::new("RINCON_222:1");
1083        let group2 = GroupInfo::new(group2_id.clone(), speaker2.clone(), vec![speaker2.clone()]);
1084        store.add_group(group2.clone());
1085
1086        // Verify old group is gone, new group exists
1087        assert!(!store.groups.contains_key(&group1_id));
1088        assert_eq!(store.groups.get(&group2_id), Some(&group2));
1089
1090        // Verify speaker_to_group is updated correctly
1091        assert!(!store.speaker_to_group.contains_key(&speaker1));
1092        assert_eq!(store.speaker_to_group.get(&speaker2), Some(&group2_id));
1093    }
1094
1095    // ========================================================================
1096    // StateManager Group Methods Tests
1097    // ========================================================================
1098
1099    #[test]
1100    fn test_state_manager_groups_returns_all_groups() {
1101        let manager = StateManager::new().unwrap();
1102
1103        // Add devices
1104        let devices = vec![
1105            Device {
1106                id: "RINCON_111".to_string(),
1107                name: "Living Room".to_string(),
1108                room_name: "Living Room".to_string(),
1109                ip_address: "192.168.1.100".to_string(),
1110                port: 1400,
1111                model_name: "Sonos One".to_string(),
1112            },
1113            Device {
1114                id: "RINCON_222".to_string(),
1115                name: "Kitchen".to_string(),
1116                room_name: "Kitchen".to_string(),
1117                ip_address: "192.168.1.101".to_string(),
1118                port: 1400,
1119                model_name: "Sonos One".to_string(),
1120            },
1121        ];
1122        manager.add_devices(devices).unwrap();
1123
1124        // Create groups via initialize
1125        let speaker1 = SpeakerId::new("RINCON_111");
1126        let speaker2 = SpeakerId::new("RINCON_222");
1127        let group1 = GroupInfo::new(
1128            GroupId::new("RINCON_111:1"),
1129            speaker1.clone(),
1130            vec![speaker1.clone()],
1131        );
1132        let group2 = GroupInfo::new(
1133            GroupId::new("RINCON_222:1"),
1134            speaker2.clone(),
1135            vec![speaker2.clone()],
1136        );
1137
1138        let topology = Topology::new(
1139            manager.speaker_infos(),
1140            vec![group1.clone(), group2.clone()],
1141        );
1142        manager.initialize(topology);
1143
1144        // Verify groups() returns all groups
1145        let groups = manager.groups();
1146        assert_eq!(groups.len(), 2);
1147
1148        // Verify both groups are present (order may vary)
1149        let group_ids: Vec<_> = groups.iter().map(|g| g.id.clone()).collect();
1150        assert!(group_ids.contains(&GroupId::new("RINCON_111:1")));
1151        assert!(group_ids.contains(&GroupId::new("RINCON_222:1")));
1152    }
1153
1154    #[test]
1155    fn test_state_manager_groups_returns_empty_when_no_groups() {
1156        let manager = StateManager::new().unwrap();
1157
1158        // No groups added
1159        let groups = manager.groups();
1160        assert!(groups.is_empty());
1161    }
1162
1163    #[test]
1164    fn test_state_manager_get_group_returns_correct_group() {
1165        let manager = StateManager::new().unwrap();
1166
1167        // Add device
1168        let devices = vec![Device {
1169            id: "RINCON_111".to_string(),
1170            name: "Living Room".to_string(),
1171            room_name: "Living Room".to_string(),
1172            ip_address: "192.168.1.100".to_string(),
1173            port: 1400,
1174            model_name: "Sonos One".to_string(),
1175        }];
1176        manager.add_devices(devices).unwrap();
1177
1178        // Create group via initialize
1179        let speaker = SpeakerId::new("RINCON_111");
1180        let group_id = GroupId::new("RINCON_111:1");
1181        let group = GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()]);
1182
1183        let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
1184        manager.initialize(topology);
1185
1186        // Verify get_group returns the correct group
1187        let found = manager.get_group(&group_id);
1188        assert!(found.is_some());
1189        assert_eq!(found.unwrap(), group);
1190    }
1191
1192    #[test]
1193    fn test_state_manager_get_group_returns_none_for_unknown() {
1194        let manager = StateManager::new().unwrap();
1195
1196        // No groups added
1197        let unknown_id = GroupId::new("RINCON_UNKNOWN:1");
1198        let found = manager.get_group(&unknown_id);
1199        assert!(found.is_none());
1200    }
1201
1202    #[test]
1203    fn test_state_manager_get_group_for_speaker_returns_correct_group() {
1204        let manager = StateManager::new().unwrap();
1205
1206        // Add devices
1207        let devices = vec![
1208            Device {
1209                id: "RINCON_111".to_string(),
1210                name: "Living Room".to_string(),
1211                room_name: "Living Room".to_string(),
1212                ip_address: "192.168.1.100".to_string(),
1213                port: 1400,
1214                model_name: "Sonos One".to_string(),
1215            },
1216            Device {
1217                id: "RINCON_222".to_string(),
1218                name: "Kitchen".to_string(),
1219                room_name: "Kitchen".to_string(),
1220                ip_address: "192.168.1.101".to_string(),
1221                port: 1400,
1222                model_name: "Sonos One".to_string(),
1223            },
1224        ];
1225        manager.add_devices(devices).unwrap();
1226
1227        // Create a group with both speakers
1228        let speaker1 = SpeakerId::new("RINCON_111");
1229        let speaker2 = SpeakerId::new("RINCON_222");
1230        let group_id = GroupId::new("RINCON_111:1");
1231        let group = GroupInfo::new(
1232            group_id.clone(),
1233            speaker1.clone(),
1234            vec![speaker1.clone(), speaker2.clone()],
1235        );
1236
1237        let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
1238        manager.initialize(topology);
1239
1240        // Verify get_group_for_speaker returns the correct group for both speakers
1241        let found1 = manager.get_group_for_speaker(&speaker1);
1242        assert!(found1.is_some());
1243        assert_eq!(found1.unwrap(), group);
1244
1245        let found2 = manager.get_group_for_speaker(&speaker2);
1246        assert!(found2.is_some());
1247        assert_eq!(found2.unwrap(), group);
1248    }
1249
1250    #[test]
1251    fn test_state_manager_get_group_for_speaker_returns_none_for_unknown() {
1252        let manager = StateManager::new().unwrap();
1253
1254        // No groups added
1255        let unknown_speaker = SpeakerId::new("RINCON_UNKNOWN");
1256        let found = manager.get_group_for_speaker(&unknown_speaker);
1257        assert!(found.is_none());
1258    }
1259
1260    #[test]
1261    fn test_state_manager_group_methods_consistency() {
1262        let manager = StateManager::new().unwrap();
1263
1264        // Add device
1265        let devices = vec![Device {
1266            id: "RINCON_111".to_string(),
1267            name: "Living Room".to_string(),
1268            room_name: "Living Room".to_string(),
1269            ip_address: "192.168.1.100".to_string(),
1270            port: 1400,
1271            model_name: "Sonos One".to_string(),
1272        }];
1273        manager.add_devices(devices).unwrap();
1274
1275        // Create group via initialize
1276        let speaker = SpeakerId::new("RINCON_111");
1277        let group_id = GroupId::new("RINCON_111:1");
1278        let group = GroupInfo::new(group_id.clone(), speaker.clone(), vec![speaker.clone()]);
1279
1280        let topology = Topology::new(manager.speaker_infos(), vec![group.clone()]);
1281        manager.initialize(topology);
1282
1283        // Verify all three methods return consistent data
1284        let groups = manager.groups();
1285        assert_eq!(groups.len(), 1);
1286        assert_eq!(groups[0], group);
1287
1288        let by_id = manager.get_group(&group_id);
1289        assert_eq!(by_id, Some(group.clone()));
1290
1291        let by_speaker = manager.get_group_for_speaker(&speaker);
1292        assert_eq!(by_speaker, Some(group.clone()));
1293
1294        // All should return the same group
1295        assert_eq!(groups[0], by_id.unwrap());
1296        assert_eq!(groups[0], by_speaker.unwrap());
1297    }
1298
1299    // ========================================================================
1300    // boot_seq Tests
1301    // ========================================================================
1302
1303    #[test]
1304    fn test_get_boot_seq_returns_none_for_unknown_speaker() {
1305        let manager = StateManager::new().unwrap();
1306        let unknown = SpeakerId::new("RINCON_UNKNOWN");
1307        assert!(manager.get_boot_seq(&unknown).is_none());
1308    }
1309
1310    #[test]
1311    fn test_boot_seq_defaults_to_zero_for_new_speaker() {
1312        let manager = StateManager::new().unwrap();
1313
1314        let devices = vec![Device {
1315            id: "RINCON_123".to_string(),
1316            name: "Living Room".to_string(),
1317            room_name: "Living Room".to_string(),
1318            ip_address: "192.168.1.100".to_string(),
1319            port: 1400,
1320            model_name: "Sonos One".to_string(),
1321        }];
1322        manager.add_devices(devices).unwrap();
1323
1324        let speaker_id = SpeakerId::new("RINCON_123");
1325
1326        // Before any topology event, boot_seq should be 0
1327        assert_eq!(manager.get_boot_seq(&speaker_id), Some(0));
1328    }
1329}