nominal_streaming/stream.rs

use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use std::time::UNIX_EPOCH;

use conjure_object::BearerToken;
use conjure_object::ResourceIdentifier;
use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::Points;
use nominal_api::tonic::io::nominal::scout::api::proto::Series;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
use parking_lot::Condvar;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use tracing::debug;
use tracing::error;
use tracing::info;

use crate::client::NominalApiClients;
use crate::client::PRODUCTION_API_URL;
use crate::consumer::AvroFileConsumer;
use crate::consumer::DualWriteRequestConsumer;
use crate::consumer::ListeningWriteRequestConsumer;
use crate::consumer::NominalCoreConsumer;
use crate::consumer::RequestConsumerWithFallback;
use crate::consumer::WriteRequestConsumer;
use crate::listener::LoggingListener;
use crate::types::ChannelDescriptor;
use crate::types::IntoPoints;
use crate::types::IntoTimestamp;

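/// Tuning knobs for a [`NominalDatasetStream`].
///
/// The example below is an illustrative sketch of overriding a subset of
/// options with struct-update syntax; the values shown are arbitrary
/// assumptions, not recommendations.
///
/// ```ignore
/// let opts = NominalStreamOpts {
///     max_points_per_record: 500_000,
///     max_request_delay: Duration::from_millis(250),
///     ..NominalStreamOpts::default()
/// };
/// ```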
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    pub max_points_per_record: usize,
    pub max_request_delay: Duration,
    pub max_buffered_requests: usize,
    pub request_dispatcher_tasks: usize,
    pub base_api_url: String,
}

impl Default for NominalStreamOpts {
    fn default() -> Self {
        Self {
            max_points_per_record: 250_000,
            max_request_delay: Duration::from_millis(100),
            max_buffered_requests: 4,
            request_dispatcher_tasks: 8,
            base_api_url: PRODUCTION_API_URL.to_string(),
        }
    }
}

#[derive(Default)]
pub struct NominalDatasetStreamBuilder {
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    stream_to_file: Option<PathBuf>,
    file_fallback: Option<PathBuf>,
    listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    opts: NominalStreamOpts,
}

impl Debug for NominalDatasetStreamBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NominalDatasetStreamBuilder")
            .field("stream_to_core", &self.stream_to_core.is_some())
            .field("stream_to_file", &self.stream_to_file)
            .field("file_fallback", &self.file_fallback)
            .field("listeners", &self.listeners.len())
            .finish()
    }
}

impl NominalDatasetStreamBuilder {
    pub fn new() -> Self {
        Self::default()
    }
}

impl NominalDatasetStreamBuilder {
    pub fn stream_to_core(
        self,
        bearer_token: BearerToken,
        dataset: ResourceIdentifier,
        handle: tokio::runtime::Handle,
    ) -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder {
            stream_to_core: Some((bearer_token, dataset, handle)),
            stream_to_file: self.stream_to_file,
            file_fallback: self.file_fallback,
            listeners: self.listeners,
            opts: self.opts,
        }
    }

    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.stream_to_file = Some(file_path.into());
        self
    }

    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.file_fallback = Some(file_path.into());
        self
    }

    pub fn add_listener(
        mut self,
        listener: Arc<dyn crate::listener::NominalStreamListener>,
    ) -> Self {
        self.listeners.push(listener);
        self
    }

    pub fn with_listeners(
        mut self,
        listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    ) -> Self {
        self.listeners = listeners;
        self
    }

    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
        self.opts = opts;
        self
    }

    #[cfg(feature = "logging")]
    fn init_logging(self, directive: Option<&str>) -> Self {
        use tracing_subscriber::layer::SubscriberExt;
        use tracing_subscriber::util::SubscriberInitExt;

        // Build the filter, either from an explicit directive or the environment.
        let base = tracing_subscriber::EnvFilter::builder()
            .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
        let env_filter = match directive {
            Some(d) => base.parse_lossy(d),
            None => base.from_env_lossy(),
        };

        let subscriber = tracing_subscriber::registry()
            .with(
                tracing_subscriber::fmt::layer()
                    .with_thread_ids(true)
                    .with_thread_names(true)
                    .with_line_number(true),
            )
            .with(env_filter);

        if let Err(error) = subscriber.try_init() {
            eprintln!("nominal streaming failed to enable logging: {error}");
        }

        self
    }

    #[cfg(feature = "logging")]
    pub fn enable_logging(self) -> Self {
        self.init_logging(None)
    }

    #[cfg(feature = "logging")]
    pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
        self.init_logging(Some(log_directive))
    }

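    /// Builds the stream, wiring together the configured consumers.
    ///
    /// Valid sink combinations are: core only; file only; core with a file
    /// fallback; core and file together (dual write); and file with a file
    /// fallback.
    ///
    /// # Panics
    ///
    /// Panics if neither `stream_to_core` nor `stream_to_file` was
    /// configured, or if `stream_to_file` and `with_file_fallback` are both
    /// set while streaming to core.
    ///
    /// # Example
    ///
    /// An illustrative sketch (the file path is a placeholder):
    ///
    /// ```ignore
    /// let stream = NominalDatasetStream::builder()
    ///     .stream_to_file("/tmp/telemetry.avro")
    ///     .build();
    /// ```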
    pub fn build(self) -> NominalDatasetStream {
        let core_consumer = self.core_consumer();
        let file_consumer = self.file_consumer();
        let fallback_consumer = self.fallback_consumer();

        match (core_consumer, file_consumer, fallback_consumer) {
            (None, None, _) => panic!("nominal dataset stream must stream to core and/or to a file"),
            (Some(_), Some(_), Some(_)) => {
                panic!("choose at most one of stream_to_file and file_fallback when streaming to core")
            }
            (Some(core), None, None) => self.into_stream(core),
            (Some(core), None, Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
            }
            (None, Some(file), None) => self.into_stream(file),
            (None, Some(file), Some(fallback)) => {
                // todo: should this even be supported?
                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
            }
            (Some(core), Some(file), None) => {
                self.into_stream(DualWriteRequestConsumer::new(core, file))
            }
        }
    }

    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
        self.stream_to_core
            .as_ref()
            .map(|(auth_provider, dataset, handle)| {
                NominalCoreConsumer::new(
                    NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
                    handle.clone(),
                    auth_provider.clone(),
                    dataset.clone(),
                )
            })
    }

    fn file_consumer(&self) -> Option<AvroFileConsumer> {
        self.stream_to_file
            .as_ref()
            .map(|path| {
                AvroFileConsumer::new_with_full_path(path)
                    .expect("failed to open avro file for streaming")
            })
    }

    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
        self.file_fallback
            .as_ref()
            .map(|path| {
                AvroFileConsumer::new_with_full_path(path)
                    .expect("failed to open avro fallback file")
            })
    }

    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
        let mut listeners = self.listeners;
        listeners.push(Arc::new(LoggingListener));
        let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
        NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
    }
}

// For backwards compatibility; new code should use `NominalDatasetStream`.
#[deprecated(note = "use NominalDatasetStream instead")]
pub type NominalDatasourceStream = NominalDatasetStream;

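/// A handle to a running dataset stream.
///
/// Points are staged in two `SeriesBuffer`s (primary and secondary) so that
/// producers can keep appending to one buffer while the other is being
/// drained. Two batch-processor threads turn full or stale buffers into
/// `WriteRequestNominal`s, which a pool of dispatcher threads hands to the
/// configured `WriteRequestConsumer`. Dropping the stream blocks until every
/// buffered point has been flushed.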
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}

impl NominalDatasetStream {
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

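    /// Returns a buffering writer for `f64` points on the given channel.
    ///
    /// The writer batches points locally and flushes them into the stream
    /// once `max_points_per_record` points accumulate or `max_request_delay`
    /// elapses, and again when it is dropped.
    ///
    /// # Example
    ///
    /// An illustrative sketch; assumes `ts` is some value implementing
    /// `IntoTimestamp`:
    ///
    /// ```ignore
    /// let mut writer = stream.double_writer(&channel);
    /// writer.push(ts, 42.0);
    /// // any remaining points are flushed when `writer` is dropped
    /// ```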
    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

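    /// Enqueues a batch of points for the given channel.
    ///
    /// If both internal buffers are at capacity, this blocks until one of
    /// them is flushed and then appends.
    ///
    /// # Example
    ///
    /// An illustrative sketch; assumes `Vec<DoublePoint>` implements
    /// `IntoPoints` (as the writer types in this module require) and that
    /// `ts` implements `IntoTimestamp`:
    ///
    /// ```ignore
    /// stream.enqueue(
    ///     &channel,
    ///     vec![DoublePoint { timestamp: Some(ts.into_timestamp()), value: 1.5 }],
    /// );
    /// ```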
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            // the primary buffer is definitely full, so wake its processor early
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            // both buffers are full: wait on whichever flushed longer ago
            // (`<` compares flush times), since it is likely to drain next
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}

pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: &'ds ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    last_flushed_at: Instant,
    unflushed: Vec<T>,
}

impl<T> NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn new<'ds>(
        stream: &'ds NominalDatasetStream,
        channel: &'ds ChannelDescriptor,
    ) -> NominalChannelWriter<'ds, T> {
        NominalChannelWriter {
            channel,
            stream,
            last_flushed_at: Instant::now(),
            unflushed: vec![],
        }
    }

    fn push_point(&mut self, point: T) {
        self.unflushed.push(point);
        if self.unflushed.len() >= self.stream.opts.max_points_per_record
            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
        {
            debug!(
                "conditionally flushing {:?}, ({} points, {:?} since last)",
                self.channel,
                self.unflushed.len(),
                self.last_flushed_at.elapsed()
            );
            self.flush();
        }
    }

    fn flush(&mut self) {
        if self.unflushed.is_empty() {
            return;
        }
        info!(
            "flushing writer for {:?} with {} points",
            self.channel,
            self.unflushed.len()
        );
        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
            buf.extend(self.channel, to_flush);
            self.last_flushed_at = Instant::now();
        })
    }
}

impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}

pub struct NominalDoubleWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoublePoint>,
}

impl NominalDoubleWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
        self.writer.push_point(DoublePoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalIntegerWriter<'ds> {
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}

impl NominalIntegerWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
        self.writer.push_point(IntegerPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalStringWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringPoint>,
}

impl NominalStringWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
        self.writer.push_point(StringPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value: value.into(),
        });
    }
}

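/// A staging area mapping each channel to its pending points.
///
/// Producers append through a `SeriesBufferGuard`; a batch-processor thread
/// periodically drains the whole buffer via `take` and then signals the
/// condvar so that one producer blocked in `on_notify` can proceed.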
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    /// The total number of data points in the buffer.
    ///
    /// To ensure that `count` stays in sync with the contents of the `HashMap`,
    /// only update `count` through the `SeriesBufferGuard`.
    count: AtomicUsize,
    flush_time: AtomicU64,
    condvar: Condvar,
    max_capacity: usize,
}

struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}

impl SeriesBufferGuard<'_> {
    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
        let points = points.into_points();
        let new_point_count = points_len(&points);

        if !self.sb.contains_key(channel_descriptor) {
            self.sb.insert(channel_descriptor.clone(), points);
        } else {
            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(existing)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(new)),
                    }),
                ) => existing.points.extend(new.points),
                (PointsType::Uint64Points(existing), PointsType::Uint64Points(new)) => {
                    existing.points.extend(new.points)
                }
                (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(existing)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(new)),
                    }),
                ) => existing.points.extend(new.points),
                (
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                ) => {}
                (PointsType::StructPoints(existing), PointsType::StructPoints(new)) => {
                    existing.points.extend(new.points);
                }
                // Exhaustively matching the mismatched combinations is verbose,
                // but it forces a compile error here whenever a new PointsType
                // variant is added, instead of silently falling through.
                (
                    PointsType::DoublePoints(_),
                    PointsType::IntegerPoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::StringPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::IntegerPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::ArrayPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(_),
                    }),
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(_),
                    }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(_)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(_)),
                    }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(_)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(_)),
                    }),
                )
                | (
                    PointsType::Uint64Points(_),
                    PointsType::IntegerPoints(_)
                    | PointsType::StringPoints(_)
                    | PointsType::DoublePoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::StructPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::ArrayPoints(_),
                ) => {
                    // todo: surface this as a proper error instead of panicking
                    panic!("mismatched point types for channel {channel_descriptor:?}");
                }
            }
        }

        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}

impl PartialEq for SeriesBuffer {
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}

impl PartialOrd for SeriesBuffer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let flush_time = self.flush_time.load(Ordering::Acquire);
        let other_flush_time = other.flush_time.load(Ordering::Acquire);
        flush_time.partial_cmp(&other_flush_time)
    }
}

impl SeriesBuffer {
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    /// Checks whether the buffer has enough capacity to accept `new_points_count` new points.
    ///
    /// Note that the buffer can grow beyond `max_capacity` if a single batch larger than
    /// `max_capacity` is inserted while the buffer is empty; this avoids having to split
    /// one batch of points across multiple requests.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        let result_count = points
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        // Guard against a race: the buffer may have been emptied between the
        // caller's capacity check and acquiring this lock. Waiting on the
        // condvar then would block forever (and stall any new points), since
        // no further flush notification would arrive for an empty buffer.
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}

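/// Long-running loop that drains `points_buffer` into the request channel.
///
/// The thread parks for up to `max_request_delay` between iterations;
/// producers unpark it early when the buffer fills up. It exits once the
/// buffer is empty and the stream's `running` flag has been cleared.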
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        match request_chan.send((write_request, point_count)) {
            Ok(()) => debug!("queued request for processing"),
            Err(_) => error!("failed to send request to dispatcher"),
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}

impl Drop for NominalDatasetStream {
    fn drop(&mut self) {
        debug!("starting drop for NominalDatasetStream");
        self.running.store(false, Ordering::Release);
        loop {
            let count = self.unflushed_points.load(Ordering::Acquire);
            if count == 0 {
                break;
            }
            debug!(
                "waiting for all points to be flushed before dropping stream, {count} points remaining",
            );
            // todo: reduce this + give up after some maximum timeout is reached
            thread::sleep(Duration::from_millis(50));
        }
    }
}

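/// Worker loop that receives batched write requests and hands them to the
/// consumer, decrementing `unflushed_points` so that `Drop` on the stream can
/// observe flush progress and return.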
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received write request from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("failed to send request: {e:?}");
                    }
                }
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    // notify the processor thread that all points have been flushed
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {} ms",
        total_request_time
    );
}

fn points_len(points_type: &PointsType) -> usize {
    match points_type {
        PointsType::DoublePoints(points) => points.points.len(),
        PointsType::StringPoints(points) => points.points.len(),
        PointsType::IntegerPoints(points) => points.points.len(),
        PointsType::Uint64Points(points) => points.points.len(),
        PointsType::ArrayPoints(points) => match &points.array_type {
            Some(ArrayType::DoubleArrayPoints(points)) => points.points.len(),
            Some(ArrayType::StringArrayPoints(points)) => points.points.len(),
            None => 0,
        },
        PointsType::StructPoints(points) => points.points.len(),
    }
}