nominal_streaming/
stream.rs

use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use std::time::UNIX_EPOCH;

use conjure_object::BearerToken;
use conjure_object::ResourceIdentifier;
use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::Points;
use nominal_api::tonic::io::nominal::scout::api::proto::Series;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
use parking_lot::Condvar;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use tracing::debug;
use tracing::error;
use tracing::info;

use crate::client::NominalApiClients;
use crate::client::PRODUCTION_API_URL;
use crate::consumer::AvroFileConsumer;
use crate::consumer::DualWriteRequestConsumer;
use crate::consumer::ListeningWriteRequestConsumer;
use crate::consumer::NominalCoreConsumer;
use crate::consumer::RequestConsumerWithFallback;
use crate::consumer::WriteRequestConsumer;
use crate::listener::LoggingListener;
use crate::types::ChannelDescriptor;
use crate::types::IntoPoints;
use crate::types::IntoTimestamp;

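/// Tuning options for a [`NominalDatasetStream`].
///
/// A sketch of overriding a single field while keeping the defaults for the
/// rest (the value is illustrative, and the import path assumes this module
/// is exported as `stream`):
///
/// ```ignore
/// use nominal_streaming::stream::NominalStreamOpts;
///
/// let opts = NominalStreamOpts {
///     max_points_per_record: 100_000,
///     ..Default::default()
/// };
/// ```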
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    pub max_points_per_record: usize,
    pub max_request_delay: Duration,
    pub max_buffered_requests: usize,
    pub request_dispatcher_tasks: usize,
    pub base_api_url: String,
}

impl Default for NominalStreamOpts {
    fn default() -> Self {
        Self {
            max_points_per_record: 250_000,
            max_request_delay: Duration::from_millis(100),
            max_buffered_requests: 4,
            request_dispatcher_tasks: 8,
            base_api_url: PRODUCTION_API_URL.to_string(),
        }
    }
}

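/// Builder for [`NominalDatasetStream`].
///
/// A minimal usage sketch that writes to a local Avro file only (no core
/// connection required), assuming this module is exported as `stream`:
///
/// ```ignore
/// use nominal_streaming::stream::NominalDatasetStream;
///
/// let stream = NominalDatasetStream::builder()
///     .stream_to_file("/tmp/telemetry.avro")
///     .build();
/// ```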
#[derive(Default)]
pub struct NominalDatasetStreamBuilder {
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    stream_to_file: Option<PathBuf>,
    file_fallback: Option<PathBuf>,
    listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    opts: NominalStreamOpts,
}

impl Debug for NominalDatasetStreamBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NominalDatasetStreamBuilder")
            .field("stream_to_core", &self.stream_to_core.is_some())
            .field("stream_to_file", &self.stream_to_file)
            .field("file_fallback", &self.file_fallback)
            .field("listeners", &self.listeners.len())
            .finish()
    }
}

impl NominalDatasetStreamBuilder {
    pub fn new() -> Self {
        Self::default()
    }
}

impl NominalDatasetStreamBuilder {
    pub fn stream_to_core(
        self,
        bearer_token: BearerToken,
        dataset: ResourceIdentifier,
        handle: tokio::runtime::Handle,
    ) -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder {
            stream_to_core: Some((bearer_token, dataset, handle)),
            stream_to_file: self.stream_to_file,
            file_fallback: self.file_fallback,
            listeners: self.listeners,
            opts: self.opts,
        }
    }

    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.stream_to_file = Some(file_path.into());
        self
    }

    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.file_fallback = Some(file_path.into());
        self
    }

    pub fn add_listener(
        mut self,
        listener: Arc<dyn crate::listener::NominalStreamListener>,
    ) -> Self {
        self.listeners.push(listener);
        self
    }

    pub fn with_listeners(
        mut self,
        listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    ) -> Self {
        self.listeners = listeners;
        self
    }

    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
        self.opts = opts;
        self
    }

    #[cfg(feature = "logging")]
    fn init_logging(self, directive: Option<&str>) -> Self {
        use tracing_subscriber::layer::SubscriberExt;
        use tracing_subscriber::util::SubscriberInitExt;

        // Build the filter, either from an explicit directive or the environment.
        let base = tracing_subscriber::EnvFilter::builder()
            .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
        let env_filter = match directive {
            Some(d) => base.parse_lossy(d),
            None => base.from_env_lossy(),
        };

        let subscriber = tracing_subscriber::registry()
            .with(
                tracing_subscriber::fmt::layer()
                    .with_thread_ids(true)
                    .with_thread_names(true)
                    .with_line_number(true),
            )
            .with(env_filter);

        if let Err(error) = subscriber.try_init() {
            eprintln!("nominal streaming failed to enable logging: {error}");
        }

        self
    }

    #[cfg(feature = "logging")]
    pub fn enable_logging(self) -> Self {
        self.init_logging(None)
    }

    #[cfg(feature = "logging")]
    pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
        self.init_logging(Some(log_directive))
    }

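    /// Builds the configured [`NominalDatasetStream`].
    ///
    /// # Panics
    ///
    /// Panics if no destination was configured (neither `stream_to_core` nor
    /// `stream_to_file`), or if `stream_to_core`, `stream_to_file`, and
    /// `with_file_fallback` were all set at once: a fallback file only makes
    /// sense with a single primary destination.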
    pub fn build(self) -> NominalDatasetStream {
        let core_consumer = self.core_consumer();
        let file_consumer = self.file_consumer();
        let fallback_consumer = self.fallback_consumer();

        match (core_consumer, file_consumer, fallback_consumer) {
            (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
            (Some(_), Some(_), Some(_)) => {
                panic!("must choose one of stream_to_file and file_fallback when streaming to core")
            }
            (Some(core), None, None) => self.into_stream(core),
            (Some(core), None, Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
            }
            (None, Some(file), None) => self.into_stream(file),
            (None, Some(file), Some(fallback)) => {
                // todo: should this even be supported?
                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
            }
            (Some(core), Some(file), None) => {
                self.into_stream(DualWriteRequestConsumer::new(core, file))
            }
        }
    }

    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
        self.stream_to_core
            .as_ref()
            .map(|(auth_provider, dataset, handle)| {
                NominalCoreConsumer::new(
                    NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
                    handle.clone(),
                    auth_provider.clone(),
                    dataset.clone(),
                )
            })
    }

    fn file_consumer(&self) -> Option<AvroFileConsumer> {
        self.stream_to_file
            .as_ref()
            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
    }

    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
        self.file_fallback
            .as_ref()
            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
    }

    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
        let mut listeners = self.listeners;
        listeners.push(Arc::new(LoggingListener));
        let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
        NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
    }
}

/// Deprecated alias kept for backwards compatibility; new code should use
/// [`NominalDatasetStream`].
#[deprecated(note = "use NominalDatasetStream instead")]
pub type NominalDatasourceStream = NominalDatasetStream;

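/// A handle to a running dataset stream.
///
/// Points accumulate in two `SeriesBuffer`s (primary and secondary) so that
/// producers can keep appending to one while the other is being drained. Two
/// batch-processor threads convert full or stale buffers into write requests,
/// and a pool of dispatcher threads forwards those requests to the configured
/// [`WriteRequestConsumer`]. Dropping the stream blocks until every buffered
/// point has been flushed.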
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}

impl NominalDatasetStream {
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

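    /// Creates a buffered writer of double-valued points for the given channel.
    ///
    /// A usage sketch, assuming `stream` and a `ChannelDescriptor` value
    /// `channel` already exist, and that the timestamp argument may be any
    /// type implementing [`IntoTimestamp`]:
    ///
    /// ```ignore
    /// let mut writer = stream.double_writer(&channel);
    /// writer.push(std::time::SystemTime::now(), 98.6);
    /// // any buffered points are flushed when `writer` is dropped
    /// ```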
    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn uint64_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalUint64Writer<'a> {
        NominalUint64Writer {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

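    /// Appends a batch of points for `channel_descriptor` to the stream's
    /// buffers, blocking if both buffers are at capacity.
    ///
    /// A usage sketch; `read_sensor_batch` is a hypothetical source of points,
    /// and `Vec<DoublePoint>` is assumed to implement [`IntoPoints`] (the
    /// typed writers rely on the same bound):
    ///
    /// ```ignore
    /// let batch: Vec<DoublePoint> = read_sensor_batch(); // hypothetical helper
    /// stream.enqueue(&channel, batch);
    /// ```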
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            // primary buffer is definitely full
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            // Both buffers are full: wait on whichever was flushed least
            // recently (smaller flush_time via PartialOrd), since it should
            // be drained next.
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}

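/// A per-channel writer that accumulates points locally and hands them to the
/// stream once the batch exceeds `max_points_per_record` or more than
/// `max_request_delay` has passed since the last flush. Dropping the writer
/// flushes any remaining points.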
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: &'ds ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    last_flushed_at: Instant,
    unflushed: Vec<T>,
}

impl<T> NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn new<'ds>(
        stream: &'ds NominalDatasetStream,
        channel: &'ds ChannelDescriptor,
    ) -> NominalChannelWriter<'ds, T> {
        NominalChannelWriter {
            channel,
            stream,
            last_flushed_at: Instant::now(),
            unflushed: vec![],
        }
    }

    fn push_point(&mut self, point: T) {
        self.unflushed.push(point);
        if self.unflushed.len() >= self.stream.opts.max_points_per_record
            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
        {
            debug!(
                "conditionally flushing {:?}, ({} points, {:?} since last)",
                self.channel,
                self.unflushed.len(),
                self.last_flushed_at.elapsed()
            );
            self.flush();
        }
    }

    fn flush(&mut self) {
        if self.unflushed.is_empty() {
            return;
        }
        info!(
            "flushing writer for {:?} with {} points",
            self.channel,
            self.unflushed.len()
        );
        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
            buf.extend(self.channel, to_flush);
            self.last_flushed_at = Instant::now();
        })
    }
}

impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}

pub struct NominalDoubleWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoublePoint>,
}

impl NominalDoubleWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
        self.writer.push_point(DoublePoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalIntegerWriter<'ds> {
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}

impl NominalIntegerWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
        self.writer.push_point(IntegerPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalUint64Writer<'ds> {
    writer: NominalChannelWriter<'ds, Uint64Point>,
}

impl NominalUint64Writer<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: u64) {
        self.writer.push_point(Uint64Point {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalStringWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringPoint>,
}

impl NominalStringWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
        self.writer.push_point(StringPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value: value.into(),
        });
    }
}

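/// A lock-protected map of channel -> accumulated points.
///
/// The point count is tracked in an atomic alongside the map so that
/// `has_capacity` can be checked without taking the lock; the condvar lets
/// producers block until a batch processor drains a full buffer.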
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    /// The total number of data points in the buffer.
    ///
    /// To ensure that `count` stays in sync with the contents of the `HashMap`,
    /// only update `count` through the `SeriesBufferGuard`.
    count: AtomicUsize,
    flush_time: AtomicU64,
    condvar: Condvar,
    max_capacity: usize,
}

struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}

impl SeriesBufferGuard<'_> {
    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
        let points = points.into_points();
        let new_point_count = points_len(&points);

        if !self.sb.contains_key(channel_descriptor) {
            self.sb.insert(channel_descriptor.clone(), points);
        } else {
            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(existing)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(new)),
                    }),
                ) => existing.points.extend(new.points),
                (PointsType::Uint64Points(existing), PointsType::Uint64Points(new)) => {
                    existing.points.extend(new.points)
                }
                (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(existing)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(new)),
                    }),
                ) => existing.points.extend(new.points),
                (
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                ) => {}
                (PointsType::StructPoints(existing), PointsType::StructPoints(new)) => {
                    existing.points.extend(new.points);
                }
                // verbose, but exhaustive matching means adding a new point type
                // is a compile error here instead of a silent runtime bug
                (
                    PointsType::DoublePoints(_),
                    PointsType::IntegerPoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::StringPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::IntegerPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::ArrayPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(_),
                    }),
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(_),
                    }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(_)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(_)),
                    }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(_)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(_)),
                    }),
                )
                | (
                    PointsType::Uint64Points(_),
                    PointsType::IntegerPoints(_)
                    | PointsType::StringPoints(_)
                    | PointsType::DoublePoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::StructPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::ArrayPoints(_),
                ) => {
                    panic!("mismatched point types for channel {channel_descriptor:?}");
                }
            }
        }

        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}

impl PartialEq for SeriesBuffer {
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}

// Buffers are ordered by last flush time: `a < b` means `a` was flushed
// longer ago than `b`.
impl PartialOrd for SeriesBuffer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let flush_time = self.flush_time.load(Ordering::Acquire);
        let other_flush_time = other.flush_time.load(Ordering::Acquire);
        flush_time.partial_cmp(&other_flush_time)
    }
}

impl SeriesBuffer {
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    /// Checks whether the buffer has room for `new_points_count` additional points.
    ///
    /// Note that the buffer can grow beyond `max_capacity` if a single batch
    /// larger than `max_capacity` is inserted while the buffer is empty. This
    /// avoids having to split one batch of points across multiple requests.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    /// Drains every series from the buffer, stamping the flush time and
    /// resetting the point count. Returns the drained point count and series.
    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        let result_count = points
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        // Only wait if the buffer is still non-empty: it may have been drained
        // between the caller's capacity check and acquiring this lock, and in
        // that case waiting on the condvar would block forever while holding
        // the lock, stalling any new points.
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}

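/// Turns buffered series into write requests.
///
/// Each iteration drains `points_buffer` into a `WriteRequestNominal` and
/// sends it (with its point count) over `request_chan`. The thread parks for
/// up to `max_request_delay` between iterations; producers unpark it early
/// when a buffer fills, so batches are bounded by both size and staleness.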
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        // send blocks while the channel is full, applying backpressure upstream
        match request_chan.send((write_request, point_count)) {
            Ok(()) => debug!("queued request for processing"),
            Err(_) => error!("failed to send request to dispatcher"),
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}

impl Drop for NominalDatasetStream {
    fn drop(&mut self) {
        debug!("starting drop for NominalDatasetStream");
        self.running.store(false, Ordering::Release);
        loop {
            let count = self.unflushed_points.load(Ordering::Acquire);
            if count == 0 {
                break;
            }
            debug!(
                "waiting for all points to be flushed before dropping stream, {count} points remaining",
            );
            // todo: reduce this + give up after some maximum timeout is reached
            thread::sleep(Duration::from_millis(50));
        }
    }
}

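/// Pulls write requests off the channel and hands them to `consumer`,
/// decrementing the shared `unflushed_points` count as each request completes.
/// Exits once shutdown is signalled and every point has been flushed, or when
/// the sending side of the channel closes.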
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received write request from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("Failed to send request: {e:?}");
                    }
                }
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    // notify the processor thread that all points have been flushed
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {} ms",
        total_request_time
    );
}

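/// Returns the number of points held in a `PointsType`, whichever variant it is.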
fn points_len(points_type: &PointsType) -> usize {
    match points_type {
        PointsType::DoublePoints(points) => points.points.len(),
        PointsType::StringPoints(points) => points.points.len(),
        PointsType::IntegerPoints(points) => points.points.len(),
        PointsType::Uint64Points(points) => points.points.len(),
        PointsType::ArrayPoints(points) => match &points.array_type {
            Some(ArrayType::DoubleArrayPoints(points)) => points.points.len(),
            Some(ArrayType::StringArrayPoints(points)) => points.points.len(),
            None => 0,
        },
        PointsType::StructPoints(points) => points.points.len(),
    }
}