1use crate::batch_update::BatchUpdate;
2use crate::consolidation_function::ConsolidationFunction;
3use crate::create::*;
4use crate::errors::RRDCachedClientError;
5use crate::fetch::FetchResponse;
6use crate::parsers::*;
7use crate::sanitisation::check_rrd_path;
8use std::collections::HashMap;
9use tokio::io::AsyncBufReadExt;
10use tokio::io::AsyncWriteExt;
11use tokio::net::UnixStream;
12use tokio::{io::BufReader, net::TcpStream};
13
/// Client that exchanges newline-terminated text commands (`PING`, `FLUSH`,
/// `FETCH`, ...) with a server over a buffered stream.
///
/// `T` is the underlying transport and defaults to [`TcpStream`]; the
/// command methods are available for any `T` that is
/// `AsyncRead + AsyncWrite + Unpin`.
#[derive(Debug)]
pub struct RRDCachedClient<T = TcpStream> {
    /// Buffered transport, used both for writing commands and for reading
    /// response lines.
    stream: BufReader<T>,
}
19
20impl RRDCachedClient<TcpStream> {
21 pub async fn connect_tcp(addr: &str) -> Result<Self, RRDCachedClientError> {
23 let stream = TcpStream::connect(addr).await?;
24 let stream = BufReader::new(stream);
25 Ok(Self { stream })
26 }
27}
28
29impl RRDCachedClient<UnixStream> {
30 pub async fn connect_unix(addr: &str) -> Result<Self, RRDCachedClientError> {
32 let stream = UnixStream::connect(addr).await?;
33 let stream = BufReader::new(stream);
34 Ok(Self { stream })
35 }
36}
37
38impl<T> RRDCachedClient<T>
39where
40 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
41{
42 fn assert_response_code(&self, code: i64, message: &str) -> Result<(), RRDCachedClientError> {
43 if code < 0 {
44 Err(RRDCachedClientError::UnexpectedResponse(
45 code,
46 message.to_string(),
47 ))
48 } else {
49 Ok(())
50 }
51 }
52
53 async fn read_line(&mut self) -> Result<String, RRDCachedClientError> {
54 let mut line = String::new();
55 self.stream.read_line(&mut line).await?;
56 Ok(line)
57 }
58
59 async fn read_n_lines(&mut self, n: usize) -> Result<Vec<String>, RRDCachedClientError> {
60 let mut lines = Vec::with_capacity(n);
61 for _ in 0..n {
62 lines.push(self.read_line().await?);
63 }
64 Ok(lines)
65 }
66
67 async fn send_command(
68 &mut self,
69 command: &str,
70 ) -> Result<(usize, String), RRDCachedClientError> {
71 self.stream.write_all(command.as_bytes()).await?;
73 let response_line = self.read_line().await?;
75 let (code, message) = parse_response_line(&response_line)?;
76 self.assert_response_code(code, message)?;
77 let nb_lines = usize::try_from(code).map_err(|_| {
78 RRDCachedClientError::UnexpectedResponse(code, "invalid number of lines".to_string())
79 })?;
80 Ok((nb_lines, message.to_string()))
81 }
82
83 pub async fn help(
87 &mut self,
88 command: Option<&str>,
89 ) -> Result<(String, Vec<String>), RRDCachedClientError> {
90 let command = match command {
91 Some(command) => {
92 let mut a = String::with_capacity(6 + command.len());
93 a.push_str("HELP ");
94 a.push_str(command);
95 a.push('\n');
96 a
97 }
98 None => "HELP\n".to_string(),
99 };
100 let (nb_lines, header) = self.send_command(&command).await?;
101 let lines = self.read_n_lines(nb_lines).await?;
102
103 Ok((header, lines))
104 }
105
106 pub async fn ping(&mut self) -> Result<(), RRDCachedClientError> {
108 let (_, message) = self.send_command("PING\n").await?;
109 if message != "PONG" {
110 return Err(RRDCachedClientError::UnexpectedResponse(
111 0,
112 message.to_string(),
113 ));
114 }
115 Ok(())
116 }
117
118 pub async fn create(&mut self, arguments: CreateArguments) -> Result<(), RRDCachedClientError> {
120 let arguments_str = arguments.to_str();
121 let mut command = String::with_capacity(7 + arguments_str.len());
122 command.push_str("CREATE ");
123 command.push_str(&arguments_str);
124 command.push('\n');
125 let (_, message) = self.send_command(&command).await?;
126 if message != "RRD created OK" {
127 return Err(RRDCachedClientError::UnexpectedResponse(
128 0,
129 message.to_string(),
130 ));
131 }
132 Ok(())
133 }
134
135 pub async fn flush(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
137 check_rrd_path(path)?;
138 let mut command = String::with_capacity(6 + path.len() + 5);
139 command.push_str("FLUSH ");
140 command.push_str(path);
141 command.push_str(".rrd\n");
142 let _ = self.send_command(&command).await?;
143 Ok(())
144 }
145
146 pub async fn flush_all(&mut self) -> Result<(), RRDCachedClientError> {
148 let _ = self.send_command("FLUSHALL\n").await?;
149 Ok(())
150 }
151
152 pub async fn pending(&mut self, path: &str) -> Result<Vec<String>, RRDCachedClientError> {
154 check_rrd_path(path)?;
155 let mut command = String::with_capacity(7 + path.len() + 5);
156 command.push_str("PENDING ");
157 command.push_str(path);
158 command.push_str(".rrd\n");
159 let (nb_lines, _) = self.send_command(&command).await?;
160 if nb_lines > 0 {
161 let lines = self.read_n_lines(nb_lines).await?;
162 Ok(lines)
163 } else {
164 Ok(vec![])
165 }
166 }
167
168 pub async fn forget(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
170 check_rrd_path(path)?;
171 let mut command = String::with_capacity(7 + path.len() + 5);
172 command.push_str("FORGET ");
173 command.push_str(path);
174 command.push_str(".rrd\n");
175 let _ = self.send_command(&command).await?;
176 Ok(())
177 }
178
179 pub async fn queue(&mut self) -> Result<Vec<(String, usize)>, RRDCachedClientError> {
181 let (nb_lines, _message) = self.send_command("QUEUE\n").await?;
182 let nb_lines = self.read_n_lines(nb_lines).await?;
183 let parsed_lines = nb_lines
184 .iter()
185 .map(|line| {
186 let (path, pending) = parse_queue_line(line)?;
187 Ok((path.to_string(), pending))
188 })
189 .collect::<Result<Vec<(String, usize)>, RRDCachedClientError>>()?;
190 Ok(parsed_lines)
191 }
192
193 pub async fn stats(&mut self) -> Result<HashMap<String, i64>, RRDCachedClientError> {
195 let (nb_lines, _message) = self.send_command("STATS\n").await?;
196 let lines = self.read_n_lines(nb_lines).await?;
197 let parsed_lines = lines
198 .iter()
199 .map(|line| {
200 let (name, value) = parse_stats_line(line)?;
201 Ok((name.to_string(), value))
202 })
203 .collect::<Result<HashMap<String, i64>, RRDCachedClientError>>()?;
204 Ok(parsed_lines)
205 }
206
207 pub async fn first(
209 &mut self,
210 path: &str,
211 round_robin_archive: Option<usize>,
212 ) -> Result<usize, RRDCachedClientError> {
213 check_rrd_path(path)?;
214 let round_robin_archive = round_robin_archive.unwrap_or(0);
215 let rranum_str = round_robin_archive.to_string();
216
217 let mut command = String::with_capacity(6 + path.len() + 5 + rranum_str.len() + 1);
218 command.push_str("FIRST ");
219 command.push_str(path);
220 command.push_str(".rrd ");
221 command.push_str(&rranum_str);
222 command.push('\n');
223 let (_, message) = self.send_command(&command).await?;
224 let timestamp = parse_timestamp(&message)?;
225 Ok(timestamp)
226 }
227
228 pub async fn last(&mut self, path: &str) -> Result<usize, RRDCachedClientError> {
230 check_rrd_path(path)?;
231
232 let mut command = String::with_capacity(5 + path.len() + 5);
233 command.push_str("LAST ");
234 command.push_str(path);
235 command.push_str(".rrd\n");
236 let (_, message) = self.send_command(&command).await?;
237 let timestamp = parse_timestamp(&message)?;
238 Ok(timestamp)
239 }
240
241 pub async fn info(&mut self, path: &str) -> Result<Vec<String>, RRDCachedClientError> {
243 check_rrd_path(path)?;
244 let mut command = String::with_capacity(5 + path.len() + 5);
245 command.push_str("INFO ");
246 command.push_str(path);
247 command.push_str(".rrd\n");
248 let (nb_lines, _message) = self.send_command(&command).await?;
249 let lines = self.read_n_lines(nb_lines).await?;
250 Ok(lines)
251 }
252
253 pub async fn list(
255 &mut self,
256 recursive: bool,
257 path: Option<&str>,
258 ) -> Result<Vec<String>, RRDCachedClientError> {
259 let path = path.unwrap_or("/");
260 let mut command =
261 String::with_capacity(5 + path.len() + 1 + (if recursive { 10 } else { 0 }));
262 command.push_str("LIST ");
263 if recursive {
264 command.push_str("RECURSIVE ");
265 }
266 command.push_str(path);
267 command.push('\n');
268 let (nb_lines, _message) = self.send_command(&command).await?;
269 let lines = self.read_n_lines(nb_lines).await?;
270 Ok(lines)
271 }
272
273 pub async fn suspend(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
275 check_rrd_path(path)?;
276 let mut command = String::with_capacity(8 + path.len() + 5);
277 command.push_str("SUSPEND ");
278 command.push_str(path);
279 command.push_str(".rrd\n");
280 let _ = self.send_command(&command).await?;
281 Ok(())
282 }
283
284 pub async fn resume(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
286 check_rrd_path(path)?;
287 let mut command = String::with_capacity(7 + path.len() + 5);
288 command.push_str("RESUME ");
289 command.push_str(path);
290 command.push_str(".rrd\n");
291 let _ = self.send_command(&command).await?;
292 Ok(())
293 }
294
295 pub async fn suspend_all(&mut self) -> Result<(), RRDCachedClientError> {
297 let _ = self.send_command("SUSPENDALL\n").await?;
298 Ok(())
299 }
300
301 pub async fn resume_all(&mut self) -> Result<(), RRDCachedClientError> {
303 let _ = self.send_command("RESUMEALL\n").await?;
304 Ok(())
305 }
306
307 pub async fn quit(&mut self) -> Result<(), RRDCachedClientError> {
309 self.stream.write_all("QUIT\n".as_bytes()).await?;
311 Ok(())
312 }
313
314 pub async fn update(
318 &mut self,
319 path: &str,
320 timestamp: Option<usize>,
321 data: Vec<f64>,
322 ) -> Result<(), RRDCachedClientError> {
323 let command = BatchUpdate::new(path, timestamp, data)?;
324 let command_str = command.to_command_string()?;
325 let _ = self.send_command(&command_str).await?;
326 Ok(())
327 }
328
329 pub async fn update_one(
333 &mut self,
334 path: &str,
335 timestamp: Option<usize>,
336 data: f64,
337 ) -> Result<(), RRDCachedClientError> {
338 self.update(path, timestamp, vec![data]).await
339 }
340
341 pub async fn batch(&mut self, commands: Vec<BatchUpdate>) -> Result<(), RRDCachedClientError> {
347 let _ = self.send_command("BATCH\n").await?;
348 for command in commands {
349 let command_str = command.to_command_string()?;
350 self.stream.write_all(command_str.as_bytes()).await?;
352 }
353 let (nb_lines, message) = self.send_command(".\n").await?;
355
356 if nb_lines > 0 {
358 let lines = self.read_n_lines(nb_lines).await?;
359 return Err(RRDCachedClientError::BatchUpdateErrorResponse(
360 message, lines,
361 ));
362 }
363 Ok(())
364 }
365
366 pub async fn fetch(
374 &mut self,
375 path: &str,
376 consolidation_function: ConsolidationFunction,
377 start: Option<i64>,
378 end: Option<i64>,
379 columns: Option<Vec<String>>,
380 ) -> Result<FetchResponse, RRDCachedClientError> {
381 check_rrd_path(path)?;
382 let consolidation_function_str = consolidation_function.to_str();
383 let mut capacity = 6 + path.len() + 5 + consolidation_function_str.len() + 1;
385 let mut start_str: Option<String> = None;
386 let mut end_str: Option<String> = None;
387 let mut columns_str: Option<String> = None;
388 match start {
389 Some(start) => {
390 let start_string = start.to_string();
391 capacity += 1 + start_string.len();
392 start_str = Some(start_string);
393
394 if let Some(end) = end {
395 let end_string = end.to_string();
396 capacity += 1 + end_string.len();
397 end_str = Some(end_string);
398 if let Some(columns) = columns {
399 let columns_string = columns.join(" ");
400 capacity += 1 + columns_string.len();
401 columns_str = Some(columns_string);
402 }
403 } else if columns.is_some() {
404 return Err(RRDCachedClientError::InvalidFetch(
405 "end must be specified".to_string(),
406 ));
407 }
408 }
409 None => {
410 if end.is_some() || columns.is_some() {
411 return Err(RRDCachedClientError::InvalidFetch(
412 "start must be specified".to_string(),
413 ));
414 }
415 }
416 }
417 let mut command = String::with_capacity(capacity);
418 command.push_str("FETCH ");
419 command.push_str(path);
420 command.push_str(".rrd ");
421 command.push_str(consolidation_function_str);
422 if let Some(start_str) = start_str {
423 command.push(' ');
424 command.push_str(&start_str);
425 if let Some(end_str) = end_str {
426 command.push(' ');
427 command.push_str(&end_str);
428 if let Some(columns_str) = columns_str {
429 command.push(' ');
430 command.push_str(&columns_str);
431 }
432 }
433 }
434 command.push('\n');
435 debug_assert_eq!(command.len(), capacity);
436
437 let (nb_lines, _message) = self.send_command(&command).await?;
438 let lines = self.read_n_lines(nb_lines).await?;
439
440 let response = FetchResponse::from_lines(lines)?;
441
442 Ok(response)
443 }
444}
445
446#[cfg(test)]
447mod tests {
448 use crate::now::now_timestamp;
449
450 use super::*;
451 use serial_test::serial;
452
453 #[tokio::test]
454 async fn test_ping_tcp() {
455 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
456 .await
457 .unwrap();
458 client.ping().await.unwrap();
459 }
460
461 #[tokio::test]
462 async fn test_ping_unix() {
463 let mut client = RRDCachedClient::connect_unix("./rrdcached.sock")
464 .await
465 .unwrap();
466 client.ping().await.unwrap();
467 }
468
469 #[tokio::test]
470 async fn test_help() {
471 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
472 .await
473 .unwrap();
474
475 let (header, lines) = client.help(None).await.unwrap();
476 assert_eq!(header, "Command overview");
477 assert_eq!(lines.len(), 22);
478
479 let (header, lines) = client.help(Some("PING")).await.unwrap();
480 assert_eq!(header, "Help for PING");
481 assert!(!lines.is_empty());
482 }
483
484 async fn create_simple_rrd(client: &mut RRDCachedClient<TcpStream>, name: String) {
485 client
486 .create(CreateArguments {
487 path: name,
488 data_sources: vec![CreateDataSource {
489 name: "ds1".to_string(),
490 minimum: None,
491 maximum: None,
492 heartbeat: 10,
493 serie_type: CreateDataSourceType::Gauge,
494 }],
495 round_robin_archives: vec![CreateRoundRobinArchive {
496 consolidation_function: ConsolidationFunction::Average,
497 xfiles_factor: 0.5,
498 steps: 1,
499 rows: 100,
500 }],
501 start_timestamp: 1609459200,
502 step_seconds: 1,
503 })
504 .await
505 .unwrap();
506 }
507
508 #[tokio::test]
509 async fn test_create() {
510 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
511 .await
512 .unwrap();
513
514 client
515 .create(CreateArguments {
516 path: "test-create".to_string(),
517 data_sources: vec![
518 CreateDataSource {
519 name: "ds1".to_string(),
520 minimum: None,
521 maximum: None,
522 heartbeat: 10,
523 serie_type: CreateDataSourceType::Gauge,
524 },
525 CreateDataSource {
526 name: "ds2".to_string(),
527 minimum: Some(0.0),
528 maximum: Some(100.0),
529 heartbeat: 10,
530 serie_type: CreateDataSourceType::Gauge,
531 },
532 ],
533 round_robin_archives: vec![
534 CreateRoundRobinArchive {
535 consolidation_function: ConsolidationFunction::Average,
536 xfiles_factor: 0.5,
537 steps: 1,
538 rows: 10,
539 },
540 CreateRoundRobinArchive {
541 consolidation_function: ConsolidationFunction::Average,
542 xfiles_factor: 0.5,
543 steps: 10,
544 rows: 10,
545 },
546 ],
547 start_timestamp: 1609459200,
548 step_seconds: 1,
549 })
550 .await
551 .unwrap();
552 }
553
554 #[tokio::test]
555 async fn test_update() {
556 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
557 .await
558 .unwrap();
559
560 create_simple_rrd(&mut client, "test-update".to_string()).await;
561
562 client.update_one("test-update", None, 4.2).await.unwrap();
563 }
564
565 #[tokio::test]
566 async fn test_error() {
567 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
568 .await
569 .unwrap();
570
571 let result = client.list(false, Some("not-found-path")).await;
572 assert!(result.is_err());
573 }
574
575 #[tokio::test]
576 async fn test_flush() {
577 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
578 .await
579 .unwrap();
580
581 create_simple_rrd(&mut client, "test-flush".to_string()).await;
582
583 client.update_one("test-flush", None, 4.2).await.unwrap();
584 client.flush("test-flush").await.unwrap();
585 }
586
587 #[tokio::test]
588 async fn test_flush_all() {
589 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
590 .await
591 .unwrap();
592
593 client.flush_all().await.unwrap();
594 }
595
    /// PENDING reports one queued update right after `update_one`, and
    /// nothing once the file has been flushed.
    ///
    /// Marked `#[serial]` and padded with short sleeps.
    // NOTE(review): the sleeps presumably give the daemon time to settle
    // before/after the flush — confirm against the daemon's behaviour.
    #[serial]
    #[tokio::test]
    async fn test_pending() {
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
            .await
            .unwrap();

        create_simple_rrd(&mut client, "test-pending".to_string()).await;

        client.update_one("test-pending", None, 4.2).await.unwrap();

        // Exactly one update should now be waiting.
        let lines = client.pending("test-pending").await.unwrap();
        assert_eq!(lines.len(), 1);

        client.flush("test-pending").await.unwrap();
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // After the flush nothing should be pending any more.
        let lines = client.pending("test-pending").await.unwrap();
        assert!(lines.is_empty());
    }
621
622 #[tokio::test]
623 async fn test_forget() {
624 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
625 .await
626 .unwrap();
627
628 create_simple_rrd(&mut client, "test-forget".to_string()).await;
629
630 client.update_one("test-forget", None, 4.2).await.unwrap();
631 client.forget("test-forget").await.unwrap();
632 }
633
634 #[tokio::test]
635 async fn test_queue() {
636 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
637 .await
638 .unwrap();
639
640 let lines = client.queue().await.unwrap();
641
642 assert!(lines.is_empty());
644 }
645
646 #[tokio::test]
647 async fn test_stats() {
648 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
649 .await
650 .unwrap();
651
652 let stats = client.stats().await.unwrap();
653 assert!(!stats.is_empty());
654 }
655
656 #[tokio::test]
657 async fn test_first_and_last() {
658 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
659 .await
660 .unwrap();
661
662 create_simple_rrd(&mut client, "test-first".to_string()).await;
663
664 let _ = client.update_one("test-first", Some(1612345678), 4.2).await;
666
667 let timestamp = client.first("test-first", None).await.unwrap();
668 assert_eq!(timestamp, 1609459101); let timestamp = client.last("test-first").await.unwrap();
671 assert_eq!(timestamp, 1612345678);
672 }
673
674 #[tokio::test]
675 async fn test_info() {
676 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
677 .await
678 .unwrap();
679
680 create_simple_rrd(&mut client, "test-info".to_string()).await;
681 let lines = client.info("test-info").await.unwrap();
682 assert!(!lines.is_empty());
683 }
684
685 #[tokio::test]
686 async fn test_list() {
687 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
688 .await
689 .unwrap();
690
691 let lines = client.list(true, None).await.unwrap();
692 assert!(!lines.is_empty());
693 }
694
695 #[serial]
696 #[tokio::test]
697 async fn test_suspend_and_resume() {
698 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
699 .await
700 .unwrap();
701
702 create_simple_rrd(&mut client, "test-suspend".to_string()).await;
703
704 client.update_one("test-suspend", None, 4.2).await.unwrap();
706 client.flush("test-suspend").await.unwrap();
707
708 client.suspend("test-suspend").await.unwrap();
709 client.resume("test-suspend").await.unwrap();
710 }
711
712 #[serial]
713 #[tokio::test]
714 async fn test_suspend_all_and_resume_all() {
715 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
716 .await
717 .unwrap();
718
719 client.suspend_all().await.unwrap();
720 client.resume_all().await.unwrap();
721 }
722
723 #[tokio::test]
724 async fn test_quit() {
725 let mut client = RRDCachedClient::connect_tcp("localhost:42217")
726 .await
727 .unwrap();
728
729 client.quit().await.unwrap();
730
731 let result = client.ping().await;
733 assert!(result.is_err());
734 }
735
    /// Sends the same pair of batched updates twice: the first batch must
    /// succeed and the second must be reported as an error.
    #[tokio::test]
    async fn test_batch() {
        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
            .await
            .unwrap();

        create_simple_rrd(&mut client, "test-batch-1".to_string()).await;
        create_simple_rrd(&mut client, "test-batch-2".to_string()).await;

        let timestamp = now_timestamp().unwrap();

        // First batch: one update per file at `timestamp`.
        let commands = vec![
            BatchUpdate::new("test-batch-1", Some(timestamp), vec![1.0]).unwrap(),
            BatchUpdate::new("test-batch-2", Some(timestamp), vec![2.0]).unwrap(),
        ];
        client.batch(commands).await.unwrap();

        // Second batch reuses the same timestamp and is expected to fail.
        // NOTE(review): presumably the server rejects updates that are not
        // newer than the previous one — confirm.
        let commands = vec![
            BatchUpdate::new("test-batch-1", Some(timestamp), vec![3.0]).unwrap(),
            BatchUpdate::new("test-batch-2", Some(timestamp), vec![4.0]).unwrap(),
        ];
        let result = client.batch(commands).await;
        assert!(result.is_err());
    }
761
    /// Exercises `fetch` end to end: a default fetch, the two client-side
    /// argument-validation errors, an unknown column, a single-column fetch
    /// with absolute timestamps, and one with `-10` / `0` as start / end.
    #[serial]
    #[tokio::test]
    async fn test_fetch() {
        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
            .await
            .unwrap();

        // RRD with two gauge data sources and two average archives.
        client
            .create(CreateArguments {
                path: "test-fetch".to_string(),
                data_sources: vec![
                    CreateDataSource {
                        name: "ds1".to_string(),
                        minimum: None,
                        maximum: None,
                        heartbeat: 10,
                        serie_type: CreateDataSourceType::Gauge,
                    },
                    CreateDataSource {
                        name: "ds2".to_string(),
                        minimum: Some(0.0),
                        maximum: Some(100.0),
                        heartbeat: 10,
                        serie_type: CreateDataSourceType::Gauge,
                    },
                ],
                round_robin_archives: vec![
                    CreateRoundRobinArchive {
                        consolidation_function: ConsolidationFunction::Average,
                        xfiles_factor: 0.5,
                        steps: 1,
                        rows: 10,
                    },
                    CreateRoundRobinArchive {
                        consolidation_function: ConsolidationFunction::Average,
                        xfiles_factor: 0.5,
                        steps: 10,
                        rows: 10,
                    },
                ],
                start_timestamp: 1609459200,
                step_seconds: 1,
            })
            .await
            .unwrap();

        // No optional arguments at all.
        let result = client
            .fetch(
                "test-fetch",
                ConsolidationFunction::Average,
                None,
                None,
                None,
            )
            .await
            .unwrap();

        assert_eq!(result.flush_version, 1);
        assert!(result.start > 0);
        assert!(result.end > 0);
        assert_eq!(result.step, 1);
        assert_eq!(result.ds_count, 2);
        assert_eq!(result.ds_names, vec!["ds1".to_string(), "ds2".to_string()]);
        assert!(!result.data.is_empty());

        // `end` without `start` is rejected.
        let result = client
            .fetch(
                "test-fetch",
                ConsolidationFunction::Average,
                None,
                Some(1609459200),
                None,
            )
            .await;
        assert!(result.is_err());

        // `columns` without `end` is rejected.
        let result = client
            .fetch(
                "test-fetch",
                ConsolidationFunction::Average,
                Some(1609459200),
                None,
                Some(vec!["ds1".to_string(), "ds2".to_string()]),
            )
            .await;
        assert!(result.is_err());

        // A column name that does not exist in the RRD must fail.
        let now_timestamp = now_timestamp().unwrap();
        let result = client
            .fetch(
                "test-fetch",
                ConsolidationFunction::Average,
                Some(now_timestamp as i64 - 10),
                Some(now_timestamp as i64),
                Some(vec!["not-found".to_string()]),
            )
            .await;

        assert!(result.is_err());
        // Restricting the fetch to one existing column.
        let result = client
            .fetch(
                "test-fetch",
                ConsolidationFunction::Average,
                Some(now_timestamp as i64 - 10),
                Some(now_timestamp as i64),
                Some(vec!["ds2".to_string()]),
            )
            .await
            .unwrap();
        assert_eq!(result.ds_count, 1);

        // Same column with `-10` / `0` as start / end.
        // NOTE(review): presumably interpreted by the server as offsets
        // relative to now — confirm.
        let result = client
            .fetch(
                "test-fetch",
                ConsolidationFunction::Average,
                Some(-10),
                Some(0),
                Some(vec!["ds2".to_string()]),
            )
            .await
            .unwrap();
        assert_eq!(result.ds_count, 1);
    }
887}