nominal_streaming/
stream.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
17use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
18use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
19use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
20use nominal_api::tonic::io::nominal::scout::api::proto::Points;
21use nominal_api::tonic::io::nominal::scout::api::proto::Series;
22use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
23use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
24use parking_lot::Condvar;
25use parking_lot::Mutex;
26use parking_lot::MutexGuard;
27use tracing::debug;
28use tracing::error;
29use tracing::info;
30
31use crate::client::NominalApiClients;
32use crate::client::PRODUCTION_API_URL;
33use crate::consumer::AvroFileConsumer;
34use crate::consumer::DualWriteRequestConsumer;
35use crate::consumer::ListeningWriteRequestConsumer;
36use crate::consumer::NominalCoreConsumer;
37use crate::consumer::RequestConsumerWithFallback;
38use crate::consumer::WriteRequestConsumer;
39use crate::listener::LoggingListener;
40use crate::types::ChannelDescriptor;
41use crate::types::IntoPoints;
42use crate::types::IntoTimestamp;
43
/// Tuning options for a [`NominalDatasetStream`].
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    /// Capacity, in points, given to each internal series buffer. Channel writers also flush
    /// once they hold at least this many unflushed points.
    pub max_points_per_record: usize,
    /// Park timeout used by the batch processor threads between iterations. Channel writers
    /// also flush on a push once more than this much time has passed since their last flush.
    pub max_request_delay: Duration,
    /// Capacity of the bounded channel that carries write requests to the dispatcher threads.
    pub max_buffered_requests: usize,
    /// Number of request dispatcher threads the stream spawns.
    pub request_dispatcher_tasks: usize,
    /// URI passed to `NominalApiClients::from_uri` when streaming to core.
    pub base_api_url: String,
}
52
53impl Default for NominalStreamOpts {
54    fn default() -> Self {
55        Self {
56            max_points_per_record: 250_000,
57            max_request_delay: Duration::from_millis(100),
58            max_buffered_requests: 4,
59            request_dispatcher_tasks: 8,
60            base_api_url: PRODUCTION_API_URL.to_string(),
61        }
62    }
63}
64
/// Builder that collects the destinations, listeners and options for a
/// [`NominalDatasetStream`] before it is constructed by `build`.
pub struct NominalDatasetStreamBuilder {
    /// Bearer token, dataset identifier and tokio runtime handle used when streaming to core.
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    /// Path handed to `AvroFileConsumer::new_with_full_path` for the file destination.
    stream_to_file: Option<PathBuf>,
    /// Path handed to `AvroFileConsumer::new_with_full_path` for the fallback consumer.
    file_fallback: Option<PathBuf>,
    /// Listeners wrapped around the consumer; a `LoggingListener` is appended at build time.
    listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    /// Options forwarded to the stream.
    opts: NominalStreamOpts,
}
72
73impl Debug for NominalDatasetStreamBuilder {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("NominalDatasetStreamBuilder")
76            .field("stream_to_core", &self.stream_to_core.is_some())
77            .field("stream_to_file", &self.stream_to_file)
78            .field("file_fallback", &self.file_fallback)
79            .field("listeners", &self.listeners.len())
80            .finish()
81    }
82}
83
84impl Default for NominalDatasetStreamBuilder {
85    fn default() -> Self {
86        Self {
87            stream_to_core: None,
88            stream_to_file: None,
89            file_fallback: None,
90            listeners: Vec::new(),
91            opts: NominalStreamOpts::default(),
92        }
93    }
94}
95
96impl NominalDatasetStreamBuilder {
97    pub fn new() -> Self {
98        Self::default()
99    }
100}
101
102impl NominalDatasetStreamBuilder {
103    pub fn stream_to_core(
104        self,
105        bearer_token: BearerToken,
106        dataset: ResourceIdentifier,
107        handle: tokio::runtime::Handle,
108    ) -> NominalDatasetStreamBuilder {
109        NominalDatasetStreamBuilder {
110            stream_to_core: Some((bearer_token, dataset, handle)),
111            stream_to_file: self.stream_to_file,
112            file_fallback: self.file_fallback,
113            listeners: self.listeners,
114            opts: self.opts,
115        }
116    }
117
118    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
119        self.stream_to_file = Some(file_path.into());
120        self
121    }
122
123    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
124        self.file_fallback = Some(file_path.into());
125        self
126    }
127
128    pub fn add_listener(
129        mut self,
130        listener: Arc<dyn crate::listener::NominalStreamListener>,
131    ) -> Self {
132        self.listeners.push(listener);
133        self
134    }
135
136    pub fn with_listeners(
137        mut self,
138        listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
139    ) -> Self {
140        self.listeners = listeners;
141        self
142    }
143
144    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
145        self.opts = opts;
146        self
147    }
148
149    #[cfg(feature = "logging")]
150    fn init_logging(self, directive: Option<&str>) -> Self {
151        use tracing_subscriber::layer::SubscriberExt;
152        use tracing_subscriber::util::SubscriberInitExt;
153
154        // Build the filter, either from an explicit directive or the environment.
155        let base = tracing_subscriber::EnvFilter::builder()
156            .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
157        let env_filter = match directive {
158            Some(d) => base.parse_lossy(d),
159            None => base.from_env_lossy(),
160        };
161
162        let subscriber = tracing_subscriber::registry()
163            .with(
164                tracing_subscriber::fmt::layer()
165                    .with_thread_ids(true)
166                    .with_thread_names(true)
167                    .with_line_number(true),
168            )
169            .with(env_filter);
170
171        if let Err(error) = subscriber.try_init() {
172            eprintln!("nominal streaming failed to enable logging: {error}");
173        }
174
175        self
176    }
177
178    #[cfg(feature = "logging")]
179    pub fn enable_logging(self) -> Self {
180        self.init_logging(None)
181    }
182
183    #[cfg(feature = "logging")]
184    pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
185        self.init_logging(Some(log_directive))
186    }
187
188    pub fn build(self) -> NominalDatasetStream {
189        let core_consumer = self.core_consumer();
190        let file_consumer = self.file_consumer();
191        let fallback_consumer = self.fallback_consumer();
192
193        match (core_consumer, file_consumer, fallback_consumer) {
194            (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
195            (Some(_), Some(_), Some(_)) => {
196                panic!("must choose one of stream_to_file and file_fallback when streaming to core")
197            }
198            (Some(core), None, None) => self.into_stream(core),
199            (Some(core), None, Some(fallback)) => {
200                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
201            }
202            (None, Some(file), None) => self.into_stream(file),
203            (None, Some(file), Some(fallback)) => {
204                // todo: should this even be supported?
205                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
206            }
207            (Some(core), Some(file), None) => {
208                self.into_stream(DualWriteRequestConsumer::new(core, file))
209            }
210        }
211    }
212
213    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
214        self.stream_to_core
215            .as_ref()
216            .map(|(auth_provider, dataset, handle)| {
217                NominalCoreConsumer::new(
218                    NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
219                    handle.clone(),
220                    auth_provider.clone(),
221                    dataset.clone(),
222                )
223            })
224    }
225
226    fn file_consumer(&self) -> Option<AvroFileConsumer> {
227        self.stream_to_file
228            .as_ref()
229            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
230    }
231
232    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
233        self.file_fallback
234            .as_ref()
235            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
236    }
237
238    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
239        let mut listeners = self.listeners;
240        listeners.push(Arc::new(LoggingListener));
241        let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
242        NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
243    }
244}
245
246// for backcompat, new code should use NominalDatasetStream
247#[deprecated]
248pub type NominalDatasourceStream = NominalDatasetStream;
249
/// Handle to a running stream: two point buffers, each drained by its own batch processor
/// thread, feeding write requests to dispatcher threads.
pub struct NominalDatasetStream {
    /// Options the stream was created with; channel writers read their flush thresholds here.
    opts: NominalStreamOpts,
    /// Set to `false` when the stream is dropped; read by the worker threads.
    running: Arc<AtomicBool>,
    /// Points accepted but not yet processed by a dispatcher; `Drop` waits for this to hit 0.
    unflushed_points: Arc<AtomicUsize>,
    /// Buffer tried first when enqueueing points.
    primary_buffer: Arc<SeriesBuffer>,
    /// Buffer used when the primary buffer lacks capacity.
    secondary_buffer: Arc<SeriesBuffer>,
    /// Batch processor thread for `primary_buffer`; kept so the thread can be unparked.
    primary_handle: thread::JoinHandle<()>,
    /// Batch processor thread for `secondary_buffer`; kept so the thread can be unparked.
    secondary_handle: thread::JoinHandle<()>,
}
259
260impl NominalDatasetStream {
261    pub fn builder() -> NominalDatasetStreamBuilder {
262        NominalDatasetStreamBuilder::new()
263    }
264
265    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
266        consumer: C,
267        opts: NominalStreamOpts,
268    ) -> Self {
269        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
270        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
271
272        let (request_tx, request_rx) =
273            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);
274
275        let running = Arc::new(AtomicBool::new(true));
276        let unflushed_points = Arc::new(AtomicUsize::new(0));
277
278        let primary_handle = thread::Builder::new()
279            .name("nmstream_primary".to_string())
280            .spawn({
281                let points_buffer = Arc::clone(&primary_buffer);
282                let running = running.clone();
283                let tx = request_tx.clone();
284                move || {
285                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
286                }
287            })
288            .unwrap();
289
290        let secondary_handle = thread::Builder::new()
291            .name("nmstream_secondary".to_string())
292            .spawn({
293                let secondary_buffer = Arc::clone(&secondary_buffer);
294                let running = running.clone();
295                move || {
296                    batch_processor(
297                        running,
298                        secondary_buffer,
299                        request_tx,
300                        opts.max_request_delay,
301                    );
302                }
303            })
304            .unwrap();
305
306        let consumer = Arc::new(consumer);
307
308        for i in 0..opts.request_dispatcher_tasks {
309            thread::Builder::new()
310                .name(format!("nmstream_dispatch_{i}"))
311                .spawn({
312                    let running = Arc::clone(&running);
313                    let unflushed_points = Arc::clone(&unflushed_points);
314                    let rx = request_rx.clone();
315                    let consumer = consumer.clone();
316                    move || {
317                        debug!("starting request dispatcher");
318                        request_dispatcher(running, unflushed_points, rx, consumer);
319                    }
320                })
321                .unwrap();
322        }
323
324        NominalDatasetStream {
325            opts,
326            running,
327            unflushed_points,
328            primary_buffer,
329            secondary_buffer,
330            primary_handle,
331            secondary_handle,
332        }
333    }
334
335    pub fn double_writer<'a>(
336        &'a self,
337        channel_descriptor: &'a ChannelDescriptor,
338    ) -> NominalDoubleWriter<'a> {
339        NominalDoubleWriter {
340            writer: NominalChannelWriter::new(self, channel_descriptor),
341        }
342    }
343
344    pub fn string_writer<'a>(
345        &'a self,
346        channel_descriptor: &'a ChannelDescriptor,
347    ) -> NominalStringWriter<'a> {
348        NominalStringWriter {
349            writer: NominalChannelWriter::new(self, channel_descriptor),
350        }
351    }
352
353    pub fn integer_writer<'a>(
354        &'a self,
355        channel_descriptor: &'a ChannelDescriptor,
356    ) -> NominalIntegerWriter<'a> {
357        NominalIntegerWriter {
358            writer: NominalChannelWriter::new(self, channel_descriptor),
359        }
360    }
361
362    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
363        let new_points = new_points.into_points();
364        let new_count = points_len(&new_points);
365
366        self.when_capacity(new_count, |mut sb| {
367            sb.extend(channel_descriptor, new_points)
368        });
369    }
370
371    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
372        self.unflushed_points
373            .fetch_add(new_count, Ordering::Release);
374
375        if self.primary_buffer.has_capacity(new_count) {
376            debug!("adding {} points to primary buffer", new_count);
377            callback(self.primary_buffer.lock());
378        } else if self.secondary_buffer.has_capacity(new_count) {
379            // primary buffer is definitely full
380            self.primary_handle.thread().unpark();
381            debug!("adding {} points to secondary buffer", new_count);
382            callback(self.secondary_buffer.lock());
383        } else {
384            let buf = if self.primary_buffer < self.secondary_buffer {
385                info!("waiting for primary buffer to flush to append {new_count} points...");
386                self.primary_handle.thread().unpark();
387                &self.primary_buffer
388            } else {
389                info!("waiting for secondary buffer to flush to append {new_count} points...");
390                self.secondary_handle.thread().unpark();
391                &self.secondary_buffer
392            };
393
394            buf.on_notify(callback);
395        }
396    }
397}
398
/// Per-channel writer that accumulates points of type `T` locally and hands them to the
/// owning stream in batches.
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    /// Channel the buffered points belong to.
    channel: &'ds ChannelDescriptor,
    /// Stream that receives the points on flush.
    stream: &'ds NominalDatasetStream,
    /// When the writer was created or last flushed points to the stream.
    last_flushed_at: Instant,
    /// Points pushed since the last flush.
    unflushed: Vec<T>,
}
408
409impl<T> NominalChannelWriter<'_, T>
410where
411    Vec<T>: IntoPoints,
412{
413    fn new<'ds>(
414        stream: &'ds NominalDatasetStream,
415        channel: &'ds ChannelDescriptor,
416    ) -> NominalChannelWriter<'ds, T> {
417        NominalChannelWriter {
418            channel,
419            stream,
420            last_flushed_at: Instant::now(),
421            unflushed: vec![],
422        }
423    }
424
425    fn push_point(&mut self, point: T) {
426        self.unflushed.push(point);
427        if self.unflushed.len() >= self.stream.opts.max_points_per_record
428            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
429        {
430            debug!(
431                "conditionally flushing {:?}, ({} points, {:?} since last)",
432                self.channel,
433                self.unflushed.len(),
434                self.last_flushed_at.elapsed()
435            );
436            self.flush();
437        }
438    }
439
440    fn flush(&mut self) {
441        if self.unflushed.is_empty() {
442            return;
443        }
444        info!(
445            "flushing writer for {:?} with {} points",
446            self.channel,
447            self.unflushed.len()
448        );
449        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
450            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
451            buf.extend(self.channel, to_flush);
452            self.last_flushed_at = Instant::now();
453        })
454    }
455}
456
/// Flushes any points still buffered in the writer when it goes out of scope.
impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        // `flush` is a no-op when nothing is pending
        self.flush();
    }
}
466
/// Writer for `f64` values on a single channel; see [`NominalDatasetStream::double_writer`].
pub struct NominalDoubleWriter<'ds> {
    /// Underlying generic writer holding the buffered `DoublePoint`s.
    writer: NominalChannelWriter<'ds, DoublePoint>,
}
470
471impl NominalDoubleWriter<'_> {
472    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
473        self.writer.push_point(DoublePoint {
474            timestamp: Some(timestamp.into_timestamp()),
475            value,
476        });
477    }
478}
479
/// Writer for `i64` values on a single channel; see [`NominalDatasetStream::integer_writer`].
pub struct NominalIntegerWriter<'ds> {
    /// Underlying generic writer holding the buffered `IntegerPoint`s.
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}
483
484impl NominalIntegerWriter<'_> {
485    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
486        self.writer.push_point(IntegerPoint {
487            timestamp: Some(timestamp.into_timestamp()),
488            value,
489        });
490    }
491}
492
/// Writer for string values on a single channel; see [`NominalDatasetStream::string_writer`].
pub struct NominalStringWriter<'ds> {
    /// Underlying generic writer holding the buffered `StringPoint`s.
    writer: NominalChannelWriter<'ds, StringPoint>,
}
496
497impl NominalStringWriter<'_> {
498    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
499        self.writer.push_point(StringPoint {
500            timestamp: Some(timestamp.into_timestamp()),
501            value: value.into(),
502        });
503    }
504}
505
/// Buffer of points grouped by channel, drained periodically by a batch processor thread.
struct SeriesBuffer {
    /// Buffered points per channel; the mutex is also the one used with `condvar`.
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    /// Number of points currently buffered, readable without taking the lock.
    count: AtomicUsize,
    /// Nanoseconds since the Unix epoch at the most recent `take`; 0 until the first one.
    flush_time: AtomicU64,
    /// Signalled by `notify` (called by the batch processor after a `take`) to wake a
    /// thread blocked in `on_notify`.
    condvar: Condvar,
    /// Point count above which `has_capacity` rejects additions to a non-empty buffer.
    max_capacity: usize,
}
513
/// Exclusive access to a [`SeriesBuffer`]'s points while its mutex is held.
struct SeriesBufferGuard<'sb> {
    /// Lock guard over the per-channel point map.
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    /// The owning buffer's point counter, updated whenever points are added.
    count: &'sb AtomicUsize,
}
518
519impl SeriesBufferGuard<'_> {
520    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
521        let points = points.into_points();
522        let new_point_count = points_len(&points);
523
524        if !self.sb.contains_key(channel_descriptor) {
525            self.sb.insert(channel_descriptor.clone(), points);
526        } else {
527            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
528                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
529                    existing.points.extend(new.points)
530                }
531                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
532                    existing.points.extend(new.points)
533                }
534                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
535                    existing.points.extend(new.points)
536                }
537                (_, _) => {
538                    // todo: improve error
539                    panic!("mismatched types");
540                }
541            }
542        }
543
544        self.count.fetch_add(new_point_count, Ordering::Release);
545    }
546}
547
548impl PartialEq for SeriesBuffer {
549    fn eq(&self, other: &Self) -> bool {
550        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
551    }
552}
553
554impl PartialOrd for SeriesBuffer {
555    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
556        let flush_time = self.flush_time.load(Ordering::Acquire);
557        let other_flush_time = other.flush_time.load(Ordering::Acquire);
558        flush_time.partial_cmp(&other_flush_time)
559    }
560}
561
562impl SeriesBuffer {
563    fn new(capacity: usize) -> Self {
564        Self {
565            points: Mutex::new(HashMap::new()),
566            count: AtomicUsize::new(0),
567            flush_time: AtomicU64::new(0),
568            condvar: Condvar::new(),
569            max_capacity: capacity,
570        }
571    }
572
573    /// Checks if the buffer has enough capacity to add new points.
574    /// Note that the buffer can be larger than MAX_POINTS_PER_RECORD if a single batch of points
575    /// larger than MAX_POINTS_PER_RECORD is inserted while the buffer is empty. This avoids needing
576    /// to handle splitting batches of points across multiple requests.
577    fn has_capacity(&self, new_points_count: usize) -> bool {
578        let count = self.count.load(Ordering::Acquire);
579        count == 0 || count + new_points_count <= self.max_capacity
580    }
581
582    fn lock(&self) -> SeriesBufferGuard<'_> {
583        SeriesBufferGuard {
584            sb: self.points.lock(),
585            count: &self.count,
586        }
587    }
588
589    fn take(&self) -> (usize, Vec<Series>) {
590        let mut points = self.lock();
591        self.flush_time.store(
592            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
593            Ordering::Release,
594        );
595        let result = points
596            .sb
597            .drain()
598            .map(|(ChannelDescriptor { name, tags }, points)| {
599                let channel = Channel { name };
600                let points_obj = Points {
601                    points_type: Some(points),
602                };
603                Series {
604                    channel: Some(channel),
605                    tags: tags
606                        .map(|tags| tags.into_iter().collect())
607                        .unwrap_or_default(),
608                    points: Some(points_obj),
609                }
610            })
611            .collect();
612        let result_count = self
613            .count
614            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
615            .unwrap();
616        (result_count, result)
617    }
618
619    fn is_empty(&self) -> bool {
620        self.count() == 0
621    }
622
623    fn count(&self) -> usize {
624        self.count.load(Ordering::Acquire)
625    }
626
627    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
628        let mut points_lock = self.points.lock();
629        // concurrency bug without this - the buffer could have been emptied since we
630        // checked the count, so this will wait forever & block any new points from entering
631        if !points_lock.is_empty() {
632            self.condvar.wait(&mut points_lock);
633        } else {
634            debug!("buffer emptied since last check, skipping condvar wait");
635        }
636        on_notify(SeriesBufferGuard {
637            sb: points_lock,
638            count: &self.count,
639        });
640    }
641
642    fn notify(&self) -> bool {
643        self.condvar.notify_one()
644    }
645}
646
647fn batch_processor(
648    running: Arc<AtomicBool>,
649    points_buffer: Arc<SeriesBuffer>,
650    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
651    max_request_delay: Duration,
652) {
653    loop {
654        debug!("starting processor loop");
655        if points_buffer.is_empty() {
656            if !running.load(Ordering::Acquire) {
657                debug!("batch processor thread exiting due to running flag");
658                drop(request_chan);
659                break;
660            } else {
661                debug!("empty points buffer, waiting");
662                thread::park_timeout(max_request_delay);
663            }
664            continue;
665        }
666        let (point_count, series) = points_buffer.take();
667
668        if points_buffer.notify() {
669            debug!("notified one waiting thread after clearing points buffer");
670        }
671
672        let write_request = WriteRequestNominal { series };
673
674        if request_chan.is_full() {
675            debug!("ready to queue request but request channel is full");
676        }
677        let rep = request_chan.send((write_request, point_count));
678        debug!("queued request for processing");
679        if rep.is_err() {
680            error!("failed to send request to dispatcher");
681        } else {
682            debug!("finished submitting request");
683        }
684
685        thread::park_timeout(max_request_delay);
686    }
687    debug!("batch processor thread exiting");
688}
689
690impl Drop for NominalDatasetStream {
691    fn drop(&mut self) {
692        debug!("starting drop for NominalDatasetStream");
693        self.running.store(false, Ordering::Release);
694        while self.unflushed_points.load(Ordering::Acquire) > 0 {
695            debug!(
696                "waiting for all points to be flushed before dropping stream, {} points remaining",
697                self.unflushed_points.load(Ordering::Acquire)
698            );
699            // todo: reduce this + give up after some maximum timeout is reached
700            thread::sleep(Duration::from_millis(50));
701        }
702    }
703}
704
705fn request_dispatcher<C: WriteRequestConsumer + 'static>(
706    running: Arc<AtomicBool>,
707    unflushed_points: Arc<AtomicUsize>,
708    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
709    consumer: Arc<C>,
710) {
711    let mut total_request_time = 0;
712    loop {
713        match request_rx.recv() {
714            Ok((request, point_count)) => {
715                debug!("received writerequest from channel");
716                let req_start = Instant::now();
717                match consumer.consume(&request) {
718                    Ok(_) => {
719                        let time = req_start.elapsed().as_millis();
720                        debug!("request of {} points sent in {} ms", point_count, time);
721                        total_request_time += time as u64;
722                    }
723                    Err(e) => {
724                        error!("Failed to send request: {e:?}");
725                    }
726                }
727                unflushed_points.fetch_sub(point_count, Ordering::Release);
728
729                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
730                {
731                    info!("all points flushed, closing dispatcher thread");
732                    // notify the processor thread that all points have been flushed
733                    drop(request_rx);
734                    break;
735                }
736            }
737            Err(e) => {
738                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
739                break;
740            }
741        }
742    }
743    debug!(
744        "request dispatcher thread exiting. total request time: {}",
745        total_request_time
746    );
747}
748
749fn points_len(points_type: &PointsType) -> usize {
750    match points_type {
751        PointsType::DoublePoints(points) => points.points.len(),
752        PointsType::StringPoints(points) => points.points.len(),
753        PointsType::IntegerPoints(points) => points.points.len(),
754    }
755}