use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use std::time::UNIX_EPOCH;

use conjure_object::BearerToken;
use conjure_object::ResourceIdentifier;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::Points;
use nominal_api::tonic::io::nominal::scout::api::proto::Series;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
use parking_lot::Condvar;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use tracing::debug;
use tracing::error;
use tracing::info;

use crate::client::NominalApiClients;
use crate::client::PRODUCTION_API_URL;
use crate::consumer::AvroFileConsumer;
use crate::consumer::DualWriteRequestConsumer;
use crate::consumer::ListeningWriteRequestConsumer;
use crate::consumer::NominalCoreConsumer;
use crate::consumer::RequestConsumerWithFallback;
use crate::consumer::WriteRequestConsumer;
use crate::listener::LoggingListener;
use crate::types::ChannelDescriptor;
use crate::types::IntoPoints;
use crate::types::IntoTimestamp;

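/// Tuning options for a `NominalDatasetStream`.
///
/// * `max_points_per_record` - capacity of each in-memory buffer and the size
///   threshold at which per-channel writers flush.
/// * `max_request_delay` - how long a batch may sit in a buffer before it is flushed.
/// * `max_buffered_requests` - capacity of the bounded channel between the batch
///   processors and the request dispatchers.
/// * `request_dispatcher_tasks` - number of dispatcher threads sending write requests.
/// * `base_api_url` - Nominal API endpoint; defaults to `PRODUCTION_API_URL`.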
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    pub max_points_per_record: usize,
    pub max_request_delay: Duration,
    pub max_buffered_requests: usize,
    pub request_dispatcher_tasks: usize,
    pub base_api_url: String,
}

impl Default for NominalStreamOpts {
    fn default() -> Self {
        Self {
            max_points_per_record: 250_000,
            max_request_delay: Duration::from_millis(100),
            max_buffered_requests: 4,
            request_dispatcher_tasks: 8,
            base_api_url: PRODUCTION_API_URL.to_string(),
        }
    }
}

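/// Builder for `NominalDatasetStream`.
///
/// A stream must write to at least one destination: the Nominal core API
/// (`stream_to_core`), a local Avro file (`stream_to_file`), or both. A file
/// fallback can additionally be attached to a single destination with
/// `with_file_fallback`.
///
/// A minimal sketch, not taken from the crate's docs; the file path is a
/// placeholder:
///
/// ```ignore
/// let stream = NominalDatasetStream::builder()
///     .stream_to_file("telemetry.avro")
///     .with_options(NominalStreamOpts::default())
///     .build();
/// ```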
pub struct NominalDatasetStreamBuilder {
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    stream_to_file: Option<PathBuf>,
    file_fallback: Option<PathBuf>,
    listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    opts: NominalStreamOpts,
}

impl Debug for NominalDatasetStreamBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NominalDatasetStreamBuilder")
            .field("stream_to_core", &self.stream_to_core.is_some())
            .field("stream_to_file", &self.stream_to_file)
            .field("file_fallback", &self.file_fallback)
            .field("listeners", &self.listeners.len())
            .finish()
    }
}

impl Default for NominalDatasetStreamBuilder {
    fn default() -> Self {
        Self {
            stream_to_core: None,
            stream_to_file: None,
            file_fallback: None,
            listeners: Vec::new(),
            opts: NominalStreamOpts::default(),
        }
    }
}

impl NominalDatasetStreamBuilder {
    pub fn new() -> Self {
        Self::default()
    }
}

impl NominalDatasetStreamBuilder {
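    /// Streams data to a Nominal dataset in core, authenticating with the
    /// given bearer token and performing writes on the provided Tokio runtime
    /// handle.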
    pub fn stream_to_core(
        self,
        bearer_token: BearerToken,
        dataset: ResourceIdentifier,
        handle: tokio::runtime::Handle,
    ) -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder {
            stream_to_core: Some((bearer_token, dataset, handle)),
            stream_to_file: self.stream_to_file,
            file_fallback: self.file_fallback,
            listeners: self.listeners,
            opts: self.opts,
        }
    }

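    /// Streams data to a local Avro file at the given path.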
    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.stream_to_file = Some(file_path.into());
        self
    }

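    /// Registers a fallback Avro file at the given path; the built stream
    /// wraps the primary destination in a `RequestConsumerWithFallback` using
    /// this file. Building panics if `stream_to_core` and `stream_to_file`
    /// are both set alongside a fallback.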
    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.file_fallback = Some(file_path.into());
        self
    }

    pub fn add_listener(
        mut self,
        listener: Arc<dyn crate::listener::NominalStreamListener>,
    ) -> Self {
        self.listeners.push(listener);
        self
    }

    pub fn with_listeners(
        mut self,
        listeners: Vec<Arc<dyn crate::listener::NominalStreamListener>>,
    ) -> Self {
        self.listeners = listeners;
        self
    }

    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
        self.opts = opts;
        self
    }

    #[cfg(feature = "logging")]
    fn init_logging(self, directive: Option<&str>) -> Self {
        use tracing_subscriber::layer::SubscriberExt;
        use tracing_subscriber::util::SubscriberInitExt;

        let base = tracing_subscriber::EnvFilter::builder()
            .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
        let env_filter = match directive {
            Some(d) => base.parse_lossy(d),
            None => base.from_env_lossy(),
        };

        let subscriber = tracing_subscriber::registry()
            .with(
                tracing_subscriber::fmt::layer()
                    .with_thread_ids(true)
                    .with_thread_names(true)
                    .with_line_number(true),
            )
            .with(env_filter);

        if let Err(error) = subscriber.try_init() {
            eprintln!("nominal streaming failed to enable logging: {error}");
        }

        self
    }

    #[cfg(feature = "logging")]
    pub fn enable_logging(self) -> Self {
        self.init_logging(None)
    }

    #[cfg(feature = "logging")]
    pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
        self.init_logging(Some(log_directive))
    }

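    /// Builds the stream from the configured destinations.
    ///
    /// # Panics
    ///
    /// Panics if no destination was configured, or if `stream_to_core`,
    /// `stream_to_file`, and `with_file_fallback` were all set at once.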
    pub fn build(self) -> NominalDatasetStream {
        let core_consumer = self.core_consumer();
        let file_consumer = self.file_consumer();
        let fallback_consumer = self.fallback_consumer();

        match (core_consumer, file_consumer, fallback_consumer) {
            (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
            (Some(_), Some(_), Some(_)) => {
                panic!("must choose one of stream_to_file and file_fallback when streaming to core")
            }
            (Some(core), None, None) => self.into_stream(core),
            (Some(core), None, Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
            }
            (None, Some(file), None) => self.into_stream(file),
            (None, Some(file), Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
            }
            (Some(core), Some(file), None) => {
                self.into_stream(DualWriteRequestConsumer::new(core, file))
            }
        }
    }

    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
        self.stream_to_core
            .as_ref()
            .map(|(auth_provider, dataset, handle)| {
                NominalCoreConsumer::new(
                    NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
                    handle.clone(),
                    auth_provider.clone(),
                    dataset.clone(),
                )
            })
    }

    fn file_consumer(&self) -> Option<AvroFileConsumer> {
        self.stream_to_file
            .as_ref()
            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
    }

    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
        self.file_fallback
            .as_ref()
            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
    }

    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
        let mut listeners = self.listeners;
        listeners.push(Arc::new(LoggingListener));
        let listening_consumer = ListeningWriteRequestConsumer::new(consumer, listeners);
        NominalDatasetStream::new_with_consumer(listening_consumer, self.opts)
    }
}

#[deprecated(note = "renamed to NominalDatasetStream")]
pub type NominalDatasourceStream = NominalDatasetStream;

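/// A handle to a running dataset stream.
///
/// Points are staged in two in-memory `SeriesBuffer`s (primary and secondary).
/// One batch-processor thread per buffer drains it into `WriteRequestNominal`
/// batches on a bounded channel, and a pool of dispatcher threads hands those
/// batches to the configured `WriteRequestConsumer`. Dropping the stream
/// blocks until all buffered points have been flushed.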
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}

impl NominalDatasetStream {
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

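    /// Returns a writer that accumulates `f64` points for a single channel and
    /// flushes them into the stream in batches (on size, on `max_request_delay`,
    /// and on drop). See also `string_writer` and `integer_writer`.
    ///
    /// A minimal sketch; `descriptor` is a placeholder `ChannelDescriptor` and
    /// `timestamp` is any value implementing `IntoTimestamp`:
    ///
    /// ```ignore
    /// let mut writer = stream.double_writer(&descriptor);
    /// writer.push(timestamp, 42.0);
    /// ```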
    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

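    /// Enqueues a batch of points for a channel directly, bypassing the
    /// per-channel writers. Blocks if neither buffer currently has capacity
    /// for the batch.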
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

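    /// Runs `callback` with a locked buffer that can accept `new_count` more
    /// points: the primary buffer if it has room, then the secondary buffer
    /// (unparking the primary flusher), and otherwise the least recently
    /// flushed buffer once its flusher signals that it has been drained.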
    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}

pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: &'ds ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    last_flushed_at: Instant,
    unflushed: Vec<T>,
}

impl<T> NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn new<'ds>(
        stream: &'ds NominalDatasetStream,
        channel: &'ds ChannelDescriptor,
    ) -> NominalChannelWriter<'ds, T> {
        NominalChannelWriter {
            channel,
            stream,
            last_flushed_at: Instant::now(),
            unflushed: vec![],
        }
    }

    fn push_point(&mut self, point: T) {
        self.unflushed.push(point);
        if self.unflushed.len() >= self.stream.opts.max_points_per_record
            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
        {
            debug!(
                "conditionally flushing {:?}, ({} points, {:?} since last)",
                self.channel,
                self.unflushed.len(),
                self.last_flushed_at.elapsed()
            );
            self.flush();
        }
    }

    fn flush(&mut self) {
        if self.unflushed.is_empty() {
            return;
        }
        info!(
            "flushing writer for {:?} with {} points",
            self.channel,
            self.unflushed.len()
        );
        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
            buf.extend(self.channel, to_flush);
            self.last_flushed_at = Instant::now();
        })
    }
}

impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}

pub struct NominalDoubleWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoublePoint>,
}

impl NominalDoubleWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
        self.writer.push_point(DoublePoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalIntegerWriter<'ds> {
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}

impl NominalIntegerWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
        self.writer.push_point(IntegerPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalStringWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringPoint>,
}

impl NominalStringWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
        self.writer.push_point(StringPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value: value.into(),
        });
    }
}

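/// A capacity-bounded staging buffer of points, keyed by channel.
///
/// The point map is guarded by a mutex; `count` tracks the buffered point
/// total, `flush_time` records when the buffer was last drained, and the
/// condvar wakes writers that are waiting for a flush to free up capacity.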
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    count: AtomicUsize,
    flush_time: AtomicU64,
    condvar: Condvar,
    max_capacity: usize,
}

struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}

impl SeriesBufferGuard<'_> {
    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
        let points = points.into_points();
        let new_point_count = points_len(&points);

        if !self.sb.contains_key(channel_descriptor) {
            self.sb.insert(channel_descriptor.clone(), points);
        } else {
            // Append to the existing series; a channel must keep a single point type.
            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (_, _) => {
                    panic!("mismatched point types for channel {channel_descriptor:?}");
                }
            }
        }

        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}

// Buffers compare by the time they were last flushed, so the stream can pick
// the least recently flushed buffer to wait on when both are full.
impl PartialEq for SeriesBuffer {
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}

impl PartialOrd for SeriesBuffer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let flush_time = self.flush_time.load(Ordering::Acquire);
        let other_flush_time = other.flush_time.load(Ordering::Acquire);
        flush_time.partial_cmp(&other_flush_time)
    }
}

impl SeriesBuffer {
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    fn has_capacity(&self, new_points_count: usize) -> bool {
        // An empty buffer accepts any batch, even one larger than max_capacity.
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        let result_count = self
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        // Wait for the batch processor to drain the buffer, unless it has
        // already been emptied since the caller checked capacity.
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}

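/// Flusher loop for one `SeriesBuffer`: parks while the buffer is empty (or
/// exits once the stream stops running), then drains the buffer into a
/// `WriteRequestNominal`, notifies any writer waiting on the buffer's condvar,
/// and sends the request to the dispatcher channel.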
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        let rep = request_chan.send((write_request, point_count));
        debug!("queued request for processing");
        if rep.is_err() {
            error!("failed to send request to dispatcher");
        } else {
            debug!("finished submitting request");
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}

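// On drop, clear the running flag and block until the dispatcher threads have
// flushed every buffered point.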
impl Drop for NominalDatasetStream {
    fn drop(&mut self) {
        debug!("starting drop for NominalDatasetStream");
        self.running.store(false, Ordering::Release);
        while self.unflushed_points.load(Ordering::Acquire) > 0 {
            debug!(
                "waiting for all points to be flushed before dropping stream, {} points remaining",
                self.unflushed_points.load(Ordering::Acquire)
            );
            thread::sleep(Duration::from_millis(50));
        }
    }
}

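/// Dispatcher loop: receives `(request, point_count)` pairs from the bounded
/// channel, hands each request to the consumer, and decrements the shared
/// unflushed-point counter. Exits when the channel closes or when the stream
/// has stopped and every point has been flushed.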
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received write request from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("Failed to send request: {e:?}");
                    }
                }
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {} ms",
        total_request_time
    );
}

fn points_len(points_type: &PointsType) -> usize {
    match points_type {
        PointsType::DoublePoints(points) => points.points.len(),
        PointsType::StringPoints(points) => points.points.len(),
        PointsType::IntegerPoints(points) => points.points.len(),
    }
}