nominal_streaming/
stream.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
17use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
18use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
19use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
20use nominal_api::tonic::io::nominal::scout::api::proto::Points;
21use nominal_api::tonic::io::nominal::scout::api::proto::Series;
22use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
23use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
24use parking_lot::Condvar;
25use parking_lot::Mutex;
26use parking_lot::MutexGuard;
27use tracing::debug;
28use tracing::error;
29use tracing::info;
30
31use crate::client::NominalApiClients;
32use crate::client::PRODUCTION_API_URL;
33use crate::consumer::AvroFileConsumer;
34use crate::consumer::DualWriteRequestConsumer;
35use crate::consumer::ListeningWriteRequestConsumer;
36use crate::consumer::NominalCoreConsumer;
37use crate::consumer::RequestConsumerWithFallback;
38use crate::consumer::WriteRequestConsumer;
39use crate::notifier::LoggingListener;
40use crate::types::ChannelDescriptor;
41use crate::types::IntoPoints;
42use crate::types::IntoTimestamp;
43
/// Tuning knobs for a `NominalDatasetStream`.
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    /// Soft cap on points per buffer/write request (see `SeriesBuffer::has_capacity`
    /// for the single-oversized-batch exception).
    pub max_points_per_record: usize,
    /// Maximum time batch processors park between flushes; also the age at
    /// which a channel writer's local buffer is flushed.
    pub max_request_delay: Duration,
    /// Capacity of the bounded channel between batch processors and dispatchers.
    pub max_buffered_requests: usize,
    /// Number of dispatcher threads consuming queued write requests.
    pub request_dispatcher_tasks: usize,
    /// Base URL of the Nominal API used when streaming to core.
    pub base_api_url: String,
}
52
53impl Default for NominalStreamOpts {
54    fn default() -> Self {
55        Self {
56            max_points_per_record: 250_000,
57            max_request_delay: Duration::from_millis(100),
58            max_buffered_requests: 4,
59            request_dispatcher_tasks: 8,
60            base_api_url: PRODUCTION_API_URL.to_string(),
61        }
62    }
63}
64
/// Builder for `NominalDatasetStream`; configure at least one output
/// (core and/or file) before calling `build`.
#[derive(Debug, Default)]
pub struct NominalDatasetStreamBuilder {
    // Token, target dataset, and tokio handle for streaming to the core API.
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    // Path of an Avro file that receives every write request.
    stream_to_file: Option<PathBuf>,
    // Path of an Avro file used only when the primary consumer fails.
    file_fallback: Option<PathBuf>,
    // Stream tuning options; defaults come from NominalStreamOpts::default().
    opts: NominalStreamOpts,
}
72
73impl NominalDatasetStreamBuilder {
74    pub fn new() -> NominalDatasetStreamBuilder {
75        NominalDatasetStreamBuilder {
76            ..Default::default()
77        }
78    }
79
80    pub fn stream_to_core(
81        mut self,
82        token: BearerToken,
83        dataset: ResourceIdentifier,
84        handle: tokio::runtime::Handle,
85    ) -> Self {
86        self.stream_to_core = Some((token, dataset, handle));
87        self
88    }
89
90    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
91        self.stream_to_file = Some(file_path.into());
92        self
93    }
94
95    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
96        self.file_fallback = Some(file_path.into());
97        self
98    }
99
100    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
101        self.opts = opts;
102        self
103    }
104
105    #[cfg(feature = "logging")]
106    fn init_logging(self, directive: Option<&str>) -> Self {
107        use tracing_subscriber::layer::SubscriberExt;
108        use tracing_subscriber::util::SubscriberInitExt;
109
110        // Build the filter, either from an explicit directive or the environment.
111        let base = tracing_subscriber::EnvFilter::builder()
112            .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
113        let env_filter = match directive {
114            Some(d) => base.parse_lossy(d),
115            None => base.from_env_lossy(),
116        };
117
118        let subscriber = tracing_subscriber::registry()
119            .with(
120                tracing_subscriber::fmt::layer()
121                    .with_thread_ids(true)
122                    .with_thread_names(true)
123                    .with_line_number(true),
124            )
125            .with(env_filter);
126
127        if let Err(error) = subscriber.try_init() {
128            eprintln!("nominal streaming failed to enable logging: {error}");
129        }
130
131        self
132    }
133
134    #[cfg(feature = "logging")]
135    pub fn enable_logging(self) -> Self {
136        self.init_logging(None)
137    }
138
139    #[cfg(feature = "logging")]
140    pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
141        self.init_logging(Some(log_directive))
142    }
143
144    pub fn build(self) -> NominalDatasetStream {
145        let core_consumer = self.core_consumer();
146        let file_consumer = self.file_consumer();
147        let fallback_consumer = self.fallback_consumer();
148
149        match (core_consumer, file_consumer, fallback_consumer) {
150            (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
151            (Some(_), Some(_), Some(_)) => {
152                panic!("must choose one of stream_to_file and file_fallback when streaming to core")
153            }
154            (Some(core), None, None) => self.into_stream(core),
155            (Some(core), None, Some(fallback)) => {
156                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
157            }
158            (None, Some(file), None) => self.into_stream(file),
159            (None, Some(file), Some(fallback)) => {
160                // todo: should this even be supported?
161                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
162            }
163            (Some(core), Some(file), None) => {
164                self.into_stream(DualWriteRequestConsumer::new(core, file))
165            }
166        }
167    }
168
169    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
170        self.stream_to_core
171            .as_ref()
172            .map(|(token, dataset, handle)| {
173                NominalCoreConsumer::new(
174                    NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
175                    handle.clone(),
176                    token.clone(),
177                    dataset.clone(),
178                )
179            })
180    }
181
182    fn file_consumer(&self) -> Option<AvroFileConsumer> {
183        self.stream_to_file
184            .as_ref()
185            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
186    }
187
188    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
189        self.file_fallback
190            .as_ref()
191            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
192    }
193
194    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
195        let logging_consumer =
196            ListeningWriteRequestConsumer::new(consumer, vec![Arc::new(LoggingListener)]);
197        NominalDatasetStream::new_with_consumer(logging_consumer, self.opts)
198    }
199}
200
201// for backcompat, new code should use NominalDatasetStream
202#[deprecated]
203pub type NominalDatasourceStream = NominalDatasetStream;
204
/// Streams channel data to one or more consumers using two shared buffers
/// (so one can accept points while the other flushes), two batch-processor
/// threads, and a pool of dispatcher threads.
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    // Cleared on drop to tell batch processors and dispatchers to shut down.
    running: Arc<AtomicBool>,
    // Points accepted but not yet handed to the consumer; drop blocks on this.
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    // Handles used to unpark the batch-processor threads when a buffer fills.
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}
214
impl NominalDatasetStream {
    /// Convenience entry point for [`NominalDatasetStreamBuilder`].
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    /// Spawns the streaming pipeline around `consumer`: one batch-processor
    /// thread per buffer feeding a bounded channel drained by
    /// `opts.request_dispatcher_tasks` dispatcher threads.
    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        // Double buffering: writers fill one buffer while the other flushes.
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        // Bounded so batch processors block (backpressure) when dispatchers lag.
        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        // This thread takes ownership of the last sender clone; once both
        // processors exit, the channel disconnects and dispatchers drain out.
        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        // Dispatcher join handles are intentionally dropped (detached threads);
        // they exit via the `running` flag or when the channel closes.
        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

    /// Returns a buffering writer for `f64` points on the given channel.
    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Returns a buffering writer for string points on the given channel.
    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Returns a buffering writer for `i64` points on the given channel.
    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Adds a batch of points for `channel_descriptor`, blocking if both
    /// shared buffers are over capacity.
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    /// Runs `callback` with a locked buffer able to take `new_count` more
    /// points: primary if it has room, else secondary, else blocks until a
    /// full buffer is flushed.
    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        // Count the points as pending up front; dispatchers subtract after
        // each request is consumed.
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            // primary buffer is definitely full
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            // Both full: wait on whichever buffer flushed longer ago (the
            // flush-time-based PartialOrd below) — presumably the next one to
            // be emptied — after unparking its processor thread.
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}
353
/// Per-channel buffering writer: accumulates points locally and flushes them
/// into the stream's shared buffers in batches.
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: &'ds ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    // Time of the last flush; used to age-trigger the next one.
    last_flushed_at: Instant,
    // Points not yet handed to the stream.
    unflushed: Vec<T>,
}
363
impl<T> NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    /// Creates a writer with an empty local buffer for `channel`.
    fn new<'ds>(
        stream: &'ds NominalDatasetStream,
        channel: &'ds ChannelDescriptor,
    ) -> NominalChannelWriter<'ds, T> {
        NominalChannelWriter {
            channel,
            stream,
            last_flushed_at: Instant::now(),
            unflushed: vec![],
        }
    }

    /// Buffers one point locally, flushing when the local buffer reaches
    /// `max_points_per_record` or `max_request_delay` has elapsed since the
    /// last flush.
    fn push_point(&mut self, point: T) {
        self.unflushed.push(point);
        if self.unflushed.len() >= self.stream.opts.max_points_per_record
            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
        {
            debug!(
                "conditionally flushing {:?}, ({} points, {:?} since last)",
                self.channel,
                self.unflushed.len(),
                self.last_flushed_at.elapsed()
            );
            self.flush();
        }
    }

    /// Moves all locally buffered points into the stream's shared buffers.
    /// May block (inside `when_capacity`) until a buffer has room; no-op
    /// when the local buffer is empty.
    fn flush(&mut self) {
        if self.unflushed.is_empty() {
            return;
        }
        info!(
            "flushing writer for {:?} with {} points",
            self.channel,
            self.unflushed.len()
        );
        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
            // The timestamp is only updated if the callback actually runs,
            // i.e. once capacity was obtained.
            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
            buf.extend(self.channel, to_flush);
            self.last_flushed_at = Instant::now();
        })
    }
}
411
impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    /// Flushes any locally buffered points so nothing is lost when the
    /// writer goes out of scope.
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}
421
422pub struct NominalDoubleWriter<'ds> {
423    writer: NominalChannelWriter<'ds, DoublePoint>,
424}
425
426impl NominalDoubleWriter<'_> {
427    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
428        self.writer.push_point(DoublePoint {
429            timestamp: Some(timestamp.into_timestamp()),
430            value,
431        });
432    }
433}
434
435pub struct NominalIntegerWriter<'ds> {
436    writer: NominalChannelWriter<'ds, IntegerPoint>,
437}
438
439impl NominalIntegerWriter<'_> {
440    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
441        self.writer.push_point(IntegerPoint {
442            timestamp: Some(timestamp.into_timestamp()),
443            value,
444        });
445    }
446}
447
448pub struct NominalStringWriter<'ds> {
449    writer: NominalChannelWriter<'ds, StringPoint>,
450}
451
452impl NominalStringWriter<'_> {
453    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
454        self.writer.push_point(StringPoint {
455            timestamp: Some(timestamp.into_timestamp()),
456            value: value.into(),
457        });
458    }
459}
460
/// A capacity-bounded, mutex-protected map of channel -> accumulated points,
/// with a condvar used to unblock producers after a flush.
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    // Total point count across all channels; kept in sync with `points`.
    count: AtomicUsize,
    // Time of the last `take`, in nanoseconds since the Unix epoch; used by
    // the PartialOrd impl to compare flush recency.
    flush_time: AtomicU64,
    // Signalled by the batch processor after draining the buffer.
    condvar: Condvar,
    // Soft cap on `count`; see `has_capacity` for the oversized-batch exception.
    max_capacity: usize,
}

/// Holds the buffer's mutex guard plus a reference to its point counter so
/// insertions keep the counter consistent with the map.
struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}
473
474impl SeriesBufferGuard<'_> {
475    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
476        let points = points.into_points();
477        let new_point_count = points_len(&points);
478
479        if !self.sb.contains_key(channel_descriptor) {
480            self.sb.insert(channel_descriptor.clone(), points);
481        } else {
482            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
483                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
484                    existing.points.extend(new.points)
485                }
486                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
487                    existing.points.extend(new.points)
488                }
489                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
490                    existing.points.extend(new.points)
491                }
492                (_, _) => {
493                    // todo: improve error
494                    panic!("mismatched types");
495                }
496            }
497        }
498
499        self.count.fetch_add(new_point_count, Ordering::Release);
500    }
501}
502
503impl PartialEq for SeriesBuffer {
504    fn eq(&self, other: &Self) -> bool {
505        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
506    }
507}
508
509impl PartialOrd for SeriesBuffer {
510    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
511        let flush_time = self.flush_time.load(Ordering::Acquire);
512        let other_flush_time = other.flush_time.load(Ordering::Acquire);
513        flush_time.partial_cmp(&other_flush_time)
514    }
515}
516
impl SeriesBuffer {
    /// Creates an empty buffer with the given soft point capacity.
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    /// Checks if the buffer has enough capacity to add new points.
    /// Note that the buffer can be larger than MAX_POINTS_PER_RECORD if a single batch of points
    /// larger than MAX_POINTS_PER_RECORD is inserted while the buffer is empty. This avoids needing
    /// to handle splitting batches of points across multiple requests.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    /// Locks the point map, returning a guard that keeps `count` in sync.
    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    /// Drains all buffered series, stamps the flush time, and zeroes the
    /// point counter. Returns the drained point count and series.
    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        // Record the flush time before draining; `when_capacity` compares
        // these stamps to decide which buffer to wait on.
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        // Reset the counter while `points` still holds the map lock, so the
        // count never disagrees with the (now empty) map for other lockers.
        let result_count = self
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    /// Blocks until the buffer is flushed (condvar notification), then runs
    /// `on_notify` with the lock held.
    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        // concurrency bug without this - the buffer could have been emptied since we
        // checked the count, so this will wait forever & block any new points from entering
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    /// Wakes one thread blocked in `on_notify`; returns whether one was woken.
    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}
601
/// Loop run by each buffer's processor thread: drains the buffer into write
/// requests and queues them for the dispatchers. Parks up to
/// `max_request_delay` between iterations and is unparked early by producers
/// when the buffer fills. Exits once `running` is cleared and the buffer is
/// empty.
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                // Dropping this sender (explicitly, for clarity) lets the
                // dispatchers observe channel disconnect once all senders are gone.
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        // Wake one producer blocked in `on_notify` now that the buffer has room.
        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        // Blocks while the bounded channel is full, applying backpressure.
        let rep = request_chan.send((write_request, point_count));
        debug!("queued request for processing");
        if rep.is_err() {
            error!("failed to send request to dispatcher");
        } else {
            debug!("finished submitting request");
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}
644
645impl Drop for NominalDatasetStream {
646    fn drop(&mut self) {
647        debug!("starting drop for NominalDatasetStream");
648        self.running.store(false, Ordering::Release);
649        while self.unflushed_points.load(Ordering::Acquire) > 0 {
650            debug!(
651                "waiting for all points to be flushed before dropping stream, {} points remaining",
652                self.unflushed_points.load(Ordering::Acquire)
653            );
654            // todo: reduce this + give up after some maximum timeout is reached
655            thread::sleep(Duration::from_millis(50));
656        }
657    }
658}
659
/// Loop run by each dispatcher thread: receives queued write requests, feeds
/// them to the consumer, and decrements the shared pending-point counter.
/// Exits when all points are flushed after shutdown, or when the channel
/// disconnects (all batch processors gone).
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    // Cumulative consumer time in milliseconds, logged on exit.
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received writerequest from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("Failed to send request: {e:?}");
                    }
                }
                // Points count as flushed whether or not the consumer succeeded;
                // `Drop for NominalDatasetStream` spins on this counter, so
                // failed requests must not keep it from reaching zero.
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    // notify the processor thread that all points have been flushed
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                // Channel disconnected: every sender dropped and the queue drained.
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {}",
        total_request_time
    );
}
703
704fn points_len(points_type: &PointsType) -> usize {
705    match points_type {
706        PointsType::DoublePoints(points) => points.points.len(),
707        PointsType::StringPoints(points) => points.points.len(),
708        PointsType::IntegerPoints(points) => points.points.len(),
709    }
710}