1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
17use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
18use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
19use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
20use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
21use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
22use nominal_api::tonic::io::nominal::scout::api::proto::Points;
23use nominal_api::tonic::io::nominal::scout::api::proto::Series;
24use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
25use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
26use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
27use parking_lot::Condvar;
28use parking_lot::Mutex;
29use parking_lot::MutexGuard;
30use tracing::debug;
31use tracing::error;
32use tracing::info;
33
34use crate::client::NominalApiClients;
35use crate::client::PRODUCTION_API_URL;
36use crate::consumer::AvroFileConsumer;
37use crate::consumer::DualWriteRequestConsumer;
38use crate::consumer::ListeningWriteRequestConsumer;
39use crate::consumer::NominalCoreConsumer;
40use crate::consumer::RequestConsumerWithFallback;
41use crate::consumer::WriteRequestConsumer;
42use crate::listener::LoggingListener;
43use crate::types::ChannelDescriptor;
44use crate::types::IntoPoints;
45use crate::types::IntoTimestamp;
46
47#[derive(Debug, Clone)]
48pub struct NominalStreamOpts {
49 pub max_points_per_record: usize,
50 pub max_request_delay: Duration,
51 pub max_buffered_requests: usize,
52 pub request_dispatcher_tasks: usize,
53 pub base_api_url: String,
54}
55
56impl Default for NominalStreamOpts {
57 fn default() -> Self {
58 Self {
59 max_points_per_record: 250_000,
60 max_request_delay: Duration::from_millis(100),
61 max_buffered_requests: 4,
62 request_dispatcher_tasks: 8,
63 base_api_url: PRODUCTION_API_URL.to_string(),
64 }
65 }
66}
67
68#[derive(Default)]
69pub struct NominalDatasetStreamBuilder {
70 stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
71 stream_to_file: Option<PathBuf>,
72 file_fallback: Option<PathBuf>,
73 listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
74 opts: NominalStreamOpts,
75}
76
77impl Debug for NominalDatasetStreamBuilder {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct("NominalDatasetStreamBuilder")
80 .field("stream_to_core", &self.stream_to_core.is_some())
81 .field("stream_to_file", &self.stream_to_file)
82 .field("file_fallback", &self.file_fallback)
83 .field("listeners", &self.listeners.len())
84 .finish()
85 }
86}
87
88impl NominalDatasetStreamBuilder {
89 pub fn new() -> Self {
90 Self::default()
91 }
92}
93
94impl NominalDatasetStreamBuilder {
95 pub fn stream_to_core(
96 self,
97 bearer_token: BearerToken,
98 dataset: ResourceIdentifier,
99 handle: tokio::runtime::Handle,
100 ) -> NominalDatasetStreamBuilder {
101 NominalDatasetStreamBuilder {
102 stream_to_core: Some((bearer_token, dataset, handle)),
103 stream_to_file: self.stream_to_file,
104 file_fallback: self.file_fallback,
105 listeners: self.listeners,
106 opts: self.opts,
107 }
108 }
109
110 pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
111 self.stream_to_file = Some(file_path.into());
112 self
113 }
114
115 pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
116 self.file_fallback = Some(file_path.into());
117 self
118 }
119
120 pub fn add_listener(
121 mut self,
122 listener: Arc<dyn crate::listener::NominalStreamListener>,
123 ) -> Self {
124 self.listeners.push(listener);
125 self
126 }
127
128 pub fn with_listeners(
129 mut self,
130 listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
131 ) -> Self {
132 self.listeners = listeners;
133 self
134 }
135
136 pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
137 self.opts = opts;
138 self
139 }
140
141 #[cfg(feature = "logging")]
142 fn init_logging(self, directive: Option<&str>) -> Self {
143 use tracing_subscriber::layer::SubscriberExt;
144 use tracing_subscriber::util::SubscriberInitExt;
145
146 let base = tracing_subscriber::EnvFilter::builder()
148 .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
149 let env_filter = match directive {
150 Some(d) => base.parse_lossy(d),
151 None => base.from_env_lossy(),
152 };
153
154 let subscriber = tracing_subscriber::registry()
155 .with(
156 tracing_subscriber::fmt::layer()
157 .with_thread_ids(true)
158 .with_thread_names(true)
159 .with_line_number(true),
160 )
161 .with(env_filter);
162
163 if let Err(error) = subscriber.try_init() {
164 eprintln!("nominal streaming failed to enable logging: {error}");
165 }
166
167 self
168 }
169
170 #[cfg(feature = "logging")]
171 pub fn enable_logging(self) -> Self {
172 self.init_logging(None)
173 }
174
175 #[cfg(feature = "logging")]
176 pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
177 self.init_logging(Some(log_directive))
178 }
179
180 pub fn build(self) -> NominalDatasetStream {
181 let core_consumer = self.core_consumer();
182 let file_consumer = self.file_consumer();
183 let fallback_consumer = self.fallback_consumer();
184
185 match (core_consumer, file_consumer, fallback_consumer) {
186 (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
187 (Some(_), Some(_), Some(_)) => {
188 panic!("must choose one of stream_to_file and file_fallback when streaming to core")
189 }
190 (Some(core), None, None) => self.into_stream(core),
191 (Some(core), None, Some(fallback)) => {
192 self.into_stream(RequestConsumerWithFallback::new(core, fallback))
193 }
194 (None, Some(file), None) => self.into_stream(file),
195 (None, Some(file), Some(fallback)) => {
196 self.into_stream(RequestConsumerWithFallback::new(file, fallback))
198 }
199 (Some(core), Some(file), None) => {
200 self.into_stream(DualWriteRequestConsumer::new(core, file))
201 }
202 }
203 }
204
205 fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
206 self.stream_to_core
207 .as_ref()
208 .map(|(auth_provider, dataset, handle)| {
209 NominalCoreConsumer::new(
210 NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
211 handle.clone(),
212 auth_provider.clone(),
213 dataset.clone(),
214 )
215 })
216 }
217
218 fn file_consumer(&self) -> Option<AvroFileConsumer> {
219 self.stream_to_file
220 .as_ref()
221 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
222 }
223
224 fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
225 self.file_fallback
226 .as_ref()
227 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
228 }
229
230 fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
231 let mut listeners = self.listeners;
232 listeners.push(Arc::new(LoggingListener));
233 let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
234 NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
235 }
236}
237
238#[deprecated]
240pub type NominalDatasourceStream = NominalDatasetStream;
241
242pub struct NominalDatasetStream {
243 opts: NominalStreamOpts,
244 running: Arc<AtomicBool>,
245 unflushed_points: Arc<AtomicUsize>,
246 primary_buffer: Arc<SeriesBuffer>,
247 secondary_buffer: Arc<SeriesBuffer>,
248 primary_handle: thread::JoinHandle<()>,
249 secondary_handle: thread::JoinHandle<()>,
250}
251
252impl NominalDatasetStream {
253 pub fn builder() -> NominalDatasetStreamBuilder {
254 NominalDatasetStreamBuilder::new()
255 }
256
257 pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
258 consumer: C,
259 opts: NominalStreamOpts,
260 ) -> Self {
261 let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
262 let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
263
264 let (request_tx, request_rx) =
265 crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);
266
267 let running = Arc::new(AtomicBool::new(true));
268 let unflushed_points = Arc::new(AtomicUsize::new(0));
269
270 let primary_handle = thread::Builder::new()
271 .name("nmstream_primary".to_string())
272 .spawn({
273 let points_buffer = Arc::clone(&primary_buffer);
274 let running = running.clone();
275 let tx = request_tx.clone();
276 move || {
277 batch_processor(running, points_buffer, tx, opts.max_request_delay);
278 }
279 })
280 .unwrap();
281
282 let secondary_handle = thread::Builder::new()
283 .name("nmstream_secondary".to_string())
284 .spawn({
285 let secondary_buffer = Arc::clone(&secondary_buffer);
286 let running = running.clone();
287 move || {
288 batch_processor(
289 running,
290 secondary_buffer,
291 request_tx,
292 opts.max_request_delay,
293 );
294 }
295 })
296 .unwrap();
297
298 let consumer = Arc::new(consumer);
299
300 for i in 0..opts.request_dispatcher_tasks {
301 thread::Builder::new()
302 .name(format!("nmstream_dispatch_{i}"))
303 .spawn({
304 let running = Arc::clone(&running);
305 let unflushed_points = Arc::clone(&unflushed_points);
306 let rx = request_rx.clone();
307 let consumer = consumer.clone();
308 move || {
309 debug!("starting request dispatcher");
310 request_dispatcher(running, unflushed_points, rx, consumer);
311 }
312 })
313 .unwrap();
314 }
315
316 NominalDatasetStream {
317 opts,
318 running,
319 unflushed_points,
320 primary_buffer,
321 secondary_buffer,
322 primary_handle,
323 secondary_handle,
324 }
325 }
326
327 pub fn double_writer<'a>(
328 &'a self,
329 channel_descriptor: &'a ChannelDescriptor,
330 ) -> NominalDoubleWriter<'a> {
331 NominalDoubleWriter {
332 writer: NominalChannelWriter::new(self, channel_descriptor),
333 }
334 }
335
336 pub fn string_writer<'a>(
337 &'a self,
338 channel_descriptor: &'a ChannelDescriptor,
339 ) -> NominalStringWriter<'a> {
340 NominalStringWriter {
341 writer: NominalChannelWriter::new(self, channel_descriptor),
342 }
343 }
344
345 pub fn integer_writer<'a>(
346 &'a self,
347 channel_descriptor: &'a ChannelDescriptor,
348 ) -> NominalIntegerWriter<'a> {
349 NominalIntegerWriter {
350 writer: NominalChannelWriter::new(self, channel_descriptor),
351 }
352 }
353
354 pub fn uint64_writer<'a>(
355 &'a self,
356 channel_descriptor: &'a ChannelDescriptor,
357 ) -> NominalUint64Writer<'a> {
358 NominalUint64Writer {
359 writer: NominalChannelWriter::new(self, channel_descriptor),
360 }
361 }
362
363 pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
364 let new_points = new_points.into_points();
365 let new_count = points_len(&new_points);
366
367 self.when_capacity(new_count, |mut sb| {
368 sb.extend(channel_descriptor, new_points)
369 });
370 }
371
372 fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
373 self.unflushed_points
374 .fetch_add(new_count, Ordering::Release);
375
376 if self.primary_buffer.has_capacity(new_count) {
377 debug!("adding {} points to primary buffer", new_count);
378 callback(self.primary_buffer.lock());
379 } else if self.secondary_buffer.has_capacity(new_count) {
380 self.primary_handle.thread().unpark();
382 debug!("adding {} points to secondary buffer", new_count);
383 callback(self.secondary_buffer.lock());
384 } else {
385 let buf = if self.primary_buffer < self.secondary_buffer {
386 info!("waiting for primary buffer to flush to append {new_count} points...");
387 self.primary_handle.thread().unpark();
388 &self.primary_buffer
389 } else {
390 info!("waiting for secondary buffer to flush to append {new_count} points...");
391 self.secondary_handle.thread().unpark();
392 &self.secondary_buffer
393 };
394
395 buf.on_notify(callback);
396 }
397 }
398}
399
400pub struct NominalChannelWriter<'ds, T>
401where
402 Vec<T>: IntoPoints,
403{
404 channel: &'ds ChannelDescriptor,
405 stream: &'ds NominalDatasetStream,
406 last_flushed_at: Instant,
407 unflushed: Vec<T>,
408}
409
410impl<T> NominalChannelWriter<'_, T>
411where
412 Vec<T>: IntoPoints,
413{
414 fn new<'ds>(
415 stream: &'ds NominalDatasetStream,
416 channel: &'ds ChannelDescriptor,
417 ) -> NominalChannelWriter<'ds, T> {
418 NominalChannelWriter {
419 channel,
420 stream,
421 last_flushed_at: Instant::now(),
422 unflushed: vec![],
423 }
424 }
425
426 fn push_point(&mut self, point: T) {
427 self.unflushed.push(point);
428 if self.unflushed.len() >= self.stream.opts.max_points_per_record
429 || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
430 {
431 debug!(
432 "conditionally flushing {:?}, ({} points, {:?} since last)",
433 self.channel,
434 self.unflushed.len(),
435 self.last_flushed_at.elapsed()
436 );
437 self.flush();
438 }
439 }
440
441 fn flush(&mut self) {
442 if self.unflushed.is_empty() {
443 return;
444 }
445 info!(
446 "flushing writer for {:?} with {} points",
447 self.channel,
448 self.unflushed.len()
449 );
450 self.stream.when_capacity(self.unflushed.len(), |mut buf| {
451 let to_flush: Vec<T> = self.unflushed.drain(..).collect();
452 buf.extend(self.channel, to_flush);
453 self.last_flushed_at = Instant::now();
454 })
455 }
456}
457
458impl<T> Drop for NominalChannelWriter<'_, T>
459where
460 Vec<T>: IntoPoints,
461{
462 fn drop(&mut self) {
463 info!("flushing then dropping writer for: {:?}", self.channel);
464 self.flush();
465 }
466}
467
468pub struct NominalDoubleWriter<'ds> {
469 writer: NominalChannelWriter<'ds, DoublePoint>,
470}
471
472impl NominalDoubleWriter<'_> {
473 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
474 self.writer.push_point(DoublePoint {
475 timestamp: Some(timestamp.into_timestamp()),
476 value,
477 });
478 }
479}
480
481pub struct NominalIntegerWriter<'ds> {
482 writer: NominalChannelWriter<'ds, IntegerPoint>,
483}
484
485impl NominalIntegerWriter<'_> {
486 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
487 self.writer.push_point(IntegerPoint {
488 timestamp: Some(timestamp.into_timestamp()),
489 value,
490 });
491 }
492}
493
494pub struct NominalUint64Writer<'ds> {
495 writer: NominalChannelWriter<'ds, Uint64Point>,
496}
497
498impl NominalUint64Writer<'_> {
499 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: u64) {
500 self.writer.push_point(Uint64Point {
501 timestamp: Some(timestamp.into_timestamp()),
502 value,
503 });
504 }
505}
506
507pub struct NominalStringWriter<'ds> {
508 writer: NominalChannelWriter<'ds, StringPoint>,
509}
510
511impl NominalStringWriter<'_> {
512 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
513 self.writer.push_point(StringPoint {
514 timestamp: Some(timestamp.into_timestamp()),
515 value: value.into(),
516 });
517 }
518}
519
520struct SeriesBuffer {
521 points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
522 count: AtomicUsize,
527 flush_time: AtomicU64,
528 condvar: Condvar,
529 max_capacity: usize,
530}
531
532struct SeriesBufferGuard<'sb> {
533 sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
534 count: &'sb AtomicUsize,
535}
536
537impl SeriesBufferGuard<'_> {
538 fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
539 let points = points.into_points();
540 let new_point_count = points_len(&points);
541
542 if !self.sb.contains_key(channel_descriptor) {
543 self.sb.insert(channel_descriptor.clone(), points);
544 } else {
545 match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
546 (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
547 existing.points.extend(new.points)
548 }
549 (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
550 existing.points.extend(new.points)
551 }
552 (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
553 existing.points.extend(new.points)
554 }
555 (
556 PointsType::ArrayPoints(ArrayPoints {
557 array_type: Some(ArrayType::DoubleArrayPoints(existing)),
558 }),
559 PointsType::ArrayPoints(ArrayPoints {
560 array_type: Some(ArrayType::DoubleArrayPoints(new)),
561 }),
562 ) => existing.points.extend(new.points),
563 (PointsType::Uint64Points(existing), PointsType::Uint64Points(new)) => {
564 existing.points.extend(new.points)
565 }
566 (
567 PointsType::ArrayPoints(ArrayPoints {
568 array_type: Some(ArrayType::StringArrayPoints(existing)),
569 }),
570 PointsType::ArrayPoints(ArrayPoints {
571 array_type: Some(ArrayType::StringArrayPoints(new)),
572 }),
573 ) => existing.points.extend(new.points),
574 (
575 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
576 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
577 ) => {}
578 (PointsType::StructPoints(existing), PointsType::StructPoints(new)) => {
579 existing.points.extend(new.points);
580 }
581 (
583 PointsType::DoublePoints(_),
584 PointsType::IntegerPoints(_)
585 | PointsType::Uint64Points(_)
586 | PointsType::StringPoints(_)
587 | PointsType::ArrayPoints(_)
588 | PointsType::StructPoints(_),
589 )
590 | (
591 PointsType::StringPoints(_),
592 PointsType::DoublePoints(_)
593 | PointsType::IntegerPoints(_)
594 | PointsType::Uint64Points(_)
595 | PointsType::ArrayPoints(_)
596 | PointsType::StructPoints(_),
597 )
598 | (
599 PointsType::IntegerPoints(_),
600 PointsType::DoublePoints(_)
601 | PointsType::Uint64Points(_)
602 | PointsType::StringPoints(_)
603 | PointsType::ArrayPoints(_)
604 | PointsType::StructPoints(_),
605 )
606 | (
607 PointsType::ArrayPoints(_),
608 PointsType::DoublePoints(_)
609 | PointsType::Uint64Points(_)
610 | PointsType::StringPoints(_)
611 | PointsType::IntegerPoints(_)
612 | PointsType::StructPoints(_),
613 )
614 | (
615 PointsType::ArrayPoints(ArrayPoints {
616 array_type: Some(_),
617 }),
618 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
619 )
620 | (
621 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
622 PointsType::ArrayPoints(ArrayPoints {
623 array_type: Some(_),
624 }),
625 )
626 | (
627 PointsType::ArrayPoints(ArrayPoints {
628 array_type: Some(ArrayType::DoubleArrayPoints(_)),
629 }),
630 PointsType::ArrayPoints(ArrayPoints {
631 array_type: Some(ArrayType::StringArrayPoints(_)),
632 }),
633 )
634 | (
635 PointsType::ArrayPoints(ArrayPoints {
636 array_type: Some(ArrayType::StringArrayPoints(_)),
637 }),
638 PointsType::ArrayPoints(ArrayPoints {
639 array_type: Some(ArrayType::DoubleArrayPoints(_)),
640 }),
641 )
642 | (
643 PointsType::Uint64Points(_),
644 PointsType::IntegerPoints(_)
645 | PointsType::StringPoints(_)
646 | PointsType::DoublePoints(_)
647 | PointsType::ArrayPoints(_)
648 | PointsType::StructPoints(_),
649 )
650 | (
651 PointsType::StructPoints(_),
652 PointsType::DoublePoints(_)
653 | PointsType::Uint64Points(_)
654 | PointsType::StringPoints(_)
655 | PointsType::IntegerPoints(_)
656 | PointsType::ArrayPoints(_),
657 ) => {
658 panic!("mismatched types");
660 }
661 }
662 }
663
664 self.count.fetch_add(new_point_count, Ordering::Release);
665 }
666}
667
668impl PartialEq for SeriesBuffer {
669 fn eq(&self, other: &Self) -> bool {
670 self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
671 }
672}
673
674impl PartialOrd for SeriesBuffer {
675 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
676 let flush_time = self.flush_time.load(Ordering::Acquire);
677 let other_flush_time = other.flush_time.load(Ordering::Acquire);
678 flush_time.partial_cmp(&other_flush_time)
679 }
680}
681
682impl SeriesBuffer {
683 fn new(capacity: usize) -> Self {
684 Self {
685 points: Mutex::new(HashMap::new()),
686 count: AtomicUsize::new(0),
687 flush_time: AtomicU64::new(0),
688 condvar: Condvar::new(),
689 max_capacity: capacity,
690 }
691 }
692
693 fn has_capacity(&self, new_points_count: usize) -> bool {
698 let count = self.count.load(Ordering::Acquire);
699 count == 0 || count + new_points_count <= self.max_capacity
700 }
701
702 fn lock(&self) -> SeriesBufferGuard<'_> {
703 SeriesBufferGuard {
704 sb: self.points.lock(),
705 count: &self.count,
706 }
707 }
708
709 fn take(&self) -> (usize, Vec<Series>) {
710 let mut points = self.lock();
711 self.flush_time.store(
712 UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
713 Ordering::Release,
714 );
715 let result = points
716 .sb
717 .drain()
718 .map(|(ChannelDescriptor { name, tags }, points)| {
719 let channel = Channel { name };
720 let points_obj = Points {
721 points_type: Some(points),
722 };
723 Series {
724 channel: Some(channel),
725 tags: tags
726 .map(|tags| tags.into_iter().collect())
727 .unwrap_or_default(),
728 points: Some(points_obj),
729 }
730 })
731 .collect();
732 let result_count = points
733 .count
734 .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
735 .unwrap();
736 (result_count, result)
737 }
738
739 fn is_empty(&self) -> bool {
740 self.count() == 0
741 }
742
743 fn count(&self) -> usize {
744 self.count.load(Ordering::Acquire)
745 }
746
747 fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
748 let mut points_lock = self.points.lock();
749 if !points_lock.is_empty() {
752 self.condvar.wait(&mut points_lock);
753 } else {
754 debug!("buffer emptied since last check, skipping condvar wait");
755 }
756 on_notify(SeriesBufferGuard {
757 sb: points_lock,
758 count: &self.count,
759 });
760 }
761
762 fn notify(&self) -> bool {
763 self.condvar.notify_one()
764 }
765}
766
767fn batch_processor(
768 running: Arc<AtomicBool>,
769 points_buffer: Arc<SeriesBuffer>,
770 request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
771 max_request_delay: Duration,
772) {
773 loop {
774 debug!("starting processor loop");
775 if points_buffer.is_empty() {
776 if !running.load(Ordering::Acquire) {
777 debug!("batch processor thread exiting due to running flag");
778 drop(request_chan);
779 break;
780 } else {
781 debug!("empty points buffer, waiting");
782 thread::park_timeout(max_request_delay);
783 }
784 continue;
785 }
786 let (point_count, series) = points_buffer.take();
787
788 if points_buffer.notify() {
789 debug!("notified one waiting thread after clearing points buffer");
790 }
791
792 let write_request = WriteRequestNominal { series };
793
794 if request_chan.is_full() {
795 debug!("ready to queue request but request channel is full");
796 }
797 let rep = request_chan.send((write_request, point_count));
798 debug!("queued request for processing");
799 if rep.is_err() {
800 error!("failed to send request to dispatcher");
801 } else {
802 debug!("finished submitting request");
803 }
804
805 thread::park_timeout(max_request_delay);
806 }
807 debug!("batch processor thread exiting");
808}
809
810impl Drop for NominalDatasetStream {
811 fn drop(&mut self) {
812 debug!("starting drop for NominalDatasetStream");
813 self.running.store(false, Ordering::Release);
814 loop {
815 let count = self.unflushed_points.load(Ordering::Acquire);
816 if count == 0 {
817 break;
818 }
819 debug!(
820 "waiting for all points to be flushed before dropping stream, {count} points remaining",
821 );
822 thread::sleep(Duration::from_millis(50));
824 }
825 }
826}
827
828fn request_dispatcher<C: WriteRequestConsumer + 'static>(
829 running: Arc<AtomicBool>,
830 unflushed_points: Arc<AtomicUsize>,
831 request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
832 consumer: Arc<C>,
833) {
834 let mut total_request_time = 0;
835 loop {
836 match request_rx.recv() {
837 Ok((request, point_count)) => {
838 debug!("received writerequest from channel");
839 let req_start = Instant::now();
840 match consumer.consume(&request) {
841 Ok(_) => {
842 let time = req_start.elapsed().as_millis();
843 debug!("request of {} points sent in {} ms", point_count, time);
844 total_request_time += time as u64;
845 }
846 Err(e) => {
847 error!("Failed to send request: {e:?}");
848 }
849 }
850 unflushed_points.fetch_sub(point_count, Ordering::Release);
851
852 if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
853 {
854 info!("all points flushed, closing dispatcher thread");
855 drop(request_rx);
857 break;
858 }
859 }
860 Err(e) => {
861 debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
862 break;
863 }
864 }
865 }
866 debug!(
867 "request dispatcher thread exiting. total request time: {}",
868 total_request_time
869 );
870}
871
872fn points_len(points_type: &PointsType) -> usize {
873 match points_type {
874 PointsType::DoublePoints(points) => points.points.len(),
875 PointsType::StringPoints(points) => points.points.len(),
876 PointsType::IntegerPoints(points) => points.points.len(),
877 PointsType::Uint64Points(points) => points.points.len(),
878 PointsType::ArrayPoints(points) => match &points.array_type {
879 Some(ArrayType::DoubleArrayPoints(points)) => points.points.len(),
880 Some(ArrayType::StringArrayPoints(points)) => points.points.len(),
881 None => 0,
882 },
883 PointsType::StructPoints(points) => points.points.len(),
884 }
885}