1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::array_points::ArrayType;
17use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
18use nominal_api::tonic::io::nominal::scout::api::proto::ArrayPoints;
19use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
20use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
21use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
22use nominal_api::tonic::io::nominal::scout::api::proto::Points;
23use nominal_api::tonic::io::nominal::scout::api::proto::Series;
24use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
25use nominal_api::tonic::io::nominal::scout::api::proto::StructPoint;
26use nominal_api::tonic::io::nominal::scout::api::proto::Uint64Point;
27use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
28use parking_lot::Condvar;
29use parking_lot::Mutex;
30use parking_lot::MutexGuard;
31use tracing::debug;
32use tracing::error;
33use tracing::info;
34
35use crate::client::NominalApiClients;
36use crate::client::PRODUCTION_API_URL;
37use crate::consumer::AvroFileConsumer;
38use crate::consumer::DualWriteRequestConsumer;
39use crate::consumer::ListeningWriteRequestConsumer;
40use crate::consumer::NominalCoreConsumer;
41use crate::consumer::RequestConsumerWithFallback;
42use crate::consumer::WriteRequestConsumer;
43use crate::listener::LoggingListener;
44use crate::types::ChannelDescriptor;
45use crate::types::IntoPoints;
46use crate::types::IntoTimestamp;
47
/// Tuning options for a [`NominalDatasetStream`].
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    /// Capacity of each staging buffer; also the per-writer flush threshold.
    pub max_points_per_record: usize,
    /// Maximum time a batch may wait before being flushed.
    pub max_request_delay: Duration,
    /// Capacity of the bounded channel between batch processors and dispatchers.
    pub max_buffered_requests: usize,
    /// Number of dispatcher threads that consume queued write requests.
    pub request_dispatcher_tasks: usize,
    /// Base URL of the Nominal API to stream to.
    pub base_api_url: String,
}
56
57impl Default for NominalStreamOpts {
58 fn default() -> Self {
59 Self {
60 max_points_per_record: 250_000,
61 max_request_delay: Duration::from_millis(100),
62 max_buffered_requests: 4,
63 request_dispatcher_tasks: 8,
64 base_api_url: PRODUCTION_API_URL.to_string(),
65 }
66 }
67}
68
/// Builder for [`NominalDatasetStream`]; configure at least one destination
/// (core and/or file) before calling `build`.
#[derive(Default)]
pub struct NominalDatasetStreamBuilder {
    // Credentials, target dataset, and runtime handle for streaming to Nominal Core.
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    // Path of an Avro file to stream to (instead of, or in addition to, core).
    stream_to_file: Option<PathBuf>,
    // Path of an Avro file wired up via RequestConsumerWithFallback in build().
    file_fallback: Option<PathBuf>,
    // Listeners notified of stream events; a LoggingListener is always appended.
    listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    opts: NominalStreamOpts,
}
77
78impl Debug for NominalDatasetStreamBuilder {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 f.debug_struct("NominalDatasetStreamBuilder")
81 .field("stream_to_core", &self.stream_to_core.is_some())
82 .field("stream_to_file", &self.stream_to_file)
83 .field("file_fallback", &self.file_fallback)
84 .field("listeners", &self.listeners.len())
85 .finish()
86 }
87}
88
89impl NominalDatasetStreamBuilder {
90 pub fn new() -> Self {
91 Self::default()
92 }
93}
94
95impl NominalDatasetStreamBuilder {
96 pub fn stream_to_core(
97 self,
98 bearer_token: BearerToken,
99 dataset: ResourceIdentifier,
100 handle: tokio::runtime::Handle,
101 ) -> NominalDatasetStreamBuilder {
102 NominalDatasetStreamBuilder {
103 stream_to_core: Some((bearer_token, dataset, handle)),
104 stream_to_file: self.stream_to_file,
105 file_fallback: self.file_fallback,
106 listeners: self.listeners,
107 opts: self.opts,
108 }
109 }
110
111 pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
112 self.stream_to_file = Some(file_path.into());
113 self
114 }
115
116 pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
117 self.file_fallback = Some(file_path.into());
118 self
119 }
120
121 pub fn add_listener(
122 mut self,
123 listener: Arc<dyn crate::listener::NominalStreamListener>,
124 ) -> Self {
125 self.listeners.push(listener);
126 self
127 }
128
129 pub fn with_listeners(
130 mut self,
131 listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
132 ) -> Self {
133 self.listeners = listeners;
134 self
135 }
136
137 pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
138 self.opts = opts;
139 self
140 }
141
142 #[cfg(feature = "logging")]
143 fn init_logging(self, directive: Option<&str>) -> Self {
144 use tracing_subscriber::layer::SubscriberExt;
145 use tracing_subscriber::util::SubscriberInitExt;
146
147 let base = tracing_subscriber::EnvFilter::builder()
149 .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
150 let env_filter = match directive {
151 Some(d) => base.parse_lossy(d),
152 None => base.from_env_lossy(),
153 };
154
155 let subscriber = tracing_subscriber::registry()
156 .with(
157 tracing_subscriber::fmt::layer()
158 .with_thread_ids(true)
159 .with_thread_names(true)
160 .with_line_number(true),
161 )
162 .with(env_filter);
163
164 if let Err(error) = subscriber.try_init() {
165 eprintln!("nominal streaming failed to enable logging: {error}");
166 }
167
168 self
169 }
170
171 #[cfg(feature = "logging")]
172 pub fn enable_logging(self) -> Self {
173 self.init_logging(None)
174 }
175
176 #[cfg(feature = "logging")]
177 pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
178 self.init_logging(Some(log_directive))
179 }
180
181 pub fn build(self) -> NominalDatasetStream {
182 let core_consumer = self.core_consumer();
183 let file_consumer = self.file_consumer();
184 let fallback_consumer = self.fallback_consumer();
185
186 match (core_consumer, file_consumer, fallback_consumer) {
187 (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
188 (Some(_), Some(_), Some(_)) => {
189 panic!("must choose one of stream_to_file and file_fallback when streaming to core")
190 }
191 (Some(core), None, None) => self.into_stream(core),
192 (Some(core), None, Some(fallback)) => {
193 self.into_stream(RequestConsumerWithFallback::new(core, fallback))
194 }
195 (None, Some(file), None) => self.into_stream(file),
196 (None, Some(file), Some(fallback)) => {
197 self.into_stream(RequestConsumerWithFallback::new(file, fallback))
199 }
200 (Some(core), Some(file), None) => {
201 self.into_stream(DualWriteRequestConsumer::new(core, file))
202 }
203 }
204 }
205
206 fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
207 self.stream_to_core
208 .as_ref()
209 .map(|(auth_provider, dataset, handle)| {
210 NominalCoreConsumer::new(
211 NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
212 handle.clone(),
213 auth_provider.clone(),
214 dataset.clone(),
215 )
216 })
217 }
218
219 fn file_consumer(&self) -> Option<AvroFileConsumer> {
220 self.stream_to_file
221 .as_ref()
222 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
223 }
224
225 fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
226 self.file_fallback
227 .as_ref()
228 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
229 }
230
231 fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
232 let mut listeners = self.listeners;
233 listeners.push(Arc::new(LoggingListener));
234 let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
235 NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
236 }
237}
238
239#[deprecated]
241pub type NominalDatasourceStream = NominalDatasetStream;
242
/// Handle to a running dataset stream: points are staged into two swap
/// buffers, drained by background batch-processor threads, and dispatched
/// to the configured consumer(s). Dropping the stream blocks until all
/// accepted points have been flushed.
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    // Cleared on drop to tell the background threads to shut down.
    running: Arc<AtomicBool>,
    // Points accepted but not yet handed to the consumer by a dispatcher.
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    // Batch-processor thread handles; unparked to trigger prompt flushes.
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}
252
impl NominalDatasetStream {
    /// Returns a builder for configuring a new stream.
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    /// Starts a stream that forwards all write requests to `consumer`.
    ///
    /// Spawns two batch-processor threads (one per swap buffer) plus
    /// `opts.request_dispatcher_tasks` dispatcher threads, connected by a
    /// bounded channel of `opts.max_buffered_requests` requests.
    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        // Processors send (request, point count) pairs to the dispatchers.
        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                // Clone the sender; the secondary processor takes the original.
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                // Moves request_tx: once both processors exit, all senders are
                // dropped, the channel closes, and dispatchers drain then stop.
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        // Dispatcher threads are detached (handles not retained); they exit
        // when the channel closes or when shutdown completes.
        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

    /// Returns a batching writer for f64 points on `channel_descriptor`.
    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Returns a batching writer for string points on `channel_descriptor`.
    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Returns a batching writer for i64 points on `channel_descriptor`.
    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Returns a batching writer for u64 points on `channel_descriptor`.
    pub fn uint64_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalUint64Writer<'a> {
        NominalUint64Writer {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Returns a batching writer for JSON struct points on `channel_descriptor`.
    pub fn struct_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStructWriter<'a> {
        NominalStructWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }
    /// Adds `new_points` for `channel_descriptor` to whichever buffer has
    /// capacity, blocking until one is flushed if both are full.
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    /// Runs `callback` with a locked buffer that can accept `new_count` more
    /// points: prefers the primary buffer, falls back to the secondary, and
    /// if both are full blocks on the least-recently-flushed one until its
    /// processor drains it.
    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        // Count points as unflushed up front; dispatchers decrement after
        // the consumer has handled them.
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            // Primary is full: wake its processor so it flushes promptly.
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            // Both full: pick the buffer with the older flush_time (compared
            // via SeriesBuffer's PartialOrd), wake its processor, and wait.
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}
408
/// Per-channel buffering writer: accumulates typed points locally and moves
/// them into the stream's shared buffers in batches.
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: &'ds ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    // When the last flush completed; bounds latency of buffered points.
    last_flushed_at: Instant,
    // Points pushed since the last flush.
    unflushed: Vec<T>,
}
418
impl<T> NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    /// Creates an empty writer for `channel` on `stream`.
    fn new<'ds>(
        stream: &'ds NominalDatasetStream,
        channel: &'ds ChannelDescriptor,
    ) -> NominalChannelWriter<'ds, T> {
        NominalChannelWriter {
            channel,
            stream,
            last_flushed_at: Instant::now(),
            unflushed: vec![],
        }
    }

    /// Buffers one point, flushing when the batch reaches
    /// `max_points_per_record` or more than `max_request_delay` has elapsed
    /// since the last flush.
    fn push_point(&mut self, point: T) {
        self.unflushed.push(point);
        if self.unflushed.len() >= self.stream.opts.max_points_per_record
            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
        {
            debug!(
                "conditionally flushing {:?}, ({} points, {:?} since last)",
                self.channel,
                self.unflushed.len(),
                self.last_flushed_at.elapsed()
            );
            self.flush();
        }
    }

    /// Moves all buffered points into the stream, blocking until a shared
    /// buffer has capacity. No-op when nothing is buffered.
    fn flush(&mut self) {
        if self.unflushed.is_empty() {
            return;
        }
        info!(
            "flushing writer for {:?} with {} points",
            self.channel,
            self.unflushed.len()
        );
        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
            // Drain (not take) so the Vec's capacity is reused next batch.
            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
            buf.extend(self.channel, to_flush);
            self.last_flushed_at = Instant::now();
        })
    }
}
466
// Flush on drop so buffered points are not lost when a writer goes out of
// scope; this may block until a shared buffer has capacity.
impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}
476
/// Typed wrapper over [`NominalChannelWriter`] for f64 points.
pub struct NominalDoubleWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoublePoint>,
}

impl NominalDoubleWriter<'_> {
    /// Buffers one (timestamp, value) point; may trigger a flush.
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
        self.writer.push_point(DoublePoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

/// Typed wrapper over [`NominalChannelWriter`] for i64 points.
pub struct NominalIntegerWriter<'ds> {
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}

impl NominalIntegerWriter<'_> {
    /// Buffers one (timestamp, value) point; may trigger a flush.
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
        self.writer.push_point(IntegerPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

/// Typed wrapper over [`NominalChannelWriter`] for u64 points.
pub struct NominalUint64Writer<'ds> {
    writer: NominalChannelWriter<'ds, Uint64Point>,
}

impl NominalUint64Writer<'_> {
    /// Buffers one (timestamp, value) point; may trigger a flush.
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: u64) {
        self.writer.push_point(Uint64Point {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

/// Typed wrapper over [`NominalChannelWriter`] for string points.
pub struct NominalStringWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringPoint>,
}

impl NominalStringWriter<'_> {
    /// Buffers one (timestamp, value) point; may trigger a flush.
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
        self.writer.push_point(StringPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value: value.into(),
        });
    }
}

/// Typed wrapper over [`NominalChannelWriter`] for struct (JSON) points.
pub struct NominalStructWriter<'ds> {
    writer: NominalChannelWriter<'ds, StructPoint>,
}

impl NominalStructWriter<'_> {
    /// Buffers one point whose `value` is a JSON string; may trigger a flush.
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
        self.writer.push_point(StructPoint {
            timestamp: Some(timestamp.into_timestamp()),
            json_string: value.into(),
        });
    }
}
541
/// Capacity-bounded staging area mapping channels to accumulated points,
/// shared between producers and one batch-processor thread.
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    // Total points currently staged across all channels.
    count: AtomicUsize,
    // Unix-epoch nanoseconds of the last take(); used to pick the
    // least-recently-flushed buffer when both are full.
    flush_time: AtomicU64,
    // Signalled after take() empties the buffer, waking a waiting producer.
    condvar: Condvar,
    max_capacity: usize,
}

/// Holds the buffer's mutex while appending and keeps `count` in sync.
struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}
558
559impl SeriesBufferGuard<'_> {
560 fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
561 let points = points.into_points();
562 let new_point_count = points_len(&points);
563
564 if !self.sb.contains_key(channel_descriptor) {
565 self.sb.insert(channel_descriptor.clone(), points);
566 } else {
567 match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
568 (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
569 existing.points.extend(new.points)
570 }
571 (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
572 existing.points.extend(new.points)
573 }
574 (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
575 existing.points.extend(new.points)
576 }
577 (
578 PointsType::ArrayPoints(ArrayPoints {
579 array_type: Some(ArrayType::DoubleArrayPoints(existing)),
580 }),
581 PointsType::ArrayPoints(ArrayPoints {
582 array_type: Some(ArrayType::DoubleArrayPoints(new)),
583 }),
584 ) => existing.points.extend(new.points),
585 (PointsType::Uint64Points(existing), PointsType::Uint64Points(new)) => {
586 existing.points.extend(new.points)
587 }
588 (
589 PointsType::ArrayPoints(ArrayPoints {
590 array_type: Some(ArrayType::StringArrayPoints(existing)),
591 }),
592 PointsType::ArrayPoints(ArrayPoints {
593 array_type: Some(ArrayType::StringArrayPoints(new)),
594 }),
595 ) => existing.points.extend(new.points),
596 (
597 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
598 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
599 ) => {}
600 (PointsType::StructPoints(existing), PointsType::StructPoints(new)) => {
601 existing.points.extend(new.points);
602 }
603 (
605 PointsType::DoublePoints(_),
606 PointsType::IntegerPoints(_)
607 | PointsType::Uint64Points(_)
608 | PointsType::StringPoints(_)
609 | PointsType::ArrayPoints(_)
610 | PointsType::StructPoints(_),
611 )
612 | (
613 PointsType::StringPoints(_),
614 PointsType::DoublePoints(_)
615 | PointsType::IntegerPoints(_)
616 | PointsType::Uint64Points(_)
617 | PointsType::ArrayPoints(_)
618 | PointsType::StructPoints(_),
619 )
620 | (
621 PointsType::IntegerPoints(_),
622 PointsType::DoublePoints(_)
623 | PointsType::Uint64Points(_)
624 | PointsType::StringPoints(_)
625 | PointsType::ArrayPoints(_)
626 | PointsType::StructPoints(_),
627 )
628 | (
629 PointsType::ArrayPoints(_),
630 PointsType::DoublePoints(_)
631 | PointsType::Uint64Points(_)
632 | PointsType::StringPoints(_)
633 | PointsType::IntegerPoints(_)
634 | PointsType::StructPoints(_),
635 )
636 | (
637 PointsType::ArrayPoints(ArrayPoints {
638 array_type: Some(_),
639 }),
640 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
641 )
642 | (
643 PointsType::ArrayPoints(ArrayPoints { array_type: None }),
644 PointsType::ArrayPoints(ArrayPoints {
645 array_type: Some(_),
646 }),
647 )
648 | (
649 PointsType::ArrayPoints(ArrayPoints {
650 array_type: Some(ArrayType::DoubleArrayPoints(_)),
651 }),
652 PointsType::ArrayPoints(ArrayPoints {
653 array_type: Some(ArrayType::StringArrayPoints(_)),
654 }),
655 )
656 | (
657 PointsType::ArrayPoints(ArrayPoints {
658 array_type: Some(ArrayType::StringArrayPoints(_)),
659 }),
660 PointsType::ArrayPoints(ArrayPoints {
661 array_type: Some(ArrayType::DoubleArrayPoints(_)),
662 }),
663 )
664 | (
665 PointsType::Uint64Points(_),
666 PointsType::IntegerPoints(_)
667 | PointsType::StringPoints(_)
668 | PointsType::DoublePoints(_)
669 | PointsType::ArrayPoints(_)
670 | PointsType::StructPoints(_),
671 )
672 | (
673 PointsType::StructPoints(_),
674 PointsType::DoublePoints(_)
675 | PointsType::Uint64Points(_)
676 | PointsType::StringPoints(_)
677 | PointsType::IntegerPoints(_)
678 | PointsType::ArrayPoints(_),
679 ) => {
680 panic!("mismatched types");
682 }
683 }
684 }
685
686 self.count.fetch_add(new_point_count, Ordering::Release);
687 }
688}
689
690impl PartialEq for SeriesBuffer {
691 fn eq(&self, other: &Self) -> bool {
692 self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
693 }
694}
695
696impl PartialOrd for SeriesBuffer {
697 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
698 let flush_time = self.flush_time.load(Ordering::Acquire);
699 let other_flush_time = other.flush_time.load(Ordering::Acquire);
700 flush_time.partial_cmp(&other_flush_time)
701 }
702}
703
704impl SeriesBuffer {
705 fn new(capacity: usize) -> Self {
706 Self {
707 points: Mutex::new(HashMap::new()),
708 count: AtomicUsize::new(0),
709 flush_time: AtomicU64::new(0),
710 condvar: Condvar::new(),
711 max_capacity: capacity,
712 }
713 }
714
715 fn has_capacity(&self, new_points_count: usize) -> bool {
720 let count = self.count.load(Ordering::Acquire);
721 count == 0 || count + new_points_count <= self.max_capacity
722 }
723
724 fn lock(&self) -> SeriesBufferGuard<'_> {
725 SeriesBufferGuard {
726 sb: self.points.lock(),
727 count: &self.count,
728 }
729 }
730
731 fn take(&self) -> (usize, Vec<Series>) {
732 let mut points = self.lock();
733 self.flush_time.store(
734 UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
735 Ordering::Release,
736 );
737 let result = points
738 .sb
739 .drain()
740 .map(|(ChannelDescriptor { name, tags }, points)| {
741 let channel = Channel { name };
742 let points_obj = Points {
743 points_type: Some(points),
744 };
745 Series {
746 channel: Some(channel),
747 tags: tags
748 .map(|tags| tags.into_iter().collect())
749 .unwrap_or_default(),
750 points: Some(points_obj),
751 }
752 })
753 .collect();
754 let result_count = points
755 .count
756 .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
757 .unwrap();
758 (result_count, result)
759 }
760
761 fn is_empty(&self) -> bool {
762 self.count() == 0
763 }
764
765 fn count(&self) -> usize {
766 self.count.load(Ordering::Acquire)
767 }
768
769 fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
770 let mut points_lock = self.points.lock();
771 if !points_lock.is_empty() {
774 self.condvar.wait(&mut points_lock);
775 } else {
776 debug!("buffer emptied since last check, skipping condvar wait");
777 }
778 on_notify(SeriesBufferGuard {
779 sb: points_lock,
780 count: &self.count,
781 });
782 }
783
784 fn notify(&self) -> bool {
785 self.condvar.notify_one()
786 }
787}
788
/// Background loop for one `SeriesBuffer`: repeatedly drains the buffer into
/// a `WriteRequestNominal` and queues it for the dispatcher threads.
///
/// Parks for up to `max_request_delay` between iterations; producers unpark
/// the thread to force an early flush. Exits once `running` is false and the
/// buffer is empty.
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                // Dropping this sender (together with the other processor's)
                // closes the channel, letting dispatchers drain and stop.
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        // Wake a producer blocked waiting for this buffer to drain.
        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        // send() blocks while the bounded channel is full (backpressure).
        let rep = request_chan.send((write_request, point_count));
        debug!("queued request for processing");
        if rep.is_err() {
            error!("failed to send request to dispatcher");
        } else {
            debug!("finished submitting request");
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}
831
832impl Drop for NominalDatasetStream {
833 fn drop(&mut self) {
834 debug!("starting drop for NominalDatasetStream");
835 self.running.store(false, Ordering::Release);
836 loop {
837 let count = self.unflushed_points.load(Ordering::Acquire);
838 if count == 0 {
839 break;
840 }
841 debug!(
842 "waiting for all points to be flushed before dropping stream, {count} points remaining",
843 );
844 thread::sleep(Duration::from_millis(50));
846 }
847 }
848}
849
/// Dispatcher loop: receives batched write requests from the channel and
/// forwards them to `consumer`, decrementing `unflushed_points` after each.
///
/// Exits when the channel closes (all senders dropped) or when shutdown was
/// requested and every point has been flushed. Consumer errors are logged,
/// and the affected points are still counted as flushed.
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    // Accumulated time spent in consume(), reported in the exit log.
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received writerequest from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("Failed to send request: {e:?}");
                    }
                }
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                // recv() only errors once the channel is closed and drained.
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {}",
        total_request_time
    );
}
893
894fn points_len(points_type: &PointsType) -> usize {
895 match points_type {
896 PointsType::DoublePoints(points) => points.points.len(),
897 PointsType::StringPoints(points) => points.points.len(),
898 PointsType::IntegerPoints(points) => points.points.len(),
899 PointsType::Uint64Points(points) => points.points.len(),
900 PointsType::ArrayPoints(points) => match &points.array_type {
901 Some(ArrayType::DoubleArrayPoints(points)) => points.points.len(),
902 Some(ArrayType::StringArrayPoints(points)) => points.points.len(),
903 None => 0,
904 },
905 PointsType::StructPoints(points) => points.points.len(),
906 }
907}