1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
17use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
18use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
19use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
20use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
21use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
22use nominal_api::tonic::io::nominal::scout::api::proto::Points;
23use nominal_api::tonic::io::nominal::scout::api::proto::Series;
24use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
25use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
26use parking_lot::Condvar;
27use parking_lot::Mutex;
28use parking_lot::MutexGuard;
29use tracing::debug;
30use tracing::error;
31use tracing::info;
32
33use crate::client::NominalApiClients;
34use crate::client::PRODUCTION_API_URL;
35use crate::consumer::AvroFileConsumer;
36use crate::consumer::DualWriteRequestConsumer;
37use crate::consumer::ListeningWriteRequestConsumer;
38use crate::consumer::NominalCoreConsumer;
39use crate::consumer::RequestConsumerWithFallback;
40use crate::consumer::WriteRequestConsumer;
41use crate::listener::LoggingListener;
42use crate::types::ChannelDescriptor;
43use crate::types::IntoPoints;
44use crate::types::IntoTimestamp;
45
/// Tuning options for a [`NominalDatasetStream`].
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    /// Capacity handed to each internal series buffer; channel writers also flush once
    /// they hold at least this many unflushed points.
    pub max_points_per_record: usize,
    /// How long a batch processor thread parks between flush attempts; channel writers
    /// also flush when more than this much time has passed since their last flush.
    pub max_request_delay: Duration,
    /// Capacity of the bounded channel between the batch processors and the request
    /// dispatchers.
    pub max_buffered_requests: usize,
    /// Number of request dispatcher threads spawned by the stream.
    pub request_dispatcher_tasks: usize,
    /// URI passed to `NominalApiClients::from_uri` when streaming to core.
    pub base_api_url: String,
}
54
55impl Default for NominalStreamOpts {
56 fn default() -> Self {
57 Self {
58 max_points_per_record: 250_000,
59 max_request_delay: Duration::from_millis(100),
60 max_buffered_requests: 4,
61 request_dispatcher_tasks: 8,
62 base_api_url: PRODUCTION_API_URL.to_string(),
63 }
64 }
65}
66
/// Builder that collects destinations, listeners and options for a
/// [`NominalDatasetStream`].
#[derive(Default)]
pub struct NominalDatasetStreamBuilder {
    /// Credentials, target dataset and Tokio runtime handle used to build the core consumer.
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    /// Path of the Avro file used as a direct streaming destination.
    stream_to_file: Option<PathBuf>,
    /// Path of the Avro file wrapped as the fallback consumer.
    file_fallback: Option<PathBuf>,
    /// Listeners attached to the consumer when the stream is built.
    listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    /// Stream tuning options forwarded to the built stream.
    opts: NominalStreamOpts,
}
75
76impl Debug for NominalDatasetStreamBuilder {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("NominalDatasetStreamBuilder")
79 .field("stream_to_core", &self.stream_to_core.is_some())
80 .field("stream_to_file", &self.stream_to_file)
81 .field("file_fallback", &self.file_fallback)
82 .field("listeners", &self.listeners.len())
83 .finish()
84 }
85}
86
87impl NominalDatasetStreamBuilder {
88 pub fn new() -> Self {
89 Self::default()
90 }
91}
92
93impl NominalDatasetStreamBuilder {
94 pub fn stream_to_core(
95 self,
96 bearer_token: BearerToken,
97 dataset: ResourceIdentifier,
98 handle: tokio::runtime::Handle,
99 ) -> NominalDatasetStreamBuilder {
100 NominalDatasetStreamBuilder {
101 stream_to_core: Some((bearer_token, dataset, handle)),
102 stream_to_file: self.stream_to_file,
103 file_fallback: self.file_fallback,
104 listeners: self.listeners,
105 opts: self.opts,
106 }
107 }
108
109 pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
110 self.stream_to_file = Some(file_path.into());
111 self
112 }
113
114 pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
115 self.file_fallback = Some(file_path.into());
116 self
117 }
118
119 pub fn add_listener(
120 mut self,
121 listener: Arc<dyn crate::listener::NominalStreamListener>,
122 ) -> Self {
123 self.listeners.push(listener);
124 self
125 }
126
127 pub fn with_listeners(
128 mut self,
129 listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
130 ) -> Self {
131 self.listeners = listeners;
132 self
133 }
134
135 pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
136 self.opts = opts;
137 self
138 }
139
140 #[cfg(feature = "logging")]
141 fn init_logging(self, directive: Option<&str>) -> Self {
142 use tracing_subscriber::layer::SubscriberExt;
143 use tracing_subscriber::util::SubscriberInitExt;
144
145 let base = tracing_subscriber::EnvFilter::builder()
147 .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
148 let env_filter = match directive {
149 Some(d) => base.parse_lossy(d),
150 None => base.from_env_lossy(),
151 };
152
153 let subscriber = tracing_subscriber::registry()
154 .with(
155 tracing_subscriber::fmt::layer()
156 .with_thread_ids(true)
157 .with_thread_names(true)
158 .with_line_number(true),
159 )
160 .with(env_filter);
161
162 if let Err(error) = subscriber.try_init() {
163 eprintln!("nominal streaming failed to enable logging: {error}");
164 }
165
166 self
167 }
168
169 #[cfg(feature = "logging")]
170 pub fn enable_logging(self) -> Self {
171 self.init_logging(None)
172 }
173
174 #[cfg(feature = "logging")]
175 pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
176 self.init_logging(Some(log_directive))
177 }
178
179 pub fn build(self) -> NominalDatasetStream {
180 let core_consumer = self.core_consumer();
181 let file_consumer = self.file_consumer();
182 let fallback_consumer = self.fallback_consumer();
183
184 match (core_consumer, file_consumer, fallback_consumer) {
185 (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
186 (Some(_), Some(_), Some(_)) => {
187 panic!("must choose one of stream_to_file and file_fallback when streaming to core")
188 }
189 (Some(core), None, None) => self.into_stream(core),
190 (Some(core), None, Some(fallback)) => {
191 self.into_stream(RequestConsumerWithFallback::new(core, fallback))
192 }
193 (None, Some(file), None) => self.into_stream(file),
194 (None, Some(file), Some(fallback)) => {
195 self.into_stream(RequestConsumerWithFallback::new(file, fallback))
197 }
198 (Some(core), Some(file), None) => {
199 self.into_stream(DualWriteRequestConsumer::new(core, file))
200 }
201 }
202 }
203
204 fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
205 self.stream_to_core
206 .as_ref()
207 .map(|(auth_provider, dataset, handle)| {
208 NominalCoreConsumer::new(
209 NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
210 handle.clone(),
211 auth_provider.clone(),
212 dataset.clone(),
213 )
214 })
215 }
216
217 fn file_consumer(&self) -> Option<AvroFileConsumer> {
218 self.stream_to_file
219 .as_ref()
220 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
221 }
222
223 fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
224 self.file_fallback
225 .as_ref()
226 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
227 }
228
229 fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
230 let mut listeners = self.listeners;
231 listeners.push(Arc::new(LoggingListener));
232 let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
233 NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
234 }
235}
236
/// Deprecated alias for [`NominalDatasetStream`].
#[deprecated]
pub type NominalDatasourceStream = NominalDatasetStream;
240
/// Handle to a running stream: two point buffers, the threads that batch them into
/// write requests, and the shared counters used to coordinate shutdown.
pub struct NominalDatasetStream {
    /// Options the stream was created with; channel writers read their flush thresholds
    /// from here.
    opts: NominalStreamOpts,
    /// Cleared when the stream is dropped; polled by the batch processor and dispatcher
    /// threads to decide when to exit.
    running: Arc<AtomicBool>,
    /// Points accepted by the stream that a dispatcher has not yet handed to the consumer.
    unflushed_points: Arc<AtomicUsize>,
    /// Buffer tried first when new points are enqueued.
    primary_buffer: Arc<SeriesBuffer>,
    /// Buffer used when the primary buffer lacks capacity.
    secondary_buffer: Arc<SeriesBuffer>,
    /// Batch processor thread draining `primary_buffer`; kept so it can be unparked.
    primary_handle: thread::JoinHandle<()>,
    /// Batch processor thread draining `secondary_buffer`; kept so it can be unparked.
    secondary_handle: thread::JoinHandle<()>,
}
250
251impl NominalDatasetStream {
252 pub fn builder() -> NominalDatasetStreamBuilder {
253 NominalDatasetStreamBuilder::new()
254 }
255
256 pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
257 consumer: C,
258 opts: NominalStreamOpts,
259 ) -> Self {
260 let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
261 let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
262
263 let (request_tx, request_rx) =
264 crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);
265
266 let running = Arc::new(AtomicBool::new(true));
267 let unflushed_points = Arc::new(AtomicUsize::new(0));
268
269 let primary_handle = thread::Builder::new()
270 .name("nmstream_primary".to_string())
271 .spawn({
272 let points_buffer = Arc::clone(&primary_buffer);
273 let running = running.clone();
274 let tx = request_tx.clone();
275 move || {
276 batch_processor(running, points_buffer, tx, opts.max_request_delay);
277 }
278 })
279 .unwrap();
280
281 let secondary_handle = thread::Builder::new()
282 .name("nmstream_secondary".to_string())
283 .spawn({
284 let secondary_buffer = Arc::clone(&secondary_buffer);
285 let running = running.clone();
286 move || {
287 batch_processor(
288 running,
289 secondary_buffer,
290 request_tx,
291 opts.max_request_delay,
292 );
293 }
294 })
295 .unwrap();
296
297 let consumer = Arc::new(consumer);
298
299 for i in 0..opts.request_dispatcher_tasks {
300 thread::Builder::new()
301 .name(format!("nmstream_dispatch_{i}"))
302 .spawn({
303 let running = Arc::clone(&running);
304 let unflushed_points = Arc::clone(&unflushed_points);
305 let rx = request_rx.clone();
306 let consumer = consumer.clone();
307 move || {
308 debug!("starting request dispatcher");
309 request_dispatcher(running, unflushed_points, rx, consumer);
310 }
311 })
312 .unwrap();
313 }
314
315 NominalDatasetStream {
316 opts,
317 running,
318 unflushed_points,
319 primary_buffer,
320 secondary_buffer,
321 primary_handle,
322 secondary_handle,
323 }
324 }
325
326 pub fn double_writer<'a>(
327 &'a self,
328 channel_descriptor: &'a ChannelDescriptor,
329 ) -> NominalDoubleWriter<'a> {
330 NominalDoubleWriter {
331 writer: NominalChannelWriter::new(self, channel_descriptor),
332 }
333 }
334
335 pub fn string_writer<'a>(
336 &'a self,
337 channel_descriptor: &'a ChannelDescriptor,
338 ) -> NominalStringWriter<'a> {
339 NominalStringWriter {
340 writer: NominalChannelWriter::new(self, channel_descriptor),
341 }
342 }
343
344 pub fn integer_writer<'a>(
345 &'a self,
346 channel_descriptor: &'a ChannelDescriptor,
347 ) -> NominalIntegerWriter<'a> {
348 NominalIntegerWriter {
349 writer: NominalChannelWriter::new(self, channel_descriptor),
350 }
351 }
352
353 pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
354 let new_points = new_points.into_points();
355 let new_count = points_len(&new_points);
356
357 self.when_capacity(new_count, |mut sb| {
358 sb.extend(channel_descriptor, new_points)
359 });
360 }
361
362 fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
363 self.unflushed_points
364 .fetch_add(new_count, Ordering::Release);
365
366 if self.primary_buffer.has_capacity(new_count) {
367 debug!("adding {} points to primary buffer", new_count);
368 callback(self.primary_buffer.lock());
369 } else if self.secondary_buffer.has_capacity(new_count) {
370 self.primary_handle.thread().unpark();
372 debug!("adding {} points to secondary buffer", new_count);
373 callback(self.secondary_buffer.lock());
374 } else {
375 let buf = if self.primary_buffer < self.secondary_buffer {
376 info!("waiting for primary buffer to flush to append {new_count} points...");
377 self.primary_handle.thread().unpark();
378 &self.primary_buffer
379 } else {
380 info!("waiting for secondary buffer to flush to append {new_count} points...");
381 self.secondary_handle.thread().unpark();
382 &self.secondary_buffer
383 };
384
385 buf.on_notify(callback);
386 }
387 }
388}
389
/// Per-channel writer that accumulates points locally and hands them to the stream in
/// batches.
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    /// Channel every point pushed through this writer is attributed to.
    channel: &'ds ChannelDescriptor,
    /// Stream that receives the batches and supplies the flush thresholds.
    stream: &'ds NominalDatasetStream,
    /// When this writer last handed points to the stream (initially its creation time).
    last_flushed_at: Instant,
    /// Points pushed since the last flush.
    unflushed: Vec<T>,
}
399
400impl<T> NominalChannelWriter<'_, T>
401where
402 Vec<T>: IntoPoints,
403{
404 fn new<'ds>(
405 stream: &'ds NominalDatasetStream,
406 channel: &'ds ChannelDescriptor,
407 ) -> NominalChannelWriter<'ds, T> {
408 NominalChannelWriter {
409 channel,
410 stream,
411 last_flushed_at: Instant::now(),
412 unflushed: vec![],
413 }
414 }
415
416 fn push_point(&mut self, point: T) {
417 self.unflushed.push(point);
418 if self.unflushed.len() >= self.stream.opts.max_points_per_record
419 || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
420 {
421 debug!(
422 "conditionally flushing {:?}, ({} points, {:?} since last)",
423 self.channel,
424 self.unflushed.len(),
425 self.last_flushed_at.elapsed()
426 );
427 self.flush();
428 }
429 }
430
431 fn flush(&mut self) {
432 if self.unflushed.is_empty() {
433 return;
434 }
435 info!(
436 "flushing writer for {:?} with {} points",
437 self.channel,
438 self.unflushed.len()
439 );
440 self.stream.when_capacity(self.unflushed.len(), |mut buf| {
441 let to_flush: Vec<T> = self.unflushed.drain(..).collect();
442 buf.extend(self.channel, to_flush);
443 self.last_flushed_at = Instant::now();
444 })
445 }
446}
447
448impl<T> Drop for NominalChannelWriter<'_, T>
449where
450 Vec<T>: IntoPoints,
451{
452 fn drop(&mut self) {
453 info!("flushing then dropping writer for: {:?}", self.channel);
454 self.flush();
455 }
456}
457
/// Channel writer for `f64` samples.
pub struct NominalDoubleWriter<'ds> {
    /// Underlying batching writer holding `DoublePoint`s.
    writer: NominalChannelWriter<'ds, DoublePoint>,
}
461
462impl NominalDoubleWriter<'_> {
463 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
464 self.writer.push_point(DoublePoint {
465 timestamp: Some(timestamp.into_timestamp()),
466 value,
467 });
468 }
469}
470
/// Channel writer for `i64` samples.
pub struct NominalIntegerWriter<'ds> {
    /// Underlying batching writer holding `IntegerPoint`s.
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}
474
475impl NominalIntegerWriter<'_> {
476 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
477 self.writer.push_point(IntegerPoint {
478 timestamp: Some(timestamp.into_timestamp()),
479 value,
480 });
481 }
482}
483
/// Channel writer for string samples.
pub struct NominalStringWriter<'ds> {
    /// Underlying batching writer holding `StringPoint`s.
    writer: NominalChannelWriter<'ds, StringPoint>,
}
487
488impl NominalStringWriter<'_> {
489 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
490 self.writer.push_point(StringPoint {
491 timestamp: Some(timestamp.into_timestamp()),
492 value: value.into(),
493 });
494 }
495}
496
/// Buffer of pending points keyed by channel, with the bookkeeping needed to decide
/// when it is full and when it was last drained.
struct SeriesBuffer {
    /// Pending points per channel, guarded by the buffer lock.
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    /// Number of points currently buffered; readable without taking the lock.
    count: AtomicUsize,
    /// Nanoseconds since the Unix epoch at the last `take`, or 0 if never taken.
    flush_time: AtomicU64,
    /// Signalled after the buffer has been drained so blocked producers can continue.
    condvar: Condvar,
    /// Point count above which `has_capacity` rejects additions to a non-empty buffer.
    max_capacity: usize,
}
504
/// Locked view of a [`SeriesBuffer`] through which points can be appended.
struct SeriesBufferGuard<'sb> {
    /// Held lock on the buffer's channel-to-points map.
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    /// The owning buffer's point counter, bumped after each append.
    count: &'sb AtomicUsize,
}
509
510impl SeriesBufferGuard<'_> {
511 fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
512 let points = points.into_points();
513 let new_point_count = points_len(&points);
514
515 if !self.sb.contains_key(channel_descriptor) {
516 self.sb.insert(channel_descriptor.clone(), points);
517 } else {
518 match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
519 (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
520 existing.points.extend(new.points)
521 }
522 (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
523 existing.points.extend(new.points)
524 }
525 (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
526 existing.points.extend(new.points)
527 }
528 (
529 PointsType::ArrayPoints(ArrayPoints {
530 array_type: Some(ArrayType::DoubleArrayPoints(existing)),
531 }),
532 PointsType::ArrayPoints(ArrayPoints {
533 array_type: Some(ArrayType::DoubleArrayPoints(new)),
534 }),
535 ) => existing.points.extend(new.points),
536 (PointsType::Uint64Points(existing), PointsType::Uint64Points(new)) => {
537 existing.points.extend(new.points)
538 }
539 (
540 PointsType::ArrayPoints(ArrayPoints {
541 array_type: Some(ArrayType::StringArrayPoints(existing)),
542 }),
543 PointsType::ArrayPoints(ArrayPoints {
544 array_type: Some(ArrayType::StringArrayPoints(new)),
545 }),
546 ) => existing.points.extend(new.points),
547 (
548 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
549 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
550 ) => {}
551 (PointsType::StructPoints(existing), PointsType::StructPoints(new)) => {
552 existing.points.extend(new.points);
553 }
554 (
556 PointsType::DoublePoints(_),
557 PointsType::IntegerPoints(_)
558 | PointsType::Uint64Points(_)
559 | PointsType::StringPoints(_)
560 | PointsType::ArrayPoints(_)
561 | PointsType::StructPoints(_),
562 )
563 | (
564 PointsType::StringPoints(_),
565 PointsType::DoublePoints(_)
566 | PointsType::IntegerPoints(_)
567 | PointsType::Uint64Points(_)
568 | PointsType::ArrayPoints(_)
569 | PointsType::StructPoints(_),
570 )
571 | (
572 PointsType::IntegerPoints(_),
573 PointsType::DoublePoints(_)
574 | PointsType::Uint64Points(_)
575 | PointsType::StringPoints(_)
576 | PointsType::ArrayPoints(_)
577 | PointsType::StructPoints(_),
578 )
579 | (
580 PointsType::ArrayPoints(_),
581 PointsType::DoublePoints(_)
582 | PointsType::Uint64Points(_)
583 | PointsType::StringPoints(_)
584 | PointsType::IntegerPoints(_)
585 | PointsType::StructPoints(_),
586 )
587 | (
588 PointsType::ArrayPoints(ArrayPoints {
589 array_type: Some(_),
590 }),
591 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
592 )
593 | (
594 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
595 PointsType::ArrayPoints(ArrayPoints {
596 array_type: Some(_),
597 }),
598 )
599 | (
600 PointsType::ArrayPoints(ArrayPoints {
601 array_type: Some(ArrayType::DoubleArrayPoints(_)),
602 }),
603 PointsType::ArrayPoints(ArrayPoints {
604 array_type: Some(ArrayType::StringArrayPoints(_)),
605 }),
606 )
607 | (
608 PointsType::ArrayPoints(ArrayPoints {
609 array_type: Some(ArrayType::StringArrayPoints(_)),
610 }),
611 PointsType::ArrayPoints(ArrayPoints {
612 array_type: Some(ArrayType::DoubleArrayPoints(_)),
613 }),
614 )
615 | (
616 PointsType::Uint64Points(_),
617 PointsType::IntegerPoints(_)
618 | PointsType::StringPoints(_)
619 | PointsType::DoublePoints(_)
620 | PointsType::ArrayPoints(_)
621 | PointsType::StructPoints(_),
622 )
623 | (
624 PointsType::StructPoints(_),
625 PointsType::DoublePoints(_)
626 | PointsType::Uint64Points(_)
627 | PointsType::StringPoints(_)
628 | PointsType::IntegerPoints(_)
629 | PointsType::ArrayPoints(_),
630 ) => {
631 panic!("mismatched types");
633 }
634 }
635 }
636
637 self.count.fetch_add(new_point_count, Ordering::Release);
638 }
639}
640
641impl PartialEq for SeriesBuffer {
642 fn eq(&self, other: &Self) -> bool {
643 self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
644 }
645}
646
647impl PartialOrd for SeriesBuffer {
648 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
649 let flush_time = self.flush_time.load(Ordering::Acquire);
650 let other_flush_time = other.flush_time.load(Ordering::Acquire);
651 flush_time.partial_cmp(&other_flush_time)
652 }
653}
654
655impl SeriesBuffer {
656 fn new(capacity: usize) -> Self {
657 Self {
658 points: Mutex::new(HashMap::new()),
659 count: AtomicUsize::new(0),
660 flush_time: AtomicU64::new(0),
661 condvar: Condvar::new(),
662 max_capacity: capacity,
663 }
664 }
665
666 fn has_capacity(&self, new_points_count: usize) -> bool {
671 let count = self.count.load(Ordering::Acquire);
672 count == 0 || count + new_points_count <= self.max_capacity
673 }
674
675 fn lock(&self) -> SeriesBufferGuard<'_> {
676 SeriesBufferGuard {
677 sb: self.points.lock(),
678 count: &self.count,
679 }
680 }
681
682 fn take(&self) -> (usize, Vec<Series>) {
683 let mut points = self.lock();
684 self.flush_time.store(
685 UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
686 Ordering::Release,
687 );
688 let result = points
689 .sb
690 .drain()
691 .map(|(ChannelDescriptor { name, tags }, points)| {
692 let channel = Channel { name };
693 let points_obj = Points {
694 points_type: Some(points),
695 };
696 Series {
697 channel: Some(channel),
698 tags: tags
699 .map(|tags| tags.into_iter().collect())
700 .unwrap_or_default(),
701 points: Some(points_obj),
702 }
703 })
704 .collect();
705 let result_count = self
706 .count
707 .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
708 .unwrap();
709 (result_count, result)
710 }
711
712 fn is_empty(&self) -> bool {
713 self.count() == 0
714 }
715
716 fn count(&self) -> usize {
717 self.count.load(Ordering::Acquire)
718 }
719
720 fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
721 let mut points_lock = self.points.lock();
722 if !points_lock.is_empty() {
725 self.condvar.wait(&mut points_lock);
726 } else {
727 debug!("buffer emptied since last check, skipping condvar wait");
728 }
729 on_notify(SeriesBufferGuard {
730 sb: points_lock,
731 count: &self.count,
732 });
733 }
734
735 fn notify(&self) -> bool {
736 self.condvar.notify_one()
737 }
738}
739
740fn batch_processor(
741 running: Arc<AtomicBool>,
742 points_buffer: Arc<SeriesBuffer>,
743 request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
744 max_request_delay: Duration,
745) {
746 loop {
747 debug!("starting processor loop");
748 if points_buffer.is_empty() {
749 if !running.load(Ordering::Acquire) {
750 debug!("batch processor thread exiting due to running flag");
751 drop(request_chan);
752 break;
753 } else {
754 debug!("empty points buffer, waiting");
755 thread::park_timeout(max_request_delay);
756 }
757 continue;
758 }
759 let (point_count, series) = points_buffer.take();
760
761 if points_buffer.notify() {
762 debug!("notified one waiting thread after clearing points buffer");
763 }
764
765 let write_request = WriteRequestNominal { series };
766
767 if request_chan.is_full() {
768 debug!("ready to queue request but request channel is full");
769 }
770 let rep = request_chan.send((write_request, point_count));
771 debug!("queued request for processing");
772 if rep.is_err() {
773 error!("failed to send request to dispatcher");
774 } else {
775 debug!("finished submitting request");
776 }
777
778 thread::park_timeout(max_request_delay);
779 }
780 debug!("batch processor thread exiting");
781}
782
783impl Drop for NominalDatasetStream {
784 fn drop(&mut self) {
785 debug!("starting drop for NominalDatasetStream");
786 self.running.store(false, Ordering::Release);
787 loop {
788 let count = self.unflushed_points.load(Ordering::Acquire);
789 if count == 0 {
790 break;
791 }
792 debug!(
793 "waiting for all points to be flushed before dropping stream, {count} points remaining",
794 );
795 thread::sleep(Duration::from_millis(50));
797 }
798 }
799}
800
801fn request_dispatcher<C: WriteRequestConsumer + 'static>(
802 running: Arc<AtomicBool>,
803 unflushed_points: Arc<AtomicUsize>,
804 request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
805 consumer: Arc<C>,
806) {
807 let mut total_request_time = 0;
808 loop {
809 match request_rx.recv() {
810 Ok((request, point_count)) => {
811 debug!("received writerequest from channel");
812 let req_start = Instant::now();
813 match consumer.consume(&request) {
814 Ok(_) => {
815 let time = req_start.elapsed().as_millis();
816 debug!("request of {} points sent in {} ms", point_count, time);
817 total_request_time += time as u64;
818 }
819 Err(e) => {
820 error!("Failed to send request: {e:?}");
821 }
822 }
823 unflushed_points.fetch_sub(point_count, Ordering::Release);
824
825 if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
826 {
827 info!("all points flushed, closing dispatcher thread");
828 drop(request_rx);
830 break;
831 }
832 }
833 Err(e) => {
834 debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
835 break;
836 }
837 }
838 }
839 debug!(
840 "request dispatcher thread exiting. total request time: {}",
841 total_request_time
842 );
843}
844
845fn points_len(points_type: &PointsType) -> usize {
846 match points_type {
847 PointsType::DoublePoints(points) => points.points.len(),
848 PointsType::StringPoints(points) => points.points.len(),
849 PointsType::IntegerPoints(points) => points.points.len(),
850 PointsType::Uint64Points(points) => points.points.len(),
851 PointsType::ArrayPoints(points) => match &points.array_type {
852 Some(ArrayType::DoubleArrayPoints(points)) => points.points.len(),
853 Some(ArrayType::StringArrayPoints(points)) => points.points.len(),
854 None => 0,
855 },
856 PointsType::StructPoints(points) => points.points.len(),
857 }
858}