dynomite-engine 0.0.2

Embeddable Dynamo-style distributed replication engine: token-ring partitioning, gossip cluster, hinted handoff, anti-entropy, RediSearch FT.* surface.
Documentation
//! Request lifecycle helpers.
//!
//! Request handling splits into two layers: request-side data
//! manipulation (which fragments are done, which are in error,
//! what error to send) and connection-side plumbing (timeout
//! queues, recv/send done callbacks, peer forwarding).
//!
//! This module owns the data-side helpers; the connection-side
//! helpers land in Stage 9 once the connection FSM exists.

use crate::core::types::MsgId;

use super::message::Msg;
use super::queue::MsgQueue;

/// Mark `req` as in-error with `error_code` (a libc errno-shaped
/// value) and the matching `dyn_error_code`. The flag is set so the
/// response path can synthesise an error reply on the next pass.
///
/// Returns `true` when the message transitions from healthy to
/// error; subsequent calls are no-ops.
///
/// # Examples
///
/// ```
/// use dynomite::msg::{request, DynErrorCode, Msg, MsgType};
///
/// let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
/// assert!(request::set_error(&mut req, 13, DynErrorCode::PeerHostDown));
/// assert!(req.flags().is_error);
/// assert!(!request::set_error(&mut req, 13, DynErrorCode::PeerHostDown));
/// ```
pub fn set_error(req: &mut Msg, error_code: i32, dyn_error_code: super::DynErrorCode) -> bool {
    if req.flags().is_error {
        return false;
    }
    req.set_is_error(true);
    req.flags_mut().done = true;
    req.set_error_code(error_code);
    req.set_dyn_error_code(dyn_error_code);
    true
}

/// True when the request has been resolved end-to-end: a response
/// has been selected, all fragments are accounted for, and (for
/// fragment vectors) the parent fragment has finished aggregating.
///
/// The connection-coupled propagation pass that walks the client
/// queue and marks every sibling fragment lands in Stage 9; this
/// helper returns the data-shape answer for a single request.
///
/// # Examples
///
/// ```
/// use dynomite::msg::{request, Msg, MsgType};
///
/// let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
/// assert!(!request::is_done(&req));
/// req.set_selected_rsp(Some(2));
/// assert!(request::is_done(&req));
/// ```
#[must_use]
pub fn is_done(req: &Msg) -> bool {
    if req.selected_rsp().is_none() {
        return false;
    }
    if req.fragment_ids().is_empty() {
        return true;
    }
    req.flags().fdone
}

/// True when the request is in error: either marked directly or
/// flagged through fragment-error propagation.
///
/// # Examples
///
/// ```
/// use dynomite::msg::{request, DynErrorCode, Msg, MsgType};
///
/// let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
/// assert!(!request::is_error(&req));
/// request::set_error(&mut req, 13, DynErrorCode::PeerHostDown);
/// assert!(request::is_error(&req));
/// ```
#[must_use]
pub fn is_error(req: &Msg) -> bool {
    req.flags().is_error || req.flags().is_ferror
}

/// Drain `from` of every request whose `selected_rsp` matches `id`
/// and forward them to `to`.
///
/// This is the data-shape building block the connection-level
/// `req_send_next` / `req_send_done` functions consume; it lets
/// tests exercise the sibling-walk without standing up the full
/// connection FSM.
///
/// # Examples
///
/// ```
/// use dynomite::msg::{request, Msg, MsgQueue, MsgType};
///
/// let mut a = MsgQueue::new();
/// let mut b = MsgQueue::new();
/// let mut m = Msg::new(1, MsgType::ReqRedisGet, true);
/// m.set_selected_rsp(Some(99));
/// a.push_back(m);
/// request::move_completed(&mut a, &mut b, 99);
/// assert!(a.is_empty());
/// assert_eq!(b.len(), 1);
/// ```
pub fn move_completed(from: &mut MsgQueue, to: &mut MsgQueue, id: MsgId) {
    let mut keep = MsgQueue::new();
    while let Some(msg) = from.pop_front() {
        if msg.selected_rsp() == Some(id) {
            to.push_back(msg);
        } else {
            keep.push_back(msg);
        }
    }
    while let Some(msg) = keep.pop_front() {
        from.push_back(msg);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::msg::{DynErrorCode, MsgType};

    #[test]
    fn error_flags_propagate() {
        let mut req = Msg::new(1, MsgType::ReqRedisGet, true);
        assert!(set_error(&mut req, 7, DynErrorCode::PeerHostDown));
        assert!(req.flags().is_error);
        assert!(req.flags().done);
        assert_eq!(req.error_code(), 7);
        assert_eq!(req.dyn_error_code(), DynErrorCode::PeerHostDown);
        assert!(is_error(&req));
        assert!(!set_error(&mut req, 9, DynErrorCode::DynomiteUnknownError));
        assert_eq!(req.error_code(), 7);
    }

    #[test]
    fn fragmented_request_done_requires_fdone() {
        let mut req = Msg::new(1, MsgType::ReqRedisMget, true);
        req.push_fragment_id(2);
        req.set_selected_rsp(Some(99));
        assert!(!is_done(&req));
        req.flags_mut().fdone = true;
        assert!(is_done(&req));
    }
}