1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
17use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
18use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
19use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
20use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
21use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
22use nominal_api::tonic::io::nominal::scout::api::proto::Points;
23use nominal_api::tonic::io::nominal::scout::api::proto::Series;
24use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
25use nominal_api::tonic::io::nominal::scout::api::proto::StructPoint;
26use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
27use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
28use parking_lot::Condvar;
29use parking_lot::Mutex;
30use parking_lot::MutexGuard;
31use tracing::debug;
32use tracing::error;
33use tracing::info;
34
35use crate::client::NominalApiClients;
36use crate::client::PRODUCTION_API_URL;
37use crate::consumer::AvroFileConsumer;
38use crate::consumer::DualWriteRequestConsumer;
39use crate::consumer::ListeningWriteRequestConsumer;
40use crate::consumer::NominalCoreConsumer;
41use crate::consumer::RequestConsumerWithFallback;
42use crate::consumer::WriteRequestConsumer;
43use crate::listener::LoggingListener;
44use crate::types::ChannelDescriptor;
45use crate::types::IntoPoints;
46use crate::types::IntoTimestamp;
47
/// Tuning options for a [`NominalDatasetStream`].
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    /// Soft cap on points per in-memory buffer / write request.
    pub max_points_per_record: usize,
    /// Longest a batch-processor thread parks before re-checking its buffer.
    pub max_request_delay: Duration,
    /// Capacity of the bounded channel between batchers and dispatchers.
    pub max_buffered_requests: usize,
    /// Number of dispatcher threads handing requests to the consumer.
    pub request_dispatcher_tasks: usize,
    /// Base URL of the Nominal API.
    pub base_api_url: String,
}
56
impl Default for NominalStreamOpts {
    /// Defaults: 250k points per record, 100 ms max delay, 4 buffered
    /// requests, 8 dispatcher threads, production API URL.
    fn default() -> Self {
        Self {
            max_points_per_record: 250_000,
            max_request_delay: Duration::from_millis(100),
            max_buffered_requests: 4,
            request_dispatcher_tasks: 8,
            base_api_url: PRODUCTION_API_URL.to_string(),
        }
    }
}
68
/// Builder for a [`NominalDatasetStream`]; configure at least one sink
/// (core or file) before calling `build`.
#[derive(Default)]
pub struct NominalDatasetStreamBuilder {
    /// Credentials, target dataset, and tokio handle for streaming to core.
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    /// Path for an Avro file that receives every write request.
    stream_to_file: Option<PathBuf>,
    /// Path for an Avro file used only when the primary consumer fails.
    file_fallback: Option<PathBuf>,
    listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    opts: NominalStreamOpts,
}
77
78impl Debug for NominalDatasetStreamBuilder {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 f.debug_struct("NominalDatasetStreamBuilder")
81 .field("stream_to_core", &self.stream_to_core.is_some())
82 .field("stream_to_file", &self.stream_to_file)
83 .field("file_fallback", &self.file_fallback)
84 .field("listeners", &self.listeners.len())
85 .finish()
86 }
87}
88
89impl NominalDatasetStreamBuilder {
90 pub fn new() -> Self {
91 Self::default()
92 }
93}
94
95impl NominalDatasetStreamBuilder {
96 pub fn stream_to_core(
97 self,
98 bearer_token: BearerToken,
99 dataset: ResourceIdentifier,
100 handle: tokio::runtime::Handle,
101 ) -> NominalDatasetStreamBuilder {
102 NominalDatasetStreamBuilder {
103 stream_to_core: Some((bearer_token, dataset, handle)),
104 stream_to_file: self.stream_to_file,
105 file_fallback: self.file_fallback,
106 listeners: self.listeners,
107 opts: self.opts,
108 }
109 }
110
111 pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
112 self.stream_to_file = Some(file_path.into());
113 self
114 }
115
116 pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
117 self.file_fallback = Some(file_path.into());
118 self
119 }
120
121 pub fn add_listener(
122 mut self,
123 listener: Arc<dyn crate::listener::NominalStreamListener>,
124 ) -> Self {
125 self.listeners.push(listener);
126 self
127 }
128
129 pub fn with_listeners(
130 mut self,
131 listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
132 ) -> Self {
133 self.listeners = listeners;
134 self
135 }
136
137 pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
138 self.opts = opts;
139 self
140 }
141
142 #[cfg(feature = "logging")]
143 fn init_logging(self, directive: Option<&str>) -> Self {
144 use tracing_subscriber::layer::SubscriberExt;
145 use tracing_subscriber::util::SubscriberInitExt;
146
147 let base = tracing_subscriber::EnvFilter::builder()
149 .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
150 let env_filter = match directive {
151 Some(d) => base.parse_lossy(d),
152 None => base.from_env_lossy(),
153 };
154
155 let subscriber = tracing_subscriber::registry()
156 .with(
157 tracing_subscriber::fmt::layer()
158 .with_thread_ids(true)
159 .with_thread_names(true)
160 .with_line_number(true),
161 )
162 .with(env_filter);
163
164 if let Err(error) = subscriber.try_init() {
165 eprintln!("nominal streaming failed to enable logging: {error}");
166 }
167
168 self
169 }
170
171 #[cfg(feature = "logging")]
172 pub fn enable_logging(self) -> Self {
173 self.init_logging(None)
174 }
175
176 #[cfg(feature = "logging")]
177 pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
178 self.init_logging(Some(log_directive))
179 }
180
181 pub fn build(self) -> NominalDatasetStream {
182 let core_consumer = self.core_consumer();
183 let file_consumer = self.file_consumer();
184 let fallback_consumer = self.fallback_consumer();
185
186 match (core_consumer, file_consumer, fallback_consumer) {
187 (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
188 (Some(_), Some(_), Some(_)) => {
189 panic!("must choose one of stream_to_file and file_fallback when streaming to core")
190 }
191 (Some(core), None, None) => self.into_stream(core),
192 (Some(core), None, Some(fallback)) => {
193 self.into_stream(RequestConsumerWithFallback::new(core, fallback))
194 }
195 (None, Some(file), None) => self.into_stream(file),
196 (None, Some(file), Some(fallback)) => {
197 self.into_stream(RequestConsumerWithFallback::new(file, fallback))
199 }
200 (Some(core), Some(file), None) => {
201 self.into_stream(DualWriteRequestConsumer::new(core, file))
202 }
203 }
204 }
205
206 fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
207 self.stream_to_core
208 .as_ref()
209 .map(|(auth_provider, dataset, handle)| {
210 NominalCoreConsumer::new(
211 NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
212 handle.clone(),
213 auth_provider.clone(),
214 dataset.clone(),
215 )
216 })
217 }
218
219 fn file_consumer(&self) -> Option<AvroFileConsumer> {
220 self.stream_to_file
221 .as_ref()
222 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
223 }
224
225 fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
226 self.file_fallback
227 .as_ref()
228 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
229 }
230
231 fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
232 let mut listeners = self.listeners;
233 listeners.push(Arc::new(LoggingListener));
234 let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
235 NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
236 }
237}
238
239#[deprecated]
241pub type NominalDatasourceStream = NominalDatasetStream;
242
/// A running stream: points are appended to one of two shared buffers,
/// drained by background batch-processor threads into write requests, and
/// dispatched to the configured consumer.
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    /// Cleared on drop to let background threads exit.
    running: Arc<AtomicBool>,
    /// Points accepted but not yet consumed; drop blocks until this is 0.
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    /// Handle to the primary batch-processor thread (unparked to force a flush).
    primary_handle: thread::JoinHandle<()>,
    /// Handle to the secondary batch-processor thread.
    secondary_handle: thread::JoinHandle<()>,
}
252
impl NominalDatasetStream {
    /// Convenience constructor for [`NominalDatasetStreamBuilder`].
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    /// Builds a stream around `consumer`, spawning the background threads:
    /// two batch processors (one per buffer) that drain points into write
    /// requests, and `opts.request_dispatcher_tasks` dispatchers that hand
    /// the requests to the consumer.
    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        // Bounded channel: batchers block (backpressure) when dispatchers
        // can't keep up with max_buffered_requests in-flight requests.
        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        // Last clone of the sender moves in here; when both
                        // batchers exit, the channel closes and dispatchers
                        // drain out.
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        // Dispatcher threads are detached: their JoinHandles are dropped,
        // and they exit when the request channel closes.
        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher #{}", i);
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

    /// Creates a batching writer for `f64` points on the given channel.
    pub fn double_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalDoubleWriter<'_> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Creates a batching writer for string points on the given channel.
    pub fn string_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalStringWriter<'_> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Creates a batching writer for `i64` points on the given channel.
    pub fn integer_writer(
        &self,
        channel_descriptor: ChannelDescriptor,
    ) -> NominalIntegerWriter<'_> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Creates a batching writer for `u64` points on the given channel.
    pub fn uint64_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalUint64Writer<'_> {
        NominalUint64Writer {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Creates a batching writer for JSON struct points on the given channel.
    pub fn struct_writer(&self, channel_descriptor: ChannelDescriptor) -> NominalStructWriter<'_> {
        NominalStructWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }
    /// Appends a batch of points for `channel_descriptor`, blocking until
    /// one of the two shared buffers has capacity.
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    /// Runs `callback` with a locked buffer that can take `new_count` more
    /// points: primary first, then secondary, else block on the buffer
    /// that flushed least recently until it is drained.
    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        // Reserve the count up front so Drop's "all flushed" check cannot
        // observe zero while these points are still in flight.
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            // Primary is full: kick its batch processor so it flushes soon.
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            // Both full: wait on whichever buffer was flushed longer ago
            // (PartialOrd on SeriesBuffer compares last flush times).
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}
396
/// Per-channel writer that accumulates points locally and forwards them to
/// the stream's shared buffers in batches.
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    /// When the local buffer was last handed to the stream.
    last_flushed_at: Instant,
    /// Points not yet handed to the stream's shared buffers.
    unflushed: Vec<T>,
}
406
407impl<T> NominalChannelWriter<'_, T>
408where
409 Vec<T>: IntoPoints,
410{
411 fn new(
412 stream: &NominalDatasetStream,
413 channel: ChannelDescriptor,
414 ) -> NominalChannelWriter<'_, T> {
415 NominalChannelWriter {
416 channel,
417 stream,
418 last_flushed_at: Instant::now(),
419 unflushed: vec![],
420 }
421 }
422
423 fn push_point(&mut self, point: T) {
424 self.unflushed.push(point);
425 if self.unflushed.len() >= self.stream.opts.max_points_per_record
426 || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
427 {
428 debug!(
429 "conditionally flushing {:?}, ({} points, {:?} since last)",
430 self.channel,
431 self.unflushed.len(),
432 self.last_flushed_at.elapsed()
433 );
434 self.flush();
435 }
436 }
437
438 fn flush(&mut self) {
439 if self.unflushed.is_empty() {
440 return;
441 }
442 info!(
443 "flushing writer for {:?} with {} points",
444 self.channel,
445 self.unflushed.len()
446 );
447 self.stream.when_capacity(self.unflushed.len(), |mut buf| {
448 let to_flush: Vec<T> = self.unflushed.drain(..).collect();
449 buf.extend(&self.channel, to_flush);
450 self.last_flushed_at = Instant::now();
451 })
452 }
453}
454
impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    /// Flushes any locally buffered points so they are not lost when the
    /// writer goes out of scope.
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}
464
/// Batching writer for `f64` points on a single channel.
pub struct NominalDoubleWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoublePoint>,
}
468
469impl NominalDoubleWriter<'_> {
470 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
471 self.writer.push_point(DoublePoint {
472 timestamp: Some(timestamp.into_timestamp()),
473 value,
474 });
475 }
476}
477
/// Batching writer for `i64` points on a single channel.
pub struct NominalIntegerWriter<'ds> {
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}
481
482impl NominalIntegerWriter<'_> {
483 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
484 self.writer.push_point(IntegerPoint {
485 timestamp: Some(timestamp.into_timestamp()),
486 value,
487 });
488 }
489}
490
/// Batching writer for `u64` points on a single channel.
pub struct NominalUint64Writer<'ds> {
    writer: NominalChannelWriter<'ds, Uint64Point>,
}
494
495impl NominalUint64Writer<'_> {
496 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: u64) {
497 self.writer.push_point(Uint64Point {
498 timestamp: Some(timestamp.into_timestamp()),
499 value,
500 });
501 }
502}
503
/// Batching writer for string points on a single channel.
pub struct NominalStringWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringPoint>,
}
507
508impl NominalStringWriter<'_> {
509 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
510 self.writer.push_point(StringPoint {
511 timestamp: Some(timestamp.into_timestamp()),
512 value: value.into(),
513 });
514 }
515}
516
/// Batching writer for JSON-encoded struct points on a single channel.
pub struct NominalStructWriter<'ds> {
    writer: NominalChannelWriter<'ds, StructPoint>,
}
520
521impl NominalStructWriter<'_> {
522 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
523 self.writer.push_point(StructPoint {
524 timestamp: Some(timestamp.into_timestamp()),
525 json_string: value.into(),
526 });
527 }
528}
529
/// Capacity-bounded accumulation buffer mapping channels to their pending
/// points, shared between producer threads and one batch processor.
struct SeriesBuffer {
    /// Pending points keyed by channel.
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    /// Total points currently buffered (kept in sync with `points`).
    count: AtomicUsize,
    /// Nanoseconds since the Unix epoch of the last `take`; used to pick
    /// the least-recently-flushed buffer when both are full.
    flush_time: AtomicU64,
    /// Signalled after `take` drains the buffer; see `on_notify`.
    condvar: Condvar,
    /// Soft cap on `count`; an empty buffer accepts any batch size.
    max_capacity: usize,
}
541
/// Locked view of a `SeriesBuffer`'s point map plus a handle to its shared
/// counter, so appends can update both atomically under the lock.
struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}
546
impl SeriesBufferGuard<'_> {
    /// Appends `points` to the series stored for `channel_descriptor`,
    /// creating the entry on first use, then bumps the shared point count.
    ///
    /// # Panics
    ///
    /// Panics when the incoming variant does not match the variant already
    /// buffered for this channel (e.g. double points onto a string series).
    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
        let points = points.into_points();
        let new_point_count = points_len(&points);

        // First points for this channel: store them as-is (key cloned only
        // on this insert path).
        if !self.sb.contains_key(channel_descriptor) {
            self.sb.insert(channel_descriptor.clone(), points);
        } else {
            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(existing)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(new)),
                    }),
                ) => existing.points.extend(new.points),
                (PointsType::Uint64Points(existing), PointsType::Uint64Points(new)) => {
                    existing.points.extend(new.points)
                }
                (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(existing)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(new)),
                    }),
                ) => existing.points.extend(new.points),
                // Both sides carry no array payload: nothing to merge.
                (
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                ) => {}
                (PointsType::StructPoints(existing), PointsType::StructPoints(new)) => {
                    existing.points.extend(new.points);
                }
                // Every cross-variant combination is listed explicitly
                // (rather than a `_` catch-all) so that adding a new
                // PointsType variant forces a compile error here.
                (
                    PointsType::DoublePoints(_),
                    PointsType::IntegerPoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::StringPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::IntegerPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::ArrayPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(_),
                    }),
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints { array_type: None }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(_),
                    }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(_)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(_)),
                    }),
                )
                | (
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::StringArrayPoints(_)),
                    }),
                    PointsType::ArrayPoints(ArrayPoints {
                        array_type: Some(ArrayType::DoubleArrayPoints(_)),
                    }),
                )
                | (
                    PointsType::Uint64Points(_),
                    PointsType::IntegerPoints(_)
                    | PointsType::StringPoints(_)
                    | PointsType::DoublePoints(_)
                    | PointsType::ArrayPoints(_)
                    | PointsType::StructPoints(_),
                )
                | (
                    PointsType::StructPoints(_),
                    PointsType::DoublePoints(_)
                    | PointsType::Uint64Points(_)
                    | PointsType::StringPoints(_)
                    | PointsType::IntegerPoints(_)
                    | PointsType::ArrayPoints(_),
                ) => {
                    panic!("mismatched types");
                }
            }
        }

        // Publish the new count only after the data is in place.
        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}
677
/// Equality is intentionally defined by last-flush time only (not buffer
/// contents); it exists to support the `<` comparison in `when_capacity`
/// that picks the least-recently-flushed buffer.
impl PartialEq for SeriesBuffer {
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}
683
684impl PartialOrd for SeriesBuffer {
685 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
686 let flush_time = self.flush_time.load(Ordering::Acquire);
687 let other_flush_time = other.flush_time.load(Ordering::Acquire);
688 flush_time.partial_cmp(&other_flush_time)
689 }
690}
691
692impl SeriesBuffer {
693 fn new(capacity: usize) -> Self {
694 Self {
695 points: Mutex::new(HashMap::new()),
696 count: AtomicUsize::new(0),
697 flush_time: AtomicU64::new(0),
698 condvar: Condvar::new(),
699 max_capacity: capacity,
700 }
701 }
702
703 fn has_capacity(&self, new_points_count: usize) -> bool {
708 let count = self.count.load(Ordering::Acquire);
709 count == 0 || count + new_points_count <= self.max_capacity
710 }
711
712 fn lock(&self) -> SeriesBufferGuard<'_> {
713 SeriesBufferGuard {
714 sb: self.points.lock(),
715 count: &self.count,
716 }
717 }
718
719 fn take(&self) -> (usize, Vec<Series>) {
720 let mut points = self.lock();
721 self.flush_time.store(
722 UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
723 Ordering::Release,
724 );
725 let result = points
726 .sb
727 .drain()
728 .map(|(ChannelDescriptor { name, tags }, points)| {
729 let channel = Channel { name };
730 let points_obj = Points {
731 points_type: Some(points),
732 };
733 Series {
734 channel: Some(channel),
735 tags: tags
736 .map(|tags| tags.into_iter().collect())
737 .unwrap_or_default(),
738 points: Some(points_obj),
739 }
740 })
741 .collect();
742 let result_count = points
743 .count
744 .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
745 .unwrap();
746 (result_count, result)
747 }
748
749 fn is_empty(&self) -> bool {
750 self.count() == 0
751 }
752
753 fn count(&self) -> usize {
754 self.count.load(Ordering::Acquire)
755 }
756
757 fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
758 let mut points_lock = self.points.lock();
759 if !points_lock.is_empty() {
762 self.condvar.wait(&mut points_lock);
763 } else {
764 debug!("buffer emptied since last check, skipping condvar wait");
765 }
766 on_notify(SeriesBufferGuard {
767 sb: points_lock,
768 count: &self.count,
769 });
770 }
771
772 fn notify(&self) -> bool {
773 self.condvar.notify_one()
774 }
775}
776
/// Background loop for one `SeriesBuffer`: repeatedly drains the buffer
/// into a write request and pushes it onto the (bounded) request channel.
/// Parks between iterations for up to `max_request_delay`; producers
/// unpark it to force an early flush. Exits when `running` is cleared and
/// the buffer is empty, dropping its channel sender so dispatchers can
/// drain out.
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            // Only exit on an empty buffer so no accepted points are lost.
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        // Wake one producer blocked in on_notify now that the buffer is
        // drained.
        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        // Blocks while the channel is full (backpressure from dispatchers).
        let rep = request_chan.send((write_request, point_count));
        debug!("queued request for processing");
        if rep.is_err() {
            error!("failed to send request to dispatcher");
        } else {
            debug!("finished submitting request");
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}
819
impl Drop for NominalDatasetStream {
    /// Signals the background threads to shut down, then busy-waits until
    /// every accepted point has been handed to the consumer.
    ///
    /// NOTE(review): this polls `unflushed_points` in 50 ms steps with no
    /// timeout — if a dispatcher stalls permanently in `consume`, drop
    /// blocks forever. Confirm that is the intended trade-off.
    fn drop(&mut self) {
        debug!("starting drop for NominalDatasetStream");
        self.running.store(false, Ordering::Release);
        loop {
            let count = self.unflushed_points.load(Ordering::Acquire);
            if count == 0 {
                break;
            }
            debug!(
                "waiting for all points to be flushed before dropping stream, {count} points remaining",
            );
            thread::sleep(Duration::from_millis(50));
        }
    }
}
837
/// Worker loop: receives write requests from the batchers and feeds them to
/// `consumer`. Decrements the shared unflushed counter after each attempt
/// (success or failure — failed points are dropped, not retried here) and
/// exits once everything is flushed during shutdown or the channel closes.
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    // Cumulative milliseconds spent in successful consume() calls.
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received writerequest from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("Failed to send request: {e:?}");
                    }
                }
                // Count the points as flushed even on error so Drop's wait
                // loop can terminate.
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                // All senders dropped (both batch processors exited).
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {}",
        total_request_time
    );
}
881
882fn points_len(points_type: &PointsType) -> usize {
883 match points_type {
884 PointsType::DoublePoints(points) => points.points.len(),
885 PointsType::StringPoints(points) => points.points.len(),
886 PointsType::IntegerPoints(points) => points.points.len(),
887 PointsType::Uint64Points(points) => points.points.len(),
888 PointsType::ArrayPoints(points) => match &points.array_type {
889 Some(ArrayType::DoubleArrayPoints(points)) => points.points.len(),
890 Some(ArrayType::StringArrayPoints(points)) => points.points.len(),
891 None => 0,
892 },
893 PointsType::StructPoints(points) => points.points.len(),
894 }
895}