1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
17use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
18use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
19use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
20use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoint;
21use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
22use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
23use nominal_api::tonic::io::nominal::scout::api::proto::Points;
24use nominal_api::tonic::io::nominal::scout::api::proto::Series;
25use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoint;
26use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
27use nominal_api::tonic::io::nominal::scout::api::proto::StructPoint;
28use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
29use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
30use parking_lot::Condvar;
31use parking_lot::Mutex;
32use parking_lot::MutexGuard;
33use tracing::debug;
34use tracing::error;
35use tracing::info;
36
37use crate::client::NominalApiClients;
38use crate::client::PRODUCTION_API_URL;
39use crate::consumer::AvroFileConsumer;
40use crate::consumer::DualWriteRequestConsumer;
41use crate::consumer::ListeningWriteRequestConsumer;
42use crate::consumer::NominalCoreConsumer;
43use crate::consumer::RequestConsumerWithFallback;
44use crate::consumer::WriteRequestConsumer;
45use crate::listener::LoggingListener;
46use crate::types::ChannelDescriptor;
47use crate::types::IntoPoints;
48use crate::types::IntoTimestamp;
49
50#[derive(Debug, Clone)]
51pub struct NominalStreamOpts {
52 pub max_points_per_record: usize,
53 pub max_request_delay: Duration,
54 pub max_buffered_requests: usize,
55 pub request_dispatcher_tasks: usize,
56 pub base_api_url: String,
57}
58
59impl Default for NominalStreamOpts {
60 fn default() -> Self {
61 Self {
62 max_points_per_record: 250_000,
63 max_request_delay: Duration::from_millis(100),
64 max_buffered_requests: 4,
65 request_dispatcher_tasks: 8,
66 base_api_url: PRODUCTION_API_URL.to_string(),
67 }
68 }
69}
70
71#[derive(Default)]
72pub struct NominalDatasetStreamBuilder {
73 stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
74 stream_to_file: Option<PathBuf>,
75 file_fallback: Option<PathBuf>,
76 listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
77 opts: NominalStreamOpts,
78}
79
80impl Debug for NominalDatasetStreamBuilder {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 f.debug_struct("NominalDatasetStreamBuilder")
83 .field("stream_to_core", &self.stream_to_core.is_some())
84 .field("stream_to_file", &self.stream_to_file)
85 .field("file_fallback", &self.file_fallback)
86 .field("listeners", &self.listeners.len())
87 .finish()
88 }
89}
90
91impl NominalDatasetStreamBuilder {
92 pub fn new() -> Self {
93 Self::default()
94 }
95}
96
97impl NominalDatasetStreamBuilder {
98 pub fn stream_to_core(
99 self,
100 bearer_token: BearerToken,
101 dataset: ResourceIdentifier,
102 handle: tokio::runtime::Handle,
103 ) -> NominalDatasetStreamBuilder {
104 NominalDatasetStreamBuilder {
105 stream_to_core: Some((bearer_token, dataset, handle)),
106 stream_to_file: self.stream_to_file,
107 file_fallback: self.file_fallback,
108 listeners: self.listeners,
109 opts: self.opts,
110 }
111 }
112
113 pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
114 self.stream_to_file = Some(file_path.into());
115 self
116 }
117
118 pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
119 self.file_fallback = Some(file_path.into());
120 self
121 }
122
123 pub fn add_listener(
124 mut self,
125 listener: Arc<dyn crate::listener::NominalStreamListener>,
126 ) -> Self {
127 self.listeners.push(listener);
128 self
129 }
130
131 pub fn with_listeners(
132 mut self,
133 listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
134 ) -> Self {
135 self.listeners = listeners;
136 self
137 }
138
139 pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
140 self.opts = opts;
141 self
142 }
143
144 #[cfg(feature = "logging")]
145 fn init_logging(self, directive: Option<&str>) -> Self {
146 use tracing_subscriber::layer::SubscriberExt;
147 use tracing_subscriber::util::SubscriberInitExt;
148
149 let base = tracing_subscriber::EnvFilter::builder()
151 .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
152 let env_filter = match directive {
153 Some(d) => base.parse_lossy(d),
154 None => base.from_env_lossy(),
155 };
156
157 let subscriber = tracing_subscriber::registry()
158 .with(
159 tracing_subscriber::fmt::layer()
160 .with_thread_ids(true)
161 .with_thread_names(true)
162 .with_line_number(true),
163 )
164 .with(env_filter);
165
166 if let Err(error) = subscriber.try_init() {
167 eprintln!("nominal streaming failed to enable logging: {error}");
168 }
169
170 self
171 }
172
173 #[cfg(feature = "logging")]
174 pub fn enable_logging(self) -> Self {
175 self.init_logging(None)
176 }
177
178 #[cfg(feature = "logging")]
179 pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
180 self.init_logging(Some(log_directive))
181 }
182
183 pub fn build(self) -> NominalDatasetStream {
184 let core_consumer = self.core_consumer();
185 let file_consumer = self.file_consumer();
186 let fallback_consumer = self.fallback_consumer();
187
188 match (core_consumer, file_consumer, fallback_consumer) {
189 (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
190 (Some(_), Some(_), Some(_)) => {
191 panic!("must choose one of stream_to_file and file_fallback when streaming to core")
192 }
193 (Some(core), None, None) => self.into_stream(core),
194 (Some(core), None, Some(fallback)) => {
195 self.into_stream(RequestConsumerWithFallback::new(core, fallback))
196 }
197 (None, Some(file), None) => self.into_stream(file),
198 (None, Some(file), Some(fallback)) => {
199 self.into_stream(RequestConsumerWithFallback::new(file, fallback))
201 }
202 (Some(core), Some(file), None) => {
203 self.into_stream(DualWriteRequestConsumer::new(core, file))
204 }
205 }
206 }
207
208 fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
209 self.stream_to_core
210 .as_ref()
211 .map(|(auth_provider, dataset, handle)| {
212 NominalCoreConsumer::new(
213 NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
214 handle.clone(),
215 auth_provider.clone(),
216 dataset.clone(),
217 )
218 })
219 }
220
221 fn file_consumer(&self) -> Option<AvroFileConsumer> {
222 self.stream_to_file
223 .as_ref()
224 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
225 }
226
227 fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
228 self.file_fallback
229 .as_ref()
230 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
231 }
232
233 fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
234 let mut listeners = self.listeners;
235 listeners.push(Arc::new(LoggingListener));
236 let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
237 NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
238 }
239}
240
241#[deprecated]
243pub type NominalDatasourceStream = NominalDatasetStream;
244
245pub struct NominalDatasetStream {
246 opts: NominalStreamOpts,
247 running: Arc<AtomicBool>,
248 unflushed_points: Arc<AtomicUsize>,
249 primary_buffer: Arc<SeriesBuffer>,
250 secondary_buffer: Arc<SeriesBuffer>,
251 primary_handle: thread::JoinHandle<()>,
252 secondary_handle: thread::JoinHandle<()>,
253}
254
255impl NominalDatasetStream {
256 pub fn builder() -> NominalDatasetStreamBuilder {
257 NominalDatasetStreamBuilder::new()
258 }
259
260 pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
261 consumer: C,
262 opts: NominalStreamOpts,
263 ) -> Self {
264 let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
265 let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
266
267 let (request_tx, request_rx) =
268 crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);
269
270 let running = Arc::new(AtomicBool::new(true));
271 let unflushed_points = Arc::new(AtomicUsize::new(0));
272
273 let primary_handle = thread::Builder::new()
274 .name("nmstream_primary".to_string())
275 .spawn({
276 let points_buffer = Arc::clone(&primary_buffer);
277 let running = running.clone();
278 let tx = request_tx.clone();
279 move || {
280 batch_processor(running, points_buffer, tx, opts.max_request_delay);
281 }
282 })
283 .unwrap();
284
285 let secondary_handle = thread::Builder::new()
286 .name("nmstream_secondary".to_string())
287 .spawn({
288 let secondary_buffer = Arc::clone(&secondary_buffer);
289 let running = running.clone();
290 move || {
291 batch_processor(
292 running,
293 secondary_buffer,
294 request_tx,
295 opts.max_request_delay,
296 );
297 }
298 })
299 .unwrap();
300
301 let consumer = Arc::new(consumer);
302
303 for i in 0..opts.request_dispatcher_tasks {
304 thread::Builder::new()
305 .name(format!("nmstream_dispatch_{i}"))
306 .spawn({
307 let running = Arc::clone(&running);
308 let unflushed_points = Arc::clone(&unflushed_points);
309 let rx = request_rx.clone();
310 let consumer = consumer.clone();
311 move || {
312 debug!("starting request dispatcher #{}", i);
313 request_dispatcher(running, unflushed_points, rx, consumer);
314 }
315 })
316 .unwrap();
317 }
318
319 NominalDatasetStream {
320 opts,
321 running,
322 unflushed_points,
323 primary_buffer,
324 secondary_buffer,
325 primary_handle,
326 secondary_handle,
327 }
328 }
329
330 pub fn double_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalDoubleWriter<'_> {
331 NominalDoubleWriter {
332 writer: NominalChannelWriter::new(self, channel_descriptor),
333 }
334 }
335
336 pub fn string_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalStringWriter<'_> {
337 NominalStringWriter {
338 writer: NominalChannelWriter::new(self, channel_descriptor),
339 }
340 }
341
342 pub fn integer_writer(
343 &self,
344 channel_descriptor: ChannelDescriptor,
345 ) -> NominalIntegerWriter<'_> {
346 NominalIntegerWriter {
347 writer: NominalChannelWriter::new(self, channel_descriptor),
348 }
349 }
350
351 pub fn uint64_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalUint64Writer<'_> {
352 NominalUint64Writer {
353 writer: NominalChannelWriter::new(self, channel_descriptor),
354 }
355 }
356
357 pub fn struct_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalStructWriter<'_> {
358 NominalStructWriter {
359 writer: NominalChannelWriter::new(self, channel_descriptor),
360 }
361 }
362
363 pub fn double_array_writer(
364 &self,
365 channel_descriptor: ChannelDescriptor,
366 ) -> NominalDoubleArrayWriter<'_> {
367 NominalDoubleArrayWriter {
368 writer: NominalChannelWriter::new(self, channel_descriptor),
369 }
370 }
371
372 pub fn string_array_writer(
373 &self,
374 channel_descriptor: ChannelDescriptor,
375 ) -> NominalStringArrayWriter<'_> {
376 NominalStringArrayWriter {
377 writer: NominalChannelWriter::new(self, channel_descriptor),
378 }
379 }
380
381 pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
382 let new_points = new_points.into_points();
383 let new_count = points_len(&new_points);
384
385 self.when_capacity(new_count, |mut sb| {
386 sb.extend(channel_descriptor, new_points)
387 });
388 }
389
390 fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
391 self.unflushed_points
392 .fetch_add(new_count, Ordering::Release);
393
394 if self.primary_buffer.has_capacity(new_count) {
395 debug!("adding {} points to primary buffer", new_count);
396 callback(self.primary_buffer.lock());
397 } else if self.secondary_buffer.has_capacity(new_count) {
398 self.primary_handle.thread().unpark();
400 debug!("adding {} points to secondary buffer", new_count);
401 callback(self.secondary_buffer.lock());
402 } else {
403 let buf = if self.primary_buffer < self.secondary_buffer {
404 info!("waiting for primary buffer to flush to append {new_count} points...");
405 self.primary_handle.thread().unpark();
406 &self.primary_buffer
407 } else {
408 info!("waiting for secondary buffer to flush to append {new_count} points...");
409 self.secondary_handle.thread().unpark();
410 &self.secondary_buffer
411 };
412
413 buf.on_notify(callback);
414 }
415 }
416}
417
418pub struct NominalChannelWriter<'ds, T>
419where
420 Vec<T>: IntoPoints,
421{
422 channel: ChannelDescriptor,
423 stream: &'ds NominalDatasetStream,
424 last_flushed_at: Instant,
425 unflushed: Vec<T>,
426}
427
428impl<T> NominalChannelWriter<'_, T>
429where
430 Vec<T>: IntoPoints,
431{
432 fn new(
433 stream: &NominalDatasetStream,
434 channel: ChannelDescriptor,
435 ) -> NominalChannelWriter<'_, T> {
436 NominalChannelWriter {
437 channel,
438 stream,
439 last_flushed_at: Instant::now(),
440 unflushed: vec![],
441 }
442 }
443
444 fn push_point(&mut self, point: T) {
445 self.unflushed.push(point);
446 if self.unflushed.len() >= self.stream.opts.max_points_per_record
447 || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
448 {
449 debug!(
450 "conditionally flushing {:?}, ({} points, {:?} since last)",
451 self.channel,
452 self.unflushed.len(),
453 self.last_flushed_at.elapsed()
454 );
455 self.flush();
456 }
457 }
458
459 fn flush(&mut self) {
460 if self.unflushed.is_empty() {
461 return;
462 }
463 info!(
464 "flushing writer for {:?} with {} points",
465 self.channel,
466 self.unflushed.len()
467 );
468 self.stream.when_capacity(self.unflushed.len(), |mut buf| {
469 let to_flush: Vec<T> = self.unflushed.drain(..).collect();
470 buf.extend(&self.channel, to_flush);
471 self.last_flushed_at = Instant::now();
472 })
473 }
474}
475
476impl<T> Drop for NominalChannelWriter<'_, T>
477where
478 Vec<T>: IntoPoints,
479{
480 fn drop(&mut self) {
481 info!("flushing then dropping writer for: {:?}", self.channel);
482 self.flush();
483 }
484}
485
486pub struct NominalDoubleWriter<'ds> {
487 writer: NominalChannelWriter<'ds, DoublePoint>,
488}
489
490impl NominalDoubleWriter<'_> {
491 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
492 self.writer.push_point(DoublePoint {
493 timestamp: Some(timestamp.into_timestamp()),
494 value,
495 });
496 }
497}
498
499pub struct NominalIntegerWriter<'ds> {
500 writer: NominalChannelWriter<'ds, IntegerPoint>,
501}
502
503impl NominalIntegerWriter<'_> {
504 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
505 self.writer.push_point(IntegerPoint {
506 timestamp: Some(timestamp.into_timestamp()),
507 value,
508 });
509 }
510}
511
512pub struct NominalUint64Writer<'ds> {
513 writer: NominalChannelWriter<'ds, Uint64Point>,
514}
515
516impl NominalUint64Writer<'_> {
517 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: u64) {
518 self.writer.push_point(Uint64Point {
519 timestamp: Some(timestamp.into_timestamp()),
520 value,
521 });
522 }
523}
524
525pub struct NominalStringWriter<'ds> {
526 writer: NominalChannelWriter<'ds, StringPoint>,
527}
528
529impl NominalStringWriter<'_> {
530 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
531 self.writer.push_point(StringPoint {
532 timestamp: Some(timestamp.into_timestamp()),
533 value: value.into(),
534 });
535 }
536}
537
538pub struct NominalStructWriter<'ds> {
539 writer: NominalChannelWriter<'ds, StructPoint>,
540}
541
542impl NominalStructWriter<'_> {
543 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
544 self.writer.push_point(StructPoint {
545 timestamp: Some(timestamp.into_timestamp()),
546 json_string: value.into(),
547 });
548 }
549}
550
551pub struct NominalDoubleArrayWriter<'ds> {
552 writer: NominalChannelWriter<'ds, DoubleArrayPoint>,
553}
554
555impl NominalDoubleArrayWriter<'_> {
556 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: Vec<f64>) {
557 self.writer.push_point(DoubleArrayPoint {
558 timestamp: Some(timestamp.into_timestamp()),
559 value,
560 });
561 }
562}
563
564pub struct NominalStringArrayWriter<'ds> {
565 writer: NominalChannelWriter<'ds, StringArrayPoint>,
566}
567
568impl NominalStringArrayWriter<'_> {
569 pub fn push(
570 &mut self,
571 timestamp: impl IntoTimestamp,
572 value: impl IntoIterator<Item = impl Into<String>>,
573 ) {
574 self.writer.push_point(StringArrayPoint {
575 timestamp: Some(timestamp.into_timestamp()),
576 value: value.into_iter().map(Into::into).collect(),
577 });
578 }
579}
580
581struct SeriesBuffer {
582 points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
583 count: AtomicUsize,
588 flush_time: AtomicU64,
589 condvar: Condvar,
590 max_capacity: usize,
591}
592
593struct SeriesBufferGuard<'sb> {
594 sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
595 count: &'sb AtomicUsize,
596}
597
598impl SeriesBufferGuard<'_> {
599 fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
600 let points = points.into_points();
601 let new_point_count = points_len(&points);
602
603 if !self.sb.contains_key(channel_descriptor) {
604 self.sb.insert(channel_descriptor.clone(), points);
605 } else {
606 match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
607 (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
608 existing.points.extend(new.points)
609 }
610 (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
611 existing.points.extend(new.points)
612 }
613 (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
614 existing.points.extend(new.points)
615 }
616 (
617 PointsType::ArrayPoints(ArrayPoints {
618 array_type: Some(ArrayType::DoubleArrayPoints(existing)),
619 }),
620 PointsType::ArrayPoints(ArrayPoints {
621 array_type: Some(ArrayType::DoubleArrayPoints(new)),
622 }),
623 ) => existing.points.extend(new.points),
624 (PointsType::Uint64Points(existing), PointsType::Uint64Points(new)) => {
625 existing.points.extend(new.points)
626 }
627 (
628 PointsType::ArrayPoints(ArrayPoints {
629 array_type: Some(ArrayType::StringArrayPoints(existing)),
630 }),
631 PointsType::ArrayPoints(ArrayPoints {
632 array_type: Some(ArrayType::StringArrayPoints(new)),
633 }),
634 ) => existing.points.extend(new.points),
635 (
636 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
637 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
638 ) => {}
639 (PointsType::StructPoints(existing), PointsType::StructPoints(new)) => {
640 existing.points.extend(new.points);
641 }
642 (
644 PointsType::DoublePoints(_),
645 PointsType::IntegerPoints(_)
646 | PointsType::Uint64Points(_)
647 | PointsType::StringPoints(_)
648 | PointsType::ArrayPoints(_)
649 | PointsType::StructPoints(_),
650 )
651 | (
652 PointsType::StringPoints(_),
653 PointsType::DoublePoints(_)
654 | PointsType::IntegerPoints(_)
655 | PointsType::Uint64Points(_)
656 | PointsType::ArrayPoints(_)
657 | PointsType::StructPoints(_),
658 )
659 | (
660 PointsType::IntegerPoints(_),
661 PointsType::DoublePoints(_)
662 | PointsType::Uint64Points(_)
663 | PointsType::StringPoints(_)
664 | PointsType::ArrayPoints(_)
665 | PointsType::StructPoints(_),
666 )
667 | (
668 PointsType::ArrayPoints(_),
669 PointsType::DoublePoints(_)
670 | PointsType::Uint64Points(_)
671 | PointsType::StringPoints(_)
672 | PointsType::IntegerPoints(_)
673 | PointsType::StructPoints(_),
674 )
675 | (
676 PointsType::ArrayPoints(ArrayPoints {
677 array_type: Some(_),
678 }),
679 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
680 )
681 | (
682 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
683 PointsType::ArrayPoints(ArrayPoints {
684 array_type: Some(_),
685 }),
686 )
687 | (
688 PointsType::ArrayPoints(ArrayPoints {
689 array_type: Some(ArrayType::DoubleArrayPoints(_)),
690 }),
691 PointsType::ArrayPoints(ArrayPoints {
692 array_type: Some(ArrayType::StringArrayPoints(_)),
693 }),
694 )
695 | (
696 PointsType::ArrayPoints(ArrayPoints {
697 array_type: Some(ArrayType::StringArrayPoints(_)),
698 }),
699 PointsType::ArrayPoints(ArrayPoints {
700 array_type: Some(ArrayType::DoubleArrayPoints(_)),
701 }),
702 )
703 | (
704 PointsType::Uint64Points(_),
705 PointsType::IntegerPoints(_)
706 | PointsType::StringPoints(_)
707 | PointsType::DoublePoints(_)
708 | PointsType::ArrayPoints(_)
709 | PointsType::StructPoints(_),
710 )
711 | (
712 PointsType::StructPoints(_),
713 PointsType::DoublePoints(_)
714 | PointsType::Uint64Points(_)
715 | PointsType::StringPoints(_)
716 | PointsType::IntegerPoints(_)
717 | PointsType::ArrayPoints(_),
718 ) => {
719 panic!("mismatched types");
721 }
722 }
723 }
724
725 self.count.fetch_add(new_point_count, Ordering::Release);
726 }
727}
728
729impl PartialEq for SeriesBuffer {
730 fn eq(&self, other: &Self) -> bool {
731 self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
732 }
733}
734
735impl PartialOrd for SeriesBuffer {
736 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
737 let flush_time = self.flush_time.load(Ordering::Acquire);
738 let other_flush_time = other.flush_time.load(Ordering::Acquire);
739 flush_time.partial_cmp(&other_flush_time)
740 }
741}
742
743impl SeriesBuffer {
744 fn new(capacity: usize) -> Self {
745 Self {
746 points: Mutex::new(HashMap::new()),
747 count: AtomicUsize::new(0),
748 flush_time: AtomicU64::new(0),
749 condvar: Condvar::new(),
750 max_capacity: capacity,
751 }
752 }
753
754 fn has_capacity(&self, new_points_count: usize) -> bool {
759 let count = self.count.load(Ordering::Acquire);
760 count == 0 || count + new_points_count <= self.max_capacity
761 }
762
763 fn lock(&self) -> SeriesBufferGuard<'_> {
764 SeriesBufferGuard {
765 sb: self.points.lock(),
766 count: &self.count,
767 }
768 }
769
770 fn take(&self) -> (usize, Vec<Series>) {
771 let mut points = self.lock();
772 self.flush_time.store(
773 UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
774 Ordering::Release,
775 );
776 let result = points
777 .sb
778 .drain()
779 .map(|(ChannelDescriptor { name, tags }, points)| {
780 let channel = Channel { name };
781 let points_obj = Points {
782 points_type: Some(points),
783 };
784 Series {
785 channel: Some(channel),
786 tags: tags
787 .map(|tags| tags.into_iter().collect())
788 .unwrap_or_default(),
789 points: Some(points_obj),
790 }
791 })
792 .collect();
793 let result_count = points
794 .count
795 .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
796 .unwrap();
797 (result_count, result)
798 }
799
800 fn is_empty(&self) -> bool {
801 self.count() == 0
802 }
803
804 fn count(&self) -> usize {
805 self.count.load(Ordering::Acquire)
806 }
807
808 fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
809 let mut points_lock = self.points.lock();
810 if !points_lock.is_empty() {
813 self.condvar.wait(&mut points_lock);
814 } else {
815 debug!("buffer emptied since last check, skipping condvar wait");
816 }
817 on_notify(SeriesBufferGuard {
818 sb: points_lock,
819 count: &self.count,
820 });
821 }
822
823 fn notify(&self) -> bool {
824 self.condvar.notify_one()
825 }
826}
827
828fn batch_processor(
829 running: Arc<AtomicBool>,
830 points_buffer: Arc<SeriesBuffer>,
831 request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
832 max_request_delay: Duration,
833) {
834 loop {
835 debug!("starting processor loop");
836 if points_buffer.is_empty() {
837 if !running.load(Ordering::Acquire) {
838 debug!("batch processor thread exiting due to running flag");
839 drop(request_chan);
840 break;
841 } else {
842 debug!("empty points buffer, waiting");
843 thread::park_timeout(max_request_delay);
844 }
845 continue;
846 }
847 let (point_count, series) = points_buffer.take();
848
849 if points_buffer.notify() {
850 debug!("notified one waiting thread after clearing points buffer");
851 }
852
853 let write_request = WriteRequestNominal {
854 series,
855 session_name: None,
856 };
857
858 if request_chan.is_full() {
859 debug!("ready to queue request but request channel is full");
860 }
861 let rep = request_chan.send((write_request, point_count));
862 debug!("queued request for processing");
863 if rep.is_err() {
864 error!("failed to send request to dispatcher");
865 } else {
866 debug!("finished submitting request");
867 }
868
869 thread::park_timeout(max_request_delay);
870 }
871 debug!("batch processor thread exiting");
872}
873
874impl Drop for NominalDatasetStream {
875 fn drop(&mut self) {
876 debug!("starting drop for NominalDatasetStream");
877 self.running.store(false, Ordering::Release);
878 loop {
879 let count = self.unflushed_points.load(Ordering::Acquire);
880 if count == 0 {
881 break;
882 }
883 debug!(
884 "waiting for all points to be flushed before dropping stream, {count} points remaining",
885 );
886 thread::sleep(Duration::from_millis(50));
888 }
889 }
890}
891
892fn request_dispatcher<C: WriteRequestConsumer + 'static>(
893 running: Arc<AtomicBool>,
894 unflushed_points: Arc<AtomicUsize>,
895 request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
896 consumer: Arc<C>,
897) {
898 let mut total_request_time = 0;
899 loop {
900 match request_rx.recv() {
901 Ok((request, point_count)) => {
902 debug!("received writerequest from channel");
903 let req_start = Instant::now();
904 match consumer.consume(&request) {
905 Ok(_) => {
906 let time = req_start.elapsed().as_millis();
907 debug!("request of {} points sent in {} ms", point_count, time);
908 total_request_time += time as u64;
909 }
910 Err(e) => {
911 error!("Failed to send request: {e:?}");
912 }
913 }
914 unflushed_points.fetch_sub(point_count, Ordering::Release);
915
916 if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
917 {
918 info!("all points flushed, closing dispatcher thread");
919 drop(request_rx);
921 break;
922 }
923 }
924 Err(e) => {
925 debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
926 break;
927 }
928 }
929 }
930 debug!(
931 "request dispatcher thread exiting. total request time: {}",
932 total_request_time
933 );
934}
935
936fn points_len(points_type: &PointsType) -> usize {
937 match points_type {
938 PointsType::DoublePoints(points) => points.points.len(),
939 PointsType::StringPoints(points) => points.points.len(),
940 PointsType::IntegerPoints(points) => points.points.len(),
941 PointsType::Uint64Points(points) => points.points.len(),
942 PointsType::ArrayPoints(points) => match &points.array_type {
943 Some(ArrayType::DoubleArrayPoints(points)) => points.points.len(),
944 Some(ArrayType::StringArrayPoints(points)) => points.points.len(),
945 None => 0,
946 },
947 PointsType::StructPoints(points) => points.points.len(),
948 }
949}