Skip to main content

reddb_file/serverless/
lease.rs

1use super::*;
2
3use serde_json::{Map as JsonMap, Value as JsonValue};
4use std::sync::atomic::{AtomicU64, Ordering};
5
/// Fallback lease term applied when decoding lease JSON that does not supply a `term`.
pub const SERVERLESS_WRITER_LEASE_DEFAULT_TERM: u64 = 1;
/// Process-wide counter bumped by `ServerlessWriterLeaseTempFile::new` to provide the
/// `unique` component of each generated temp-file name.
static SERVERLESS_WRITER_LEASE_TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);
8
/// Writer lease record for a serverless database.
///
/// The six fields are exactly what the JSON codec in this module reads and writes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessWriterLease {
    /// Key of the database the lease belongs to.
    pub database_key: String,
    /// Identifier of the lease holder.
    pub holder_id: String,
    /// First component of the fencing token; compared by [`Self::fenced_by_term`].
    pub term: u64,
    /// Second component of the fencing token.
    pub generation: u64,
    /// Acquisition timestamp (presumably milliseconds since the Unix epoch; confirm with callers).
    pub acquired_at_ms: u64,
    /// Expiry timestamp, in the same unit as the `now_ms` passed to [`Self::is_expired`].
    pub expires_at_ms: u64,
}

impl ServerlessWriterLease {
    /// Returns `true` once `now_ms` has reached or passed `expires_at_ms`.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        now_ms >= self.expires_at_ms
    }

    /// Returns `true` when `current_term` is strictly newer than this lease's term.
    pub fn fenced_by_term(&self, current_term: u64) -> bool {
        current_term > self.term
    }

    /// Returns the `(term, generation)` pair of this lease.
    pub fn fencing_token(&self) -> (u64, u64) {
        let Self {
            term, generation, ..
        } = self;
        (*term, *generation)
    }
}
32
/// Builds the object key of a database's lease document.
///
/// The result is `prefix`, then `database_key`, then the fixed `.lease.json` suffix, joined
/// without any separator; `prefix` must already carry a trailing delimiter if one is wanted.
pub fn serverless_writer_lease_key(prefix: &str, database_key: &str) -> String {
    [prefix, database_key, ".lease.json"].concat()
}
36
/// Computes the path of a lease temp file inside the system temp directory.
///
/// The file name is `reddb-lease-{kind}-{process_id}-{now_unix_nanos}-{unique}.json`.
/// Nothing is created on disk; only the path is returned.
pub fn serverless_writer_lease_temp_path(
    kind: &str,
    process_id: u32,
    now_unix_nanos: u128,
    unique: u64,
) -> PathBuf {
    let file_name = format!("reddb-lease-{kind}-{process_id}-{now_unix_nanos}-{unique}.json");
    let mut location = std::env::temp_dir();
    location.push(file_name);
    location
}
47
48#[derive(Debug)]
49pub struct ServerlessWriterLeaseTempFile {
50    path: PathBuf,
51}
52
53impl ServerlessWriterLeaseTempFile {
54    pub fn new(kind: &str) -> Self {
55        let unique = SERVERLESS_WRITER_LEASE_TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
56        Self::with_clock(kind, std::process::id(), now_unix_nanos(), unique)
57    }
58
59    pub fn with_clock(kind: &str, process_id: u32, now_unix_nanos: u128, unique: u64) -> Self {
60        Self {
61            path: serverless_writer_lease_temp_path(kind, process_id, now_unix_nanos, unique),
62        }
63    }
64
65    pub fn path(&self) -> &Path {
66        &self.path
67    }
68
69    pub fn write_bytes(&self, bytes: &[u8]) -> RdbFileResult<()> {
70        fs::write(&self.path, bytes)?;
71        Ok(())
72    }
73
74    pub fn read_bytes(&self) -> RdbFileResult<Vec<u8>> {
75        Ok(fs::read(&self.path)?)
76    }
77
78    pub fn cleanup(&self) -> RdbFileResult<()> {
79        match fs::remove_file(&self.path) {
80            Ok(()) => Ok(()),
81            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
82            Err(err) => Err(err.into()),
83        }
84    }
85}
86
87impl Drop for ServerlessWriterLeaseTempFile {
88    fn drop(&mut self) {
89        let _ = self.cleanup();
90    }
91}
92
/// Returns the nanoseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_unix_nanos() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        // The default `Duration` is zero, so a pre-epoch clock maps to 0 nanoseconds.
        Err(_) => 0,
    }
}
99
100pub fn encode_serverless_writer_lease_json(
101    lease: &ServerlessWriterLease,
102) -> RdbFileResult<Vec<u8>> {
103    let mut object = JsonMap::new();
104    object.insert(
105        "database_key".to_string(),
106        JsonValue::String(lease.database_key.clone()),
107    );
108    object.insert(
109        "holder_id".to_string(),
110        JsonValue::String(lease.holder_id.clone()),
111    );
112    object.insert("term".to_string(), JsonValue::Number(lease.term.into()));
113    object.insert(
114        "generation".to_string(),
115        JsonValue::Number(lease.generation.into()),
116    );
117    object.insert(
118        "acquired_at_ms".to_string(),
119        JsonValue::Number(lease.acquired_at_ms.into()),
120    );
121    object.insert(
122        "expires_at_ms".to_string(),
123        JsonValue::Number(lease.expires_at_ms.into()),
124    );
125    serde_json::to_vec(&JsonValue::Object(object))
126        .map_err(|err| RdbFileError::InvalidOperation(format!("encode writer lease: {err}")))
127}
128
129pub fn decode_serverless_writer_lease_json(bytes: &[u8]) -> RdbFileResult<ServerlessWriterLease> {
130    let value: JsonValue = serde_json::from_slice(bytes).map_err(|err| {
131        RdbFileError::InvalidOperation(format!("decode writer lease json: {err}"))
132    })?;
133    let object = value
134        .as_object()
135        .ok_or_else(|| RdbFileError::InvalidOperation("lease json is not an object".into()))?;
136    Ok(ServerlessWriterLease {
137        database_key: required_string(object, "database_key")?,
138        holder_id: required_string(object, "holder_id")?,
139        term: object
140            .get("term")
141            .and_then(JsonValue::as_u64)
142            .unwrap_or(SERVERLESS_WRITER_LEASE_DEFAULT_TERM),
143        generation: required_u64(object, "generation")?,
144        acquired_at_ms: required_u64(object, "acquired_at_ms")?,
145        expires_at_ms: required_u64(object, "expires_at_ms")?,
146    })
147}
148
149fn required_string(object: &JsonMap<String, JsonValue>, field: &str) -> RdbFileResult<String> {
150    object
151        .get(field)
152        .and_then(JsonValue::as_str)
153        .map(ToString::to_string)
154        .ok_or_else(|| RdbFileError::InvalidOperation(format!("missing {field}")))
155}
156
157fn required_u64(object: &JsonMap<String, JsonValue>, field: &str) -> RdbFileResult<u64> {
158    object
159        .get(field)
160        .and_then(JsonValue::as_u64)
161        .ok_or_else(|| RdbFileError::InvalidOperation(format!("missing {field}")))
162}