nominal_streaming/stream.rs

use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use std::time::UNIX_EPOCH;

use conjure_object::BearerToken;
use conjure_object::ResourceIdentifier;
use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::Points;
use nominal_api::tonic::io::nominal::scout::api::proto::Series;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::StructPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
use parking_lot::Condvar;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use tracing::debug;
use tracing::error;
use tracing::info;

use crate::client::NominalApiClients;
use crate::client::PRODUCTION_API_URL;
use crate::consumer::AvroFileConsumer;
use crate::consumer::DualWriteRequestConsumer;
use crate::consumer::ListeningWriteRequestConsumer;
use crate::consumer::NominalCoreConsumer;
use crate::consumer::RequestConsumerWithFallback;
use crate::consumer::WriteRequestConsumer;
use crate::listener::LoggingListener;
use crate::types::ChannelDescriptor;
use crate::types::IntoPoints;
use crate::types::IntoTimestamp;

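/// Tuning options for a [`NominalDatasetStream`].
///
/// A minimal sketch of overriding the defaults (the field values here are
/// illustrative, not recommendations, and the crate/module path
/// `nominal_streaming::stream` is an assumption):
///
/// ```no_run
/// use std::time::Duration;
/// use nominal_streaming::stream::NominalStreamOpts;
///
/// let opts = NominalStreamOpts {
///     max_points_per_record: 100_000,
///     max_request_delay: Duration::from_millis(250),
///     ..NominalStreamOpts::default()
/// };
/// ```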
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    pub max_points_per_record: usize,
    pub max_request_delay: Duration,
    pub max_buffered_requests: usize,
    pub request_dispatcher_tasks: usize,
    pub base_api_url: String,
}

impl Default for NominalStreamOpts {
    fn default() -> Self {
        Self {
            max_points_per_record: 250_000,
            max_request_delay: Duration::from_millis(100),
            max_buffered_requests: 4,
            request_dispatcher_tasks: 8,
            base_api_url: PRODUCTION_API_URL.to_string(),
        }
    }
}

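/// Builder for [`NominalDatasetStream`].
///
/// At least one destination (core or file) must be configured before calling
/// [`build`](NominalDatasetStreamBuilder::build). A minimal sketch of
/// streaming to a local Avro file (the path and the crate/module path are
/// illustrative assumptions):
///
/// ```no_run
/// use nominal_streaming::stream::NominalDatasetStream;
///
/// let stream = NominalDatasetStream::builder()
///     .stream_to_file("/tmp/points.avro")
///     .build();
/// ```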
#[derive(Default)]
pub struct NominalDatasetStreamBuilder {
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    stream_to_file: Option<PathBuf>,
    file_fallback: Option<PathBuf>,
    listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    opts: NominalStreamOpts,
}

impl Debug for NominalDatasetStreamBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NominalDatasetStreamBuilder")
            .field("stream_to_core", &self.stream_to_core.is_some())
            .field("stream_to_file", &self.stream_to_file)
            .field("file_fallback", &self.file_fallback)
            .field("listeners", &self.listeners.len())
            .finish()
    }
}

impl NominalDatasetStreamBuilder {
    pub fn new() -> Self {
        Self::default()
    }
}

impl NominalDatasetStreamBuilder {
    pub fn stream_to_core(
        self,
        bearer_token: BearerToken,
        dataset: ResourceIdentifier,
        handle: tokio::runtime::Handle,
    ) -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder {
            stream_to_core: Some((bearer_token, dataset, handle)),
            stream_to_file: self.stream_to_file,
            file_fallback: self.file_fallback,
            listeners: self.listeners,
            opts: self.opts,
        }
    }

    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.stream_to_file = Some(file_path.into());
        self
    }

    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.file_fallback = Some(file_path.into());
        self
    }

    pub fn add_listener(
        mut self,
        listener: Arc<dyn crate::listener::NominalStreamListener>,
    ) -> Self {
        self.listeners.push(listener);
        self
    }

    pub fn with_listeners(
        mut self,
        listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    ) -> Self {
        self.listeners = listeners;
        self
    }

    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
        self.opts = opts;
        self
    }

    #[cfg(feature = "logging")]
    fn init_logging(self, directive: Option<&str>) -> Self {
        use tracing_subscriber::layer::SubscriberExt;
        use tracing_subscriber::util::SubscriberInitExt;

        // Build the filter, either from an explicit directive or the environment.
        let base = tracing_subscriber::EnvFilter::builder()
            .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
        let env_filter = match directive {
            Some(d) => base.parse_lossy(d),
            None => base.from_env_lossy(),
        };

        let subscriber = tracing_subscriber::registry()
            .with(
                tracing_subscriber::fmt::layer()
                    .with_thread_ids(true)
                    .with_thread_names(true)
                    .with_line_number(true),
            )
            .with(env_filter);

        if let Err(error) = subscriber.try_init() {
            eprintln!("nominal streaming failed to enable logging: {error}");
        }

        self
    }

    #[cfg(feature = "logging")]
    pub fn enable_logging(self) -> Self {
        self.init_logging(None)
    }

    #[cfg(feature = "logging")]
    pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
        self.init_logging(Some(log_directive))
    }

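    /// Builds the configured [`NominalDatasetStream`].
    ///
    /// # Panics
    ///
    /// Panics if no destination was configured, or if `stream_to_file` and
    /// `with_file_fallback` were both combined with `stream_to_core` (at most
    /// one of the two file options may accompany a core destination).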
    pub fn build(self) -> NominalDatasetStream {
        let core_consumer = self.core_consumer();
        let file_consumer = self.file_consumer();
        let fallback_consumer = self.fallback_consumer();

        match (core_consumer, file_consumer, fallback_consumer) {
            (None, None, _) => panic!("nominal dataset stream must stream to a file, to core, or both"),
            (Some(_), Some(_), Some(_)) => {
                panic!("choose at most one of stream_to_file and file_fallback when streaming to core")
            }
            (Some(core), None, None) => self.into_stream(core),
            (Some(core), None, Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
            }
            (None, Some(file), None) => self.into_stream(file),
            (None, Some(file), Some(fallback)) => {
                // todo: should this even be supported?
                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
            }
            (Some(core), Some(file), None) => {
                self.into_stream(DualWriteRequestConsumer::new(core, file))
            }
        }
    }

    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
        self.stream_to_core
            .as_ref()
            .map(|(bearer_token, dataset, handle)| {
                NominalCoreConsumer::new(
                    NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
                    handle.clone(),
                    bearer_token.clone(),
                    dataset.clone(),
                )
            })
    }

    fn file_consumer(&self) -> Option<AvroFileConsumer> {
        self.stream_to_file
            .as_ref()
            .map(|path| {
                AvroFileConsumer::new_with_full_path(path).expect("failed to create Avro file consumer")
            })
    }

    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
        self.file_fallback
            .as_ref()
            .map(|path| {
                AvroFileConsumer::new_with_full_path(path).expect("failed to create Avro fallback consumer")
            })
    }

    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
        let mut listeners = self.listeners;
        listeners.push(Arc::new(LoggingListener));
        let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
        NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
    }
}

/// Backwards-compatible alias; new code should use [`NominalDatasetStream`].
#[deprecated(note = "use NominalDatasetStream instead")]
pub type NominalDatasourceStream = NominalDatasetStream;

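/// A double-buffered, multi-threaded stream of channel data points.
///
/// Points are staged into one of two in-memory [`SeriesBuffer`]s, drained into
/// write requests by a pair of batch-processor threads, and handed to the
/// configured consumer by a pool of dispatcher threads. Dropping the stream
/// blocks until every buffered point has been flushed.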
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}

impl NominalDatasetStream {
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher #{}", i);
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

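    /// Creates a buffering writer for `f64` points on the given channel.
    ///
    /// A hedged usage sketch; it assumes a `ChannelDescriptor` and a timestamp
    /// type implementing `IntoTimestamp` are already in scope, and that the
    /// `types` module is exported as `nominal_streaming::types`:
    ///
    /// ```no_run
    /// # use nominal_streaming::stream::NominalDatasetStream;
    /// # use nominal_streaming::types::{ChannelDescriptor, IntoTimestamp};
    /// fn write_samples<T: IntoTimestamp>(
    ///     stream: &NominalDatasetStream,
    ///     channel: &ChannelDescriptor,
    ///     timestamps: Vec<T>,
    /// ) {
    ///     let mut writer = stream.double_writer(channel);
    ///     for (i, ts) in timestamps.into_iter().enumerate() {
    ///         writer.push(ts, i as f64);
    ///     }
    ///     // any remaining points are flushed when `writer` is dropped
    /// }
    /// ```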
    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn uint64_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalUint64Writer<'a> {
        NominalUint64Writer {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn struct_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStructWriter<'a> {
        NominalStructWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }
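
    /// Adds a pre-built batch of points for `channel_descriptor`, blocking
    /// until one of the two internal buffers has capacity for the batch.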
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            // primary buffer is definitely full
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            // both buffers are full: wait on whichever flushed least recently
            // (the smaller flush_time), since it should be drained next
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}

pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: &'ds ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    last_flushed_at: Instant,
    unflushed: Vec<T>,
}

impl<T> NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn new<'ds>(
        stream: &'ds NominalDatasetStream,
        channel: &'ds ChannelDescriptor,
    ) -> NominalChannelWriter<'ds, T> {
        NominalChannelWriter {
            channel,
            stream,
            last_flushed_at: Instant::now(),
            unflushed: vec![],
        }
    }

    fn push_point(&mut self, point: T) {
        self.unflushed.push(point);
        if self.unflushed.len() >= self.stream.opts.max_points_per_record
            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
        {
            debug!(
                "conditionally flushing {:?}, ({} points, {:?} since last)",
                self.channel,
                self.unflushed.len(),
                self.last_flushed_at.elapsed()
            );
            self.flush();
        }
    }

    fn flush(&mut self) {
        if self.unflushed.is_empty() {
            return;
        }
        info!(
            "flushing writer for {:?} with {} points",
            self.channel,
            self.unflushed.len()
        );
        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
            buf.extend(self.channel, to_flush);
            self.last_flushed_at = Instant::now();
        })
    }
}

impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}

pub struct NominalDoubleWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoublePoint>,
}

impl NominalDoubleWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
        self.writer.push_point(DoublePoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalIntegerWriter<'ds> {
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}

impl NominalIntegerWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
        self.writer.push_point(IntegerPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalUint64Writer<'ds> {
    writer: NominalChannelWriter<'ds, Uint64Point>,
}

impl NominalUint64Writer<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: u64) {
        self.writer.push_point(Uint64Point {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalStringWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringPoint>,
}

impl NominalStringWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
        self.writer.push_point(StringPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value: value.into(),
        });
    }
}

pub struct NominalStructWriter<'ds> {
    writer: NominalChannelWriter<'ds, StructPoint>,
}

impl NominalStructWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
        self.writer.push_point(StructPoint {
            timestamp: Some(timestamp.into_timestamp()),
            json_string: value.into(),
        });
    }
}

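/// One of the two staging buffers behind a [`NominalDatasetStream`].
///
/// Writers append under the `points` mutex while a batch-processor thread
/// periodically drains the whole map via [`SeriesBuffer::take`]; the condvar
/// wakes writers blocked waiting for a flush.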
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    /// The total number of data points in the buffer.
    ///
    /// To ensure that `count` stays in sync with the contents of the `HashMap`,
    /// only update `count` through the `SeriesBufferGuard`.
    count: AtomicUsize,
    flush_time: AtomicU64,
    condvar: Condvar,
    max_capacity: usize,
}

struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}

impl SeriesBufferGuard<'_> {
    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
        let points = points.into_points();
        let new_point_count = points_len(&points);

        if !self.sb.contains_key(channel_descriptor) {
            self.sb.insert(channel_descriptor.clone(), points);
        } else {
            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(existing)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(new)),
                    }),
                ) => existing.points.extend(new.points),
                (PointsType::Uint64Points(existing), PointsType::Uint64Points(new)) => {
                    existing.points.extend(new.points)
                }
                (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(existing)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(new)),
                    }),
                ) => existing.points.extend(new.points),
                (
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                ) => {}
                (PointsType::StructPoints(existing), PointsType::StructPoints(new)) => {
                    existing.points.extend(new.points);
                }
                // this is hideous, but exhaustive matching is good to avoid future errors
                (
                    PointsType::DoublePoints(_),
                    PointsType::IntegerPoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::StringPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::IntegerPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::ArrayPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(_),
                    }),
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(_),
                    }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(_)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(_)),
                    }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(_)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(_)),
                    }),
                )
                | (
                    PointsType::Uint64Points(_),
                    PointsType::IntegerPoints(_)
                    | PointsType::StringPoints(_)
                    | PointsType::DoublePoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::StructPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::ArrayPoints(_),
                ) => {
                    panic!("mismatched point types for channel {channel_descriptor:?}");
                }
            }
        }

        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}

impl PartialEq for SeriesBuffer {
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}

impl PartialOrd for SeriesBuffer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let flush_time = self.flush_time.load(Ordering::Acquire);
        let other_flush_time = other.flush_time.load(Ordering::Acquire);
        flush_time.partial_cmp(&other_flush_time)
    }
}

impl SeriesBuffer {
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    /// Checks whether the buffer has capacity for `new_points_count` additional points.
    ///
    /// Note that the buffer can grow beyond `max_capacity` if a single batch larger than
    /// `max_capacity` is inserted while the buffer is empty. This avoids having to split
    /// a batch of points across multiple requests.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        self.flush_time.store(
            UNIX_EPOCH
                .elapsed()
                .expect("system time is before the UNIX epoch")
                .as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        let result_count = points
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        // Guard against a race: the buffer may have been emptied between the caller's
        // capacity check and acquiring this lock. Waiting on the condvar in that case
        // would block forever and stall any new points from entering.
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}

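/// Drains a [`SeriesBuffer`] into write requests on a fixed cadence.
///
/// Parks for up to `max_request_delay` between iterations; producers unpark the
/// thread early when the buffer fills. Once `running` is cleared and the buffer
/// is empty, drops its channel sender so the dispatchers can shut down.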
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        if request_chan.send((write_request, point_count)).is_err() {
            error!("failed to send request to dispatcher");
        } else {
            debug!("queued request for processing");
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}

impl Drop for NominalDatasetStream {
    fn drop(&mut self) {
        debug!("starting drop for NominalDatasetStream");
        self.running.store(false, Ordering::Release);
        loop {
            let count = self.unflushed_points.load(Ordering::Acquire);
            if count == 0 {
                break;
            }
            debug!(
                "waiting for all points to be flushed before dropping stream, {count} points remaining",
            );
            // todo: reduce this + give up after some maximum timeout is reached
            thread::sleep(Duration::from_millis(50));
        }
    }
}

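/// Receives write requests from the bounded channel and forwards them to the
/// consumer, decrementing `unflushed_points` as each batch completes. Exits
/// when the channel closes, or once all points are flushed after shutdown begins.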
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received write request from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("Failed to send request: {e:?}");
                    }
                }
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    // notify the processor thread that all points have been flushed
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {} ms",
        total_request_time
    );
}

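/// Counts the individual points in a `PointsType`; an `ArrayPoints` with no
/// `array_type` set counts as zero.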
fn points_len(points_type: &PointsType) -> usize {
    match points_type {
        PointsType::DoublePoints(points) => points.points.len(),
        PointsType::StringPoints(points) => points.points.len(),
        PointsType::IntegerPoints(points) => points.points.len(),
        PointsType::Uint64Points(points) => points.points.len(),
        PointsType::ArrayPoints(points) => match &points.array_type {
            Some(ArrayType::DoubleArrayPoints(points)) => points.points.len(),
            Some(ArrayType::StringArrayPoints(points)) => points.points.len(),
            None => 0,
        },
        PointsType::StructPoints(points) => points.points.len(),
    }
}