nodedb 0.0.0-beta.1

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
//! Hash field RESP command handlers: HGET, HMGET, HSET, FLUSHDB.

use crate::bridge::envelope::{PhysicalPlan, Status};
use crate::bridge::physical_plan::KvOp;
use crate::control::state::SharedState;

use super::codec::RespValue;
use super::command::RespCommand;
use super::handler::{dispatch_kv, dispatch_kv_write};
use super::session::RespSession;

pub(super) async fn handle_hget(
    cmd: &RespCommand,
    session: &RespSession,
    state: &SharedState,
) -> RespValue {
    if cmd.argc() < 2 {
        return RespValue::err("ERR wrong number of arguments for 'hget' command");
    }

    let key = cmd.args[0].clone();
    let field = cmd.arg_str(1).unwrap_or("").to_string();

    let plan = PhysicalPlan::Kv(KvOp::FieldGet {
        collection: session.collection.clone(),
        key,
        fields: vec![field.clone()],
    });

    match dispatch_kv(state, session, plan).await {
        Ok(resp) if resp.status == Status::Ok => {
            let json: serde_json::Value = serde_json::from_slice(&resp.payload).unwrap_or_default();
            match json.get(&field) {
                Some(serde_json::Value::Null) | None => RespValue::nil(),
                Some(serde_json::Value::String(s)) => RespValue::bulk_str(s),
                Some(v) => RespValue::bulk(v.to_string().into_bytes()),
            }
        }
        Ok(_) => RespValue::nil(),
        Err(e) => RespValue::err(format!("ERR {e}")),
    }
}

pub(super) async fn handle_hmget(
    cmd: &RespCommand,
    session: &RespSession,
    state: &SharedState,
) -> RespValue {
    if cmd.argc() < 2 {
        return RespValue::err("ERR wrong number of arguments for 'hmget' command");
    }

    let key = cmd.args[0].clone();
    let fields: Vec<String> = cmd.args[1..]
        .iter()
        .filter_map(|a| std::str::from_utf8(a).ok().map(|s| s.to_string()))
        .collect();

    let plan = PhysicalPlan::Kv(KvOp::FieldGet {
        collection: session.collection.clone(),
        key,
        fields: fields.clone(),
    });

    match dispatch_kv(state, session, plan).await {
        Ok(resp) if resp.status == Status::Ok => {
            let json: serde_json::Value = serde_json::from_slice(&resp.payload).unwrap_or_default();
            let items: Vec<RespValue> = fields
                .iter()
                .map(|f| match json.get(f) {
                    Some(serde_json::Value::Null) | None => RespValue::nil(),
                    Some(serde_json::Value::String(s)) => RespValue::bulk_str(s),
                    Some(v) => RespValue::bulk(v.to_string().into_bytes()),
                })
                .collect();
            RespValue::array(items)
        }
        Ok(_) => RespValue::nil_array(),
        Err(e) => RespValue::err(format!("ERR {e}")),
    }
}

pub(super) async fn handle_hset(
    cmd: &RespCommand,
    session: &RespSession,
    state: &SharedState,
) -> RespValue {
    if cmd.argc() < 3 || !(cmd.argc() - 1).is_multiple_of(2) {
        return RespValue::err("ERR wrong number of arguments for 'hset' command");
    }

    let key = cmd.args[0].clone();
    let updates: Vec<(String, Vec<u8>)> = cmd.args[1..]
        .chunks(2)
        .filter_map(|pair| {
            let field = std::str::from_utf8(&pair[0]).ok()?.to_string();
            let json_value =
                serde_json::Value::String(String::from_utf8_lossy(&pair[1]).into_owned());
            Some((field, serde_json::to_vec(&json_value).ok()?))
        })
        .collect();

    let plan = PhysicalPlan::Kv(KvOp::FieldSet {
        collection: session.collection.clone(),
        key,
        updates,
    });

    match dispatch_kv_write(state, session, plan).await {
        Ok(resp) if resp.status == Status::Ok => RespValue::integer(cmd.argc() as i64 / 2),
        Ok(_) => RespValue::integer(0),
        Err(e) => RespValue::err(format!("ERR {e}")),
    }
}

pub(super) async fn handle_flushdb(session: &RespSession, state: &SharedState) -> RespValue {
    let plan = PhysicalPlan::Kv(KvOp::Truncate {
        collection: session.collection.clone(),
    });

    match dispatch_kv_write(state, session, plan).await {
        Ok(_) => RespValue::ok(),
        Err(e) => RespValue::err(format!("ERR {e}")),
    }
}