nominal_streaming/stream.rs
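//! Buffered, multi-threaded streaming of timestamped channel data to Nominal
//! Core and/or local Avro files.
//!
//! A minimal end-to-end sketch (the file path is a placeholder, and the
//! `ChannelDescriptor` constructor shown is hypothetical; construct yours
//! however `crate::types` provides):
//!
//! ```ignore
//! let stream = NominalDatasetStream::builder()
//!     .stream_to_file("/tmp/out.avro")
//!     .build();
//!
//! let channel = ChannelDescriptor::new("vehicle.speed"); // hypothetical constructor
//! let mut writer = stream.double_writer(&channel);
//! writer.push(std::time::SystemTime::now(), 12.5); // assumes SystemTime: IntoTimestamp
//! drop(writer); // flushes any buffered points
//! // dropping `stream` blocks until everything is dispatched
//! ```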

use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use std::time::UNIX_EPOCH;

use conjure_object::BearerToken;
use conjure_object::ResourceIdentifier;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::Points;
use nominal_api::tonic::io::nominal::scout::api::proto::Series;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
use parking_lot::Condvar;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::client::PRODUCTION_STREAMING_CLIENT;
use crate::consumer::AvroFileConsumer;
use crate::consumer::DualWriteRequestConsumer;
use crate::consumer::ListeningWriteRequestConsumer;
use crate::consumer::NominalCoreConsumer;
use crate::consumer::RequestConsumerWithFallback;
use crate::consumer::WriteRequestConsumer;
use crate::notifier::LoggingListener;
use crate::types::ChannelDescriptor;
use crate::types::IntoPoints;
use crate::types::IntoTimestamp;

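/// Tuning options for a [`NominalDatasetStream`].
///
/// - `max_points_per_record`: target capacity of each in-memory buffer; a full
///   buffer is drained into a single write request (a single oversized batch
///   may exceed this, see `SeriesBuffer::has_capacity`).
/// - `max_request_delay`: how long a batch processor waits between drains,
///   bounding how stale buffered points can get.
/// - `max_buffered_requests`: capacity of the bounded channel between the
///   batch processors and the request dispatchers.
/// - `request_dispatcher_tasks`: number of dispatcher threads sending requests
///   to the consumer.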
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    pub max_points_per_record: usize,
    pub max_request_delay: Duration,
    pub max_buffered_requests: usize,
    pub request_dispatcher_tasks: usize,
}

impl Default for NominalStreamOpts {
    fn default() -> Self {
        Self {
            max_points_per_record: 250_000,
            max_request_delay: Duration::from_millis(100),
            max_buffered_requests: 4,
            request_dispatcher_tasks: 8,
        }
    }
}

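/// Builder for a [`NominalDatasetStream`].
///
/// At least one destination must be configured; `build` panics otherwise.
/// Streaming to core and to a file together produces a dual write, while
/// `with_file_fallback` writes to the file only when the primary consumer
/// fails. Combining all three panics.
///
/// A sketch of streaming to core with a local fallback (the token, dataset
/// RID, and runtime handle are placeholders):
///
/// ```ignore
/// let stream = NominalDatasetStream::builder()
///     .stream_to_core(token, dataset_rid, handle)
///     .with_file_fallback("/tmp/fallback.avro")
///     .with_options(NominalStreamOpts::default())
///     .build();
/// ```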
#[derive(Debug, Default)]
pub struct NominalDatasetStreamBuilder {
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    stream_to_file: Option<PathBuf>,
    file_fallback: Option<PathBuf>,
    opts: NominalStreamOpts,
}

impl NominalDatasetStreamBuilder {
    pub fn new() -> NominalDatasetStreamBuilder {
        Self::default()
    }

    pub fn stream_to_core(
        mut self,
        token: BearerToken,
        dataset: ResourceIdentifier,
        handle: tokio::runtime::Handle,
    ) -> Self {
        self.stream_to_core = Some((token, dataset, handle));
        self
    }

    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.stream_to_file = Some(file_path.into());
        self
    }

    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.file_fallback = Some(file_path.into());
        self
    }

    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
        self.opts = opts;
        self
    }

    pub fn build(self) -> NominalDatasetStream {
        let core_consumer = self.core_consumer();
        let file_consumer = self.file_consumer();
        let fallback_consumer = self.fallback_consumer();

        match (core_consumer, file_consumer, fallback_consumer) {
            (None, None, _) => {
                panic!("nominal dataset stream must stream to a file, to core, or both")
            }
            (Some(_), Some(_), Some(_)) => {
                panic!("choose only one of stream_to_file and file_fallback when streaming to core")
            }
            (Some(core), None, None) => self.into_stream(core),
            (Some(core), None, Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
            }
            (None, Some(file), None) => self.into_stream(file),
            (None, Some(file), Some(fallback)) => {
                // todo: should this even be supported?
                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
            }
            (Some(core), Some(file), None) => {
                self.into_stream(DualWriteRequestConsumer::new(core, file))
            }
        }
    }

    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
        self.stream_to_core
            .as_ref()
            .map(|(token, dataset, handle)| {
                NominalCoreConsumer::new(
                    PRODUCTION_STREAMING_CLIENT.clone(),
                    handle.clone(),
                    token.clone(),
                    dataset.clone(),
                )
            })
    }

    fn file_consumer(&self) -> Option<AvroFileConsumer> {
        self.stream_to_file
            .as_ref()
            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
    }

    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
        self.file_fallback
            .as_ref()
            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
    }

    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
        let logging_consumer =
            ListeningWriteRequestConsumer::new(consumer, vec![Arc::new(LoggingListener)]);
        NominalDatasetStream::new_with_consumer(logging_consumer, self.opts)
    }
}

// for backcompat; new code should use NominalDatasetStream
#[deprecated]
pub type NominalDatasourceStream = NominalDatasetStream;

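/// A handle to a running stream of channel data.
///
/// Points are staged in two in-memory buffers, each drained by its own batch
/// processor thread into a bounded request channel, which a pool of dispatcher
/// threads forwards to the configured `WriteRequestConsumer`. Dropping the
/// stream signals shutdown and blocks until all enqueued points have been
/// flushed.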
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}

impl NominalDatasetStream {
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

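    /// Creates a buffered writer for `f64` points on the given channel.
    ///
    /// A sketch of pushing a point (assumes an `IntoTimestamp` impl exists for
    /// the timestamp type used; `stream`, `channel`, and `now` are placeholders):
    ///
    /// ```ignore
    /// let mut writer = stream.double_writer(&channel);
    /// writer.push(now, 98.6); // buffered; flushed on size/time thresholds or drop
    /// ```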
    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

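    /// Enqueues a batch of points for the given channel, blocking if both
    /// internal buffers are full.
    ///
    /// A sketch with a pre-built batch (assumes `Vec<DoublePoint>: IntoPoints`,
    /// which the typed writers below also rely on):
    ///
    /// ```ignore
    /// let batch: Vec<DoublePoint> = read_sensor_batch(); // hypothetical source
    /// stream.enqueue(&channel, batch);
    /// ```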
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

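    // Accounts for `new_count` points, then hands a locked buffer to `callback`:
    // the primary buffer if it has room, else the secondary (unparking the
    // primary's processor), else it blocks on the least recently flushed
    // buffer's condvar until that buffer has been drained.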
    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            // primary buffer is definitely full
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            warn!("both buffers full, waiting for the least recently flushed buffer to drain");
            let buf = if self.primary_buffer < self.secondary_buffer {
                debug!("waiting for primary buffer to flush...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                debug!("waiting for secondary buffer to flush...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}

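/// A per-channel writer that accumulates points locally and moves them into
/// the stream's shared buffers once `max_points_per_record` points have
/// accumulated or `max_request_delay` has elapsed since the last flush.
/// Remaining points are flushed when the writer is dropped.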
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: &'ds ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    last_flushed_at: Instant,
    unflushed: Vec<T>,
}

impl<T> NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn new<'ds>(
        stream: &'ds NominalDatasetStream,
        channel: &'ds ChannelDescriptor,
    ) -> NominalChannelWriter<'ds, T> {
        NominalChannelWriter {
            channel,
            stream,
            last_flushed_at: Instant::now(),
            unflushed: vec![],
        }
    }

    fn push_point(&mut self, point: T) {
        self.unflushed.push(point);
        if self.unflushed.len() >= self.stream.opts.max_points_per_record
            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
        {
            debug!(
                "conditionally flushing {:?} ({} points, {:?} since last flush)",
                self.channel,
                self.unflushed.len(),
                self.last_flushed_at.elapsed()
            );
            self.flush();
        }
    }

    fn flush(&mut self) {
        if self.unflushed.is_empty() {
            return;
        }
        info!(
            "flushing writer for {:?} with {} points",
            self.channel,
            self.unflushed.len()
        );
        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
            buf.extend(self.channel, to_flush);
            self.last_flushed_at = Instant::now();
        })
    }
}

impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}

pub struct NominalDoubleWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoublePoint>,
}

impl NominalDoubleWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
        self.writer.push_point(DoublePoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalIntegerWriter<'ds> {
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}

impl NominalIntegerWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
        self.writer.push_point(IntegerPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalStringWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringPoint>,
}

impl NominalStringWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
        self.writer.push_point(StringPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value: value.into(),
        });
    }
}

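/// One half of the stream's double buffer: pending points keyed by channel
/// behind a mutex, with the point count mirrored in an atomic so capacity
/// checks can skip the lock. `flush_time` records the last drain, which is how
/// `when_capacity` picks the least recently flushed buffer when both are full,
/// and `condvar` wakes producers blocked in `on_notify`.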
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    count: AtomicUsize,
    flush_time: AtomicU64,
    condvar: Condvar,
    max_capacity: usize,
}

struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}

impl SeriesBufferGuard<'_> {
    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
        let points = points.into_points();
        let new_point_count = points_len(&points);

        if !self.sb.contains_key(channel_descriptor) {
            self.sb.insert(channel_descriptor.clone(), points);
        } else {
            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (_, _) => {
                    panic!("mismatched point types for channel {channel_descriptor:?}");
                }
            }
        }

        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}

impl PartialEq for SeriesBuffer {
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}

impl PartialOrd for SeriesBuffer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let flush_time = self.flush_time.load(Ordering::Acquire);
        let other_flush_time = other.flush_time.load(Ordering::Acquire);
        flush_time.partial_cmp(&other_flush_time)
    }
}

impl SeriesBuffer {
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    /// Checks whether the buffer has enough capacity to add new points.
    /// Note that the buffer can grow beyond `max_capacity` if a single batch larger than
    /// `max_capacity` is inserted while the buffer is empty. This avoids having to split
    /// a batch of points across multiple requests.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        let result_count = self.count.swap(0, Ordering::AcqRel);
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        // Only wait if the buffer is still non-empty: it may have been drained between
        // the capacity check and acquiring this lock, and waiting then would block
        // forever while also holding up any new points behind the mutex.
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}

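// Drains a `SeriesBuffer` into write requests: parks for up to
// `max_request_delay` between iterations (or when the buffer is empty), takes
// the buffered series, notifies any producer blocked in `on_notify`, and sends
// the request into the bounded dispatcher channel. Exits once `running` is
// cleared and the buffer is empty.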
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            warn!("request channel is full");
        }
        match request_chan.send((write_request, point_count)) {
            Ok(()) => debug!("queued request for processing"),
            Err(_) => error!("failed to send request to dispatcher"),
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}

impl Drop for NominalDatasetStream {
    fn drop(&mut self) {
        debug!("starting drop for NominalDatasetStream");
        self.running.store(false, Ordering::Release);
        while self.unflushed_points.load(Ordering::Acquire) > 0 {
            debug!(
                "waiting for all points to be flushed before dropping stream, {} points remaining",
                self.unflushed_points.load(Ordering::Acquire)
            );
            // todo: reduce this + give up after some maximum timeout is reached
            thread::sleep(Duration::from_millis(50));
        }
    }
}

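// Receives write requests from the bounded channel and forwards them to the
// consumer, decrementing the shared unflushed-point count whether or not the
// consumer succeeds. Exits when the channel closes, or once shutdown has been
// requested and every point has been flushed.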
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received write request from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("failed to send request: {e:?}");
                    }
                }
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    // drop our receiver; once every dispatcher has exited, sends from
                    // the batch processors fail and they shut down too
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {} ms",
        total_request_time
    );
}

fn points_len(points_type: &PointsType) -> usize {
    match points_type {
        PointsType::DoublePoints(points) => points.points.len(),
        PointsType::StringPoints(points) => points.points.len(),
        PointsType::IntegerPoints(points) => points.points.len(),
    }
}