Skip to main content

nominal_streaming/
stream.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
17use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
18use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
19use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
20use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoint;
21use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
22use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
23use nominal_api::tonic::io::nominal::scout::api::proto::Points;
24use nominal_api::tonic::io::nominal::scout::api::proto::Series;
25use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoint;
26use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
27use nominal_api::tonic::io::nominal::scout::api::proto::StructPoint;
28use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
29use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
30use parking_lot::Condvar;
31use parking_lot::Mutex;
32use parking_lot::MutexGuard;
33use tracing::debug;
34use tracing::error;
35use tracing::info;
36
37use crate::client::NominalApiClients;
38use crate::client::PRODUCTION_API_URL;
39use crate::consumer::AvroFileConsumer;
40use crate::consumer::DualWriteRequestConsumer;
41use crate::consumer::ListeningWriteRequestConsumer;
42use crate::consumer::NominalCoreConsumer;
43use crate::consumer::RequestConsumerWithFallback;
44use crate::consumer::WriteRequestConsumer;
45use crate::listener::LoggingListener;
46use crate::types::ChannelDescriptor;
47use crate::types::IntoPoints;
48use crate::types::IntoTimestamp;
49
/// Tuning knobs for a `NominalDatasetStream`.
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    /// Capacity of each in-memory series buffer and the per-writer flush
    /// threshold; batches larger than this are still sent whole.
    pub max_points_per_record: usize,
    /// Longest a batch processor parks between flushes; also the staleness
    /// threshold for per-channel writers.
    pub max_request_delay: Duration,
    /// Capacity of the bounded channel between batch processors and
    /// request dispatchers.
    pub max_buffered_requests: usize,
    /// Number of dispatcher threads that send write requests to the consumer.
    pub request_dispatcher_tasks: usize,
    /// Base URL used to build the Nominal API clients when streaming to core.
    pub base_api_url: String,
}
58
59impl Default for NominalStreamOpts {
60    fn default() -> Self {
61        Self {
62            max_points_per_record: 250_000,
63            max_request_delay: Duration::from_millis(100),
64            max_buffered_requests: 4,
65            request_dispatcher_tasks: 8,
66            base_api_url: PRODUCTION_API_URL.to_string(),
67        }
68    }
69}
70
/// Builder for a `NominalDatasetStream`; configure one or more destinations
/// (core and/or file), optional fallback, listeners, and options, then `build()`.
#[derive(Default)]
pub struct NominalDatasetStreamBuilder {
    // (token, target dataset, tokio handle) when streaming to the Nominal API.
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    // Avro file that receives every write request, when set.
    stream_to_file: Option<PathBuf>,
    // Avro file that receives requests only when the primary consumer fails.
    file_fallback: Option<PathBuf>,
    // User-supplied listeners; a LoggingListener is always appended at build.
    listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    opts: NominalStreamOpts,
}
79
80impl Debug for NominalDatasetStreamBuilder {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        f.debug_struct("NominalDatasetStreamBuilder")
83            .field("stream_to_core", &self.stream_to_core.is_some())
84            .field("stream_to_file", &self.stream_to_file)
85            .field("file_fallback", &self.file_fallback)
86            .field("listeners", &self.listeners.len())
87            .finish()
88    }
89}
90
91impl NominalDatasetStreamBuilder {
92    pub fn new() -> Self {
93        Self::default()
94    }
95}
96
97impl NominalDatasetStreamBuilder {
98    pub fn stream_to_core(
99        self,
100        bearer_token: BearerToken,
101        dataset: ResourceIdentifier,
102        handle: tokio::runtime::Handle,
103    ) -> NominalDatasetStreamBuilder {
104        NominalDatasetStreamBuilder {
105            stream_to_core: Some((bearer_token, dataset, handle)),
106            stream_to_file: self.stream_to_file,
107            file_fallback: self.file_fallback,
108            listeners: self.listeners,
109            opts: self.opts,
110        }
111    }
112
113    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
114        self.stream_to_file = Some(file_path.into());
115        self
116    }
117
118    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
119        self.file_fallback = Some(file_path.into());
120        self
121    }
122
123    pub fn add_listener(
124        mut self,
125        listener: Arc<dyn crate::listener::NominalStreamListener>,
126    ) -> Self {
127        self.listeners.push(listener);
128        self
129    }
130
131    pub fn with_listeners(
132        mut self,
133        listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
134    ) -> Self {
135        self.listeners = listeners;
136        self
137    }
138
139    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
140        self.opts = opts;
141        self
142    }
143
144    #[cfg(feature = "logging")]
145    fn init_logging(self, directive: Option<&str>) -> Self {
146        use tracing_subscriber::layer::SubscriberExt;
147        use tracing_subscriber::util::SubscriberInitExt;
148
149        // Build the filter, either from an explicit directive or the environment.
150        let base = tracing_subscriber::EnvFilter::builder()
151            .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
152        let env_filter = match directive {
153            Some(d) => base.parse_lossy(d),
154            None => base.from_env_lossy(),
155        };
156
157        let subscriber = tracing_subscriber::registry()
158            .with(
159                tracing_subscriber::fmt::layer()
160                    .with_thread_ids(true)
161                    .with_thread_names(true)
162                    .with_line_number(true),
163            )
164            .with(env_filter);
165
166        if let Err(error) = subscriber.try_init() {
167            eprintln!("nominal streaming failed to enable logging: {error}");
168        }
169
170        self
171    }
172
173    #[cfg(feature = "logging")]
174    pub fn enable_logging(self) -> Self {
175        self.init_logging(None)
176    }
177
178    #[cfg(feature = "logging")]
179    pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
180        self.init_logging(Some(log_directive))
181    }
182
183    pub fn build(self) -> NominalDatasetStream {
184        let core_consumer = self.core_consumer();
185        let file_consumer = self.file_consumer();
186        let fallback_consumer = self.fallback_consumer();
187
188        match (core_consumer, file_consumer, fallback_consumer) {
189            (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
190            (Some(_), Some(_), Some(_)) => {
191                panic!("must choose one of stream_to_file and file_fallback when streaming to core")
192            }
193            (Some(core), None, None) => self.into_stream(core),
194            (Some(core), None, Some(fallback)) => {
195                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
196            }
197            (None, Some(file), None) => self.into_stream(file),
198            (None, Some(file), Some(fallback)) => {
199                // todo: should this even be supported?
200                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
201            }
202            (Some(core), Some(file), None) => {
203                self.into_stream(DualWriteRequestConsumer::new(core, file))
204            }
205        }
206    }
207
208    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
209        self.stream_to_core
210            .as_ref()
211            .map(|(auth_provider, dataset, handle)| {
212                NominalCoreConsumer::new(
213                    NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
214                    handle.clone(),
215                    auth_provider.clone(),
216                    dataset.clone(),
217                )
218            })
219    }
220
221    fn file_consumer(&self) -> Option<AvroFileConsumer> {
222        self.stream_to_file
223            .as_ref()
224            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
225    }
226
227    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
228        self.file_fallback
229            .as_ref()
230            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
231    }
232
233    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
234        let mut listeners = self.listeners;
235        listeners.push(Arc::new(LoggingListener));
236        let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
237        NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
238    }
239}
240
241// for backcompat, new code should use NominalDatasetStream
242#[deprecated]
243pub type NominalDatasourceStream = NominalDatasetStream;
244
/// A running dataset stream: points are staged in two alternating
/// `SeriesBuffer`s, drained by batch-processor threads, and sent to the
/// consumer by a pool of dispatcher threads.
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    /// Cleared on drop to tell background threads to wind down.
    running: Arc<AtomicBool>,
    /// Points accepted but not yet handed to the consumer; drop spins until
    /// this reaches zero.
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    /// Batch-processor thread draining the primary buffer (unparked when
    /// that buffer fills).
    primary_handle: thread::JoinHandle<()>,
    /// Batch-processor thread draining the secondary buffer.
    secondary_handle: thread::JoinHandle<()>,
}
254
impl NominalDatasetStream {
    /// Entry point for configuring a stream; see `NominalDatasetStreamBuilder`.
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    /// Starts a stream that sends batched write requests to `consumer`.
    ///
    /// Spawns two batch-processor threads (one per buffer) plus
    /// `opts.request_dispatcher_tasks` dispatcher threads, connected by a
    /// bounded channel of `opts.max_buffered_requests` requests.
    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        // Bounded so batch processors block (backpressure) when dispatchers
        // fall behind.
        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                // Cloned: the secondary processor takes the original sender.
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        // NOTE(review): dispatcher JoinHandles are dropped, so these threads
        // are detached; Drop only synchronizes via unflushed_points.
        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher #{}", i);
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

    /// Creates a buffered writer of f64 points for `channel_descriptor`.
    pub fn double_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalDoubleWriter<'_> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Creates a buffered writer of string points for `channel_descriptor`.
    pub fn string_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalStringWriter<'_> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Creates a buffered writer of i64 points for `channel_descriptor`.
    pub fn integer_writer(
        &self,
        channel_descriptor: ChannelDescriptor,
    ) -> NominalIntegerWriter<'_> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Creates a buffered writer of u64 points for `channel_descriptor`.
    pub fn uint64_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalUint64Writer<'_> {
        NominalUint64Writer {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Creates a buffered writer of JSON-struct points for `channel_descriptor`.
    pub fn struct_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalStructWriter<'_> {
        NominalStructWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Creates a buffered writer of f64-array points for `channel_descriptor`.
    pub fn double_array_writer(
        &self,
        channel_descriptor: ChannelDescriptor,
    ) -> NominalDoubleArrayWriter<'_> {
        NominalDoubleArrayWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Creates a buffered writer of string-array points for `channel_descriptor`.
    pub fn string_array_writer(
        &self,
        channel_descriptor: ChannelDescriptor,
    ) -> NominalStringArrayWriter<'_> {
        NominalStringArrayWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Adds `new_points` for `channel_descriptor` directly to the stream's
    /// buffers, blocking if both buffers are at capacity.
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    /// Runs `callback` with a guard on whichever buffer can accept
    /// `new_count` more points, preferring the primary. If neither can,
    /// blocks on the least-recently-flushed buffer until it is drained.
    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        // Counted up-front; dispatchers subtract after the request is handled.
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            // primary buffer is definitely full
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            // Arc<SeriesBuffer> `<` defers to SeriesBuffer's PartialOrd,
            // which orders by last flush time (earlier = less recently flushed).
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}
417
/// Per-channel buffering writer: accumulates points locally and hands them
/// to the stream's shared buffers when full or stale.
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    // When this writer last flushed; drives the age-based flush in push_point.
    last_flushed_at: Instant,
    // Points accepted but not yet handed to the stream.
    unflushed: Vec<T>,
}
427
428impl<T> NominalChannelWriter<'_, T>
429where
430    Vec<T>: IntoPoints,
431{
432    fn new(
433        stream: &NominalDatasetStream,
434        channel: ChannelDescriptor,
435    ) -> NominalChannelWriter<'_, T> {
436        NominalChannelWriter {
437            channel,
438            stream,
439            last_flushed_at: Instant::now(),
440            unflushed: vec![],
441        }
442    }
443
444    fn push_point(&mut self, point: T) {
445        self.unflushed.push(point);
446        if self.unflushed.len() >= self.stream.opts.max_points_per_record
447            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
448        {
449            debug!(
450                "conditionally flushing {:?}, ({} points, {:?} since last)",
451                self.channel,
452                self.unflushed.len(),
453                self.last_flushed_at.elapsed()
454            );
455            self.flush();
456        }
457    }
458
459    fn flush(&mut self) {
460        if self.unflushed.is_empty() {
461            return;
462        }
463        info!(
464            "flushing writer for {:?} with {} points",
465            self.channel,
466            self.unflushed.len()
467        );
468        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
469            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
470            buf.extend(&self.channel, to_flush);
471            self.last_flushed_at = Instant::now();
472        })
473    }
474}
475
impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    /// Flushes any remaining buffered points so nothing is lost when the
    /// writer goes out of scope. May block if both stream buffers are full.
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}
485
486pub struct NominalDoubleWriter<'ds> {
487    writer: NominalChannelWriter<'ds, DoublePoint>,
488}
489
490impl NominalDoubleWriter<'_> {
491    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
492        self.writer.push_point(DoublePoint {
493            timestamp: Some(timestamp.into_timestamp()),
494            value,
495        });
496    }
497}
498
499pub struct NominalIntegerWriter<'ds> {
500    writer: NominalChannelWriter<'ds, IntegerPoint>,
501}
502
503impl NominalIntegerWriter<'_> {
504    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
505        self.writer.push_point(IntegerPoint {
506            timestamp: Some(timestamp.into_timestamp()),
507            value,
508        });
509    }
510}
511
512pub struct NominalUint64Writer<'ds> {
513    writer: NominalChannelWriter<'ds, Uint64Point>,
514}
515
516impl NominalUint64Writer<'_> {
517    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: u64) {
518        self.writer.push_point(Uint64Point {
519            timestamp: Some(timestamp.into_timestamp()),
520            value,
521        });
522    }
523}
524
525pub struct NominalStringWriter<'ds> {
526    writer: NominalChannelWriter<'ds, StringPoint>,
527}
528
529impl NominalStringWriter<'_> {
530    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
531        self.writer.push_point(StringPoint {
532            timestamp: Some(timestamp.into_timestamp()),
533            value: value.into(),
534        });
535    }
536}
537
538pub struct NominalStructWriter<'ds> {
539    writer: NominalChannelWriter<'ds, StructPoint>,
540}
541
542impl NominalStructWriter<'_> {
543    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
544        self.writer.push_point(StructPoint {
545            timestamp: Some(timestamp.into_timestamp()),
546            json_string: value.into(),
547        });
548    }
549}
550
551pub struct NominalDoubleArrayWriter<'ds> {
552    writer: NominalChannelWriter<'ds, DoubleArrayPoint>,
553}
554
555impl NominalDoubleArrayWriter<'_> {
556    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: Vec<f64>) {
557        self.writer.push_point(DoubleArrayPoint {
558            timestamp: Some(timestamp.into_timestamp()),
559            value,
560        });
561    }
562}
563
564pub struct NominalStringArrayWriter<'ds> {
565    writer: NominalChannelWriter<'ds, StringArrayPoint>,
566}
567
568impl NominalStringArrayWriter<'_> {
569    pub fn push(
570        &mut self,
571        timestamp: impl IntoTimestamp,
572        value: impl IntoIterator<Item = impl Into<String>>,
573    ) {
574        self.writer.push_point(StringArrayPoint {
575            timestamp: Some(timestamp.into_timestamp()),
576            value: value.into_iter().map(Into::into).collect(),
577        });
578    }
579}
580
/// Capacity-bounded staging area mapping channels to their pending points.
/// Producers append through a `SeriesBufferGuard`; a batch processor drains
/// the whole map via `take()`.
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    /// The total number of data points in the buffer.
    ///
    /// To ensure that `count` stays in sync with the contents of the `HashMap`,
    /// only update `count` through the `SeriesBufferGuard`.
    count: AtomicUsize,
    /// Nanoseconds since the UNIX epoch at the last `take()`; used to pick
    /// the least-recently-flushed buffer when both are full.
    flush_time: AtomicU64,
    /// Signalled after a drain so a producer blocked in `on_notify` can proceed.
    condvar: Condvar,
    /// Soft cap on `count`; see `has_capacity` for the single-batch overflow
    /// exception.
    max_capacity: usize,
}
592
/// Locked view of a `SeriesBuffer`'s point map paired with its atomic
/// counter, so every mutation keeps the two in sync.
struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}
597
impl SeriesBufferGuard<'_> {
    /// Appends `points` to the series stored for `channel_descriptor`, then
    /// bumps the shared point count.
    ///
    /// # Panics
    /// Panics if the incoming points' variant does not match the variant
    /// already buffered for this channel (including array element type).
    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
        let points = points.into_points();
        let new_point_count = points_len(&points);

        if !self.sb.contains_key(channel_descriptor) {
            // First points for this channel: store them as-is.
            self.sb.insert(channel_descriptor.clone(), points);
        } else {
            // Matching variants are concatenated; every cross-variant pair
            // falls through to the panic arm below.
            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(existing)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(new)),
                    }),
                ) => existing.points.extend(new.points),
                (PointsType::Uint64Points(existing), PointsType::Uint64Points(new)) => {
                    existing.points.extend(new.points)
                }
                (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(existing)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(new)),
                    }),
                ) => existing.points.extend(new.points),
                // Both sides empty array wrappers: nothing to merge.
                (
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                ) => {}
                (PointsType::StructPoints(existing), PointsType::StructPoints(new)) => {
                    existing.points.extend(new.points);
                }
                // this is hideous, but exhaustive matching is good to avoid future errors
                (
                    PointsType::DoublePoints(_),
                    PointsType::IntegerPoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::StringPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::IntegerPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::ArrayPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(_),
                    }),
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(_),
                    }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(_)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(_)),
                    }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(_)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(_)),
                    }),
                )
                | (
                    PointsType::Uint64Points(_),
                    PointsType::IntegerPoints(_)
                    | PointsType::StringPoints(_)
                    | PointsType::DoublePoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::StructPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::ArrayPoints(_),
                ) => {
                    // todo: improve error
                    panic!("mismatched types");
                }
            }
        }

        // Keep the atomic count in sync with the map (see SeriesBuffer docs).
        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}
728
impl PartialEq for SeriesBuffer {
    /// Buffers compare equal when their last flush happened at the same
    /// nanosecond. This is not structural equality — it exists only to back
    /// the flush-time ordering used by `when_capacity`.
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}
734
735impl PartialOrd for SeriesBuffer {
736    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
737        let flush_time = self.flush_time.load(Ordering::Acquire);
738        let other_flush_time = other.flush_time.load(Ordering::Acquire);
739        flush_time.partial_cmp(&other_flush_time)
740    }
741}
742
impl SeriesBuffer {
    /// Creates an empty buffer with a soft cap of `capacity` points.
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    /// Checks if the buffer has enough capacity to add new points.
    /// Note that the buffer can be larger than MAX_POINTS_PER_RECORD if a single batch of points
    /// larger than MAX_POINTS_PER_RECORD is inserted while the buffer is empty. This avoids needing
    /// to handle splitting batches of points across multiple requests.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    /// Locks the point map and returns a guard that keeps `count` in sync.
    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    /// Drains the buffer into `Series` protos, stamping the flush time and
    /// resetting the point count. Returns `(drained point count, series)`.
    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        // Record ns-since-epoch so when_capacity can prefer the
        // least-recently-flushed buffer.
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        // Zero the count while still holding the map lock; fetch_update
        // returns the previous value, i.e. the number of points drained.
        let result_count = points
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    /// Runs `on_notify` with the buffer locked, first waiting — if the map
    /// is non-empty — for a batch processor to signal that it was drained.
    /// NOTE(review): the wait is not in a loop and capacity is not
    /// re-checked after waking; presumably a notify after `take()` implies
    /// room for the caller — confirm against batch_processor.
    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        // concurrency bug without this - the buffer could have been emptied since we
        // checked the count, so this will wait forever & block any new points from entering
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    /// Wakes one thread blocked in `on_notify`; returns whether one was woken.
    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}
827
/// Background loop for one `SeriesBuffer`: drains it into write requests
/// and queues them for the dispatchers, parking up to `max_request_delay`
/// between iterations (producers unpark it early when the buffer fills).
/// Exits once `running` is cleared and the buffer is empty.
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                // Dropping our sender contributes to channel disconnection,
                // which lets idle dispatchers exit their recv() loop.
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        // Wake a producer blocked in on_notify now that the buffer is empty.
        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal {
            series,
            session_name: None,
        };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        // send() blocks while the bounded channel is full — this is the
        // backpressure point between batching and dispatching.
        let rep = request_chan.send((write_request, point_count));
        debug!("queued request for processing");
        if rep.is_err() {
            error!("failed to send request to dispatcher");
        } else {
            debug!("finished submitting request");
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}
873
impl Drop for NominalDatasetStream {
    /// Signals shutdown and spin-waits until dispatchers have accounted for
    /// every queued point (dispatchers decrement the counter even on send
    /// failure, so errors do not wedge this loop).
    /// NOTE(review): the processor/dispatcher threads are not joined here
    /// and there is no upper bound on the wait — confirm acceptable.
    fn drop(&mut self) {
        debug!("starting drop for NominalDatasetStream");
        self.running.store(false, Ordering::Release);
        loop {
            let count = self.unflushed_points.load(Ordering::Acquire);
            if count == 0 {
                break;
            }
            debug!(
                "waiting for all points to be flushed before dropping stream, {count} points remaining",
            );
            // todo: reduce this + give up after some maximum timeout is reached
            thread::sleep(Duration::from_millis(50));
        }
    }
}
891
/// Background loop for one dispatcher thread: receives
/// `(request, point_count)` pairs and forwards each to `consumer`.
/// Failures are logged but not retried. Exits when the channel disconnects
/// or when shutdown is flagged and all points are accounted for.
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    // Cumulative consume() time in ms, reported when the thread exits.
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received writerequest from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("Failed to send request: {e:?}");
                    }
                }
                // Decrement even on failure so the stream's Drop cannot
                // wait forever on points that will never be sent.
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    // notify the processor thread that all points have been flushed
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {}",
        total_request_time
    );
}
935
936fn points_len(points_type: &PointsType) -> usize {
937    match points_type {
938        PointsType::DoublePoints(points) => points.points.len(),
939        PointsType::StringPoints(points) => points.points.len(),
940        PointsType::IntegerPoints(points) => points.points.len(),
941        PointsType::Uint64Points(points) => points.points.len(),
942        PointsType::ArrayPoints(points) => match &points.array_type {
943            Some(ArrayType::DoubleArrayPoints(points)) => points.points.len(),
944            Some(ArrayType::StringArrayPoints(points)) => points.points.len(),
945            None => 0,
946        },
947        PointsType::StructPoints(points) => points.points.len(),
948    }
949}