1pub mod client;
2pub mod consumer;
3pub mod notifier;
4pub mod stream;
5mod types;
6pub mod upload;
7
8pub mod prelude {
10 pub use conjure_object::BearerToken;
11 pub use conjure_object::ResourceIdentifier;
12 pub use nominal_api::tonic::google::protobuf::Timestamp;
13 pub use nominal_api::tonic::io::nominal::scout::api::proto::points::PointsType;
14 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoint;
15 pub use nominal_api::tonic::io::nominal::scout::api::proto::DoublePoints;
16 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
17 pub use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoints;
18 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoint;
19 pub use nominal_api::tonic::io::nominal::scout::api::proto::StringPoints;
20 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequest;
21 pub use nominal_api::tonic::io::nominal::scout::api::proto::WriteRequestNominal;
22
23 pub use crate::consumer::NominalCoreConsumer;
24 pub use crate::stream::NominalDatasetStream;
25 pub use crate::stream::NominalDatasourceStream;
26 pub use crate::stream::NominalStreamOpts;
27 pub use crate::types::ChannelDescriptor;
28}
29
30#[cfg(test)]
31mod tests {
32 use std::collections::HashMap;
33 use std::sync::Arc;
34 use std::sync::Mutex;
35 use std::thread;
36 use std::time::Duration;
37 use std::time::UNIX_EPOCH;
38
39 use nominal_api::tonic::io::nominal::scout::api::proto::IntegerPoint;
40
41 use crate::client::PRODUCTION_API_URL;
42 use crate::consumer::ConsumerResult;
43 use crate::consumer::WriteRequestConsumer;
44 use crate::prelude::*;
45 use crate::types::IntoTimestamp;
46
47 #[derive(Debug)]
48 struct TestDatasourceStream {
49 requests: Mutex<Vec<WriteRequestNominal>>,
50 }
51
52 impl WriteRequestConsumer for Arc<TestDatasourceStream> {
53 fn consume(&self, request: &WriteRequestNominal) -> ConsumerResult<()> {
54 self.requests.lock().unwrap().push(request.clone());
55 Ok(())
56 }
57 }
58
59 fn create_test_stream() -> (Arc<TestDatasourceStream>, NominalDatasetStream) {
60 let test_consumer = Arc::new(TestDatasourceStream {
61 requests: Mutex::new(vec![]),
62 });
63 let stream = NominalDatasetStream::new_with_consumer(
64 test_consumer.clone(),
65 NominalStreamOpts {
66 max_points_per_record: 1000,
67 max_request_delay: Duration::from_millis(100),
68 max_buffered_requests: 2,
69 request_dispatcher_tasks: 4,
70 base_api_url: PRODUCTION_API_URL.to_string(),
71 },
72 );
73
74 (test_consumer, stream)
75 }
76
77 #[test]
78 fn test_stream() {
79 let (test_consumer, stream) = create_test_stream();
80
81 for batch in 0..5 {
82 let mut points = Vec::new();
83 for i in 0..1000 {
84 let start_time = UNIX_EPOCH.elapsed().unwrap();
85 points.push(DoublePoint {
86 timestamp: Some(Timestamp {
87 seconds: start_time.as_secs() as i64,
88 nanos: start_time.subsec_nanos() as i32 + i,
89 }),
90 value: (i % 50) as f64,
91 });
92 }
93
94 stream.enqueue(
95 &ChannelDescriptor::with_tags("channel_1", [("batch_id", batch.to_string())]),
96 points,
97 );
98 }
99
100 drop(stream); let requests = test_consumer.requests.lock().unwrap();
103
104 assert_eq!(requests.len(), 5);
107 let series = requests.first().unwrap().series.first().unwrap();
108 if let Some(PointsType::DoublePoints(points)) =
109 series.points.as_ref().unwrap().points_type.as_ref()
110 {
111 assert_eq!(points.points.len(), 1000);
112 } else {
113 panic!("unexpected data type");
114 }
115 }
116
117 #[test]
118 fn test_stream_types() {
119 let (test_consumer, stream) = create_test_stream();
120
121 for batch in 0..5 {
122 let mut doubles = Vec::new();
123 let mut strings = Vec::new();
124 let mut ints = Vec::new();
125 for i in 0..1000 {
126 let start_time = UNIX_EPOCH.elapsed().unwrap();
127 doubles.push(DoublePoint {
128 timestamp: Some(start_time.into_timestamp()),
129 value: (i % 50) as f64,
130 });
131 strings.push(StringPoint {
132 timestamp: Some(start_time.into_timestamp()),
133 value: format!("{}", i % 50),
134 });
135 ints.push(IntegerPoint {
136 timestamp: Some(start_time.into_timestamp()),
137 value: i % 50,
138 })
139 }
140
141 stream.enqueue(
142 &ChannelDescriptor::with_tags("double", [("batch_id", batch.to_string())]),
143 doubles,
144 );
145 stream.enqueue(
146 &ChannelDescriptor::with_tags("string", [("batch_id", batch.to_string())]),
147 strings,
148 );
149 stream.enqueue(
150 &ChannelDescriptor::with_tags("int", [("batch_id", batch.to_string())]),
151 ints,
152 );
153 }
154
155 drop(stream); let requests = test_consumer.requests.lock().unwrap();
158
159 assert_eq!(requests.len(), 15);
162
163 let r = requests
164 .iter()
165 .flat_map(|r| r.series.clone())
166 .map(|s| {
167 (
168 s.channel.unwrap().name,
169 s.points.unwrap().points_type.unwrap(),
170 )
171 })
172 .collect::<HashMap<_, _>>();
173 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
174 panic!("invalid double points type");
175 };
176
177 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
178 panic!("invalid int points type");
179 };
180
181 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
182 panic!("invalid string points type");
183 };
184
185 assert_eq!(dp.points.len(), 1000);
187 assert_eq!(sp.points.len(), 1000);
188 assert_eq!(ip.points.len(), 1000);
189 }
190
191 #[test_log::test]
192 fn test_writer() {
193 let (test_consumer, stream) = create_test_stream();
194
195 let cd = ChannelDescriptor::new("channel_1");
196 let mut writer = stream.double_writer(&cd);
197
198 for i in 0..5000 {
199 let start_time = UNIX_EPOCH.elapsed().unwrap();
200 let value = i % 50;
201 writer.push(start_time, value as f64);
202 }
203
204 drop(writer); drop(stream); let requests = test_consumer.requests.lock().unwrap();
208
209 assert_eq!(requests.len(), 5);
210 let series = requests.first().unwrap().series.first().unwrap();
211 if let Some(PointsType::DoublePoints(points)) =
212 series.points.as_ref().unwrap().points_type.as_ref()
213 {
214 assert_eq!(points.points.len(), 1000);
215 } else {
216 panic!("unexpected data type");
217 }
218 }
219
220 #[test_log::test]
221 fn test_time_flush() {
222 let (test_consumer, stream) = create_test_stream();
223
224 let cd = ChannelDescriptor::new("channel_1");
225 let mut writer = stream.double_writer(&cd);
226
227 writer.push(UNIX_EPOCH.elapsed().unwrap(), 1.0);
228 thread::sleep(Duration::from_millis(101));
229 writer.push(UNIX_EPOCH.elapsed().unwrap(), 2.0); thread::sleep(Duration::from_millis(101));
231 writer.push(UNIX_EPOCH.elapsed().unwrap(), 3.0); drop(writer);
234 drop(stream);
235
236 let requests = test_consumer.requests.lock().unwrap();
237 dbg!(&requests);
238 assert_eq!(requests.len(), 2);
239 }
240
241 #[test_log::test]
242 fn test_writer_types() {
243 let (test_consumer, stream) = create_test_stream();
244
245 let cd1 = ChannelDescriptor::new("double");
246 let cd2 = ChannelDescriptor::new("string");
247 let cd3 = ChannelDescriptor::new("int");
248 let mut double_writer = stream.double_writer(&cd1);
249 let mut string_writer = stream.string_writer(&cd2);
250 let mut integer_writer = stream.integer_writer(&cd3);
251
252 for i in 0..5000 {
253 let start_time = UNIX_EPOCH.elapsed().unwrap();
254 let value = i % 50;
255 double_writer.push(start_time, value as f64);
256 string_writer.push(start_time, format!("{}", value));
257 integer_writer.push(start_time, value);
258 }
259
260 drop(double_writer);
261 drop(string_writer);
262 drop(integer_writer);
263 drop(stream);
264
265 let requests = test_consumer.requests.lock().unwrap();
266
267 assert_eq!(requests.len(), 15);
268
269 let r = requests
270 .iter()
271 .flat_map(|r| r.series.clone())
272 .map(|s| {
273 (
274 s.channel.unwrap().name,
275 s.points.unwrap().points_type.unwrap(),
276 )
277 })
278 .collect::<HashMap<_, _>>();
279
280 let PointsType::DoublePoints(dp) = r.get("double").unwrap() else {
281 panic!("invalid double points type");
282 };
283
284 let PointsType::IntegerPoints(ip) = r.get("int").unwrap() else {
285 panic!("invalid int points type");
286 };
287
288 let PointsType::StringPoints(sp) = r.get("string").unwrap() else {
289 panic!("invalid string points type");
290 };
291
292 assert_eq!(dp.points.len(), 1000);
294 assert_eq!(sp.points.len(), 1000);
295 assert_eq!(ip.points.len(), 1000);
296 }
297}