1use crate::batch_update::BatchUpdate;
2use crate::consolidation_function::ConsolidationFunction;
3use crate::create::*;
4use crate::errors::RRDCachedClientError;
5use crate::fetch::FetchResponse;
6use crate::parsers::*;
7use crate::sanitisation::check_rrd_path;
8use std::collections::HashMap;
9use tokio::io::AsyncBufReadExt;
10use tokio::io::AsyncWriteExt;
11use tokio::net::UnixStream;
12use tokio::{io::BufReader, net::TcpStream};
13
14#[derive(Debug)]
16pub struct RRDCachedClient<T = TcpStream> {
17 stream: BufReader<T>,
18}
19
20impl RRDCachedClient<TcpStream> {
21 pub async fn connect_tcp(addr: &str) -> Result<Self, RRDCachedClientError> {
23 let stream = TcpStream::connect(addr).await?;
24 let stream = BufReader::new(stream);
25 Ok(Self { stream })
26 }
27}
28
29impl RRDCachedClient<UnixStream> {
30 pub async fn connect_unix(addr: &str) -> Result<Self, RRDCachedClientError> {
32 let stream = UnixStream::connect(addr).await?;
33 let stream = BufReader::new(stream);
34 Ok(Self { stream })
35 }
36}
37
38impl<T> RRDCachedClient<T>
39where
40 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
41{
42 fn assert_response_code(&self, code: i64, message: &str) -> Result<(), RRDCachedClientError> {
43 if code < 0 {
44 Err(RRDCachedClientError::UnexpectedResponse(
45 code,
46 message.to_string(),
47 ))
48 } else {
49 Ok(())
50 }
51 }
52
53 async fn read_line(&mut self) -> Result<String, RRDCachedClientError> {
54 let mut line = String::new();
55 self.stream.read_line(&mut line).await?;
56 Ok(line)
57 }
58
59 async fn read_n_lines(&mut self, n: usize) -> Result<Vec<String>, RRDCachedClientError> {
60 let mut lines = Vec::with_capacity(n);
61 for _ in 0..n {
62 lines.push(self.read_line().await?);
63 }
64 Ok(lines)
65 }
66
67 async fn send_command(
68 &mut self,
69 command: &str,
70 ) -> Result<(usize, String), RRDCachedClientError> {
71 self.stream.write_all(command.as_bytes()).await?;
73 let response_line = self.read_line().await?;
75 let (code, message) = parse_response_line(&response_line)?;
76 self.assert_response_code(code, message)?;
77 let nb_lines = usize::try_from(code).map_err(|_| {
78 RRDCachedClientError::UnexpectedResponse(code, "invalid number of lines".to_string())
79 })?;
80 Ok((nb_lines, message.to_string()))
81 }
82
83 pub async fn help(
87 &mut self,
88 command: Option<&str>,
89 ) -> Result<(String, Vec<String>), RRDCachedClientError> {
90 let command = match command {
91 Some(command) => {
92 let mut a = String::with_capacity(6 + command.len());
93 a.push_str("HELP ");
94 a.push_str(command);
95 a.push('\n');
96 a
97 }
98 None => "HELP\n".to_string(),
99 };
100 let (nb_lines, header) = self.send_command(&command).await?;
101 let lines = self.read_n_lines(nb_lines).await?;
102
103 Ok((header, lines))
104 }
105
106 pub async fn ping(&mut self) -> Result<(), RRDCachedClientError> {
108 let (_, message) = self.send_command("PING\n").await?;
109 assert!(message == "PONG");
110 Ok(())
111 }
112
113 pub async fn create(&mut self, arguments: CreateArguments) -> Result<(), RRDCachedClientError> {
115 let arguments_str = arguments.to_str();
116 let mut command = String::with_capacity(7 + arguments_str.len());
117 command.push_str("CREATE ");
118 command.push_str(&arguments_str);
119 command.push('\n');
120 let (_, message) = self.send_command(&command).await?;
121 if message != "RRD created OK" {
122 return Err(RRDCachedClientError::UnexpectedResponse(
123 0,
124 message.to_string(),
125 ));
126 }
127 Ok(())
128 }
129
130 pub async fn flush(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
132 check_rrd_path(path)?;
133 let mut command = String::with_capacity(6 + path.len() + 5);
134 command.push_str("FLUSH ");
135 command.push_str(path);
136 command.push_str(".rrd\n");
137 let _ = self.send_command(&command).await?;
138 Ok(())
139 }
140
141 pub async fn flush_all(&mut self) -> Result<(), RRDCachedClientError> {
143 let _ = self.send_command("FLUSHALL\n").await?;
144 Ok(())
145 }
146
147 pub async fn pending(&mut self, path: &str) -> Result<Vec<String>, RRDCachedClientError> {
149 check_rrd_path(path)?;
150 let mut command = String::with_capacity(7 + path.len() + 5);
151 command.push_str("PENDING ");
152 command.push_str(path);
153 command.push_str(".rrd\n");
154 let (nb_lines, _) = self.send_command(&command).await?;
155 if nb_lines > 0 {
156 let lines = self.read_n_lines(nb_lines).await?;
157 Ok(lines)
158 } else {
159 Ok(vec![])
160 }
161 }
162
163 pub async fn forget(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
165 check_rrd_path(path)?;
166 let mut command = String::with_capacity(7 + path.len() + 5);
167 command.push_str("FORGET ");
168 command.push_str(path);
169 command.push_str(".rrd\n");
170 let _ = self.send_command(&command).await?;
171 Ok(())
172 }
173
174 pub async fn queue(&mut self) -> Result<Vec<(String, usize)>, RRDCachedClientError> {
176 let (nb_lines, _message) = self.send_command("QUEUE\n").await?;
177 let nb_lines = self.read_n_lines(nb_lines).await?;
178 let parsed_lines = nb_lines
179 .iter()
180 .map(|line| {
181 let (path, pending) = parse_queue_line(line)?;
182 Ok((path.to_string(), pending))
183 })
184 .collect::<Result<Vec<(String, usize)>, RRDCachedClientError>>()?;
185 Ok(parsed_lines)
186 }
187
188 pub async fn stats(&mut self) -> Result<HashMap<String, i64>, RRDCachedClientError> {
190 let (nb_lines, _message) = self.send_command("STATS\n").await?;
191 let lines = self.read_n_lines(nb_lines).await?;
192 let parsed_lines = lines
193 .iter()
194 .map(|line| {
195 let (name, value) = parse_stats_line(line)?;
196 Ok((name.to_string(), value))
197 })
198 .collect::<Result<HashMap<String, i64>, RRDCachedClientError>>()?;
199 Ok(parsed_lines)
200 }
201
202 pub async fn first(
204 &mut self,
205 path: &str,
206 round_robin_archive: Option<usize>,
207 ) -> Result<usize, RRDCachedClientError> {
208 check_rrd_path(path)?;
209 let round_robin_archive = round_robin_archive.unwrap_or(0);
210 let rranum_str = round_robin_archive.to_string();
211
212 let mut command = String::with_capacity(6 + path.len() + 5 + rranum_str.len() + 1);
213 command.push_str("FIRST ");
214 command.push_str(path);
215 command.push_str(".rrd ");
216 command.push_str(&rranum_str);
217 command.push('\n');
218 let (_, message) = self.send_command(&command).await?;
219 let timestamp = parse_timestamp(&message)?;
220 Ok(timestamp)
221 }
222
223 pub async fn last(&mut self, path: &str) -> Result<usize, RRDCachedClientError> {
225 check_rrd_path(path)?;
226
227 let mut command = String::with_capacity(5 + path.len() + 5);
228 command.push_str("LAST ");
229 command.push_str(path);
230 command.push_str(".rrd\n");
231 let (_, message) = self.send_command(&command).await?;
232 let timestamp = parse_timestamp(&message)?;
233 Ok(timestamp)
234 }
235
236 pub async fn info(&mut self, path: &str) -> Result<Vec<String>, RRDCachedClientError> {
238 check_rrd_path(path)?;
239 let mut command = String::with_capacity(5 + path.len() + 5);
240 command.push_str("INFO ");
241 command.push_str(path);
242 command.push_str(".rrd\n");
243 let (nb_lines, _message) = self.send_command(&command).await?;
244 let lines = self.read_n_lines(nb_lines).await?;
245 Ok(lines)
246 }
247
248 pub async fn list(
250 &mut self,
251 recursive: bool,
252 path: Option<&str>,
253 ) -> Result<Vec<String>, RRDCachedClientError> {
254 let path = path.unwrap_or("/");
255 let mut command =
256 String::with_capacity(5 + path.len() + 1 + (if recursive { 10 } else { 0 }));
257 command.push_str("LIST ");
258 if recursive {
259 command.push_str("RECURSIVE ");
260 }
261 command.push_str(path);
262 command.push('\n');
263 let (nb_lines, _message) = self.send_command(&command).await?;
264 let lines = self.read_n_lines(nb_lines).await?;
265 Ok(lines)
266 }
267
268 pub async fn suspend(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
270 check_rrd_path(path)?;
271 let mut command = String::with_capacity(8 + path.len() + 5);
272 command.push_str("SUSPEND ");
273 command.push_str(path);
274 command.push_str(".rrd\n");
275 let _ = self.send_command(&command).await?;
276 Ok(())
277 }
278
279 pub async fn resume(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
281 check_rrd_path(path)?;
282 let mut command = String::with_capacity(7 + path.len() + 5);
283 command.push_str("RESUME ");
284 command.push_str(path);
285 command.push_str(".rrd\n");
286 let _ = self.send_command(&command).await?;
287 Ok(())
288 }
289
290 pub async fn suspend_all(&mut self) -> Result<(), RRDCachedClientError> {
292 let _ = self.send_command("SUSPENDALL\n").await?;
293 Ok(())
294 }
295
296 pub async fn resume_all(&mut self) -> Result<(), RRDCachedClientError> {
298 let _ = self.send_command("RESUMEALL\n").await?;
299 Ok(())
300 }
301
302 pub async fn quit(&mut self) -> Result<(), RRDCachedClientError> {
304 self.stream.write_all("QUIT\n".as_bytes()).await?;
306 Ok(())
307 }
308
309 pub async fn update(
313 &mut self,
314 path: &str,
315 timestamp: Option<usize>,
316 data: Vec<f64>,
317 ) -> Result<(), RRDCachedClientError> {
318 let command = BatchUpdate::new(path, timestamp, data)?;
319 let command_str = command.to_command_string()?;
320 let _ = self.send_command(&command_str).await?;
321 Ok(())
322 }
323
324 pub async fn update_one(
328 &mut self,
329 path: &str,
330 timestamp: Option<usize>,
331 data: f64,
332 ) -> Result<(), RRDCachedClientError> {
333 self.update(path, timestamp, vec![data]).await
334 }
335
336 pub async fn batch(&mut self, commands: Vec<BatchUpdate>) -> Result<(), RRDCachedClientError> {
342 let _ = self.send_command("BATCH\n").await?;
343 for command in commands {
344 let command_str = command.to_command_string()?;
345 self.stream.write_all(command_str.as_bytes()).await?;
347 }
348 let (nb_lines, message) = self.send_command(".\n").await?;
350
351 if nb_lines > 0 {
353 let lines = self.read_n_lines(nb_lines).await?;
354 return Err(RRDCachedClientError::BatchUpdateErrorResponse(
355 message, lines,
356 ));
357 }
358 Ok(())
359 }
360
361 pub async fn fetch(
369 &mut self,
370 path: &str,
371 consolidation_function: ConsolidationFunction,
372 start: Option<i64>,
373 end: Option<i64>,
374 columns: Option<Vec<String>>,
375 ) -> Result<FetchResponse, RRDCachedClientError> {
376 check_rrd_path(path)?;
377 let consolidation_function_str = consolidation_function.to_str();
378 let mut capacity = 6 + path.len() + 5 + consolidation_function_str.len() + 1;
380 let mut start_str: Option<String> = None;
381 let mut end_str: Option<String> = None;
382 let mut columns_str: Option<String> = None;
383 match start {
384 Some(start) => {
385 let start_string = start.to_string();
386 capacity += 1 + start_string.len();
387 start_str = Some(start_string);
388
389 if let Some(end) = end {
390 let end_string = end.to_string();
391 capacity += 1 + end_string.len();
392 end_str = Some(end_string);
393 if let Some(columns) = columns {
394 let columns_string = columns.join(" ");
395 capacity += 1 + columns_string.len();
396 columns_str = Some(columns_string);
397 }
398 } else if columns.is_some() {
399 return Err(RRDCachedClientError::InvalidFetch(
400 "end must be specified".to_string(),
401 ));
402 }
403 }
404 None => {
405 if end.is_some() || columns.is_some() {
406 return Err(RRDCachedClientError::InvalidFetch(
407 "start must be specified".to_string(),
408 ));
409 }
410 }
411 }
412 let mut command = String::with_capacity(capacity);
413 command.push_str("FETCH ");
414 command.push_str(path);
415 command.push_str(".rrd ");
416 command.push_str(consolidation_function_str);
417 if let Some(start_str) = start_str {
418 command.push(' ');
419 command.push_str(&start_str);
420 if let Some(end_str) = end_str {
421 command.push(' ');
422 command.push_str(&end_str);
423 if let Some(columns_str) = columns_str {
424 command.push(' ');
425 command.push_str(&columns_str);
426 }
427 }
428 }
429 command.push('\n');
430 assert!(command.len() == capacity);
431
432 let (nb_lines, _message) = self.send_command(&command).await?;
433 let lines = self.read_n_lines(nb_lines).await?;
434
435 let response = FetchResponse::from_lines(lines)?;
436
437 Ok(response)
438 }
439}
440
441#[cfg(test)]
442mod tests {
443 use crate::now::now_timestamp;
444
445 use super::*;
446 use serial_test::serial;
447
448 #[tokio::test]
449 async fn test_ping_tcp() {
450 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
451 .await
452 .unwrap();
453 client.ping().await.unwrap();
454 }
455
456 #[tokio::test]
457 async fn test_ping_unix() {
458 let mut client = RRDCachedClient::connect_unix("./rrdcached.sock")
459 .await
460 .unwrap();
461 client.ping().await.unwrap();
462 }
463
464 #[tokio::test]
465 async fn test_help() {
466 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
467 .await
468 .unwrap();
469
470 let (header, lines) = client.help(None).await.unwrap();
471 assert_eq!(header, "Command overview");
472 assert_eq!(lines.len(), 22);
473
474 let (header, lines) = client.help(Some("PING")).await.unwrap();
475 assert_eq!(header, "Help for PING");
476 assert!(!lines.is_empty());
477 }
478
479 async fn create_simple_rrd(client: &mut RRDCachedClient<TcpStream>, name: String) {
480 client
481 .create(CreateArguments {
482 path: name,
483 data_sources: vec![CreateDataSource {
484 name: "ds1".to_string(),
485 minimum: None,
486 maximum: None,
487 heartbeat: 10,
488 serie_type: CreateDataSourceType::Gauge,
489 }],
490 round_robin_archives: vec![CreateRoundRobinArchive {
491 consolidation_function: ConsolidationFunction::Average,
492 xfiles_factor: 0.5,
493 steps: 1,
494 rows: 100,
495 }],
496 start_timestamp: 1609459200,
497 step_seconds: 1,
498 })
499 .await
500 .unwrap();
501 }
502
503 #[tokio::test]
504 async fn test_create() {
505 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
506 .await
507 .unwrap();
508
509 client
510 .create(CreateArguments {
511 path: "test-create".to_string(),
512 data_sources: vec![
513 CreateDataSource {
514 name: "ds1".to_string(),
515 minimum: None,
516 maximum: None,
517 heartbeat: 10,
518 serie_type: CreateDataSourceType::Gauge,
519 },
520 CreateDataSource {
521 name: "ds2".to_string(),
522 minimum: Some(0.0),
523 maximum: Some(100.0),
524 heartbeat: 10,
525 serie_type: CreateDataSourceType::Gauge,
526 },
527 ],
528 round_robin_archives: vec![
529 CreateRoundRobinArchive {
530 consolidation_function: ConsolidationFunction::Average,
531 xfiles_factor: 0.5,
532 steps: 1,
533 rows: 10,
534 },
535 CreateRoundRobinArchive {
536 consolidation_function: ConsolidationFunction::Average,
537 xfiles_factor: 0.5,
538 steps: 10,
539 rows: 10,
540 },
541 ],
542 start_timestamp: 1609459200,
543 step_seconds: 1,
544 })
545 .await
546 .unwrap();
547 }
548
549 #[tokio::test]
550 async fn test_update() {
551 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
552 .await
553 .unwrap();
554
555 create_simple_rrd(&mut client, "test-update".to_string()).await;
556
557 client.update_one("test-update", None, 4.2).await.unwrap();
558 }
559
560 #[tokio::test]
561 async fn test_error() {
562 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
563 .await
564 .unwrap();
565
566 let result = client.list(false, Some("not-found-path")).await;
567 assert!(result.is_err());
568 }
569
570 #[tokio::test]
571 async fn test_flush() {
572 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
573 .await
574 .unwrap();
575
576 create_simple_rrd(&mut client, "test-flush".to_string()).await;
577
578 client.update_one("test-flush", None, 4.2).await.unwrap();
579 client.flush("test-flush").await.unwrap();
580 }
581
582 #[tokio::test]
583 async fn test_flush_all() {
584 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
585 .await
586 .unwrap();
587
588 client.flush_all().await.unwrap();
589 }
590
591 #[serial]
592 #[tokio::test]
593 async fn test_pending() {
594 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
596
597 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
598 .await
599 .unwrap();
600
601 create_simple_rrd(&mut client, "test-pending".to_string()).await;
602
603 client.update_one("test-pending", None, 4.2).await.unwrap();
604
605 let lines = client.pending("test-pending").await.unwrap();
606 assert_eq!(lines.len(), 1);
607
608 client.flush("test-pending").await.unwrap();
610 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
612
613 let lines = client.pending("test-pending").await.unwrap();
614 assert!(lines.is_empty());
615 }
616
617 #[tokio::test]
618 async fn test_forget() {
619 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
620 .await
621 .unwrap();
622
623 create_simple_rrd(&mut client, "test-forget".to_string()).await;
624
625 client.update_one("test-forget", None, 4.2).await.unwrap();
626 client.forget("test-forget").await.unwrap();
627 }
628
629 #[tokio::test]
630 async fn test_queue() {
631 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
632 .await
633 .unwrap();
634
635 let lines = client.queue().await.unwrap();
636
637 assert!(lines.is_empty());
639 }
640
641 #[tokio::test]
642 async fn test_stats() {
643 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
644 .await
645 .unwrap();
646
647 let stats = client.stats().await.unwrap();
648 assert!(!stats.is_empty());
649 }
650
651 #[tokio::test]
652 async fn test_first_and_last() {
653 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
654 .await
655 .unwrap();
656
657 create_simple_rrd(&mut client, "test-first".to_string()).await;
658
659 let _ = client.update_one("test-first", Some(1612345678), 4.2).await;
661
662 let timestamp = client.first("test-first", None).await.unwrap();
663 assert_eq!(timestamp, 1609459101); let timestamp = client.last("test-first").await.unwrap();
666 assert_eq!(timestamp, 1612345678);
667 }
668
669 #[tokio::test]
670 async fn test_info() {
671 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
672 .await
673 .unwrap();
674
675 create_simple_rrd(&mut client, "test-info".to_string()).await;
676 let lines = client.info("test-info").await.unwrap();
677 assert!(!lines.is_empty());
678 }
679
680 #[tokio::test]
681 async fn test_list() {
682 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
683 .await
684 .unwrap();
685
686 let lines = client.list(true, None).await.unwrap();
687 assert!(!lines.is_empty());
688 }
689
690 #[serial]
691 #[tokio::test]
692 async fn test_suspend_and_resume() {
693 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
694 .await
695 .unwrap();
696
697 create_simple_rrd(&mut client, "test-suspend".to_string()).await;
698
699 client.update_one("test-suspend", None, 4.2).await.unwrap();
701 client.flush("test-suspend").await.unwrap();
702
703 client.suspend("test-suspend").await.unwrap();
704 client.resume("test-suspend").await.unwrap();
705 }
706
707 #[serial]
708 #[tokio::test]
709 async fn test_suspend_all_and_resume_all() {
710 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
711 .await
712 .unwrap();
713
714 client.suspend_all().await.unwrap();
715 client.resume_all().await.unwrap();
716 }
717
718 #[tokio::test]
719 async fn test_quit() {
720 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
721 .await
722 .unwrap();
723
724 client.quit().await.unwrap();
725
726 let result = client.ping().await;
728 assert!(result.is_err());
729 }
730
731 #[tokio::test]
732 async fn test_batch() {
733 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
734 .await
735 .unwrap();
736
737 create_simple_rrd(&mut client, "test-batch-1".to_string()).await;
738 create_simple_rrd(&mut client, "test-batch-2".to_string()).await;
739
740 let commands = vec![
741 BatchUpdate::new("test-batch-1", None, vec![1.0]).unwrap(),
742 BatchUpdate::new("test-batch-2", None, vec![2.0]).unwrap(),
743 ];
744 client.batch(commands).await.unwrap();
745
746 let commands = vec![
749 BatchUpdate::new("test-batch-1", None, vec![3.0]).unwrap(),
750 BatchUpdate::new("test-batch-2", None, vec![4.0]).unwrap(),
751 ];
752 let result = client.batch(commands).await;
753 assert!(result.is_err());
754 }
755
756 #[serial]
757 #[tokio::test]
758 async fn test_fetch() {
759 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
760 .await
761 .unwrap();
762
763 client
764 .create(CreateArguments {
765 path: "test-fetch".to_string(),
766 data_sources: vec![
767 CreateDataSource {
768 name: "ds1".to_string(),
769 minimum: None,
770 maximum: None,
771 heartbeat: 10,
772 serie_type: CreateDataSourceType::Gauge,
773 },
774 CreateDataSource {
775 name: "ds2".to_string(),
776 minimum: Some(0.0),
777 maximum: Some(100.0),
778 heartbeat: 10,
779 serie_type: CreateDataSourceType::Gauge,
780 },
781 ],
782 round_robin_archives: vec![
783 CreateRoundRobinArchive {
784 consolidation_function: ConsolidationFunction::Average,
785 xfiles_factor: 0.5,
786 steps: 1,
787 rows: 10,
788 },
789 CreateRoundRobinArchive {
790 consolidation_function: ConsolidationFunction::Average,
791 xfiles_factor: 0.5,
792 steps: 10,
793 rows: 10,
794 },
795 ],
796 start_timestamp: 1609459200,
797 step_seconds: 1,
798 })
799 .await
800 .unwrap();
801
802 let result = client
803 .fetch(
804 "test-fetch",
805 ConsolidationFunction::Average,
806 None,
807 None,
808 None,
809 )
810 .await
811 .unwrap();
812
813 assert_eq!(result.flush_version, 1);
814 assert!(result.start > 0);
815 assert!(result.end > 0);
816 assert_eq!(result.step, 1);
817 assert_eq!(result.ds_count, 2);
818 assert_eq!(result.ds_names, vec!["ds1".to_string(), "ds2".to_string()]);
819 assert!(!result.data.is_empty());
820
821 let result = client
823 .fetch(
824 "test-fetch",
825 ConsolidationFunction::Average,
826 None,
827 Some(1609459200),
828 None,
829 )
830 .await;
831 assert!(result.is_err());
832
833 let result = client
834 .fetch(
835 "test-fetch",
836 ConsolidationFunction::Average,
837 Some(1609459200),
838 None,
839 Some(vec!["ds1".to_string(), "ds2".to_string()]),
840 )
841 .await;
842 assert!(result.is_err());
843
844 let now_timestamp = now_timestamp().unwrap();
845 let result = client
846 .fetch(
847 "test-fetch",
848 ConsolidationFunction::Average,
849 Some(now_timestamp as i64 - 10),
850 Some(now_timestamp as i64),
851 Some(vec!["not-found".to_string()]),
852 )
853 .await;
854
855 assert!(result.is_err());
856 let result = client
857 .fetch(
858 "test-fetch",
859 ConsolidationFunction::Average,
860 Some(now_timestamp as i64 - 10),
861 Some(now_timestamp as i64),
862 Some(vec!["ds2".to_string()]),
863 )
864 .await
865 .unwrap();
866 assert_eq!(result.ds_count, 1);
867
868 let result = client
870 .fetch(
871 "test-fetch",
872 ConsolidationFunction::Average,
873 Some(-10),
874 Some(0),
875 Some(vec!["ds2".to_string()]),
876 )
877 .await
878 .unwrap();
879 assert_eq!(result.ds_count, 1);
880 }
881}