1use core::fmt;
2use std::sync::{Arc, LockResult, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError, TryLockResult};
3use std::{ptr, thread};
4use std::cell::{Cell, RefCell};
5use std::sync::atomic::AtomicPtr;
6use std::time::{Duration, Instant};
7use log::{debug, info, warn};
8use serde::{Serialize, Serializer};
9use serde::ser::Error;
10use crate::stacktrace_util::{backtrack_frame, BacktrackError, Frame, Stracktrace, ThreadInfo};
11use crate::thresholds_config;
12
13pub struct RwLockWrapped<T: ?Sized> {
15 stack_created: Option<Stracktrace>,
16 last_returned_lock_from: Arc<Mutex<Option<Stracktrace>>>,
18 inner: RwLock<T>,
20}
21
22unsafe impl<T: ?Sized + Send> Send for RwLockWrapped<T> {}
24unsafe impl<T: ?Sized + Send + Sync> Sync for RwLockWrapped<T> {}
25
26impl<T: ?Sized> Serialize for RwLockWrapped<T>
28 where
29 T: Serialize,
30{
31 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
32 where
33 S: Serializer,
34 {
35 match self.inner.read() {
36 Ok(locked) => locked.serialize(serializer),
37 Err(_) => Err(Error::custom("lock poison error while serializing")),
38 }
39 }
40}
41
42
43impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockWrapped<T> {
44 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45 self.inner.fmt(f)
46 }
47}
48
/// Crate version read from `CARGO_PKG_VERSION` at compile time; logged whenever a lock is created.
const LIB_VERSION: &str = env!("CARGO_PKG_VERSION");
50
51impl<T> RwLockWrapped<T> {
52 pub fn new(t: T) -> RwLockWrapped<T> {
53 info!("NEW WRAPPED RWLOCK (v{})", LIB_VERSION);
54 return match get_current_stracktrace() {
55 Ok(stracktrace) => {
56 RwLockWrapped {
57 inner: RwLock::new(t),
58 stack_created: Option::from(stracktrace),
59 last_returned_lock_from: Arc::new(Mutex::new(None)) }
60 }
61 Err(backtrack_error) => {
62 warn!("Unable to determine stacktrace - continue without! (error: {})", backtrack_error);
63 RwLockWrapped {
64 inner: RwLock::new(t),
65 stack_created: None,
66 last_returned_lock_from: Arc::new(Mutex::new(None)) }
67 }
68 }
69 }
70
71 pub fn to_rwlock(&self) -> &RwLock<T> {
72 &self.inner
73 }
74
75 pub fn write(&self) -> LockResult<RwLockWriteGuard<'_, T>> {
76 write_smart(&self)
77 }
78
79 pub fn try_read(&self) -> TryLockResult<RwLockReadGuard<'_, T>> {
80 self.inner.try_read()
81 }
82
83 pub fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> {
84 read_smart(&self)
85 }
86}
87
88impl<T: Default> Default for RwLockWrapped<T> {
89 fn default() -> RwLockWrapped<T> {
91 RwLockWrapped::new(Default::default())
92 }
93}
94
95fn write_smart<T>(rwlock_wrapped: &RwLockWrapped<T>) -> LockResult<RwLockWriteGuard<'_, T>> {
105 let rwlock = &rwlock_wrapped.inner;
106 let mut last_returned = &rwlock_wrapped.last_returned_lock_from;
107
108 let mut cnt: u64 = 0;
109 let wait_since = Instant::now();
111 loop {
112 match rwlock.try_write() {
113 Ok(guard) => {
114 let stack_caller = get_current_stracktrace();
115 *last_returned.lock().unwrap() = Some(stack_caller.expect("stacktrace should be available"));
116 return Ok(guard);
117 }
118 Err(err) => {
119 match err {
120 TryLockError::Poisoned(poison) => {
121 return Err(poison);
122 }
123 TryLockError::WouldBlock => {
124 let waittime_elapsed = wait_since.elapsed();
125 if thresholds_config::should_inspect_lock(cnt) {
126 let stack_caller = get_current_stracktrace();
127 let thread = thread::current();
128 let thread_info = ThreadInfo { thread_id: thread.id(), name: thread.name().unwrap_or("no_thread").to_string() };
129 let stacktrace_created = &rwlock_wrapped.stack_created;
130 let last_lock_from = &rwlock_wrapped.last_returned_lock_from;
131
132 handle_blocked_writer_event(wait_since, waittime_elapsed, cnt,
135 thread_info,
136 stacktrace_created.clone(),
137 last_lock_from.clone(),
138 &stack_caller.ok());
139 }
140
141 thresholds_config::sleep_backoff(cnt);
142 cnt += 1;
143 }
144 }
145 }
146 }
147 }
148}
149
150
151fn read_smart<T>(rwlock_wrapped: &RwLockWrapped<T>) -> LockResult<RwLockReadGuard<'_, T>> {
152 let rwlock = &rwlock_wrapped.inner;
153 let mut last_returned = &rwlock_wrapped.last_returned_lock_from;
154
155 let mut cnt: u64 = 0;
156 let wait_since = Instant::now();
158 loop {
159 match rwlock.try_read() {
160 Ok(guard) => {
161 let stack_caller = get_current_stracktrace();
162 *last_returned.lock().unwrap() = Some(stack_caller.expect("stacktrace should be available"));
163 return Ok(guard);
164 }
165 Err(err) => {
166 match err {
167 TryLockError::Poisoned(poison) => {
168 return Err(poison);
169 }
170 TryLockError::WouldBlock => {
171 let waittime_elapsed = wait_since.elapsed();
172 if thresholds_config::should_inspect_lock(cnt) {
173 let stack_caller = get_current_stracktrace();
174 let thread = thread::current();
175 let thread_info = ThreadInfo { thread_id: thread.id(), name: thread.name().unwrap_or("no_thread").to_string() };
176 let stacktrace_created = &rwlock_wrapped.stack_created;
177 let last_lock_from = &rwlock_wrapped.last_returned_lock_from;
178
179 handle_blocked_reader_event(wait_since, waittime_elapsed, cnt,
182 thread_info,
183 stacktrace_created.clone(),
184 last_lock_from.clone(),
185 &stack_caller.ok());
186 }
187
188 thresholds_config::sleep_backoff(cnt);
189 cnt += 1;
190 }
191 }
192 }
193 }
194 }
195}
196
197fn handle_blocked_writer_event(_since: Instant, elapsed: Duration,
200 cnt: u64,
201 thread: ThreadInfo,
202 stacktrace_created: &Option<Stracktrace>,
203 last_returned_lock_from: Arc<Mutex<Option<Stracktrace>>>,
204 stacktrace_caller: &Option<Stracktrace>) {
205 let locktag = get_lock_identifier(stacktrace_created);
206
207 info!("WRITER BLOCKED on thread {} for {:?} (locktag {})", thread, elapsed, locktag);
208
209 match stacktrace_caller {
210 None => {}
211 Some(stacktrace) => {
212 log_frames("blocking call", locktag, &stacktrace);
213 }
214 }
215
216 match last_returned_lock_from.lock().unwrap().as_ref() {
217 None => {}
218 Some(stacktrace) => {
219 log_frames("current lock holder", locktag, &stacktrace);
220 }
221 }
222
223 match stacktrace_created {
224 None => {}
225 Some(stacktrace) => {
226 log_frames("rwlock constructed here", locktag, &stacktrace);
227 }
228 }
229}
230
231fn handle_blocked_reader_event(_since: Instant, elapsed: Duration,
232 cnt: u64,
233 thread: ThreadInfo,
234 stacktrace_created: &Option<Stracktrace>,
235 last_returned_lock_from: Arc<Mutex<Option<Stracktrace>>>,
236 stacktrace_caller: &Option<Stracktrace>) {
237 let locktag = get_lock_identifier(stacktrace_created);
238
239 info!("READER BLOCKED on thread {} for {:?} (locktag {})", thread, elapsed, locktag);
240
241 match stacktrace_caller {
242 None => {}
243 Some(stacktrace) => {
244 log_frames("blocking call", locktag, &stacktrace);
245 }
246 }
247
248 match last_returned_lock_from.lock().unwrap().as_ref() {
249 None => {}
250 Some(stacktrace) => {
251 log_frames("current lock holder", locktag, &stacktrace);
252 }
253 }
254
255 match stacktrace_created {
256 None => {}
257 Some(stacktrace) => {
258 log_frames("rwlock constructed here", locktag, &stacktrace);
259 }
260 }
261}
262
263fn log_frames(msg: &str, locktag: &str, stacktrace: &&Stracktrace) {
264 debug!(" |{}>\t{}:", locktag, msg);
265 for frame in &stacktrace.frames {
266 debug!(" |{}>\t {}!{}:{}", locktag, frame.filename, frame.method, frame.line_no);
267 }
268}
269
270fn get_lock_identifier(stacktrace_created: &Option<Stracktrace>) -> &str {
272 match stacktrace_created {
273 None => "n/a",
274 Some(stacktrace) => &stacktrace.hash.as_ref()
275 }
276}
277
278fn get_current_stracktrace() -> Result<Stracktrace, BacktrackError> {
279 const OMIT_FRAME_SUFFIX1: &str = "rust_debugging_locks:";
283 const OMIT_FRAME_SUFFIX2: &str = "<rust_debugging_locks:";
285 backtrack_frame(|symbol_name| symbol_name.starts_with(OMIT_FRAME_SUFFIX1) || symbol_name.starts_with(OMIT_FRAME_SUFFIX2))
286}
287