valkey-module 0.1.11

A toolkit for building valkey modules in Rust
Documentation
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};

use valkey_module::alloc::ValkeyAlloc;
use valkey_module::server_events::{
    ClientChangeSubevent, EventLoopSubevent, ForkChildSubevent, KeyChangeSubevent, LoadingProgress,
    MasterLinkChangeSubevent, PersistenceSubevent, ReplAsyncLoadSubevent, ReplicaChangeSubevent,
};
use valkey_module::{
    server_events::FlushSubevent, valkey_module, Context, ModuleOptions, Status, ValkeyResult,
    ValkeyString, ValkeyValue,
};
use valkey_module_macros::{
    client_changed_event_handler, config_changed_event_handler, cron_event_handler,
    event_loop_event_handler, flush_event_handler, fork_child_event_handler, key_event_handler,
    loading_progress_event_handler, master_link_change_event_handler, persistence_event_handler,
    repl_async_load_event_handler, replica_change_event_handler, shutdown_event_handler,
    swapdb_event_handler,
};

static NUM_FLUSHES: AtomicI64 = AtomicI64::new(0);
static NUM_CONNECTS: AtomicI64 = AtomicI64::new(0);
static NUM_CRONS: AtomicI64 = AtomicI64::new(0);
static NUM_MAX_MEMORY_CONFIGURATION_CHANGES: AtomicI64 = AtomicI64::new(0);
static NUM_KEY_EVENTS: AtomicI64 = AtomicI64::new(0);
static NUM_PERSISTENCE_EVENTS: AtomicI64 = AtomicI64::new(0);
static NUM_MASTER_LINK_CHANGE_EVENTS: AtomicI64 = AtomicI64::new(0);
static IS_MASTER_LINK_UP: AtomicBool = AtomicBool::new(false);
static NUM_FORK_CHILD_EVENTS: AtomicI64 = AtomicI64::new(0);
static NUM_REPLICA_CHANGE_EVENTS: AtomicI64 = AtomicI64::new(0);
static NUM_REPL_ASYNC_LOAD_EVENTS: AtomicI64 = AtomicI64::new(0);
static NUM_SWAP_DB_EVENTS: AtomicI64 = AtomicI64::new(0);
static NUM_LOADING_PROGRESS_RDB: AtomicI64 = AtomicI64::new(0);
static NUM_LOADING_PROGRESS_AOF: AtomicI64 = AtomicI64::new(0);
static NUM_EVENT_LOOP_BEFORE_SLEEP: AtomicI64 = AtomicI64::new(0);
static NUM_EVENT_LOOP_AFTER_SLEEP: AtomicI64 = AtomicI64::new(0);

#[flush_event_handler]
fn flushed_event_handler(_ctx: &Context, flush_event: FlushSubevent) {
    if let FlushSubevent::Started = flush_event {
        NUM_FLUSHES.fetch_add(1, Ordering::SeqCst);
    }
}

#[config_changed_event_handler]
fn config_changed_event_handler(_ctx: &Context, changed_configs: &[&str]) {
    changed_configs
        .iter()
        .find(|v| **v == "maxmemory")
        .map(|_| NUM_MAX_MEMORY_CONFIGURATION_CHANGES.fetch_add(1, Ordering::SeqCst));
}

#[cron_event_handler]
fn cron_event_handler(_ctx: &Context, _hz: u64) {
    NUM_CRONS.fetch_add(1, Ordering::SeqCst);
}

#[client_changed_event_handler]
fn client_changed_event_handler(ctx: &Context, client_event: ClientChangeSubevent) {
    match client_event {
        ClientChangeSubevent::Connected => {
            ctx.log_notice("Connected");
            NUM_CONNECTS.fetch_add(1, Ordering::SeqCst);
        }
        ClientChangeSubevent::Disconnected => {
            ctx.log_notice("Disconnected");
            NUM_CONNECTS.fetch_sub(1, Ordering::SeqCst);
        }
    }
}

#[key_event_handler]
fn key_event_handler(ctx: &Context, key_event: KeyChangeSubevent) {
    match key_event {
        KeyChangeSubevent::Deleted => {
            ctx.log_notice("Key deleted");
        }
        KeyChangeSubevent::Evicted => {
            ctx.log_notice("Key evicted");
        }
        KeyChangeSubevent::Overwritten => {
            ctx.log_notice("Key overwritten");
        }
        KeyChangeSubevent::Expired => {
            ctx.log_notice("Key expired");
        }
    }
    NUM_KEY_EVENTS.fetch_add(1, Ordering::SeqCst);
}

#[shutdown_event_handler]
fn shutdown_event_handler(ctx: &Context, _event: u64) {
    ctx.log_notice("Sever shutdown callback event ...");
    // Check if test file shutdown_log.txt exists and wrie the above log to it
    let shutdown_log_path = "shutdown_log.txt";

    // Attempt to write the log message to the file
    if let Err(e) = std::fs::write(shutdown_log_path, "Server shutdown callback event ...\n") {
        ctx.log_warning(&format!("Failed to write to shutdown log file: {}", e));
    }
}

#[persistence_event_handler]
fn persistence_event_handler(ctx: &Context, persistence_event: PersistenceSubevent) {
    match persistence_event {
        PersistenceSubevent::RdbStart => {
            ctx.log_notice("RDB persistence started");
        }
        PersistenceSubevent::AofStart => {
            ctx.log_notice("AOF persistence started");
        }
        PersistenceSubevent::SyncRdbStart => {
            ctx.log_notice("Sync RDB persistence started");
        }
        PersistenceSubevent::SyncAofStart => {
            ctx.log_notice("Sync AOF persistence started");
        }
        PersistenceSubevent::Ended => {
            ctx.log_notice("Persistence operation ended");
        }
        PersistenceSubevent::Failed => {
            ctx.log_warning("Persistence operation failed");
        }
    }
    NUM_PERSISTENCE_EVENTS.fetch_add(1, Ordering::SeqCst);
}

#[master_link_change_event_handler]
fn master_link_change_event_handler(
    ctx: &Context,
    master_link_change_subevent: MasterLinkChangeSubevent,
) {
    match master_link_change_subevent {
        MasterLinkChangeSubevent::Up => {
            ctx.log_warning("Master link status up");
            NUM_MASTER_LINK_CHANGE_EVENTS.fetch_add(1, Ordering::SeqCst);
            IS_MASTER_LINK_UP.store(true, Ordering::SeqCst);
        }
        MasterLinkChangeSubevent::Down => {
            ctx.log_warning("Master link status down");
            NUM_MASTER_LINK_CHANGE_EVENTS.fetch_add(1, Ordering::SeqCst);
            IS_MASTER_LINK_UP.store(false, Ordering::SeqCst);
        }
    }
}

#[fork_child_event_handler]
fn fork_child_event_handler(ctx: &Context, fork_child_subevent: ForkChildSubevent) {
    match fork_child_subevent {
        ForkChildSubevent::Born => {
            ctx.log_warning("Fork child born");
        }
        ForkChildSubevent::Died => {
            ctx.log_warning("Fork child died");
        }
    }
    NUM_FORK_CHILD_EVENTS.fetch_add(1, Ordering::SeqCst);
}

#[replica_change_event_handler]
fn replica_change_event_handler(ctx: &Context, replica_change_subevent: ReplicaChangeSubevent) {
    match replica_change_subevent {
        ReplicaChangeSubevent::Online => {
            ctx.log_notice("Replica online");
        }
        ReplicaChangeSubevent::Offline => {
            ctx.log_notice("Replica offline");
        }
    }
    NUM_REPLICA_CHANGE_EVENTS.fetch_add(1, Ordering::SeqCst);
}

#[repl_async_load_event_handler]
fn repl_async_load_event_handler(ctx: &Context, repl_async_load_subevent: ReplAsyncLoadSubevent) {
    match repl_async_load_subevent {
        ReplAsyncLoadSubevent::Started => {
            ctx.log_notice("Repl async load started");
        }
        ReplAsyncLoadSubevent::Aborted => {
            ctx.log_notice("Repl async load aborted");
        }
        ReplAsyncLoadSubevent::Completed => {
            ctx.log_notice("Repl async load completed");
        }
    }
    NUM_REPL_ASYNC_LOAD_EVENTS.fetch_add(1, Ordering::SeqCst);
}

#[swapdb_event_handler]
fn swapdb_event_handler(ctx: &Context, _: u64) {
    NUM_SWAP_DB_EVENTS.fetch_add(1, Ordering::SeqCst);
    ctx.log_notice("Databases swapped");
}

#[loading_progress_event_handler]
fn loading_progress_event_handler(ctx: &Context, info: LoadingProgress) {
    let msg = format!(
        "Loading progress {:?}: hz={}, progress={}",
        info.subevent, info.hz, info.progress
    );
    ctx.log_notice(&msg);
    match info.subevent {
        valkey_module::server_events::LoadingProgressSubevent::Rdb => {
            NUM_LOADING_PROGRESS_RDB.fetch_add(1, Ordering::SeqCst);
        }
        valkey_module::server_events::LoadingProgressSubevent::Aof => {
            NUM_LOADING_PROGRESS_AOF.fetch_add(1, Ordering::SeqCst);
        }
    }
}

#[event_loop_event_handler]
fn event_loop_event_handler(ctx: &Context, subevent: EventLoopSubevent) {
    match subevent {
        EventLoopSubevent::BeforeSleep => {
            NUM_EVENT_LOOP_BEFORE_SLEEP.fetch_add(1, Ordering::SeqCst);
        }
        EventLoopSubevent::AfterSleep => {
            NUM_EVENT_LOOP_AFTER_SLEEP.fetch_add(1, Ordering::SeqCst);
        }
    }
}

fn num_flushed(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(NUM_FLUSHES.load(Ordering::SeqCst)))
}

fn num_crons(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(NUM_CRONS.load(Ordering::SeqCst)))
}

fn num_maxmemory_changes(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(
        NUM_MAX_MEMORY_CONFIGURATION_CHANGES.load(Ordering::SeqCst),
    ))
}

fn num_connects(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(NUM_CONNECTS.load(Ordering::SeqCst)))
}

fn num_key_events(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(NUM_KEY_EVENTS.load(Ordering::SeqCst)))
}

fn num_master_link_change_events(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(
        NUM_MASTER_LINK_CHANGE_EVENTS.load(Ordering::SeqCst),
    ))
}

fn is_master_link_up(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Bool(IS_MASTER_LINK_UP.load(Ordering::SeqCst)))
}

fn num_persistence_events(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(
        NUM_PERSISTENCE_EVENTS.load(Ordering::SeqCst),
    ))
}

fn num_fork_child_events(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(
        NUM_FORK_CHILD_EVENTS.load(Ordering::SeqCst),
    ))
}

fn num_replica_change_events(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(
        NUM_REPLICA_CHANGE_EVENTS.load(Ordering::SeqCst),
    ))
}

fn num_repl_async_load_events(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(
        NUM_REPL_ASYNC_LOAD_EVENTS.load(Ordering::SeqCst),
    ))
}

fn num_swapdb_events(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(
        NUM_SWAP_DB_EVENTS.load(Ordering::SeqCst),
    ))
}

fn num_loading_progress_rdb(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(
        NUM_LOADING_PROGRESS_RDB.load(Ordering::SeqCst),
    ))
}

fn num_loading_progress_aof(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(
        NUM_LOADING_PROGRESS_AOF.load(Ordering::SeqCst),
    ))
}

fn num_event_loop_before_sleep(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(
        NUM_EVENT_LOOP_BEFORE_SLEEP.load(Ordering::SeqCst),
    ))
}

fn num_event_loop_after_sleep(_ctx: &Context, _args: Vec<ValkeyString>) -> ValkeyResult {
    Ok(ValkeyValue::Integer(
        NUM_EVENT_LOOP_AFTER_SLEEP.load(Ordering::SeqCst),
    ))
}

fn init(ctx: &Context, _args: &[ValkeyString]) -> Status {
    // https://valkey.io/topics/modules-api-ref/#ValkeyModule_SetModuleOptions
    // otherwise you get:  Skipping diskless-load because there are modules that are not aware of async replication.
    // needed for repl_async_load_event_handler
    ctx.set_module_options(ModuleOptions::HANDLE_REPL_ASYNC_LOAD);
    Status::Ok
}

//////////////////////////////////////////////////////

valkey_module! {
    name: "srv_events",
    version: 1,
    allocator: (ValkeyAlloc, ValkeyAlloc),
    data_types: [],
    init: init,
    commands: [
        ["num_flushed", num_flushed, "readonly", 0, 0, 0],
        ["num_max_memory_changes", num_maxmemory_changes, "readonly", 0, 0, 0],
        ["num_crons", num_crons, "readonly", 0, 0, 0],
        ["num_connects", num_connects, "readonly", 0, 0, 0],
        ["num_key_events", num_key_events, "readonly", 0, 0, 0],
        ["num_persistence_events", num_persistence_events, "readonly", 0, 0, 0],
        ["num_master_link_change_events", num_master_link_change_events, "readonly", 0, 0, 0],
        ["is_master_link_up", is_master_link_up, "readonly", 0, 0, 0],
        ["num_fork_child_events", num_fork_child_events, "readonly", 0, 0, 0],
        ["num_replica_change_events", num_replica_change_events, "readonly", 0, 0, 0],
        ["num_repl_async_load_events", num_repl_async_load_events, "readonly", 0, 0, 0],
        ["num_swapdb_events", num_swapdb_events, "readonly", 0, 0, 0],
        ["num_loading_progress_rdb", num_loading_progress_rdb, "readonly", 0, 0, 0],
        ["num_loading_progress_aof", num_loading_progress_aof, "readonly", 0, 0, 0],
        ["num_event_loop_before_sleep", num_event_loop_before_sleep, "readonly", 0, 0, 0],
        ["num_event_loop_after_sleep", num_event_loop_after_sleep, "readonly", 0, 0, 0],
    ]
}