1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
17use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
18use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
19use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
20use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoint;
21use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
22use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
23use nominal_api::tonic::io::nominal::scout::api::proto::Points;
24use nominal_api::tonic::io::nominal::scout::api::proto::Series;
25use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoint;
26use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
27use nominal_api::tonic::io::nominal::scout::api::proto::StructPoint;
28use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
29use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
30use parking_lot::Condvar;
31use parking_lot::Mutex;
32use parking_lot::MutexGuard;
33use tracing::debug;
34use tracing::error;
35use tracing::info;
36
37use crate::client::NominalApiClients;
38use crate::client::PRODUCTION_API_URL;
39use crate::consumer::AvroFileConsumer;
40use crate::consumer::DualWriteRequestConsumer;
41use crate::consumer::ListeningWriteRequestConsumer;
42use crate::consumer::NominalCoreConsumer;
43use crate::consumer::RequestConsumerWithFallback;
44use crate::consumer::WriteRequestConsumer;
45use crate::listener::LoggingListener;
46use crate::types::ChannelDescriptor;
47use crate::types::IntoPoints;
48use crate::types::IntoTimestamp;
49
/// Tuning options for a [`NominalDatasetStream`].
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    // Point-count threshold: a channel writer flushes once it holds this many points, and each
    // series buffer accepts up to this many points (an empty buffer accepts any batch size).
    pub max_points_per_record: usize,
    // Time-based flush bound: used by channel writers as their flush trigger and by the batch
    // processor threads as their park timeout between flushes.
    pub max_request_delay: Duration,
    // Capacity of the bounded channel between the batch processors and the request dispatchers.
    pub max_buffered_requests: usize,
    // Number of dispatcher threads that hand write requests to the consumer.
    pub request_dispatcher_tasks: usize,
    // Base URL used to build the API clients when streaming to core.
    pub base_api_url: String,
}
58
59impl Default for NominalStreamOpts {
60 fn default() -> Self {
61 Self {
62 max_points_per_record: 250_000,
63 max_request_delay: Duration::from_millis(100),
64 max_buffered_requests: 4,
65 request_dispatcher_tasks: 8,
66 base_api_url: PRODUCTION_API_URL.to_string(),
67 }
68 }
69}
70
/// Builder for [`NominalDatasetStream`]. Configure core and/or file output, then call `build`.
#[derive(Default)]
pub struct NominalDatasetStreamBuilder {
    // Bearer token, target dataset, and the tokio runtime handle passed to the core consumer.
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    // Avro file that receives every write request.
    stream_to_file: Option<PathBuf>,
    // Avro file wrapped as the fallback in `RequestConsumerWithFallback`.
    file_fallback: Option<PathBuf>,
    // User listeners; `LoggingListener` is appended when the stream is built.
    listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    opts: NominalStreamOpts,
}
79
80impl Debug for NominalDatasetStreamBuilder {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 f.debug_struct("NominalDatasetStreamBuilder")
83 .field("stream_to_core", &self.stream_to_core.is_some())
84 .field("stream_to_file", &self.stream_to_file)
85 .field("file_fallback", &self.file_fallback)
86 .field("listeners", &self.listeners.len())
87 .finish()
88 }
89}
90
91impl NominalDatasetStreamBuilder {
92 pub fn new() -> Self {
93 Self::default()
94 }
95}
96
97impl NominalDatasetStreamBuilder {
98 pub fn stream_to_core(
99 self,
100 bearer_token: BearerToken,
101 dataset: ResourceIdentifier,
102 handle: tokio::runtime::Handle,
103 ) -> NominalDatasetStreamBuilder {
104 NominalDatasetStreamBuilder {
105 stream_to_core: Some((bearer_token, dataset, handle)),
106 stream_to_file: self.stream_to_file,
107 file_fallback: self.file_fallback,
108 listeners: self.listeners,
109 opts: self.opts,
110 }
111 }
112
113 pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
114 self.stream_to_file = Some(file_path.into());
115 self
116 }
117
118 pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
119 self.file_fallback = Some(file_path.into());
120 self
121 }
122
123 pub fn add_listener(
124 mut self,
125 listener: Arc<dyn crate::listener::NominalStreamListener>,
126 ) -> Self {
127 self.listeners.push(listener);
128 self
129 }
130
131 pub fn with_listeners(
132 mut self,
133 listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
134 ) -> Self {
135 self.listeners = listeners;
136 self
137 }
138
139 pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
140 self.opts = opts;
141 self
142 }
143
144 #[cfg(feature = "logging")]
145 fn init_logging(self, directive: Option<&str>) -> Self {
146 use tracing_subscriber::layer::SubscriberExt;
147 use tracing_subscriber::util::SubscriberInitExt;
148
149 let base = tracing_subscriber::EnvFilter::builder()
151 .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
152 let env_filter = match directive {
153 Some(d) => base.parse_lossy(d),
154 None => base.from_env_lossy(),
155 };
156
157 let subscriber = tracing_subscriber::registry()
158 .with(
159 tracing_subscriber::fmt::layer()
160 .with_thread_ids(true)
161 .with_thread_names(true)
162 .with_line_number(true),
163 )
164 .with(env_filter);
165
166 if let Err(error) = subscriber.try_init() {
167 eprintln!("nominal streaming failed to enable logging: {error}");
168 }
169
170 self
171 }
172
173 #[cfg(feature = "logging")]
174 pub fn enable_logging(self) -> Self {
175 self.init_logging(None)
176 }
177
178 #[cfg(feature = "logging")]
179 pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
180 self.init_logging(Some(log_directive))
181 }
182
183 pub fn build(self) -> NominalDatasetStream {
184 let core_consumer = self.core_consumer();
185 let file_consumer = self.file_consumer();
186 let fallback_consumer = self.fallback_consumer();
187
188 match (core_consumer, file_consumer, fallback_consumer) {
189 (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
190 (Some(_), Some(_), Some(_)) => {
191 panic!("must choose one of stream_to_file and file_fallback when streaming to core")
192 }
193 (Some(core), None, None) => self.into_stream(core),
194 (Some(core), None, Some(fallback)) => {
195 self.into_stream(RequestConsumerWithFallback::new(core, fallback))
196 }
197 (None, Some(file), None) => self.into_stream(file),
198 (None, Some(file), Some(fallback)) => {
199 self.into_stream(RequestConsumerWithFallback::new(file, fallback))
201 }
202 (Some(core), Some(file), None) => {
203 self.into_stream(DualWriteRequestConsumer::new(core, file))
204 }
205 }
206 }
207
208 fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
209 self.stream_to_core
210 .as_ref()
211 .map(|(auth_provider, dataset, handle)| {
212 NominalCoreConsumer::new(
213 NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
214 handle.clone(),
215 auth_provider.clone(),
216 dataset.clone(),
217 )
218 })
219 }
220
221 fn dataset_rid(&self) -> Option<ResourceIdentifier> {
222 self.stream_to_core.as_ref().map(|(_, rid, _)| rid.clone())
223 }
224
225 fn file_consumer(&self) -> Option<AvroFileConsumer> {
226 self.stream_to_file.as_ref().map(|path| {
227 AvroFileConsumer::new_with_full_path(path, true, self.dataset_rid()).unwrap()
228 })
229 }
230
231 fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
232 self.file_fallback.as_ref().map(|path| {
233 AvroFileConsumer::new_with_full_path(path, true, self.dataset_rid()).unwrap()
234 })
235 }
236
237 fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
238 let mut listeners = self.listeners;
239 listeners.push(Arc::new(LoggingListener));
240 let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
241 NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
242 }
243}
244
245#[deprecated]
247pub type NominalDatasourceStream = NominalDatasetStream;
248
/// Streams points into a write-request consumer.
///
/// Points go into one of two series buffers (primary and secondary). Each buffer is drained by
/// its own batch processor thread into write requests, and a pool of dispatcher threads hands
/// those requests to the consumer. Dropping the stream blocks until every enqueued point has
/// been handed to the consumer.
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    // Cleared on drop so the background threads shut down once drained.
    running: Arc<AtomicBool>,
    // Points enqueued but not yet handed to the consumer. Incremented on enqueue and
    // decremented by the dispatchers.
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    // Batch processor threads for the two buffers; kept so they can be unparked.
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
    /// Cumulative nanoseconds both batch processors spent taking buffered points and queueing
    /// requests.
    #[cfg(feature = "instrument")]
    pub batch_processor_ns: Arc<AtomicU64>,
    /// Cumulative nanoseconds the dispatchers spent in consumer calls, successful or not.
    #[cfg(feature = "instrument")]
    pub dispatcher_ns: Arc<AtomicU64>,
}
272
273impl NominalDatasetStream {
274 pub fn builder() -> NominalDatasetStreamBuilder {
275 NominalDatasetStreamBuilder::new()
276 }
277
278 pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
279 consumer: C,
280 opts: NominalStreamOpts,
281 ) -> Self {
282 let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
283 let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
284
285 let (request_tx, request_rx) =
286 crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);
287
288 let running = Arc::new(AtomicBool::new(true));
289 let unflushed_points = Arc::new(AtomicUsize::new(0));
290
291 #[cfg(feature = "instrument")]
292 let batch_processor_ns = Arc::new(AtomicU64::new(0));
293 #[cfg(feature = "instrument")]
294 let dispatcher_ns = Arc::new(AtomicU64::new(0));
295
296 let primary_handle = thread::Builder::new()
297 .name("nmstream_primary".to_string())
298 .spawn({
299 let points_buffer = Arc::clone(&primary_buffer);
300 let running = running.clone();
301 let tx = request_tx.clone();
302 #[cfg(feature = "instrument")]
303 let bp_ns = Arc::clone(&batch_processor_ns);
304 move || {
305 batch_processor(
306 running,
307 points_buffer,
308 tx,
309 opts.max_request_delay,
310 #[cfg(feature = "instrument")]
311 bp_ns,
312 );
313 }
314 })
315 .unwrap();
316
317 let secondary_handle = thread::Builder::new()
318 .name("nmstream_secondary".to_string())
319 .spawn({
320 let secondary_buffer = Arc::clone(&secondary_buffer);
321 let running = running.clone();
322 #[cfg(feature = "instrument")]
323 let bp_ns = Arc::clone(&batch_processor_ns);
324 move || {
325 batch_processor(
326 running,
327 secondary_buffer,
328 request_tx,
329 opts.max_request_delay,
330 #[cfg(feature = "instrument")]
331 bp_ns,
332 );
333 }
334 })
335 .unwrap();
336
337 let consumer = Arc::new(consumer);
338
339 for i in 0..opts.request_dispatcher_tasks {
340 thread::Builder::new()
341 .name(format!("nmstream_dispatch_{i}"))
342 .spawn({
343 let running = Arc::clone(&running);
344 let unflushed_points = Arc::clone(&unflushed_points);
345 let rx = request_rx.clone();
346 let consumer = consumer.clone();
347 #[cfg(feature = "instrument")]
348 let disp_ns = Arc::clone(&dispatcher_ns);
349 move || {
350 debug!("starting request dispatcher #{}", i);
351 request_dispatcher(
352 running,
353 unflushed_points,
354 rx,
355 consumer,
356 #[cfg(feature = "instrument")]
357 disp_ns,
358 );
359 }
360 })
361 .unwrap();
362 }
363
364 NominalDatasetStream {
365 opts,
366 running,
367 unflushed_points,
368 primary_buffer,
369 secondary_buffer,
370 primary_handle,
371 secondary_handle,
372 #[cfg(feature = "instrument")]
373 batch_processor_ns,
374 #[cfg(feature = "instrument")]
375 dispatcher_ns,
376 }
377 }
378
379 pub fn double_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalDoubleWriter<'_> {
380 NominalDoubleWriter {
381 writer: NominalChannelWriter::new(self, channel_descriptor),
382 }
383 }
384
385 pub fn string_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalStringWriter<'_> {
386 NominalStringWriter {
387 writer: NominalChannelWriter::new(self, channel_descriptor),
388 }
389 }
390
391 pub fn integer_writer(
392 &self,
393 channel_descriptor: ChannelDescriptor,
394 ) -> NominalIntegerWriter<'_> {
395 NominalIntegerWriter {
396 writer: NominalChannelWriter::new(self, channel_descriptor),
397 }
398 }
399
400 pub fn uint64_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalUint64Writer<'_> {
401 NominalUint64Writer {
402 writer: NominalChannelWriter::new(self, channel_descriptor),
403 }
404 }
405
406 pub fn struct_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalStructWriter<'_> {
407 NominalStructWriter {
408 writer: NominalChannelWriter::new(self, channel_descriptor),
409 }
410 }
411
412 pub fn double_array_writer(
413 &self,
414 channel_descriptor: ChannelDescriptor,
415 ) -> NominalDoubleArrayWriter<'_> {
416 NominalDoubleArrayWriter {
417 writer: NominalChannelWriter::new(self, channel_descriptor),
418 }
419 }
420
421 pub fn string_array_writer(
422 &self,
423 channel_descriptor: ChannelDescriptor,
424 ) -> NominalStringArrayWriter<'_> {
425 NominalStringArrayWriter {
426 writer: NominalChannelWriter::new(self, channel_descriptor),
427 }
428 }
429
430 pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
431 let new_points = new_points.into_points();
432 let new_count = points_len(&new_points);
433
434 self.when_capacity(new_count, |mut sb| {
435 sb.extend(channel_descriptor, new_points)
436 });
437 }
438
439 fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
440 self.unflushed_points
441 .fetch_add(new_count, Ordering::Release);
442
443 if self.primary_buffer.has_capacity(new_count) {
444 debug!("adding {} points to primary buffer", new_count);
445 callback(self.primary_buffer.lock());
446 } else if self.secondary_buffer.has_capacity(new_count) {
447 self.primary_handle.thread().unpark();
449 debug!("adding {} points to secondary buffer", new_count);
450 callback(self.secondary_buffer.lock());
451 } else {
452 let buf = if self.primary_buffer < self.secondary_buffer {
453 info!("waiting for primary buffer to flush to append {new_count} points...");
454 self.primary_handle.thread().unpark();
455 &self.primary_buffer
456 } else {
457 info!("waiting for secondary buffer to flush to append {new_count} points...");
458 self.secondary_handle.thread().unpark();
459 &self.secondary_buffer
460 };
461
462 buf.on_notify(callback);
463 }
464 }
465}
466
/// Per-channel writer shared by the typed writers.
///
/// It batches points locally and moves them into its [`NominalDatasetStream`] on size or time
/// thresholds, and again when the writer is dropped.
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    // When points were last moved into the stream; drives the time-based flush.
    last_flushed_at: Instant,
    // Points pushed since the last flush.
    unflushed: Vec<T>,
}
476
477impl<T> NominalChannelWriter<'_, T>
478where
479 Vec<T>: IntoPoints,
480{
481 fn new(
482 stream: &NominalDatasetStream,
483 channel: ChannelDescriptor,
484 ) -> NominalChannelWriter<'_, T> {
485 NominalChannelWriter {
486 channel,
487 stream,
488 last_flushed_at: Instant::now(),
489 unflushed: vec![],
490 }
491 }
492
493 fn push_point(&mut self, point: T) {
494 self.unflushed.push(point);
495 if self.unflushed.len() >= self.stream.opts.max_points_per_record
496 || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
497 {
498 debug!(
499 "conditionally flushing {:?}, ({} points, {:?} since last)",
500 self.channel,
501 self.unflushed.len(),
502 self.last_flushed_at.elapsed()
503 );
504 self.flush();
505 }
506 }
507
508 fn flush(&mut self) {
509 if self.unflushed.is_empty() {
510 return;
511 }
512 info!(
513 "flushing writer for {:?} with {} points",
514 self.channel,
515 self.unflushed.len()
516 );
517 self.stream.when_capacity(self.unflushed.len(), |mut buf| {
518 let to_flush: Vec<T> = self.unflushed.drain(..).collect();
519 buf.extend(&self.channel, to_flush);
520 self.last_flushed_at = Instant::now();
521 })
522 }
523}
524
525impl<T> Drop for NominalChannelWriter<'_, T>
526where
527 Vec<T>: IntoPoints,
528{
529 fn drop(&mut self) {
530 info!("flushing then dropping writer for: {:?}", self.channel);
531 self.flush();
532 }
533}
534
535pub struct NominalDoubleWriter<'ds> {
536 writer: NominalChannelWriter<'ds, DoublePoint>,
537}
538
539impl NominalDoubleWriter<'_> {
540 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
541 self.writer.push_point(DoublePoint {
542 timestamp: Some(timestamp.into_timestamp()),
543 value,
544 });
545 }
546}
547
548pub struct NominalIntegerWriter<'ds> {
549 writer: NominalChannelWriter<'ds, IntegerPoint>,
550}
551
552impl NominalIntegerWriter<'_> {
553 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
554 self.writer.push_point(IntegerPoint {
555 timestamp: Some(timestamp.into_timestamp()),
556 value,
557 });
558 }
559}
560
561pub struct NominalUint64Writer<'ds> {
562 writer: NominalChannelWriter<'ds, Uint64Point>,
563}
564
565impl NominalUint64Writer<'_> {
566 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: u64) {
567 self.writer.push_point(Uint64Point {
568 timestamp: Some(timestamp.into_timestamp()),
569 value,
570 });
571 }
572}
573
574pub struct NominalStringWriter<'ds> {
575 writer: NominalChannelWriter<'ds, StringPoint>,
576}
577
578impl NominalStringWriter<'_> {
579 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
580 self.writer.push_point(StringPoint {
581 timestamp: Some(timestamp.into_timestamp()),
582 value: value.into(),
583 });
584 }
585}
586
587pub struct NominalStructWriter<'ds> {
588 writer: NominalChannelWriter<'ds, StructPoint>,
589}
590
591impl NominalStructWriter<'_> {
592 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
593 self.writer.push_point(StructPoint {
594 timestamp: Some(timestamp.into_timestamp()),
595 json_string: value.into(),
596 });
597 }
598}
599
600pub struct NominalDoubleArrayWriter<'ds> {
601 writer: NominalChannelWriter<'ds, DoubleArrayPoint>,
602}
603
604impl NominalDoubleArrayWriter<'_> {
605 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: Vec<f64>) {
606 self.writer.push_point(DoubleArrayPoint {
607 timestamp: Some(timestamp.into_timestamp()),
608 value,
609 });
610 }
611}
612
613pub struct NominalStringArrayWriter<'ds> {
614 writer: NominalChannelWriter<'ds, StringArrayPoint>,
615}
616
617impl NominalStringArrayWriter<'_> {
618 pub fn push(
619 &mut self,
620 timestamp: impl IntoTimestamp,
621 value: impl IntoIterator<Item = impl Into<String>>,
622 ) {
623 self.writer.push_point(StringArrayPoint {
624 timestamp: Some(timestamp.into_timestamp()),
625 value: value.into_iter().map(Into::into).collect(),
626 });
627 }
628}
629
/// Mutex-protected per-channel points, drained by one batch processor thread.
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    // Total points currently buffered; readable without the lock (e.g. by `has_capacity`).
    count: AtomicUsize,
    // Nanoseconds since the UNIX epoch at the most recent `take`; orders buffers by flush recency.
    flush_time: AtomicU64,
    // Signalled after a flush to wake one writer blocked in `on_notify`.
    condvar: Condvar,
    // Point capacity checked by `has_capacity`.
    max_capacity: usize,
}
641
/// Locked access to a `SeriesBuffer`'s points.
///
/// Pairs the lock with the buffer's point counter so that additions keep the count in sync.
struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}
646
647impl SeriesBufferGuard<'_> {
648 fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
649 let points = points.into_points();
650 let new_point_count = points_len(&points);
651
652 if !self.sb.contains_key(channel_descriptor) {
653 self.sb.insert(channel_descriptor.clone(), points);
654 } else {
655 match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
656 (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
657 existing.points.extend(new.points)
658 }
659 (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
660 existing.points.extend(new.points)
661 }
662 (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
663 existing.points.extend(new.points)
664 }
665 (
666 PointsType::ArrayPoints(ArrayPoints {
667 array_type: Some(ArrayType::DoubleArrayPoints(existing)),
668 }),
669 PointsType::ArrayPoints(ArrayPoints {
670 array_type: Some(ArrayType::DoubleArrayPoints(new)),
671 }),
672 ) => existing.points.extend(new.points),
673 (PointsType::Uint64Points(existing), PointsType::Uint64Points(new)) => {
674 existing.points.extend(new.points)
675 }
676 (
677 PointsType::ArrayPoints(ArrayPoints {
678 array_type: Some(ArrayType::StringArrayPoints(existing)),
679 }),
680 PointsType::ArrayPoints(ArrayPoints {
681 array_type: Some(ArrayType::StringArrayPoints(new)),
682 }),
683 ) => existing.points.extend(new.points),
684 (
685 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
686 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
687 ) => {}
688 (PointsType::StructPoints(existing), PointsType::StructPoints(new)) => {
689 existing.points.extend(new.points);
690 }
691 (
693 PointsType::DoublePoints(_),
694 PointsType::IntegerPoints(_)
695 | PointsType::Uint64Points(_)
696 | PointsType::StringPoints(_)
697 | PointsType::ArrayPoints(_)
698 | PointsType::StructPoints(_),
699 )
700 | (
701 PointsType::StringPoints(_),
702 PointsType::DoublePoints(_)
703 | PointsType::IntegerPoints(_)
704 | PointsType::Uint64Points(_)
705 | PointsType::ArrayPoints(_)
706 | PointsType::StructPoints(_),
707 )
708 | (
709 PointsType::IntegerPoints(_),
710 PointsType::DoublePoints(_)
711 | PointsType::Uint64Points(_)
712 | PointsType::StringPoints(_)
713 | PointsType::ArrayPoints(_)
714 | PointsType::StructPoints(_),
715 )
716 | (
717 PointsType::ArrayPoints(_),
718 PointsType::DoublePoints(_)
719 | PointsType::Uint64Points(_)
720 | PointsType::StringPoints(_)
721 | PointsType::IntegerPoints(_)
722 | PointsType::StructPoints(_),
723 )
724 | (
725 PointsType::ArrayPoints(ArrayPoints {
726 array_type: Some(_),
727 }),
728 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
729 )
730 | (
731 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
732 PointsType::ArrayPoints(ArrayPoints {
733 array_type: Some(_),
734 }),
735 )
736 | (
737 PointsType::ArrayPoints(ArrayPoints {
738 array_type: Some(ArrayType::DoubleArrayPoints(_)),
739 }),
740 PointsType::ArrayPoints(ArrayPoints {
741 array_type: Some(ArrayType::StringArrayPoints(_)),
742 }),
743 )
744 | (
745 PointsType::ArrayPoints(ArrayPoints {
746 array_type: Some(ArrayType::StringArrayPoints(_)),
747 }),
748 PointsType::ArrayPoints(ArrayPoints {
749 array_type: Some(ArrayType::DoubleArrayPoints(_)),
750 }),
751 )
752 | (
753 PointsType::Uint64Points(_),
754 PointsType::IntegerPoints(_)
755 | PointsType::StringPoints(_)
756 | PointsType::DoublePoints(_)
757 | PointsType::ArrayPoints(_)
758 | PointsType::StructPoints(_),
759 )
760 | (
761 PointsType::StructPoints(_),
762 PointsType::DoublePoints(_)
763 | PointsType::Uint64Points(_)
764 | PointsType::StringPoints(_)
765 | PointsType::IntegerPoints(_)
766 | PointsType::ArrayPoints(_),
767 ) => {
768 panic!("mismatched types");
770 }
771 }
772 }
773
774 self.count.fetch_add(new_point_count, Ordering::Release);
775 }
776}
777
778impl PartialEq for SeriesBuffer {
779 fn eq(&self, other: &Self) -> bool {
780 self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
781 }
782}
783
784impl PartialOrd for SeriesBuffer {
785 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
786 let flush_time = self.flush_time.load(Ordering::Acquire);
787 let other_flush_time = other.flush_time.load(Ordering::Acquire);
788 flush_time.partial_cmp(&other_flush_time)
789 }
790}
791
792impl SeriesBuffer {
793 fn new(capacity: usize) -> Self {
794 Self {
795 points: Mutex::new(HashMap::new()),
796 count: AtomicUsize::new(0),
797 flush_time: AtomicU64::new(0),
798 condvar: Condvar::new(),
799 max_capacity: capacity,
800 }
801 }
802
803 fn has_capacity(&self, new_points_count: usize) -> bool {
808 let count = self.count.load(Ordering::Acquire);
809 count == 0 || count + new_points_count <= self.max_capacity
810 }
811
812 fn lock(&self) -> SeriesBufferGuard<'_> {
813 SeriesBufferGuard {
814 sb: self.points.lock(),
815 count: &self.count,
816 }
817 }
818
819 fn take(&self) -> (usize, Vec<Series>) {
820 let mut points = self.lock();
821 self.flush_time.store(
822 UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
823 Ordering::Release,
824 );
825 let result = points
826 .sb
827 .drain()
828 .map(|(ChannelDescriptor { name, tags }, points)| {
829 let channel = Channel { name };
830 let points_obj = Points {
831 points_type: Some(points),
832 };
833 Series {
834 channel: Some(channel),
835 tags: tags
836 .map(|tags| tags.into_iter().collect())
837 .unwrap_or_default(),
838 points: Some(points_obj),
839 }
840 })
841 .collect();
842 let result_count = points
843 .count
844 .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
845 .unwrap();
846 (result_count, result)
847 }
848
849 fn is_empty(&self) -> bool {
850 self.count() == 0
851 }
852
853 fn count(&self) -> usize {
854 self.count.load(Ordering::Acquire)
855 }
856
857 fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
858 let mut points_lock = self.points.lock();
859 if !points_lock.is_empty() {
862 self.condvar.wait(&mut points_lock);
863 } else {
864 debug!("buffer emptied since last check, skipping condvar wait");
865 }
866 on_notify(SeriesBufferGuard {
867 sb: points_lock,
868 count: &self.count,
869 });
870 }
871
872 fn notify(&self) -> bool {
873 self.condvar.notify_one()
874 }
875}
876
877fn batch_processor(
878 running: Arc<AtomicBool>,
879 points_buffer: Arc<SeriesBuffer>,
880 request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
881 max_request_delay: Duration,
882 #[cfg(feature = "instrument")] bp_ns: Arc<AtomicU64>,
883) {
884 loop {
885 debug!("starting processor loop");
886 if points_buffer.is_empty() {
887 if !running.load(Ordering::Acquire) {
888 debug!("batch processor thread exiting due to running flag");
889 drop(request_chan);
890 break;
891 } else {
892 debug!("empty points buffer, waiting");
893 thread::park_timeout(max_request_delay);
894 }
895 continue;
896 }
897
898 #[cfg(feature = "instrument")]
899 let t = Instant::now();
900
901 let (point_count, series) = points_buffer.take();
902
903 if points_buffer.notify() {
904 debug!("notified one waiting thread after clearing points buffer");
905 }
906
907 let write_request = WriteRequestNominal {
908 series,
909 session_name: None,
910 };
911
912 if request_chan.is_full() {
913 debug!("ready to queue request but request channel is full");
914 }
915 let rep = request_chan.send((write_request, point_count));
916 debug!("queued request for processing");
917 if rep.is_err() {
918 error!("failed to send request to dispatcher");
919 } else {
920 debug!("finished submitting request");
921 }
922
923 #[cfg(feature = "instrument")]
924 bp_ns.fetch_add(t.elapsed().as_nanos() as u64, Ordering::Relaxed);
925
926 thread::park_timeout(max_request_delay);
927 }
928 debug!("batch processor thread exiting");
929}
930
931impl Drop for NominalDatasetStream {
932 fn drop(&mut self) {
933 debug!("starting drop for NominalDatasetStream");
934 self.running.store(false, Ordering::Release);
935 loop {
936 let count = self.unflushed_points.load(Ordering::Acquire);
937 if count == 0 {
938 break;
939 }
940 debug!(
941 "waiting for all points to be flushed before dropping stream, {count} points remaining",
942 );
943 thread::sleep(Duration::from_millis(50));
945 }
946 }
947}
948
949fn request_dispatcher<C: WriteRequestConsumer + 'static>(
950 running: Arc<AtomicBool>,
951 unflushed_points: Arc<AtomicUsize>,
952 request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
953 consumer: Arc<C>,
954 #[cfg(feature = "instrument")] disp_ns: Arc<AtomicU64>,
955) {
956 let mut total_request_time = 0;
957 loop {
958 match request_rx.recv() {
959 Ok((request, point_count)) => {
960 debug!("received writerequest from channel");
961 let req_start = Instant::now();
962 match consumer.consume(&request) {
963 Ok(_) => {
964 let time = req_start.elapsed().as_millis();
965 debug!("request of {} points sent in {} ms", point_count, time);
966 total_request_time += time as u64;
967 }
968 Err(e) => {
969 error!("Failed to send request: {e:?}");
970 }
971 }
972 #[cfg(feature = "instrument")]
973 disp_ns.fetch_add(req_start.elapsed().as_nanos() as u64, Ordering::Relaxed);
974 unflushed_points.fetch_sub(point_count, Ordering::Release);
975
976 if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
977 {
978 info!("all points flushed, closing dispatcher thread");
979 drop(request_rx);
981 break;
982 }
983 }
984 Err(e) => {
985 debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
986 break;
987 }
988 }
989 }
990 debug!(
991 "request dispatcher thread exiting. total request time: {}",
992 total_request_time
993 );
994}
995
996fn points_len(points_type: &PointsType) -> usize {
997 match points_type {
998 PointsType::DoublePoints(points) => points.points.len(),
999 PointsType::StringPoints(points) => points.points.len(),
1000 PointsType::IntegerPoints(points) => points.points.len(),
1001 PointsType::Uint64Points(points) => points.points.len(),
1002 PointsType::ArrayPoints(points) => match &points.array_type {
1003 Some(ArrayType::DoubleArrayPoints(points)) => points.points.len(),
1004 Some(ArrayType::StringArrayPoints(points)) => points.points.len(),
1005 None => 0,
1006 },
1007 PointsType::StructPoints(points) => points.points.len(),
1008 }
1009}