Skip to main content

nominal_streaming/
stream.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
17use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
18use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
19use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
20use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoint;
21use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
22use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
23use nominal_api::tonic::io::nominal::scout::api::proto::Points;
24use nominal_api::tonic::io::nominal::scout::api::proto::Series;
25use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoint;
26use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
27use nominal_api::tonic::io::nominal::scout::api::proto::StructPoint;
28use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
29use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
30use parking_lot::Condvar;
31use parking_lot::Mutex;
32use parking_lot::MutexGuard;
33use tracing::debug;
34use tracing::error;
35use tracing::info;
36
37use crate::client::NominalApiClients;
38use crate::client::PRODUCTION_API_URL;
39use crate::consumer::AvroFileConsumer;
40use crate::consumer::DualWriteRequestConsumer;
41use crate::consumer::ListeningWriteRequestConsumer;
42use crate::consumer::NominalCoreConsumer;
43use crate::consumer::RequestConsumerWithFallback;
44use crate::consumer::WriteRequestConsumer;
45use crate::listener::LoggingListener;
46use crate::types::ChannelDescriptor;
47use crate::types::IntoPoints;
48use crate::types::IntoTimestamp;
49
/// Tuning options for a `NominalDatasetStream`.
#[derive(Clone, Debug)]
pub struct NominalStreamOpts {
    /// Point-count threshold: it sizes each internal series buffer and is the
    /// number of pending points at which a channel writer flushes.
    pub max_points_per_record: usize,
    /// Delay handed to the batch-processor threads; channel writers also flush
    /// once this much time has passed since their previous flush.
    pub max_request_delay: Duration,
    /// Capacity of the bounded channel that carries write requests from the
    /// batch processors to the request dispatchers.
    pub max_buffered_requests: usize,
    /// Number of request-dispatcher threads spawned by the stream.
    pub request_dispatcher_tasks: usize,
    /// Base URL used to construct the API clients when streaming to core.
    pub base_api_url: String,
}
58
59impl Default for NominalStreamOpts {
60    fn default() -> Self {
61        Self {
62            max_points_per_record: 250_000,
63            max_request_delay: Duration::from_millis(100),
64            max_buffered_requests: 4,
65            request_dispatcher_tasks: 8,
66            base_api_url: PRODUCTION_API_URL.to_string(),
67        }
68    }
69}
70
71#[derive(Default)]
72pub struct NominalDatasetStreamBuilder {
73    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
74    stream_to_file: Option<PathBuf>,
75    file_fallback: Option<PathBuf>,
76    listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
77    opts: NominalStreamOpts,
78}
79
80impl Debug for NominalDatasetStreamBuilder {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        f.debug_struct("NominalDatasetStreamBuilder")
83            .field("stream_to_core", &self.stream_to_core.is_some())
84            .field("stream_to_file", &self.stream_to_file)
85            .field("file_fallback", &self.file_fallback)
86            .field("listeners", &self.listeners.len())
87            .finish()
88    }
89}
90
91impl NominalDatasetStreamBuilder {
92    pub fn new() -> Self {
93        Self::default()
94    }
95}
96
97impl NominalDatasetStreamBuilder {
98    pub fn stream_to_core(
99        self,
100        bearer_token: BearerToken,
101        dataset: ResourceIdentifier,
102        handle: tokio::runtime::Handle,
103    ) -> NominalDatasetStreamBuilder {
104        NominalDatasetStreamBuilder {
105            stream_to_core: Some((bearer_token, dataset, handle)),
106            stream_to_file: self.stream_to_file,
107            file_fallback: self.file_fallback,
108            listeners: self.listeners,
109            opts: self.opts,
110        }
111    }
112
113    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
114        self.stream_to_file = Some(file_path.into());
115        self
116    }
117
118    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
119        self.file_fallback = Some(file_path.into());
120        self
121    }
122
123    pub fn add_listener(
124        mut self,
125        listener: Arc<dyn crate::listener::NominalStreamListener>,
126    ) -> Self {
127        self.listeners.push(listener);
128        self
129    }
130
131    pub fn with_listeners(
132        mut self,
133        listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
134    ) -> Self {
135        self.listeners = listeners;
136        self
137    }
138
139    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
140        self.opts = opts;
141        self
142    }
143
144    #[cfg(feature = "logging")]
145    fn init_logging(self, directive: Option<&str>) -> Self {
146        use tracing_subscriber::layer::SubscriberExt;
147        use tracing_subscriber::util::SubscriberInitExt;
148
149        // Build the filter, either from an explicit directive or the environment.
150        let base = tracing_subscriber::EnvFilter::builder()
151            .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
152        let env_filter = match directive {
153            Some(d) => base.parse_lossy(d),
154            None => base.from_env_lossy(),
155        };
156
157        let subscriber = tracing_subscriber::registry()
158            .with(
159                tracing_subscriber::fmt::layer()
160                    .with_thread_ids(true)
161                    .with_thread_names(true)
162                    .with_line_number(true),
163            )
164            .with(env_filter);
165
166        if let Err(error) = subscriber.try_init() {
167            eprintln!("nominal streaming failed to enable logging: {error}");
168        }
169
170        self
171    }
172
173    #[cfg(feature = "logging")]
174    pub fn enable_logging(self) -> Self {
175        self.init_logging(None)
176    }
177
178    #[cfg(feature = "logging")]
179    pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
180        self.init_logging(Some(log_directive))
181    }
182
183    pub fn build(self) -> NominalDatasetStream {
184        let core_consumer = self.core_consumer();
185        let file_consumer = self.file_consumer();
186        let fallback_consumer = self.fallback_consumer();
187
188        match (core_consumer, file_consumer, fallback_consumer) {
189            (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
190            (Some(_), Some(_), Some(_)) => {
191                panic!("must choose one of stream_to_file and file_fallback when streaming to core")
192            }
193            (Some(core), None, None) => self.into_stream(core),
194            (Some(core), None, Some(fallback)) => {
195                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
196            }
197            (None, Some(file), None) => self.into_stream(file),
198            (None, Some(file), Some(fallback)) => {
199                // todo: should this even be supported?
200                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
201            }
202            (Some(core), Some(file), None) => {
203                self.into_stream(DualWriteRequestConsumer::new(core, file))
204            }
205        }
206    }
207
208    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
209        self.stream_to_core
210            .as_ref()
211            .map(|(auth_provider, dataset, handle)| {
212                NominalCoreConsumer::new(
213                    NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
214                    handle.clone(),
215                    auth_provider.clone(),
216                    dataset.clone(),
217                )
218            })
219    }
220
221    fn dataset_rid(&self) -> Option<ResourceIdentifier> {
222        self.stream_to_core.as_ref().map(|(_, rid, _)| rid.clone())
223    }
224
225    fn file_consumer(&self) -> Option<AvroFileConsumer> {
226        self.stream_to_file.as_ref().map(|path| {
227            AvroFileConsumer::new_with_full_path(path, true, self.dataset_rid()).unwrap()
228        })
229    }
230
231    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
232        self.file_fallback.as_ref().map(|path| {
233            AvroFileConsumer::new_with_full_path(path, true, self.dataset_rid()).unwrap()
234        })
235    }
236
237    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
238        let mut listeners = self.listeners;
239        listeners.push(Arc::new(LoggingListener));
240        let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
241        NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
242    }
243}
244
245// for backcompat, new code should use NominalDatasetStream
246#[deprecated]
247pub type NominalDatasourceStream = NominalDatasetStream;
248
249pub struct NominalDatasetStream {
250    opts: NominalStreamOpts,
251    running: Arc<AtomicBool>,
252    unflushed_points: Arc<AtomicUsize>,
253    primary_buffer: Arc<SeriesBuffer>,
254    secondary_buffer: Arc<SeriesBuffer>,
255    primary_handle: thread::JoinHandle<()>,
256    secondary_handle: thread::JoinHandle<()>,
257    /// Records the total time spent processing batches on background threads.
258    ///
259    /// This field is only available when the `instrument` feature is enabled.
260    /// This field is *not* SemVer-compliant -- it may be removed during a minor version bump.
261    /// This is intended only for use in benchmarks.
262    #[cfg(feature = "instrument")]
263    pub batch_processor_ns: Arc<AtomicU64>,
264    /// Records the total time dispatching batched points on background threads.
265    ///
266    /// This field is only available when the `instrument` feature is enabled.
267    /// This field is *not* SemVer-compliant -- it may be removed during a minor version bump.
268    /// This is intended only for use in benchmarks.
269    #[cfg(feature = "instrument")]
270    pub dispatcher_ns: Arc<AtomicU64>,
271}
272
273impl NominalDatasetStream {
274    pub fn builder() -> NominalDatasetStreamBuilder {
275        NominalDatasetStreamBuilder::new()
276    }
277
278    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
279        consumer: C,
280        opts: NominalStreamOpts,
281    ) -> Self {
282        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
283        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
284
285        let (request_tx, request_rx) =
286            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);
287
288        let running = Arc::new(AtomicBool::new(true));
289        let unflushed_points = Arc::new(AtomicUsize::new(0));
290
291        #[cfg(feature = "instrument")]
292        let batch_processor_ns = Arc::new(AtomicU64::new(0));
293        #[cfg(feature = "instrument")]
294        let dispatcher_ns = Arc::new(AtomicU64::new(0));
295
296        let primary_handle = thread::Builder::new()
297            .name("nmstream_primary".to_string())
298            .spawn({
299                let points_buffer = Arc::clone(&primary_buffer);
300                let running = running.clone();
301                let tx = request_tx.clone();
302                #[cfg(feature = "instrument")]
303                let bp_ns = Arc::clone(&batch_processor_ns);
304                move || {
305                    batch_processor(
306                        running,
307                        points_buffer,
308                        tx,
309                        opts.max_request_delay,
310                        #[cfg(feature = "instrument")]
311                        bp_ns,
312                    );
313                }
314            })
315            .unwrap();
316
317        let secondary_handle = thread::Builder::new()
318            .name("nmstream_secondary".to_string())
319            .spawn({
320                let secondary_buffer = Arc::clone(&secondary_buffer);
321                let running = running.clone();
322                #[cfg(feature = "instrument")]
323                let bp_ns = Arc::clone(&batch_processor_ns);
324                move || {
325                    batch_processor(
326                        running,
327                        secondary_buffer,
328                        request_tx,
329                        opts.max_request_delay,
330                        #[cfg(feature = "instrument")]
331                        bp_ns,
332                    );
333                }
334            })
335            .unwrap();
336
337        let consumer = Arc::new(consumer);
338
339        for i in 0..opts.request_dispatcher_tasks {
340            thread::Builder::new()
341                .name(format!("nmstream_dispatch_{i}"))
342                .spawn({
343                    let running = Arc::clone(&running);
344                    let unflushed_points = Arc::clone(&unflushed_points);
345                    let rx = request_rx.clone();
346                    let consumer = consumer.clone();
347                    #[cfg(feature = "instrument")]
348                    let disp_ns = Arc::clone(&dispatcher_ns);
349                    move || {
350                        debug!("starting request dispatcher #{}", i);
351                        request_dispatcher(
352                            running,
353                            unflushed_points,
354                            rx,
355                            consumer,
356                            #[cfg(feature = "instrument")]
357                            disp_ns,
358                        );
359                    }
360                })
361                .unwrap();
362        }
363
364        NominalDatasetStream {
365            opts,
366            running,
367            unflushed_points,
368            primary_buffer,
369            secondary_buffer,
370            primary_handle,
371            secondary_handle,
372            #[cfg(feature = "instrument")]
373            batch_processor_ns,
374            #[cfg(feature = "instrument")]
375            dispatcher_ns,
376        }
377    }
378
379    pub fn double_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalDoubleWriter<'_> {
380        NominalDoubleWriter {
381            writer: NominalChannelWriter::new(self, channel_descriptor),
382        }
383    }
384
385    pub fn string_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalStringWriter<'_> {
386        NominalStringWriter {
387            writer: NominalChannelWriter::new(self, channel_descriptor),
388        }
389    }
390
391    pub fn integer_writer(
392        &self,
393        channel_descriptor: ChannelDescriptor,
394    ) -> NominalIntegerWriter<'_> {
395        NominalIntegerWriter {
396            writer: NominalChannelWriter::new(self, channel_descriptor),
397        }
398    }
399
400    pub fn uint64_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalUint64Writer<'_> {
401        NominalUint64Writer {
402            writer: NominalChannelWriter::new(self, channel_descriptor),
403        }
404    }
405
406    pub fn struct_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalStructWriter<'_> {
407        NominalStructWriter {
408            writer: NominalChannelWriter::new(self, channel_descriptor),
409        }
410    }
411
412    pub fn double_array_writer(
413        &self,
414        channel_descriptor: ChannelDescriptor,
415    ) -> NominalDoubleArrayWriter<'_> {
416        NominalDoubleArrayWriter {
417            writer: NominalChannelWriter::new(self, channel_descriptor),
418        }
419    }
420
421    pub fn string_array_writer(
422        &self,
423        channel_descriptor: ChannelDescriptor,
424    ) -> NominalStringArrayWriter<'_> {
425        NominalStringArrayWriter {
426            writer: NominalChannelWriter::new(self, channel_descriptor),
427        }
428    }
429
430    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
431        let new_points = new_points.into_points();
432        let new_count = points_len(&new_points);
433
434        self.when_capacity(new_count, |mut sb| {
435            sb.extend(channel_descriptor, new_points)
436        });
437    }
438
439    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
440        self.unflushed_points
441            .fetch_add(new_count, Ordering::Release);
442
443        if self.primary_buffer.has_capacity(new_count) {
444            debug!("adding {} points to primary buffer", new_count);
445            callback(self.primary_buffer.lock());
446        } else if self.secondary_buffer.has_capacity(new_count) {
447            // primary buffer is definitely full
448            self.primary_handle.thread().unpark();
449            debug!("adding {} points to secondary buffer", new_count);
450            callback(self.secondary_buffer.lock());
451        } else {
452            let buf = if self.primary_buffer < self.secondary_buffer {
453                info!("waiting for primary buffer to flush to append {new_count} points...");
454                self.primary_handle.thread().unpark();
455                &self.primary_buffer
456            } else {
457                info!("waiting for secondary buffer to flush to append {new_count} points...");
458                self.secondary_handle.thread().unpark();
459                &self.secondary_buffer
460            };
461
462            buf.on_notify(callback);
463        }
464    }
465}
466
467pub struct NominalChannelWriter<'ds, T>
468where
469    Vec<T>: IntoPoints,
470{
471    channel: ChannelDescriptor,
472    stream: &'ds NominalDatasetStream,
473    last_flushed_at: Instant,
474    unflushed: Vec<T>,
475}
476
477impl<T> NominalChannelWriter<'_, T>
478where
479    Vec<T>: IntoPoints,
480{
481    fn new(
482        stream: &NominalDatasetStream,
483        channel: ChannelDescriptor,
484    ) -> NominalChannelWriter<'_, T> {
485        NominalChannelWriter {
486            channel,
487            stream,
488            last_flushed_at: Instant::now(),
489            unflushed: vec![],
490        }
491    }
492
493    fn push_point(&mut self, point: T) {
494        self.unflushed.push(point);
495        if self.unflushed.len() >= self.stream.opts.max_points_per_record
496            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
497        {
498            debug!(
499                "conditionally flushing {:?}, ({} points, {:?} since last)",
500                self.channel,
501                self.unflushed.len(),
502                self.last_flushed_at.elapsed()
503            );
504            self.flush();
505        }
506    }
507
508    fn flush(&mut self) {
509        if self.unflushed.is_empty() {
510            return;
511        }
512        info!(
513            "flushing writer for {:?} with {} points",
514            self.channel,
515            self.unflushed.len()
516        );
517        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
518            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
519            buf.extend(&self.channel, to_flush);
520            self.last_flushed_at = Instant::now();
521        })
522    }
523}
524
525impl<T> Drop for NominalChannelWriter<'_, T>
526where
527    Vec<T>: IntoPoints,
528{
529    fn drop(&mut self) {
530        info!("flushing then dropping writer for: {:?}", self.channel);
531        self.flush();
532    }
533}
534
535pub struct NominalDoubleWriter<'ds> {
536    writer: NominalChannelWriter<'ds, DoublePoint>,
537}
538
539impl NominalDoubleWriter<'_> {
540    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
541        self.writer.push_point(DoublePoint {
542            timestamp: Some(timestamp.into_timestamp()),
543            value,
544        });
545    }
546}
547
548pub struct NominalIntegerWriter<'ds> {
549    writer: NominalChannelWriter<'ds, IntegerPoint>,
550}
551
552impl NominalIntegerWriter<'_> {
553    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
554        self.writer.push_point(IntegerPoint {
555            timestamp: Some(timestamp.into_timestamp()),
556            value,
557        });
558    }
559}
560
561pub struct NominalUint64Writer<'ds> {
562    writer: NominalChannelWriter<'ds, Uint64Point>,
563}
564
565impl NominalUint64Writer<'_> {
566    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: u64) {
567        self.writer.push_point(Uint64Point {
568            timestamp: Some(timestamp.into_timestamp()),
569            value,
570        });
571    }
572}
573
574pub struct NominalStringWriter<'ds> {
575    writer: NominalChannelWriter<'ds, StringPoint>,
576}
577
578impl NominalStringWriter<'_> {
579    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
580        self.writer.push_point(StringPoint {
581            timestamp: Some(timestamp.into_timestamp()),
582            value: value.into(),
583        });
584    }
585}
586
587pub struct NominalStructWriter<'ds> {
588    writer: NominalChannelWriter<'ds, StructPoint>,
589}
590
591impl NominalStructWriter<'_> {
592    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
593        self.writer.push_point(StructPoint {
594            timestamp: Some(timestamp.into_timestamp()),
595            json_string: value.into(),
596        });
597    }
598}
599
600pub struct NominalDoubleArrayWriter<'ds> {
601    writer: NominalChannelWriter<'ds, DoubleArrayPoint>,
602}
603
604impl NominalDoubleArrayWriter<'_> {
605    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: Vec<f64>) {
606        self.writer.push_point(DoubleArrayPoint {
607            timestamp: Some(timestamp.into_timestamp()),
608            value,
609        });
610    }
611}
612
613pub struct NominalStringArrayWriter<'ds> {
614    writer: NominalChannelWriter<'ds, StringArrayPoint>,
615}
616
617impl NominalStringArrayWriter<'_> {
618    pub fn push(
619        &mut self,
620        timestamp: impl IntoTimestamp,
621        value: impl IntoIterator<Item = impl Into<String>>,
622    ) {
623        self.writer.push_point(StringArrayPoint {
624            timestamp: Some(timestamp.into_timestamp()),
625            value: value.into_iter().map(Into::into).collect(),
626        });
627    }
628}
629
630struct SeriesBuffer {
631    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
632    /// The total number of data points in the buffer.
633    ///
634    /// To ensure that `count` stays in sync with the contents of the `HashMap`,
635    /// only update `count` through the `SeriesBufferGuard`.
636    count: AtomicUsize,
637    flush_time: AtomicU64,
638    condvar: Condvar,
639    max_capacity: usize,
640}
641
642struct SeriesBufferGuard<'sb> {
643    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
644    count: &'sb AtomicUsize,
645}
646
647impl SeriesBufferGuard<'_> {
648    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
649        let points = points.into_points();
650        let new_point_count = points_len(&points);
651
652        if !self.sb.contains_key(channel_descriptor) {
653            self.sb.insert(channel_descriptor.clone(), points);
654        } else {
655            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
656                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
657                    existing.points.extend(new.points)
658                }
659                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
660                    existing.points.extend(new.points)
661                }
662                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
663                    existing.points.extend(new.points)
664                }
665                (
666                    PointsType::ArrayPoints(ArrayPoints {
667                        array_type: Some(ArrayType::DoubleArrayPoints(existing)),
668                    }),
669                    PointsType::ArrayPoints(ArrayPoints {
670                        array_type: Some(ArrayType::DoubleArrayPoints(new)),
671                    }),
672                ) => existing.points.extend(new.points),
673                (PointsType::Uint64Points(existing), PointsType::Uint64Points(new)) => {
674                    existing.points.extend(new.points)
675                }
676                (
677                    PointsType::ArrayPoints(ArrayPoints {
678                        array_type: Some(ArrayType::StringArrayPoints(existing)),
679                    }),
680                    PointsType::ArrayPoints(ArrayPoints {
681                        array_type: Some(ArrayType::StringArrayPoints(new)),
682                    }),
683                ) => existing.points.extend(new.points),
684                (
685                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
686                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
687                ) => {}
688                (PointsType::StructPoints(existing), PointsType::StructPoints(new)) => {
689                    existing.points.extend(new.points);
690                }
691                // this is hideous, but exhaustive matching is good to avoid future errors
692                (
693                    PointsType::DoublePoints(_),
694                    PointsType::IntegerPoints(_)
695                    | PointsType::Uint64Points(_)
696                    | PointsType::StringPoints(_)
697                    | PointsType::ArrayPoints(_)
698                    | PointsType::StructPoints(_),
699                )
700                | (
701                    PointsType::StringPoints(_),
702                    PointsType::DoublePoints(_)
703                    | PointsType::IntegerPoints(_)
704                    | PointsType::Uint64Points(_)
705                    | PointsType::ArrayPoints(_)
706                    | PointsType::StructPoints(_),
707                )
708                | (
709                    PointsType::IntegerPoints(_),
710                    PointsType::DoublePoints(_)
711                    | PointsType::Uint64Points(_)
712                    | PointsType::StringPoints(_)
713                    | PointsType::ArrayPoints(_)
714                    | PointsType::StructPoints(_),
715                )
716                | (
717                    PointsType::ArrayPoints(_),
718                    PointsType::DoublePoints(_)
719                    | PointsType::Uint64Points(_)
720                    | PointsType::StringPoints(_)
721                    | PointsType::IntegerPoints(_)
722                    | PointsType::StructPoints(_),
723                )
724                | (
725                    PointsType::ArrayPoints(ArrayPoints {
726                        array_type: Some(_),
727                    }),
728                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
729                )
730                | (
731                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
732                    PointsType::ArrayPoints(ArrayPoints {
733                        array_type: Some(_),
734                    }),
735                )
736                | (
737                    PointsType::ArrayPoints(ArrayPoints {
738                        array_type: Some(ArrayType::DoubleArrayPoints(_)),
739                    }),
740                    PointsType::ArrayPoints(ArrayPoints {
741                        array_type: Some(ArrayType::StringArrayPoints(_)),
742                    }),
743                )
744                | (
745                    PointsType::ArrayPoints(ArrayPoints {
746                        array_type: Some(ArrayType::StringArrayPoints(_)),
747                    }),
748                    PointsType::ArrayPoints(ArrayPoints {
749                        array_type: Some(ArrayType::DoubleArrayPoints(_)),
750                    }),
751                )
752                | (
753                    PointsType::Uint64Points(_),
754                    PointsType::IntegerPoints(_)
755                    | PointsType::StringPoints(_)
756                    | PointsType::DoublePoints(_)
757                    | PointsType::ArrayPoints(_)
758                    | PointsType::StructPoints(_),
759                )
760                | (
761                    PointsType::StructPoints(_),
762                    PointsType::DoublePoints(_)
763                    | PointsType::Uint64Points(_)
764                    | PointsType::StringPoints(_)
765                    | PointsType::IntegerPoints(_)
766                    | PointsType::ArrayPoints(_),
767                ) => {
768                    // todo: improve error
769                    panic!("mismatched types");
770                }
771            }
772        }
773
774        self.count.fetch_add(new_point_count, Ordering::Release);
775    }
776}
777
778impl PartialEq for SeriesBuffer {
779    fn eq(&self, other: &Self) -> bool {
780        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
781    }
782}
783
784impl PartialOrd for SeriesBuffer {
785    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
786        let flush_time = self.flush_time.load(Ordering::Acquire);
787        let other_flush_time = other.flush_time.load(Ordering::Acquire);
788        flush_time.partial_cmp(&other_flush_time)
789    }
790}
791
792impl SeriesBuffer {
793    fn new(capacity: usize) -> Self {
794        Self {
795            points: Mutex::new(HashMap::new()),
796            count: AtomicUsize::new(0),
797            flush_time: AtomicU64::new(0),
798            condvar: Condvar::new(),
799            max_capacity: capacity,
800        }
801    }
802
803    /// Checks if the buffer has enough capacity to add new points.
804    /// Note that the buffer can be larger than MAX_POINTS_PER_RECORD if a single batch of points
805    /// larger than MAX_POINTS_PER_RECORD is inserted while the buffer is empty. This avoids needing
806    /// to handle splitting batches of points across multiple requests.
807    fn has_capacity(&self, new_points_count: usize) -> bool {
808        let count = self.count.load(Ordering::Acquire);
809        count == 0 || count + new_points_count <= self.max_capacity
810    }
811
812    fn lock(&self) -> SeriesBufferGuard<'_> {
813        SeriesBufferGuard {
814            sb: self.points.lock(),
815            count: &self.count,
816        }
817    }
818
819    fn take(&self) -> (usize, Vec<Series>) {
820        let mut points = self.lock();
821        self.flush_time.store(
822            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
823            Ordering::Release,
824        );
825        let result = points
826            .sb
827            .drain()
828            .map(|(ChannelDescriptor { name, tags }, points)| {
829                let channel = Channel { name };
830                let points_obj = Points {
831                    points_type: Some(points),
832                };
833                Series {
834                    channel: Some(channel),
835                    tags: tags
836                        .map(|tags| tags.into_iter().collect())
837                        .unwrap_or_default(),
838                    points: Some(points_obj),
839                }
840            })
841            .collect();
842        let result_count = points
843            .count
844            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
845            .unwrap();
846        (result_count, result)
847    }
848
849    fn is_empty(&self) -> bool {
850        self.count() == 0
851    }
852
853    fn count(&self) -> usize {
854        self.count.load(Ordering::Acquire)
855    }
856
857    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
858        let mut points_lock = self.points.lock();
859        // concurrency bug without this - the buffer could have been emptied since we
860        // checked the count, so this will wait forever & block any new points from entering
861        if !points_lock.is_empty() {
862            self.condvar.wait(&mut points_lock);
863        } else {
864            debug!("buffer emptied since last check, skipping condvar wait");
865        }
866        on_notify(SeriesBufferGuard {
867            sb: points_lock,
868            count: &self.count,
869        });
870    }
871
872    fn notify(&self) -> bool {
873        self.condvar.notify_one()
874    }
875}
876
877fn batch_processor(
878    running: Arc<AtomicBool>,
879    points_buffer: Arc<SeriesBuffer>,
880    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
881    max_request_delay: Duration,
882    #[cfg(feature = "instrument")] bp_ns: Arc<AtomicU64>,
883) {
884    loop {
885        debug!("starting processor loop");
886        if points_buffer.is_empty() {
887            if !running.load(Ordering::Acquire) {
888                debug!("batch processor thread exiting due to running flag");
889                drop(request_chan);
890                break;
891            } else {
892                debug!("empty points buffer, waiting");
893                thread::park_timeout(max_request_delay);
894            }
895            continue;
896        }
897
898        #[cfg(feature = "instrument")]
899        let t = Instant::now();
900
901        let (point_count, series) = points_buffer.take();
902
903        if points_buffer.notify() {
904            debug!("notified one waiting thread after clearing points buffer");
905        }
906
907        let write_request = WriteRequestNominal {
908            series,
909            session_name: None,
910        };
911
912        if request_chan.is_full() {
913            debug!("ready to queue request but request channel is full");
914        }
915        let rep = request_chan.send((write_request, point_count));
916        debug!("queued request for processing");
917        if rep.is_err() {
918            error!("failed to send request to dispatcher");
919        } else {
920            debug!("finished submitting request");
921        }
922
923        #[cfg(feature = "instrument")]
924        bp_ns.fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
925
926        thread::park_timeout(max_request_delay);
927    }
928    debug!("batch processor thread exiting");
929}
930
931impl Drop for NominalDatasetStream {
932    fn drop(&mut self) {
933        debug!("starting drop for NominalDatasetStream");
934        self.running.store(false, Ordering::Release);
935        loop {
936            let count = self.unflushed_points.load(Ordering::Acquire);
937            if count == 0 {
938                break;
939            }
940            debug!(
941                "waiting for all points to be flushed before dropping stream, {count} points remaining",
942            );
943            // todo: reduce this + give up after some maximum timeout is reached
944            thread::sleep(Duration::from_millis(50));
945        }
946    }
947}
948
949fn request_dispatcher<C: WriteRequestConsumer + 'static>(
950    running: Arc<AtomicBool>,
951    unflushed_points: Arc<AtomicUsize>,
952    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
953    consumer: Arc<C>,
954    #[cfg(feature = "instrument")] disp_ns: Arc<AtomicU64>,
955) {
956    let mut total_request_time = 0;
957    loop {
958        match request_rx.recv() {
959            Ok((request, point_count)) => {
960                debug!("received writerequest from channel");
961                let req_start = Instant::now();
962                match consumer.consume(&request) {
963                    Ok(_) => {
964                        let time = req_start.elapsed().as_millis();
965                        debug!("request of {} points sent in {} ms", point_count, time);
966                        total_request_time += time as u64;
967                    }
968                    Err(e) => {
969                        error!("Failed to send request: {e:?}");
970                    }
971                }
972                #[cfg(feature = "instrument")]
973                disp_ns.fetch_add(req_start.elapsed().as_nanos() as u64, Ordering::Relaxed);
974                unflushed_points.fetch_sub(point_count, Ordering::Release);
975
976                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
977                {
978                    info!("all points flushed, closing dispatcher thread");
979                    // notify the processor thread that all points have been flushed
980                    drop(request_rx);
981                    break;
982                }
983            }
984            Err(e) => {
985                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
986                break;
987            }
988        }
989    }
990    debug!(
991        "request dispatcher thread exiting. total request time: {}",
992        total_request_time
993    );
994}
995
996fn points_len(points_type: &PointsType) -> usize {
997    match points_type {
998        PointsType::DoublePoints(points) => points.points.len(),
999        PointsType::StringPoints(points) => points.points.len(),
1000        PointsType::IntegerPoints(points) => points.points.len(),
1001        PointsType::Uint64Points(points) => points.points.len(),
1002        PointsType::ArrayPoints(points) => match &points.array_type {
1003            Some(ArrayType::DoubleArrayPoints(points)) => points.points.len(),
1004            Some(ArrayType::StringArrayPoints(points)) => points.points.len(),
1005            None => 0,
1006        },
1007        PointsType::StructPoints(points) => points.points.len(),
1008    }
1009}