Skip to main content

lsl_core/
stream_info.rs

1//! StreamInfo: describes the properties of a data stream.
2
3use crate::config::CONFIG;
4use crate::types::*;
5use crate::xml_dom::{xml_unescape, XmlNode};
6use parking_lot::Mutex;
7use std::sync::Arc;
8
9/// Stream information. Describes the properties of a data stream.
10/// Shared via Arc so it can be safely passed between outlet/inlet/server components.
11#[derive(Clone)]
12pub struct StreamInfo {
13    inner: Arc<Mutex<StreamInfoData>>,
14}
15
16pub struct StreamInfoData {
17    // core data fields
18    pub name: String,
19    pub type_: String,
20    pub channel_count: u32,
21    pub nominal_srate: f64,
22    pub channel_format: ChannelFormat,
23    pub source_id: String,
24    // network identity
25    pub version: i32,
26    pub uid: String,
27    pub created_at: f64,
28    pub session_id: String,
29    pub hostname: String,
30    pub v4address: String,
31    pub v4data_port: u16,
32    pub v4service_port: u16,
33    pub v6address: String,
34    pub v6data_port: u16,
35    pub v6service_port: u16,
36    // XML description subtree
37    pub desc: XmlNode,
38}
39
40impl StreamInfo {
41    pub fn new(
42        name: &str,
43        type_: &str,
44        channel_count: u32,
45        nominal_srate: f64,
46        channel_format: ChannelFormat,
47        source_id: &str,
48    ) -> Self {
49        let hostname = hostname::get()
50            .map(|s| s.to_string_lossy().into_owned())
51            .unwrap_or_default();
52        StreamInfo {
53            inner: Arc::new(Mutex::new(StreamInfoData {
54                name: name.to_string(),
55                type_: type_.to_string(),
56                channel_count,
57                nominal_srate,
58                channel_format,
59                source_id: source_id.to_string(),
60                version: CONFIG.use_protocol_version,
61                uid: uuid::Uuid::new_v4().to_string(),
62                created_at: 0.0,
63                session_id: CONFIG.session_id.clone(),
64                hostname,
65                v4address: String::new(),
66                v4data_port: 0,
67                v4service_port: 0,
68                v6address: String::new(),
69                v6data_port: 0,
70                v6service_port: 0,
71                desc: XmlNode::new("desc"),
72            })),
73        }
74    }
75
76    // === Accessors ===
77
78    pub fn name(&self) -> String {
79        self.inner.lock().name.clone()
80    }
81    pub fn type_(&self) -> String {
82        self.inner.lock().type_.clone()
83    }
84    pub fn channel_count(&self) -> u32 {
85        self.inner.lock().channel_count
86    }
87    pub fn nominal_srate(&self) -> f64 {
88        self.inner.lock().nominal_srate
89    }
90    pub fn channel_format(&self) -> ChannelFormat {
91        self.inner.lock().channel_format
92    }
93    pub fn source_id(&self) -> String {
94        self.inner.lock().source_id.clone()
95    }
96    pub fn version(&self) -> i32 {
97        self.inner.lock().version
98    }
99    pub fn uid(&self) -> String {
100        self.inner.lock().uid.clone()
101    }
102    pub fn created_at(&self) -> f64 {
103        self.inner.lock().created_at
104    }
105    pub fn session_id(&self) -> String {
106        self.inner.lock().session_id.clone()
107    }
108    pub fn hostname(&self) -> String {
109        self.inner.lock().hostname.clone()
110    }
111    pub fn v4address(&self) -> String {
112        self.inner.lock().v4address.clone()
113    }
114    pub fn v4data_port(&self) -> u16 {
115        self.inner.lock().v4data_port
116    }
117    pub fn v4service_port(&self) -> u16 {
118        self.inner.lock().v4service_port
119    }
120    pub fn v6address(&self) -> String {
121        self.inner.lock().v6address.clone()
122    }
123    pub fn v6data_port(&self) -> u16 {
124        self.inner.lock().v6data_port
125    }
126    pub fn v6service_port(&self) -> u16 {
127        self.inner.lock().v6service_port
128    }
129
130    pub fn channel_bytes(&self) -> usize {
131        self.inner.lock().channel_format.channel_bytes()
132    }
133    pub fn sample_bytes(&self) -> usize {
134        let d = self.inner.lock();
135        d.channel_format.channel_bytes() * d.channel_count as usize
136    }
137
138    /// Get a clone of the desc node
139    pub fn desc(&self) -> XmlNode {
140        self.inner.lock().desc.clone()
141    }
142
143    // === Setters ===
144
145    pub fn set_uid(&self, uid: &str) {
146        self.inner.lock().uid = uid.to_string();
147    }
148    pub fn set_created_at(&self, t: f64) {
149        self.inner.lock().created_at = t;
150    }
151    pub fn set_session_id(&self, s: &str) {
152        self.inner.lock().session_id = s.to_string();
153    }
154    pub fn set_hostname(&self, s: &str) {
155        self.inner.lock().hostname = s.to_string();
156    }
157    pub fn set_v4address(&self, s: &str) {
158        self.inner.lock().v4address = s.to_string();
159    }
160    pub fn set_v4data_port(&self, p: u16) {
161        self.inner.lock().v4data_port = p;
162    }
163    pub fn set_v4service_port(&self, p: u16) {
164        self.inner.lock().v4service_port = p;
165    }
166    pub fn set_v6address(&self, s: &str) {
167        self.inner.lock().v6address = s.to_string();
168    }
169    pub fn set_v6data_port(&self, p: u16) {
170        self.inner.lock().v6data_port = p;
171    }
172    pub fn set_v6service_port(&self, p: u16) {
173        self.inner.lock().v6service_port = p;
174    }
175    pub fn set_channel_format(&self, fmt: ChannelFormat) {
176        self.inner.lock().channel_format = fmt;
177    }
178    pub fn set_source_id(&self, s: &str) {
179        self.inner.lock().source_id = s.to_string();
180    }
181    pub fn set_version(&self, v: i32) {
182        self.inner.lock().version = v;
183    }
184    pub fn set_name(&self, s: &str) {
185        self.inner.lock().name = s.to_string();
186    }
187
188    /// Reset the UID to a new random value
189    pub fn reset_uid(&self) -> String {
190        let uid = uuid::Uuid::new_v4().to_string();
191        self.inner.lock().uid = uid.clone();
192        uid
193    }
194
195    /// Generate the shortinfo XML (without desc contents)
196    pub fn to_shortinfo_message(&self) -> String {
197        let d = self.inner.lock();
198        let mut xml = String::with_capacity(1024);
199        xml.push_str("<?xml version=\"1.0\"?>\n<info>");
200        append_field(&mut xml, "name", &d.name);
201        append_field(&mut xml, "type", &d.type_);
202        append_field(&mut xml, "channel_count", &d.channel_count.to_string());
203        append_field(&mut xml, "channel_format", d.channel_format.as_str());
204        append_field(&mut xml, "source_id", &d.source_id);
205        append_field(&mut xml, "nominal_srate", &format!("{}", d.nominal_srate));
206        append_field(
207            &mut xml,
208            "version",
209            &format!("{}", d.version as f64 / 100.0),
210        );
211        append_field(&mut xml, "created_at", &format!("{}", d.created_at));
212        append_field(&mut xml, "uid", &d.uid);
213        append_field(&mut xml, "session_id", &d.session_id);
214        append_field(&mut xml, "hostname", &d.hostname);
215        append_field(&mut xml, "v4address", &d.v4address);
216        append_field(&mut xml, "v4data_port", &d.v4data_port.to_string());
217        append_field(&mut xml, "v4service_port", &d.v4service_port.to_string());
218        append_field(&mut xml, "v6address", &d.v6address);
219        append_field(&mut xml, "v6data_port", &d.v6data_port.to_string());
220        append_field(&mut xml, "v6service_port", &d.v6service_port.to_string());
221        xml.push_str("<desc></desc>");
222        xml.push_str("</info>\n");
223        xml
224    }
225
226    /// Generate the full info XML (with desc contents)
227    pub fn to_fullinfo_message(&self) -> String {
228        let d = self.inner.lock();
229        let mut xml = String::with_capacity(2048);
230        xml.push_str("<?xml version=\"1.0\"?>\n<info>");
231        append_field(&mut xml, "name", &d.name);
232        append_field(&mut xml, "type", &d.type_);
233        append_field(&mut xml, "channel_count", &d.channel_count.to_string());
234        append_field(&mut xml, "channel_format", d.channel_format.as_str());
235        append_field(&mut xml, "source_id", &d.source_id);
236        append_field(&mut xml, "nominal_srate", &format!("{}", d.nominal_srate));
237        append_field(
238            &mut xml,
239            "version",
240            &format!("{}", d.version as f64 / 100.0),
241        );
242        append_field(&mut xml, "created_at", &format!("{}", d.created_at));
243        append_field(&mut xml, "uid", &d.uid);
244        append_field(&mut xml, "session_id", &d.session_id);
245        append_field(&mut xml, "hostname", &d.hostname);
246        append_field(&mut xml, "v4address", &d.v4address);
247        append_field(&mut xml, "v4data_port", &d.v4data_port.to_string());
248        append_field(&mut xml, "v4service_port", &d.v4service_port.to_string());
249        append_field(&mut xml, "v6address", &d.v6address);
250        append_field(&mut xml, "v6data_port", &d.v6data_port.to_string());
251        append_field(&mut xml, "v6service_port", &d.v6service_port.to_string());
252        xml.push_str(&d.desc.to_xml());
253        xml.push_str("</info>\n");
254        xml
255    }
256
257    /// Parse from a shortinfo XML message
258    pub fn from_shortinfo_message(xml: &str) -> Option<StreamInfo> {
259        let name = extract_tag(xml, "name")?;
260        let type_ = extract_tag(xml, "type").unwrap_or_default();
261        let channel_count = extract_tag(xml, "channel_count")
262            .and_then(|s| s.parse().ok())
263            .unwrap_or(0);
264        let nominal_srate = extract_tag(xml, "nominal_srate")
265            .and_then(|s| s.parse().ok())
266            .unwrap_or(0.0);
267        let channel_format = extract_tag(xml, "channel_format")
268            .map(|s| ChannelFormat::from_name(&s))
269            .unwrap_or(ChannelFormat::Undefined);
270        let source_id = extract_tag(xml, "source_id").unwrap_or_default();
271        let version = extract_tag(xml, "version")
272            .and_then(|s| s.parse::<f64>().ok())
273            .map(|v| (v * 100.0) as i32)
274            .unwrap_or(LSL_PROTOCOL_VERSION);
275        let uid = extract_tag(xml, "uid").unwrap_or_default();
276        let created_at = extract_tag(xml, "created_at")
277            .and_then(|s| s.parse().ok())
278            .unwrap_or(0.0);
279        let session_id = extract_tag(xml, "session_id").unwrap_or_default();
280        let hostname = extract_tag(xml, "hostname").unwrap_or_default();
281        let v4address = extract_tag(xml, "v4address").unwrap_or_default();
282        let v4data_port = extract_tag(xml, "v4data_port")
283            .and_then(|s| s.parse().ok())
284            .unwrap_or(0);
285        let v4service_port = extract_tag(xml, "v4service_port")
286            .and_then(|s| s.parse().ok())
287            .unwrap_or(0);
288        let v6address = extract_tag(xml, "v6address").unwrap_or_default();
289        let v6data_port = extract_tag(xml, "v6data_port")
290            .and_then(|s| s.parse().ok())
291            .unwrap_or(0);
292        let v6service_port = extract_tag(xml, "v6service_port")
293            .and_then(|s| s.parse().ok())
294            .unwrap_or(0);
295
296        let info = StreamInfo {
297            inner: Arc::new(Mutex::new(StreamInfoData {
298                name,
299                type_,
300                channel_count,
301                nominal_srate,
302                channel_format,
303                source_id,
304                version,
305                uid,
306                created_at,
307                session_id,
308                hostname,
309                v4address,
310                v4data_port,
311                v4service_port,
312                v6address,
313                v6data_port,
314                v6service_port,
315                desc: XmlNode::new("desc"),
316            })),
317        };
318        Some(info)
319    }
320
321    /// Check if a query string matches this stream info.
322    /// Supports simple XPath-like queries: empty always matches,
323    /// name='X' / type='X' / source_id='X' etc.
324    pub fn matches_query(&self, query: &str) -> bool {
325        if query.is_empty() {
326            return true;
327        }
328        let d = self.inner.lock();
329        // Support: "clause and clause" with "or" between groups
330        // e.g. "name='A' and type='B'" or "name='C'"
331        for or_group in query.split(" or ") {
332            let or_group = or_group.trim();
333            if or_group.is_empty() {
334                continue;
335            }
336            let mut all_match = true;
337            for clause in or_group.split(" and ") {
338                let clause = clause.trim();
339                if clause.is_empty() {
340                    continue;
341                }
342                if !match_clause(clause, &d) {
343                    all_match = false;
344                    break;
345                }
346            }
347            if all_match {
348                return true;
349            }
350        }
351        false
352    }
353
354    /// Lock and provide read-only access to inner data
355    pub fn with_data<R>(&self, f: impl FnOnce(&StreamInfoData) -> R) -> R {
356        f(&self.inner.lock())
357    }
358
359    /// Lock and provide mutable access to inner data
360    pub fn with_data_mut<R>(&self, f: impl FnOnce(&mut StreamInfoData) -> R) -> R {
361        f(&mut self.inner.lock())
362    }
363}
364
365fn match_clause(clause: &str, d: &StreamInfoData) -> bool {
366    let clause = clause.trim();
367
368    // not(...) — negation
369    if clause.starts_with("not(") && clause.ends_with(')') {
370        let inner = &clause[4..clause.len() - 1];
371        return !match_clause(inner, d);
372    }
373
374    // starts-with(field,'value')
375    if clause.starts_with("starts-with(") && clause.ends_with(')') {
376        let inner = &clause[12..clause.len() - 1];
377        if let Some(comma) = inner.find(',') {
378            let field = inner[..comma].trim();
379            let val = inner[comma + 1..]
380                .trim()
381                .trim_matches('\'')
382                .trim_matches('"');
383            return get_field(field, d).starts_with(val);
384        }
385        return true;
386    }
387
388    // contains(field,'value')
389    if clause.starts_with("contains(") && clause.ends_with(')') {
390        let inner = &clause[9..clause.len() - 1];
391        if let Some(comma) = inner.find(',') {
392            let field = inner[..comma].trim();
393            let val = inner[comma + 1..]
394                .trim()
395                .trim_matches('\'')
396                .trim_matches('"');
397            return get_field(field, d).contains(val);
398        }
399        return true;
400    }
401
402    // field!='value' — inequality
403    if let Some(pos) = clause.find("!=") {
404        let field = clause[..pos].trim();
405        let val = clause[pos + 2..]
406            .trim()
407            .trim_matches('\'')
408            .trim_matches('"');
409        return get_field(field, d) != val;
410    }
411
412    // field>='value' — greater-or-equal
413    if let Some(pos) = clause.find(">=") {
414        let field = clause[..pos].trim();
415        let val = clause[pos + 2..]
416            .trim()
417            .trim_matches('\'')
418            .trim_matches('"');
419        if let (Ok(fv), Ok(vv)) = (get_field(field, d).parse::<f64>(), val.parse::<f64>()) {
420            return fv >= vv;
421        }
422        return get_field(field, d).as_str() >= val;
423    }
424
425    // field<='value'
426    if let Some(pos) = clause.find("<=") {
427        let field = clause[..pos].trim();
428        let val = clause[pos + 2..]
429            .trim()
430            .trim_matches('\'')
431            .trim_matches('"');
432        if let (Ok(fv), Ok(vv)) = (get_field(field, d).parse::<f64>(), val.parse::<f64>()) {
433            return fv <= vv;
434        }
435        return get_field(field, d).as_str() <= val;
436    }
437
438    // field>'value' — greater
439    if let Some(pos) = clause.find('>') {
440        if !clause[..pos].contains('=') {
441            let field = clause[..pos].trim();
442            let val = clause[pos + 1..]
443                .trim()
444                .trim_matches('\'')
445                .trim_matches('"');
446            if let (Ok(fv), Ok(vv)) = (get_field(field, d).parse::<f64>(), val.parse::<f64>()) {
447                return fv > vv;
448            }
449        }
450    }
451
452    // field<'value' — less
453    if let Some(pos) = clause.find('<') {
454        if !clause[..pos].contains('=') && !clause[..pos].contains('!') {
455            let field = clause[..pos].trim();
456            let val = clause[pos + 1..]
457                .trim()
458                .trim_matches('\'')
459                .trim_matches('"');
460            if let (Ok(fv), Ok(vv)) = (get_field(field, d).parse::<f64>(), val.parse::<f64>()) {
461                return fv < vv;
462            }
463        }
464    }
465
466    // field='value' — equality
467    if let Some(eq_pos) = clause.find('=') {
468        let field = clause[..eq_pos].trim();
469        let val = clause[eq_pos + 1..]
470            .trim()
471            .trim_matches('\'')
472            .trim_matches('"');
473        return get_field(field, d) == val;
474    }
475
476    true // can't parse: pass
477}
478
479fn get_field(field: &str, d: &StreamInfoData) -> String {
480    match field {
481        "name" => d.name.clone(),
482        "type" => d.type_.clone(),
483        "source_id" => d.source_id.clone(),
484        "channel_count" => d.channel_count.to_string(),
485        "nominal_srate" => format!("{}", d.nominal_srate),
486        "channel_format" => d.channel_format.as_str().to_string(),
487        "uid" => d.uid.clone(),
488        "session_id" => d.session_id.clone(),
489        "hostname" => d.hostname.clone(),
490        "created_at" => format!("{}", d.created_at),
491        "version" => d.version.to_string(),
492        _ => String::new(),
493    }
494}
495
496fn append_field(xml: &mut String, tag: &str, value: &str) {
497    xml.push('<');
498    xml.push_str(tag);
499    xml.push('>');
500    xml_escape_into(xml, value);
501    xml.push_str("</");
502    xml.push_str(tag);
503    xml.push('>');
504}
505
/// Append `s` to `out`, replacing the five XML special characters
/// (`&`, `<`, `>`, `"`, `'`) with their predefined entities.
fn xml_escape_into(out: &mut String, s: &str) {
    const SPECIAL: &[char] = &['&', '<', '>', '"', '\''];

    let mut rest = s;
    // Copy each run of ordinary text in one go, then emit the entity for the special
    // character that ended the run.
    while let Some(idx) = rest.find(SPECIAL) {
        out.push_str(&rest[..idx]);
        let entity = match rest.as_bytes()[idx] {
            b'&' => "&amp;",
            b'<' => "&lt;",
            b'>' => "&gt;",
            b'"' => "&quot;",
            _ => "&apos;",
        };
        out.push_str(entity);
        // Every special character is a single ASCII byte, so `idx + 1` is a char boundary.
        rest = &rest[idx + 1..];
    }
    out.push_str(rest);
}
518
519fn extract_tag(xml: &str, tag: &str) -> Option<String> {
520    let open = format!("<{}>", tag);
521    let close = format!("</{}>", tag);
522    let start = xml.find(&open)?;
523    let after_open = start + open.len();
524    let end = xml[after_open..].find(&close)?;
525    let content = &xml[after_open..after_open + end];
526    Some(xml_unescape(content.trim()))
527}