1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
17use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
18use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
19use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
20use nominal_api::tonic::io::nominal::scout::api::proto::DoubleArrayPoint;
21use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
22use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
23use nominal_api::tonic::io::nominal::scout::api::proto::Points;
24use nominal_api::tonic::io::nominal::scout::api::proto::Series;
25use nominal_api::tonic::io::nominal::scout::api::proto::StringArrayPoint;
26use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
27use nominal_api::tonic::io::nominal::scout::api::proto::StructPoint;
28use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
29use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
30use parking_lot::Condvar;
31use parking_lot::Mutex;
32use parking_lot::MutexGuard;
33use tracing::debug;
34use tracing::error;
35use tracing::info;
36
37use crate::client::NominalApiClients;
38use crate::client::PRODUCTION_API_URL;
39use crate::consumer::AvroFileConsumer;
40use crate::consumer::DualWriteRequestConsumer;
41use crate::consumer::ListeningWriteRequestConsumer;
42use crate::consumer::NominalCoreConsumer;
43use crate::consumer::RequestConsumerWithFallback;
44use crate::consumer::WriteRequestConsumer;
45use crate::listener::LoggingListener;
46use crate::types::ChannelDescriptor;
47use crate::types::IntoPoints;
48use crate::types::IntoTimestamp;
49
/// Tuning knobs for a `NominalDatasetStream`.
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    /// Point-count capacity of each staging buffer, and the per-writer local
    /// flush threshold.
    pub max_points_per_record: usize,
    /// Longest a batch-processor thread parks between flushes; also the
    /// per-writer time-based flush threshold.
    pub max_request_delay: Duration,
    /// Capacity of the bounded channel between batch processors and
    /// request dispatchers.
    pub max_buffered_requests: usize,
    /// Number of dispatcher threads that consume queued write requests.
    pub request_dispatcher_tasks: usize,
    /// Base URL of the Nominal API (defaults to production).
    pub base_api_url: String,
}
58
59impl Default for NominalStreamOpts {
60 fn default() -> Self {
61 Self {
62 max_points_per_record: 250_000,
63 max_request_delay: Duration::from_millis(100),
64 max_buffered_requests: 4,
65 request_dispatcher_tasks: 8,
66 base_api_url: PRODUCTION_API_URL.to_string(),
67 }
68 }
69}
70
/// Builder for `NominalDatasetStream`; configure at least one destination
/// (core API and/or file) before calling `build`.
#[derive(Default)]
pub struct NominalDatasetStreamBuilder {
    // Token, dataset RID, and runtime handle used when streaming to core.
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    // Primary Avro output file, when streaming to disk.
    stream_to_file: Option<PathBuf>,
    // Avro file written only when the primary consumer fails.
    file_fallback: Option<PathBuf>,
    // Observers notified of stream events (a LoggingListener is always added).
    listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    opts: NominalStreamOpts,
}
79
80impl Debug for NominalDatasetStreamBuilder {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 f.debug_struct("NominalDatasetStreamBuilder")
83 .field("stream_to_core", &self.stream_to_core.is_some())
84 .field("stream_to_file", &self.stream_to_file)
85 .field("file_fallback", &self.file_fallback)
86 .field("listeners", &self.listeners.len())
87 .finish()
88 }
89}
90
91impl NominalDatasetStreamBuilder {
92 pub fn new() -> Self {
93 Self::default()
94 }
95}
96
97impl NominalDatasetStreamBuilder {
98 pub fn stream_to_core(
99 self,
100 bearer_token: BearerToken,
101 dataset: ResourceIdentifier,
102 handle: tokio::runtime::Handle,
103 ) -> NominalDatasetStreamBuilder {
104 NominalDatasetStreamBuilder {
105 stream_to_core: Some((bearer_token, dataset, handle)),
106 stream_to_file: self.stream_to_file,
107 file_fallback: self.file_fallback,
108 listeners: self.listeners,
109 opts: self.opts,
110 }
111 }
112
113 pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
114 self.stream_to_file = Some(file_path.into());
115 self
116 }
117
118 pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
119 self.file_fallback = Some(file_path.into());
120 self
121 }
122
123 pub fn add_listener(
124 mut self,
125 listener: Arc<dyn crate::listener::NominalStreamListener>,
126 ) -> Self {
127 self.listeners.push(listener);
128 self
129 }
130
131 pub fn with_listeners(
132 mut self,
133 listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
134 ) -> Self {
135 self.listeners = listeners;
136 self
137 }
138
139 pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
140 self.opts = opts;
141 self
142 }
143
144 #[cfg(feature = "logging")]
145 fn init_logging(self, directive: Option<&str>) -> Self {
146 use tracing_subscriber::layer::SubscriberExt;
147 use tracing_subscriber::util::SubscriberInitExt;
148
149 let base = tracing_subscriber::EnvFilter::builder()
151 .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
152 let env_filter = match directive {
153 Some(d) => base.parse_lossy(d),
154 None => base.from_env_lossy(),
155 };
156
157 let subscriber = tracing_subscriber::registry()
158 .with(
159 tracing_subscriber::fmt::layer()
160 .with_thread_ids(true)
161 .with_thread_names(true)
162 .with_line_number(true),
163 )
164 .with(env_filter);
165
166 if let Err(error) = subscriber.try_init() {
167 eprintln!("nominal streaming failed to enable logging: {error}");
168 }
169
170 self
171 }
172
173 #[cfg(feature = "logging")]
174 pub fn enable_logging(self) -> Self {
175 self.init_logging(None)
176 }
177
178 #[cfg(feature = "logging")]
179 pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
180 self.init_logging(Some(log_directive))
181 }
182
183 pub fn build(self) -> NominalDatasetStream {
184 let core_consumer = self.core_consumer();
185 let file_consumer = self.file_consumer();
186 let fallback_consumer = self.fallback_consumer();
187
188 match (core_consumer, file_consumer, fallback_consumer) {
189 (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
190 (Some(_), Some(_), Some(_)) => {
191 panic!("must choose one of stream_to_file and file_fallback when streaming to core")
192 }
193 (Some(core), None, None) => self.into_stream(core),
194 (Some(core), None, Some(fallback)) => {
195 self.into_stream(RequestConsumerWithFallback::new(core, fallback))
196 }
197 (None, Some(file), None) => self.into_stream(file),
198 (None, Some(file), Some(fallback)) => {
199 self.into_stream(RequestConsumerWithFallback::new(file, fallback))
201 }
202 (Some(core), Some(file), None) => {
203 self.into_stream(DualWriteRequestConsumer::new(core, file))
204 }
205 }
206 }
207
208 fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
209 self.stream_to_core
210 .as_ref()
211 .map(|(auth_provider, dataset, handle)| {
212 NominalCoreConsumer::new(
213 NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
214 handle.clone(),
215 auth_provider.clone(),
216 dataset.clone(),
217 )
218 })
219 }
220
221 fn dataset_rid(&self) -> Option<ResourceIdentifier> {
222 self.stream_to_core.as_ref().map(|(_, rid, _)| rid.clone())
223 }
224
225 fn file_consumer(&self) -> Option<AvroFileConsumer> {
226 self.stream_to_file.as_ref().map(|path| {
227 AvroFileConsumer::new_with_full_path(path, true, self.dataset_rid()).unwrap()
228 })
229 }
230
231 fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
232 self.file_fallback.as_ref().map(|path| {
233 AvroFileConsumer::new_with_full_path(path, true, self.dataset_rid()).unwrap()
234 })
235 }
236
237 fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
238 let mut listeners = self.listeners;
239 listeners.push(Arc::new(LoggingListener));
240 let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
241 NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
242 }
243}
244
245#[deprecated]
247pub type NominalDatasourceStream = NominalDatasetStream;
248
/// Handle to a running dataset stream: accepts points, batches them through
/// two staging buffers, and dispatches write requests on background threads.
/// Dropping the stream blocks until all accepted points are flushed.
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    // Cleared on drop to tell worker threads to shut down.
    running: Arc<AtomicBool>,
    // Points accepted but not yet delivered to the consumer.
    unflushed_points: Arc<AtomicUsize>,
    // Double buffering: writers fill one buffer while the other drains.
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    // Batch-processor threads; unparked to trigger early flushes.
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}
258
impl NominalDatasetStream {
    /// Convenience constructor for a `NominalDatasetStreamBuilder`.
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    /// Starts the stream's worker threads around `consumer`: one batch
    /// processor per staging buffer plus `opts.request_dispatcher_tasks`
    /// dispatcher threads reading from a bounded request channel.
    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        // Bounded so batch processors apply backpressure when dispatchers
        // fall behind.
        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                // Moves `request_tx` (not a clone): when both processors
                // exit, all senders drop and dispatchers see a closed channel.
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        // Dispatcher handles are intentionally detached; they exit when the
        // request channel closes or all points are flushed after shutdown.
        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher #{}", i);
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

    /// Writer of `f64` points for the given channel.
    pub fn double_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalDoubleWriter<'_> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Writer of string points for the given channel.
    pub fn string_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalStringWriter<'_> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Writer of `i64` points for the given channel.
    pub fn integer_writer(
        &self,
        channel_descriptor: ChannelDescriptor,
    ) -> NominalIntegerWriter<'_> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Writer of `u64` points for the given channel.
    pub fn uint64_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalUint64Writer<'_> {
        NominalUint64Writer {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Writer of struct (JSON string) points for the given channel.
    pub fn struct_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalStructWriter<'_> {
        NominalStructWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Writer of `Vec<f64>` array points for the given channel.
    pub fn double_array_writer(
        &self,
        channel_descriptor: ChannelDescriptor,
    ) -> NominalDoubleArrayWriter<'_> {
        NominalDoubleArrayWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Writer of string-array points for the given channel.
    pub fn string_array_writer(
        &self,
        channel_descriptor: ChannelDescriptor,
    ) -> NominalStringArrayWriter<'_> {
        NominalStringArrayWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Appends `new_points` to the given channel, blocking when both staging
    /// buffers are full.
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    /// Runs `callback` with a locked buffer that can take `new_count` more
    /// points: primary first, then secondary, otherwise blocks on whichever
    /// buffer flushed longest ago.
    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        // Counted as unflushed up front; dispatchers subtract after delivery.
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            // Primary is full: wake its processor so it flushes promptly.
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            // Both full: `<` uses SeriesBuffer's PartialOrd on flush_time,
            // picking the buffer that flushed longest ago (likely next to
            // drain); wake its processor and wait for the drain notification.
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}
421
/// Per-channel writer that accumulates typed points locally before handing
/// them to the stream's shared buffers; flushes on size/time thresholds and
/// on drop.
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    // When the local buffer last emptied into the stream.
    last_flushed_at: Instant,
    // Points not yet handed to the stream.
    unflushed: Vec<T>,
}
431
432impl<T> NominalChannelWriter<'_, T>
433where
434 Vec<T>: IntoPoints,
435{
436 fn new(
437 stream: &NominalDatasetStream,
438 channel: ChannelDescriptor,
439 ) -> NominalChannelWriter<'_, T> {
440 NominalChannelWriter {
441 channel,
442 stream,
443 last_flushed_at: Instant::now(),
444 unflushed: vec![],
445 }
446 }
447
448 fn push_point(&mut self, point: T) {
449 self.unflushed.push(point);
450 if self.unflushed.len() >= self.stream.opts.max_points_per_record
451 || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
452 {
453 debug!(
454 "conditionally flushing {:?}, ({} points, {:?} since last)",
455 self.channel,
456 self.unflushed.len(),
457 self.last_flushed_at.elapsed()
458 );
459 self.flush();
460 }
461 }
462
463 fn flush(&mut self) {
464 if self.unflushed.is_empty() {
465 return;
466 }
467 info!(
468 "flushing writer for {:?} with {} points",
469 self.channel,
470 self.unflushed.len()
471 );
472 self.stream.when_capacity(self.unflushed.len(), |mut buf| {
473 let to_flush: Vec<T> = self.unflushed.drain(..).collect();
474 buf.extend(&self.channel, to_flush);
475 self.last_flushed_at = Instant::now();
476 })
477 }
478}
479
impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    // Ensures locally buffered points reach the stream before the writer
    // goes away.
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}
489
/// Typed writer that buffers `f64` samples for a single channel.
pub struct NominalDoubleWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoublePoint>,
}
493
494impl NominalDoubleWriter<'_> {
495 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
496 self.writer.push_point(DoublePoint {
497 timestamp: Some(timestamp.into_timestamp()),
498 value,
499 });
500 }
501}
502
/// Typed writer that buffers `i64` samples for a single channel.
pub struct NominalIntegerWriter<'ds> {
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}
506
507impl NominalIntegerWriter<'_> {
508 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
509 self.writer.push_point(IntegerPoint {
510 timestamp: Some(timestamp.into_timestamp()),
511 value,
512 });
513 }
514}
515
/// Typed writer that buffers `u64` samples for a single channel.
pub struct NominalUint64Writer<'ds> {
    writer: NominalChannelWriter<'ds, Uint64Point>,
}
519
520impl NominalUint64Writer<'_> {
521 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: u64) {
522 self.writer.push_point(Uint64Point {
523 timestamp: Some(timestamp.into_timestamp()),
524 value,
525 });
526 }
527}
528
/// Typed writer that buffers string samples for a single channel.
pub struct NominalStringWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringPoint>,
}
532
533impl NominalStringWriter<'_> {
534 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
535 self.writer.push_point(StringPoint {
536 timestamp: Some(timestamp.into_timestamp()),
537 value: value.into(),
538 });
539 }
540}
541
/// Typed writer that buffers struct samples (carried as JSON strings, per
/// the `json_string` field) for a single channel.
pub struct NominalStructWriter<'ds> {
    writer: NominalChannelWriter<'ds, StructPoint>,
}
545
546impl NominalStructWriter<'_> {
547 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
548 self.writer.push_point(StructPoint {
549 timestamp: Some(timestamp.into_timestamp()),
550 json_string: value.into(),
551 });
552 }
553}
554
/// Typed writer that buffers `Vec<f64>` array samples for a single channel.
pub struct NominalDoubleArrayWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoubleArrayPoint>,
}
558
559impl NominalDoubleArrayWriter<'_> {
560 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: Vec<f64>) {
561 self.writer.push_point(DoubleArrayPoint {
562 timestamp: Some(timestamp.into_timestamp()),
563 value,
564 });
565 }
566}
567
/// Typed writer that buffers string-array samples for a single channel.
pub struct NominalStringArrayWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringArrayPoint>,
}
571
572impl NominalStringArrayWriter<'_> {
573 pub fn push(
574 &mut self,
575 timestamp: impl IntoTimestamp,
576 value: impl IntoIterator<Item = impl Into<String>>,
577 ) {
578 self.writer.push_point(StringArrayPoint {
579 timestamp: Some(timestamp.into_timestamp()),
580 value: value.into_iter().map(Into::into).collect(),
581 });
582 }
583}
584
/// Fixed-capacity staging area for points, keyed by channel. Writer threads
/// fill it; a batch-processor thread drains it via `take`.
struct SeriesBuffer {
    // Staged points per channel, merged by type in `SeriesBufferGuard::extend`.
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    // Total staged point count, tracked outside the mutex for cheap reads.
    count: AtomicUsize,
    // Unix-epoch nanoseconds of the most recent `take`; drives the
    // PartialEq/PartialOrd impls used to pick a buffer to wait on.
    flush_time: AtomicU64,
    // Signals writers blocked in `on_notify` after a drain.
    condvar: Condvar,
    // Soft point-count limit consulted by `has_capacity`.
    max_capacity: usize,
}
596
/// Mutex-holding view over a buffer's point map; keeps the shared point
/// count in sync as points are appended.
struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}
601
602impl SeriesBufferGuard<'_> {
603 fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
604 let points = points.into_points();
605 let new_point_count = points_len(&points);
606
607 if !self.sb.contains_key(channel_descriptor) {
608 self.sb.insert(channel_descriptor.clone(), points);
609 } else {
610 match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
611 (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
612 existing.points.extend(new.points)
613 }
614 (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
615 existing.points.extend(new.points)
616 }
617 (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
618 existing.points.extend(new.points)
619 }
620 (
621 PointsType::ArrayPoints(ArrayPoints {
622 array_type: Some(ArrayType::DoubleArrayPoints(existing)),
623 }),
624 PointsType::ArrayPoints(ArrayPoints {
625 array_type: Some(ArrayType::DoubleArrayPoints(new)),
626 }),
627 ) => existing.points.extend(new.points),
628 (PointsType::Uint64Points(existing), PointsType::Uint64Points(new)) => {
629 existing.points.extend(new.points)
630 }
631 (
632 PointsType::ArrayPoints(ArrayPoints {
633 array_type: Some(ArrayType::StringArrayPoints(existing)),
634 }),
635 PointsType::ArrayPoints(ArrayPoints {
636 array_type: Some(ArrayType::StringArrayPoints(new)),
637 }),
638 ) => existing.points.extend(new.points),
639 (
640 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
641 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
642 ) => {}
643 (PointsType::StructPoints(existing), PointsType::StructPoints(new)) => {
644 existing.points.extend(new.points);
645 }
646 (
648 PointsType::DoublePoints(_),
649 PointsType::IntegerPoints(_)
650 | PointsType::Uint64Points(_)
651 | PointsType::StringPoints(_)
652 | PointsType::ArrayPoints(_)
653 | PointsType::StructPoints(_),
654 )
655 | (
656 PointsType::StringPoints(_),
657 PointsType::DoublePoints(_)
658 | PointsType::IntegerPoints(_)
659 | PointsType::Uint64Points(_)
660 | PointsType::ArrayPoints(_)
661 | PointsType::StructPoints(_),
662 )
663 | (
664 PointsType::IntegerPoints(_),
665 PointsType::DoublePoints(_)
666 | PointsType::Uint64Points(_)
667 | PointsType::StringPoints(_)
668 | PointsType::ArrayPoints(_)
669 | PointsType::StructPoints(_),
670 )
671 | (
672 PointsType::ArrayPoints(_),
673 PointsType::DoublePoints(_)
674 | PointsType::Uint64Points(_)
675 | PointsType::StringPoints(_)
676 | PointsType::IntegerPoints(_)
677 | PointsType::StructPoints(_),
678 )
679 | (
680 PointsType::ArrayPoints(ArrayPoints {
681 array_type: Some(_),
682 }),
683 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
684 )
685 | (
686 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
687 PointsType::ArrayPoints(ArrayPoints {
688 array_type: Some(_),
689 }),
690 )
691 | (
692 PointsType::ArrayPoints(ArrayPoints {
693 array_type: Some(ArrayType::DoubleArrayPoints(_)),
694 }),
695 PointsType::ArrayPoints(ArrayPoints {
696 array_type: Some(ArrayType::StringArrayPoints(_)),
697 }),
698 )
699 | (
700 PointsType::ArrayPoints(ArrayPoints {
701 array_type: Some(ArrayType::StringArrayPoints(_)),
702 }),
703 PointsType::ArrayPoints(ArrayPoints {
704 array_type: Some(ArrayType::DoubleArrayPoints(_)),
705 }),
706 )
707 | (
708 PointsType::Uint64Points(_),
709 PointsType::IntegerPoints(_)
710 | PointsType::StringPoints(_)
711 | PointsType::DoublePoints(_)
712 | PointsType::ArrayPoints(_)
713 | PointsType::StructPoints(_),
714 )
715 | (
716 PointsType::StructPoints(_),
717 PointsType::DoublePoints(_)
718 | PointsType::Uint64Points(_)
719 | PointsType::StringPoints(_)
720 | PointsType::IntegerPoints(_)
721 | PointsType::ArrayPoints(_),
722 ) => {
723 panic!("mismatched types");
725 }
726 }
727 }
728
729 self.count.fetch_add(new_point_count, Ordering::Release);
730 }
731}
732
impl PartialEq for SeriesBuffer {
    // Equality (and the ordering below) is defined purely on the last flush
    // timestamp; it exists so `when_capacity` can compare buffers and wait
    // on the one that flushed longest ago. It is NOT structural equality.
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}
738
739impl PartialOrd for SeriesBuffer {
740 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
741 let flush_time = self.flush_time.load(Ordering::Acquire);
742 let other_flush_time = other.flush_time.load(Ordering::Acquire);
743 flush_time.partial_cmp(&other_flush_time)
744 }
745}
746
impl SeriesBuffer {
    /// Creates an empty buffer with the given soft point-count capacity.
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    /// Whether `new_points_count` more points fit. An empty buffer always
    /// accepts, even when the batch alone exceeds `max_capacity`.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    /// Locks the point map and returns a guard that keeps the shared count
    /// in sync.
    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    /// Drains all staged points into `Series` values, stamping `flush_time`
    /// with the current epoch-nanos and zeroing the count. Returns the
    /// drained point count alongside the series.
    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        // Reset the count to 0 and report the previous value (equivalent to
        // a swap with 0); done while still holding the map lock.
        let result_count = points
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }

    /// True when no points are staged.
    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    /// Current staged point count.
    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    /// Blocks until this buffer is drained (or observes it already empty),
    /// then runs `on_notify` with the buffer locked.
    // NOTE(review): the condvar wait is guarded by `if`, not `while`, so a
    // spurious wakeup could invoke the callback before the buffer actually
    // drains — confirm that is acceptable to callers of `when_capacity`.
    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    /// Wakes one thread blocked in `on_notify`; true when a thread was woken.
    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}
831
832fn batch_processor(
833 running: Arc<AtomicBool>,
834 points_buffer: Arc<SeriesBuffer>,
835 request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
836 max_request_delay: Duration,
837) {
838 loop {
839 debug!("starting processor loop");
840 if points_buffer.is_empty() {
841 if !running.load(Ordering::Acquire) {
842 debug!("batch processor thread exiting due to running flag");
843 drop(request_chan);
844 break;
845 } else {
846 debug!("empty points buffer, waiting");
847 thread::park_timeout(max_request_delay);
848 }
849 continue;
850 }
851 let (point_count, series) = points_buffer.take();
852
853 if points_buffer.notify() {
854 debug!("notified one waiting thread after clearing points buffer");
855 }
856
857 let write_request = WriteRequestNominal {
858 series,
859 session_name: None,
860 };
861
862 if request_chan.is_full() {
863 debug!("ready to queue request but request channel is full");
864 }
865 let rep = request_chan.send((write_request, point_count));
866 debug!("queued request for processing");
867 if rep.is_err() {
868 error!("failed to send request to dispatcher");
869 } else {
870 debug!("finished submitting request");
871 }
872
873 thread::park_timeout(max_request_delay);
874 }
875 debug!("batch processor thread exiting");
876}
877
878impl Drop for NominalDatasetStream {
879 fn drop(&mut self) {
880 debug!("starting drop for NominalDatasetStream");
881 self.running.store(false, Ordering::Release);
882 loop {
883 let count = self.unflushed_points.load(Ordering::Acquire);
884 if count == 0 {
885 break;
886 }
887 debug!(
888 "waiting for all points to be flushed before dropping stream, {count} points remaining",
889 );
890 thread::sleep(Duration::from_millis(50));
892 }
893 }
894}
895
896fn request_dispatcher<C: WriteRequestConsumer + 'static>(
897 running: Arc<AtomicBool>,
898 unflushed_points: Arc<AtomicUsize>,
899 request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
900 consumer: Arc<C>,
901) {
902 let mut total_request_time = 0;
903 loop {
904 match request_rx.recv() {
905 Ok((request, point_count)) => {
906 debug!("received writerequest from channel");
907 let req_start = Instant::now();
908 match consumer.consume(&request) {
909 Ok(_) => {
910 let time = req_start.elapsed().as_millis();
911 debug!("request of {} points sent in {} ms", point_count, time);
912 total_request_time += time as u64;
913 }
914 Err(e) => {
915 error!("Failed to send request: {e:?}");
916 }
917 }
918 unflushed_points.fetch_sub(point_count, Ordering::Release);
919
920 if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
921 {
922 info!("all points flushed, closing dispatcher thread");
923 drop(request_rx);
925 break;
926 }
927 }
928 Err(e) => {
929 debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
930 break;
931 }
932 }
933 }
934 debug!(
935 "request dispatcher thread exiting. total request time: {}",
936 total_request_time
937 );
938}
939
940fn points_len(points_type: &PointsType) -> usize {
941 match points_type {
942 PointsType::DoublePoints(points) => points.points.len(),
943 PointsType::StringPoints(points) => points.points.len(),
944 PointsType::IntegerPoints(points) => points.points.len(),
945 PointsType::Uint64Points(points) => points.points.len(),
946 PointsType::ArrayPoints(points) => match &points.array_type {
947 Some(ArrayType::DoubleArrayPoints(points)) => points.points.len(),
948 Some(ArrayType::StringArrayPoints(points)) => points.points.len(),
949 None => 0,
950 },
951 PointsType::StructPoints(points) => points.points.len(),
952 }
953}