Skip to main content

lsl_core/
inlet.rs

1//! StreamInlet: receives data from the network.
2
3use crate::sample::Sample;
4use crate::stream_info::StreamInfo;
5use crate::types::*;
6use crossbeam_channel::{bounded, Receiver};
7use parking_lot::Mutex;
8use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
12use tokio::net::TcpStream;
13
/// A stream inlet. Receives data from a stream outlet.
pub struct StreamInlet {
    /// Description of the stream this inlet reads from (cloned at construction).
    info: StreamInfo,
    /// Buffer length requested by the caller; sizes the sample queue and is
    /// sent to the outlet during the handshake.
    max_buflen: i32,
    /// Chunk length requested by the caller; sent to the outlet during the handshake.
    max_chunklen: i32,
    /// Whether the receiver thread re-resolves and reconnects after a lost connection.
    recover: bool,
    /// Consumer end of the sample queue, drained by the `pull_sample_*` methods.
    sample_rx: Receiver<Sample>,
    /// Producer end of the sample queue; cloned into the receiver thread.
    sample_tx: crossbeam_channel::Sender<Sample>,
    /// Set once the handshake and test-pattern check succeed; cleared when a
    /// connection attempt ends with an error.
    connected: Arc<AtomicBool>,
    /// Set when the inlet is dropped so the receiver thread stops.
    shutdown: Arc<AtomicBool>,
    /// Most recent clock offset stored by `time_correction()`.
    time_correction: Arc<Mutex<f64>>,
    /// Counter incremented for each received sample and decremented for each pulled sample.
    samples_available: Arc<AtomicU32>,
    /// Currently selected timestamp post-processing flags (`PROC_NONE` initially).
    post_processing: Arc<AtomicU32>,
    /// Timestamp post-processor; rebuilt whenever the flags or the halftime change.
    postproc: Mutex<crate::postproc::TimestampPostProcessor>,
}
29
30impl StreamInlet {
31    pub fn new(info: &StreamInfo, max_buflen: i32, max_chunklen: i32, recover: bool) -> Self {
32        let (tx, rx) = bounded(max_buflen.max(1) as usize);
33        let connected = Arc::new(AtomicBool::new(false));
34        let shutdown = Arc::new(AtomicBool::new(false));
35        let time_correction = Arc::new(Mutex::new(0.0f64));
36        let samples_available = Arc::new(AtomicU32::new(0));
37        let post_processing = Arc::new(AtomicU32::new(PROC_NONE));
38
39        let srate = info.nominal_srate();
40        let halftime = crate::config::CONFIG.smoothing_halftime;
41        StreamInlet {
42            info: info.clone(),
43            max_buflen,
44            max_chunklen,
45            recover,
46            sample_rx: rx,
47            sample_tx: tx,
48            connected: connected.clone(),
49            shutdown: shutdown.clone(),
50            time_correction,
51            samples_available,
52            post_processing: post_processing.clone(),
53            postproc: Mutex::new(crate::postproc::TimestampPostProcessor::new(
54                PROC_NONE, srate, halftime,
55            )),
56        }
57    }
58
59    /// Open the data stream (connect to the outlet's TCP port).
60    pub fn open_stream(&self, timeout: f64) -> Result<(), String> {
61        if self.connected.load(Ordering::Relaxed) {
62            return Ok(());
63        }
64
65        let info = self.info.clone();
66        let tx = self.sample_tx.clone();
67        let connected = self.connected.clone();
68        let shutdown = self.shutdown.clone();
69        let max_buflen = self.max_buflen;
70        let max_chunklen = self.max_chunklen;
71        let samples_avail = self.samples_available.clone();
72
73        // Start the data receiver in a dedicated thread with its own runtime.
74        // When `recover` is true, automatically re-resolves and reconnects on loss.
75        let connected2 = self.connected.clone();
76        let recover = self.recover;
77        let source_uid = info.uid();
78        std::thread::Builder::new()
79            .name("lsl_data_recv".to_string())
80            .spawn(move || {
81                let rt = tokio::runtime::Builder::new_current_thread()
82                    .enable_all()
83                    .build()
84                    .unwrap();
85                rt.block_on(async move {
86                    let mut current_info = info.clone();
87                    loop {
88                        if shutdown.load(Ordering::Relaxed) {
89                            break;
90                        }
91                        match Self::connect_and_receive(
92                            &current_info,
93                            &tx,
94                            &connected,
95                            &shutdown,
96                            max_buflen,
97                            max_chunklen,
98                            &samples_avail,
99                        )
100                        .await
101                        {
102                            Ok(()) => break,
103                            Err(e) => {
104                                connected.store(false, Ordering::SeqCst);
105                                if shutdown.load(Ordering::Relaxed) {
106                                    break;
107                                }
108                                if !recover {
109                                    log::trace!("[inlet] Connection lost, recovery disabled");
110                                    break;
111                                }
112                                log::trace!("[inlet] Connection lost: {}, re-resolving...", e);
113                                // Try to re-resolve the stream by UID
114                                tokio::time::sleep(Duration::from_millis(500)).await;
115                                let uid = source_uid.clone();
116                                match tokio::task::spawn_blocking(move || {
117                                    crate::resolver::resolve_by_property("uid", &uid, 1, 3.0)
118                                })
119                                .await
120                                {
121                                    Ok(found) if !found.is_empty() => {
122                                        current_info = found.into_iter().next().unwrap();
123                                        log::trace!("[inlet] Re-resolved, reconnecting...");
124                                    }
125                                    _ => {
126                                        log::trace!("[inlet] Could not re-resolve, will retry...");
127                                        tokio::time::sleep(Duration::from_secs(1)).await;
128                                    }
129                                }
130                            }
131                        }
132                    }
133                });
134            })
135            .map_err(|e| e.to_string())?;
136
137        // Wait for connection
138        let deadline = std::time::Instant::now() + Duration::from_secs_f64(timeout.max(0.001));
139        while !connected2.load(Ordering::SeqCst) {
140            if std::time::Instant::now() >= deadline {
141                return Err("open_stream timed out".to_string());
142            }
143            std::thread::sleep(Duration::from_millis(1));
144        }
145        Ok(())
146    }
147
148    async fn connect_and_receive(
149        info: &StreamInfo,
150        tx: &crossbeam_channel::Sender<Sample>,
151        connected: &Arc<AtomicBool>,
152        shutdown: &Arc<AtomicBool>,
153        max_buflen: i32,
154        max_chunklen: i32,
155        samples_avail: &Arc<AtomicU32>,
156    ) -> Result<(), String> {
157        // Try connecting: prefer IPv6 if available, fall back to IPv4
158        let stream = Self::try_connect(info).await?;
159        stream.set_nodelay(true).ok();
160
161        let mut reader = BufReader::new(stream);
162
163        // Protocol negotiation (1.10)
164        let fmt = info.channel_format();
165        let nch = info.channel_count();
166        let proto_version = info.version().min(LSL_PROTOCOL_VERSION);
167
168        if proto_version >= 110 {
169            let request = format!(
170                "LSL:streamfeed/{} {}\r\nNative-Byte-Order: 1234\r\nEndian-Performance: 0\r\nHas-IEEE754-Floats: 1\r\nSupports-Subnormals: 1\r\nValue-Size: {}\r\nData-Protocol-Version: {}\r\nMax-Buffer-Length: {}\r\nMax-Chunk-Length: {}\r\n\r\n",
171                proto_version,
172                info.uid(),
173                fmt.channel_bytes(),
174                proto_version,
175                max_buflen,
176                max_chunklen,
177            );
178            reader
179                .get_mut()
180                .write_all(request.as_bytes())
181                .await
182                .map_err(|e| e.to_string())?;
183            reader.get_mut().flush().await.map_err(|e| e.to_string())?;
184
185            // Read response line
186            let mut response_line = String::new();
187            reader
188                .read_line(&mut response_line)
189                .await
190                .map_err(|e| e.to_string())?;
191            if !response_line.contains("200") {
192                return Err(format!("Server error: {}", response_line.trim()));
193            }
194
195            // Read response headers
196            loop {
197                let mut line = String::new();
198                reader
199                    .read_line(&mut line)
200                    .await
201                    .map_err(|e| e.to_string())?;
202                if line.trim().is_empty() {
203                    break;
204                }
205            }
206        } else {
207            let request = format!("LSL:streamfeed\r\n{} {}\r\n", max_buflen, max_chunklen);
208            reader
209                .get_mut()
210                .write_all(request.as_bytes())
211                .await
212                .map_err(|e| e.to_string())?;
213            reader.get_mut().flush().await.map_err(|e| e.to_string())?;
214        }
215
216        // Read and validate test patterns
217        let use_proto_100 = proto_version < 110;
218        for test_offset in [4, 2] {
219            let received = if use_proto_100 {
220                read_sample_async_100(&mut reader, fmt, nch).await?
221            } else {
222                read_sample_async(&mut reader, fmt, nch).await?
223            };
224            let mut expected = Sample::new(fmt, nch, 0.0);
225            expected.assign_test_pattern(test_offset);
226            if received != expected {
227                return Err("Test pattern mismatch".to_string());
228            }
229        }
230
231        connected.store(true, Ordering::SeqCst);
232
233        // Receive loop
234        let srate = info.nominal_srate();
235        let mut last_timestamp = 0.0f64;
236
237        loop {
238            if shutdown.load(Ordering::Relaxed) {
239                break;
240            }
241
242            let mut sample = if use_proto_100 {
243                read_sample_async_100(&mut reader, fmt, nch).await?
244            } else {
245                read_sample_async(&mut reader, fmt, nch).await?
246            };
247
248            // Deduce timestamp if needed
249            if sample.timestamp == DEDUCED_TIMESTAMP {
250                sample.timestamp = last_timestamp;
251                if srate != IRREGULAR_RATE {
252                    sample.timestamp += 1.0 / srate;
253                }
254            }
255            last_timestamp = sample.timestamp;
256
257            samples_avail.fetch_add(1, Ordering::Relaxed);
258            if tx.send(sample).is_err() {
259                break;
260            }
261        }
262
263        Ok(())
264    }
265
266    /// Try to connect via IPv6 first (if the stream advertises a v6 port),
267    /// then fall back to IPv4.
268    async fn try_connect(info: &StreamInfo) -> Result<TcpStream, String> {
269        // Try IPv6 if the stream has a v6 data port
270        let v6_port = info.v6data_port();
271        if v6_port > 0 {
272            let v6_addr = info.v6address();
273            let host = if v6_addr.is_empty() {
274                "::1".to_string()
275            } else {
276                v6_addr
277            };
278            let addr = format!("[{}]:{}", host, v6_port);
279            log::trace!("[inlet] Trying IPv6 {}...", addr);
280            match TcpStream::connect(&addr).await {
281                Ok(stream) => {
282                    log::trace!("[inlet] Connected via IPv6");
283                    return Ok(stream);
284                }
285                Err(e) => {
286                    log::trace!("[inlet] IPv6 connect failed: {}, trying IPv4", e);
287                }
288            }
289        }
290
291        // Fall back to IPv4
292        let port = info.v4data_port();
293        let addr_str = info.v4address();
294        let host = if addr_str.is_empty() {
295            "127.0.0.1".to_string()
296        } else {
297            addr_str
298        };
299        let addr = format!("{}:{}", host, port);
300        log::trace!("[inlet] Connecting IPv4 {}...", addr);
301        let stream = TcpStream::connect(&addr).await.map_err(|e| {
302            log::trace!("[inlet] Connect error: {}", e);
303            e.to_string()
304        })?;
305        log::trace!("[inlet] Connected via IPv4");
306        Ok(stream)
307    }
308
309    /// Pull a single float sample. Returns the timestamp, or 0 on timeout.
310    pub fn pull_sample_f(&self, buffer: &mut [f32], timeout: f64) -> Result<f64, String> {
311        let sample = self.pull_sample_raw(timeout)?;
312        match sample {
313            Some(s) => {
314                s.retrieve_f32(buffer);
315                self.samples_available.fetch_sub(1, Ordering::Relaxed);
316                Ok(self.postprocess_timestamp(s.timestamp))
317            }
318            None => Ok(0.0),
319        }
320    }
321
322    pub fn pull_sample_d(&self, buffer: &mut [f64], timeout: f64) -> Result<f64, String> {
323        let sample = self.pull_sample_raw(timeout)?;
324        match sample {
325            Some(s) => {
326                s.retrieve_f64(buffer);
327                self.samples_available.fetch_sub(1, Ordering::Relaxed);
328                Ok(self.postprocess_timestamp(s.timestamp))
329            }
330            None => Ok(0.0),
331        }
332    }
333
334    pub fn pull_sample_i32(&self, buffer: &mut [i32], timeout: f64) -> Result<f64, String> {
335        let sample = self.pull_sample_raw(timeout)?;
336        match sample {
337            Some(s) => {
338                s.retrieve_i32(buffer);
339                self.samples_available.fetch_sub(1, Ordering::Relaxed);
340                Ok(self.postprocess_timestamp(s.timestamp))
341            }
342            None => Ok(0.0),
343        }
344    }
345
346    pub fn pull_sample_i16(&self, buffer: &mut [i16], timeout: f64) -> Result<f64, String> {
347        let sample = self.pull_sample_raw(timeout)?;
348        match sample {
349            Some(s) => {
350                s.retrieve_i16(buffer);
351                self.samples_available.fetch_sub(1, Ordering::Relaxed);
352                Ok(self.postprocess_timestamp(s.timestamp))
353            }
354            None => Ok(0.0),
355        }
356    }
357
358    pub fn pull_sample_i64(&self, buffer: &mut [i64], timeout: f64) -> Result<f64, String> {
359        let sample = self.pull_sample_raw(timeout)?;
360        match sample {
361            Some(s) => {
362                s.retrieve_i64(buffer);
363                self.samples_available.fetch_sub(1, Ordering::Relaxed);
364                Ok(self.postprocess_timestamp(s.timestamp))
365            }
366            None => Ok(0.0),
367        }
368    }
369
370    pub fn pull_sample_str(&self, timeout: f64) -> Result<(Vec<String>, f64), String> {
371        let sample = self.pull_sample_raw(timeout)?;
372        match sample {
373            Some(s) => {
374                let strings = s.retrieve_strings();
375                self.samples_available.fetch_sub(1, Ordering::Relaxed);
376                Ok((strings, self.postprocess_timestamp(s.timestamp)))
377            }
378            None => Ok((Vec::new(), 0.0)),
379        }
380    }
381
382    fn pull_sample_raw(&self, timeout: f64) -> Result<Option<Sample>, String> {
383        if timeout <= 0.0 {
384            match self.sample_rx.try_recv() {
385                Ok(s) => Ok(Some(s)),
386                Err(_) => Ok(None),
387            }
388        } else if timeout >= FOREVER {
389            match self.sample_rx.recv() {
390                Ok(s) => Ok(Some(s)),
391                Err(_) => Err("channel closed".to_string()),
392            }
393        } else {
394            match self
395                .sample_rx
396                .recv_timeout(Duration::from_secs_f64(timeout))
397            {
398                Ok(s) => Ok(Some(s)),
399                Err(crossbeam_channel::RecvTimeoutError::Timeout) => Ok(None),
400                Err(_) => Err("channel closed".to_string()),
401            }
402        }
403    }
404
405    fn postprocess_timestamp(&self, ts: f64) -> f64 {
406        let flags = self.post_processing.load(Ordering::Relaxed);
407        if flags == PROC_NONE {
408            return ts;
409        }
410        let mut proc = self.postproc.lock();
411        proc.set_clock_offset(*self.time_correction.lock());
412        proc.process(ts)
413    }
414
    /// Placeholder for closing the data stream.
    ///
    /// Currently a no-op: the body contains no statements, so neither the
    /// `shutdown` nor the `connected` flag is changed by this call.
    pub fn close_stream(&self) {
        // NOTE(review): nothing is signalled here; in this file the shutdown
        // flag is only set when the inlet is dropped.
    }
418
419    /// Estimate the clock offset between this machine and the outlet's machine.
420    /// Uses NTP-like probing against the outlet's UDP service port.
421    pub fn time_correction(&self, timeout: f64) -> f64 {
422        let host = {
423            let v4 = self.info.v4address();
424            if v4.is_empty() {
425                "127.0.0.1".to_string()
426            } else {
427                v4
428            }
429        };
430        let port = self.info.v4service_port();
431        let offset = crate::time_receiver::time_correction(&host, port, timeout);
432        *self.time_correction.lock() = offset;
433        offset
434    }
435
436    pub fn set_postprocessing(&self, flags: u32) {
437        self.post_processing.store(flags, Ordering::Relaxed);
438        let srate = self.info.nominal_srate();
439        let halftime = crate::config::CONFIG.smoothing_halftime;
440        *self.postproc.lock() =
441            crate::postproc::TimestampPostProcessor::new(flags, srate, halftime);
442    }
443
444    pub fn samples_available(&self) -> u32 {
445        self.sample_rx.len() as u32
446    }
447
448    pub fn flush(&self) -> u32 {
449        let mut count = 0u32;
450        while self.sample_rx.try_recv().is_ok() {
451            count += 1;
452        }
453        count
454    }
455
    /// Reports whether a clock reset was detected.
    ///
    /// Stub: always returns `false`; no detection is implemented here.
    pub fn was_clock_reset(&self) -> bool {
        false
    }
459
460    pub fn smoothing_halftime(&self, value: f32) {
461        let flags = self.post_processing.load(Ordering::Relaxed);
462        let srate = self.info.nominal_srate();
463        *self.postproc.lock() = crate::postproc::TimestampPostProcessor::new(flags, srate, value);
464    }
465
    /// Returns a copy of the stream description held by this inlet.
    ///
    /// `_timeout` is unused: the info stored at construction is cloned and
    /// no network request is made.
    pub fn get_fullinfo(&self, _timeout: f64) -> StreamInfo {
        self.info.clone()
    }
469}
470
/// Signals the receiver thread to stop when the inlet goes away.
impl Drop for StreamInlet {
    /// Sets the shutdown flag; the receiver thread's loops check it and exit.
    /// The thread is not joined here.
    fn drop(&mut self) {
        self.shutdown.store(true, Ordering::Relaxed);
    }
}
476
477/// Read a sample from an async reader using protocol 1.10
478async fn read_sample_async(
479    reader: &mut BufReader<TcpStream>,
480    fmt: ChannelFormat,
481    num_channels: u32,
482) -> Result<Sample, String> {
483    use crate::sample::SampleData;
484
485    let mut tag = [0u8; 1];
486    reader
487        .read_exact(&mut tag)
488        .await
489        .map_err(|e| e.to_string())?;
490
491    let timestamp = if tag[0] == TAG_DEDUCED_TIMESTAMP {
492        DEDUCED_TIMESTAMP
493    } else {
494        let mut ts_bytes = [0u8; 8];
495        reader
496            .read_exact(&mut ts_bytes)
497            .await
498            .map_err(|e| e.to_string())?;
499        f64::from_le_bytes(ts_bytes)
500    };
501
502    let n = num_channels as usize;
503    let data = match fmt {
504        ChannelFormat::Float32 => {
505            let mut raw = vec![0u8; n * 4];
506            reader
507                .read_exact(&mut raw)
508                .await
509                .map_err(|e| e.to_string())?;
510            SampleData::Float32(
511                raw.chunks_exact(4)
512                    .map(|c| f32::from_le_bytes(c.try_into().unwrap()))
513                    .collect(),
514            )
515        }
516        ChannelFormat::Double64 => {
517            let mut raw = vec![0u8; n * 8];
518            reader
519                .read_exact(&mut raw)
520                .await
521                .map_err(|e| e.to_string())?;
522            SampleData::Double64(
523                raw.chunks_exact(8)
524                    .map(|c| f64::from_le_bytes(c.try_into().unwrap()))
525                    .collect(),
526            )
527        }
528        ChannelFormat::Int32 => {
529            let mut raw = vec![0u8; n * 4];
530            reader
531                .read_exact(&mut raw)
532                .await
533                .map_err(|e| e.to_string())?;
534            SampleData::Int32(
535                raw.chunks_exact(4)
536                    .map(|c| i32::from_le_bytes(c.try_into().unwrap()))
537                    .collect(),
538            )
539        }
540        ChannelFormat::Int16 => {
541            let mut raw = vec![0u8; n * 2];
542            reader
543                .read_exact(&mut raw)
544                .await
545                .map_err(|e| e.to_string())?;
546            SampleData::Int16(
547                raw.chunks_exact(2)
548                    .map(|c| i16::from_le_bytes(c.try_into().unwrap()))
549                    .collect(),
550            )
551        }
552        ChannelFormat::Int8 => {
553            let mut raw = vec![0u8; n];
554            reader
555                .read_exact(&mut raw)
556                .await
557                .map_err(|e| e.to_string())?;
558            SampleData::Int8(raw.into_iter().map(|b| b as i8).collect())
559        }
560        ChannelFormat::Int64 => {
561            let mut raw = vec![0u8; n * 8];
562            reader
563                .read_exact(&mut raw)
564                .await
565                .map_err(|e| e.to_string())?;
566            SampleData::Int64(
567                raw.chunks_exact(8)
568                    .map(|c| i64::from_le_bytes(c.try_into().unwrap()))
569                    .collect(),
570            )
571        }
572        ChannelFormat::String | ChannelFormat::Undefined => {
573            let mut strings = Vec::with_capacity(n);
574            for _ in 0..n {
575                let mut lenbytes = [0u8; 1];
576                reader
577                    .read_exact(&mut lenbytes)
578                    .await
579                    .map_err(|e| e.to_string())?;
580                let len: usize = match lenbytes[0] {
581                    1 => {
582                        let mut b = [0u8; 1];
583                        reader.read_exact(&mut b).await.map_err(|e| e.to_string())?;
584                        b[0] as usize
585                    }
586                    4 => {
587                        let mut b = [0u8; 4];
588                        reader.read_exact(&mut b).await.map_err(|e| e.to_string())?;
589                        u32::from_le_bytes(b) as usize
590                    }
591                    8 => {
592                        let mut b = [0u8; 8];
593                        reader.read_exact(&mut b).await.map_err(|e| e.to_string())?;
594                        u64::from_le_bytes(b) as usize
595                    }
596                    _ => return Err("invalid varlen int".to_string()),
597                };
598                let mut sbuf = vec![0u8; len];
599                reader
600                    .read_exact(&mut sbuf)
601                    .await
602                    .map_err(|e| e.to_string())?;
603                strings.push(String::from_utf8_lossy(&sbuf).into_owned());
604            }
605            SampleData::StringData(strings)
606        }
607    };
608
609    Ok(Sample {
610        timestamp,
611        pushthrough: true,
612        data,
613    })
614}
615
616/// Read a sample from an async reader using protocol 1.00 (always 8-byte timestamp, 4-byte string lengths).
617async fn read_sample_async_100(
618    reader: &mut BufReader<TcpStream>,
619    fmt: ChannelFormat,
620    num_channels: u32,
621) -> Result<Sample, String> {
622    use crate::sample::SampleData;
623
624    // Protocol 1.00: always 8-byte timestamp
625    let mut ts_bytes = [0u8; 8];
626    reader
627        .read_exact(&mut ts_bytes)
628        .await
629        .map_err(|e| e.to_string())?;
630    let timestamp = f64::from_le_bytes(ts_bytes);
631
632    let n = num_channels as usize;
633    let data = match fmt {
634        ChannelFormat::Float32 => {
635            let mut raw = vec![0u8; n * 4];
636            reader
637                .read_exact(&mut raw)
638                .await
639                .map_err(|e| e.to_string())?;
640            SampleData::Float32(
641                raw.chunks_exact(4)
642                    .map(|c| f32::from_le_bytes(c.try_into().unwrap()))
643                    .collect(),
644            )
645        }
646        ChannelFormat::Double64 => {
647            let mut raw = vec![0u8; n * 8];
648            reader
649                .read_exact(&mut raw)
650                .await
651                .map_err(|e| e.to_string())?;
652            SampleData::Double64(
653                raw.chunks_exact(8)
654                    .map(|c| f64::from_le_bytes(c.try_into().unwrap()))
655                    .collect(),
656            )
657        }
658        ChannelFormat::Int32 => {
659            let mut raw = vec![0u8; n * 4];
660            reader
661                .read_exact(&mut raw)
662                .await
663                .map_err(|e| e.to_string())?;
664            SampleData::Int32(
665                raw.chunks_exact(4)
666                    .map(|c| i32::from_le_bytes(c.try_into().unwrap()))
667                    .collect(),
668            )
669        }
670        ChannelFormat::Int16 => {
671            let mut raw = vec![0u8; n * 2];
672            reader
673                .read_exact(&mut raw)
674                .await
675                .map_err(|e| e.to_string())?;
676            SampleData::Int16(
677                raw.chunks_exact(2)
678                    .map(|c| i16::from_le_bytes(c.try_into().unwrap()))
679                    .collect(),
680            )
681        }
682        ChannelFormat::Int8 => {
683            let mut raw = vec![0u8; n];
684            reader
685                .read_exact(&mut raw)
686                .await
687                .map_err(|e| e.to_string())?;
688            SampleData::Int8(raw.into_iter().map(|b| b as i8).collect())
689        }
690        ChannelFormat::Int64 => {
691            let mut raw = vec![0u8; n * 8];
692            reader
693                .read_exact(&mut raw)
694                .await
695                .map_err(|e| e.to_string())?;
696            SampleData::Int64(
697                raw.chunks_exact(8)
698                    .map(|c| i64::from_le_bytes(c.try_into().unwrap()))
699                    .collect(),
700            )
701        }
702        ChannelFormat::String | ChannelFormat::Undefined => {
703            let mut strings = Vec::with_capacity(n);
704            for _ in 0..n {
705                // Protocol 1.00: always 4-byte length
706                let mut len_bytes = [0u8; 4];
707                reader
708                    .read_exact(&mut len_bytes)
709                    .await
710                    .map_err(|e| e.to_string())?;
711                let len = u32::from_le_bytes(len_bytes) as usize;
712                let mut sbuf = vec![0u8; len];
713                reader
714                    .read_exact(&mut sbuf)
715                    .await
716                    .map_err(|e| e.to_string())?;
717                strings.push(String::from_utf8_lossy(&sbuf).into_owned());
718            }
719            SampleData::StringData(strings)
720        }
721    };
722
723    Ok(Sample {
724        timestamp,
725        pushthrough: true,
726        data,
727    })
728}