rustfs_lock/
local_locker.rs

1// Copyright 2024 RustFS Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use std::io::{Error, Result};
17use std::{
18    collections::HashMap,
19    time::{Duration, Instant},
20};
21
22use crate::{Locker, lock_args::LockArgs};
23
24pub const MAX_DELETE_LIST: usize = 1000;
25
26#[derive(Clone, Debug)]
27struct LockRequesterInfo {
28    name: String,
29    writer: bool,
30    uid: String,
31    time_stamp: Instant,
32    time_last_refresh: Instant,
33    source: String,
34    group: bool,
35    owner: String,
36    quorum: usize,
37    idx: usize,
38}
39
40impl Default for LockRequesterInfo {
41    fn default() -> Self {
42        Self {
43            name: Default::default(),
44            writer: Default::default(),
45            uid: Default::default(),
46            time_stamp: Instant::now(),
47            time_last_refresh: Instant::now(),
48            source: Default::default(),
49            group: Default::default(),
50            owner: Default::default(),
51            quorum: Default::default(),
52            idx: Default::default(),
53        }
54    }
55}
56
57fn is_write_lock(lri: &[LockRequesterInfo]) -> bool {
58    lri.len() == 1 && lri[0].writer
59}
60
61#[derive(Debug, Default)]
62pub struct LockStats {
63    total: usize,
64    writes: usize,
65    reads: usize,
66}
67
68#[derive(Debug, Default)]
69pub struct LocalLocker {
70    lock_map: HashMap<String, Vec<LockRequesterInfo>>,
71    lock_uid: HashMap<String, String>,
72}
73
74impl LocalLocker {
75    pub fn new() -> Self {
76        LocalLocker::default()
77    }
78}
79
80impl LocalLocker {
81    fn can_take_lock(&self, resource: &[String]) -> bool {
82        resource.iter().fold(true, |acc, x| !self.lock_map.contains_key(x) && acc)
83    }
84
85    pub fn stats(&self) -> LockStats {
86        let mut st = LockStats {
87            total: self.lock_map.len(),
88            ..Default::default()
89        };
90
91        self.lock_map.iter().for_each(|(_, value)| {
92            if !value.is_empty() {
93                if value[0].writer {
94                    st.writes += 1;
95                } else {
96                    st.reads += 1;
97                }
98            }
99        });
100
101        st
102    }
103
104    fn dump_lock_map(&mut self) -> HashMap<String, Vec<LockRequesterInfo>> {
105        let mut lock_copy = HashMap::new();
106        self.lock_map.iter().for_each(|(key, value)| {
107            lock_copy.insert(key.to_string(), value.to_vec());
108        });
109
110        lock_copy
111    }
112
113    fn expire_old_locks(&mut self, interval: Duration) {
114        self.lock_map.iter_mut().for_each(|(_, lris)| {
115            lris.retain(|lri| {
116                if Instant::now().duration_since(lri.time_last_refresh) > interval {
117                    let mut key = lri.uid.to_string();
118                    format_uuid(&mut key, &lri.idx);
119                    self.lock_uid.remove(&key);
120                    return false;
121                }
122
123                true
124            });
125        });
126    }
127}
128
129#[async_trait]
130impl Locker for LocalLocker {
131    async fn lock(&mut self, args: &LockArgs) -> Result<bool> {
132        if args.resources.len() > MAX_DELETE_LIST {
133            return Err(Error::other(format!(
134                "internal error: LocalLocker.lock called with more than {MAX_DELETE_LIST} resources"
135            )));
136        }
137
138        if !self.can_take_lock(&args.resources) {
139            return Ok(false);
140        }
141
142        args.resources.iter().enumerate().for_each(|(idx, resource)| {
143            self.lock_map.insert(
144                resource.to_string(),
145                vec![LockRequesterInfo {
146                    name: resource.to_string(),
147                    writer: true,
148                    source: args.source.to_string(),
149                    owner: args.owner.to_string(),
150                    uid: args.uid.to_string(),
151                    group: args.resources.len() > 1,
152                    quorum: args.quorum,
153                    idx,
154                    ..Default::default()
155                }],
156            );
157
158            let mut uuid = args.uid.to_string();
159            format_uuid(&mut uuid, &idx);
160            self.lock_uid.insert(uuid, resource.to_string());
161        });
162
163        Ok(true)
164    }
165
166    async fn unlock(&mut self, args: &LockArgs) -> Result<bool> {
167        if args.resources.len() > MAX_DELETE_LIST {
168            return Err(Error::other(format!(
169                "internal error: LocalLocker.unlock called with more than {MAX_DELETE_LIST} resources"
170            )));
171        }
172
173        let mut reply = false;
174        let mut err_info = String::new();
175        for resource in args.resources.iter() {
176            match self.lock_map.get_mut(resource) {
177                Some(lris) => {
178                    if !is_write_lock(lris) {
179                        if err_info.is_empty() {
180                            err_info = format!("unlock attempted on a read locked entity: {resource}");
181                        } else {
182                            err_info.push_str(&format!(", {resource}"));
183                        }
184                    } else {
185                        lris.retain(|lri| {
186                            if lri.uid == args.uid && (args.owner.is_empty() || lri.owner == args.owner) {
187                                let mut key = args.uid.to_string();
188                                format_uuid(&mut key, &lri.idx);
189                                self.lock_uid.remove(&key).unwrap();
190                                reply |= true;
191                                return false;
192                            }
193
194                            true
195                        });
196                    }
197                    if lris.is_empty() {
198                        self.lock_map.remove(resource);
199                    }
200                }
201                None => {
202                    continue;
203                }
204            };
205        }
206
207        Ok(reply)
208    }
209
210    async fn rlock(&mut self, args: &LockArgs) -> Result<bool> {
211        if args.resources.len() != 1 {
212            return Err(Error::other("internal error: localLocker.RLock called with more than one resource"));
213        }
214
215        let resource = &args.resources[0];
216        match self.lock_map.get_mut(resource) {
217            Some(lri) => {
218                if !is_write_lock(lri) {
219                    lri.push(LockRequesterInfo {
220                        name: resource.to_string(),
221                        writer: false,
222                        source: args.source.to_string(),
223                        owner: args.owner.to_string(),
224                        uid: args.uid.to_string(),
225                        quorum: args.quorum,
226                        ..Default::default()
227                    });
228                } else {
229                    return Ok(false);
230                }
231            }
232            None => {
233                self.lock_map.insert(
234                    resource.to_string(),
235                    vec![LockRequesterInfo {
236                        name: resource.to_string(),
237                        writer: false,
238                        source: args.source.to_string(),
239                        owner: args.owner.to_string(),
240                        uid: args.uid.to_string(),
241                        quorum: args.quorum,
242                        ..Default::default()
243                    }],
244                );
245            }
246        }
247        let mut uuid = args.uid.to_string();
248        format_uuid(&mut uuid, &0);
249        self.lock_uid.insert(uuid, resource.to_string());
250
251        Ok(true)
252    }
253
254    async fn runlock(&mut self, args: &LockArgs) -> Result<bool> {
255        if args.resources.len() != 1 {
256            return Err(Error::other("internal error: localLocker.RLock called with more than one resource"));
257        }
258
259        let mut reply = false;
260        let resource = &args.resources[0];
261        match self.lock_map.get_mut(resource) {
262            Some(lris) => {
263                if is_write_lock(lris) {
264                    return Err(Error::other(format!("runlock attempted on a write locked entity: {resource}")));
265                } else {
266                    lris.retain(|lri| {
267                        if lri.uid == args.uid && (args.owner.is_empty() || lri.owner == args.owner) {
268                            let mut key = args.uid.to_string();
269                            format_uuid(&mut key, &lri.idx);
270                            self.lock_uid.remove(&key).unwrap();
271                            reply |= true;
272                            return false;
273                        }
274
275                        true
276                    });
277                }
278                if lris.is_empty() {
279                    self.lock_map.remove(resource);
280                }
281            }
282            None => {
283                return Ok(reply);
284            }
285        };
286
287        Ok(reply)
288    }
289
290    async fn refresh(&mut self, args: &LockArgs) -> Result<bool> {
291        let mut idx = 0;
292        let mut key = args.uid.to_string();
293        format_uuid(&mut key, &idx);
294        match self.lock_uid.get(&key) {
295            Some(resource) => {
296                let mut resource = resource;
297                loop {
298                    match self.lock_map.get_mut(resource) {
299                        Some(_lris) => {}
300                        None => {
301                            let mut key = args.uid.to_string();
302                            format_uuid(&mut key, &0);
303                            self.lock_uid.remove(&key);
304                            return Ok(idx > 0);
305                        }
306                    }
307
308                    idx += 1;
309                    let mut key = args.uid.to_string();
310                    format_uuid(&mut key, &idx);
311                    resource = match self.lock_uid.get(&key) {
312                        Some(resource) => resource,
313                        None => return Ok(true),
314                    };
315                }
316            }
317            None => Ok(false),
318        }
319    }
320
321    // TODO: need add timeout mechanism
322    async fn force_unlock(&mut self, args: &LockArgs) -> Result<bool> {
323        if args.uid.is_empty() {
324            args.resources.iter().for_each(|resource| {
325                if let Some(lris) = self.lock_map.get(resource) {
326                    lris.iter().for_each(|lri| {
327                        let mut key = lri.uid.to_string();
328                        format_uuid(&mut key, &lri.idx);
329                        self.lock_uid.remove(&key);
330                    });
331                    if lris.is_empty() {
332                        self.lock_map.remove(resource);
333                    }
334                }
335            });
336
337            return Ok(true);
338        }
339        let mut idx = 0;
340        let mut need_remove_resource = Vec::new();
341        let mut need_remove_map_id = Vec::new();
342        let reply = loop {
343            let mut map_id = args.uid.to_string();
344            format_uuid(&mut map_id, &idx);
345            match self.lock_uid.get(&map_id) {
346                Some(resource) => match self.lock_map.get_mut(resource) {
347                    Some(lris) => {
348                        {
349                            lris.retain(|lri| {
350                                if lri.uid == args.uid && (args.owner.is_empty() || lri.owner == args.owner) {
351                                    let mut key = args.uid.to_string();
352                                    format_uuid(&mut key, &lri.idx);
353                                    need_remove_map_id.push(key);
354                                    return false;
355                                }
356
357                                true
358                            });
359                        }
360                        idx += 1;
361                        if lris.is_empty() {
362                            need_remove_resource.push(resource.to_string());
363                        }
364                    }
365                    None => {
366                        need_remove_map_id.push(map_id);
367                        idx += 1;
368                        continue;
369                    }
370                },
371                None => {
372                    break idx > 0;
373                }
374            }
375        };
376        need_remove_resource.into_iter().for_each(|resource| {
377            self.lock_map.remove(&resource);
378        });
379        need_remove_map_id.into_iter().for_each(|map_id| {
380            self.lock_uid.remove(&map_id);
381        });
382
383        Ok(reply)
384    }
385
386    async fn close(&self) {}
387
388    async fn is_online(&self) -> bool {
389        true
390    }
391
392    async fn is_local(&self) -> bool {
393        true
394    }
395}
396
397fn format_uuid(s: &mut String, idx: &usize) {
398    s.push_str(&idx.to_string());
399}
400
401#[cfg(test)]
402mod test {
403    use super::LocalLocker;
404    use crate::{Locker, lock_args::LockArgs};
405    use std::io::Result;
406    use tokio;
407
408    #[tokio::test]
409    async fn test_lock_unlock() -> Result<()> {
410        let mut local_locker = LocalLocker::new();
411        let args = LockArgs {
412            uid: "1111".to_string(),
413            resources: vec!["dandan".to_string()],
414            owner: "dd".to_string(),
415            source: "".to_string(),
416            quorum: 3,
417        };
418        local_locker.lock(&args).await?;
419
420        println!("lock local_locker: {local_locker:?} \n");
421
422        local_locker.unlock(&args).await?;
423        println!("unlock local_locker: {local_locker:?}");
424
425        Ok(())
426    }
427}