1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::path::PathBuf;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::AtomicU64;
6use std::sync::atomic::AtomicUsize;
7use std::sync::atomic::Ordering;
8use std::sync::Arc;
9use std::thread;
10use std::time::Duration;
11use std::time::Instant;
12use std::time::UNIX_EPOCH;
13
14use conjure_object::BearerToken;
15use conjure_object::ResourceIdentifier;
16use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
17use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
18use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
19use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
20use nominal_api::tonic::io::nominal::scout::api::proto::Points;
21use nominal_api::tonic::io::nominal::scout::api::proto::Series;
22use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
23use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
24use parking_lot::Condvar;
25use parking_lot::Mutex;
26use parking_lot::MutexGuard;
27use tracing::debug;
28use tracing::error;
29use tracing::info;
30use tracing::warn;
31
32use crate::client::PRODUCTION_STREAMING_CLIENT;
33use crate::consumer::AvroFileConsumer;
34use crate::consumer::DualWriteRequestConsumer;
35use crate::consumer::ListeningWriteRequestConsumer;
36use crate::consumer::NominalCoreConsumer;
37use crate::consumer::RequestConsumerWithFallback;
38use crate::consumer::WriteRequestConsumer;
39use crate::notifier::LoggingListener;
40use crate::types::ChannelDescriptor;
41use crate::types::IntoPoints;
42use crate::types::IntoTimestamp;
43
/// Tuning knobs controlling batching and backpressure for a
/// [`NominalDatasetStream`].
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    /// Maximum number of points accumulated in one buffer / write request
    /// before it is considered full.
    pub max_points_per_record: usize,
    /// Longest interval a batch processor sleeps between flush attempts
    /// (also used by channel writers as their time-based flush threshold).
    pub max_request_delay: Duration,
    /// Capacity of the bounded channel between batch processors and the
    /// request dispatcher threads.
    pub max_buffered_requests: usize,
    /// Number of dispatcher threads spawned to send write requests.
    pub request_dispatcher_tasks: usize,
}
51
impl Default for NominalStreamOpts {
    /// Defaults: 250k points per record, 100 ms max delay, 4 buffered
    /// requests, 8 dispatcher threads.
    fn default() -> Self {
        Self {
            max_points_per_record: 250_000,
            max_request_delay: Duration::from_millis(100),
            max_buffered_requests: 4,
            request_dispatcher_tasks: 8,
        }
    }
}
62
/// Builder for [`NominalDatasetStream`]; at least one destination
/// (core and/or file) must be configured before calling `build`.
#[derive(Debug, Default)]
pub struct NominalDatasetStreamBuilder {
    /// Auth token, target dataset, and tokio runtime handle used when
    /// streaming to the Nominal core API.
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    /// Path of an Avro file to stream points to (primary destination).
    stream_to_file: Option<PathBuf>,
    /// Path of an Avro file used only when the primary consumer fails.
    file_fallback: Option<PathBuf>,
    /// Batching/backpressure options; defaults from `NominalStreamOpts`.
    opts: NominalStreamOpts,
}
70
71impl NominalDatasetStreamBuilder {
72 pub fn new() -> NominalDatasetStreamBuilder {
73 NominalDatasetStreamBuilder {
74 ..Default::default()
75 }
76 }
77
78 pub fn stream_to_core(
79 mut self,
80 token: BearerToken,
81 dataset: ResourceIdentifier,
82 handle: tokio::runtime::Handle,
83 ) -> Self {
84 self.stream_to_core = Some((token, dataset, handle));
85 self
86 }
87 pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
88 self.stream_to_file = Some(file_path.into());
89 self
90 }
91
92 pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
93 self.file_fallback = Some(file_path.into());
94 self
95 }
96
97 pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
98 self.opts = opts;
99 self
100 }
101
102 pub fn build(self) -> NominalDatasetStream {
103 let core_consumer = self.core_consumer();
104 let file_consumer = self.file_consumer();
105 let fallback_consumer = self.fallback_consumer();
106
107 match (core_consumer, file_consumer, fallback_consumer) {
108 (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
109 (Some(_), Some(_), Some(_)) => {
110 panic!("must choose one of stream_to_file and file_fallback when streaming to core")
111 }
112 (Some(core), None, None) => self.into_stream(core),
113 (Some(core), None, Some(fallback)) => {
114 self.into_stream(RequestConsumerWithFallback::new(core, fallback))
115 }
116 (None, Some(file), None) => self.into_stream(file),
117 (None, Some(file), Some(fallback)) => {
118 self.into_stream(RequestConsumerWithFallback::new(file, fallback))
120 }
121 (Some(core), Some(file), None) => {
122 self.into_stream(DualWriteRequestConsumer::new(core, file))
123 }
124 }
125 }
126
127 fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
128 self.stream_to_core
129 .as_ref()
130 .map(|(token, dataset, handle)| {
131 NominalCoreConsumer::new(
132 PRODUCTION_STREAMING_CLIENT.clone(),
133 handle.clone(),
134 token.clone(),
135 dataset.clone(),
136 )
137 })
138 }
139
140 fn file_consumer(&self) -> Option<AvroFileConsumer> {
141 self.stream_to_file
142 .as_ref()
143 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
144 }
145
146 fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
147 self.file_fallback
148 .as_ref()
149 .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
150 }
151
152 fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
153 let logging_consumer =
154 ListeningWriteRequestConsumer::new(consumer, vec![Arc::new(LoggingListener)]);
155 NominalDatasetStream::new_with_consumer(logging_consumer, self.opts)
156 }
157}
158
/// Deprecated alias kept for backwards compatibility; use
/// [`NominalDatasetStream`] instead.
#[deprecated]
pub type NominalDatasourceStream = NominalDatasetStream;
162
/// A handle to a running dataset stream: points are staged in two
/// alternating buffers, drained by background batch-processor threads, and
/// dispatched to the configured consumer. Dropping the stream blocks until
/// all points have been flushed.
pub struct NominalDatasetStream {
    /// Options the stream was built with (also read by channel writers).
    opts: NominalStreamOpts,
    /// Cleared on drop to tell worker threads to shut down.
    running: Arc<AtomicBool>,
    /// Points enqueued but not yet handed to the consumer.
    unflushed_points: Arc<AtomicUsize>,
    /// First-choice staging buffer.
    primary_buffer: Arc<SeriesBuffer>,
    /// Overflow staging buffer used when the primary is full.
    secondary_buffer: Arc<SeriesBuffer>,
    /// Batch-processor thread draining the primary buffer.
    primary_handle: thread::JoinHandle<()>,
    /// Batch-processor thread draining the secondary buffer.
    secondary_handle: thread::JoinHandle<()>,
}
172
impl NominalDatasetStream {
    /// Returns a builder for configuring destinations and options.
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    /// Creates a stream that batches points and hands them to `consumer`.
    ///
    /// Spawns:
    /// - two batch-processor threads ("nmstream_primary"/"nmstream_secondary"),
    ///   each draining one `SeriesBuffer` into a bounded request channel;
    /// - `opts.request_dispatcher_tasks` dispatcher threads pulling requests
    ///   off that channel and calling `consumer.consume`.
    ///
    /// Dispatcher join handles are intentionally discarded (detached);
    /// shutdown is coordinated through the `running` flag and the
    /// `unflushed_points` counter (see `Drop`).
    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        // Bounded channel: when all `max_buffered_requests` slots are full,
        // batch processors block in `send`, providing backpressure.
        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                // The sender is cloned so the secondary processor below can
                // take ownership of the original.
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        // Moves the last sender; once both processors exit,
                        // all senders are dropped and dispatchers see a
                        // closed channel.
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                // Handle discarded: dispatchers are detached by design.
                .unwrap();
        }

        NominalDatasetStream {
            opts,
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

    /// Returns a buffering writer for f64 points on `channel_descriptor`.
    pub fn double_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalDoubleWriter<'a> {
        NominalDoubleWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Returns a buffering writer for string points on `channel_descriptor`.
    pub fn string_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalStringWriter<'a> {
        NominalStringWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Returns a buffering writer for i64 points on `channel_descriptor`.
    pub fn integer_writer<'a>(
        &'a self,
        channel_descriptor: &'a ChannelDescriptor,
    ) -> NominalIntegerWriter<'a> {
        NominalIntegerWriter {
            writer: NominalChannelWriter::new(self, channel_descriptor),
        }
    }

    /// Stages `new_points` for `channel_descriptor`, blocking if both
    /// buffers are full.
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.when_capacity(new_count, |mut sb| {
            sb.extend(channel_descriptor, new_points)
        });
    }

    /// Routes `new_count` points into whichever buffer has room and runs
    /// `callback` with that buffer locked.
    ///
    /// Preference order: primary buffer, then secondary (unparking the
    /// primary processor so it starts draining); if both are full, blocks on
    /// the least-recently-flushed buffer's condvar until it is drained.
    /// `unflushed_points` is incremented up front and decremented by a
    /// dispatcher only after the consumer has handled the request.
    fn when_capacity(&self, new_count: usize, callback: impl FnOnce(SeriesBufferGuard)) {
        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            callback(self.primary_buffer.lock());
        } else if self.secondary_buffer.has_capacity(new_count) {
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            callback(self.secondary_buffer.lock());
        } else {
            warn!(
                "both buffers full, picking least recently flushed buffer to append single point"
            );
            // `<` compares flush timestamps via SeriesBuffer's PartialOrd,
            // i.e. picks the buffer that was flushed longest ago.
            let buf = if self.primary_buffer < self.secondary_buffer {
                debug!("waiting for primary buffer to flush...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                debug!("waiting for secondary buffer to flush...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };

            buf.on_notify(callback);
        }
    }
}
314
/// Per-channel write buffer that accumulates typed points locally and
/// flushes them into the stream's shared buffers in batches.
pub struct NominalChannelWriter<'ds, T>
where
    Vec<T>: IntoPoints,
{
    /// Channel all points from this writer are attributed to.
    channel: &'ds ChannelDescriptor,
    /// Owning stream that receives flushed batches.
    stream: &'ds NominalDatasetStream,
    /// Time of the last flush; used for the time-based flush threshold.
    last_flushed_at: Instant,
    /// Points pushed but not yet handed to the stream.
    unflushed: Vec<T>,
}
324
impl<T> NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    /// Creates an empty writer for `channel` backed by `stream`.
    fn new<'ds>(
        stream: &'ds NominalDatasetStream,
        channel: &'ds ChannelDescriptor,
    ) -> NominalChannelWriter<'ds, T> {
        NominalChannelWriter {
            channel,
            stream,
            last_flushed_at: Instant::now(),
            unflushed: vec![],
        }
    }

    /// Buffers one point and flushes if either the size or the elapsed-time
    /// threshold is exceeded. Note there is no background timer: a
    /// time-based flush only happens on the next `push_point` (or on drop).
    fn push_point(&mut self, point: T) {
        self.unflushed.push(point);
        if self.unflushed.len() >= self.stream.opts.max_points_per_record
            || self.last_flushed_at.elapsed() > self.stream.opts.max_request_delay
        {
            debug!(
                "conditionally flushing {:?}, ({} points, {:?} since last)",
                self.channel,
                self.unflushed.len(),
                self.last_flushed_at.elapsed()
            );
            self.flush();
        }
    }

    /// Moves all buffered points into the stream; no-op when empty.
    /// May block inside `when_capacity` if both stream buffers are full.
    fn flush(&mut self) {
        if self.unflushed.is_empty() {
            return;
        }
        info!(
            "flushing writer for {:?} with {} points",
            self.channel,
            self.unflushed.len()
        );
        self.stream.when_capacity(self.unflushed.len(), |mut buf| {
            // Drain inside the callback so points leave `unflushed` only
            // once a destination buffer has actually been secured.
            let to_flush: Vec<T> = self.unflushed.drain(..).collect();
            buf.extend(self.channel, to_flush);
            self.last_flushed_at = Instant::now();
        })
    }
}
372
impl<T> Drop for NominalChannelWriter<'_, T>
where
    Vec<T>: IntoPoints,
{
    /// Flushes any remaining buffered points so short-lived writers don't
    /// silently drop data.
    fn drop(&mut self) {
        info!("flushing then dropping writer for: {:?}", self.channel);
        self.flush();
    }
}
382
/// Typed wrapper over [`NominalChannelWriter`] for f64-valued points.
pub struct NominalDoubleWriter<'ds> {
    writer: NominalChannelWriter<'ds, DoublePoint>,
}
386
387impl NominalDoubleWriter<'_> {
388 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: f64) {
389 self.writer.push_point(DoublePoint {
390 timestamp: Some(timestamp.into_timestamp()),
391 value,
392 });
393 }
394}
395
/// Typed wrapper over [`NominalChannelWriter`] for i64-valued points.
pub struct NominalIntegerWriter<'ds> {
    writer: NominalChannelWriter<'ds, IntegerPoint>,
}
399
400impl NominalIntegerWriter<'_> {
401 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: i64) {
402 self.writer.push_point(IntegerPoint {
403 timestamp: Some(timestamp.into_timestamp()),
404 value,
405 });
406 }
407}
408
/// Typed wrapper over [`NominalChannelWriter`] for string-valued points.
pub struct NominalStringWriter<'ds> {
    writer: NominalChannelWriter<'ds, StringPoint>,
}
412
413impl NominalStringWriter<'_> {
414 pub fn push(&mut self, timestamp: impl IntoTimestamp, value: impl Into<String>) {
415 self.writer.push_point(StringPoint {
416 timestamp: Some(timestamp.into_timestamp()),
417 value: value.into(),
418 });
419 }
420}
421
/// Capacity-limited staging area for points, keyed by channel, shared
/// between producer threads and one batch-processor thread.
struct SeriesBuffer {
    /// Accumulated points per channel, protected by a parking_lot mutex.
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    /// Total points currently buffered across all channels.
    count: AtomicUsize,
    /// Unix time (nanoseconds) of the most recent `take()` flush; drives the
    /// least-recently-flushed comparison in `when_capacity`.
    flush_time: AtomicU64,
    /// Signalled by `notify()` after a flush so producers blocked in
    /// `on_notify` can proceed.
    condvar: Condvar,
    /// Soft capacity limit; see `has_capacity` (an empty buffer accepts a
    /// batch of any size).
    max_capacity: usize,
}
429
/// RAII view over a locked `SeriesBuffer`: holds the points-map mutex guard
/// plus a reference to the buffer's point counter so `extend` can keep the
/// two in sync.
struct SeriesBufferGuard<'sb> {
    /// Locked per-channel points map.
    sb: MutexGuard<'sb, HashMap<ChannelDescriptor, PointsType>>,
    /// Buffer-wide point count, updated as points are added.
    count: &'sb AtomicUsize,
}
434
impl SeriesBufferGuard<'_> {
    /// Appends `points` to the entry for `channel_descriptor`, creating the
    /// entry on first use, and bumps the buffer's point count.
    ///
    /// # Panics
    /// Panics if the incoming points' variant (double/string/integer) does
    /// not match the variant already stored for this channel.
    fn extend(&mut self, channel_descriptor: &ChannelDescriptor, points: impl IntoPoints) {
        let points = points.into_points();
        let new_point_count = points_len(&points);

        // contains_key + insert/get_mut instead of the entry API: this only
        // clones the descriptor on first insertion, at the cost of a second
        // lookup.
        if !self.sb.contains_key(channel_descriptor) {
            self.sb.insert(channel_descriptor.clone(), points);
        } else {
            match (self.sb.get_mut(channel_descriptor).unwrap(), points) {
                (PointsType::DoublePoints(existing), PointsType::DoublePoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::StringPoints(existing), PointsType::StringPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (PointsType::IntegerPoints(existing), PointsType::IntegerPoints(new)) => {
                    existing.points.extend(new.points)
                }
                (_, _) => {
                    panic!("mismatched types");
                }
            }
        }

        self.count.fetch_add(new_point_count, Ordering::Release);
    }
}
463
impl PartialEq for SeriesBuffer {
    /// Buffers compare equal when their last-flush timestamps match.
    /// NOTE(review): equality is time-based, not content-based — it exists
    /// only to support the least-recently-flushed ordering used by
    /// `when_capacity`.
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}
469
470impl PartialOrd for SeriesBuffer {
471 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
472 let flush_time = self.flush_time.load(Ordering::Acquire);
473 let other_flush_time = other.flush_time.load(Ordering::Acquire);
474 flush_time.partial_cmp(&other_flush_time)
475 }
476}
477
impl SeriesBuffer {
    /// Creates an empty buffer with the given soft capacity.
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    /// True if `new_points_count` more points fit. The `count == 0` special
    /// case lets a batch larger than `max_capacity` enter an empty buffer
    /// rather than being unplaceable.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    /// Locks the points map and returns a guard for appending.
    fn lock(&self) -> SeriesBufferGuard<'_> {
        SeriesBufferGuard {
            sb: self.points.lock(),
            count: &self.count,
        }
    }

    /// Drains every channel into `Series` values, records the flush time,
    /// and resets the point count. Returns (points drained, series).
    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.lock();
        // Stamp the flush time while holding the lock so the
        // least-recently-flushed comparison sees a consistent value.
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .sb
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        // fetch_update with a closure that always returns Some(0) cannot
        // fail, so the unwrap is safe; it returns the pre-reset count.
        let result_count = self
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }

    /// True when no points are buffered.
    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    /// Current buffered point count.
    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    /// Blocks until this buffer has been drained (condvar signalled by the
    /// batch processor via `notify`), then runs `on_notify` with the buffer
    /// locked. If the buffer is already empty, skips the wait.
    ///
    /// NOTE(review): the wait is a single-shot `if`, not a re-checked loop;
    /// it relies on `notify_one` only being called right after `take()`
    /// drains the map — confirm if other notify sites are ever added.
    fn on_notify(&self, on_notify: impl FnOnce(SeriesBufferGuard)) {
        let mut points_lock = self.points.lock();
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        on_notify(SeriesBufferGuard {
            sb: points_lock,
            count: &self.count,
        });
    }

    /// Wakes one producer blocked in `on_notify`; returns whether a thread
    /// was actually woken.
    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}
562
/// Worker loop that drains `points_buffer` into the bounded request channel.
///
/// Runs until the buffer is empty AND `running` is false. While the buffer
/// is empty it parks for up to `max_request_delay` (producers unpark it
/// early when they overflow into the other buffer, and `Drop` unparks on
/// shutdown). Each drained batch is sent to a dispatcher together with its
/// point count so `unflushed_points` can be decremented after consumption.
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                // Dropping the sender lets dispatchers observe channel
                // closure once the other processor exits too.
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        // Wake one producer that may be blocked in `on_notify` waiting for
        // this buffer to drain.
        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            warn!("request channel is full");
        }
        // Blocks when the channel is full — this is the backpressure path.
        let rep = request_chan.send((write_request, point_count));
        debug!("queued request for processing");
        if rep.is_err() {
            error!("failed to send request to dispatcher");
        } else {
            debug!("finished submitting request");
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}
605
606impl Drop for NominalDatasetStream {
607 fn drop(&mut self) {
608 debug!("starting drop for NominalDatasetStream");
609 self.running.store(false, Ordering::Release);
610 while self.unflushed_points.load(Ordering::Acquire) > 0 {
611 debug!(
612 "waiting for all points to be flushed before dropping stream, {} points remaining",
613 self.unflushed_points.load(Ordering::Acquire)
614 );
615 thread::sleep(Duration::from_millis(50));
617 }
618 }
619}
620
621fn request_dispatcher<C: WriteRequestConsumer + 'static>(
622 running: Arc<AtomicBool>,
623 unflushed_points: Arc<AtomicUsize>,
624 request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
625 consumer: Arc<C>,
626) {
627 let mut total_request_time = 0;
628 loop {
629 match request_rx.recv() {
630 Ok((request, point_count)) => {
631 debug!("received writerequest from channel");
632 let req_start = Instant::now();
633 match consumer.consume(&request) {
634 Ok(_) => {
635 let time = req_start.elapsed().as_millis();
636 debug!("request of {} points sent in {} ms", point_count, time);
637 total_request_time += time as u64;
638 }
639 Err(e) => {
640 error!("Failed to send request: {e:?}");
641 }
642 }
643 unflushed_points.fetch_sub(point_count, Ordering::Release);
644
645 if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
646 {
647 info!("all points flushed, closing dispatcher thread");
648 drop(request_rx);
650 break;
651 }
652 }
653 Err(e) => {
654 debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
655 break;
656 }
657 }
658 }
659 debug!(
660 "request dispatcher thread exiting. total request time: {}",
661 total_request_time
662 );
663}
664
665fn points_len(points_type: &PointsType) -> usize {
666 match points_type {
667 PointsType::DoublePoints(points) => points.points.len(),
668 PointsType::StringPoints(points) => points.points.len(),
669 PointsType::IntegerPoints(points) => points.points.len(),
670 }
671}