1use std::error::Error;
2use std::fmt::Debug;
3use std::fmt::Formatter;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::sync::LazyLock;
7
8use apache_avro::types::Record;
9use apache_avro::types::Value;
10use conjure_object::ResourceIdentifier;
11use nominal_api::tonic::google::protobuf::Timestamp;
12use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
13use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
14use nominal_api::tonic::io::nominal::scout::api::proto::Points;
15use nominal_api::tonic::io::nominal::scout::api::proto::Series;
16use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
17use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
18use parking_lot::Mutex;
19use prost::Message;
20use tracing::warn;
21
22use crate::client::StreamingClient;
23use crate::client::{self};
24use crate::notifier::NominalStreamListener;
25use crate::types::AuthProvider;
26
27#[derive(Debug, thiserror::Error)]
28pub enum ConsumerError {
29 #[error("io error: {0}")]
30 IoError(#[from] std::io::Error),
31 #[error("avro error: {0}")]
32 AvroError(#[from] Box<apache_avro::Error>),
33 #[error("No auth token provided. Please make sure you're authenticated.")]
34 MissingTokenError,
35 #[error("request error: {0}")]
36 RequestError(String),
37 #[error("consumer error occurred: {0}")]
38 GenericConsumerError(#[from] Box<dyn Error + Send + Sync>),
39}
40
41pub type ConsumerResult<T> = Result<T, ConsumerError>;
42
43pub trait WriteRequestConsumer: Send + Sync + Debug {
44 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()>;
45}
46
47#[derive(Clone)]
48pub struct NominalCoreConsumer<T: AuthProvider> {
49 client: StreamingClient,
50 handle: tokio::runtime::Handle,
51 token_provider: T,
52 data_source_rid: ResourceIdentifier,
53}
54
55impl<T: AuthProvider> NominalCoreConsumer<T> {
56 pub fn new(
57 client: StreamingClient,
58 handle: tokio::runtime::Handle,
59 token_provider: T,
60 data_source_rid: ResourceIdentifier,
61 ) -> Self {
62 Self {
63 client,
64 handle,
65 token_provider,
66 data_source_rid,
67 }
68 }
69}
70
71impl<T: AuthProvider> Debug for NominalCoreConsumer<T> {
72 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct("NominalCoreConsumer")
74 .field("client", &self.client)
75 .field("data_source_rid", &self.data_source_rid)
76 .finish()
77 }
78}
79
80impl<T: AuthProvider + 'static> WriteRequestConsumer for NominalCoreConsumer<T> {
81 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
82 let token = self
83 .token_provider
84 .token()
85 .ok_or(ConsumerError::MissingTokenError)?;
86 let write_request =
87 client::encode_request(request.encode_to_vec(), &token, &self.data_source_rid)?;
88 self.handle.block_on(async {
89 self.client
90 .send(write_request)
91 .await
92 .map_err(|e| ConsumerError::RequestError(format!("{e:?}")))
93 })?;
94 Ok(())
95 }
96}
97
/// File-name prefix used by [`AvroFileConsumer::new`] when the caller supplies none.
const DEFAULT_FILE_PREFIX: &str = "nominal_stream";

/// Avro schema, as JSON text, for the records written by [`AvroFileConsumer`].
///
/// Each record describes one series:
/// - the channel name;
/// - parallel arrays of timestamps (Unix nanoseconds) and values (a `double`/`string` union);
/// - a string-to-string tag map.
pub static CORE_SCHEMA_STR: &str = r#"{
  "type": "record",
  "name": "AvroStream",
  "namespace": "io.nominal.ingest",
  "fields": [
    {
      "name": "channel",
      "type": "string",
      "doc": "Channel/series name (e.g., 'vehicle_id', 'col_1', 'temperature')"
    },
    {
      "name": "timestamps",
      "type": {"type": "array", "items": "long"},
      "doc": "Array of Unix timestamps in nanoseconds"
    },
    {
      "name": "values",
      "type": {"type": "array", "items": ["double", "string"]},
      "doc": "Array of values. Can either be doubles or strings"
    },
    {
      "name": "tags",
      "type": {"type": "map", "values": "string"},
      "default": {},
      "doc": "Key-value metadata tags"
    }
  ]
}
"#;
129
130pub static CORE_AVRO_SCHEMA: LazyLock<apache_avro::Schema> = LazyLock::new(|| {
131 let json = serde_json::from_str(CORE_SCHEMA_STR).expect("Failed to parse JSON schema");
132 apache_avro::Schema::parse(&json).expect("Failed to parse Avro schema")
133});
134
135#[derive(Clone)]
136pub struct AvroFileConsumer {
137 writer: Arc<Mutex<apache_avro::Writer<'static, std::fs::File>>>,
138 path: PathBuf,
139}
140
141impl Debug for AvroFileConsumer {
142 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
143 f.debug_struct("AvroFileConsumer")
144 .field("path", &self.path)
145 .finish()
146 }
147}
148
149impl AvroFileConsumer {
150 pub fn new(
151 directory: impl Into<PathBuf>,
152 file_prefix: Option<String>,
153 ) -> std::io::Result<Self> {
154 let datetime = chrono::Utc::now().format("%Y%m%d_%H%M%S").to_string();
155 let prefix = file_prefix.unwrap_or_else(|| DEFAULT_FILE_PREFIX.to_string());
156 let filename = format!("{prefix}_{datetime}.avro");
157 let directory = directory.into();
158 let full_path = directory.join(&filename);
159
160 Self::new_with_full_path(full_path)
161 }
162
163 pub fn new_with_full_path(file_path: impl Into<PathBuf>) -> std::io::Result<Self> {
164 let path = file_path.into();
165 std::fs::create_dir_all(path.parent().unwrap_or(&path))?;
166 let file = std::fs::OpenOptions::new()
167 .create(true)
168 .truncate(false)
169 .write(true)
170 .open(&path)?;
171
172 let writer = apache_avro::Writer::builder()
173 .schema(&CORE_AVRO_SCHEMA)
174 .writer(file)
175 .codec(apache_avro::Codec::Snappy)
176 .build();
177
178 Ok(Self {
179 writer: Arc::new(Mutex::new(writer)),
180 path,
181 })
182 }
183
184 fn append_series(&self, series: &[Series]) -> ConsumerResult<()> {
185 let mut records: Vec<Record> = Vec::new();
186 for series in series {
187 let (timestamps, values) = points_to_avro(series.points.as_ref());
188
189 let mut record = Record::new(&CORE_AVRO_SCHEMA).expect("Failed to create Avro record");
190
191 record.put(
192 "channel",
193 series
194 .channel
195 .as_ref()
196 .map(|c| c.name.clone())
197 .unwrap_or("values".to_string()),
198 );
199 record.put("timestamps", Value::Array(timestamps));
200 record.put("values", Value::Array(values));
201 record.put("tags", series.tags.clone());
202
203 records.push(record);
204 }
205
206 self.writer
207 .lock()
208 .extend(records)
209 .map_err(|e| ConsumerError::AvroError(Box::new(e)))?;
210
211 Ok(())
212 }
213}
214
215fn points_to_avro(points: Option<&Points>) -> (Vec<Value>, Vec<Value>) {
216 match points {
217 Some(Points {
218 points_type: Some(PointsType::DoublePoints(DoublePoints { points })),
219 }) => points
220 .iter()
221 .map(|point| {
222 (
223 convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
224 Value::Union(0, Box::new(Value::Double(point.value))),
225 )
226 })
227 .collect(),
228 Some(Points {
229 points_type: Some(PointsType::StringPoints(StringPoints { points })),
230 }) => points
231 .iter()
232 .map(|point| {
233 (
234 convert_timestamp_to_nanoseconds(point.timestamp.unwrap()),
235 Value::Union(1, Box::new(Value::String(point.value.clone()))),
236 )
237 })
238 .collect(),
239 _ => (Vec::new(), Vec::new()),
240 }
241}
242
243fn convert_timestamp_to_nanoseconds(timestamp: Timestamp) -> Value {
244 Value::Long(timestamp.seconds * 1_000_000_000 + timestamp.nanos as i64)
245}
246
247impl WriteRequestConsumer for AvroFileConsumer {
248 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
249 self.append_series(&request.series)?;
250 Ok(())
251 }
252}
253
254#[derive(Clone)]
255pub struct RequestConsumerWithFallback<P, F>
256where
257 P: WriteRequestConsumer,
258 F: WriteRequestConsumer,
259{
260 primary: P,
261 fallback: F,
262}
263
264impl<P, F> RequestConsumerWithFallback<P, F>
265where
266 P: WriteRequestConsumer,
267 F: WriteRequestConsumer,
268{
269 pub fn new(primary: P, fallback: F) -> Self {
270 Self { primary, fallback }
271 }
272}
273
274impl<P, F> Debug for RequestConsumerWithFallback<P, F>
275where
276 F: Send + Sync + WriteRequestConsumer,
277 P: Send + Sync + WriteRequestConsumer,
278{
279 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
280 f.debug_struct("RequestConsumerWithFallback")
281 .field("primary", &self.primary)
282 .field("fallback", &self.fallback)
283 .finish()
284 }
285}
286
287#[derive(Debug, Clone)]
288pub struct DualWriteRequestConsumer<P, S>
289where
290 P: WriteRequestConsumer,
291 S: WriteRequestConsumer,
292{
293 primary: P,
294 secondary: S,
295}
296
297impl<P, S> DualWriteRequestConsumer<P, S>
298where
299 P: WriteRequestConsumer,
300 S: WriteRequestConsumer,
301{
302 pub fn new(primary: P, secondary: S) -> Self {
303 Self { primary, secondary }
304 }
305}
306
307impl<P, S> WriteRequestConsumer for DualWriteRequestConsumer<P, S>
308where
309 P: WriteRequestConsumer + Send + Sync,
310 S: WriteRequestConsumer + Send + Sync,
311{
312 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
313 let primary_result = self.primary.consume(request);
314 let secondary_result = self.secondary.consume(request);
315 if let Err(e) = &primary_result {
316 warn!("Sending request to primary consumer failed: {:?}", e);
317 }
318 if let Err(e) = &secondary_result {
319 warn!("Sending request to secondary consumer failed: {:?}", e);
320 }
321
322 primary_result.and(secondary_result)
324 }
325}
326
327impl<P, F> WriteRequestConsumer for RequestConsumerWithFallback<P, F>
328where
329 P: WriteRequestConsumer + Send + Sync,
330 F: WriteRequestConsumer + Send + Sync,
331{
332 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
333 if let Err(e) = self.primary.consume(request) {
334 warn!("Sending request to primary consumer failed. Attempting fallback.");
335 let fallback_result = self.fallback.consume(request);
336 if let ConsumerError::MissingTokenError = e {
339 return Err(ConsumerError::MissingTokenError);
340 }
341 return fallback_result;
342 }
343 Ok(())
344 }
345}
346
347#[derive(Debug, Clone)]
348pub struct ListeningWriteRequestConsumer<C>
349where
350 C: WriteRequestConsumer,
351{
352 consumer: C,
353 listeners: Vec<Arc<dyn NominalStreamListener>>,
354}
355
356impl<C> ListeningWriteRequestConsumer<C>
357where
358 C: WriteRequestConsumer,
359{
360 pub fn new(consumer: C, listeners: Vec<Arc<dyn NominalStreamListener>>) -> Self {
361 Self {
362 consumer,
363 listeners,
364 }
365 }
366}
367
368impl<C> WriteRequestConsumer for ListeningWriteRequestConsumer<C>
369where
370 C: WriteRequestConsumer + Send + Sync,
371{
372 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
373 let len = request.series.len();
374 match self.consumer.consume(request) {
375 Ok(_) => Ok(()),
376 Err(e) => {
377 let message = format!("Failed to consume request of {len} series");
378
379 for listener in &self.listeners {
380 listener.on_error(&message, &e);
381 }
382
383 Err(e)
384 }
385 }
386 }
387}