// ember_plus/glow/stream.rs
use super::element::{EmberValue, StreamFormat};
use super::root::StreamEntry;
use crate::error::Result;
6
7#[derive(Debug, Default)]
9pub struct StreamManager {
10 subscriptions: std::collections::HashMap<i32, StreamSubscription>,
12}
13
14#[derive(Debug, Clone)]
16pub struct StreamSubscription {
17 pub stream_id: i32,
19 pub format: StreamFormat,
21 pub last_value: Option<EmberValue>,
23}
24
25impl StreamManager {
26 pub fn new() -> Self {
28 StreamManager::default()
29 }
30
31 pub fn subscribe(&mut self, stream_id: i32, format: StreamFormat) {
33 self.subscriptions.insert(stream_id, StreamSubscription {
34 stream_id,
35 format,
36 last_value: None,
37 });
38 }
39
40 pub fn unsubscribe(&mut self, stream_id: i32) {
42 self.subscriptions.remove(&stream_id);
43 }
44
45 pub fn process_entry(&mut self, entry: &StreamEntry) {
47 if let Some(sub) = self.subscriptions.get_mut(&entry.stream_identifier) {
48 sub.last_value = Some(entry.value.clone());
49 }
50 }
51
52 pub fn get_value(&self, stream_id: i32) -> Option<&EmberValue> {
54 self.subscriptions.get(&stream_id).and_then(|s| s.last_value.as_ref())
55 }
56
57 pub fn is_subscribed(&self, stream_id: i32) -> bool {
59 self.subscriptions.contains_key(&stream_id)
60 }
61
62 pub fn subscriptions(&self) -> impl Iterator<Item = &StreamSubscription> {
64 self.subscriptions.values()
65 }
66}
67
68pub fn decode_stream_value(data: &[u8], format: StreamFormat, offset: usize) -> Option<EmberValue> {
70 if offset >= data.len() {
71 return None;
72 }
73 let data = &data[offset..];
74
75 match format {
76 StreamFormat::UInt8 => {
77 data.first().map(|&v| EmberValue::Integer(v as i64))
78 }
79 StreamFormat::Int8 => {
80 data.first().map(|&v| EmberValue::Integer(v as i8 as i64))
81 }
82 StreamFormat::UInt16BigEndian => {
83 if data.len() >= 2 {
84 Some(EmberValue::Integer(u16::from_be_bytes([data[0], data[1]]) as i64))
85 } else {
86 None
87 }
88 }
89 StreamFormat::UInt16LittleEndian => {
90 if data.len() >= 2 {
91 Some(EmberValue::Integer(u16::from_le_bytes([data[0], data[1]]) as i64))
92 } else {
93 None
94 }
95 }
96 StreamFormat::Int16BigEndian => {
97 if data.len() >= 2 {
98 Some(EmberValue::Integer(i16::from_be_bytes([data[0], data[1]]) as i64))
99 } else {
100 None
101 }
102 }
103 StreamFormat::Int16LittleEndian => {
104 if data.len() >= 2 {
105 Some(EmberValue::Integer(i16::from_le_bytes([data[0], data[1]]) as i64))
106 } else {
107 None
108 }
109 }
110 StreamFormat::UInt32BigEndian => {
111 if data.len() >= 4 {
112 Some(EmberValue::Integer(u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as i64))
113 } else {
114 None
115 }
116 }
117 StreamFormat::UInt32LittleEndian => {
118 if data.len() >= 4 {
119 Some(EmberValue::Integer(u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as i64))
120 } else {
121 None
122 }
123 }
124 StreamFormat::Int32BigEndian => {
125 if data.len() >= 4 {
126 Some(EmberValue::Integer(i32::from_be_bytes([data[0], data[1], data[2], data[3]]) as i64))
127 } else {
128 None
129 }
130 }
131 StreamFormat::Int32LittleEndian => {
132 if data.len() >= 4 {
133 Some(EmberValue::Integer(i32::from_le_bytes([data[0], data[1], data[2], data[3]]) as i64))
134 } else {
135 None
136 }
137 }
138 StreamFormat::Float32BigEndian => {
139 if data.len() >= 4 {
140 Some(EmberValue::Real(f32::from_be_bytes([data[0], data[1], data[2], data[3]]) as f64))
141 } else {
142 None
143 }
144 }
145 StreamFormat::Float32LittleEndian => {
146 if data.len() >= 4 {
147 Some(EmberValue::Real(f32::from_le_bytes([data[0], data[1], data[2], data[3]]) as f64))
148 } else {
149 None
150 }
151 }
152 StreamFormat::Float64BigEndian => {
153 if data.len() >= 8 {
154 let arr: [u8; 8] = data[..8].try_into().ok()?;
155 Some(EmberValue::Real(f64::from_be_bytes(arr)))
156 } else {
157 None
158 }
159 }
160 StreamFormat::Float64LittleEndian => {
161 if data.len() >= 8 {
162 let arr: [u8; 8] = data[..8].try_into().ok()?;
163 Some(EmberValue::Real(f64::from_le_bytes(arr)))
164 } else {
165 None
166 }
167 }
168 _ => None,
169 }
170}