nominal_streaming/stream.rs

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fmt::Debug;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use std::time::UNIX_EPOCH;

use conjure_object::BearerToken;
use conjure_object::ResourceIdentifier;
use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
use nominal_api::tonic::io::nominal::scout::api::proto::Channel;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::Points;
use nominal_api::tonic::io::nominal::scout::api::proto::Series;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
use parking_lot::Condvar;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::client::PRODUCTION_STREAMING_CLIENT;
use crate::consumer::AvroFileConsumer;
use crate::consumer::DualWriteRequestConsumer;
use crate::consumer::ListeningWriteRequestConsumer;
use crate::consumer::NominalCoreConsumer;
use crate::consumer::RequestConsumerWithFallback;
use crate::consumer::WriteRequestConsumer;
use crate::notifier::LoggingListener;

/// A descriptor for a channel.
///
/// Note that this is used internally to compare channels.
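///
/// # Examples
///
/// A minimal sketch of constructing descriptors; the channel name and tag
/// key/value below are illustrative placeholders, not part of this crate's API.
///
/// ```ignore
/// let bare = ChannelDescriptor::new("engine_rpm");
/// let tagged = ChannelDescriptor::with_tags("engine_rpm", [("vehicle", "truck_01")]);
/// assert_ne!(bare, tagged);
/// ```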
#[derive(Clone, Debug, Eq, Hash, PartialEq, Ord, PartialOrd)]
pub struct ChannelDescriptor {
    /// The name of the channel.
    pub name: String,
    /// The tags associated with the channel, if any.
    pub tags: Option<BTreeMap<String, String>>,
}

impl ChannelDescriptor {
    /// Creates a new channel descriptor from the given `name`.
    ///
    /// If you would like to include tags, see also [`Self::with_tags`].
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            tags: None,
        }
    }

    /// Creates a new channel descriptor from the given `name` and `tags`.
    pub fn with_tags(
        name: impl Into<String>,
        tags: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        Self {
            name: name.into(),
            tags: Some(
                tags.into_iter()
                    .map(|(key, value)| (key.into(), value.into()))
                    .collect(),
            ),
        }
    }
}

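/// Provides a [`BearerToken`] for authenticating streaming requests, if one is available.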
pub trait AuthProvider: Clone + Send + Sync {
    fn token(&self) -> Option<BearerToken>;
}

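/// Conversion into the protobuf [`PointsType`] accepted by [`NominalDatasetStream::enqueue`].
///
/// Implemented for [`PointsType`] itself as well as for `Vec<DoublePoint>`,
/// `Vec<StringPoint>`, and `Vec<IntegerPoint>`.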
pub trait IntoPoints {
    fn into_points(self) -> PointsType;
}

impl IntoPoints for PointsType {
    fn into_points(self) -> PointsType {
        self
    }
}

impl IntoPoints for Vec<DoublePoint> {
    fn into_points(self) -> PointsType {
        PointsType::DoublePoints(DoublePoints { points: self })
    }
}

impl IntoPoints for Vec<StringPoint> {
    fn into_points(self) -> PointsType {
        PointsType::StringPoints(StringPoints { points: self })
    }
}

impl IntoPoints for Vec<IntegerPoint> {
    fn into_points(self) -> PointsType {
        PointsType::IntegerPoints(IntegerPoints { points: self })
    }
}

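/// Tuning options for a [`NominalDatasetStream`].
///
/// A sketch of overriding a subset of the defaults (the values shown are illustrative only):
///
/// ```ignore
/// let opts = NominalStreamOpts {
///     max_points_per_record: 100_000,
///     max_request_delay: Duration::from_millis(50),
///     ..Default::default()
/// };
/// ```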
#[derive(Debug, Clone)]
pub struct NominalStreamOpts {
    pub max_points_per_record: usize,
    pub max_request_delay: Duration,
    pub max_buffered_requests: usize,
    pub request_dispatcher_tasks: usize,
}

impl Default for NominalStreamOpts {
    fn default() -> Self {
        Self {
            max_points_per_record: 250_000,
            max_request_delay: Duration::from_millis(100),
            max_buffered_requests: 4,
            request_dispatcher_tasks: 8,
        }
    }
}

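/// Builder for [`NominalDatasetStream`], configuring where write requests are sent.
///
/// A sketch of typical usage; `token`, `dataset_rid`, `handle`, and the fallback path
/// are placeholders supplied by the caller:
///
/// ```ignore
/// let stream = NominalDatasetStream::builder()
///     .stream_to_core(token, dataset_rid, handle)
///     .with_file_fallback("/tmp/nominal_fallback.avro")
///     .build();
/// ```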
#[derive(Debug, Default)]
pub struct NominalDatasetStreamBuilder {
    stream_to_core: Option<(BearerToken, ResourceIdentifier, tokio::runtime::Handle)>,
    stream_to_file: Option<PathBuf>,
    file_fallback: Option<PathBuf>,
    opts: NominalStreamOpts,
}

impl NominalDatasetStreamBuilder {
    pub fn new() -> NominalDatasetStreamBuilder {
        Self::default()
    }

    pub fn stream_to_core(
        mut self,
        token: BearerToken,
        dataset: ResourceIdentifier,
        handle: tokio::runtime::Handle,
    ) -> Self {
        self.stream_to_core = Some((token, dataset, handle));
        self
    }

    pub fn stream_to_file(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.stream_to_file = Some(file_path.into());
        self
    }

    pub fn with_file_fallback(mut self, file_path: impl Into<PathBuf>) -> Self {
        self.file_fallback = Some(file_path.into());
        self
    }

    pub fn with_options(mut self, opts: NominalStreamOpts) -> Self {
        self.opts = opts;
        self
    }

    pub fn build(self) -> NominalDatasetStream {
        let core_consumer = self.core_consumer();
        let file_consumer = self.file_consumer();
        let fallback_consumer = self.fallback_consumer();

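        // At least one destination (core and/or file) must be configured. A file
        // fallback may additionally be set, but not when core and file are both
        // configured (dual write). Invalid combinations panic below.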
        match (core_consumer, file_consumer, fallback_consumer) {
            (None, None, _) => panic!("nominal dataset stream must either stream to file or core"),
            (Some(_), Some(_), Some(_)) => {
                panic!("must choose one of stream_to_file and file_fallback when streaming to core")
            }
            (Some(core), None, None) => self.into_stream(core),
            (Some(core), None, Some(fallback)) => {
                self.into_stream(RequestConsumerWithFallback::new(core, fallback))
            }
            (None, Some(file), None) => self.into_stream(file),
            (None, Some(file), Some(fallback)) => {
                // todo: should this even be supported?
                self.into_stream(RequestConsumerWithFallback::new(file, fallback))
            }
            (Some(core), Some(file), None) => {
                self.into_stream(DualWriteRequestConsumer::new(core, file))
            }
        }
    }

    fn core_consumer(&self) -> Option<NominalCoreConsumer<BearerToken>> {
        self.stream_to_core
            .as_ref()
            .map(|(token, dataset, handle)| {
                NominalCoreConsumer::new(
                    PRODUCTION_STREAMING_CLIENT.clone(),
                    handle.clone(),
                    token.clone(),
                    dataset.clone(),
                )
            })
    }

    fn file_consumer(&self) -> Option<AvroFileConsumer> {
        self.stream_to_file
            .as_ref()
            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
    }

    fn fallback_consumer(&self) -> Option<AvroFileConsumer> {
        self.file_fallback
            .as_ref()
            .map(|path| AvroFileConsumer::new_with_full_path(path).unwrap())
    }

    fn into_stream<C: WriteRequestConsumer + 'static>(self, consumer: C) -> NominalDatasetStream {
        let logging_consumer =
            ListeningWriteRequestConsumer::new(consumer, vec![Arc::new(LoggingListener)]);
        NominalDatasetStream::new_with_consumer(logging_consumer, self.opts)
    }
}

// for backcompat, new code should use NominalDatasetStream
#[deprecated(note = "renamed to NominalDatasetStream")]
pub type NominalDatasourceStream = NominalDatasetStream;

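/// A double-buffered stream that batches channel points in memory and flushes them to the
/// configured [`WriteRequestConsumer`] on background threads.
///
/// Construct one via [`NominalDatasetStream::builder`]. Dropping the stream blocks until all
/// buffered points have been flushed.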
pub struct NominalDatasetStream {
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    primary_buffer: Arc<SeriesBuffer>,
    secondary_buffer: Arc<SeriesBuffer>,
    primary_handle: thread::JoinHandle<()>,
    secondary_handle: thread::JoinHandle<()>,
}

impl NominalDatasetStream {
    pub fn builder() -> NominalDatasetStreamBuilder {
        NominalDatasetStreamBuilder::new()
    }

    pub fn new_with_consumer<C: WriteRequestConsumer + 'static>(
        consumer: C,
        opts: NominalStreamOpts,
    ) -> Self {
        let primary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));
        let secondary_buffer = Arc::new(SeriesBuffer::new(opts.max_points_per_record));

        let (request_tx, request_rx) =
            crossbeam_channel::bounded::<(WriteRequestNominal, usize)>(opts.max_buffered_requests);

        let running = Arc::new(AtomicBool::new(true));
        let unflushed_points = Arc::new(AtomicUsize::new(0));

        let primary_handle = thread::Builder::new()
            .name("nmstream_primary".to_string())
            .spawn({
                let points_buffer = Arc::clone(&primary_buffer);
                let running = running.clone();
                let tx = request_tx.clone();
                move || {
                    batch_processor(running, points_buffer, tx, opts.max_request_delay);
                }
            })
            .unwrap();

        let secondary_handle = thread::Builder::new()
            .name("nmstream_secondary".to_string())
            .spawn({
                let secondary_buffer = Arc::clone(&secondary_buffer);
                let running = running.clone();
                move || {
                    batch_processor(
                        running,
                        secondary_buffer,
                        request_tx,
                        opts.max_request_delay,
                    );
                }
            })
            .unwrap();

        let consumer = Arc::new(consumer);

        for i in 0..opts.request_dispatcher_tasks {
            thread::Builder::new()
                .name(format!("nmstream_dispatch_{i}"))
                .spawn({
                    let running = Arc::clone(&running);
                    let unflushed_points = Arc::clone(&unflushed_points);
                    let rx = request_rx.clone();
                    let consumer = consumer.clone();
                    move || {
                        debug!("starting request dispatcher");
                        request_dispatcher(running, unflushed_points, rx, consumer);
                    }
                })
                .unwrap();
        }

        NominalDatasetStream {
            running,
            unflushed_points,
            primary_buffer,
            secondary_buffer,
            primary_handle,
            secondary_handle,
        }
    }

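    /// Adds `new_points` for `channel_descriptor` to the stream's in-memory buffers.
    ///
    /// Points go to the primary buffer when it has capacity, otherwise to the secondary
    /// buffer; if both are full, this call blocks until the least recently flushed buffer
    /// is drained.
    ///
    /// A sketch of enqueueing a batch of double points (`make_points` is a hypothetical
    /// helper producing `Vec<DoublePoint>`):
    ///
    /// ```ignore
    /// let points: Vec<DoublePoint> = make_points();
    /// stream.enqueue(&ChannelDescriptor::new("engine_rpm"), points);
    /// ```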
    pub fn enqueue(&self, channel_descriptor: &ChannelDescriptor, new_points: impl IntoPoints) {
        let new_points = new_points.into_points();
        let new_count = points_len(&new_points);

        self.unflushed_points
            .fetch_add(new_count, Ordering::Release);

        if self.primary_buffer.has_capacity(new_count) {
            debug!("adding {} points to primary buffer", new_count);
            self.primary_buffer
                .add_points(channel_descriptor, new_points);
        } else if self.secondary_buffer.has_capacity(new_count) {
            // primary buffer is definitely full
            self.primary_handle.thread().unpark();
            debug!("adding {} points to secondary buffer", new_count);
            self.secondary_buffer
                .add_points(channel_descriptor, new_points);
        } else {
            warn!("both buffers are full, waiting on the least recently flushed buffer");
            // both buffers are full - wait on the buffer that flushed least recently (i.e. the
            // one more likely to be nearly done)
            let buf = if self.primary_buffer < self.secondary_buffer {
                debug!("waiting for primary buffer to flush...");
                self.primary_handle.thread().unpark();
                &self.primary_buffer
            } else {
                debug!("waiting for secondary buffer to flush...");
                self.secondary_handle.thread().unpark();
                &self.secondary_buffer
            };
            buf.add_on_notify(channel_descriptor, new_points);
            debug!("added points after wait to chosen buffer");
        }
    }
}

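/// A mutex-guarded accumulation buffer of points keyed by channel, drained wholesale by a
/// batch processor thread. `flush_time` records when the buffer was last drained so that
/// callers can prefer the least recently flushed buffer.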
struct SeriesBuffer {
    points: Mutex<HashMap<ChannelDescriptor, PointsType>>,
    count: AtomicUsize,
    flush_time: AtomicU64,
    condvar: Condvar,
    max_capacity: usize,
}

impl PartialEq for SeriesBuffer {
    fn eq(&self, other: &Self) -> bool {
        self.flush_time.load(Ordering::Acquire) == other.flush_time.load(Ordering::Acquire)
    }
}

impl PartialOrd for SeriesBuffer {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let flush_time = self.flush_time.load(Ordering::Acquire);
        let other_flush_time = other.flush_time.load(Ordering::Acquire);
        flush_time.partial_cmp(&other_flush_time)
    }
}

impl SeriesBuffer {
    fn new(capacity: usize) -> Self {
        Self {
            points: Mutex::new(HashMap::new()),
            count: AtomicUsize::new(0),
            flush_time: AtomicU64::new(0),
            condvar: Condvar::new(),
            max_capacity: capacity,
        }
    }

    /// Checks whether the buffer has enough capacity to accept `new_points_count` new points.
    ///
    /// Note that the buffer can grow beyond `max_capacity` if a single batch larger than
    /// `max_capacity` is inserted while the buffer is empty. This avoids having to split a
    /// batch of points across multiple requests.
    fn has_capacity(&self, new_points_count: usize) -> bool {
        let count = self.count.load(Ordering::Acquire);
        count == 0 || count + new_points_count <= self.max_capacity
    }

    fn add_points(&self, channel_descriptor: &ChannelDescriptor, new_points: PointsType) {
        self.inner_add_points(channel_descriptor, new_points, self.points.lock());
    }

    fn inner_add_points(
        &self,
        channel_descriptor: &ChannelDescriptor,
        new_points: PointsType,
        mut points_guard: MutexGuard<HashMap<ChannelDescriptor, PointsType>>,
    ) {
        self.count
            .fetch_add(points_len(&new_points), Ordering::Release);
        match (points_guard.get_mut(channel_descriptor), new_points) {
            (None, new_points) => {
                points_guard.insert(channel_descriptor.clone(), new_points);
            }
            (Some(PointsType::DoublePoints(points)), PointsType::DoublePoints(new_points)) => {
                points
                    .points
                    .extend_from_slice(new_points.points.as_slice());
            }
            (Some(PointsType::StringPoints(points)), PointsType::StringPoints(new_points)) => {
                points
                    .points
                    .extend_from_slice(new_points.points.as_slice());
            }
            (Some(PointsType::IntegerPoints(points)), PointsType::IntegerPoints(new_points)) => {
                points
                    .points
                    .extend_from_slice(new_points.points.as_slice());
            }
            (Some(PointsType::DoublePoints(_)), PointsType::StringPoints(_)) => {
                // todo: return an error instead of panicking
                panic!(
                    "attempting to add points of the wrong type to an existing channel. expected: double. provided: string"
                )
            }
            (Some(PointsType::DoublePoints(_)), PointsType::IntegerPoints(_)) => {
                // todo: return an error instead of panicking
                panic!(
                    "attempting to add points of the wrong type to an existing channel. expected: double. provided: integer"
                )
            }
            (Some(PointsType::StringPoints(_)), PointsType::DoublePoints(_)) => {
                // todo: return an error instead of panicking
                panic!(
                    "attempting to add points of the wrong type to an existing channel. expected: string. provided: double"
                )
            }
            (Some(PointsType::StringPoints(_)), PointsType::IntegerPoints(_)) => {
                // todo: return an error instead of panicking
                panic!(
                    "attempting to add points of the wrong type to an existing channel. expected: string. provided: integer"
                )
            }
            (Some(PointsType::IntegerPoints(_)), PointsType::DoublePoints(_)) => {
                // todo: return an error instead of panicking
                panic!(
                    "attempting to add points of the wrong type to an existing channel. expected: integer. provided: double"
                )
            }
            (Some(PointsType::IntegerPoints(_)), PointsType::StringPoints(_)) => {
                // todo: return an error instead of panicking
                panic!(
                    "attempting to add points of the wrong type to an existing channel. expected: integer. provided: string"
                )
            }
        }
    }

    fn take(&self) -> (usize, Vec<Series>) {
        let mut points = self.points.lock();
        self.flush_time.store(
            UNIX_EPOCH.elapsed().unwrap().as_nanos() as u64,
            Ordering::Release,
        );
        let result = points
            .drain()
            .map(|(ChannelDescriptor { name, tags }, points)| {
                let channel = Channel { name };
                let points_obj = Points {
                    points_type: Some(points),
                };
                Series {
                    channel: Some(channel),
                    tags: tags
                        .map(|tags| tags.into_iter().collect())
                        .unwrap_or_default(),
                    points: Some(points_obj),
                }
            })
            .collect();
        let result_count = self
            .count
            .fetch_update(Ordering::Release, Ordering::Acquire, |_| Some(0))
            .unwrap();
        (result_count, result)
    }

    fn is_empty(&self) -> bool {
        self.count() == 0
    }

    fn count(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }

    fn add_on_notify(&self, channel_descriptor: &ChannelDescriptor, new_points: PointsType) {
        let mut points_lock = self.points.lock();
        // guard against a race: the buffer may have been emptied since we checked its
        // capacity, in which case waiting on the condvar would block forever and stall
        // any new points from entering
        if !points_lock.is_empty() {
            self.condvar.wait(&mut points_lock);
        } else {
            debug!("buffer emptied since last check, skipping condvar wait");
        }
        self.inner_add_points(channel_descriptor, new_points, points_lock);
    }

    fn notify(&self) -> bool {
        self.condvar.notify_one()
    }
}

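/// Periodically drains `points_buffer` (every `max_request_delay`, or sooner when unparked
/// by `enqueue`), wraps each drain in a `WriteRequestNominal`, and forwards it to the
/// dispatcher threads via `request_chan`. Exits once the stream is no longer running and
/// the buffer is empty.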
fn batch_processor(
    running: Arc<AtomicBool>,
    points_buffer: Arc<SeriesBuffer>,
    request_chan: crossbeam_channel::Sender<(WriteRequestNominal, usize)>,
    max_request_delay: Duration,
) {
    loop {
        debug!("starting processor loop");
        if points_buffer.is_empty() {
            if !running.load(Ordering::Acquire) {
                debug!("batch processor thread exiting due to running flag");
                drop(request_chan);
                break;
            } else {
                debug!("empty points buffer, waiting");
                thread::park_timeout(max_request_delay);
            }
            continue;
        }
        let (point_count, series) = points_buffer.take();

        if points_buffer.notify() {
            debug!("notified one waiting thread after clearing points buffer");
        }

        let write_request = WriteRequestNominal { series };

        if request_chan.is_full() {
            warn!("request channel is full");
        }
        let rep = request_chan.send((write_request, point_count));
        debug!("queued request for processing");
        if rep.is_err() {
            error!("failed to send request to dispatcher");
        } else {
            debug!("finished submitting request");
        }

        thread::park_timeout(max_request_delay);
    }
    debug!("batch processor thread exiting");
}

impl Drop for NominalDatasetStream {
    fn drop(&mut self) {
        debug!("starting drop for NominalDatasetStream");
        self.running.store(false, Ordering::Release);
        while self.unflushed_points.load(Ordering::Acquire) > 0 {
            debug!(
                "waiting for all points to be flushed before dropping stream, {} points remaining",
                self.unflushed_points.load(Ordering::Acquire)
            );
            // todo: reduce this + give up after some maximum timeout is reached
            thread::sleep(Duration::from_millis(50));
        }
    }
}

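/// Receives batched write requests from `request_rx` and pushes each one through the
/// configured consumer, decrementing `unflushed_points` so that `Drop` can tell when
/// everything has been flushed. Exits when the stream has stopped and all points are
/// flushed, or when the channel closes.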
fn request_dispatcher<C: WriteRequestConsumer + 'static>(
    running: Arc<AtomicBool>,
    unflushed_points: Arc<AtomicUsize>,
    request_rx: crossbeam_channel::Receiver<(WriteRequestNominal, usize)>,
    consumer: Arc<C>,
) {
    let mut total_request_time = 0;
    loop {
        match request_rx.recv() {
            Ok((request, point_count)) => {
                debug!("received writerequest from channel");
                let req_start = Instant::now();
                match consumer.consume(&request) {
                    Ok(_) => {
                        let time = req_start.elapsed().as_millis();
                        debug!("request of {} points sent in {} ms", point_count, time);
                        total_request_time += time as u64;
                    }
                    Err(e) => {
                        error!("Failed to send request: {e:?}");
                    }
                }
                unflushed_points.fetch_sub(point_count, Ordering::Release);

                if unflushed_points.load(Ordering::Acquire) == 0 && !running.load(Ordering::Acquire)
                {
                    info!("all points flushed, closing dispatcher thread");
                    // drop the receiver so the processor threads see a closed channel
                    // once every dispatcher has exited
                    drop(request_rx);
                    break;
                }
            }
            Err(e) => {
                debug!("request channel closed, exiting dispatcher thread. info: '{e}'");
                break;
            }
        }
    }
    debug!(
        "request dispatcher thread exiting. total request time: {} ms",
        total_request_time
    );
}

fn points_len(points_type: &PointsType) -> usize {
    match points_type {
        PointsType::DoublePoints(points) => points.points.len(),
        PointsType::StringPoints(points) => points.points.len(),
        PointsType::IntegerPoints(points) => points.points.len(),
    }
}