Skip to main content

ember_plus/glow/
stream.rs

1//! Stream handling for Ember+ streaming parameters.
2
3use super::element::{EmberValue, StreamFormat};
4use super::root::StreamEntry;
5use crate::error::Result;
6
7/// Stream manager for handling streaming parameter values.
8#[derive(Debug, Default)]
9pub struct StreamManager {
10    /// Active stream subscriptions
11    subscriptions: std::collections::HashMap<i32, StreamSubscription>,
12}
13
14/// A stream subscription.
15#[derive(Debug, Clone)]
16pub struct StreamSubscription {
17    /// Stream identifier
18    pub stream_id: i32,
19    /// Stream format
20    pub format: StreamFormat,
21    /// Last received value
22    pub last_value: Option<EmberValue>,
23}
24
25impl StreamManager {
26    /// Create a new stream manager.
27    pub fn new() -> Self {
28        StreamManager::default()
29    }
30
31    /// Subscribe to a stream.
32    pub fn subscribe(&mut self, stream_id: i32, format: StreamFormat) {
33        self.subscriptions.insert(stream_id, StreamSubscription {
34            stream_id,
35            format,
36            last_value: None,
37        });
38    }
39
40    /// Unsubscribe from a stream.
41    pub fn unsubscribe(&mut self, stream_id: i32) {
42        self.subscriptions.remove(&stream_id);
43    }
44
45    /// Process a stream entry update.
46    pub fn process_entry(&mut self, entry: &StreamEntry) {
47        if let Some(sub) = self.subscriptions.get_mut(&entry.stream_identifier) {
48            sub.last_value = Some(entry.value.clone());
49        }
50    }
51
52    /// Get the last value for a stream.
53    pub fn get_value(&self, stream_id: i32) -> Option<&EmberValue> {
54        self.subscriptions.get(&stream_id).and_then(|s| s.last_value.as_ref())
55    }
56
57    /// Check if subscribed to a stream.
58    pub fn is_subscribed(&self, stream_id: i32) -> bool {
59        self.subscriptions.contains_key(&stream_id)
60    }
61
62    /// Get all active subscriptions.
63    pub fn subscriptions(&self) -> impl Iterator<Item = &StreamSubscription> {
64        self.subscriptions.values()
65    }
66}
67
68/// Decode a stream value based on format.
69pub fn decode_stream_value(data: &[u8], format: StreamFormat, offset: usize) -> Option<EmberValue> {
70    if offset >= data.len() {
71        return None;
72    }
73    let data = &data[offset..];
74    
75    match format {
76        StreamFormat::UInt8 => {
77            data.first().map(|&v| EmberValue::Integer(v as i64))
78        }
79        StreamFormat::Int8 => {
80            data.first().map(|&v| EmberValue::Integer(v as i8 as i64))
81        }
82        StreamFormat::UInt16BigEndian => {
83            if data.len() >= 2 {
84                Some(EmberValue::Integer(u16::from_be_bytes([data[0], data[1]]) as i64))
85            } else {
86                None
87            }
88        }
89        StreamFormat::UInt16LittleEndian => {
90            if data.len() >= 2 {
91                Some(EmberValue::Integer(u16::from_le_bytes([data[0], data[1]]) as i64))
92            } else {
93                None
94            }
95        }
96        StreamFormat::Int16BigEndian => {
97            if data.len() >= 2 {
98                Some(EmberValue::Integer(i16::from_be_bytes([data[0], data[1]]) as i64))
99            } else {
100                None
101            }
102        }
103        StreamFormat::Int16LittleEndian => {
104            if data.len() >= 2 {
105                Some(EmberValue::Integer(i16::from_le_bytes([data[0], data[1]]) as i64))
106            } else {
107                None
108            }
109        }
110        StreamFormat::UInt32BigEndian => {
111            if data.len() >= 4 {
112                Some(EmberValue::Integer(u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as i64))
113            } else {
114                None
115            }
116        }
117        StreamFormat::UInt32LittleEndian => {
118            if data.len() >= 4 {
119                Some(EmberValue::Integer(u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as i64))
120            } else {
121                None
122            }
123        }
124        StreamFormat::Int32BigEndian => {
125            if data.len() >= 4 {
126                Some(EmberValue::Integer(i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as i64))
127            } else {
128                None
129            }
130        }
131        StreamFormat::Int32LittleEndian => {
132            if data.len() >= 4 {
133                Some(EmberValue::Integer(i32::from_le_bytes([data[0], data[1], data[2], data[3]]) as i64))
134            } else {
135                None
136            }
137        }
138        StreamFormat::Float32BigEndian => {
139            if data.len() >= 4 {
140                Some(EmberValue::Real(f32::from_be_bytes([data[0], data[1], data[2], data[3]]) as f64))
141            } else {
142                None
143            }
144        }
145        StreamFormat::Float32LittleEndian => {
146            if data.len() >= 4 {
147                Some(EmberValue::Real(f32::from_le_bytes([data[0], data[1], data[2], data[3]]) as f64))
148            } else {
149                None
150            }
151        }
152        StreamFormat::Float64BigEndian => {
153            if data.len() >= 8 {
154                let arr: [u8; 8] = data[..8].try_into().ok()?;
155                Some(EmberValue::Real(f64::from_be_bytes(arr)))
156            } else {
157                None
158            }
159        }
160        StreamFormat::Float64LittleEndian => {
161            if data.len() >= 8 {
162                let arr: [u8; 8] = data[..8].try_into().ok()?;
163                Some(EmberValue::Real(f64::from_le_bytes(arr)))
164            } else {
165                None
166            }
167        }
168        _ => None,
169    }
170}