reddb_file/serverless/
lease.rs1use super::*;
2
3use serde_json::{Map as JsonMap, Value as JsonValue};
4use std::sync::atomic::{AtomicU64, Ordering};
5
/// Term value that `decode_serverless_writer_lease_json` falls back to when a
/// lease document has no `term` field.
pub const SERVERLESS_WRITER_LEASE_DEFAULT_TERM: u64 = 1;
/// Counter shared by the whole process; `ServerlessWriterLeaseTempFile::new`
/// takes the next value as the `unique` component of the temp file name.
static SERVERLESS_WRITER_LEASE_TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);
8
/// A writer lease record as stored in the lease JSON document.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessWriterLease {
    /// Key of the database this lease record refers to.
    pub database_key: String,
    /// Identifier of the lease holder.
    pub holder_id: String,
    /// Lease term; first component of the fencing token.
    pub term: u64,
    /// Lease generation; second component of the fencing token.
    pub generation: u64,
    /// Acquisition timestamp (milliseconds, per the field name).
    pub acquired_at_ms: u64,
    /// Expiry timestamp; compared against `now_ms` by [`Self::is_expired`].
    pub expires_at_ms: u64,
}

impl ServerlessWriterLease {
    /// Returns `true` once `now_ms` has reached or passed `expires_at_ms`.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        now_ms >= self.expires_at_ms
    }

    /// Returns `true` when `current_term` is strictly newer than this lease's term.
    pub fn fenced_by_term(&self, current_term: u64) -> bool {
        current_term > self.term
    }

    /// Returns the `(term, generation)` pair identifying this lease.
    pub fn fencing_token(&self) -> (u64, u64) {
        let Self {
            term, generation, ..
        } = self;
        (*term, *generation)
    }
}
32
/// Builds the lease object key: `prefix`, then `database_key`, then the
/// fixed `.lease.json` suffix, concatenated without any separator.
pub fn serverless_writer_lease_key(prefix: &str, database_key: &str) -> String {
    [prefix, database_key, ".lease.json"].concat()
}
36
/// Computes the path of a lease temp file inside the system temp directory.
///
/// The file name is `reddb-lease-{kind}-{process_id}-{now_unix_nanos}-{unique}.json`.
/// Nothing is created on disk; only the path is returned.
pub fn serverless_writer_lease_temp_path(
    kind: &str,
    process_id: u32,
    now_unix_nanos: u128,
    unique: u64,
) -> PathBuf {
    let file_name = format!("reddb-lease-{kind}-{process_id}-{now_unix_nanos}-{unique}.json");
    let mut path = std::env::temp_dir();
    path.push(file_name);
    path
}
47
48#[derive(Debug)]
49pub struct ServerlessWriterLeaseTempFile {
50 path: PathBuf,
51}
52
53impl ServerlessWriterLeaseTempFile {
54 pub fn new(kind: &str) -> Self {
55 let unique = SERVERLESS_WRITER_LEASE_TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
56 Self::with_clock(kind, std::process::id(), now_unix_nanos(), unique)
57 }
58
59 pub fn with_clock(kind: &str, process_id: u32, now_unix_nanos: u128, unique: u64) -> Self {
60 Self {
61 path: serverless_writer_lease_temp_path(kind, process_id, now_unix_nanos, unique),
62 }
63 }
64
65 pub fn path(&self) -> &Path {
66 &self.path
67 }
68
69 pub fn write_bytes(&self, bytes: &[u8]) -> RdbFileResult<()> {
70 fs::write(&self.path, bytes)?;
71 Ok(())
72 }
73
74 pub fn read_bytes(&self) -> RdbFileResult<Vec<u8>> {
75 Ok(fs::read(&self.path)?)
76 }
77
78 pub fn cleanup(&self) -> RdbFileResult<()> {
79 match fs::remove_file(&self.path) {
80 Ok(()) => Ok(()),
81 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
82 Err(err) => Err(err.into()),
83 }
84 }
85}
86
87impl Drop for ServerlessWriterLeaseTempFile {
88 fn drop(&mut self) {
89 let _ = self.cleanup();
90 }
91}
92
/// Returns the nanoseconds elapsed since the Unix epoch according to the
/// system clock, or `0` when the clock reports a time before the epoch.
fn now_unix_nanos() -> u128 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_nanos(),
        // Same value as `Duration::default().as_nanos()`.
        Err(_) => 0,
    }
}
99
100pub fn encode_serverless_writer_lease_json(
101 lease: &ServerlessWriterLease,
102) -> RdbFileResult<Vec<u8>> {
103 let mut object = JsonMap::new();
104 object.insert(
105 "database_key".to_string(),
106 JsonValue::String(lease.database_key.clone()),
107 );
108 object.insert(
109 "holder_id".to_string(),
110 JsonValue::String(lease.holder_id.clone()),
111 );
112 object.insert("term".to_string(), JsonValue::Number(lease.term.into()));
113 object.insert(
114 "generation".to_string(),
115 JsonValue::Number(lease.generation.into()),
116 );
117 object.insert(
118 "acquired_at_ms".to_string(),
119 JsonValue::Number(lease.acquired_at_ms.into()),
120 );
121 object.insert(
122 "expires_at_ms".to_string(),
123 JsonValue::Number(lease.expires_at_ms.into()),
124 );
125 serde_json::to_vec(&JsonValue::Object(object))
126 .map_err(|err| RdbFileError::InvalidOperation(format!("encode writer lease: {err}")))
127}
128
129pub fn decode_serverless_writer_lease_json(bytes: &[u8]) -> RdbFileResult<ServerlessWriterLease> {
130 let value: JsonValue = serde_json::from_slice(bytes).map_err(|err| {
131 RdbFileError::InvalidOperation(format!("decode writer lease json: {err}"))
132 })?;
133 let object = value
134 .as_object()
135 .ok_or_else(|| RdbFileError::InvalidOperation("lease json is not an object".into()))?;
136 Ok(ServerlessWriterLease {
137 database_key: required_string(object, "database_key")?,
138 holder_id: required_string(object, "holder_id")?,
139 term: object
140 .get("term")
141 .and_then(JsonValue::as_u64)
142 .unwrap_or(SERVERLESS_WRITER_LEASE_DEFAULT_TERM),
143 generation: required_u64(object, "generation")?,
144 acquired_at_ms: required_u64(object, "acquired_at_ms")?,
145 expires_at_ms: required_u64(object, "expires_at_ms")?,
146 })
147}
148
149fn required_string(object: &JsonMap<String, JsonValue>, field: &str) -> RdbFileResult<String> {
150 object
151 .get(field)
152 .and_then(JsonValue::as_str)
153 .map(ToString::to_string)
154 .ok_or_else(|| RdbFileError::InvalidOperation(format!("missing {field}")))
155}
156
157fn required_u64(object: &JsonMap<String, JsonValue>, field: &str) -> RdbFileResult<u64> {
158 object
159 .get(field)
160 .and_then(JsonValue::as_u64)
161 .ok_or_else(|| RdbFileError::InvalidOperation(format!("missing {field}")))
162}