ferriskey 0.8.0 — a Rust client for Valkey, built for FlowFabric and forked from glide-core (valkey-glide).
See the crate-level documentation for usage details.
#![macro_use]

use crate::cmd::{Cmd, cmd, cmd_len};
use crate::value::{
    ErrorKind, FromValue, HashSet, Result, ToArgs, Value, from_owned_value,
};
use std::sync::Arc;

/// Represents a valkey command pipeline.
///
/// Holds the queued commands plus the bookkeeping needed to run them either
/// as a plain pipeline or wrapped in `MULTI`/`EXEC` (see `atomic()`).
#[derive(Clone, Debug)]
pub struct Pipeline {
    // Commands in insertion order. Stored as `Arc` so they can be shared
    // cheaply; in-place mutation via `get_last_command()` requires the Arc
    // to be uniquely owned (see `add_command_with_arc`).
    commands: Vec<Arc<Cmd>>,
    // When `true`, the encoded pipeline is wrapped in MULTI/EXEC.
    transaction_mode: bool,
    // Indices into `commands` whose replies are dropped from the pipeline
    // results (populated by `ignore()`).
    ignored_commands: HashSet<usize>,
}

/// A pipeline allows you to send multiple commands in one go to the
/// Valkey server.  API wise it's very similar to just using a command
/// but it allows multiple commands to be chained and some features such
/// as iteration are not available.
///
/// Basic example:
///
/// ```rust,no_run,ignore
/// # let client = ferriskey::Client::open("redis://127.0.0.1/").unwrap();
/// # let mut con = client.get_connection(None).unwrap();
/// let ((k1, k2),) : ((i32, i32),) = ferriskey::pipe()
///     .cmd("SET").arg("key_1").arg(42).ignore()
///     .cmd("SET").arg("key_2").arg(43).ignore()
///     .cmd("MGET").arg(&["key_1", "key_2"]).query(&mut con).unwrap();
/// ```
///
/// As you can see with `cmd` you can start a new command.  By default
/// each command produces a value but for some you can ignore them by
/// calling `ignore` on the command.  That way it will be skipped in the
/// return value which is useful for `SET` commands and others, which
/// do not have a useful return value.
impl Pipeline {
    /// Creates an empty pipeline.  For consistency with the `cmd`
    /// api a `pipe` function is provided as alias.
    pub fn new() -> Pipeline {
        Self::with_capacity(0)
    }

    /// Creates an empty pipeline with pre-allocated capacity.
    pub(crate) fn with_capacity(capacity: usize) -> Pipeline {
        Pipeline {
            commands: Vec::with_capacity(capacity),
            transaction_mode: false,
            ignored_commands: HashSet::new(),
        }
    }

    /// This enables atomic mode.  In atomic mode the whole pipeline is
    /// enclosed in `MULTI`/`EXEC`.  From the user's point of view nothing
    /// changes however.  This is easier than using `MULTI`/`EXEC` yourself
    /// as the format does not change.
    ///
    /// ```rust,ignore
    /// # let client = ferriskey::Client::open("redis://127.0.0.1/").unwrap();
    /// # let mut con = client.get_connection(None).unwrap();
    /// let (k1, k2) : (i32, i32) = ferriskey::pipe()
    ///     .atomic()
    ///     .cmd("GET").arg("key_1")
    ///     .cmd("GET").arg("key_2").query(&mut con).unwrap();
    /// ```
    #[inline]
    pub fn atomic(&mut self) -> &mut Pipeline {
        self.transaction_mode = true;
        self
    }

    /// Returns the encoded pipeline commands.
    pub(crate) fn get_packed_pipeline(&self) -> bytes::Bytes {
        encode_pipeline(&self.commands, self.transaction_mode).into()
    }

    async fn execute_pipelined_async<C>(&self, con: &mut C) -> Result<Value>
    where
        C: crate::connection::ConnectionLike,
    {
        let value = con
            .req_packed_commands(self, 0, self.commands.len(), None)
            .await?;
        self.make_pipeline_results(value)
    }

    async fn execute_transaction_async<C>(&self, con: &mut C) -> Result<Value>
    where
        C: crate::connection::ConnectionLike,
    {
        let mut resp = con
            .req_packed_commands(self, self.commands.len() + 1, 1, None)
            .await?;
        match resp.pop() {
            Some(Ok(Value::Nil)) => Ok(Value::Nil),
            Some(Ok(Value::Array(items))) => Ok(self.make_pipeline_results(items)?),
            Some(Err(e)) => Err(e),
            _ => Err((
                ErrorKind::ResponseError,
                "Invalid response when parsing multi response",
            )
                .into()),
        }
    }

    /// Executes the pipeline and fetches the return values.
    #[inline]
    pub async fn query_async<C, T: FromValue>(&self, con: &mut C) -> Result<T>
    where
        C: crate::connection::ConnectionLike,
    {
        let v = if self.commands.is_empty() {
            return from_owned_value(Value::Array(vec![]));
        } else if self.transaction_mode {
            self.execute_transaction_async(con).await?
        } else {
            self.execute_pipelined_async(con).await?
        };
        from_owned_value(v)
    }

    /// Returns whether the pipeline is in transaction mode (atomic).
    ///
    /// When in transaction mode, all commands in the pipeline are executed
    /// as a single atomic operation.
    pub(crate) fn is_atomic(&self) -> bool {
        self.transaction_mode
    }

    /// Returns the number of commands in the pipeline.
    pub(crate) fn len(&self) -> usize {
        self.commands.len()
    }

    /// Returns `true` if the pipeline contains no commands.
    pub(crate) fn is_empty(&self) -> bool {
        self.commands.is_empty()
    }

    /// Returns the command at the given index, or `None` if the index is out of bounds.
    pub(crate) fn get_command(&self, index: usize) -> Option<Arc<Cmd>> {
        self.commands.get(index).cloned()
    }
}

// Serializes `cmds` into a single freshly allocated buffer, wrapping them
// in MULTI/EXEC when `atomic` is set.
fn encode_pipeline(cmds: &[Arc<Cmd>], atomic: bool) -> Vec<u8> {
    let mut buf = Vec::new();
    write_pipeline(&mut buf, cmds, atomic);
    buf
}

// Appends the packed encoding of `cmds` to `rv`, reserving the exact
// required capacity up front; with `atomic` set, the batch is bracketed by
// MULTI and EXEC commands.
fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Arc<Cmd>], atomic: bool) {
    let payload_len: usize = cmds.iter().map(|c| cmd_len(c)).sum();

    if atomic {
        let multi = cmd("MULTI");
        let exec = cmd("EXEC");
        rv.reserve(payload_len + cmd_len(&multi) + cmd_len(&exec));

        multi.write_packed_command_preallocated(rv);
        for command in cmds {
            command.write_packed_command_preallocated(rv);
        }
        exec.write_packed_command_preallocated(rv);
    } else {
        rv.reserve(payload_len);

        for command in cmds {
            command.write_packed_command_preallocated(rv);
        }
    }
}

// Macro to implement shared methods between Pipeline and ClusterPipeline
macro_rules! implement_pipeline_commands {
    ($struct_name:ident) => {
        impl $struct_name {
            /// Adds a command to the cluster pipeline.
            #[inline]
            pub fn add_command(&mut self, cmd: Cmd) -> &mut Self {
                self.add_command_with_arc(cmd.into())
            }

            /// The provided command **must** be uniquely owned (i.e. not cloned or shared)
            /// at the time it is added, as the pipeline's internal API assumes unique ownership
            /// for later mutation via `get_last_command()`. If this invariant is violated,
            /// `get_last_command()` will panic.
            #[inline]
            pub(crate) fn add_command_with_arc(&mut self, cmd: Arc<Cmd>) -> &mut Self {
                self.commands.push(cmd);
                self
            }

            /// Starts a new command. Functions such as `arg` then become
            /// available to add more arguments to that command.
            #[inline]
            pub fn cmd(&mut self, name: &str) -> &mut Self {
                self.add_command(cmd(name))
            }

            /// Returns an iterator over all the commands currently in the pipeline.
            pub fn cmd_iter(&self) -> impl Iterator<Item = &Arc<Cmd>> {
                self.commands.iter()
            }

            /// Instructs the pipeline to ignore the return value of this command.
            /// It will still be ensured that it is not an error, but any successful
            /// result is just thrown away.  This makes result processing through
            /// tuples much easier because you do not need to handle all the items
            /// you do not care about.
            #[inline]
            pub fn ignore(&mut self) -> &mut Self {
                match self.commands.len() {
                    0 => true,
                    x => self.ignored_commands.insert(x - 1),
                };
                self
            }

            /// Adds an argument to the last started command. This works similar
            /// to the `arg` method of the `Cmd` object.
            ///
            /// Note that this function fails the task if executed on an empty pipeline.
            #[inline]
            pub fn arg<T: ToArgs>(&mut self, arg: T) -> &mut Self {
                {
                    let cmd = self.get_last_command();
                    cmd.arg(arg);
                }
                self
            }

            /// Clear a pipeline object's internal data structure.
            ///
            /// This allows reusing a pipeline object as a clear object while performing a minimal
            /// amount of memory released/reallocated.
            #[inline]
            pub fn clear(&mut self) {
                self.commands.clear();
                self.ignored_commands.clear();
            }

            #[inline]
            fn get_last_command(&mut self) -> &mut Cmd {
                let idx = self.commands.len().checked_sub(1)
                    .expect("No command on stack");
                Arc::get_mut(&mut self.commands[idx]).expect("Cannot modify the last command: multiple active references exist. Ensure the command is uniquely owned before mutating.")
            }

            fn make_pipeline_results(&self, resp: Vec<Result<Value>>) -> Result<Value> {
                let mut rv = Vec::with_capacity(resp.len().saturating_sub(self.ignored_commands.len()));
                for (idx, result) in resp.into_iter().enumerate() {
                    if !self.ignored_commands.contains(&idx) {
                        rv.push(result);
                    }
                }
                Ok(Value::Array(rv))
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self::new()
            }
        }
    };
}

implement_pipeline_commands!(Pipeline);

#[derive(Debug, Clone, Copy, Default)]
/// Controls whether pipeline requests are retried after server or connection
/// errors.
///
/// Retries trade reliability against ordering and side-effect guarantees, so
/// the two failure classes are configured independently.
///
/// # Notes
/// - Retrying after a **server error** can reorder commands that target the same slot.
/// - Retrying after a **connection error** is riskier still: it is unknown which commands already took effect, so non-idempotent commands (such as `INCR`) may end up executing more than once.
///
/// TODO: Add a link to the wiki with further details.
pub struct PipelineRetryStrategy {
    /// When `true`, failed commands whose `RetryMethod` is retriable are retried.
    ///
    /// # Effect
    /// - Retried commands may be reordered within the same slot.
    pub retry_server_error: bool,
    /// When `true`, sub-pipeline requests are retried after connection errors.
    ///
    /// # Caution
    /// - A connection error gives no indication of which commands succeeded or failed, so retrying can duplicate executions.
    /// - This is particularly risky for non-idempotent commands like `INCR`, which modify state irreversibly.
    pub retry_connection_error: bool,
}

impl PipelineRetryStrategy {
    /// Builds a strategy from the two retry flags.
    pub fn new(retry_server_error: bool, retry_connection_error: bool) -> Self {
        PipelineRetryStrategy {
            retry_connection_error,
            retry_server_error,
        }
    }
}