1use async_trait::async_trait;
16use std::io::{Error, Result};
17use std::{
18 collections::HashMap,
19 time::{Duration, Instant},
20};
21
22use crate::{Locker, lock_args::LockArgs};
23
24pub const MAX_DELETE_LIST: usize = 1000;
25
26#[derive(Clone, Debug)]
27struct LockRequesterInfo {
28 name: String,
29 writer: bool,
30 uid: String,
31 time_stamp: Instant,
32 time_last_refresh: Instant,
33 source: String,
34 group: bool,
35 owner: String,
36 quorum: usize,
37 idx: usize,
38}
39
40impl Default for LockRequesterInfo {
41 fn default() -> Self {
42 Self {
43 name: Default::default(),
44 writer: Default::default(),
45 uid: Default::default(),
46 time_stamp: Instant::now(),
47 time_last_refresh: Instant::now(),
48 source: Default::default(),
49 group: Default::default(),
50 owner: Default::default(),
51 quorum: Default::default(),
52 idx: Default::default(),
53 }
54 }
55}
56
57fn is_write_lock(lri: &[LockRequesterInfo]) -> bool {
58 lri.len() == 1 && lri[0].writer
59}
60
61#[derive(Debug, Default)]
62pub struct LockStats {
63 total: usize,
64 writes: usize,
65 reads: usize,
66}
67
68#[derive(Debug, Default)]
69pub struct LocalLocker {
70 lock_map: HashMap<String, Vec<LockRequesterInfo>>,
71 lock_uid: HashMap<String, String>,
72}
73
74impl LocalLocker {
75 pub fn new() -> Self {
76 LocalLocker::default()
77 }
78}
79
80impl LocalLocker {
81 fn can_take_lock(&self, resource: &[String]) -> bool {
82 resource.iter().fold(true, |acc, x| !self.lock_map.contains_key(x) && acc)
83 }
84
85 pub fn stats(&self) -> LockStats {
86 let mut st = LockStats {
87 total: self.lock_map.len(),
88 ..Default::default()
89 };
90
91 self.lock_map.iter().for_each(|(_, value)| {
92 if !value.is_empty() {
93 if value[0].writer {
94 st.writes += 1;
95 } else {
96 st.reads += 1;
97 }
98 }
99 });
100
101 st
102 }
103
104 fn dump_lock_map(&mut self) -> HashMap<String, Vec<LockRequesterInfo>> {
105 let mut lock_copy = HashMap::new();
106 self.lock_map.iter().for_each(|(key, value)| {
107 lock_copy.insert(key.to_string(), value.to_vec());
108 });
109
110 lock_copy
111 }
112
113 fn expire_old_locks(&mut self, interval: Duration) {
114 self.lock_map.iter_mut().for_each(|(_, lris)| {
115 lris.retain(|lri| {
116 if Instant::now().duration_since(lri.time_last_refresh) > interval {
117 let mut key = lri.uid.to_string();
118 format_uuid(&mut key, &lri.idx);
119 self.lock_uid.remove(&key);
120 return false;
121 }
122
123 true
124 });
125 });
126 }
127}
128
129#[async_trait]
130impl Locker for LocalLocker {
131 async fn lock(&mut self, args: &LockArgs) -> Result<bool> {
132 if args.resources.len() > MAX_DELETE_LIST {
133 return Err(Error::other(format!(
134 "internal error: LocalLocker.lock called with more than {MAX_DELETE_LIST} resources"
135 )));
136 }
137
138 if !self.can_take_lock(&args.resources) {
139 return Ok(false);
140 }
141
142 args.resources.iter().enumerate().for_each(|(idx, resource)| {
143 self.lock_map.insert(
144 resource.to_string(),
145 vec![LockRequesterInfo {
146 name: resource.to_string(),
147 writer: true,
148 source: args.source.to_string(),
149 owner: args.owner.to_string(),
150 uid: args.uid.to_string(),
151 group: args.resources.len() > 1,
152 quorum: args.quorum,
153 idx,
154 ..Default::default()
155 }],
156 );
157
158 let mut uuid = args.uid.to_string();
159 format_uuid(&mut uuid, &idx);
160 self.lock_uid.insert(uuid, resource.to_string());
161 });
162
163 Ok(true)
164 }
165
166 async fn unlock(&mut self, args: &LockArgs) -> Result<bool> {
167 if args.resources.len() > MAX_DELETE_LIST {
168 return Err(Error::other(format!(
169 "internal error: LocalLocker.unlock called with more than {MAX_DELETE_LIST} resources"
170 )));
171 }
172
173 let mut reply = false;
174 let mut err_info = String::new();
175 for resource in args.resources.iter() {
176 match self.lock_map.get_mut(resource) {
177 Some(lris) => {
178 if !is_write_lock(lris) {
179 if err_info.is_empty() {
180 err_info = format!("unlock attempted on a read locked entity: {resource}");
181 } else {
182 err_info.push_str(&format!(", {resource}"));
183 }
184 } else {
185 lris.retain(|lri| {
186 if lri.uid == args.uid && (args.owner.is_empty() || lri.owner == args.owner) {
187 let mut key = args.uid.to_string();
188 format_uuid(&mut key, &lri.idx);
189 self.lock_uid.remove(&key).unwrap();
190 reply |= true;
191 return false;
192 }
193
194 true
195 });
196 }
197 if lris.is_empty() {
198 self.lock_map.remove(resource);
199 }
200 }
201 None => {
202 continue;
203 }
204 };
205 }
206
207 Ok(reply)
208 }
209
210 async fn rlock(&mut self, args: &LockArgs) -> Result<bool> {
211 if args.resources.len() != 1 {
212 return Err(Error::other("internal error: localLocker.RLock called with more than one resource"));
213 }
214
215 let resource = &args.resources[0];
216 match self.lock_map.get_mut(resource) {
217 Some(lri) => {
218 if !is_write_lock(lri) {
219 lri.push(LockRequesterInfo {
220 name: resource.to_string(),
221 writer: false,
222 source: args.source.to_string(),
223 owner: args.owner.to_string(),
224 uid: args.uid.to_string(),
225 quorum: args.quorum,
226 ..Default::default()
227 });
228 } else {
229 return Ok(false);
230 }
231 }
232 None => {
233 self.lock_map.insert(
234 resource.to_string(),
235 vec![LockRequesterInfo {
236 name: resource.to_string(),
237 writer: false,
238 source: args.source.to_string(),
239 owner: args.owner.to_string(),
240 uid: args.uid.to_string(),
241 quorum: args.quorum,
242 ..Default::default()
243 }],
244 );
245 }
246 }
247 let mut uuid = args.uid.to_string();
248 format_uuid(&mut uuid, &0);
249 self.lock_uid.insert(uuid, resource.to_string());
250
251 Ok(true)
252 }
253
254 async fn runlock(&mut self, args: &LockArgs) -> Result<bool> {
255 if args.resources.len() != 1 {
256 return Err(Error::other("internal error: localLocker.RLock called with more than one resource"));
257 }
258
259 let mut reply = false;
260 let resource = &args.resources[0];
261 match self.lock_map.get_mut(resource) {
262 Some(lris) => {
263 if is_write_lock(lris) {
264 return Err(Error::other(format!("runlock attempted on a write locked entity: {resource}")));
265 } else {
266 lris.retain(|lri| {
267 if lri.uid == args.uid && (args.owner.is_empty() || lri.owner == args.owner) {
268 let mut key = args.uid.to_string();
269 format_uuid(&mut key, &lri.idx);
270 self.lock_uid.remove(&key).unwrap();
271 reply |= true;
272 return false;
273 }
274
275 true
276 });
277 }
278 if lris.is_empty() {
279 self.lock_map.remove(resource);
280 }
281 }
282 None => {
283 return Ok(reply);
284 }
285 };
286
287 Ok(reply)
288 }
289
290 async fn refresh(&mut self, args: &LockArgs) -> Result<bool> {
291 let mut idx = 0;
292 let mut key = args.uid.to_string();
293 format_uuid(&mut key, &idx);
294 match self.lock_uid.get(&key) {
295 Some(resource) => {
296 let mut resource = resource;
297 loop {
298 match self.lock_map.get_mut(resource) {
299 Some(_lris) => {}
300 None => {
301 let mut key = args.uid.to_string();
302 format_uuid(&mut key, &0);
303 self.lock_uid.remove(&key);
304 return Ok(idx > 0);
305 }
306 }
307
308 idx += 1;
309 let mut key = args.uid.to_string();
310 format_uuid(&mut key, &idx);
311 resource = match self.lock_uid.get(&key) {
312 Some(resource) => resource,
313 None => return Ok(true),
314 };
315 }
316 }
317 None => Ok(false),
318 }
319 }
320
321 async fn force_unlock(&mut self, args: &LockArgs) -> Result<bool> {
323 if args.uid.is_empty() {
324 args.resources.iter().for_each(|resource| {
325 if let Some(lris) = self.lock_map.get(resource) {
326 lris.iter().for_each(|lri| {
327 let mut key = lri.uid.to_string();
328 format_uuid(&mut key, &lri.idx);
329 self.lock_uid.remove(&key);
330 });
331 if lris.is_empty() {
332 self.lock_map.remove(resource);
333 }
334 }
335 });
336
337 return Ok(true);
338 }
339 let mut idx = 0;
340 let mut need_remove_resource = Vec::new();
341 let mut need_remove_map_id = Vec::new();
342 let reply = loop {
343 let mut map_id = args.uid.to_string();
344 format_uuid(&mut map_id, &idx);
345 match self.lock_uid.get(&map_id) {
346 Some(resource) => match self.lock_map.get_mut(resource) {
347 Some(lris) => {
348 {
349 lris.retain(|lri| {
350 if lri.uid == args.uid && (args.owner.is_empty() || lri.owner == args.owner) {
351 let mut key = args.uid.to_string();
352 format_uuid(&mut key, &lri.idx);
353 need_remove_map_id.push(key);
354 return false;
355 }
356
357 true
358 });
359 }
360 idx += 1;
361 if lris.is_empty() {
362 need_remove_resource.push(resource.to_string());
363 }
364 }
365 None => {
366 need_remove_map_id.push(map_id);
367 idx += 1;
368 continue;
369 }
370 },
371 None => {
372 break idx > 0;
373 }
374 }
375 };
376 need_remove_resource.into_iter().for_each(|resource| {
377 self.lock_map.remove(&resource);
378 });
379 need_remove_map_id.into_iter().for_each(|map_id| {
380 self.lock_uid.remove(&map_id);
381 });
382
383 Ok(reply)
384 }
385
386 async fn close(&self) {}
387
388 async fn is_online(&self) -> bool {
389 true
390 }
391
392 async fn is_local(&self) -> bool {
393 true
394 }
395}
396
397fn format_uuid(s: &mut String, idx: &usize) {
398 s.push_str(&idx.to_string());
399}
400
401#[cfg(test)]
402mod test {
403 use super::LocalLocker;
404 use crate::{Locker, lock_args::LockArgs};
405 use std::io::Result;
406 use tokio;
407
408 #[tokio::test]
409 async fn test_lock_unlock() -> Result<()> {
410 let mut local_locker = LocalLocker::new();
411 let args = LockArgs {
412 uid: "1111".to_string(),
413 resources: vec!["dandan".to_string()],
414 owner: "dd".to_string(),
415 source: "".to_string(),
416 quorum: 3,
417 };
418 local_locker.lock(&args).await?;
419
420 println!("lock local_locker: {local_locker:?} \n");
421
422 local_locker.unlock(&args).await?;
423 println!("unlock local_locker: {local_locker:?}");
424
425 Ok(())
426 }
427}