1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use std::iter::zip;

use crate::{
    resp::{Array, Command, FromValue, ResultValueExt, Value},
    BitmapCommands, ClusterCommands, ConnectionCommands, GenericCommands, GeoCommands,
    HashCommands, HyperLogLogCommands, InnerClient, ListCommands, PreparedCommand, Result,
    ScriptingCommands, ServerCommands, SetCommands, SortedSetCommands, StreamCommands,
    StringCommands,
};

/// A batch of commands that is accumulated locally and sent through a single
/// `send_batch` call when [`Pipeline::execute`] runs.
pub struct Pipeline {
    /// Client used by `execute` to send the batch.
    client: InnerClient,
    /// Commands in the order they were queued.
    commands: Vec<Command>,
    /// One flag per entry of `commands` (both vectors are pushed in lockstep):
    /// `true` when the response at that position is dropped by `execute`.
    forget_flags: Vec<bool>,
}

impl Pipeline {
    /// Creates an empty pipeline that will submit its batch through `client`.
    pub(crate) fn new(client: InnerClient) -> Pipeline {
        Self {
            client,
            commands: vec![],
            forget_flags: vec![],
        }
    }

    /// Appends `command` to the batch; its response is kept in the value
    /// produced by [`execute`](Pipeline::execute).
    pub fn queue(&mut self, command: Command) {
        self.push_command(command, false);
    }

    /// Appends `command` to the batch; its response is filtered out of the
    /// value produced by [`execute`](Pipeline::execute).
    pub fn forget(&mut self, command: Command) {
        self.push_command(command, true);
    }

    /// Records `command` together with its forget flag so that `commands` and
    /// `forget_flags` always have the same length and ordering.
    fn push_command(&mut self, command: Command, forget: bool) {
        self.commands.push(command);
        self.forget_flags.push(forget);
    }

    /// Sends every queued command in one batch and converts the reply to `T`.
    ///
    /// When the reply is an array, each element is paired positionally with the
    /// forget flag recorded at queue time and flagged elements are discarded.
    /// If exactly one element remains it is converted on its own; otherwise the
    /// remaining elements are converted as an array. Any other reply shape is
    /// converted as is.
    ///
    /// # Errors
    ///
    /// Returns the error from `send_batch`, the error produced by
    /// `into_result` on the selected value, or the failure of the final
    /// conversion into `T`.
    pub async fn execute<T: FromValue>(mut self) -> Result<T> {
        let reply = self.client.send_batch(self.commands).await?;

        match reply {
            Value::Array(Array::Vec(replies)) => {
                // `zip` stops at the shorter input, so reply entries beyond the
                // number of queued commands are dropped.
                // NOTE(review): if `send_batch` hands back a lone command's
                // reply unwrapped, an array reply would be cut to its first
                // element here - confirm against `send_batch`.
                let mut kept: Vec<_> = zip(replies, self.forget_flags.iter())
                    .filter(|(_, forget)| !**forget)
                    .map(|(value, _)| value)
                    .collect();

                if kept.len() != 1 {
                    return Value::Array(Array::Vec(kept)).into();
                }

                // Exactly one response survived the filter: hand it back bare
                // rather than wrapped in a one-element array.
                let only = kept.pop().unwrap();
                Ok(only).into_result()?.into()
            }
            other => Ok(other).into_result()?.into(),
        }
    }
}

/// Queuing operations for a prepared command.
///
/// Implemented in this file for `PreparedCommand<'a, Pipeline, R>`. Both
/// methods consume the prepared command and return nothing. `R` must implement
/// `FromValue`; neither method signature mentions it.
pub trait PipelinePreparedCommand<'a, R>
where
    R: FromValue,
{
    /// Adds the command to the pipeline, keeping its response in the
    /// pipeline's result.
    fn queue(self);

    /// Adds the command to the pipeline, discarding its response from the
    /// pipeline's result.
    fn forget(self);
}

impl<'a, R> PipelinePreparedCommand<'a, R> for PreparedCommand<'a, Pipeline, R>
where
    R: FromValue + Send + 'a,
{
    /// Queue a command.
    fn queue(self) {
        self.executor.queue(self.command)
    }

    /// Queue a command and forget its response.
    fn forget(self) {
        self.executor.forget(self.command)
    }
}

// Opt `Pipeline` into each command-group trait. Every impl body is empty, so
// all of the traits' items come from default definitions in the traits
// themselves.
// NOTE(review): presumably those defaults build `PreparedCommand`s that target
// the pipeline, to be finished with `queue()` or `forget()` - confirm against
// the trait definitions.
impl BitmapCommands for Pipeline {}
impl ClusterCommands for Pipeline {}
impl ConnectionCommands for Pipeline {}
impl GenericCommands for Pipeline {}
impl GeoCommands for Pipeline {}
impl HashCommands for Pipeline {}
impl HyperLogLogCommands for Pipeline {}
impl ListCommands for Pipeline {}
impl SetCommands for Pipeline {}
impl ScriptingCommands for Pipeline {}
impl ServerCommands for Pipeline {}
impl SortedSetCommands for Pipeline {}
impl StreamCommands for Pipeline {}
impl StringCommands for Pipeline {}