1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
17use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
18use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
19use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
20use nominal_api::tonic::io::nominal::scout::api::proto::Points;
21use nominal_api::tonic::io::nominal::scout::api::proto::Series;
22use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
23use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
24use parking_lot::Condvar;
25use parking_lot::Mutex;
26use parking_lot::MutexGuard;
27use tracing::debug;
28use tracing::error;
29use tracing::info;
30
31use crate::client::NominalApiClients;
32use crate::client::PRODUCTION_API_URL;
33use crate::consumer::AvroFileConsumer;
34use crate::consumer::DualWriteRequestConsumer;
35use crate::consumer::ListeningWriteRequestConsumer;
36use crate::consumer::NominalCoreConsumer;
37use crate::consumer::RequestConsumerWithFallback;
38use crate::consumer::WriteRequestConsumer;
39use crate::notifier::LoggingListener;
40use crate::types::ChannelDescriptor;
41use crate::types::IntoPoints;
42use crate::types::IntoTimestamp;
43
/// Tuning options for a [`NominalDatasetStream`].
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    /// Capacity of each series buffer, and the per-writer local flush threshold.
    pub max_points_per_record: usize,
    /// Maximum time batch-processor threads park between flushes; also the
    /// per-writer time-based flush threshold.
    pub max_request_delay: Duration,
    /// Capacity of the bounded request channel between batching and dispatch.
    pub max_buffered_requests: usize,
    /// Number of dispatcher threads that hand requests to the consumer.
    pub request_dispatcher_tasks: usize,
    /// Base URL used to construct the Nominal API clients.
    pub base_api_url: String,
}
52
53impl Default for NominalStreamOpts {
54 fn default() -> Self {
55 Self {
56 max_points_per_record: 250_000,
57 max_request_delay: Duration::from_millis(100),
58 max_buffered_requests: 4,
59 request_dispatcher_tasks: 8,
60 base_api_url: PRODUCTION_API_URL.to_string(),
61 }
62 }
63}
64
/// Builder for [`NominalDatasetStream`]; configure at least one destination
/// (core API and/or Avro file) before calling `build`.
#[derive(Debug, Default)]
pub struct NominalDatasetStreamBuilder {
    // (auth token, dataset rid, tokio runtime handle) for streaming to core.
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    // Avro file every write request is streamed to.
    stream_to_file: Option<PathBuf>,
    // Avro file used only when the primary consumer fails.
    file_fallback: Option<PathBuf>,
    opts: NominalStreamOpts,
}
72
73impl NominalDatasetStreamBuilder {
74 pub fn new() -> NominalDatasetStreamBuilder {
75 NominalDatasetStreamBuilder {
76 ..Default::default()
77 }
78 }
79
80 pub fn stream_to_core(
81 mut self,
82 token: BearerToken,
83 dataset: ResourceIdentifier,
84 handle: tokio::runtime::Handle,
85 ) -> Self {
86 self.stream_to_core = Some((token, dataset, handle));
87 self
88 }
89
90 pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
91 self.stream_to_file = Some(file_path.into());
92 self
93 }
94
95 pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
96 self.file_fallback = Some(file_path.into());
97 self
98 }
99
100 pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
101 self.opts = opts;
102 self
103 }
104
105 #[cfg(feature = "logging")]
106 pub fn enable_logging(self) -> Self {
107 use tracing_subscriber::layer::SubscriberExt;
108 use tracing_subscriber::util::SubscriberInitExt;
109
110 let subscriber = tracing_subscriber::registry()
111 .with(
112 tracing_subscriber::fmt::layer()
113 .with_thread_ids(true)
114 .with_thread_names(true)
115 .with_line_number(true),
116 )
117 .with(
118 tracing_subscriber::EnvFilter::builder()
119 .with_default_directive(tracing_subscriber::filter::LevelFilter::DEBUG.into())
120 .from_env_lossy(),
121 );
122
123 if let Err(error) = subscriber.try_init() {
124 eprintln!("nominal streaming failed to enable logging: {error}");
125 }
126
127 self
128 }
129
130 pub fn build(self) -> NominalDatasetStream {
131 let core_consumer = self.core_consumer();
132 let file_consumer = self.file_consumer();
133 let fallback_consumer = self.fallback_consumer();
134
135 match (core_consumer, file_consumer, fallback_consumer) {
136 (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
137 (Some(_), Some(_), Some(_)) => {
138 panic!("must choose one of stream_to_file and file_fallback when streaming to core")
139 }
140 (Some(core), None, None) => self.into_stream(core),
141 (Some(core), None, Some(fallback)) => {
142 self.into_stream(RequestConsumerWithFallback::new(core, fallback))
143 }
144 (None, Some(file), None) => self.into_stream(file),
145 (None, Some(file), Some(fallback)) => {
146 self.into_stream(RequestConsumerWithFallback::new(file, fallback))
148 }
149 (Some(core), Some(file), None) => {
150 self.into_stream(DualWriteRequestConsumer::new(core, file))
151 }
152 }
153 }
154
155 fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
156 self.stream_to_core
157 .as_ref()
158 .map(|(token, dataset, handle)| {
159 NominalCoreConsumer::new(
160 NominalApiClients::from_uri(self.opts.base_api_url.as_str()),
161 handle.clone(),
162 token.clone(),
163 dataset.clone(),
164 )
165 })
166 }
167
168 fn file_consumer(&self) -> Option<AvroFileConsumer> {
169 self.stream_to_file
170 .as_ref()
171 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
172 }
173
174 fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
175 self.file_fallback
176 .as_ref()
177 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
178 }
179
180 fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
181 let logging_consumer =
182 ListeningWriteRequestConsumer::new(consumer, vec![Arc::new(LoggingListener)]);
183 NominalDatasetStream::new_with_consumer(logging_consumer, self.opts)
184 }
185}
186
187#[deprecated]
189pub type NominalDatasourceStream = NominalDatasetStream;
190
/// Handle to a running dataset stream. Dropping it signals shutdown and
/// blocks until all enqueued points have been flushed.
pub struct NominalDatasetStream {
    opts: NominalStreamOpts,
    // Cleared on drop to tell the worker threads to shut down.
    running: Arc<AtomicBool>,
    // Points accepted by the stream but not yet consumed by a dispatcher.
    unflushed_points: Arc<AtomicUsize>,
    // Double buffering: writers fill one buffer while the other drains.
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    // Batch-processor thread handles; unparked to trigger early flushes.
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}
200
impl NominalDatasetStream {
    /// Convenience constructor for a [`NominalDatasetStreamBuilder`].
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    /// Builds the stream around `consumer` and spawns its worker threads:
    /// one batch-processor thread per buffer (draining points into write
    /// requests) and `opts.request_dispatcher_tasks` dispatcher threads that
    /// hand the requests to the consumer.
    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        // Two buffers so writers can fill one while the other is flushed.
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        // Bounded channel provides back-pressure between batching and dispatch.
        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                // Each batch processor owns a sender; the channel closes only
                // once both processors have exited and dropped theirs.
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        // Dispatcher threads are detached (handles dropped): they exit on
        // their own when the request channel closes.
        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

    /// Returns a buffered writer for `f64` points on the given channel.
    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Returns a buffered writer for string points on the given channel.
    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Returns a buffered writer for `i64` points on the given channel.
    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Adds `new_points` for `channel_descriptor`, blocking when both series
    /// buffers are at capacity.
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    /// Runs `callback` with a locked buffer that can accept `new_count` more
    /// points: the primary if it has room, else the secondary, else blocks on
    /// the buffer that was flushed least recently (presumably the next to be
    /// drained — see the PartialOrd impl) until its processor empties it.
    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        // Counted up front; dispatchers decrement after consuming a request.
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            // Primary is full: wake its processor so it drains promptly.
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            // `<` compares last-flush timestamps (PartialOrd on SeriesBuffer).
            let buf = if self.primary_buffer < self.secondary_buffer {
                info!("waiting for primary buffer to flush to append {new_count} points...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                info!("waiting for secondary buffer to flush to append {new_count} points...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}
339
/// Per-channel writer that buffers points locally before batching them into
/// the stream's shared series buffers.
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    channel: &'ds ChannelDescriptor,
    stream: &'ds NominalDatasetStream,
    // When the local buffer was last drained into the stream.
    last_flushed_at: Instant,
    // Points not yet handed to the stream.
    unflushed: Vec<T>,
}
349
impl<T> NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    /// Creates a writer that locally buffers points for `channel` before
    /// handing them to `stream` in batches.
    fn new<'ds>(
        stream: &'ds NominalDatasetStream,
        channel: &'ds ChannelDescriptor,
    ) -> NominalChannelWriter<'ds, T> {
        NominalChannelWriter {
            channel,
            stream,
            last_flushed_at: Instant::now(),
            unflushed: vec![],
        }
    }

    /// Buffers one point, flushing to the stream once the local buffer
    /// reaches `max_points_per_record` or more than `max_request_delay` has
    /// elapsed since the last flush.
    fn push_point(&mut self, point: T) {
        self.unflushed.push(point);
        if self.unflushed.len() >= self.stream.opts.max_points_per_record
            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
        {
            debug!(
                "conditionally flushing {:?}, ({} points, {:?} since last)",
                self.channel,
                self.unflushed.len(),
                self.last_flushed_at.elapsed()
            );
            self.flush();
        }
    }

    /// Moves all locally buffered points into the stream's series buffer.
    /// No-op when the local buffer is empty.
    fn flush(&mut self) {
        if self.unflushed.is_empty() {
            return;
        }
        info!(
            "flushing writer for {:?} with {} points",
            self.channel,
            self.unflushed.len()
        );
        // `when_capacity` may block until a stream buffer has room; the
        // closure drains the local buffer only once capacity is granted.
        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
            buf.extend(self.channel, to_flush);
            self.last_flushed_at = Instant::now();
        })
    }
}
397
impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    /// Flushes any locally buffered points into the stream before the writer
    /// goes away, so pushed data is not silently lost.
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}
407
408pub struct NominalDoubleWriter<'ds> {
409 writer: NominalChannelWriter<'ds, DoublePoint>,
410}
411
412impl NominalDoubleWriter<'_> {
413 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
414 self.writer.push_point(DoublePoint {
415 timestamp: Some(timestamp.into_timestamp()),
416 value,
417 });
418 }
419}
420
421pub struct NominalIntegerWriter<'ds> {
422 writer: NominalChannelWriter<'ds, IntegerPoint>,
423}
424
425impl NominalIntegerWriter<'_> {
426 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
427 self.writer.push_point(IntegerPoint {
428 timestamp: Some(timestamp.into_timestamp()),
429 value,
430 });
431 }
432}
433
434pub struct NominalStringWriter<'ds> {
435 writer: NominalChannelWriter<'ds, StringPoint>,
436}
437
438impl NominalStringWriter<'_> {
439 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
440 self.writer.push_point(StringPoint {
441 timestamp: Some(timestamp.into_timestamp()),
442 value: value.into(),
443 });
444 }
445}
446
/// Shared point buffer drained by a dedicated batch-processor thread.
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    // Total points currently buffered; kept in sync with `points`.
    count: AtomicUsize,
    // Unix-epoch nanos of the last `take`, used to order buffers.
    flush_time: AtomicU64,
    // Signalled after `take` to wake writers waiting for capacity.
    condvar: Condvar,
    // Soft cap on `count`; an empty buffer accepts any batch size.
    max_capacity: usize,
}
454
/// Locked view of a `SeriesBuffer` that keeps its point count in sync with
/// the underlying map while extending.
struct SeriesBufferGuard<'sb> {
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    count: &'sb AtomicUsize,
}
459
460impl SeriesBufferGuard<'_> {
461 fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
462 let points = points.into_points();
463 let new_point_count = points_len(&points);
464
465 if !self.sb.contains_key(channel_descriptor) {
466 self.sb.insert(channel_descriptor.clone(), points);
467 } else {
468 match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
469 (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
470 existing.points.extend(new.points)
471 }
472 (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
473 existing.points.extend(new.points)
474 }
475 (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
476 existing.points.extend(new.points)
477 }
478 (_, _) => {
479 panic!("mismatched types");
481 }
482 }
483 }
484
485 self.count.fetch_add(new_point_count, Ordering::Release);
486 }
487}
488
489impl PartialEq for SeriesBuffer {
490 fn eq(&self, other: &Self) -> bool {
491 self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
492 }
493}
494
495impl PartialOrd for SeriesBuffer {
496 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
497 let flush_time = self.flush_time.load(Ordering::Acquire);
498 let other_flush_time = other.flush_time.load(Ordering::Acquire);
499 flush_time.partial_cmp(&other_flush_time)
500 }
501}
502
503impl SeriesBuffer {
504 fn new(capacity: usize) -> Self {
505 Self {
506 points: Mutex::new(HashMap::new()),
507 count: AtomicUsize::new(0),
508 flush_time: AtomicU64::new(0),
509 condvar: Condvar::new(),
510 max_capacity: capacity,
511 }
512 }
513
514 fn has_capacity(&self, new_points_count: usize) -> bool {
519 let count = self.count.load(Ordering::Acquire);
520 count == 0 || count + new_points_count <= self.max_capacity
521 }
522
523 fn lock(&self) -> SeriesBufferGuard<'_> {
524 SeriesBufferGuard {
525 sb: self.points.lock(),
526 count: &self.count,
527 }
528 }
529
530 fn take(&self) -> (usize, Vec<Series>) {
531 let mut points = self.lock();
532 self.flush_time.store(
533 UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
534 Ordering::Release,
535 );
536 let result = points
537 .sb
538 .drain()
539 .map(|(ChannelDescriptor { name, tags }, points)| {
540 let channel = Channel { name };
541 let points_obj = Points {
542 points_type: Some(points),
543 };
544 Series {
545 channel: Some(channel),
546 tags: tags
547 .map(|tags| tags.into_iter().collect())
548 .unwrap_or_default(),
549 points: Some(points_obj),
550 }
551 })
552 .collect();
553 let result_count = self
554 .count
555 .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
556 .unwrap();
557 (result_count, result)
558 }
559
560 fn is_empty(&self) -> bool {
561 self.count() == 0
562 }
563
564 fn count(&self) -> usize {
565 self.count.load(Ordering::Acquire)
566 }
567
568 fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
569 let mut points_lock = self.points.lock();
570 if !points_lock.is_empty() {
573 self.condvar.wait(&mut points_lock);
574 } else {
575 debug!("buffer emptied since last check, skipping condvar wait");
576 }
577 on_notify(SeriesBufferGuard {
578 sb: points_lock,
579 count: &self.count,
580 });
581 }
582
583 fn notify(&self) -> bool {
584 self.condvar.notify_one()
585 }
586}
587
/// Worker loop for one `SeriesBuffer`: repeatedly drains the buffer into a
/// `WriteRequestNominal` and sends it (with its point count) down
/// `request_chan`. Parks for up to `max_request_delay` between iterations
/// (writers unpark it early when they need capacity); exits once `running`
/// is false and the buffer has been fully drained.
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                // Dropping this sender lets dispatchers see channel closure
                // once both processors have exited.
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        // Wake one writer blocked in `on_notify` waiting for buffer space.
        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            debug!("ready to queue request but request channel is full");
        }
        // Blocks while the channel is full — this is the back-pressure path.
        let rep = request_chan.send((write_request, point_count));
        debug!("queued request for processing");
        if rep.is_err() {
            error!("failed to send request to dispatcher");
        } else {
            debug!("finished submitting request");
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}
630
631impl Drop for NominalDatasetStream {
632 fn drop(&mut self) {
633 debug!("starting drop for NominalDatasetStream");
634 self.running.store(false, Ordering::Release);
635 while self.unflushed_points.load(Ordering::Acquire) > 0 {
636 debug!(
637 "waiting for all points to be flushed before dropping stream, {} points remaining",
638 self.unflushed_points.load(Ordering::Acquire)
639 );
640 thread::sleep(Duration::from_millis(50));
642 }
643 }
644}
645
/// Worker loop for one dispatcher thread: receives `(request, point_count)`
/// pairs from `request_rx`, forwards each request to `consumer`, and
/// subtracts `point_count` from `unflushed_points` whether or not the
/// consumer succeeded. Exits when the channel closes, or when shutdown was
/// signalled and everything is flushed.
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    // Cumulative milliseconds spent in successful consume() calls.
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received writerequest from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        // NOTE(review): failed requests are only logged —
                        // their points are still decremented below, so there
                        // is no retry at this layer. Confirm that fallback
                        // handling lives in the consumer itself.
                        error!("Failed to send request: {e:?}");
                    }
                }
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {}",
        total_request_time
    );
}
689
690fn points_len(points_type: &PointsType) -> usize {
691 match points_type {
692 PointsType::DoublePoints(points) => points.points.len(),
693 PointsType::StringPoints(points) => points.points.len(),
694 PointsType::IntegerPoints(points) => points.points.len(),
695 }
696}