use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use std::time::UNIX_EPOCH;

use conjure_object::BearerToken;
use conjure_object::ResourceIdentifier;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::Points;
use nominal_api::tonic::io::nominal::scout::api::proto::Series;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
use parking_lot::Condvar;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use tracing::debug;
use tracing::error;
use tracing::info;

use crate::client::PRODUCTION_STREAMING_CLIENT;
use crate::consumer::AvroFileConsumer;
use crate::consumer::DualWriteRequestConsumer;
use crate::consumer::ListeningWriteRequestConsumer;
use crate::consumer::NominalCoreConsumer;
use crate::consumer::RequestConsumerWithFallback;
use crate::consumer::WriteRequestConsumer;
use crate::notifier::LoggingListener;
use crate::types::ChannelDescriptor;
use crate::types::IntoPoints;
use crate::types::IntoTimestamp;

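/// Tuning options for a [`NominalDatasetStream`].
///
/// `max_points_per_record` caps how many points a single buffer (and thus a
/// single write request) holds, `max_request_delay` bounds how long points
/// may wait before a flush, `max_buffered_requests` sizes the bounded request
/// channel, and `request_dispatcher_tasks` sets the number of dispatcher
/// threads.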
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    pub max_points_per_record: usize,
    pub max_request_delay: Duration,
    pub max_buffered_requests: usize,
    pub request_dispatcher_tasks: usize,
}

impl Default for NominalStreamOpts {
    fn default() -> Self {
        Self {
            max_points_per_record: 250_000,
            max_request_delay: Duration::from_millis(100),
            max_buffered_requests: 4,
            request_dispatcher_tasks: 8,
        }
    }
}

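/// Builder for [`NominalDatasetStream`].
///
/// A minimal sketch of intended use (the output path is a placeholder):
///
/// ```ignore
/// let stream = NominalDatasetStream::builder()
///     .stream_to_file("/tmp/points.avro")
///     .build();
/// ```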
#[derive(Debug, Default)]
pub struct NominalDatasetStreamBuilder {
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    stream_to_file: Option<PathBuf>,
    file_fallback: Option<PathBuf>,
    opts: NominalStreamOpts,
}

impl NominalDatasetStreamBuilder {
    pub fn new() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::default()
    }

    pub fn stream_to_core(
        mut self,
        token: BearerToken,
        dataset: ResourceIdentifier,
        handle: tokio::runtime::Handle,
    ) -> Self {
        self.stream_to_core = Some((token, dataset, handle));
        self
    }

    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.stream_to_file = Some(file_path.into());
        self
    }

    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.file_fallback = Some(file_path.into());
        self
    }

    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
        self.opts = opts;
        self
    }

    #[cfg(feature = "logging")]
    pub fn enable_logging(self) -> Self {
        use tracing_subscriber::layer::SubscriberExt;
        use tracing_subscriber::util::SubscriberInitExt;

        let subscriber = tracing_subscriber::registry()
            .with(
                tracing_subscriber::EnvFilter::builder()
                    .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
                    .from_env_lossy(),
            )
            .with(tracing_subscriber::fmt::layer());

        if let Err(error) = subscriber.try_init() {
            eprintln!("nominal streaming failed to enable logging: {error}");
        }

        self
    }

    pub fn build(self) -> NominalDatasetStream {
        let core_consumer = self.core_consumer();
        let file_consumer = self.file_consumer();
        let fallback_consumer = self.fallback_consumer();

        match (core_consumer, file_consumer, fallback_consumer) {
            (None, None, _) => {
                panic!("nominal dataset stream must stream to core, a file, or both")
            }
            (Some(_), Some(_), Some(_)) => {
                panic!("choose only one of stream_to_file and file_fallback when streaming to core")
            }
            (Some(core), None, None) => self.into_stream(core),
            (Some(core), None, Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
            }
            (None, Some(file), None) => self.into_stream(file),
            (None, Some(file), Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
            }
            (Some(core), Some(file), None) => {
                self.into_stream(DualWriteRequestConsumer::new(core, file))
            }
        }
    }

    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
        self.stream_to_core
            .as_ref()
            .map(|(token, dataset, handle)| {
                NominalCoreConsumer::new(
                    PRODUCTION_STREAMING_CLIENT.clone(),
                    handle.clone(),
                    token.clone(),
                    dataset.clone(),
                )
            })
    }

    fn file_consumer(&self) -> Option<AvroFileConsumer> {
        self.stream_to_file.as_ref().map(|path| {
            AvroFileConsumer::new_with_full_path(path)
                .expect("failed to create avro consumer for stream_to_file path")
        })
    }

    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
        self.file_fallback.as_ref().map(|path| {
            AvroFileConsumer::new_with_full_path(path)
                .expect("failed to create avro consumer for file_fallback path")
        })
    }

    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
        let logging_consumer =
            ListeningWriteRequestConsumer::new(consumer, vec![Arc::new(LoggingListener)]);
        NominalDatasetStream::new_with_consumer(logging_consumer, self.opts)
    }
}

#[deprecated(note = "use NominalDatasetStream instead")]
pub type NominalDatasourceStream = NominalDatasetStream;

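/// Streams points to a Nominal dataset and/or an Avro file.
///
/// Writes land in one of two [`SeriesBuffer`]s so that one buffer can accept
/// points while the other is being flushed. A batch processor thread per
/// buffer drains it into write requests, and a pool of dispatcher threads
/// forwards those requests to the configured consumer.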
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}

impl NominalDatasetStream {
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        // One batch processor thread per buffer; both feed the same request channel.
        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                // Takes ownership of the original sender, so the channel closes
                // once both processor threads exit.
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

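    /// Creates a writer that batches `f64` points for `channel_descriptor`.
    ///
    /// A minimal sketch of intended use; the `ChannelDescriptor` construction
    /// below is assumed from its field names, and it assumes `SystemTime`
    /// implements `IntoTimestamp`:
    ///
    /// ```ignore
    /// let channel = ChannelDescriptor { name: "temperature".to_string(), tags: None };
    /// let mut writer = stream.double_writer(&channel);
    /// writer.push(std::time::SystemTime::now(), 21.5);
    /// // any remaining points are flushed when the writer is dropped
    /// ```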
    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Creates a writer that batches string points for `channel_descriptor`.
    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Creates a writer that batches `i64` points for `channel_descriptor`.
    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

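    /// Adds a batch of points for `channel_descriptor` to the stream's
    /// buffers, blocking if both buffers are over capacity.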
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            // Both buffers are full: wake the processor for whichever buffer
            // flushed least recently and wait for it to drain.
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}

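/// A per-channel writer that accumulates points locally and pushes them into
/// the stream's shared buffers once the batch is large enough, the configured
/// delay has elapsed, or the writer is dropped.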
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: &'ds ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    last_flushed_at: Instant,
    unflushed: Vec<T>,
}

impl<T> NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn new<'ds>(
        stream: &'ds NominalDatasetStream,
        channel: &'ds ChannelDescriptor,
    ) -> NominalChannelWriter<'ds, T> {
        NominalChannelWriter {
            channel,
            stream,
            last_flushed_at: Instant::now(),
            unflushed: vec![],
        }
    }

    fn push_point(&mut self, point: T) {
        self.unflushed.push(point);
        if self.unflushed.len() >= self.stream.opts.max_points_per_record
            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
        {
            debug!(
                "conditionally flushing {:?}, ({} points, {:?} since last)",
                self.channel,
                self.unflushed.len(),
                self.last_flushed_at.elapsed()
            );
            self.flush();
        }
    }

    fn flush(&mut self) {
        if self.unflushed.is_empty() {
            return;
        }
        info!(
            "flushing writer for {:?} with {} points",
            self.channel,
            self.unflushed.len()
        );
        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
            buf.extend(self.channel, to_flush);
            self.last_flushed_at = Instant::now();
        })
    }
}

impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}

/// Writer for `f64`-valued channels.
pub struct NominalDoubleWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoublePoint>,
}

impl NominalDoubleWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
        self.writer.push_point(DoublePoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

/// Writer for `i64`-valued channels.
pub struct NominalIntegerWriter<'ds> {
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}

impl NominalIntegerWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
        self.writer.push_point(IntegerPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value,
        });
    }
}

/// Writer for string-valued channels.
pub struct NominalStringWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringPoint>,
}

impl NominalStringWriter<'_> {
    pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
        self.writer.push_point(StringPoint {
            timestamp: Some(timestamp.into_timestamp()),
            value: value.into(),
        });
    }
}

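/// One half of the stream's double buffer: points per channel behind a mutex,
/// an atomic point count so capacity checks can skip the lock, the time of
/// the last flush, and a condvar used to wake writers waiting on a full
/// buffer.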
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    count: AtomicUsize,
    flush_time: AtomicU64,
    condvar: Condvar,
    max_capacity: usize,
}

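/// A locked view of a [`SeriesBuffer`] that keeps the shared point count in
/// sync as points are appended.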
struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}

impl SeriesBufferGuard<'_> {
    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
        let points = points.into_points();
        let new_point_count = points_len(&points);

        if !self.sb.contains_key(channel_descriptor) {
            self.sb.insert(channel_descriptor.clone(), points);
        } else {
            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (_, _) => {
                    panic!("mismatched point types for channel {channel_descriptor:?}");
                }
            }
        }

        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}

// Buffers are compared by last flush time, so callers can pick the one that
// flushed least recently (and is therefore likely to flush next).
impl PartialEq for SeriesBuffer {
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}

impl PartialOrd for SeriesBuffer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let flush_time = self.flush_time.load(Ordering::Acquire);
        let other_flush_time = other.flush_time.load(Ordering::Acquire);
        flush_time.partial_cmp(&other_flush_time)
    }
}

impl SeriesBuffer {
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        // An empty buffer accepts any batch, even one above max_capacity.
        count == 0 || count + new_points_count <= self.max_capacity
    }

    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        // Reset the shared count, returning how many points were drained.
        let result_count = self.count.swap(0, Ordering::AcqRel);
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        if !points_lock.is_empty() {
            // Wait for a batch processor to drain this buffer and call notify().
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    /// Wakes one writer blocked in `on_notify`; returns whether a thread was woken.
    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}

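/// Repeatedly drains `points_buffer` into write requests on `request_chan`,
/// parking for up to `max_request_delay` between iterations. Exits once the
/// buffer is empty and `running` has been cleared.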
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        match request_chan.send((write_request, point_count)) {
            Ok(()) => debug!("queued request for processing"),
            Err(_) => error!("failed to send request to dispatcher"),
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}

impl Drop for NominalDatasetStream {
    fn drop(&mut self) {
        debug!("starting drop for NominalDatasetStream");
        self.running.store(false, Ordering::Release);
        // Processor threads wake from park_timeout on their own, so polling
        // here is enough for the remaining points to drain.
        while self.unflushed_points.load(Ordering::Acquire) > 0 {
            debug!(
                "waiting for all points to be flushed before dropping stream, {} points remaining",
                self.unflushed_points.load(Ordering::Acquire)
            );
            thread::sleep(Duration::from_millis(50));
        }
    }
}

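/// Receives write requests from `request_rx` and forwards them to `consumer`,
/// decrementing `unflushed_points` as each request completes. Exits when the
/// channel closes, or once all points are flushed after `running` clears.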
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received write request from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("failed to send request: {e:?}");
                    }
                }
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {} ms",
        total_request_time
    );
}

/// Number of points in a `PointsType`, regardless of variant.
fn points_len(points_type: &PointsType) -> usize {
    match points_type {
        PointsType::DoublePoints(points) => points.points.len(),
        PointsType::StringPoints(points) => points.points.len(),
        PointsType::IntegerPoints(points) => points.points.len(),
    }
}