use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use std::time::UNIX_EPOCH;

use conjure_object::BearerToken;
use conjure_object::ResourceIdentifier;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::Points;
use nominal_api::tonic::io::nominal::scout::api::proto::Series;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
use parking_lot::Condvar;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::client::PRODUCTION_STREAMING_CLIENT;
use crate::consumer::AvroFileConsumer;
use crate::consumer::DualWriteRequestConsumer;
use crate::consumer::ListeningWriteRequestConsumer;
use crate::consumer::NominalCoreConsumer;
use crate::consumer::RequestConsumerWithFallback;
use crate::consumer::WriteRequestConsumer;
use crate::notifier::LoggingListener;

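/// Identifies a channel by name plus an optional set of key/value tags.
///
/// A minimal usage sketch (the channel and tag names are illustrative only):
///
/// ```ignore
/// let bare = ChannelDescriptor::new("vehicle.velocity");
/// let tagged = ChannelDescriptor::with_tags(
///     "vehicle.velocity",
///     [("vehicle_id", "veh-01"), ("run", "42")],
/// );
/// ```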
#[derive(Clone, Debug, Eq, Hash, PartialEq, Ord, PartialOrd)]
pub struct ChannelDescriptor {
    pub name: String,
    pub tags: Option<BTreeMap<String, String>>,
}

impl ChannelDescriptor {
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            tags: None,
        }
    }

    pub fn with_tags(
        name: impl Into<String>,
        tags: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        Self {
            name: name.into(),
            tags: Some(
                tags.into_iter()
                    .map(|(key, value)| (key.into(), value.into()))
                    .collect(),
            ),
        }
    }
}

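/// Supplies the bearer token used to authenticate write requests.
///
/// A minimal sketch of an implementation backed by a fixed token
/// (`StaticAuth` is a hypothetical example type, not part of this crate):
///
/// ```ignore
/// #[derive(Clone)]
/// struct StaticAuth(BearerToken);
///
/// impl AuthProvider for StaticAuth {
///     fn token(&self) -> Option<BearerToken> {
///         Some(self.0.clone())
///     }
/// }
/// ```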
pub trait AuthProvider: Clone + Send + Sync {
    fn token(&self) -> Option<BearerToken>;
}

pub trait IntoPoints {
    fn into_points(self) -> PointsType;
}

impl IntoPoints for PointsType {
    fn into_points(self) -> PointsType {
        self
    }
}

impl IntoPoints for Vec<DoublePoint> {
    fn into_points(self) -> PointsType {
        PointsType::DoublePoints(DoublePoints { points: self })
    }
}

impl IntoPoints for Vec<StringPoint> {
    fn into_points(self) -> PointsType {
        PointsType::StringPoints(StringPoints { points: self })
    }
}

impl IntoPoints for Vec<IntegerPoint> {
    fn into_points(self) -> PointsType {
        PointsType::IntegerPoints(IntegerPoints { points: self })
    }
}

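/// Tuning knobs for the streaming pipeline.
///
/// A sketch of overriding a single option while keeping the other defaults:
///
/// ```ignore
/// let opts = NominalStreamOpts {
///     max_request_delay: Duration::from_millis(250),
///     ..Default::default()
/// };
/// ```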
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    /// Maximum number of points a single in-memory buffer may hold before new
    /// batches spill to the other buffer.
    pub max_points_per_record: usize,
    /// Upper bound on the delay between buffer flushes.
    pub max_request_delay: Duration,
    /// Capacity of the bounded queue between batch processors and dispatchers.
    pub max_buffered_requests: usize,
    /// Number of dispatcher threads that send requests to the consumer.
    pub request_dispatcher_tasks: usize,
}

impl Default for NominalStreamOpts {
    fn default() -> Self {
        Self {
            max_points_per_record: 250_000,
            max_request_delay: Duration::from_millis(100),
            max_buffered_requests: 4,
            request_dispatcher_tasks: 8,
        }
    }
}

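/// Builder for [`NominalDatasetStream`].
///
/// A minimal sketch of streaming to core with a local file fallback (the
/// token, dataset RID, runtime handle, and path are placeholders):
///
/// ```ignore
/// let stream = NominalDatasetStream::builder()
///     .stream_to_core(token, dataset_rid, tokio_handle)
///     .with_file_fallback("/tmp/fallback.avro")
///     .build();
/// ```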
#[derive(Debug, Default)]
pub struct NominalDatasetStreamBuilder {
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    stream_to_file: Option<PathBuf>,
    file_fallback: Option<PathBuf>,
    opts: NominalStreamOpts,
}

impl NominalDatasetStreamBuilder {
    pub fn new() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::default()
    }
    pub fn stream_to_core(
        mut self,
        token: BearerToken,
        dataset: ResourceIdentifier,
        handle: tokio::runtime::Handle,
    ) -> Self {
        self.stream_to_core = Some((token, dataset, handle));
        self
    }

    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.stream_to_file = Some(file_path.into());
        self
    }

    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.file_fallback = Some(file_path.into());
        self
    }

    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
        self.opts = opts;
        self
    }

    pub fn build(self) -> NominalDatasetStream {
        let core_consumer = self.core_consumer();
        let file_consumer = self.file_consumer();
        let fallback_consumer = self.fallback_consumer();

        match (core_consumer, file_consumer, fallback_consumer) {
            (None, None, _) => panic!("nominal dataset stream must stream to a file, to core, or both"),
            (Some(_), Some(_), Some(_)) => {
                panic!("choose only one of stream_to_file and file_fallback when streaming to core")
            }
            (Some(core), None, None) => self.into_stream(core),
            (Some(core), None, Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
            }
            (None, Some(file), None) => self.into_stream(file),
            (None, Some(file), Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
            }
            (Some(core), Some(file), None) => {
                self.into_stream(DualWriteRequestConsumer::new(core, file))
            }
        }
    }

    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
        self.stream_to_core
            .as_ref()
            .map(|(token, dataset, handle)| {
                NominalCoreConsumer::new(
                    PRODUCTION_STREAMING_CLIENT.clone(),
                    handle.clone(),
                    token.clone(),
                    dataset.clone(),
                )
            })
    }

    fn file_consumer(&self) -> Option<AvroFileConsumer> {
        self.stream_to_file.as_ref().map(|path| {
            AvroFileConsumer::new_with_full_path(path).expect("failed to create Avro file consumer")
        })
    }

    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
        self.file_fallback.as_ref().map(|path| {
            AvroFileConsumer::new_with_full_path(path)
                .expect("failed to create Avro fallback consumer")
        })
    }

    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
        let logging_consumer =
            ListeningWriteRequestConsumer::new(consumer, vec![Arc::new(LoggingListener)]);
        NominalDatasetStream::new_with_consumer(logging_consumer, self.opts)
    }
}

#[deprecated(note = "renamed to NominalDatasetStream")]
pub type NominalDatasourceStream = NominalDatasetStream;

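/// A handle for streaming points to a Nominal dataset.
///
/// Writers enqueue points into one of two in-memory buffers; background
/// batch-processor threads drain the buffers into write requests, and a pool
/// of dispatcher threads sends those requests to the configured consumer.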
pub struct NominalDatasetStream {
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}

impl NominalDatasetStream {
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

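    /// Adds points for the given channel to an in-memory buffer; background
    /// threads flush buffered points to the consumer. Blocks only when both
    /// buffers are at capacity.
    ///
    /// A minimal sketch (the channel name is illustrative):
    ///
    /// ```ignore
    /// let descriptor = ChannelDescriptor::new("vehicle.velocity");
    /// // `points` is any Vec<DoublePoint>, Vec<IntegerPoint>, or Vec<StringPoint>.
    /// stream.enqueue(&descriptor, points);
    /// ```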
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            self.primary_buffer
                .add_points(channel_descriptor, new_points);
        } else if self.secondary_buffer.has_capacity(new_count) {
            // Primary is full: wake its processor to flush, then spill to the
            // secondary buffer.
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            self.secondary_buffer
                .add_points(channel_descriptor, new_points);
        } else {
            warn!("both buffers are full, picking least recently flushed buffer to add to");
            // Both buffers are full: block on whichever was flushed least
            // recently (an earlier flush_time compares as "less").
            let buf = if self.primary_buffer < self.secondary_buffer {
                debug!("waiting for primary buffer to flush...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                debug!("waiting for secondary buffer to flush...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };
            buf.add_on_notify(channel_descriptor, new_points);
            debug!("added points after wait to chosen buffer");
        }
    }
}

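/// A fixed-capacity, mutex-guarded accumulation buffer keyed by channel.
///
/// `count` tracks the number of buffered points, `flush_time` records (in
/// nanoseconds since the UNIX epoch) when the buffer was last drained, and
/// the condvar lets blocked writers wait for the next flush.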
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    count: AtomicUsize,
    flush_time: AtomicU64,
    condvar: Condvar,
    max_capacity: usize,
}

// Buffers compare by last flush time, so `a < b` means "a was flushed less
// recently than b".
impl PartialEq for SeriesBuffer {
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}

impl PartialOrd for SeriesBuffer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let flush_time = self.flush_time.load(Ordering::Acquire);
        let other_flush_time = other.flush_time.load(Ordering::Acquire);
        flush_time.partial_cmp(&other_flush_time)
    }
}

impl SeriesBuffer {
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    /// An empty buffer accepts a batch of any size (so oversized batches are
    /// never stuck); otherwise the batch must fit within `max_capacity`.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    fn add_points(&self, channel_descriptor: &ChannelDescriptor, new_points: PointsType) {
        self.inner_add_points(channel_descriptor, new_points, self.points.lock());
    }

    fn inner_add_points(
        &self,
        channel_descriptor: &ChannelDescriptor,
        new_points: PointsType,
        mut points_guard: MutexGuard<HashMap<ChannelDescriptor, PointsType>>,
    ) {
        self.count
            .fetch_add(points_len(&new_points), Ordering::Release);
        match (points_guard.get_mut(channel_descriptor), new_points) {
            (None, new_points) => {
                points_guard.insert(channel_descriptor.clone(), new_points);
            }
            (Some(PointsType::DoublePoints(points)), PointsType::DoublePoints(new_points)) => {
                points
                    .points
                    .extend_from_slice(new_points.points.as_slice());
            }
            (Some(PointsType::StringPoints(points)), PointsType::StringPoints(new_points)) => {
                points
                    .points
                    .extend_from_slice(new_points.points.as_slice());
            }
            (Some(PointsType::IntegerPoints(points)), PointsType::IntegerPoints(new_points)) => {
                points
                    .points
                    .extend_from_slice(new_points.points.as_slice());
            }
            (Some(PointsType::DoublePoints(_)), PointsType::StringPoints(_)) => {
                panic!(
                    "attempting to add points of the wrong type to an existing channel. expected: double. provided: string"
                )
            }
            (Some(PointsType::DoublePoints(_)), PointsType::IntegerPoints(_)) => {
                panic!(
                    "attempting to add points of the wrong type to an existing channel. expected: double. provided: integer"
                )
            }
            (Some(PointsType::StringPoints(_)), PointsType::DoublePoints(_)) => {
                panic!(
                    "attempting to add points of the wrong type to an existing channel. expected: string. provided: double"
                )
            }
            (Some(PointsType::StringPoints(_)), PointsType::IntegerPoints(_)) => {
                panic!(
                    "attempting to add points of the wrong type to an existing channel. expected: string. provided: integer"
                )
            }
            (Some(PointsType::IntegerPoints(_)), PointsType::DoublePoints(_)) => {
                panic!(
                    "attempting to add points of the wrong type to an existing channel. expected: integer. provided: double"
                )
            }
            (Some(PointsType::IntegerPoints(_)), PointsType::StringPoints(_)) => {
                panic!(
                    "attempting to add points of the wrong type to an existing channel. expected: integer. provided: string"
                )
            }
        }
    }

    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.points.lock();
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        let result_count = self.count.swap(0, Ordering::AcqRel);
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    /// Blocks until the buffer is flushed (signaled via `notify`), then adds
    /// the points. Skips the wait if the buffer emptied since the caller
    /// checked capacity.
    fn add_on_notify(&self, channel_descriptor: &ChannelDescriptor, new_points: PointsType) {
        let mut points_lock = self.points.lock();
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        self.inner_add_points(channel_descriptor, new_points, points_lock);
    }

    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}

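/// Drains `points_buffer` into write requests on a fixed cadence: the thread
/// parks for up to `max_request_delay` between iterations (and can be
/// unparked early by writers), exiting once the stream stops and the buffer
/// is empty.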
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            warn!("request channel is full");
        }
        let rep = request_chan.send((write_request, point_count));
        debug!("queued request for processing");
        if rep.is_err() {
            error!("failed to send request to dispatcher");
        } else {
            debug!("finished submitting request");
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}

impl Drop for NominalDatasetStream {
    fn drop(&mut self) {
        debug!("starting drop for NominalDatasetStream");
        self.running.store(false, Ordering::Release);
        // The processor threads wake from park_timeout, drain the remaining
        // points, and exit once their buffers are empty.
        while self.unflushed_points.load(Ordering::Acquire) > 0 {
            debug!(
                "waiting for all points to be flushed before dropping stream, {} points remaining",
                self.unflushed_points.load(Ordering::Acquire)
            );
            thread::sleep(Duration::from_millis(50));
        }
    }
}

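/// Receives write requests from the bounded channel and forwards them to the
/// consumer, decrementing `unflushed_points` after each attempt. Exits when
/// the channel closes, or when the stream has stopped and everything is
/// flushed.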
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received write request from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("Failed to send request: {e:?}");
                    }
                }
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {} ms",
        total_request_time
    );
}

fn points_len(points_type: &PointsType) -> usize {
    match points_type {
        PointsType::DoublePoints(points) => points.points.len(),
        PointsType::StringPoints(points) => points.points.len(),
        PointsType::IntegerPoints(points) => points.points.len(),
    }
}