nominal_streaming/
stream.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
17use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
18use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
19use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
20use nominal_api::tonic::io::nominal::scout::api::proto::Points;
21use nominal_api::tonic::io::nominal::scout::api::proto::Series;
22use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
23use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
24use parking_lot::Condvar;
25use parking_lot::Mutex;
26use parking_lot::MutexGuard;
27use tracing::debug;
28use tracing::error;
29use tracing::info;
30
31use crate::client::NominalApiClients;
32use crate::client::PRODUCTION_API_URL;
33use crate::consumer::AvroFileConsumer;
34use crate::consumer::DualWriteRequestConsumer;
35use crate::consumer::ListeningWriteRequestConsumer;
36use crate::consumer::NominalCoreConsumer;
37use crate::consumer::RequestConsumerWithFallback;
38use crate::consumer::WriteRequestConsumer;
39use crate::notifier::LoggingListener;
40use crate::types::ChannelDescriptor;
41use crate::types::IntoPoints;
42use crate::types::IntoTimestamp;
43
/// Tuning knobs for a [`NominalDatasetStream`]'s batching and dispatch behavior.
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    // Target size of a single write request; a buffer is flushed once it holds
    // roughly this many points (see `SeriesBuffer::has_capacity` for the one
    // documented exception for oversized single batches).
    pub max_points_per_record: usize,
    // Maximum time points may sit in a buffer before being flushed, and the
    // park interval for the batch-processor threads.
    pub max_request_delay: Duration,
    // Capacity of the bounded channel between batch processors and dispatchers.
    pub max_buffered_requests: usize,
    // Number of dispatcher threads consuming write requests concurrently.
    pub request_dispatcher_tasks: usize,
    // Base URL used to construct the core API clients.
    pub base_api_url: String,
}
52
53impl Default for NominalStreamOpts {
54    fn default() -> Self {
55        Self {
56            max_points_per_record: 250_000,
57            max_request_delay: Duration::from_millis(100),
58            max_buffered_requests: 4,
59            request_dispatcher_tasks: 8,
60            base_api_url: PRODUCTION_API_URL.to_string(),
61        }
62    }
63}
64
/// Builder for [`NominalDatasetStream`]; configure at least one destination
/// (core and/or file) before calling `build`.
#[derive(Debug, Default)]
pub struct NominalDatasetStreamBuilder {
    // When set, points are streamed to the core API for this dataset, using
    // this token for auth and this runtime handle for async requests.
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    // When set, points are written to an Avro file at this path.
    stream_to_file: Option<PathBuf>,
    // When set, points are written to an Avro file at this path only when the
    // primary consumer fails.
    file_fallback: Option<PathBuf>,
    opts: NominalStreamOpts,
}
72
73impl NominalDatasetStreamBuilder {
74    pub fn new() -> NominalDatasetStreamBuilder {
75        NominalDatasetStreamBuilder {
76            ..Default::default()
77        }
78    }
79
80    pub fn stream_to_core(
81        mut self,
82        token: BearerToken,
83        dataset: ResourceIdentifier,
84        handle: tokio::runtime::Handle,
85    ) -> Self {
86        self.stream_to_core = Some((token, dataset, handle));
87        self
88    }
89
90    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
91        self.stream_to_file = Some(file_path.into());
92        self
93    }
94
95    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
96        self.file_fallback = Some(file_path.into());
97        self
98    }
99
100    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
101        self.opts = opts;
102        self
103    }
104
105    #[cfg(feature = "logging")]
106    pub fn enable_logging(self) -> Self {
107        use tracing_subscriber::layer::SubscriberExt;
108        use tracing_subscriber::util::SubscriberInitExt;
109
110        let subscriber = tracing_subscriber::registry()
111            .with(
112                tracing_subscriber::fmt::layer()
113                    .with_thread_ids(true)
114                    .with_thread_names(true)
115                    .with_line_number(true),
116            )
117            .with(
118                tracing_subscriber::EnvFilter::builder()
119                    .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into())
120                    .from_env_lossy(),
121            );
122
123        if let Err(error) = subscriber.try_init() {
124            eprintln!("nominal streaming failed to enable logging: {error}");
125        }
126
127        self
128    }
129
130    pub fn build(self) -> NominalDatasetStream {
131        let core_consumer = self.core_consumer();
132        let file_consumer = self.file_consumer();
133        let fallback_consumer = self.fallback_consumer();
134
135        match (core_consumer, file_consumer, fallback_consumer) {
136            (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
137            (Some(_), Some(_), Some(_)) => {
138                panic!("must choose one of stream_to_file and file_fallback when streaming to core")
139            }
140            (Some(core), None, None) => self.into_stream(core),
141            (Some(core), None, Some(fallback)) => {
142                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
143            }
144            (None, Some(file), None) => self.into_stream(file),
145            (None, Some(file), Some(fallback)) => {
146                // todo: should this even be supported?
147                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
148            }
149            (Some(core), Some(file), None) => {
150                self.into_stream(DualWriteRequestConsumer::new(core, file))
151            }
152        }
153    }
154
155    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
156        self.stream_to_core
157            .as_ref()
158            .map(|(token, dataset, handle)| {
159                NominalCoreConsumer::new(
160                    NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
161                    handle.clone(),
162                    token.clone(),
163                    dataset.clone(),
164                )
165            })
166    }
167
168    fn file_consumer(&self) -> Option<AvroFileConsumer> {
169        self.stream_to_file
170            .as_ref()
171            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
172    }
173
174    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
175        self.file_fallback
176            .as_ref()
177            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
178    }
179
180    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
181        let logging_consumer =
182            ListeningWriteRequestConsumer::new(consumer, vec![Arc::new(LoggingListener)]);
183        NominalDatasetStream::new_with_consumer(logging_consumer, self.opts)
184    }
185}
186
// for backcompat, new code should use NominalDatasetStream; this deprecated
// alias keeps existing callers compiling (with a deprecation warning).
#[deprecated]
pub type NominalDatasourceStream = NominalDatasetStream;
190
/// A handle to a running dataset stream: two swap buffers accumulating points,
/// two batch-processor threads draining them into write requests, and a pool of
/// dispatcher threads sending those requests to the configured consumer.
/// Dropping the stream blocks until all enqueued points have been flushed.
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    // Cleared on drop to tell worker threads to shut down once drained.
    running: Arc<AtomicBool>,
    // Points accepted by `enqueue`/writers but not yet consumed by a dispatcher.
    unflushed_points: Arc<AtomicUsize>,
    // Double-buffering: writers fill one buffer while the other is flushed.
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    // Batch-processor thread handles, kept so writers can unpark them early.
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}
200
impl NominalDatasetStream {
    /// Returns a fresh [`NominalDatasetStreamBuilder`].
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    /// Constructs the stream around `consumer`, spawning two batch-processor
    /// threads (one per swap buffer) and `opts.request_dispatcher_tasks`
    /// dispatcher threads connected by a bounded channel.
    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        // Bounded so batch processors block (backpressure) when dispatchers
        // fall behind; each message carries the request plus its point count.
        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    // Moves `request_tx` (not a clone): once both processors
                    // exit, all senders are dropped and dispatchers see a
                    // closed channel.
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        // Dispatcher threads are detached (handles discarded); they exit when
        // the request channel closes or all points are flushed after shutdown.
        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

    /// Returns a batching writer for f64 points on `channel_descriptor`.
    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Returns a batching writer for string points on `channel_descriptor`.
    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Returns a batching writer for i64 points on `channel_descriptor`.
    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Adds a batch of points for `channel_descriptor` directly to the stream's
    /// buffers, blocking if both buffers are full.
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    /// Runs `callback` with a locked guard on whichever buffer can accept
    /// `new_count` points, preferring primary, then secondary, otherwise
    /// blocking on the least-recently-flushed buffer's condvar until it drains.
    ///
    /// NOTE(review): the capacity check and the subsequent lock are separate
    /// steps, so a buffer may exceed its cap under concurrent writers — this
    /// appears intentional (see `SeriesBuffer::has_capacity` docs) but confirm.
    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        // Counted before the points land in a buffer; dispatchers decrement
        // after the request is consumed, and Drop spins until this reaches 0.
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            // primary buffer is definitely full
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            // Both full: wait on the buffer with the older flush time. The `<`
            // compares through Arc to SeriesBuffer's PartialOrd, i.e. by the
            // `flush_time` atomic.
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}
339
/// Per-channel writer that accumulates points locally and flushes them to the
/// stream's shared buffers in batches (by size or age). Flushes any remainder
/// on drop.
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: &'ds ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    // When the local batch was last handed to the stream; used for the age-based
    // flush trigger in `push_point`.
    last_flushed_at: Instant,
    // Points pushed but not yet moved into the stream's buffers.
    unflushed: Vec<T>,
}
349
350impl<T> NominalChannelWriter<'_, T>
351where
352    Vec<T>: IntoPoints,
353{
354    fn new<'ds>(
355        stream: &'ds NominalDatasetStream,
356        channel: &'ds ChannelDescriptor,
357    ) -> NominalChannelWriter<'ds, T> {
358        NominalChannelWriter {
359            channel,
360            stream,
361            last_flushed_at: Instant::now(),
362            unflushed: vec![],
363        }
364    }
365
366    fn push_point(&mut self, point: T) {
367        self.unflushed.push(point);
368        if self.unflushed.len() >= self.stream.opts.max_points_per_record
369            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
370        {
371            debug!(
372                "conditionally flushing {:?}, ({} points, {:?} since last)",
373                self.channel,
374                self.unflushed.len(),
375                self.last_flushed_at.elapsed()
376            );
377            self.flush();
378        }
379    }
380
381    fn flush(&mut self) {
382        if self.unflushed.is_empty() {
383            return;
384        }
385        info!(
386            "flushing writer for {:?} with {} points",
387            self.channel,
388            self.unflushed.len()
389        );
390        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
391            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
392            buf.extend(self.channel, to_flush);
393            self.last_flushed_at = Instant::now();
394        })
395    }
396}
397
impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    /// Flushes any locally-buffered points before the writer goes away so that
    /// pushed-but-unflushed points are not silently dropped.
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}
407
408pub struct NominalDoubleWriter<'ds> {
409    writer: NominalChannelWriter<'ds, DoublePoint>,
410}
411
412impl NominalDoubleWriter<'_> {
413    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
414        self.writer.push_point(DoublePoint {
415            timestamp: Some(timestamp.into_timestamp()),
416            value,
417        });
418    }
419}
420
421pub struct NominalIntegerWriter<'ds> {
422    writer: NominalChannelWriter<'ds, IntegerPoint>,
423}
424
425impl NominalIntegerWriter<'_> {
426    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
427        self.writer.push_point(IntegerPoint {
428            timestamp: Some(timestamp.into_timestamp()),
429            value,
430        });
431    }
432}
433
434pub struct NominalStringWriter<'ds> {
435    writer: NominalChannelWriter<'ds, StringPoint>,
436}
437
438impl NominalStringWriter<'_> {
439    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
440        self.writer.push_point(StringPoint {
441            timestamp: Some(timestamp.into_timestamp()),
442            value: value.into(),
443        });
444    }
445}
446
/// One of the stream's two swap buffers: a locked per-channel point map plus
/// atomics that let writers check fullness without taking the lock.
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    // Total points across all channels; read lock-free by `has_capacity`.
    count: AtomicUsize,
    // Nanoseconds since the Unix epoch of the last `take()`; used by the
    // PartialOrd impl to pick the least-recently-flushed buffer.
    flush_time: AtomicU64,
    // Signalled after a flush so a writer blocked in `on_notify` can proceed.
    condvar: Condvar,
    // Soft cap on `count`; see `has_capacity` for the oversized-batch exception.
    max_capacity: usize,
}
454
/// Locked view of a `SeriesBuffer`'s point map that keeps the buffer's shared
/// point counter in sync as points are appended.
struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}
459
460impl SeriesBufferGuard<'_> {
461    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
462        let points = points.into_points();
463        let new_point_count = points_len(&points);
464
465        if !self.sb.contains_key(channel_descriptor) {
466            self.sb.insert(channel_descriptor.clone(), points);
467        } else {
468            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
469                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
470                    existing.points.extend(new.points)
471                }
472                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
473                    existing.points.extend(new.points)
474                }
475                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
476                    existing.points.extend(new.points)
477                }
478                (_, _) => {
479                    // todo: improve error
480                    panic!("mismatched types");
481                }
482            }
483        }
484
485        self.count.fetch_add(new_point_count, Ordering::Release);
486    }
487}
488
impl PartialEq for SeriesBuffer {
    /// Buffers compare by last flush time only (consistent with the PartialOrd
    /// impl below); this is NOT content equality — it exists solely so `<` can
    /// pick the least-recently-flushed buffer in `when_capacity`.
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}
494
495impl PartialOrd for SeriesBuffer {
496    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
497        let flush_time = self.flush_time.load(Ordering::Acquire);
498        let other_flush_time = other.flush_time.load(Ordering::Acquire);
499        flush_time.partial_cmp(&other_flush_time)
500    }
501}
502
impl SeriesBuffer {
    /// Creates an empty buffer with a soft cap of `capacity` points.
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    /// Checks if the buffer has enough capacity to add new points.
    /// Note that the buffer can be larger than MAX_POINTS_PER_RECORD if a single batch of points
    /// larger than MAX_POINTS_PER_RECORD is inserted while the buffer is empty. This avoids needing
    /// to handle splitting batches of points across multiple requests.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    /// Locks the point map, returning a guard that keeps `count` in sync.
    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    /// Drains every buffered series, stamps `flush_time` (ns since the Unix
    /// epoch), and resets the point counter. Returns the drained point count
    /// and the series ready to be sent in a write request.
    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        // Swap the counter to zero (fetch_update with Some(0) always succeeds,
        // so the unwrap cannot fail) and return the pre-reset value. The map
        // lock is still held via `points`, so no points can race in between
        // the drain and the reset.
        let result_count = self
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    /// Blocks until the buffer is flushed (condvar notify), then runs
    /// `on_notify` with the guard acquired on wakeup.
    ///
    /// NOTE(review): the wait is not wrapped in a loop re-checking capacity, so
    /// a wakeup does not guarantee room for the caller's points — confirm this
    /// is acceptable given `has_capacity`'s oversized-batch allowance.
    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        // concurrency bug without this - the buffer could have been emptied since we
        // checked the count, so this will wait forever & block any new points from entering
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    /// Wakes one writer blocked in `on_notify`; returns true if a thread was
    /// actually woken.
    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}
587
/// Worker loop (one per swap buffer): drains `points_buffer` into write
/// requests and sends them to the dispatcher channel, pacing itself with
/// `park_timeout` (writers unpark it early when the buffer fills). Exits when
/// `running` is cleared and the buffer is empty.
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                // Dropping this sender (together with the other processor's)
                // closes the channel so dispatcher threads can exit recv().
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        // Wake a writer that may be blocked in on_notify waiting for this
        // buffer to drain.
        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        // Bounded channel: this blocks (backpressure) when dispatchers lag.
        let rep = request_chan.send((write_request, point_count));
        debug!("queued request for processing");
        if rep.is_err() {
            // Send fails only if all dispatchers have exited; the drained
            // points in `write_request` are lost in that case.
            error!("failed to send request to dispatcher");
        } else {
            debug!("finished submitting request");
        }

        // Throttle: at most one request per max_request_delay unless unparked
        // early by a writer.
        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}
630
631impl Drop for NominalDatasetStream {
632    fn drop(&mut self) {
633        debug!("starting drop for NominalDatasetStream");
634        self.running.store(false, Ordering::Release);
635        while self.unflushed_points.load(Ordering::Acquire) > 0 {
636            debug!(
637                "waiting for all points to be flushed before dropping stream, {} points remaining",
638                self.unflushed_points.load(Ordering::Acquire)
639            );
640            // todo: reduce this + give up after some maximum timeout is reached
641            thread::sleep(Duration::from_millis(50));
642        }
643    }
644}
645
/// Dispatcher worker: receives write requests from the channel, hands each to
/// `consumer`, and decrements the shared unflushed-point counter (which the
/// stream's Drop impl spins on). Exits when the channel closes or when all
/// points are flushed after shutdown has been signalled.
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received writerequest from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        // The consumer failed; the points are still counted as
                        // flushed below so shutdown is not blocked by them.
                        error!("Failed to send request: {e:?}");
                    }
                }
                // Decremented whether consume succeeded or not — this is what
                // unblocks NominalDatasetStream::drop.
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    // notify the processor thread that all points have been flushed
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                // All senders (batch processors) have exited and the channel
                // is drained.
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {}",
        total_request_time
    );
}
689
690fn points_len(points_type: &PointsType) -> usize {
691    match points_type {
692        PointsType::DoublePoints(points) => points.points.len(),
693        PointsType::StringPoints(points) => points.points.len(),
694        PointsType::IntegerPoints(points) => points.points.len(),
695    }
696}