use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use std::time::UNIX_EPOCH;

use conjure_object::BearerToken;
use conjure_object::ResourceIdentifier;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::Points;
use nominal_api::tonic::io::nominal::scout::api::proto::Series;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
use parking_lot::Condvar;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use tracing::debug;
use tracing::error;
use tracing::info;

use crate::client::NominalApiClients;
use crate::client::PRODUCTION_API_URL;
use crate::consumer::AvroFileConsumer;
use crate::consumer::DualWriteRequestConsumer;
use crate::consumer::ListeningWriteRequestConsumer;
use crate::consumer::NominalCoreConsumer;
use crate::consumer::RequestConsumerWithFallback;
use crate::consumer::WriteRequestConsumer;
use crate::notifier::LoggingListener;
use crate::types::ChannelDescriptor;
use crate::types::IntoPoints;
use crate::types::IntoTimestamp;

#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    pub max_points_per_record: usize,
    pub max_request_delay: Duration,
    pub max_buffered_requests: usize,
    pub request_dispatcher_tasks: usize,
    pub base_api_url: String,
}

impl Default for NominalStreamOpts {
    fn default() -> Self {
        Self {
            max_points_per_record: 250_000,
            max_request_delay: Duration::from_millis(100),
            max_buffered_requests: 4,
            request_dispatcher_tasks: 8,
            base_api_url: PRODUCTION_API_URL.to_string(),
        }
    }
}
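
// A minimal sketch of overriding the defaults above via struct-update syntax
// (the field value is illustrative, not a recommendation):
//
//     let opts = NominalStreamOpts {
//         max_request_delay: Duration::from_millis(250),
//         ..Default::default()
//     };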

#[derive(Debug, Default)]
pub struct NominalDatasetStreamBuilder {
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    stream_to_file: Option<PathBuf>,
    file_fallback: Option<PathBuf>,
    opts: NominalStreamOpts,
}

impl NominalDatasetStreamBuilder {
    pub fn new() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::default()
    }

    pub fn stream_to_core(
        mut self,
        token: BearerToken,
        dataset: ResourceIdentifier,
        handle: tokio::runtime::Handle,
    ) -> Self {
        self.stream_to_core = Some((token, dataset, handle));
        self
    }

    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.stream_to_file = Some(file_path.into());
        self
    }

    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.file_fallback = Some(file_path.into());
        self
    }

    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
        self.opts = opts;
        self
    }

    #[cfg(feature = "logging")]
    fn init_logging(self, directive: Option<&str>) -> Self {
        use tracing_subscriber::layer::SubscriberExt;
        use tracing_subscriber::util::SubscriberInitExt;

        let base = tracing_subscriber::EnvFilter::builder()
            .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into());
        let env_filter = match directive {
            Some(d) => base.parse_lossy(d),
            None => base.from_env_lossy(),
        };

        let subscriber = tracing_subscriber::registry()
            .with(
                tracing_subscriber::fmt::layer()
                    .with_thread_ids(true)
                    .with_thread_names(true)
                    .with_line_number(true),
            )
            .with(env_filter);

        if let Err(error) = subscriber.try_init() {
            eprintln!("nominal streaming failed to enable logging: {error}");
        }

        self
    }

    #[cfg(feature = "logging")]
    pub fn enable_logging(self) -> Self {
        self.init_logging(None)
    }

    #[cfg(feature = "logging")]
    pub fn enable_logging_with_directive(self, log_directive: &str) -> Self {
        self.init_logging(Some(log_directive))
    }

    pub fn build(self) -> NominalDatasetStream {
        let core_consumer = self.core_consumer();
        let file_consumer = self.file_consumer();
        let fallback_consumer = self.fallback_consumer();

        match (core_consumer, file_consumer, fallback_consumer) {
            (None, None, _) => {
                panic!("nominal dataset stream must stream to a file, to core, or both")
            }
            (Some(_), Some(_), Some(_)) => {
                panic!("choose at most one of stream_to_file and file_fallback when streaming to core")
            }
            (Some(core), None, None) => self.into_stream(core),
            (Some(core), None, Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
            }
            (None, Some(file), None) => self.into_stream(file),
            (None, Some(file), Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
            }
            (Some(core), Some(file), None) => {
                self.into_stream(DualWriteRequestConsumer::new(core, file))
            }
        }
    }

    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
        self.stream_to_core
            .as_ref()
            .map(|(token, dataset, handle)| {
                NominalCoreConsumer::new(
                    NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
                    handle.clone(),
                    token.clone(),
                    dataset.clone(),
                )
            })
    }

    fn file_consumer(&self) -> Option<AvroFileConsumer> {
        self.stream_to_file
            .as_ref()
            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
    }

    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
        self.file_fallback
            .as_ref()
            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
    }

    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
        let logging_consumer =
            ListeningWriteRequestConsumer::new(consumer, vec![Arc::new(LoggingListener)]);
        NominalDatasetStream::new_with_consumer(logging_consumer, self.opts)
    }
}
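
// A minimal end-to-end sketch of the builder (a sketch, not a prescription:
// `token`, `dataset_rid`, and `handle` must be supplied by the caller, and the
// fallback path is illustrative):
//
//     let stream = NominalDatasetStream::builder()
//         .stream_to_core(token, dataset_rid, handle)
//         .with_file_fallback("/tmp/nominal_fallback.avro")
//         .with_options(NominalStreamOpts::default())
//         .build();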

#[deprecated(note = "renamed to NominalDatasetStream")]
pub type NominalDatasourceStream = NominalDatasetStream;

pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}

impl NominalDatasetStream {
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }
325
326 fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
327 self.unflushed_points
328 .fetch_add(new_count, Ordering::Release);
329
330 if self.primary_buffer.has_capacity(new_count) {
331 debug!("adding {} points to primary buffer", new_count);
332 callback(self.primary_buffer.lock());
333 } else if self.secondary_buffer.has_capacity(new_count) {
334 self.primary_handle.thread().unpark();
336 debug!("adding {} points to secondary buffer", new_count);
337 callback(self.secondary_buffer.lock());
338 } else {
339 let buf = if self.primary_buffer < self.secondary_buffer {
340 info!("waiting for primary buffer to flush to append {new_count} points...");
341 self.primary_handle.thread().unpark();
342 &self.primary_buffer
343 } else {
344 info!("waiting for secondary buffer to flush to append {new_count} points...");
345 self.secondary_handle.thread().unpark();
346 &self.secondary_buffer
347 };
348
349 buf.on_notify(callback);
350 }
351 }
352}
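
// A minimal batch-enqueue sketch (a sketch under assumptions: `stream` was
// built as above, `ChannelDescriptor` comes from crate::types, and the
// Vec<DoublePoint>: IntoPoints bound used by the writers below applies):
//
//     let cd: ChannelDescriptor = /* channel name plus optional tags */;
//     stream.enqueue(&cd, vec![DoublePoint { timestamp: Some(ts), value: 1.0 }]);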

pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: &'ds ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    last_flushed_at: Instant,
    unflushed: Vec<T>,
}

impl<T> NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn new<'ds>(
        stream: &'ds NominalDatasetStream,
        channel: &'ds ChannelDescriptor,
    ) -> NominalChannelWriter<'ds, T> {
        NominalChannelWriter {
            channel,
            stream,
            last_flushed_at: Instant::now(),
            unflushed: vec![],
        }
    }

    fn push_point(&mut self, point: T) {
        self.unflushed.push(point);
        if self.unflushed.len() >= self.stream.opts.max_points_per_record
            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
        {
            debug!(
                "conditionally flushing {:?}, ({} points, {:?} since last)",
                self.channel,
                self.unflushed.len(),
                self.last_flushed_at.elapsed()
            );
            self.flush();
        }
    }

    fn flush(&mut self) {
        if self.unflushed.is_empty() {
            return;
        }
        info!(
            "flushing writer for {:?} with {} points",
            self.channel,
            self.unflushed.len()
        );
        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
            buf.extend(self.channel, to_flush);
            self.last_flushed_at = Instant::now();
        })
    }
}

impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}

pub struct NominalDoubleWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoublePoint>,
}

impl NominalDoubleWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
        self.writer.push_point(DoublePoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalIntegerWriter<'ds> {
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}

impl NominalIntegerWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
        self.writer.push_point(IntegerPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

pub struct NominalStringWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringPoint>,
}

impl NominalStringWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
        self.writer.push_point(StringPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value: value.into(),
        });
    }
}
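
// A minimal typed-writer sketch (a sketch, assuming some std time type such
// as std::time::SystemTime implements IntoTimestamp in crate::types; check
// that trait's impls before copying this):
//
//     let mut w = stream.double_writer(&cd);
//     w.push(std::time::SystemTime::now(), 42.0);
//     // The writer flushes on the size/delay thresholds above, and on drop.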

struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    count: AtomicUsize,
    flush_time: AtomicU64,
    condvar: Condvar,
    max_capacity: usize,
}

struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}

impl SeriesBufferGuard<'_> {
    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
        let points = points.into_points();
        let new_point_count = points_len(&points);

        if !self.sb.contains_key(channel_descriptor) {
            self.sb.insert(channel_descriptor.clone(), points);
        } else {
            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (_, _) => {
                    // A channel must keep a single point type for its lifetime.
                    panic!("mismatched point types for channel {channel_descriptor:?}");
                }
            }
        }

        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}

// Buffers compare by their last flush time: a "smaller" buffer flushed longer
// ago, so it is the one next in line to be drained (see when_capacity).
impl PartialEq for SeriesBuffer {
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}

impl PartialOrd for SeriesBuffer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let flush_time = self.flush_time.load(Ordering::Acquire);
        let other_flush_time = other.flush_time.load(Ordering::Acquire);
        flush_time.partial_cmp(&other_flush_time)
    }
}

impl SeriesBuffer {
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    // An empty buffer accepts a batch of any size, even one larger than
    // max_capacity, so an oversized write can never wait forever; otherwise
    // the new points must fit within the remaining capacity.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        let result_count = self
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        if !points_lock.is_empty() {
            // Still full: block until a batch processor drains the buffer and
            // calls notify(); the condvar releases the lock while waiting.
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}

fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        match request_chan.send((write_request, point_count)) {
            Ok(()) => debug!("queued request for processing"),
            Err(_) => error!("failed to send request to dispatcher"),
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}
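
// Data-flow sketch of the pieces above (a summary, not additional machinery):
// writers / enqueue -> primary or secondary SeriesBuffer -> batch_processor
// threads drain a buffer into a WriteRequestNominal -> bounded crossbeam
// channel -> request_dispatcher threads -> WriteRequestConsumer (core, Avro
// file, dual-write, or fallback combinations chosen by the builder).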

impl Drop for NominalDatasetStream {
    fn drop(&mut self) {
        debug!("starting drop for NominalDatasetStream");
        self.running.store(false, Ordering::Release);
        // Busy-wait until the dispatcher threads have drained every queued
        // point; they decrement unflushed_points after each consumed request.
        while self.unflushed_points.load(Ordering::Acquire) > 0 {
            debug!(
                "waiting for all points to be flushed before dropping stream, {} points remaining",
                self.unflushed_points.load(Ordering::Acquire)
            );
            thread::sleep(Duration::from_millis(50));
        }
    }
}

fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received write request from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("failed to send request: {e:?}");
                    }
                }
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {} ms",
        total_request_time
    );
}

fn points_len(points_type: &PointsType) -> usize {
    match points_type {
        PointsType::DoublePoints(points) => points.points.len(),
        PointsType::StringPoints(points) => points.points.len(),
        PointsType::IntegerPoints(points) => points.points.len(),
    }
}