1use crate::sample::Sample;
4use crate::stream_info::StreamInfo;
5use crate::types::*;
6use crossbeam_channel::{bounded, Receiver};
7use parking_lot::Mutex;
8use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
12use tokio::net::TcpStream;
13
14pub struct StreamInlet {
16 info: StreamInfo,
17 max_buflen: i32,
18 max_chunklen: i32,
19 recover: bool,
20 sample_rx: Receiver<Sample>,
21 sample_tx: crossbeam_channel::Sender<Sample>,
22 connected: Arc<AtomicBool>,
23 shutdown: Arc<AtomicBool>,
24 time_correction: Arc<Mutex<f64>>,
25 samples_available: Arc<AtomicU32>,
26 post_processing: Arc<AtomicU32>,
27 postproc: Mutex<crate::postproc::TimestampPostProcessor>,
28}
29
30impl StreamInlet {
31 pub fn new(info: &StreamInfo, max_buflen: i32, max_chunklen: i32, recover: bool) -> Self {
32 let (tx, rx) = bounded(max_buflen.max(1) as usize);
33 let connected = Arc::new(AtomicBool::new(false));
34 let shutdown = Arc::new(AtomicBool::new(false));
35 let time_correction = Arc::new(Mutex::new(0.0f64));
36 let samples_available = Arc::new(AtomicU32::new(0));
37 let post_processing = Arc::new(AtomicU32::new(PROC_NONE));
38
39 let srate = info.nominal_srate();
40 let halftime = crate::config::CONFIG.smoothing_halftime;
41 StreamInlet {
42 info: info.clone(),
43 max_buflen,
44 max_chunklen,
45 recover,
46 sample_rx: rx,
47 sample_tx: tx,
48 connected: connected.clone(),
49 shutdown: shutdown.clone(),
50 time_correction,
51 samples_available,
52 post_processing: post_processing.clone(),
53 postproc: Mutex::new(crate::postproc::TimestampPostProcessor::new(
54 PROC_NONE, srate, halftime,
55 )),
56 }
57 }
58
59 pub fn open_stream(&self, timeout: f64) -> Result<(), String> {
61 if self.connected.load(Ordering::Relaxed) {
62 return Ok(());
63 }
64
65 let info = self.info.clone();
66 let tx = self.sample_tx.clone();
67 let connected = self.connected.clone();
68 let shutdown = self.shutdown.clone();
69 let max_buflen = self.max_buflen;
70 let max_chunklen = self.max_chunklen;
71 let samples_avail = self.samples_available.clone();
72
73 let connected2 = self.connected.clone();
76 let recover = self.recover;
77 let source_uid = info.uid();
78 std::thread::Builder::new()
79 .name("lsl_data_recv".to_string())
80 .spawn(move || {
81 let rt = tokio::runtime::Builder::new_current_thread()
82 .enable_all()
83 .build()
84 .unwrap();
85 rt.block_on(async move {
86 let mut current_info = info.clone();
87 loop {
88 if shutdown.load(Ordering::Relaxed) {
89 break;
90 }
91 match Self::connect_and_receive(
92 ¤t_info,
93 &tx,
94 &connected,
95 &shutdown,
96 max_buflen,
97 max_chunklen,
98 &samples_avail,
99 )
100 .await
101 {
102 Ok(()) => break,
103 Err(e) => {
104 connected.store(false, Ordering::SeqCst);
105 if shutdown.load(Ordering::Relaxed) {
106 break;
107 }
108 if !recover {
109 log::trace!("[inlet] Connection lost, recovery disabled");
110 break;
111 }
112 log::trace!("[inlet] Connection lost: {}, re-resolving...", e);
113 tokio::time::sleep(Duration::from_millis(500)).await;
115 let uid = source_uid.clone();
116 match tokio::task::spawn_blocking(move || {
117 crate::resolver::resolve_by_property("uid", &uid, 1, 3.0)
118 })
119 .await
120 {
121 Ok(found) if !found.is_empty() => {
122 current_info = found.into_iter().next().unwrap();
123 log::trace!("[inlet] Re-resolved, reconnecting...");
124 }
125 _ => {
126 log::trace!("[inlet] Could not re-resolve, will retry...");
127 tokio::time::sleep(Duration::from_secs(1)).await;
128 }
129 }
130 }
131 }
132 }
133 });
134 })
135 .map_err(|e| e.to_string())?;
136
137 let deadline = std::time::Instant::now() + Duration::from_secs_f64(timeout.max(0.001));
139 while !connected2.load(Ordering::SeqCst) {
140 if std::time::Instant::now() >= deadline {
141 return Err("open_stream timed out".to_string());
142 }
143 std::thread::sleep(Duration::from_millis(1));
144 }
145 Ok(())
146 }
147
148 async fn connect_and_receive(
149 info: &StreamInfo,
150 tx: &crossbeam_channel::Sender<Sample>,
151 connected: &Arc<AtomicBool>,
152 shutdown: &Arc<AtomicBool>,
153 max_buflen: i32,
154 max_chunklen: i32,
155 samples_avail: &Arc<AtomicU32>,
156 ) -> Result<(), String> {
157 let stream = Self::try_connect(info).await?;
159 stream.set_nodelay(true).ok();
160
161 let mut reader = BufReader::new(stream);
162
163 let fmt = info.channel_format();
165 let nch = info.channel_count();
166 let proto_version = info.version().min(LSL_PROTOCOL_VERSION);
167
168 if proto_version >= 110 {
169 let request = format!(
170 "LSL:streamfeed/{} {}\r\nNative-Byte-Order: 1234\r\nEndian-Performance: 0\r\nHas-IEEE754-Floats: 1\r\nSupports-Subnormals: 1\r\nValue-Size: {}\r\nData-Protocol-Version: {}\r\nMax-Buffer-Length: {}\r\nMax-Chunk-Length: {}\r\n\r\n",
171 proto_version,
172 info.uid(),
173 fmt.channel_bytes(),
174 proto_version,
175 max_buflen,
176 max_chunklen,
177 );
178 reader
179 .get_mut()
180 .write_all(request.as_bytes())
181 .await
182 .map_err(|e| e.to_string())?;
183 reader.get_mut().flush().await.map_err(|e| e.to_string())?;
184
185 let mut response_line = String::new();
187 reader
188 .read_line(&mut response_line)
189 .await
190 .map_err(|e| e.to_string())?;
191 if !response_line.contains("200") {
192 return Err(format!("Server error: {}", response_line.trim()));
193 }
194
195 loop {
197 let mut line = String::new();
198 reader
199 .read_line(&mut line)
200 .await
201 .map_err(|e| e.to_string())?;
202 if line.trim().is_empty() {
203 break;
204 }
205 }
206 } else {
207 let request = format!("LSL:streamfeed\r\n{} {}\r\n", max_buflen, max_chunklen);
208 reader
209 .get_mut()
210 .write_all(request.as_bytes())
211 .await
212 .map_err(|e| e.to_string())?;
213 reader.get_mut().flush().await.map_err(|e| e.to_string())?;
214 }
215
216 let use_proto_100 = proto_version < 110;
218 for test_offset in [4, 2] {
219 let received = if use_proto_100 {
220 read_sample_async_100(&mut reader, fmt, nch).await?
221 } else {
222 read_sample_async(&mut reader, fmt, nch).await?
223 };
224 let mut expected = Sample::new(fmt, nch, 0.0);
225 expected.assign_test_pattern(test_offset);
226 if received != expected {
227 return Err("Test pattern mismatch".to_string());
228 }
229 }
230
231 connected.store(true, Ordering::SeqCst);
232
233 let srate = info.nominal_srate();
235 let mut last_timestamp = 0.0f64;
236
237 loop {
238 if shutdown.load(Ordering::Relaxed) {
239 break;
240 }
241
242 let mut sample = if use_proto_100 {
243 read_sample_async_100(&mut reader, fmt, nch).await?
244 } else {
245 read_sample_async(&mut reader, fmt, nch).await?
246 };
247
248 if sample.timestamp == DEDUCED_TIMESTAMP {
250 sample.timestamp = last_timestamp;
251 if srate != IRREGULAR_RATE {
252 sample.timestamp += 1.0 / srate;
253 }
254 }
255 last_timestamp = sample.timestamp;
256
257 samples_avail.fetch_add(1, Ordering::Relaxed);
258 if tx.send(sample).is_err() {
259 break;
260 }
261 }
262
263 Ok(())
264 }
265
266 async fn try_connect(info: &StreamInfo) -> Result<TcpStream, String> {
269 let v6_port = info.v6data_port();
271 if v6_port > 0 {
272 let v6_addr = info.v6address();
273 let host = if v6_addr.is_empty() {
274 "::1".to_string()
275 } else {
276 v6_addr
277 };
278 let addr = format!("[{}]:{}", host, v6_port);
279 log::trace!("[inlet] Trying IPv6 {}...", addr);
280 match TcpStream::connect(&addr).await {
281 Ok(stream) => {
282 log::trace!("[inlet] Connected via IPv6");
283 return Ok(stream);
284 }
285 Err(e) => {
286 log::trace!("[inlet] IPv6 connect failed: {}, trying IPv4", e);
287 }
288 }
289 }
290
291 let port = info.v4data_port();
293 let addr_str = info.v4address();
294 let host = if addr_str.is_empty() {
295 "127.0.0.1".to_string()
296 } else {
297 addr_str
298 };
299 let addr = format!("{}:{}", host, port);
300 log::trace!("[inlet] Connecting IPv4 {}...", addr);
301 let stream = TcpStream::connect(&addr).await.map_err(|e| {
302 log::trace!("[inlet] Connect error: {}", e);
303 e.to_string()
304 })?;
305 log::trace!("[inlet] Connected via IPv4");
306 Ok(stream)
307 }
308
309 pub fn pull_sample_f(&self, buffer: &mut [f32], timeout: f64) -> Result<f64, String> {
311 let sample = self.pull_sample_raw(timeout)?;
312 match sample {
313 Some(s) => {
314 s.retrieve_f32(buffer);
315 self.samples_available.fetch_sub(1, Ordering::Relaxed);
316 Ok(self.postprocess_timestamp(s.timestamp))
317 }
318 None => Ok(0.0),
319 }
320 }
321
322 pub fn pull_sample_d(&self, buffer: &mut [f64], timeout: f64) -> Result<f64, String> {
323 let sample = self.pull_sample_raw(timeout)?;
324 match sample {
325 Some(s) => {
326 s.retrieve_f64(buffer);
327 self.samples_available.fetch_sub(1, Ordering::Relaxed);
328 Ok(self.postprocess_timestamp(s.timestamp))
329 }
330 None => Ok(0.0),
331 }
332 }
333
334 pub fn pull_sample_i32(&self, buffer: &mut [i32], timeout: f64) -> Result<f64, String> {
335 let sample = self.pull_sample_raw(timeout)?;
336 match sample {
337 Some(s) => {
338 s.retrieve_i32(buffer);
339 self.samples_available.fetch_sub(1, Ordering::Relaxed);
340 Ok(self.postprocess_timestamp(s.timestamp))
341 }
342 None => Ok(0.0),
343 }
344 }
345
346 pub fn pull_sample_i16(&self, buffer: &mut [i16], timeout: f64) -> Result<f64, String> {
347 let sample = self.pull_sample_raw(timeout)?;
348 match sample {
349 Some(s) => {
350 s.retrieve_i16(buffer);
351 self.samples_available.fetch_sub(1, Ordering::Relaxed);
352 Ok(self.postprocess_timestamp(s.timestamp))
353 }
354 None => Ok(0.0),
355 }
356 }
357
358 pub fn pull_sample_i64(&self, buffer: &mut [i64], timeout: f64) -> Result<f64, String> {
359 let sample = self.pull_sample_raw(timeout)?;
360 match sample {
361 Some(s) => {
362 s.retrieve_i64(buffer);
363 self.samples_available.fetch_sub(1, Ordering::Relaxed);
364 Ok(self.postprocess_timestamp(s.timestamp))
365 }
366 None => Ok(0.0),
367 }
368 }
369
370 pub fn pull_sample_str(&self, timeout: f64) -> Result<(Vec<String>, f64), String> {
371 let sample = self.pull_sample_raw(timeout)?;
372 match sample {
373 Some(s) => {
374 let strings = s.retrieve_strings();
375 self.samples_available.fetch_sub(1, Ordering::Relaxed);
376 Ok((strings, self.postprocess_timestamp(s.timestamp)))
377 }
378 None => Ok((Vec::new(), 0.0)),
379 }
380 }
381
382 fn pull_sample_raw(&self, timeout: f64) -> Result<Option<Sample>, String> {
383 if timeout <= 0.0 {
384 match self.sample_rx.try_recv() {
385 Ok(s) => Ok(Some(s)),
386 Err(_) => Ok(None),
387 }
388 } else if timeout >= FOREVER {
389 match self.sample_rx.recv() {
390 Ok(s) => Ok(Some(s)),
391 Err(_) => Err("channel closed".to_string()),
392 }
393 } else {
394 match self
395 .sample_rx
396 .recv_timeout(Duration::from_secs_f64(timeout))
397 {
398 Ok(s) => Ok(Some(s)),
399 Err(crossbeam_channel::RecvTimeoutError::Timeout) => Ok(None),
400 Err(_) => Err("channel closed".to_string()),
401 }
402 }
403 }
404
405 fn postprocess_timestamp(&self, ts: f64) -> f64 {
406 let flags = self.post_processing.load(Ordering::Relaxed);
407 if flags == PROC_NONE {
408 return ts;
409 }
410 let mut proc = self.postproc.lock();
411 proc.set_clock_offset(*self.time_correction.lock());
412 proc.process(ts)
413 }
414
415 pub fn close_stream(&self) {
416 }
418
419 pub fn time_correction(&self, timeout: f64) -> f64 {
422 let host = {
423 let v4 = self.info.v4address();
424 if v4.is_empty() {
425 "127.0.0.1".to_string()
426 } else {
427 v4
428 }
429 };
430 let port = self.info.v4service_port();
431 let offset = crate::time_receiver::time_correction(&host, port, timeout);
432 *self.time_correction.lock() = offset;
433 offset
434 }
435
436 pub fn set_postprocessing(&self, flags: u32) {
437 self.post_processing.store(flags, Ordering::Relaxed);
438 let srate = self.info.nominal_srate();
439 let halftime = crate::config::CONFIG.smoothing_halftime;
440 *self.postproc.lock() =
441 crate::postproc::TimestampPostProcessor::new(flags, srate, halftime);
442 }
443
444 pub fn samples_available(&self) -> u32 {
445 self.sample_rx.len() as u32
446 }
447
448 pub fn flush(&self) -> u32 {
449 let mut count = 0u32;
450 while self.sample_rx.try_recv().is_ok() {
451 count += 1;
452 }
453 count
454 }
455
456 pub fn was_clock_reset(&self) -> bool {
457 false
458 }
459
460 pub fn smoothing_halftime(&self, value: f32) {
461 let flags = self.post_processing.load(Ordering::Relaxed);
462 let srate = self.info.nominal_srate();
463 *self.postproc.lock() = crate::postproc::TimestampPostProcessor::new(flags, srate, value);
464 }
465
466 pub fn get_fullinfo(&self, _timeout: f64) -> StreamInfo {
467 self.info.clone()
468 }
469}
470
471impl Drop for StreamInlet {
472 fn drop(&mut self) {
473 self.shutdown.store(true, Ordering::Relaxed);
474 }
475}
476
477async fn read_sample_async(
479 reader: &mut BufReader<TcpStream>,
480 fmt: ChannelFormat,
481 num_channels: u32,
482) -> Result<Sample, String> {
483 use crate::sample::SampleData;
484
485 let mut tag = [0u8; 1];
486 reader
487 .read_exact(&mut tag)
488 .await
489 .map_err(|e| e.to_string())?;
490
491 let timestamp = if tag[0] == TAG_DEDUCED_TIMESTAMP {
492 DEDUCED_TIMESTAMP
493 } else {
494 let mut ts_bytes = [0u8; 8];
495 reader
496 .read_exact(&mut ts_bytes)
497 .await
498 .map_err(|e| e.to_string())?;
499 f64::from_le_bytes(ts_bytes)
500 };
501
502 let n = num_channels as usize;
503 let data = match fmt {
504 ChannelFormat::Float32 => {
505 let mut raw = vec![0u8; n * 4];
506 reader
507 .read_exact(&mut raw)
508 .await
509 .map_err(|e| e.to_string())?;
510 SampleData::Float32(
511 raw.chunks_exact(4)
512 .map(|c| f32::from_le_bytes(c.try_into().unwrap()))
513 .collect(),
514 )
515 }
516 ChannelFormat::Double64 => {
517 let mut raw = vec![0u8; n * 8];
518 reader
519 .read_exact(&mut raw)
520 .await
521 .map_err(|e| e.to_string())?;
522 SampleData::Double64(
523 raw.chunks_exact(8)
524 .map(|c| f64::from_le_bytes(c.try_into().unwrap()))
525 .collect(),
526 )
527 }
528 ChannelFormat::Int32 => {
529 let mut raw = vec![0u8; n * 4];
530 reader
531 .read_exact(&mut raw)
532 .await
533 .map_err(|e| e.to_string())?;
534 SampleData::Int32(
535 raw.chunks_exact(4)
536 .map(|c| i32::from_le_bytes(c.try_into().unwrap()))
537 .collect(),
538 )
539 }
540 ChannelFormat::Int16 => {
541 let mut raw = vec![0u8; n * 2];
542 reader
543 .read_exact(&mut raw)
544 .await
545 .map_err(|e| e.to_string())?;
546 SampleData::Int16(
547 raw.chunks_exact(2)
548 .map(|c| i16::from_le_bytes(c.try_into().unwrap()))
549 .collect(),
550 )
551 }
552 ChannelFormat::Int8 => {
553 let mut raw = vec![0u8; n];
554 reader
555 .read_exact(&mut raw)
556 .await
557 .map_err(|e| e.to_string())?;
558 SampleData::Int8(raw.into_iter().map(|b| b as i8).collect())
559 }
560 ChannelFormat::Int64 => {
561 let mut raw = vec![0u8; n * 8];
562 reader
563 .read_exact(&mut raw)
564 .await
565 .map_err(|e| e.to_string())?;
566 SampleData::Int64(
567 raw.chunks_exact(8)
568 .map(|c| i64::from_le_bytes(c.try_into().unwrap()))
569 .collect(),
570 )
571 }
572 ChannelFormat::String | ChannelFormat::Undefined => {
573 let mut strings = Vec::with_capacity(n);
574 for _ in 0..n {
575 let mut lenbytes = [0u8; 1];
576 reader
577 .read_exact(&mut lenbytes)
578 .await
579 .map_err(|e| e.to_string())?;
580 let len: usize = match lenbytes[0] {
581 1 => {
582 let mut b = [0u8; 1];
583 reader.read_exact(&mut b).await.map_err(|e| e.to_string())?;
584 b[0] as usize
585 }
586 4 => {
587 let mut b = [0u8; 4];
588 reader.read_exact(&mut b).await.map_err(|e| e.to_string())?;
589 u32::from_le_bytes(b) as usize
590 }
591 8 => {
592 let mut b = [0u8; 8];
593 reader.read_exact(&mut b).await.map_err(|e| e.to_string())?;
594 u64::from_le_bytes(b) as usize
595 }
596 _ => return Err("invalid varlen int".to_string()),
597 };
598 let mut sbuf = vec![0u8; len];
599 reader
600 .read_exact(&mut sbuf)
601 .await
602 .map_err(|e| e.to_string())?;
603 strings.push(String::from_utf8_lossy(&sbuf).into_owned());
604 }
605 SampleData::StringData(strings)
606 }
607 };
608
609 Ok(Sample {
610 timestamp,
611 pushthrough: true,
612 data,
613 })
614}
615
616async fn read_sample_async_100(
618 reader: &mut BufReader<TcpStream>,
619 fmt: ChannelFormat,
620 num_channels: u32,
621) -> Result<Sample, String> {
622 use crate::sample::SampleData;
623
624 let mut ts_bytes = [0u8; 8];
626 reader
627 .read_exact(&mut ts_bytes)
628 .await
629 .map_err(|e| e.to_string())?;
630 let timestamp = f64::from_le_bytes(ts_bytes);
631
632 let n = num_channels as usize;
633 let data = match fmt {
634 ChannelFormat::Float32 => {
635 let mut raw = vec![0u8; n * 4];
636 reader
637 .read_exact(&mut raw)
638 .await
639 .map_err(|e| e.to_string())?;
640 SampleData::Float32(
641 raw.chunks_exact(4)
642 .map(|c| f32::from_le_bytes(c.try_into().unwrap()))
643 .collect(),
644 )
645 }
646 ChannelFormat::Double64 => {
647 let mut raw = vec![0u8; n * 8];
648 reader
649 .read_exact(&mut raw)
650 .await
651 .map_err(|e| e.to_string())?;
652 SampleData::Double64(
653 raw.chunks_exact(8)
654 .map(|c| f64::from_le_bytes(c.try_into().unwrap()))
655 .collect(),
656 )
657 }
658 ChannelFormat::Int32 => {
659 let mut raw = vec![0u8; n * 4];
660 reader
661 .read_exact(&mut raw)
662 .await
663 .map_err(|e| e.to_string())?;
664 SampleData::Int32(
665 raw.chunks_exact(4)
666 .map(|c| i32::from_le_bytes(c.try_into().unwrap()))
667 .collect(),
668 )
669 }
670 ChannelFormat::Int16 => {
671 let mut raw = vec![0u8; n * 2];
672 reader
673 .read_exact(&mut raw)
674 .await
675 .map_err(|e| e.to_string())?;
676 SampleData::Int16(
677 raw.chunks_exact(2)
678 .map(|c| i16::from_le_bytes(c.try_into().unwrap()))
679 .collect(),
680 )
681 }
682 ChannelFormat::Int8 => {
683 let mut raw = vec![0u8; n];
684 reader
685 .read_exact(&mut raw)
686 .await
687 .map_err(|e| e.to_string())?;
688 SampleData::Int8(raw.into_iter().map(|b| b as i8).collect())
689 }
690 ChannelFormat::Int64 => {
691 let mut raw = vec![0u8; n * 8];
692 reader
693 .read_exact(&mut raw)
694 .await
695 .map_err(|e| e.to_string())?;
696 SampleData::Int64(
697 raw.chunks_exact(8)
698 .map(|c| i64::from_le_bytes(c.try_into().unwrap()))
699 .collect(),
700 )
701 }
702 ChannelFormat::String | ChannelFormat::Undefined => {
703 let mut strings = Vec::with_capacity(n);
704 for _ in 0..n {
705 let mut len_bytes = [0u8; 4];
707 reader
708 .read_exact(&mut len_bytes)
709 .await
710 .map_err(|e| e.to_string())?;
711 let len = u32::from_le_bytes(len_bytes) as usize;
712 let mut sbuf = vec![0u8; len];
713 reader
714 .read_exact(&mut sbuf)
715 .await
716 .map_err(|e| e.to_string())?;
717 strings.push(String::from_utf8_lossy(&sbuf).into_owned());
718 }
719 SampleData::StringData(strings)
720 }
721 };
722
723 Ok(Sample {
724 timestamp,
725 pushthrough: true,
726 data,
727 })
728}