kevy-rt 1.4.1

kevy thread-per-core shared-nothing runtime — pure Rust, zero deps.
Documentation
//! `RENAME` / `RENAMENX` orchestration. The runtime decides whether
//! both keys live on the same shard (atomic single-Op route) or split
//! across shards (two-step Take → Put orchestrator; before the
//! orchestrator landed in v2-3b this arm returned `-CROSSSHARD ...`).
//!
//! Why this can't be served by `Route::Single`: a single-key route hits
//! the source shard, where a same-shard atomic is straightforward but a
//! cross-shard hop needs to know nshards + emit a different Op to ship
//! the value to the destination shard. Routing is the runtime's job;
//! the dispatch layer (`kevy::cmd::*`) sees only one shard at a time.

use crate::message::{Agg, Inbound, Op, Part, PendingSlot, RenameStep};
use crate::reduce::{drain_front, shard_of};
use crate::shard::Shard;
use crate::{Commands, RespVersion};
use kevy_resp::ArgvView;

impl<C: Commands> Shard<C> {
    /// `RENAME` / `RENAMENX` — see [`Route::Rename`].
    ///
    /// Validates arity, hashes both keys to their owning shards and then
    /// takes one of two routes:
    ///
    /// * **Same shard** — a single `Op::Rename` is executed locally or
    ///   forwarded to the owning shard.
    /// * **Cross shard** — an `Agg::RenameOrchestrator` slot is queued on
    ///   the connection and step 1 (`Op::RenameTake`) is issued to the
    ///   source shard. The remaining steps are driven from
    ///   `finalize_rename_agg`.
    ///
    /// `args` is the full argv (`args[0]` is the command name, `args[1]`
    /// the source key, `args[2]` the destination key). `nx` selects
    /// `RENAMENX` semantics. A wrong argument count is answered with the
    /// standard `-ERR wrong number of arguments ...` frame.
    pub(crate) fn start_rename<A: ArgvView + ?Sized>(
        &mut self,
        conn_id: u64,
        seq: u64,
        args: &A,
        nx: bool,
    ) {
        // Arity: RENAME source destination → 3 args.
        if args.len() != 3 {
            let cmd_name = if nx { "renamenx" } else { "rename" };
            let err = format!("-ERR wrong number of arguments for '{cmd_name}' command\r\n");
            self.fold_rename_reply(conn_id, seq, err.into_bytes());
            return;
        }
        let src = args[1].to_vec();
        let dst = args[2].to_vec();
        let src_shard = shard_of(&src, self.nshards);
        let dst_shard = shard_of(&dst, self.nshards);

        if src_shard == dst_shard {
            // Same-shard: one atomic Op::Rename. Route to the owning
            // shard — exec_op runs store.rename + bumps WATCH versions
            // + AOF logs + emits keyspace notifications.
            self.push_pending_slot(conn_id, 1, Agg::First(None), false);
            let op = Op::Rename { src, dst, nx };
            if src_shard == self.id {
                let part = self.exec_op(op);
                self.fold(conn_id, seq, part);
            } else {
                self.send_to(
                    src_shard,
                    Inbound::Request {
                        origin: self.id,
                        conn: conn_id,
                        seq,
                        op,
                    },
                );
            }
            return;
        }

        // Cross-shard: orchestrator. Push a single pending slot with
        // Agg::RenameOrchestrator; step 1 emits Op::RenameTake to
        // src_shard. Fold receives Part::RenameTaken (or NoSuchSrc),
        // step transitions to Put, emits Op::RenamePut. Step 2's
        // Part::RenamePutDone triggers the +OK / :1 / :0 reply.
        //
        // The Take step is destructive, and the orchestrator slot is the
        // only thing in this file that carries the taken value on to the
        // Put. If the connection is unknown there is nowhere to queue
        // that slot, so bail out *before* issuing Take rather than
        // removing the source key with nothing left to finish the move.
        let Some(c) = self.conns.get_mut(&conn_id) else {
            return;
        };
        let proto = c.proto;
        c.pending.push_back(PendingSlot {
            remaining: 1,
            agg: Agg::RenameOrchestrator {
                step: RenameStep::Take,
                nx,
                src: src.clone(),
                dst,
                dst_shard,
                taken: None,
                put_stored: None,
            },
            done: None,
            proto,
        });

        let take_op = Op::RenameTake(src);
        if src_shard == self.id {
            let part = self.exec_op(take_op);
            self.fold(conn_id, seq, part);
        } else {
            self.send_to(
                src_shard,
                Inbound::Request {
                    origin: self.id,
                    conn: conn_id,
                    seq,
                    op: take_op,
                },
            );
        }
    }

    /// Continuation hook for the cross-shard RENAME orchestrator. Per the
    /// original design note, `Shard::fold` invokes this once the
    /// `Agg::RenameOrchestrator` slot's `remaining` count hits zero.
    ///
    /// Dispatch is purely on the recorded step:
    ///
    /// * `RenameStep::Take` finished → hand over to
    ///   `advance_rename_to_put`, which either ships step 2 to
    ///   `dst_shard` (re-arming the slot) or, when the source key was
    ///   missing, answers `-ERR no such key`.
    /// * `RenameStep::Put` finished → hand over to `finish_rename_put`,
    ///   which answers `+OK` (RENAME), `:1` (RENAMENX stored) or `:0`
    ///   (RENAMENX refused because dst already existed at Put time).
    ///
    /// The `:0` case means the source value has already been taken; the
    /// design accepts that data-loss race instead of adding a third
    /// "restore-src" step (the design note likens this to the trade-off
    /// Redis cluster makes with MIGRATE).
    ///
    /// Any aggregate other than `Agg::RenameOrchestrator` is ignored.
    pub(crate) fn finalize_rename_agg(&mut self, conn_id: u64, seq: u64, agg: Agg) {
        match agg {
            // Step 1 landed: decide between "ship Put" and "no such key".
            Agg::RenameOrchestrator {
                step: RenameStep::Take,
                nx,
                src,
                dst,
                dst_shard,
                taken,
                ..
            } => self.advance_rename_to_put(conn_id, seq, nx, src, dst, dst_shard, taken),
            // Step 2 landed: only the NX flag and Put outcome matter now.
            Agg::RenameOrchestrator {
                step: RenameStep::Put,
                nx,
                put_stored,
                ..
            } => self.finish_rename_put(conn_id, seq, nx, put_stored),
            // Not a rename slot — nothing to resume.
            _ => {}
        }
    }

    /// Step 1 → step 2 transition of the cross-shard rename.
    ///
    /// * `taken == None` — the source key did not exist: the slot is
    ///   completed with `-ERR no such key` and nothing else happens.
    /// * `taken == Some((value, ttl_ms))` — the slot at `seq` is re-armed
    ///   for `RenameStep::Put` (one outstanding sub-reply) and
    ///   `Op::RenamePut` is executed locally or sent to `dst_shard`.
    ///
    /// `_src` is accepted only so the caller can pass the destructured
    /// aggregate straight through; it is not used after Take.
    ///
    /// The Put is shipped even when the connection or its slot can no
    /// longer be found, so a value already taken from the source shard
    /// is still delivered to the destination.
    #[allow(clippy::too_many_arguments)]
    fn advance_rename_to_put(
        &mut self,
        conn_id: u64,
        seq: u64,
        nx: bool,
        _src: Vec<u8>,
        dst: Vec<u8>,
        dst_shard: usize,
        taken: Option<(kevy_store::Value, Option<u64>)>,
    ) {
        let Some((value, ttl_ms)) = taken else {
            // Take returned NoSuchSrc — finalize.
            self.fill_rename_slot(conn_id, seq, b"-ERR no such key\r\n".to_vec());
            return;
        };
        // Re-arm the same slot for step 2. Step transitions to Put.
        if let Some(c) = self.conns.get_mut(&conn_id) {
            // Slots are addressed relative to the next sequence number to
            // emit. `checked_sub` turns a stale `seq` (< next_emit) into
            // "no slot" instead of an arithmetic underflow, which would
            // panic when overflow checks are enabled.
            if let Some(offset) = seq.checked_sub(c.next_emit) {
                if let Some(slot) = c.pending.get_mut(offset as usize) {
                    slot.remaining = 1;
                    slot.agg = Agg::RenameOrchestrator {
                        step: RenameStep::Put,
                        nx,
                        src: Vec::new(), // src no longer needed
                        dst: dst.clone(),
                        dst_shard,
                        taken: None,      // step 2 doesn't buffer Value
                        put_stored: None, // fold fills this from RenamePutDone
                    };
                }
            }
        }
        let put_op = Op::RenamePut {
            dst,
            value,
            ttl_ms,
            nx,
        };
        if dst_shard == self.id {
            let part = self.exec_op(put_op);
            self.fold(conn_id, seq, part);
        } else {
            self.send_to(
                dst_shard,
                Inbound::Request {
                    origin: self.id,
                    conn: conn_id,
                    seq,
                    op: put_op,
                },
            );
        }
    }

    /// Final step of the cross-shard rename: turn the Put outcome into
    /// the client-visible RESP reply and complete the slot.
    ///
    /// `put_stored` carries the `stored` flag of `Part::RenamePutDone`
    /// (per the original note, `Shard::fold` fills it in before this
    /// runs); a missing flag is treated like `false`.
    ///
    /// | stored | nx    | reply  |
    /// |--------|-------|--------|
    /// | true   | true  | `:1`   |
    /// | true   | false | `+OK`  |
    /// | false  | any   | `:0`   |
    fn finish_rename_put(
        &mut self,
        conn_id: u64,
        seq: u64,
        nx: bool,
        put_stored: Option<bool>,
    ) {
        let reply = match (put_stored, nx) {
            (Some(true), true) => b":1\r\n".to_vec(),
            (Some(true), false) => b"+OK\r\n".to_vec(),
            // Expected only for a cross-shard RENAMENX whose destination
            // already existed when Put ran. The source was taken before
            // the Put was refused, so the value is gone — a documented
            // race; a later revision could restore src or pre-check dst.
            (Some(false) | None, _) => b":0\r\n".to_vec(),
        };
        self.fill_rename_slot(conn_id, seq, reply);
    }

    /// Drop a literal RESP frame into the orchestrator's slot + drain.
    ///
    /// The slot is located at offset `seq - next_emit` in the
    /// connection's pending queue. If the connection is unknown nothing
    /// happens. If the slot cannot be found (including a stale `seq`
    /// that is already behind `next_emit`) the frame is discarded, but
    /// the queue is still drained via `drain_front`.
    fn fill_rename_slot(&mut self, conn_id: u64, seq: u64, bytes: Vec<u8>) {
        let Some(c) = self.conns.get_mut(&conn_id) else {
            return;
        };
        // `checked_sub` maps a stale `seq` to "no slot" rather than
        // underflowing, which would panic when overflow checks are on.
        if let Some(offset) = seq.checked_sub(c.next_emit) {
            if let Some(slot) = c.pending.get_mut(offset as usize) {
                slot.done = Some(bytes);
            }
        }
        drain_front(c);
    }

    /// Answer a rename request synchronously with a pre-built RESP frame.
    ///
    /// Queues one `Agg::First` slot (one outstanding part) on the
    /// connection, if it exists, and then folds `reply` into it under
    /// the caller-supplied `seq`. In this file it serves the arity-error
    /// path of `start_rename`. Per the original note, the broader
    /// `immediate_reply` is not used because it would assign a fresh
    /// seq, and the caller already has one.
    fn fold_rename_reply(&mut self, conn_id: u64, seq: u64, reply: Vec<u8>) {
        if let Some(conn) = self.conns.get_mut(&conn_id) {
            let slot = PendingSlot {
                remaining: 1,
                agg: Agg::First(None),
                done: None,
                proto: conn.proto,
            };
            conn.pending.push_back(slot);
        }
        // No-op reference that keeps the `RespVersion` import in use; a
        // fixed-bytes reply does not depend on the RESP version.
        let _ = RespVersion::V2;
        self.fold(conn_id, seq, Part::Reply(reply));
    }
}