small_db/
concurrent_status.rs

1use core::fmt;
2use std::{
3    collections::HashSet,
4    sync::{Arc, Mutex},
5    thread::sleep,
6    time::Instant,
7};
8
9use log::error;
10
11use crate::{
12    btree::page::BTreePageID,
13    error::SmallError,
14    transaction::Transaction,
15    types::{ConcurrentHashMap, SmallResult},
16    utils::HandyRwLock,
17    Unique,
18};
19
20#[derive(Debug)]
21pub enum Lock {
22    XLock,
23    SLock,
24}
25
26#[derive(Debug)]
27pub enum Permission {
28    ReadOnly,
29    ReadWrite,
30}
31
32impl Permission {
33    pub fn to_lock(&self) -> Lock {
34        match self {
35            Permission::ReadOnly => Lock::SLock,
36            Permission::ReadWrite => Lock::XLock,
37        }
38    }
39}
40
41pub enum AcquireResult {
42    Acquired,
43    Granted,
44}
45
46/// reference:
47/// - https://sourcegraph.com/github.com/XiaochenCui/small-db-hw@87607789b677d6afee00a223eacb4f441bd4ae87/-/blob/src/java/smalldb/ConcurrentStatus.java?L12:14&subtree=true
48pub struct ConcurrentStatus {
49    s_lock_map: ConcurrentHashMap<BTreePageID, HashSet<Transaction>>,
50    x_lock_map: ConcurrentHashMap<BTreePageID, Transaction>,
51    pub hold_pages:
52        ConcurrentHashMap<Transaction, HashSet<BTreePageID>>,
53    modification_lock: Arc<Mutex<()>>,
54}
55
56impl ConcurrentStatus {
57    pub fn new() -> Self {
58        Self {
59            s_lock_map: ConcurrentHashMap::new(),
60            x_lock_map: ConcurrentHashMap::new(),
61            hold_pages: ConcurrentHashMap::new(),
62            modification_lock: Arc::new(Mutex::new(())),
63        }
64    }
65
66    pub fn request_lock(
67        &self,
68        tx: &Transaction,
69        lock: &Lock,
70        page_id: &BTreePageID,
71    ) -> Result<(), SmallError> {
72        // debug!(
73        //     "request lock, tx: {:?}, lock: {:?}, page_id: {:?}",
74        //     tx, lock, page_id
75        // );
76
77        let start_time = Instant::now();
78        while Instant::now().duration_since(start_time).as_secs() < 3
79        {
80            if Unique::concurrent_status()
81                .add_lock(tx, lock, page_id)?
82            {
83                return Ok(());
84            }
85
86            sleep(std::time::Duration::from_millis(10));
87        }
88
89        error!(
90            "acquire_lock timeout, tx: {}, lock: {:?}, page_id: {:?},
91        concurrent_status_map: {:?}",
92            tx, lock, page_id, self,
93        );
94
95        panic!("acquire_lock timeout");
96
97        return Err(SmallError::new("acquire lock timeout"));
98    }
99
100    // Add a lock to the given page. This api is idempotent.
101    //
102    // Given the conditions that:
103    // 1. This method could only have at most one runner at a time,
104    // because it need modification actions on several maps.
105    // 2. This method should not ask for exclusive permission (&mut
106    // self) on the ConcurrentStatus, because we granteed that
107    // multiple threads could ask for lock simultaneously (via
108    // request_lock/acquire_lock).
109    //
110    // So, we use a unique lock to prevent this method from being
111    // called by multiple threads at the same time.
112    //
113    // # Return
114    //
115    // Return a bool value to indicate whether the lock is added
116    // successfully.
117    fn add_lock(
118        &self,
119        tx: &Transaction,
120        lock: &Lock,
121        page_id: &BTreePageID,
122    ) -> Result<bool, SmallError> {
123        let _guard = self.modification_lock.lock().unwrap();
124
125        if !self.x_lock_map.exact_or_empty(page_id, tx) {
126            return Ok(false);
127        }
128
129        match lock {
130            Lock::SLock => {
131                self.s_lock_map.alter_value(
132                    page_id,
133                    |s_lock_set| {
134                        s_lock_set.insert(tx.clone());
135                        Ok(())
136                    },
137                )?;
138            }
139            Lock::XLock => {
140                self.x_lock_map
141                    .get_inner()
142                    .wl()
143                    .insert(page_id.clone(), tx.clone());
144            }
145        }
146
147        self.hold_pages.alter_value(tx, |hold_pages_set| {
148            hold_pages_set.insert(*page_id);
149            Ok(())
150        })?;
151
152        // debug!(
153        //     "lock_acquired, tx: {}, lock: {:?}, page_id: {:?}",
154        //     tx, lock, page_id
155        // );
156        return Ok(true);
157    }
158
159    pub fn release_lock_by_tx(
160        &self,
161        tx: &Transaction,
162    ) -> SmallResult {
163        if !self.hold_pages.get_inner().rl().contains_key(tx) {
164            return Ok(());
165        }
166
167        let hold_pages =
168            self.hold_pages.get_inner().rl().get(tx).unwrap().clone();
169        for page_id in hold_pages {
170            self.release_lock(tx, &page_id)?;
171        }
172
173        self.hold_pages.remove(tx);
174
175        return Ok(());
176    }
177
178    fn release_lock(
179        &self,
180        tx: &Transaction,
181        page_id: &BTreePageID,
182    ) -> SmallResult {
183        let mut s_lock_map = self.s_lock_map.get_inner_wl();
184        if let Some(v) = s_lock_map.get_mut(page_id) {
185            // debug!(
186            //     "release_lock_shared, tx: {}, page_id: {:?}",
187            //     tx, page_id
188            // );
189            v.remove(tx);
190            if v.len() == 0 {
191                s_lock_map.remove(page_id);
192            }
193        }
194
195        let mut x_lock_map = self.x_lock_map.get_inner_wl();
196        if let Some(_) = x_lock_map.get_mut(page_id) {
197            // debug!(
198            //     "release_lock_exclusive, tx: {}, page_id: {:?}",
199            //     tx, page_id
200            // );
201            x_lock_map.remove(page_id);
202        }
203
204        return Ok(());
205    }
206
207    pub fn holds_lock(
208        &self,
209        tx: &Transaction,
210        page_id: &BTreePageID,
211    ) -> bool {
212        let s_lock_map = self.s_lock_map.get_inner_rl();
213        let x_lock_map = self.x_lock_map.get_inner_rl();
214
215        if let Some(v) = s_lock_map.get(page_id) {
216            if v.contains(tx) {
217                return true;
218            }
219        }
220
221        if let Some(v) = x_lock_map.get(page_id) {
222            if v == tx {
223                return true;
224            }
225        }
226
227        return false;
228    }
229
230    pub fn get_page_tx(
231        &self,
232        page_id: &BTreePageID,
233    ) -> Option<Transaction> {
234        let x_lock_map = self.x_lock_map.get_inner_rl();
235        if let Some(v) = x_lock_map.get(page_id) {
236            return Some(v.clone());
237        }
238
239        return None;
240    }
241
242    pub fn clear(&self) {
243        self.s_lock_map.get_inner().wl().clear();
244        self.x_lock_map.get_inner().wl().clear();
245        self.hold_pages.clear();
246    }
247}
248
249impl fmt::Display for ConcurrentStatus {
250    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
251        let mut depiction = "\n".to_string();
252
253        // s_lock_map.get_inner().rl()
254        depiction.push_str("s_lock_map.get_inner().rl(): {");
255        for (k, v) in self.s_lock_map.get_inner().rl().iter() {
256            depiction.push_str(&format!(
257                "\n\t{:?} -> [",
258                k.get_short_repr()
259            ));
260            for tx in v {
261                depiction.push_str(&format!("\n\t\t{:?}, ", tx));
262            }
263            depiction.push_str("\n\t]");
264        }
265        depiction.push_str("\n}\n");
266
267        // x_lock_map.get_inner().rl()
268        depiction.push_str("x_lock_map.get_inner().rl(): {");
269        for (k, v) in self.x_lock_map.get_inner().rl().iter() {
270            depiction.push_str(&format!(
271                "\n\t{:?} -> {:?}, ",
272                k.get_short_repr(),
273                v
274            ));
275        }
276        depiction.push_str("\n}\n");
277
278        // hold_pages
279        depiction.push_str("hold_pages: {");
280        for (k, v) in self.hold_pages.get_inner().rl().iter() {
281            depiction.push_str(&format!("\n\t{:?} -> [", k));
282            for page_id in v {
283                depiction.push_str(&format!(
284                    "\n\t\t{:?}, ",
285                    page_id.get_short_repr()
286                ));
287            }
288            depiction.push_str("\n\t]\n");
289        }
290
291        return write!(f, "{}", depiction);
292    }
293}
294
295impl fmt::Debug for ConcurrentStatus {
296    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
297        return write!(f, "{}", self);
298    }
299}