nominal_streaming/
stream.rs

use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use std::time::UNIX_EPOCH;

use conjure_object::BearerToken;
use conjure_object::ResourceIdentifier;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::Points;
use nominal_api::tonic::io::nominal::scout::api::proto::Series;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
use parking_lot::Condvar;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use tracing::debug;
use tracing::error;
use tracing::info;

use crate::client::PRODUCTION_STREAMING_CLIENT;
use crate::consumer::AvroFileConsumer;
use crate::consumer::DualWriteRequestConsumer;
use crate::consumer::ListeningWriteRequestConsumer;
use crate::consumer::NominalCoreConsumer;
use crate::consumer::RequestConsumerWithFallback;
use crate::consumer::WriteRequestConsumer;
use crate::notifier::LoggingListener;
use crate::types::ChannelDescriptor;
use crate::types::IntoPoints;
use crate::types::IntoTimestamp;

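/// Tuning knobs for a [`NominalDatasetStream`].
///
/// * `max_points_per_record`: how many points a buffer may accumulate before
///   it is flushed into a single write request.
/// * `max_request_delay`: upper bound on how long the batch processors park
///   between flushes.
/// * `max_buffered_requests`: capacity of the bounded channel between the
///   batch processors and the dispatchers.
/// * `request_dispatcher_tasks`: number of dispatcher threads draining that
///   channel.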
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    pub max_points_per_record: usize,
    pub max_request_delay: Duration,
    pub max_buffered_requests: usize,
    pub request_dispatcher_tasks: usize,
}

impl Default for NominalStreamOpts {
    fn default() -> Self {
        Self {
            max_points_per_record: 250_000,
            max_request_delay: Duration::from_millis(100),
            max_buffered_requests: 4,
            request_dispatcher_tasks: 8,
        }
    }
}

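/// Builder for [`NominalDatasetStream`]. At least one destination must be
/// configured via `stream_to_core` or `stream_to_file` before calling
/// `build`, which panics otherwise.
///
/// A minimal sketch (the crate path and the file destination here are
/// illustrative, not prescriptive):
///
/// ```no_run
/// use nominal_streaming::stream::NominalDatasetStream;
///
/// let stream = NominalDatasetStream::builder()
///     .stream_to_file("/tmp/telemetry.avro")
///     .build();
/// ```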
#[derive(Debug, Default)]
pub struct NominalDatasetStreamBuilder {
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    stream_to_file: Option<PathBuf>,
    file_fallback: Option<PathBuf>,
    opts: NominalStreamOpts,
}

impl NominalDatasetStreamBuilder {
    pub fn new() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::default()
    }

    pub fn stream_to_core(
        mut self,
        token: BearerToken,
        dataset: ResourceIdentifier,
        handle: tokio::runtime::Handle,
    ) -> Self {
        self.stream_to_core = Some((token, dataset, handle));
        self
    }

    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.stream_to_file = Some(file_path.into());
        self
    }

    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.file_fallback = Some(file_path.into());
        self
    }

    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
        self.opts = opts;
        self
    }

    #[cfg(feature = "logging")]
    pub fn enable_logging(self) -> Self {
        use tracing_subscriber::layer::SubscriberExt;
        use tracing_subscriber::util::SubscriberInitExt;

        let subscriber = tracing_subscriber::registry()
            .with(
                tracing_subscriber::EnvFilter::builder()
                    .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
                    .from_env_lossy(),
            )
            .with(tracing_subscriber::fmt::layer());

        if let Err(error) = subscriber.try_init() {
            eprintln!("nominal streaming failed to enable logging: {error}");
        }

        self
    }

    /// Assembles the configured consumers into a [`NominalDatasetStream`].
    ///
    /// # Panics
    /// Panics if no destination was configured, or if `stream_to_file` and
    /// `with_file_fallback` are combined with `stream_to_core`.
    pub fn build(self) -> NominalDatasetStream {
        let core_consumer = self.core_consumer();
        let file_consumer = self.file_consumer();
        let fallback_consumer = self.fallback_consumer();

        match (core_consumer, file_consumer, fallback_consumer) {
            (None, None, _) => panic!("nominal dataset stream must stream to a file or to core"),
            (Some(_), Some(_), Some(_)) => {
                panic!("choose only one of stream_to_file and file_fallback when streaming to core")
            }
            (Some(core), None, None) => self.into_stream(core),
            (Some(core), None, Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
            }
            (None, Some(file), None) => self.into_stream(file),
            (None, Some(file), Some(fallback)) => {
                // todo: should this even be supported?
                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
            }
            (Some(core), Some(file), None) => {
                self.into_stream(DualWriteRequestConsumer::new(core, file))
            }
        }
    }

    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
        self.stream_to_core
            .as_ref()
            .map(|(token, dataset, handle)| {
                NominalCoreConsumer::new(
                    PRODUCTION_STREAMING_CLIENT.clone(),
                    handle.clone(),
                    token.clone(),
                    dataset.clone(),
                )
            })
    }

    fn file_consumer(&self) -> Option<AvroFileConsumer> {
        self.stream_to_file
            .as_ref()
            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
    }

    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
        self.file_fallback
            .as_ref()
            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
    }

    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
        let logging_consumer =
            ListeningWriteRequestConsumer::new(consumer, vec![Arc::new(LoggingListener)]);
        NominalDatasetStream::new_with_consumer(logging_consumer, self.opts)
    }
}

/// Backwards-compatible alias; new code should use [`NominalDatasetStream`].
#[deprecated(note = "renamed to NominalDatasetStream")]
pub type NominalDatasourceStream = NominalDatasetStream;

pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}

impl NominalDatasetStream {
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

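    /// Wires up the streaming pipeline: two `SeriesBuffer`s are drained by the
    /// `nmstream_primary` and `nmstream_secondary` batch-processor threads into
    /// a bounded request channel, and `request_dispatcher_tasks` dispatcher
    /// threads drain that channel by handing each request to `consumer`.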
    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

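    /// Appends `new_points` for `channel_descriptor` to whichever buffer has
    /// capacity, blocking until a buffer is flushed if both are full.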
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            // primary buffer is definitely full
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            // both buffers are full: unpark and wait on whichever was flushed
            // longer ago (see the PartialOrd impl on SeriesBuffer)
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}

pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: &'ds ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    last_flushed_at: Instant,
    unflushed: Vec<T>,
}

impl<T> NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn new<'ds>(
        stream: &'ds NominalDatasetStream,
        channel: &'ds ChannelDescriptor,
    ) -> NominalChannelWriter<'ds, T> {
        NominalChannelWriter {
            channel,
            stream,
            last_flushed_at: Instant::now(),
            unflushed: vec![],
        }
    }

    fn push_point(&mut self, point: T) {
        self.unflushed.push(point);
        if self.unflushed.len() >= self.stream.opts.max_points_per_record
            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
        {
            debug!(
                "conditionally flushing {:?}, ({} points, {:?} since last)",
                self.channel,
                self.unflushed.len(),
                self.last_flushed_at.elapsed()
            );
            self.flush();
        }
    }

    fn flush(&mut self) {
        if self.unflushed.is_empty() {
            return;
        }
        info!(
            "flushing writer for {:?} with {} points",
            self.channel,
            self.unflushed.len()
        );
        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
            buf.extend(self.channel, to_flush);
            self.last_flushed_at = Instant::now();
        });
    }
}

impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}

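/// Per-channel writer for `f64` points. Points are staged locally and moved
/// into the stream's shared buffers once `max_points_per_record` accumulate or
/// `max_request_delay` elapses; dropping the writer flushes whatever remains.
///
/// A minimal sketch, assuming an existing stream and channel descriptor, and
/// that the timestamp type used here implements this crate's `IntoTimestamp`:
///
/// ```no_run
/// # use nominal_streaming::stream::NominalDatasetStream;
/// # use nominal_streaming::types::ChannelDescriptor;
/// # fn sketch(stream: &NominalDatasetStream, channel: &ChannelDescriptor) {
/// let mut writer = stream.double_writer(channel);
/// writer.push(std::time::SystemTime::now(), 98.6);
/// // remaining points are flushed when `writer` goes out of scope
/// # }
/// ```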
pub struct NominalDoubleWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoublePoint>,
}

impl NominalDoubleWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
        self.writer.push_point(DoublePoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalIntegerWriter<'ds> {
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}

impl NominalIntegerWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
        self.writer.push_point(IntegerPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalStringWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringPoint>,
}

impl NominalStringWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
        self.writer.push_point(StringPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value: value.into(),
        });
    }
}

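/// One half of the stream's double buffer. `count` tracks buffered points
/// without taking the map lock, `flush_time` records when the buffer was last
/// drained (used to pick the staler buffer when both are full), and `condvar`
/// wakes a producer blocked in `on_notify` once a drain happens.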
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    count: AtomicUsize,
    flush_time: AtomicU64,
    condvar: Condvar,
    max_capacity: usize,
}

struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}

impl SeriesBufferGuard<'_> {
    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
        let points = points.into_points();
        let new_point_count = points_len(&points);

        if let Some(existing) = self.sb.get_mut(channel_descriptor) {
            match (existing, points) {
                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (_, _) => panic!("mismatched point types for channel {channel_descriptor:?}"),
            }
        } else {
            self.sb.insert(channel_descriptor.clone(), points);
        }

        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}

impl PartialEq for SeriesBuffer {
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}

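// Buffers are ordered by last flush time so `when_capacity` can wait on the
// one that was drained longer ago when both are full.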
impl PartialOrd for SeriesBuffer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let flush_time = self.flush_time.load(Ordering::Acquire);
        let other_flush_time = other.flush_time.load(Ordering::Acquire);
        flush_time.partial_cmp(&other_flush_time)
    }
}

impl SeriesBuffer {
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    /// Checks whether the buffer has room for `new_points_count` more points.
    /// Note that the buffer can grow beyond `max_capacity` if a single batch
    /// larger than `max_capacity` is inserted while the buffer is empty; this
    /// avoids having to split one batch across multiple requests.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        let result_count = self.count.swap(0, Ordering::AcqRel);
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        // Guard against a race: the buffer may have been drained between the
        // caller's capacity check and acquiring this lock. Waiting on the
        // condvar in that case would block forever (nothing is left to flush,
        // so no notification would arrive) and stall all new points.
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}

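// Drains one `SeriesBuffer` in a loop: park for up to `max_request_delay` (or
// until a producer unparks the thread), snapshot the buffer into a
// `WriteRequestNominal`, and forward it to the dispatcher channel.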
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        match request_chan.send((write_request, point_count)) {
            Ok(()) => debug!("queued request for processing"),
            Err(_) => error!("failed to send request to dispatcher"),
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}

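// Dropping the stream flips the running flag and then blocks until the
// dispatcher threads have flushed every buffered point.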
impl Drop for NominalDatasetStream {
    fn drop(&mut self) {
        debug!("starting drop for NominalDatasetStream");
        self.running.store(false, Ordering::Release);
        while self.unflushed_points.load(Ordering::Acquire) > 0 {
            debug!(
                "waiting for all points to be flushed before dropping stream, {} points remaining",
                self.unflushed_points.load(Ordering::Acquire)
            );
            // todo: reduce this + give up after some maximum timeout is reached
            thread::sleep(Duration::from_millis(50));
        }
    }
}

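// Receives write requests from the bounded channel and hands each one to the
// consumer, decrementing `unflushed_points` so `Drop` can observe completion.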
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received write request from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("failed to send request: {e:?}");
                    }
                }
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    // notify the processor thread that all points have been flushed
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {} ms",
        total_request_time
    );
}

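// Point count of a `PointsType`, independent of the concrete variant.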
fn points_len(points_type: &PointsType) -> usize {
    match points_type {
        PointsType::DoublePoints(points) => points.points.len(),
        PointsType::StringPoints(points) => points.points.len(),
        PointsType::IntegerPoints(points) => points.points.len(),
    }
}