rrdcached_client/
client.rs

1use crate::batch_update::BatchUpdate;
2use crate::consolidation_function::ConsolidationFunction;
3use crate::create::*;
4use crate::errors::RRDCachedClientError;
5use crate::fetch::FetchResponse;
6use crate::parsers::*;
7use crate::sanitisation::check_rrd_path;
8use std::collections::HashMap;
9use tokio::io::AsyncBufReadExt;
10use tokio::io::AsyncWriteExt;
11use tokio::net::UnixStream;
12use tokio::{io::BufReader, net::TcpStream};
13
14/// A client to interact with a RRDCached server.
15#[derive(Debug)]
16pub struct RRDCachedClient<T = TcpStream> {
17    stream: BufReader<T>,
18}
19
20impl RRDCachedClient<TcpStream> {
21    /// Connect to a RRDCached server over TCP.
22    pub async fn connect_tcp(addr: &str) -> Result<Self, RRDCachedClientError> {
23        let stream = TcpStream::connect(addr).await?;
24        let stream = BufReader::new(stream);
25        Ok(Self { stream })
26    }
27}
28
29impl RRDCachedClient<UnixStream> {
30    /// Connect to a RRDCached server over a Unix socket.
31    pub async fn connect_unix(addr: &str) -> Result<Self, RRDCachedClientError> {
32        let stream = UnixStream::connect(addr).await?;
33        let stream = BufReader::new(stream);
34        Ok(Self { stream })
35    }
36}
37
38impl<T> RRDCachedClient<T>
39where
40    T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
41{
42    fn assert_response_code(&self, code: i64, message: &str) -> Result<(), RRDCachedClientError> {
43        if code < 0 {
44            Err(RRDCachedClientError::UnexpectedResponse(
45                code,
46                message.to_string(),
47            ))
48        } else {
49            Ok(())
50        }
51    }
52
53    async fn read_line(&mut self) -> Result<String, RRDCachedClientError> {
54        let mut line = String::new();
55        self.stream.read_line(&mut line).await?;
56        Ok(line)
57    }
58
59    async fn read_n_lines(&mut self, n: usize) -> Result<Vec<String>, RRDCachedClientError> {
60        let mut lines = Vec::with_capacity(n);
61        for _ in 0..n {
62            lines.push(self.read_line().await?);
63        }
64        Ok(lines)
65    }
66
67    async fn send_command(
68        &mut self,
69        command: &str,
70    ) -> Result<(usize, String), RRDCachedClientError> {
71        // Send command
72        self.stream.write_all(command.as_bytes()).await?;
73        // Read response
74        let response_line = self.read_line().await?;
75        let (code, message) = parse_response_line(&response_line)?;
76        self.assert_response_code(code, message)?;
77        let nb_lines = usize::try_from(code).map_err(|_| {
78            RRDCachedClientError::UnexpectedResponse(code, "invalid number of lines".to_string())
79        })?;
80        Ok((nb_lines, message.to_string()))
81    }
82
83    /// Retreive documentation for humans.
84    ///
85    /// Can take an optional command to get help for a specific command.
86    pub async fn help(
87        &mut self,
88        command: Option<&str>,
89    ) -> Result<(String, Vec<String>), RRDCachedClientError> {
90        let command = match command {
91            Some(command) => {
92                let mut a = String::with_capacity(6 + command.len());
93                a.push_str("HELP ");
94                a.push_str(command);
95                a.push('\n');
96                a
97            }
98            None => "HELP\n".to_string(),
99        };
100        let (nb_lines, header) = self.send_command(&command).await?;
101        let lines = self.read_n_lines(nb_lines).await?;
102
103        Ok((header, lines))
104    }
105
106    /// Ping the server to check if it's alive.
107    pub async fn ping(&mut self) -> Result<(), RRDCachedClientError> {
108        let (_, message) = self.send_command("PING\n").await?;
109        assert!(message == "PONG");
110        Ok(())
111    }
112
113    /// Create a new RRD
114    pub async fn create(&mut self, arguments: CreateArguments) -> Result<(), RRDCachedClientError> {
115        let arguments_str = arguments.to_str();
116        let mut command = String::with_capacity(7 + arguments_str.len());
117        command.push_str("CREATE ");
118        command.push_str(&arguments_str);
119        command.push('\n');
120        let (_, message) = self.send_command(&command).await?;
121        if message != "RRD created OK" {
122            return Err(RRDCachedClientError::UnexpectedResponse(
123                0,
124                message.to_string(),
125            ));
126        }
127        Ok(())
128    }
129
130    /// Flush a RRD
131    pub async fn flush(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
132        check_rrd_path(path)?;
133        let mut command = String::with_capacity(6 + path.len() + 5);
134        command.push_str("FLUSH ");
135        command.push_str(path);
136        command.push_str(".rrd\n");
137        let _ = self.send_command(&command).await?;
138        Ok(())
139    }
140
141    /// Flush all RRDs
142    pub async fn flush_all(&mut self) -> Result<(), RRDCachedClientError> {
143        let _ = self.send_command("FLUSHALL\n").await?;
144        Ok(())
145    }
146
147    /// Pending updates
148    pub async fn pending(&mut self, path: &str) -> Result<Vec<String>, RRDCachedClientError> {
149        check_rrd_path(path)?;
150        let mut command = String::with_capacity(7 + path.len() + 5);
151        command.push_str("PENDING ");
152        command.push_str(path);
153        command.push_str(".rrd\n");
154        let (nb_lines, _) = self.send_command(&command).await?;
155        if nb_lines > 0 {
156            let lines = self.read_n_lines(nb_lines).await?;
157            Ok(lines)
158        } else {
159            Ok(vec![])
160        }
161    }
162
163    /// Forget pending updates
164    pub async fn forget(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
165        check_rrd_path(path)?;
166        let mut command = String::with_capacity(7 + path.len() + 5);
167        command.push_str("FORGET ");
168        command.push_str(path);
169        command.push_str(".rrd\n");
170        let _ = self.send_command(&command).await?;
171        Ok(())
172    }
173
174    /// Get the queue information
175    pub async fn queue(&mut self) -> Result<Vec<(String, usize)>, RRDCachedClientError> {
176        let (nb_lines, _message) = self.send_command("QUEUE\n").await?;
177        let nb_lines = self.read_n_lines(nb_lines).await?;
178        let parsed_lines = nb_lines
179            .iter()
180            .map(|line| {
181                let (path, pending) = parse_queue_line(line)?;
182                Ok((path.to_string(), pending))
183            })
184            .collect::<Result<Vec<(String, usize)>, RRDCachedClientError>>()?;
185        Ok(parsed_lines)
186    }
187
188    /// Get the server stats
189    pub async fn stats(&mut self) -> Result<HashMap<String, i64>, RRDCachedClientError> {
190        let (nb_lines, _message) = self.send_command("STATS\n").await?;
191        let lines = self.read_n_lines(nb_lines).await?;
192        let parsed_lines = lines
193            .iter()
194            .map(|line| {
195                let (name, value) = parse_stats_line(line)?;
196                Ok((name.to_string(), value))
197            })
198            .collect::<Result<HashMap<String, i64>, RRDCachedClientError>>()?;
199        Ok(parsed_lines)
200    }
201
202    /// Get the first CDP (whatever that is)
203    pub async fn first(
204        &mut self,
205        path: &str,
206        round_robin_archive: Option<usize>,
207    ) -> Result<usize, RRDCachedClientError> {
208        check_rrd_path(path)?;
209        let round_robin_archive = round_robin_archive.unwrap_or(0);
210        let rranum_str = round_robin_archive.to_string();
211
212        let mut command = String::with_capacity(6 + path.len() + 5 + rranum_str.len() + 1);
213        command.push_str("FIRST ");
214        command.push_str(path);
215        command.push_str(".rrd ");
216        command.push_str(&rranum_str);
217        command.push('\n');
218        let (_, message) = self.send_command(&command).await?;
219        let timestamp = parse_timestamp(&message)?;
220        Ok(timestamp)
221    }
222
223    /// Retrieve the last update timestamp
224    pub async fn last(&mut self, path: &str) -> Result<usize, RRDCachedClientError> {
225        check_rrd_path(path)?;
226
227        let mut command = String::with_capacity(5 + path.len() + 5);
228        command.push_str("LAST ");
229        command.push_str(path);
230        command.push_str(".rrd\n");
231        let (_, message) = self.send_command(&command).await?;
232        let timestamp = parse_timestamp(&message)?;
233        Ok(timestamp)
234    }
235
236    /// Retreive information about a RRD
237    pub async fn info(&mut self, path: &str) -> Result<Vec<String>, RRDCachedClientError> {
238        check_rrd_path(path)?;
239        let mut command = String::with_capacity(5 + path.len() + 5);
240        command.push_str("INFO ");
241        command.push_str(path);
242        command.push_str(".rrd\n");
243        let (nb_lines, _message) = self.send_command(&command).await?;
244        let lines = self.read_n_lines(nb_lines).await?;
245        Ok(lines)
246    }
247
248    /// List RRDs
249    pub async fn list(
250        &mut self,
251        recursive: bool,
252        path: Option<&str>,
253    ) -> Result<Vec<String>, RRDCachedClientError> {
254        let path = path.unwrap_or("/");
255        let mut command =
256            String::with_capacity(5 + path.len() + 1 + (if recursive { 10 } else { 0 }));
257        command.push_str("LIST ");
258        if recursive {
259            command.push_str("RECURSIVE ");
260        }
261        command.push_str(path);
262        command.push('\n');
263        let (nb_lines, _message) = self.send_command(&command).await?;
264        let lines = self.read_n_lines(nb_lines).await?;
265        Ok(lines)
266    }
267
268    /// Suspend a RRD
269    pub async fn suspend(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
270        check_rrd_path(path)?;
271        let mut command = String::with_capacity(8 + path.len() + 5);
272        command.push_str("SUSPEND ");
273        command.push_str(path);
274        command.push_str(".rrd\n");
275        let _ = self.send_command(&command).await?;
276        Ok(())
277    }
278
279    /// Resume a RRD
280    pub async fn resume(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
281        check_rrd_path(path)?;
282        let mut command = String::with_capacity(7 + path.len() + 5);
283        command.push_str("RESUME ");
284        command.push_str(path);
285        command.push_str(".rrd\n");
286        let _ = self.send_command(&command).await?;
287        Ok(())
288    }
289
290    /// Suspend all RRDs
291    pub async fn suspend_all(&mut self) -> Result<(), RRDCachedClientError> {
292        let _ = self.send_command("SUSPENDALL\n").await?;
293        Ok(())
294    }
295
296    /// Resume all RRDs
297    pub async fn resume_all(&mut self) -> Result<(), RRDCachedClientError> {
298        let _ = self.send_command("RESUMEALL\n").await?;
299        Ok(())
300    }
301
302    /// Close the connection to the server
303    pub async fn quit(&mut self) -> Result<(), RRDCachedClientError> {
304        // Send directly without checking the response
305        self.stream.write_all("QUIT\n".as_bytes()).await?;
306        Ok(())
307    }
308
309    /// Update a RRD with a list of values at a specific timestamp
310    ///
311    /// The order is important as it must match the order of the data sources in the RRD
312    pub async fn update(
313        &mut self,
314        path: &str,
315        timestamp: Option<usize>,
316        data: Vec<f64>,
317    ) -> Result<(), RRDCachedClientError> {
318        let command = BatchUpdate::new(path, timestamp, data)?;
319        let command_str = command.to_command_string()?;
320        let _ = self.send_command(&command_str).await?;
321        Ok(())
322    }
323
324    /// Update a RRD with a single value at a specific timestamp.
325    ///
326    /// Convenient helper when a RRD contains only one data source.
327    pub async fn update_one(
328        &mut self,
329        path: &str,
330        timestamp: Option<usize>,
331        data: f64,
332    ) -> Result<(), RRDCachedClientError> {
333        self.update(path, timestamp, vec![data]).await
334    }
335
336    /// Batch updates.
337    ///
338    /// RRDCached presents this as a more efficient way to update multiple RRDs at once.
339    /// You may want to sort the updates by timestamp ascending as RDDtool will
340    /// reject updates of older timestamps.
341    pub async fn batch(&mut self, commands: Vec<BatchUpdate>) -> Result<(), RRDCachedClientError> {
342        let _ = self.send_command("BATCH\n").await?;
343        for command in commands {
344            let command_str = command.to_command_string()?;
345            // write the command directly
346            self.stream.write_all(command_str.as_bytes()).await?;
347        }
348        // Send a dot to end the batch
349        let (nb_lines, message) = self.send_command(".\n").await?;
350
351        // It returns errors line by line if there are any
352        if nb_lines > 0 {
353            let lines = self.read_n_lines(nb_lines).await?;
354            return Err(RRDCachedClientError::BatchUpdateErrorResponse(
355                message, lines,
356            ));
357        }
358        Ok(())
359    }
360
361    /// Fetch the content of a Round Robin Database (RRD)
362    ///
363    /// Note that we use the Ascii protocol, as the binary protocol is not documented
364    /// and it's unsure whether it's consitent between versions of RRDCached or
365    /// system architectures.
366    ///
367    ///
368    pub async fn fetch(
369        &mut self,
370        path: &str,
371        consolidation_function: ConsolidationFunction,
372        start: Option<i64>,
373        end: Option<i64>,
374        columns: Option<Vec<String>>,
375    ) -> Result<FetchResponse, RRDCachedClientError> {
376        check_rrd_path(path)?;
377        let consolidation_function_str = consolidation_function.to_str();
378        // FETCH path.rrd CF [--start start] [--end end] [--columns columns]
379        let mut capacity = 6 + path.len() + 5 + consolidation_function_str.len() + 1;
380        let mut start_str: Option<String> = None;
381        let mut end_str: Option<String> = None;
382        let mut columns_str: Option<String> = None;
383        match start {
384            Some(start) => {
385                let start_string = start.to_string();
386                capacity += 1 + start_string.len();
387                start_str = Some(start_string);
388
389                if let Some(end) = end {
390                    let end_string = end.to_string();
391                    capacity += 1 + end_string.len();
392                    end_str = Some(end_string);
393                    if let Some(columns) = columns {
394                        let columns_string = columns.join(" ");
395                        capacity += 1 + columns_string.len();
396                        columns_str = Some(columns_string);
397                    }
398                } else if columns.is_some() {
399                    return Err(RRDCachedClientError::InvalidFetch(
400                        "end must be specified".to_string(),
401                    ));
402                }
403            }
404            None => {
405                if end.is_some() || columns.is_some() {
406                    return Err(RRDCachedClientError::InvalidFetch(
407                        "start must be specified".to_string(),
408                    ));
409                }
410            }
411        }
412        let mut command = String::with_capacity(capacity);
413        command.push_str("FETCH ");
414        command.push_str(path);
415        command.push_str(".rrd ");
416        command.push_str(consolidation_function_str);
417        if let Some(start_str) = start_str {
418            command.push(' ');
419            command.push_str(&start_str);
420            if let Some(end_str) = end_str {
421                command.push(' ');
422                command.push_str(&end_str);
423                if let Some(columns_str) = columns_str {
424                    command.push(' ');
425                    command.push_str(&columns_str);
426                }
427            }
428        }
429        command.push('\n');
430        assert!(command.len() == capacity);
431
432        let (nb_lines, _message) = self.send_command(&command).await?;
433        let lines = self.read_n_lines(nb_lines).await?;
434
435        let response = FetchResponse::from_lines(lines)?;
436
437        Ok(response)
438    }
439}
440
441#[cfg(test)]
442mod tests {
443    use crate::now::now_timestamp;
444
445    use super::*;
446    use serial_test::serial;
447
448    #[tokio::test]
449    async fn test_ping_tcp() {
450        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
451            .await
452            .unwrap();
453        client.ping().await.unwrap();
454    }
455
456    #[tokio::test]
457    async fn test_ping_unix() {
458        let mut client = RRDCachedClient::connect_unix("./rrdcached.sock")
459            .await
460            .unwrap();
461        client.ping().await.unwrap();
462    }
463
464    #[tokio::test]
465    async fn test_help() {
466        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
467            .await
468            .unwrap();
469
470        let (header, lines) = client.help(None).await.unwrap();
471        assert_eq!(header, "Command overview");
472        assert_eq!(lines.len(), 22);
473
474        let (header, lines) = client.help(Some("PING")).await.unwrap();
475        assert_eq!(header, "Help for PING");
476        assert!(!lines.is_empty());
477    }
478
479    async fn create_simple_rrd(client: &mut RRDCachedClient<TcpStream>, name: String) {
480        client
481            .create(CreateArguments {
482                path: name,
483                data_sources: vec![CreateDataSource {
484                    name: "ds1".to_string(),
485                    minimum: None,
486                    maximum: None,
487                    heartbeat: 10,
488                    serie_type: CreateDataSourceType::Gauge,
489                }],
490                round_robin_archives: vec![CreateRoundRobinArchive {
491                    consolidation_function: ConsolidationFunction::Average,
492                    xfiles_factor: 0.5,
493                    steps: 1,
494                    rows: 100,
495                }],
496                start_timestamp: 1609459200,
497                step_seconds: 1,
498            })
499            .await
500            .unwrap();
501    }
502
503    #[tokio::test]
504    async fn test_create() {
505        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
506            .await
507            .unwrap();
508
509        client
510            .create(CreateArguments {
511                path: "test-create".to_string(),
512                data_sources: vec![
513                    CreateDataSource {
514                        name: "ds1".to_string(),
515                        minimum: None,
516                        maximum: None,
517                        heartbeat: 10,
518                        serie_type: CreateDataSourceType::Gauge,
519                    },
520                    CreateDataSource {
521                        name: "ds2".to_string(),
522                        minimum: Some(0.0),
523                        maximum: Some(100.0),
524                        heartbeat: 10,
525                        serie_type: CreateDataSourceType::Gauge,
526                    },
527                ],
528                round_robin_archives: vec![
529                    CreateRoundRobinArchive {
530                        consolidation_function: ConsolidationFunction::Average,
531                        xfiles_factor: 0.5,
532                        steps: 1,
533                        rows: 10,
534                    },
535                    CreateRoundRobinArchive {
536                        consolidation_function: ConsolidationFunction::Average,
537                        xfiles_factor: 0.5,
538                        steps: 10,
539                        rows: 10,
540                    },
541                ],
542                start_timestamp: 1609459200,
543                step_seconds: 1,
544            })
545            .await
546            .unwrap();
547    }
548
549    #[tokio::test]
550    async fn test_update() {
551        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
552            .await
553            .unwrap();
554
555        create_simple_rrd(&mut client, "test-update".to_string()).await;
556
557        client.update_one("test-update", None, 4.2).await.unwrap();
558    }
559
560    #[tokio::test]
561    async fn test_error() {
562        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
563            .await
564            .unwrap();
565
566        let result = client.list(false, Some("not-found-path")).await;
567        assert!(result.is_err());
568    }
569
570    #[tokio::test]
571    async fn test_flush() {
572        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
573            .await
574            .unwrap();
575
576        create_simple_rrd(&mut client, "test-flush".to_string()).await;
577
578        client.update_one("test-flush", None, 4.2).await.unwrap();
579        client.flush("test-flush").await.unwrap();
580    }
581
582    #[tokio::test]
583    async fn test_flush_all() {
584        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
585            .await
586            .unwrap();
587
588        client.flush_all().await.unwrap();
589    }
590
591    #[serial]
592    #[tokio::test]
593    async fn test_pending() {
594        // Wait 0.1s
595        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
596
597        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
598            .await
599            .unwrap();
600
601        create_simple_rrd(&mut client, "test-pending".to_string()).await;
602
603        client.update_one("test-pending", None, 4.2).await.unwrap();
604
605        let lines = client.pending("test-pending").await.unwrap();
606        assert_eq!(lines.len(), 1);
607
608        // Flush
609        client.flush("test-pending").await.unwrap();
610        // Wait 0.1s
611        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
612
613        let lines = client.pending("test-pending").await.unwrap();
614        assert!(lines.is_empty());
615    }
616
617    #[tokio::test]
618    async fn test_forget() {
619        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
620            .await
621            .unwrap();
622
623        create_simple_rrd(&mut client, "test-forget".to_string()).await;
624
625        client.update_one("test-forget", None, 4.2).await.unwrap();
626        client.forget("test-forget").await.unwrap();
627    }
628
629    #[tokio::test]
630    async fn test_queue() {
631        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
632            .await
633            .unwrap();
634
635        let lines = client.queue().await.unwrap();
636
637        // I didn't manage to get a non-empty queue...
638        assert!(lines.is_empty());
639    }
640
641    #[tokio::test]
642    async fn test_stats() {
643        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
644            .await
645            .unwrap();
646
647        let stats = client.stats().await.unwrap();
648        assert!(!stats.is_empty());
649    }
650
651    #[tokio::test]
652    async fn test_first_and_last() {
653        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
654            .await
655            .unwrap();
656
657        create_simple_rrd(&mut client, "test-first".to_string()).await;
658
659        // This can fail for subsequent runs
660        let _ = client.update_one("test-first", Some(1612345678), 4.2).await;
661
662        let timestamp = client.first("test-first", None).await.unwrap();
663        assert_eq!(timestamp, 1609459101); // I'm guessing some alignment is happening
664
665        let timestamp = client.last("test-first").await.unwrap();
666        assert_eq!(timestamp, 1612345678);
667    }
668
669    #[tokio::test]
670    async fn test_info() {
671        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
672            .await
673            .unwrap();
674
675        create_simple_rrd(&mut client, "test-info".to_string()).await;
676        let lines = client.info("test-info").await.unwrap();
677        assert!(!lines.is_empty());
678    }
679
680    #[tokio::test]
681    async fn test_list() {
682        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
683            .await
684            .unwrap();
685
686        let lines = client.list(true, None).await.unwrap();
687        assert!(!lines.is_empty());
688    }
689
690    #[serial]
691    #[tokio::test]
692    async fn test_suspend_and_resume() {
693        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
694            .await
695            .unwrap();
696
697        create_simple_rrd(&mut client, "test-suspend".to_string()).await;
698
699        // insert some data and flush
700        client.update_one("test-suspend", None, 4.2).await.unwrap();
701        client.flush("test-suspend").await.unwrap();
702
703        client.suspend("test-suspend").await.unwrap();
704        client.resume("test-suspend").await.unwrap();
705    }
706
707    #[serial]
708    #[tokio::test]
709    async fn test_suspend_all_and_resume_all() {
710        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
711            .await
712            .unwrap();
713
714        client.suspend_all().await.unwrap();
715        client.resume_all().await.unwrap();
716    }
717
718    #[tokio::test]
719    async fn test_quit() {
720        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
721            .await
722            .unwrap();
723
724        client.quit().await.unwrap();
725
726        // check that the connection is closed
727        let result = client.ping().await;
728        assert!(result.is_err());
729    }
730
731    #[tokio::test]
732    async fn test_batch() {
733        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
734            .await
735            .unwrap();
736
737        create_simple_rrd(&mut client, "test-batch-1".to_string()).await;
738        create_simple_rrd(&mut client, "test-batch-2".to_string()).await;
739
740        let commands = vec![
741            BatchUpdate::new("test-batch-1", None, vec![1.0]).unwrap(),
742            BatchUpdate::new("test-batch-2", None, vec![2.0]).unwrap(),
743        ];
744        client.batch(commands).await.unwrap();
745
746        // Let's do the errors, it will fail
747        // because the time is the same
748        let commands = vec![
749            BatchUpdate::new("test-batch-1", None, vec![3.0]).unwrap(),
750            BatchUpdate::new("test-batch-2", None, vec![4.0]).unwrap(),
751        ];
752        let result = client.batch(commands).await;
753        assert!(result.is_err());
754    }
755
756    #[serial]
757    #[tokio::test]
758    async fn test_fetch() {
759        let mut client = RRDCachedClient::connect_tcp("localhost:42217")
760            .await
761            .unwrap();
762
763        client
764            .create(CreateArguments {
765                path: "test-fetch".to_string(),
766                data_sources: vec![
767                    CreateDataSource {
768                        name: "ds1".to_string(),
769                        minimum: None,
770                        maximum: None,
771                        heartbeat: 10,
772                        serie_type: CreateDataSourceType::Gauge,
773                    },
774                    CreateDataSource {
775                        name: "ds2".to_string(),
776                        minimum: Some(0.0),
777                        maximum: Some(100.0),
778                        heartbeat: 10,
779                        serie_type: CreateDataSourceType::Gauge,
780                    },
781                ],
782                round_robin_archives: vec![
783                    CreateRoundRobinArchive {
784                        consolidation_function: ConsolidationFunction::Average,
785                        xfiles_factor: 0.5,
786                        steps: 1,
787                        rows: 10,
788                    },
789                    CreateRoundRobinArchive {
790                        consolidation_function: ConsolidationFunction::Average,
791                        xfiles_factor: 0.5,
792                        steps: 10,
793                        rows: 10,
794                    },
795                ],
796                start_timestamp: 1609459200,
797                step_seconds: 1,
798            })
799            .await
800            .unwrap();
801
802        let result = client
803            .fetch(
804                "test-fetch",
805                ConsolidationFunction::Average,
806                None,
807                None,
808                None,
809            )
810            .await
811            .unwrap();
812
813        assert_eq!(result.flush_version, 1);
814        assert!(result.start > 0);
815        assert!(result.end > 0);
816        assert_eq!(result.step, 1);
817        assert_eq!(result.ds_count, 2);
818        assert_eq!(result.ds_names, vec!["ds1".to_string(), "ds2".to_string()]);
819        assert!(!result.data.is_empty());
820
821        // Test the errors in parameters
822        let result = client
823            .fetch(
824                "test-fetch",
825                ConsolidationFunction::Average,
826                None,
827                Some(1609459200),
828                None,
829            )
830            .await;
831        assert!(result.is_err());
832
833        let result = client
834            .fetch(
835                "test-fetch",
836                ConsolidationFunction::Average,
837                Some(1609459200),
838                None,
839                Some(vec!["ds1".to_string(), "ds2".to_string()]),
840            )
841            .await;
842        assert!(result.is_err());
843
844        let now_timestamp = now_timestamp().unwrap();
845        let result = client
846            .fetch(
847                "test-fetch",
848                ConsolidationFunction::Average,
849                Some(now_timestamp as i64 - 10),
850                Some(now_timestamp as i64),
851                Some(vec!["not-found".to_string()]),
852            )
853            .await;
854
855        assert!(result.is_err());
856        let result = client
857            .fetch(
858                "test-fetch",
859                ConsolidationFunction::Average,
860                Some(now_timestamp as i64 - 10),
861                Some(now_timestamp as i64),
862                Some(vec!["ds2".to_string()]),
863            )
864            .await
865            .unwrap();
866        assert_eq!(result.ds_count, 1);
867
868        // Relative timestamp
869        let result = client
870            .fetch(
871                "test-fetch",
872                ConsolidationFunction::Average,
873                Some(-10),
874                Some(0),
875                Some(vec!["ds2".to_string()]),
876            )
877            .await
878            .unwrap();
879        assert_eq!(result.ds_count, 1);
880    }
881}