Skip to main content

rrdcached_client/
client.rs

1use crate::batch_update::BatchUpdate;
2use crate::consolidation_function::ConsolidationFunction;
3use crate::create::*;
4use crate::errors::RRDCachedClientError;
5use crate::fetch::FetchResponse;
6use crate::parsers::*;
7use crate::sanitisation::check_rrd_path;
8use std::collections::HashMap;
9use tokio::io::AsyncBufReadExt;
10use tokio::io::AsyncWriteExt;
11use tokio::net::UnixStream;
12use tokio::{io::BufReader, net::TcpStream};
13
14/// A client to interact with a RRDCached server.
15#[derive(Debug)]
16pub struct RRDCachedClient<T = TcpStream> {
17    stream: BufReader<T>,
18}
19
20impl RRDCachedClient<TcpStream> {
21    /// Connect to a RRDCached server over TCP.
22    pub async fn connect_tcp(addr: &str) -> Result<Self, RRDCachedClientError> {
23        let stream = TcpStream::connect(addr).await?;
24        let stream = BufReader::new(stream);
25        Ok(Self { stream })
26    }
27}
28
29impl RRDCachedClient<UnixStream> {
30    /// Connect to a RRDCached server over a Unix socket.
31    pub async fn connect_unix(addr: &str) -> Result<Self, RRDCachedClientError> {
32        let stream = UnixStream::connect(addr).await?;
33        let stream = BufReader::new(stream);
34        Ok(Self { stream })
35    }
36}
37
38impl<T> RRDCachedClient<T>
39where
40    T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
41{
42    fn assert_response_code(&self, code: i64, message: &str) -> Result<(), RRDCachedClientError> {
43        if code < 0 {
44            Err(RRDCachedClientError::UnexpectedResponse(
45                code,
46                message.to_string(),
47            ))
48        } else {
49            Ok(())
50        }
51    }
52
53    async fn read_line(&mut self) -> Result<String, RRDCachedClientError> {
54        let mut line = String::new();
55        self.stream.read_line(&mut line).await?;
56        Ok(line)
57    }
58
59    async fn read_n_lines(&mut self, n: usize) -> Result<Vec<String>, RRDCachedClientError> {
60        let mut lines = Vec::with_capacity(n);
61        for _ in 0..n {
62            lines.push(self.read_line().await?);
63        }
64        Ok(lines)
65    }
66
67    async fn send_command(
68        &mut self,
69        command: &str,
70    ) -> Result<(usize, String), RRDCachedClientError> {
71        // Send command
72        self.stream.write_all(command.as_bytes()).await?;
73        // Read response
74        let response_line = self.read_line().await?;
75        let (code, message) = parse_response_line(&response_line)?;
76        self.assert_response_code(code, message)?;
77        let nb_lines = usize::try_from(code).map_err(|_| {
78            RRDCachedClientError::UnexpectedResponse(code, "invalid number of lines".to_string())
79        })?;
80        Ok((nb_lines, message.to_string()))
81    }
82
83    /// Retreive documentation for humans.
84    ///
85    /// Can take an optional command to get help for a specific command.
86    pub async fn help(
87        &mut self,
88        command: Option<&str>,
89    ) -> Result<(String, Vec<String>), RRDCachedClientError> {
90        let command = match command {
91            Some(command) => {
92                let mut a = String::with_capacity(6 + command.len());
93                a.push_str("HELP ");
94                a.push_str(command);
95                a.push('\n');
96                a
97            }
98            None => "HELP\n".to_string(),
99        };
100        let (nb_lines, header) = self.send_command(&command).await?;
101        let lines = self.read_n_lines(nb_lines).await?;
102
103        Ok((header, lines))
104    }
105
106    /// Ping the server to check if it's alive.
107    pub async fn ping(&mut self) -> Result<(), RRDCachedClientError> {
108        let (_, message) = self.send_command("PING\n").await?;
109        if message != "PONG" {
110            return Err(RRDCachedClientError::UnexpectedResponse(
111                0,
112                message.to_string(),
113            ));
114        }
115        Ok(())
116    }
117
118    /// Create a new RRD
119    pub async fn create(&mut self, arguments: CreateArguments) -> Result<(), RRDCachedClientError> {
120        let arguments_str = arguments.to_str();
121        let mut command = String::with_capacity(7 + arguments_str.len());
122        command.push_str("CREATE ");
123        command.push_str(&arguments_str);
124        command.push('\n');
125        let (_, message) = self.send_command(&command).await?;
126        if message != "RRD created OK" {
127            return Err(RRDCachedClientError::UnexpectedResponse(
128                0,
129                message.to_string(),
130            ));
131        }
132        Ok(())
133    }
134
135    /// Flush a RRD
136    pub async fn flush(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
137        check_rrd_path(path)?;
138        let mut command = String::with_capacity(6 + path.len() + 5);
139        command.push_str("FLUSH ");
140        command.push_str(path);
141        command.push_str(".rrd\n");
142        let _ = self.send_command(&command).await?;
143        Ok(())
144    }
145
146    /// Flush all RRDs
147    pub async fn flush_all(&mut self) -> Result<(), RRDCachedClientError> {
148        let _ = self.send_command("FLUSHALL\n").await?;
149        Ok(())
150    }
151
152    /// Pending updates
153    pub async fn pending(&mut self, path: &str) -> Result<Vec<String>, RRDCachedClientError> {
154        check_rrd_path(path)?;
155        let mut command = String::with_capacity(7 + path.len() + 5);
156        command.push_str("PENDING ");
157        command.push_str(path);
158        command.push_str(".rrd\n");
159        let (nb_lines, _) = self.send_command(&command).await?;
160        if nb_lines > 0 {
161            let lines = self.read_n_lines(nb_lines).await?;
162            Ok(lines)
163        } else {
164            Ok(vec![])
165        }
166    }
167
168    /// Forget pending updates
169    pub async fn forget(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
170        check_rrd_path(path)?;
171        let mut command = String::with_capacity(7 + path.len() + 5);
172        command.push_str("FORGET ");
173        command.push_str(path);
174        command.push_str(".rrd\n");
175        let _ = self.send_command(&command).await?;
176        Ok(())
177    }
178
179    /// Get the queue information
180    pub async fn queue(&mut self) -> Result<Vec<(String, usize)>, RRDCachedClientError> {
181        let (nb_lines, _message) = self.send_command("QUEUE\n").await?;
182        let nb_lines = self.read_n_lines(nb_lines).await?;
183        let parsed_lines = nb_lines
184            .iter()
185            .map(|line| {
186                let (path, pending) = parse_queue_line(line)?;
187                Ok((path.to_string(), pending))
188            })
189            .collect::<Result<Vec<(String, usize)>, RRDCachedClientError>>()?;
190        Ok(parsed_lines)
191    }
192
193    /// Get the server stats
194    pub async fn stats(&mut self) -> Result<HashMap<String, i64>, RRDCachedClientError> {
195        let (nb_lines, _message) = self.send_command("STATS\n").await?;
196        let lines = self.read_n_lines(nb_lines).await?;
197        let parsed_lines = lines
198            .iter()
199            .map(|line| {
200                let (name, value) = parse_stats_line(line)?;
201                Ok((name.to_string(), value))
202            })
203            .collect::<Result<HashMap<String, i64>, RRDCachedClientError>>()?;
204        Ok(parsed_lines)
205    }
206
207    /// Get the first CDP (whatever that is)
208    pub async fn first(
209        &mut self,
210        path: &str,
211        round_robin_archive: Option<usize>,
212    ) -> Result<usize, RRDCachedClientError> {
213        check_rrd_path(path)?;
214        let round_robin_archive = round_robin_archive.unwrap_or(0);
215        let rranum_str = round_robin_archive.to_string();
216
217        let mut command = String::with_capacity(6 + path.len() + 5 + rranum_str.len() + 1);
218        command.push_str("FIRST ");
219        command.push_str(path);
220        command.push_str(".rrd ");
221        command.push_str(&rranum_str);
222        command.push('\n');
223        let (_, message) = self.send_command(&command).await?;
224        let timestamp = parse_timestamp(&message)?;
225        Ok(timestamp)
226    }
227
228    /// Retrieve the last update timestamp
229    pub async fn last(&mut self, path: &str) -> Result<usize, RRDCachedClientError> {
230        check_rrd_path(path)?;
231
232        let mut command = String::with_capacity(5 + path.len() + 5);
233        command.push_str("LAST ");
234        command.push_str(path);
235        command.push_str(".rrd\n");
236        let (_, message) = self.send_command(&command).await?;
237        let timestamp = parse_timestamp(&message)?;
238        Ok(timestamp)
239    }
240
241    /// Retreive information about a RRD
242    pub async fn info(&mut self, path: &str) -> Result<Vec<String>, RRDCachedClientError> {
243        check_rrd_path(path)?;
244        let mut command = String::with_capacity(5 + path.len() + 5);
245        command.push_str("INFO ");
246        command.push_str(path);
247        command.push_str(".rrd\n");
248        let (nb_lines, _message) = self.send_command(&command).await?;
249        let lines = self.read_n_lines(nb_lines).await?;
250        Ok(lines)
251    }
252
253    /// List RRDs
254    pub async fn list(
255        &mut self,
256        recursive: bool,
257        path: Option<&str>,
258    ) -> Result<Vec<String>, RRDCachedClientError> {
259        let path = path.unwrap_or("/");
260        let mut command =
261            String::with_capacity(5 + path.len() + 1 + (if recursive { 10 } else { 0 }));
262        command.push_str("LIST ");
263        if recursive {
264            command.push_str("RECURSIVE ");
265        }
266        command.push_str(path);
267        command.push('\n');
268        let (nb_lines, _message) = self.send_command(&command).await?;
269        let lines = self.read_n_lines(nb_lines).await?;
270        Ok(lines)
271    }
272
273    /// Suspend a RRD
274    pub async fn suspend(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
275        check_rrd_path(path)?;
276        let mut command = String::with_capacity(8 + path.len() + 5);
277        command.push_str("SUSPEND ");
278        command.push_str(path);
279        command.push_str(".rrd\n");
280        let _ = self.send_command(&command).await?;
281        Ok(())
282    }
283
284    /// Resume a RRD
285    pub async fn resume(&mut self, path: &str) -> Result<(), RRDCachedClientError> {
286        check_rrd_path(path)?;
287        let mut command = String::with_capacity(7 + path.len() + 5);
288        command.push_str("RESUME ");
289        command.push_str(path);
290        command.push_str(".rrd\n");
291        let _ = self.send_command(&command).await?;
292        Ok(())
293    }
294
295    /// Suspend all RRDs
296    pub async fn suspend_all(&mut self) -> Result<(), RRDCachedClientError> {
297        let _ = self.send_command("SUSPENDALL\n").await?;
298        Ok(())
299    }
300
301    /// Resume all RRDs
302    pub async fn resume_all(&mut self) -> Result<(), RRDCachedClientError> {
303        let _ = self.send_command("RESUMEALL\n").await?;
304        Ok(())
305    }
306
307    /// Close the connection to the server
308    pub async fn quit(&mut self) -> Result<(), RRDCachedClientError> {
309        // Send directly without checking the response
310        self.stream.write_all("QUIT\n".as_bytes()).await?;
311        Ok(())
312    }
313
314    /// Update a RRD with a list of values at a specific timestamp
315    ///
316    /// The order is important as it must match the order of the data sources in the RRD
317    pub async fn update(
318        &mut self,
319        path: &str,
320        timestamp: Option<usize>,
321        data: Vec<f64>,
322    ) -> Result<(), RRDCachedClientError> {
323        let command = BatchUpdate::new(path, timestamp, data)?;
324        let command_str = command.to_command_string()?;
325        let _ = self.send_command(&command_str).await?;
326        Ok(())
327    }
328
329    /// Update a RRD with a single value at a specific timestamp.
330    ///
331    /// Convenient helper when a RRD contains only one data source.
332    pub async fn update_one(
333        &mut self,
334        path: &str,
335        timestamp: Option<usize>,
336        data: f64,
337    ) -> Result<(), RRDCachedClientError> {
338        self.update(path, timestamp, vec![data]).await
339    }
340
341    /// Batch updates.
342    ///
343    /// RRDCached presents this as a more efficient way to update multiple RRDs at once.
344    /// You may want to sort the updates by timestamp ascending as RDDtool will
345    /// reject updates of older timestamps.
346    pub async fn batch(&mut self, commands: Vec<BatchUpdate>) -> Result<(), RRDCachedClientError> {
347        let _ = self.send_command("BATCH\n").await?;
348        for command in commands {
349            let command_str = command.to_command_string()?;
350            // write the command directly
351            self.stream.write_all(command_str.as_bytes()).await?;
352        }
353        // Send a dot to end the batch
354        let (nb_lines, message) = self.send_command(".\n").await?;
355
356        // It returns errors line by line if there are any
357        if nb_lines > 0 {
358            let lines = self.read_n_lines(nb_lines).await?;
359            return Err(RRDCachedClientError::BatchUpdateErrorResponse(
360                message, lines,
361            ));
362        }
363        Ok(())
364    }
365
366    /// Fetch the content of a Round Robin Database (RRD)
367    ///
368    /// Note that we use the Ascii protocol, as the binary protocol is not documented
369    /// and it's unsure whether it's consitent between versions of RRDCached or
370    /// system architectures.
371    ///
372    ///
373    pub async fn fetch(
374        &mut self,
375        path: &str,
376        consolidation_function: ConsolidationFunction,
377        start: Option<i64>,
378        end: Option<i64>,
379        columns: Option<Vec<String>>,
380    ) -> Result<FetchResponse, RRDCachedClientError> {
381        check_rrd_path(path)?;
382        let consolidation_function_str = consolidation_function.to_str();
383        // FETCH path.rrd CF [--start start] [--end end] [--columns columns]
384        let mut capacity = 6 + path.len() + 5 + consolidation_function_str.len() + 1;
385        let mut start_str: Option<String> = None;
386        let mut end_str: Option<String> = None;
387        let mut columns_str: Option<String> = None;
388        match start {
389            Some(start) => {
390                let start_string = start.to_string();
391                capacity += 1 + start_string.len();
392                start_str = Some(start_string);
393
394                if let Some(end) = end {
395                    let end_string = end.to_string();
396                    capacity += 1 + end_string.len();
397                    end_str = Some(end_string);
398                    if let Some(columns) = columns {
399                        let columns_string = columns.join(" ");
400                        capacity += 1 + columns_string.len();
401                        columns_str = Some(columns_string);
402                    }
403                } else if columns.is_some() {
404                    return Err(RRDCachedClientError::InvalidFetch(
405                        "end must be specified".to_string(),
406                    ));
407                }
408            }
409            None => {
410                if end.is_some() || columns.is_some() {
411                    return Err(RRDCachedClientError::InvalidFetch(
412                        "start must be specified".to_string(),
413                    ));
414                }
415            }
416        }
417        let mut command = String::with_capacity(capacity);
418        command.push_str("FETCH ");
419        command.push_str(path);
420        command.push_str(".rrd ");
421        command.push_str(consolidation_function_str);
422        if let Some(start_str) = start_str {
423            command.push(' ');
424            command.push_str(&start_str);
425            if let Some(end_str) = end_str {
426                command.push(' ');
427                command.push_str(&end_str);
428                if let Some(columns_str) = columns_str {
429                    command.push(' ');
430                    command.push_str(&columns_str);
431                }
432            }
433        }
434        command.push('\n');
435        debug_assert_eq!(command.len(), capacity);
436
437        let (nb_lines, _message) = self.send_command(&command).await?;
438        let lines = self.read_n_lines(nb_lines).await?;
439
440        let response = FetchResponse::from_lines(lines)?;
441
442        Ok(response)
443    }
444}
445
#[cfg(test)]
mod tests {
    //! Integration tests. They connect to `localhost:42217` over TCP and to
    //! `./rrdcached.sock` over a Unix socket, so a RRDCached daemon must be
    //! reachable at those addresses.

    use crate::now::now_timestamp;

    use super::*;
    use serial_test::serial;

    /// TCP address used by every TCP-based test.
    const TCP_ADDRESS: &str = "localhost:42217";

    /// Connect to the test daemon over TCP, panicking on failure.
    async fn tcp_client() -> RRDCachedClient<TcpStream> {
        RRDCachedClient::connect_tcp(TCP_ADDRESS).await.unwrap()
    }

    /// Create a RRD with a single gauge data source and a single archive.
    async fn create_simple_rrd(client: &mut RRDCachedClient<TcpStream>, name: &str) {
        let arguments = CreateArguments {
            path: name.to_string(),
            data_sources: vec![CreateDataSource {
                name: "ds1".to_string(),
                minimum: None,
                maximum: None,
                heartbeat: 10,
                serie_type: CreateDataSourceType::Gauge,
            }],
            round_robin_archives: vec![CreateRoundRobinArchive {
                consolidation_function: ConsolidationFunction::Average,
                xfiles_factor: 0.5,
                steps: 1,
                rows: 100,
            }],
            start_timestamp: 1609459200,
            step_seconds: 1,
        };
        client.create(arguments).await.unwrap();
    }

    /// Build the creation arguments shared by the create and fetch tests:
    /// two gauge data sources (`ds1`, `ds2`) and two average archives.
    fn two_source_arguments(path: &str) -> CreateArguments {
        CreateArguments {
            path: path.to_string(),
            data_sources: vec![
                CreateDataSource {
                    name: "ds1".to_string(),
                    minimum: None,
                    maximum: None,
                    heartbeat: 10,
                    serie_type: CreateDataSourceType::Gauge,
                },
                CreateDataSource {
                    name: "ds2".to_string(),
                    minimum: Some(0.0),
                    maximum: Some(100.0),
                    heartbeat: 10,
                    serie_type: CreateDataSourceType::Gauge,
                },
            ],
            round_robin_archives: vec![
                CreateRoundRobinArchive {
                    consolidation_function: ConsolidationFunction::Average,
                    xfiles_factor: 0.5,
                    steps: 1,
                    rows: 10,
                },
                CreateRoundRobinArchive {
                    consolidation_function: ConsolidationFunction::Average,
                    xfiles_factor: 0.5,
                    steps: 10,
                    rows: 10,
                },
            ],
            start_timestamp: 1609459200,
            step_seconds: 1,
        }
    }

    #[tokio::test]
    async fn test_ping_tcp() {
        let mut client = tcp_client().await;
        client.ping().await.unwrap();
    }

    #[tokio::test]
    async fn test_ping_unix() {
        let mut client = RRDCachedClient::connect_unix("./rrdcached.sock")
            .await
            .unwrap();
        client.ping().await.unwrap();
    }

    #[tokio::test]
    async fn test_help() {
        let mut client = tcp_client().await;

        let (header, lines) = client.help(None).await.unwrap();
        assert_eq!(header, "Command overview");
        assert_eq!(lines.len(), 22);

        let (header, lines) = client.help(Some("PING")).await.unwrap();
        assert_eq!(header, "Help for PING");
        assert!(!lines.is_empty());
    }

    #[tokio::test]
    async fn test_create() {
        let mut client = tcp_client().await;
        client
            .create(two_source_arguments("test-create"))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_update() {
        let mut client = tcp_client().await;
        create_simple_rrd(&mut client, "test-update").await;

        client.update_one("test-update", None, 4.2).await.unwrap();
    }

    #[tokio::test]
    async fn test_error() {
        let mut client = tcp_client().await;

        let result = client.list(false, Some("not-found-path")).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_flush() {
        let mut client = tcp_client().await;
        create_simple_rrd(&mut client, "test-flush").await;

        client.update_one("test-flush", None, 4.2).await.unwrap();
        client.flush("test-flush").await.unwrap();
    }

    #[tokio::test]
    async fn test_flush_all() {
        let mut client = tcp_client().await;
        client.flush_all().await.unwrap();
    }

    #[serial]
    #[tokio::test]
    async fn test_pending() {
        let short_pause = tokio::time::Duration::from_millis(100);

        // Wait 0.1s
        tokio::time::sleep(short_pause).await;

        let mut client = tcp_client().await;
        create_simple_rrd(&mut client, "test-pending").await;

        client.update_one("test-pending", None, 4.2).await.unwrap();

        let lines = client.pending("test-pending").await.unwrap();
        assert_eq!(lines.len(), 1);

        // Flush, then wait 0.1s before checking that nothing is pending.
        client.flush("test-pending").await.unwrap();
        tokio::time::sleep(short_pause).await;

        let lines = client.pending("test-pending").await.unwrap();
        assert!(lines.is_empty());
    }

    #[tokio::test]
    async fn test_forget() {
        let mut client = tcp_client().await;
        create_simple_rrd(&mut client, "test-forget").await;

        client.update_one("test-forget", None, 4.2).await.unwrap();
        client.forget("test-forget").await.unwrap();
    }

    #[tokio::test]
    async fn test_queue() {
        let mut client = tcp_client().await;

        let lines = client.queue().await.unwrap();

        // I didn't manage to get a non-empty queue...
        assert!(lines.is_empty());
    }

    #[tokio::test]
    async fn test_stats() {
        let mut client = tcp_client().await;

        let stats = client.stats().await.unwrap();
        assert!(!stats.is_empty());
    }

    #[tokio::test]
    async fn test_first_and_last() {
        let mut client = tcp_client().await;
        create_simple_rrd(&mut client, "test-first").await;

        // This can fail for subsequent runs
        let _ = client.update_one("test-first", Some(1612345678), 4.2).await;

        let first = client.first("test-first", None).await.unwrap();
        assert_eq!(first, 1609459101); // I'm guessing some alignment is happening

        let last = client.last("test-first").await.unwrap();
        assert_eq!(last, 1612345678);
    }

    #[tokio::test]
    async fn test_info() {
        let mut client = tcp_client().await;
        create_simple_rrd(&mut client, "test-info").await;

        let lines = client.info("test-info").await.unwrap();
        assert!(!lines.is_empty());
    }

    #[tokio::test]
    async fn test_list() {
        let mut client = tcp_client().await;

        let lines = client.list(true, None).await.unwrap();
        assert!(!lines.is_empty());
    }

    #[serial]
    #[tokio::test]
    async fn test_suspend_and_resume() {
        let mut client = tcp_client().await;
        create_simple_rrd(&mut client, "test-suspend").await;

        // insert some data and flush
        client.update_one("test-suspend", None, 4.2).await.unwrap();
        client.flush("test-suspend").await.unwrap();

        client.suspend("test-suspend").await.unwrap();
        client.resume("test-suspend").await.unwrap();
    }

    #[serial]
    #[tokio::test]
    async fn test_suspend_all_and_resume_all() {
        let mut client = tcp_client().await;

        client.suspend_all().await.unwrap();
        client.resume_all().await.unwrap();
    }

    #[tokio::test]
    async fn test_quit() {
        let mut client = tcp_client().await;

        client.quit().await.unwrap();

        // check that the connection is closed
        let result = client.ping().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_batch() {
        let mut client = tcp_client().await;

        create_simple_rrd(&mut client, "test-batch-1").await;
        create_simple_rrd(&mut client, "test-batch-2").await;

        let timestamp = now_timestamp().unwrap();

        let accepted = vec![
            BatchUpdate::new("test-batch-1", Some(timestamp), vec![1.0]).unwrap(),
            BatchUpdate::new("test-batch-2", Some(timestamp), vec![2.0]).unwrap(),
        ];
        client.batch(accepted).await.unwrap();

        // Reuse the same timestamp so the second update is rejected deterministically.
        let rejected = vec![
            BatchUpdate::new("test-batch-1", Some(timestamp), vec![3.0]).unwrap(),
            BatchUpdate::new("test-batch-2", Some(timestamp), vec![4.0]).unwrap(),
        ];
        let result = client.batch(rejected).await;
        assert!(result.is_err());
    }

    #[serial]
    #[tokio::test]
    async fn test_fetch() {
        let mut client = tcp_client().await;

        client
            .create(two_source_arguments("test-fetch"))
            .await
            .unwrap();

        // No optional argument at all.
        let result = client
            .fetch(
                "test-fetch",
                ConsolidationFunction::Average,
                None,
                None,
                None,
            )
            .await
            .unwrap();

        assert_eq!(result.flush_version, 1);
        assert!(result.start > 0);
        assert!(result.end > 0);
        assert_eq!(result.step, 1);
        assert_eq!(result.ds_count, 2);
        assert_eq!(result.ds_names, vec!["ds1".to_string(), "ds2".to_string()]);
        assert!(!result.data.is_empty());

        // Test the errors in parameters: `end` without `start`.
        let result = client
            .fetch(
                "test-fetch",
                ConsolidationFunction::Average,
                None,
                Some(1609459200),
                None,
            )
            .await;
        assert!(result.is_err());

        // `columns` without `end`.
        let result = client
            .fetch(
                "test-fetch",
                ConsolidationFunction::Average,
                Some(1609459200),
                None,
                Some(vec!["ds1".to_string(), "ds2".to_string()]),
            )
            .await;
        assert!(result.is_err());

        let window_end = now_timestamp().unwrap() as i64;
        let window_start = window_end - 10;

        // Unknown column name.
        let result = client
            .fetch(
                "test-fetch",
                ConsolidationFunction::Average,
                Some(window_start),
                Some(window_end),
                Some(vec!["not-found".to_string()]),
            )
            .await;
        assert!(result.is_err());

        // Single existing column.
        let result = client
            .fetch(
                "test-fetch",
                ConsolidationFunction::Average,
                Some(window_start),
                Some(window_end),
                Some(vec!["ds2".to_string()]),
            )
            .await
            .unwrap();
        assert_eq!(result.ds_count, 1);

        // Relative timestamp
        let result = client
            .fetch(
                "test-fetch",
                ConsolidationFunction::Average,
                Some(-10),
                Some(0),
                Some(vec!["ds2".to_string()]),
            )
            .await
            .unwrap();
        assert_eq!(result.ds_count, 1);
    }
}