1use core::fmt;
2use std::{
3 collections::HashSet,
4 sync::{Arc, Mutex},
5 thread::sleep,
6 time::Instant,
7};
8
9use log::error;
10
11use crate::{
12 btree::page::BTreePageID,
13 error::SmallError,
14 transaction::Transaction,
15 types::{ConcurrentHashMap, SmallResult},
16 utils::HandyRwLock,
17 Unique,
18};
19
20#[derive(Debug)]
21pub enum Lock {
22 XLock,
23 SLock,
24}
25
26#[derive(Debug)]
27pub enum Permission {
28 ReadOnly,
29 ReadWrite,
30}
31
32impl Permission {
33 pub fn to_lock(&self) -> Lock {
34 match self {
35 Permission::ReadOnly => Lock::SLock,
36 Permission::ReadWrite => Lock::XLock,
37 }
38 }
39}
40
41pub enum AcquireResult {
42 Acquired,
43 Granted,
44}
45
46pub struct ConcurrentStatus {
49 s_lock_map: ConcurrentHashMap<BTreePageID, HashSet<Transaction>>,
50 x_lock_map: ConcurrentHashMap<BTreePageID, Transaction>,
51 pub hold_pages:
52 ConcurrentHashMap<Transaction, HashSet<BTreePageID>>,
53 modification_lock: Arc<Mutex<()>>,
54}
55
56impl ConcurrentStatus {
57 pub fn new() -> Self {
58 Self {
59 s_lock_map: ConcurrentHashMap::new(),
60 x_lock_map: ConcurrentHashMap::new(),
61 hold_pages: ConcurrentHashMap::new(),
62 modification_lock: Arc::new(Mutex::new(())),
63 }
64 }
65
66 pub fn request_lock(
67 &self,
68 tx: &Transaction,
69 lock: &Lock,
70 page_id: &BTreePageID,
71 ) -> Result<(), SmallError> {
72 let start_time = Instant::now();
78 while Instant::now().duration_since(start_time).as_secs() < 3
79 {
80 if Unique::concurrent_status()
81 .add_lock(tx, lock, page_id)?
82 {
83 return Ok(());
84 }
85
86 sleep(std::time::Duration::from_millis(10));
87 }
88
89 error!(
90 "acquire_lock timeout, tx: {}, lock: {:?}, page_id: {:?},
91 concurrent_status_map: {:?}",
92 tx, lock, page_id, self,
93 );
94
95 panic!("acquire_lock timeout");
96
97 return Err(SmallError::new("acquire lock timeout"));
98 }
99
100 fn add_lock(
118 &self,
119 tx: &Transaction,
120 lock: &Lock,
121 page_id: &BTreePageID,
122 ) -> Result<bool, SmallError> {
123 let _guard = self.modification_lock.lock().unwrap();
124
125 if !self.x_lock_map.exact_or_empty(page_id, tx) {
126 return Ok(false);
127 }
128
129 match lock {
130 Lock::SLock => {
131 self.s_lock_map.alter_value(
132 page_id,
133 |s_lock_set| {
134 s_lock_set.insert(tx.clone());
135 Ok(())
136 },
137 )?;
138 }
139 Lock::XLock => {
140 self.x_lock_map
141 .get_inner()
142 .wl()
143 .insert(page_id.clone(), tx.clone());
144 }
145 }
146
147 self.hold_pages.alter_value(tx, |hold_pages_set| {
148 hold_pages_set.insert(*page_id);
149 Ok(())
150 })?;
151
152 return Ok(true);
157 }
158
159 pub fn release_lock_by_tx(
160 &self,
161 tx: &Transaction,
162 ) -> SmallResult {
163 if !self.hold_pages.get_inner().rl().contains_key(tx) {
164 return Ok(());
165 }
166
167 let hold_pages =
168 self.hold_pages.get_inner().rl().get(tx).unwrap().clone();
169 for page_id in hold_pages {
170 self.release_lock(tx, &page_id)?;
171 }
172
173 self.hold_pages.remove(tx);
174
175 return Ok(());
176 }
177
178 fn release_lock(
179 &self,
180 tx: &Transaction,
181 page_id: &BTreePageID,
182 ) -> SmallResult {
183 let mut s_lock_map = self.s_lock_map.get_inner_wl();
184 if let Some(v) = s_lock_map.get_mut(page_id) {
185 v.remove(tx);
190 if v.len() == 0 {
191 s_lock_map.remove(page_id);
192 }
193 }
194
195 let mut x_lock_map = self.x_lock_map.get_inner_wl();
196 if let Some(_) = x_lock_map.get_mut(page_id) {
197 x_lock_map.remove(page_id);
202 }
203
204 return Ok(());
205 }
206
207 pub fn holds_lock(
208 &self,
209 tx: &Transaction,
210 page_id: &BTreePageID,
211 ) -> bool {
212 let s_lock_map = self.s_lock_map.get_inner_rl();
213 let x_lock_map = self.x_lock_map.get_inner_rl();
214
215 if let Some(v) = s_lock_map.get(page_id) {
216 if v.contains(tx) {
217 return true;
218 }
219 }
220
221 if let Some(v) = x_lock_map.get(page_id) {
222 if v == tx {
223 return true;
224 }
225 }
226
227 return false;
228 }
229
230 pub fn get_page_tx(
231 &self,
232 page_id: &BTreePageID,
233 ) -> Option<Transaction> {
234 let x_lock_map = self.x_lock_map.get_inner_rl();
235 if let Some(v) = x_lock_map.get(page_id) {
236 return Some(v.clone());
237 }
238
239 return None;
240 }
241
242 pub fn clear(&self) {
243 self.s_lock_map.get_inner().wl().clear();
244 self.x_lock_map.get_inner().wl().clear();
245 self.hold_pages.clear();
246 }
247}
248
249impl fmt::Display for ConcurrentStatus {
250 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
251 let mut depiction = "\n".to_string();
252
253 depiction.push_str("s_lock_map.get_inner().rl(): {");
255 for (k, v) in self.s_lock_map.get_inner().rl().iter() {
256 depiction.push_str(&format!(
257 "\n\t{:?} -> [",
258 k.get_short_repr()
259 ));
260 for tx in v {
261 depiction.push_str(&format!("\n\t\t{:?}, ", tx));
262 }
263 depiction.push_str("\n\t]");
264 }
265 depiction.push_str("\n}\n");
266
267 depiction.push_str("x_lock_map.get_inner().rl(): {");
269 for (k, v) in self.x_lock_map.get_inner().rl().iter() {
270 depiction.push_str(&format!(
271 "\n\t{:?} -> {:?}, ",
272 k.get_short_repr(),
273 v
274 ));
275 }
276 depiction.push_str("\n}\n");
277
278 depiction.push_str("hold_pages: {");
280 for (k, v) in self.hold_pages.get_inner().rl().iter() {
281 depiction.push_str(&format!("\n\t{:?} -> [", k));
282 for page_id in v {
283 depiction.push_str(&format!(
284 "\n\t\t{:?}, ",
285 page_id.get_short_repr()
286 ));
287 }
288 depiction.push_str("\n\t]\n");
289 }
290
291 return write!(f, "{}", depiction);
292 }
293}
294
295impl fmt::Debug for ConcurrentStatus {
296 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
297 return write!(f, "{}", self);
298 }
299}