nominal_streaming/
consumer.rs

1use std::error::Error;
2use std::fmt::Debug;
3use std::fmt::Formatter;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::sync::LazyLock;
7
8use apache_avro::types::Record;
9use apache_avro::types::Value;
10use conjure_object::ResourceIdentifier;
11use nominal_api::tonic::google::protobuf::Timestamp;
12use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
13use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
14use nominal_api::tonic::io::nominal::scout::api::proto::Points;
15use nominal_api::tonic::io::nominal::scout::api::proto::Series;
16use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
17use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
18use parking_lot::Mutex;
19use prost::Message;
20use tracing::warn;
21
22use crate::client::StreamingClient;
23use crate::client::{self};
24use crate::notifier::NominalStreamListener;
25use crate::stream::AuthProvider;
26
/// Errors that a [`WriteRequestConsumer`] can report while handling a write request.
///
/// The `#[from]` attributes generate `From` conversions so the wrapped error
/// types can be propagated with `?`.
#[derive(Debug, thiserror::Error)]
pub enum ConsumerError {
    /// An underlying [`std::io::Error`].
    #[error("io error: {0}")]
    IoError(#[from] std::io::Error),
    /// A boxed Avro error; note the conversion is from `Box<apache_avro::Error>`,
    /// so a bare `apache_avro::Error` must be boxed before conversion.
    #[error("avro error: {0}")]
    AvroError(#[from] Box<apache_avro::Error>),
    /// The auth provider returned no token.
    #[error("No auth token provided. Please make sure you're authenticated.")]
    MissingTokenError,
    /// Sending the request failed; the payload is a textual description of the cause.
    #[error("request error: {0}")]
    RequestError(String),
    /// Any other boxed error that is `Send + Sync`.
    #[error("consumer error occurred: {0}")]
    GenericConsumerError(#[from] Box<dyn Error + Send + Sync>),
}
40
/// Shorthand for a `Result` whose error type is [`ConsumerError`].
pub type ConsumerResult<T> = Result<T, ConsumerError>;
42
/// A sink for [`WriteRequestNominal`] messages.
///
/// Implementors must be `Send + Sync + Debug`; `consume` takes `&self`, so any
/// mutable state has to live behind interior mutability.
pub trait WriteRequestConsumer: Send + Sync + Debug {
    /// Handles a single write request.
    ///
    /// # Errors
    /// Returns a [`ConsumerError`] when the request could not be handled.
    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()>;
}
46
47#[derive(Clone)]
48pub struct NominalCoreConsumer<T: AuthProvider> {
49    client: StreamingClient,
50    handle: tokio::runtime::Handle,
51    token_provider: T,
52    data_source_rid: ResourceIdentifier,
53}
54
55impl<T: AuthProvider> NominalCoreConsumer<T> {
56    pub fn new(
57        client: StreamingClient,
58        handle: tokio::runtime::Handle,
59        token_provider: T,
60        data_source_rid: ResourceIdentifier,
61    ) -> Self {
62        Self {
63            client,
64            handle,
65            token_provider,
66            data_source_rid,
67        }
68    }
69}
70
71impl<T: AuthProvider> Debug for NominalCoreConsumer<T> {
72    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("NominalCoreConsumer")
74            .field("client", &self.client)
75            .field("data_source_rid", &self.data_source_rid)
76            .finish()
77    }
78}
79
80impl<T: AuthProvider + 'static> WriteRequestConsumer for NominalCoreConsumer<T> {
81    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
82        let token = self
83            .token_provider
84            .token()
85            .ok_or(ConsumerError::MissingTokenError)?;
86        let write_request =
87            client::encode_request(request.encode_to_vec(), &token, &self.data_source_rid)?;
88        self.handle.block_on(async {
89            self.client
90                .send(write_request)
91                .await
92                .map_err(|e| ConsumerError::RequestError(format!("{e:?}")))
93        })?;
94        Ok(())
95    }
96}
97
/// File-name prefix used by `AvroFileConsumer::new` when the caller passes no prefix.
const DEFAULT_FILE_PREFIX: &str = "nominal_stream";
99
/// JSON text of the Avro record schema `io.nominal.ingest.AvroStream`.
///
/// One record carries a channel name, an array of `long` timestamps, an array
/// of values (each a `double` or a `string`), and a string-to-string tag map.
pub static CORE_SCHEMA_STR: &str = r#"{
  "type": "record",
  "name": "AvroStream",
  "namespace": "io.nominal.ingest",
  "fields": [
      {
          "name": "channel",
          "type": "string",
          "doc": "Channel/series name (e.g., 'vehicle_id', 'col_1', 'temperature')"
      },
      {
          "name": "timestamps",
          "type": {"type": "array", "items": "long"},
          "doc": "Array of Unix timestamps in nanoseconds"
      },
      {
          "name": "values",
          "type": {"type": "array", "items": ["double", "string"]},
          "doc": "Array of values. Can either be doubles or strings"
      },
      {
          "name": "tags",
          "type": {"type": "map", "values": "string"},
          "default": {},
          "doc": "Key-value metadata tags"
      }
  ]
}
"#;
129
130pub static CORE_AVRO_SCHEMA: LazyLock<apache_avro::Schema> = LazyLock::new(|| {
131    let json = serde_json::from_str(CORE_SCHEMA_STR).expect("Failed to parse JSON schema");
132    apache_avro::Schema::parse(&json).expect("Failed to parse Avro schema")
133});
134
/// Consumer that writes series to an Avro file on disk.
///
/// The writer sits behind `Arc<Mutex<_>>`, so clones of this consumer share
/// one writer and the mutex serializes access to it.
#[derive(Clone)]
pub struct AvroFileConsumer {
    // Avro writer over the opened file; the `'static` lifetime is that of the schema reference.
    writer: Arc<Mutex<apache_avro::Writer<'static, std::fs::File>>>,
    // Path of the file being written; shown in the `Debug` output.
    path: PathBuf,
}
140
141impl Debug for AvroFileConsumer {
142    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
143        f.debug_struct("AvroFileConsumer")
144            .field("path", &self.path)
145            .finish()
146    }
147}
148
149impl AvroFileConsumer {
150    pub fn new(
151        directory: impl Into<PathBuf>,
152        file_prefix: Option<String>,
153    ) -> std::io::Result<Self> {
154        let datetime = chrono::Utc::now().format("%Y%m%d_%H%M%S").to_string();
155        let prefix = file_prefix.unwrap_or_else(|| DEFAULT_FILE_PREFIX.to_string());
156        let filename = format!("{prefix}_{datetime}.avro");
157        let directory = directory.into();
158        let full_path = directory.join(&filename);
159
160        Self::new_with_full_path(full_path)
161    }
162
163    pub fn new_with_full_path(file_path: impl Into<PathBuf>) -> std::io::Result<Self> {
164        let path = file_path.into();
165        std::fs::create_dir_all(path.parent().unwrap_or(&path))?;
166        let file = std::fs::OpenOptions::new()
167            .create(true)
168            .truncate(false)
169            .write(true)
170            .open(&path)?;
171
172        let writer = apache_avro::Writer::builder()
173            .schema(&CORE_AVRO_SCHEMA)
174            .writer(file)
175            .codec(apache_avro::Codec::Snappy)
176            .build();
177
178        Ok(Self {
179            writer: Arc::new(Mutex::new(writer)),
180            path,
181        })
182    }
183
184    fn append_series(&self, series: &[Series]) -> ConsumerResult<()> {
185        let mut records: Vec<Record> = Vec::new();
186        for series in series {
187            let (timestamps, values) = points_to_avro(series.points.as_ref());
188
189            let mut record = Record::new(&CORE_AVRO_SCHEMA).expect("Failed to create Avro record");
190
191            record.put(
192                "channel",
193                series
194                    .channel
195                    .as_ref()
196                    .map(|c| c.name.clone())
197                    .unwrap_or("values".to_string()),
198            );
199            record.put("timestamps", Value::Array(timestamps));
200            record.put("values", Value::Array(values));
201            record.put("tags", series.tags.clone());
202
203            records.push(record);
204        }
205
206        self.writer
207            .lock()
208            .extend(records)
209            .map_err(|e| ConsumerError::AvroError(Box::new(e)))?;
210
211        Ok(())
212    }
213}
214
215fn points_to_avro(points: Option<&Points>) -> (Vec<Value>, Vec<Value>) {
216    match points {
217        Some(Points {
218            points_type: Some(PointsType::DoublePoints(DoublePoints { points })),
219        }) => points
220            .iter()
221            .map(|point| {
222                (
223                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
224                    Value::Union(0, Box::new(Value::Double(point.value))),
225                )
226            })
227            .collect(),
228        Some(Points {
229            points_type: Some(PointsType::StringPoints(StringPoints { points })),
230        }) => points
231            .iter()
232            .map(|point| {
233                (
234                    convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
235                    Value::Union(1, Box::new(Value::String(point.value.clone()))),
236                )
237            })
238            .collect(),
239        _ => (Vec::new(), Vec::new()),
240    }
241}
242
243fn convert_timestamp_to_nanoseconds(timestamp: Timestamp) -> Value {
244    Value::Long(timestamp.seconds * 1_000_000_000 + timestamp.nanos as i64)
245}
246
247impl WriteRequestConsumer for AvroFileConsumer {
248    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
249        self.append_series(&request.series)?;
250        Ok(())
251    }
252}
253
254#[derive(Clone)]
255pub struct RequestConsumerWithFallback<P, F>
256where
257    P: WriteRequestConsumer,
258    F: WriteRequestConsumer,
259{
260    primary: P,
261    fallback: F,
262}
263
264impl<P, F> RequestConsumerWithFallback<P, F>
265where
266    P: WriteRequestConsumer,
267    F: WriteRequestConsumer,
268{
269    pub fn new(primary: P, fallback: F) -> Self {
270        Self { primary, fallback }
271    }
272}
273
274impl<P, F> Debug for RequestConsumerWithFallback<P, F>
275where
276    F: Send + Sync + WriteRequestConsumer,
277    P: Send + Sync + WriteRequestConsumer,
278{
279    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
280        f.debug_struct("RequestConsumerWithFallback")
281            .field("primary", &self.primary)
282            .field("fallback", &self.fallback)
283            .finish()
284    }
285}
286
287#[derive(Debug, Clone)]
288pub struct DualWriteRequestConsumer<P, S>
289where
290    P: WriteRequestConsumer,
291    S: WriteRequestConsumer,
292{
293    primary: P,
294    secondary: S,
295}
296
297impl<P, S> DualWriteRequestConsumer<P, S>
298where
299    P: WriteRequestConsumer,
300    S: WriteRequestConsumer,
301{
302    pub fn new(primary: P, secondary: S) -> Self {
303        Self { primary, secondary }
304    }
305}
306
307impl<P, S> WriteRequestConsumer for DualWriteRequestConsumer<P, S>
308where
309    P: WriteRequestConsumer + Send + Sync,
310    S: WriteRequestConsumer + Send + Sync,
311{
312    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
313        let primary_result = self.primary.consume(request);
314        let secondary_result = self.secondary.consume(request);
315        if let Err(e) = &primary_result {
316            warn!("Sending request to primary consumer failed: {:?}", e);
317        }
318        if let Err(e) = &secondary_result {
319            warn!("Sending request to secondary consumer failed: {:?}", e);
320        }
321
322        // If either failed, return the error
323        primary_result.and(secondary_result)
324    }
325}
326
327impl<P, F> WriteRequestConsumer for RequestConsumerWithFallback<P, F>
328where
329    P: WriteRequestConsumer + Send + Sync,
330    F: WriteRequestConsumer + Send + Sync,
331{
332    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
333        if let Err(e) = self.primary.consume(request) {
334            warn!("Sending request to primary consumer failed. Attempting fallback.");
335            let fallback_result = self.fallback.consume(request);
336            // we want to notify the caller about the missing token error as it is a user error
337            // todo: get rid of this once we figure out why the auth handle blocks in connect
338            if let ConsumerError::MissingTokenError = e {
339                return Err(ConsumerError::MissingTokenError);
340            }
341            return fallback_result;
342        }
343        Ok(())
344    }
345}
346
347#[derive(Debug, Clone)]
348pub struct ListeningWriteRequestConsumer<C>
349where
350    C: WriteRequestConsumer,
351{
352    consumer: C,
353    listeners: Vec<Arc<dyn NominalStreamListener>>,
354}
355
356impl<C> ListeningWriteRequestConsumer<C>
357where
358    C: WriteRequestConsumer,
359{
360    pub fn new(consumer: C, listeners: Vec<Arc<dyn NominalStreamListener>>) -> Self {
361        Self {
362            consumer,
363            listeners,
364        }
365    }
366}
367
368impl<C> WriteRequestConsumer for ListeningWriteRequestConsumer<C>
369where
370    C: WriteRequestConsumer + Send + Sync,
371{
372    fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
373        let len = request.series.len();
374        match self.consumer.consume(request) {
375            Ok(_) => Ok(()),
376            Err(e) => {
377                let message = format!("Failed to consume request of {len} series");
378
379                for listener in &self.listeners {
380                    listener.on_error(&message, &e);
381                }
382
383                Err(e)
384            }
385        }
386    }
387}