reddb-io-file 1.11.0

RedDB file artifact layer: single-file .rdb layout, WAL, snapshots, checkpoints, locks, and recovery.
Documentation
use super::*;

use serde_json::{Map as JsonMap, Value as JsonValue};
use std::sync::atomic::{AtomicU64, Ordering};

pub const SERVERLESS_WRITER_LEASE_DEFAULT_TERM: u64 = 1;
static SERVERLESS_WRITER_LEASE_TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessWriterLease {
    pub database_key: String,
    pub holder_id: String,
    pub term: u64,
    pub generation: u64,
    pub acquired_at_ms: u64,
    pub expires_at_ms: u64,
}

impl ServerlessWriterLease {
    pub fn is_expired(&self, now_ms: u64) -> bool {
        self.expires_at_ms <= now_ms
    }

    pub fn fenced_by_term(&self, current_term: u64) -> bool {
        self.term < current_term
    }

    pub fn fencing_token(&self) -> (u64, u64) {
        (self.term, self.generation)
    }
}

pub fn serverless_writer_lease_key(prefix: &str, database_key: &str) -> String {
    format!("{prefix}{database_key}.lease.json")
}

pub fn serverless_writer_lease_temp_path(
    kind: &str,
    process_id: u32,
    now_unix_nanos: u128,
    unique: u64,
) -> PathBuf {
    std::env::temp_dir().join(format!(
        "reddb-lease-{kind}-{process_id}-{now_unix_nanos}-{unique}.json"
    ))
}

#[derive(Debug)]
pub struct ServerlessWriterLeaseTempFile {
    path: PathBuf,
}

impl ServerlessWriterLeaseTempFile {
    pub fn new(kind: &str) -> Self {
        let unique = SERVERLESS_WRITER_LEASE_TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
        Self::with_clock(kind, std::process::id(), now_unix_nanos(), unique)
    }

    pub fn with_clock(kind: &str, process_id: u32, now_unix_nanos: u128, unique: u64) -> Self {
        Self {
            path: serverless_writer_lease_temp_path(kind, process_id, now_unix_nanos, unique),
        }
    }

    pub fn path(&self) -> &Path {
        &self.path
    }

    pub fn write_bytes(&self, bytes: &[u8]) -> RdbFileResult<()> {
        fs::write(&self.path, bytes)?;
        Ok(())
    }

    pub fn read_bytes(&self) -> RdbFileResult<Vec<u8>> {
        Ok(fs::read(&self.path)?)
    }

    pub fn cleanup(&self) -> RdbFileResult<()> {
        match fs::remove_file(&self.path) {
            Ok(()) => Ok(()),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(err) => Err(err.into()),
        }
    }
}

impl Drop for ServerlessWriterLeaseTempFile {
    fn drop(&mut self) {
        let _ = self.cleanup();
    }
}

fn now_unix_nanos() -> u128 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos()
}

pub fn encode_serverless_writer_lease_json(
    lease: &ServerlessWriterLease,
) -> RdbFileResult<Vec<u8>> {
    let mut object = JsonMap::new();
    object.insert(
        "database_key".to_string(),
        JsonValue::String(lease.database_key.clone()),
    );
    object.insert(
        "holder_id".to_string(),
        JsonValue::String(lease.holder_id.clone()),
    );
    object.insert("term".to_string(), JsonValue::Number(lease.term.into()));
    object.insert(
        "generation".to_string(),
        JsonValue::Number(lease.generation.into()),
    );
    object.insert(
        "acquired_at_ms".to_string(),
        JsonValue::Number(lease.acquired_at_ms.into()),
    );
    object.insert(
        "expires_at_ms".to_string(),
        JsonValue::Number(lease.expires_at_ms.into()),
    );
    serde_json::to_vec(&JsonValue::Object(object))
        .map_err(|err| RdbFileError::InvalidOperation(format!("encode writer lease: {err}")))
}

pub fn decode_serverless_writer_lease_json(bytes: &[u8]) -> RdbFileResult<ServerlessWriterLease> {
    let value: JsonValue = serde_json::from_slice(bytes).map_err(|err| {
        RdbFileError::InvalidOperation(format!("decode writer lease json: {err}"))
    })?;
    let object = value
        .as_object()
        .ok_or_else(|| RdbFileError::InvalidOperation("lease json is not an object".into()))?;
    Ok(ServerlessWriterLease {
        database_key: required_string(object, "database_key")?,
        holder_id: required_string(object, "holder_id")?,
        term: object
            .get("term")
            .and_then(JsonValue::as_u64)
            .unwrap_or(SERVERLESS_WRITER_LEASE_DEFAULT_TERM),
        generation: required_u64(object, "generation")?,
        acquired_at_ms: required_u64(object, "acquired_at_ms")?,
        expires_at_ms: required_u64(object, "expires_at_ms")?,
    })
}

fn required_string(object: &JsonMap<String, JsonValue>, field: &str) -> RdbFileResult<String> {
    object
        .get(field)
        .and_then(JsonValue::as_str)
        .map(ToString::to_string)
        .ok_or_else(|| RdbFileError::InvalidOperation(format!("missing {field}")))
}

fn required_u64(object: &JsonMap<String, JsonValue>, field: &str) -> RdbFileResult<u64> {
    object
        .get(field)
        .and_then(JsonValue::as_u64)
        .ok_or_else(|| RdbFileError::InvalidOperation(format!("missing {field}")))
}