// loom/rt/thread.rs

use crate::rt::execution;
use crate::rt::object::Operation;
use crate::rt::vv::VersionVec;

use std::{any::Any, collections::HashMap, fmt, ops};

use super::Location;
/// A single model-checked thread within one loom execution.
///
/// `Debug` is implemented manually below because `locals` holds `dyn Any`
/// values, which are not `Debug`.
pub(crate) struct Thread {
    pub id: Id,

    /// If the thread is runnable, blocked, or terminated.
    pub state: State,

    /// True if the thread is in a critical section
    pub critical: bool,

    /// The operation the thread is about to take
    pub(super) operation: Option<Operation>,

    /// Tracks observed causality
    pub causality: VersionVec,

    /// Tracks the view of the latest release fence
    pub released: VersionVec,

    /// Tracks DPOR relations
    pub dpor_vv: VersionVec,

    /// Version at which the thread last yielded
    pub last_yield: Option<u16>,

    /// Number of times the thread yielded
    pub yield_count: usize,

    // Mock thread-local storage; values are `take`n out by `drop_locals`.
    locals: LocalMap,

    /// `tracing` span used to associate diagnostics with the current thread.
    span: tracing::Span,
}
40
/// The set of all threads participating in the current execution, plus the
/// scheduler state (which thread is active) shared between them.
#[derive(Debug)]
pub(crate) struct Set {
    /// Unique execution identifier
    execution_id: execution::Id,

    /// Set of threads
    threads: Vec<Thread>,

    /// Currently scheduled thread.
    ///
    /// `None` signifies that no thread is runnable.
    active: Option<usize>,

    /// Sequential consistency causality. All sequentially consistent operations
    /// synchronize with this causality.
    pub seq_cst_causality: VersionVec,

    /// `tracing` span used as the parent for new thread spans.
    iteration_span: tracing::Span,
}
61
/// Identifies a thread within a specific execution.
///
/// Both fields participate in `Eq`/`Hash`, so ids from different executions
/// never compare equal even when the per-execution index matches.
#[derive(Eq, PartialEq, Hash, Copy, Clone)]
pub(crate) struct Id {
    execution_id: execution::Id,
    id: usize,
}
67
68impl Id {
69    /// Returns an integer ID unique to this current execution (for use in
70    /// [`thread::ThreadId`]'s `Debug` impl)
71    pub(crate) fn public_id(&self) -> usize {
72        self.id
73    }
74}
75
/// Scheduling state of a single thread.
#[derive(Debug, Clone, Copy)]
pub(crate) enum State {
    /// The thread may be scheduled. `unparked` records an unpark delivered
    /// while already runnable, to be consumed by a future park.
    Runnable { unparked: bool },
    /// The thread is blocked; the `Location` records where, for diagnostics.
    Blocked(#[allow(dead_code)] Location),
    /// The thread called yield and is waiting to be rescheduled.
    Yield,
    /// The thread has finished running.
    Terminated,
}
83
/// Per-thread storage for mock thread-locals.
type LocalMap = HashMap<LocalKeyId, LocalValue>;

/// Identifies a thread-local key by the address of its `'static` `LocalKey`
/// declaration (see `LocalKeyId::new`).
#[derive(Eq, PartialEq, Hash, Copy, Clone)]
struct LocalKeyId(usize);

/// A type-erased thread-local value; `None` once taken by `drop_locals`.
struct LocalValue(Option<Box<dyn Any>>);
90
91impl Thread {
92    fn new(id: Id, parent_span: &tracing::Span) -> Thread {
93        Thread {
94            id,
95            span: tracing::info_span!(parent: parent_span.id(), "thread", id = id.id),
96            state: State::Runnable { unparked: false },
97            critical: false,
98            operation: None,
99            causality: VersionVec::new(),
100            released: VersionVec::new(),
101            dpor_vv: VersionVec::new(),
102            last_yield: None,
103            yield_count: 0,
104            locals: HashMap::new(),
105        }
106    }
107
108    pub(crate) fn is_runnable(&self) -> bool {
109        matches!(self.state, State::Runnable { .. })
110    }
111
112    pub(crate) fn set_runnable(&mut self) {
113        self.state = State::Runnable { unparked: false };
114    }
115
116    pub(crate) fn set_blocked(&mut self, location: Location) {
117        self.state = State::Blocked(location);
118    }
119
120    pub(crate) fn is_blocked(&self) -> bool {
121        matches!(self.state, State::Blocked(..))
122    }
123
124    pub(crate) fn is_yield(&self) -> bool {
125        matches!(self.state, State::Yield)
126    }
127
128    pub(crate) fn set_yield(&mut self) {
129        self.state = State::Yield;
130        self.last_yield = Some(self.causality[self.id]);
131        self.yield_count += 1;
132    }
133
134    pub(crate) fn is_terminated(&self) -> bool {
135        matches!(self.state, State::Terminated)
136    }
137
138    pub(crate) fn set_terminated(&mut self) {
139        self.state = State::Terminated;
140    }
141
142    pub(crate) fn drop_locals(&mut self) -> Box<dyn std::any::Any> {
143        let mut locals = Vec::with_capacity(self.locals.len());
144
145        // run the Drop impls of any mock thread-locals created by this thread.
146        for local in self.locals.values_mut() {
147            locals.push(local.0.take());
148        }
149
150        Box::new(locals)
151    }
152
153    pub(crate) fn unpark(&mut self, unparker: &Thread) {
154        self.causality.join(&unparker.causality);
155        self.set_unparked();
156    }
157
158    /// Unpark a thread's state. If it is already runnable, store the unpark for
159    /// a future call to `park`.
160    fn set_unparked(&mut self) {
161        if self.is_blocked() || self.is_yield() {
162            self.set_runnable();
163        } else if self.is_runnable() {
164            self.state = State::Runnable { unparked: true }
165        }
166    }
167}
168
impl fmt::Debug for Thread {
    // Manual debug impl is necessary because thread locals are represented as
    // `dyn Any`, which does not implement `Debug`. The `locals` field is
    // rendered as an opaque placeholder instead.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Thread")
            .field("id", &self.id)
            .field("state", &self.state)
            .field("critical", &self.critical)
            .field("operation", &self.operation)
            .field("causality", &self.causality)
            .field("released", &self.released)
            .field("dpor_vv", &self.dpor_vv)
            .field("last_yield", &self.last_yield)
            .field("yield_count", &self.yield_count)
            .field("locals", &format_args!("[..locals..]"))
            .finish()
    }
}
187
impl Set {
    /// Create an empty thread set.
    ///
    /// The set may contain up to `max_threads` threads. The initial (main)
    /// thread is created immediately and scheduled as active.
    pub(crate) fn new(execution_id: execution::Id, max_threads: usize) -> Set {
        let mut threads = Vec::with_capacity(max_threads);
        // Capture the current iteration's span to be used as each thread
        // span's parent.
        let iteration_span = tracing::Span::current();
        // Push initial thread
        threads.push(Thread::new(Id::new(execution_id, 0), &iteration_span));

        Set {
            execution_id,
            threads,
            active: Some(0),
            seq_cst_causality: VersionVec::new(),
            iteration_span,
        }
    }

    /// Returns the identifier of the current execution.
    pub(crate) fn execution_id(&self) -> execution::Id {
        self.execution_id
    }

    /// Create a new thread
    ///
    /// Panics if the set already holds `max()` threads.
    pub(crate) fn new_thread(&mut self) -> Id {
        assert!(self.threads.len() < self.max());

        // Get the identifier for the thread about to be created
        let id = self.threads.len();

        // Push the thread onto the stack
        self.threads.push(Thread::new(
            Id::new(self.execution_id, id),
            &self.iteration_span,
        ));

        Id::new(self.execution_id, id)
    }

    /// Maximum number of threads the set may hold.
    ///
    /// NOTE(review): this reads `Vec::capacity`, which is only guaranteed to
    /// be *at least* the `max_threads` passed to `new` — it may be larger.
    pub(crate) fn max(&self) -> usize {
        self.threads.capacity()
    }

    /// `true` if some thread is currently scheduled.
    pub(crate) fn is_active(&self) -> bool {
        self.active.is_some()
    }

    /// `true` when no thread is runnable; asserts that every thread has in
    /// fact terminated (anything else indicates a deadlock / lost wakeup).
    pub(crate) fn is_complete(&self) -> bool {
        if self.active.is_none() {
            // All threads should be terminated
            for thread in &self.threads {
                assert!(
                    thread.is_terminated(),
                    "thread not terminated; {:#?}",
                    thread
                );
            }

            true
        } else {
            false
        }
    }

    /// Identifier of the active thread. Panics if no thread is active.
    pub(crate) fn active_id(&self) -> Id {
        Id::new(self.execution_id, self.active.unwrap())
    }

    /// Borrow the active thread. Panics if no thread is active.
    pub(crate) fn active(&self) -> &Thread {
        &self.threads[self.active.unwrap()]
    }

    /// Switch the scheduled thread to `id` (or to none), moving the `tracing`
    /// context from the old thread's span to the new thread's span.
    ///
    /// NOTE(review): `self.active()` unwraps, so this assumes a thread is
    /// active when called — confirm against callers.
    pub(crate) fn set_active(&mut self, id: Option<Id>) {
        tracing::dispatcher::get_default(|subscriber| {
            // Exit the previously active thread's span...
            if let Some(span_id) = self.active().span.id() {
                subscriber.exit(&span_id)
            }

            // ...and enter the newly active thread's span, if any.
            if let Some(span_id) = id.and_then(|id| self.threads.get(id.id)?.span.id()) {
                subscriber.enter(&span_id);
            }
        });
        self.active = id.map(Id::as_usize);
    }

    /// Mutably borrow the active thread. Panics if no thread is active.
    pub(crate) fn active_mut(&mut self) -> &mut Thread {
        &mut self.threads[self.active.unwrap()]
    }

    /// Get the active thread and second thread
    ///
    /// NOTE(review): `other` must differ from the active thread — if
    /// `other == active`, `l[active]` below indexes out of bounds.
    pub(crate) fn active2_mut(&mut self, other: Id) -> (&mut Thread, &mut Thread) {
        let active = self.active.unwrap();
        let other = other.id;

        if other >= active {
            // Split so `other` is the first element of the right half.
            let (l, r) = self.threads.split_at_mut(other);

            (&mut l[active], &mut r[0])
        } else {
            // Split so `active` is the first element of the right half.
            let (l, r) = self.threads.split_at_mut(active);

            (&mut r[0], &mut l[other])
        }
    }

    /// Increment the active thread's own slot in its causality vector.
    pub(crate) fn active_causality_inc(&mut self) {
        let id = self.active_id();
        self.active_mut().causality.inc(id);
    }

    /// The active thread's current version (its own slot in its causality
    /// vector).
    pub(crate) fn active_atomic_version(&self) -> u16 {
        let id = self.active_id();
        self.active().causality[id]
    }

    /// Unpark thread `id`, joining its causality with the active thread's
    /// unless the thread is unparking itself.
    pub(crate) fn unpark(&mut self, id: Id) {
        if id == self.active_id() {
            // The thread is unparking itself. We don't have to join its
            // causality with the unparker's causality in this case, since the
            // thread *is* the unparker. Just unpark its state.
            self.active_mut().set_unparked();
            return;
        }

        // Synchronize memory
        let (active, th) = self.active2_mut(id);
        th.unpark(active);
    }

    /// Insert a point of sequential consistency
    /// TODO
    /// - Deprecate SeqCst accesses and allow SeqCst fences only. The semantics of SeqCst accesses
    ///   is complex and difficult to implement correctly. On the other hand, SeqCst fence has
    ///   well-understood and clear semantics in the absence of SeqCst accesses, and can be used
    ///   for enforcing the read-after-write (RAW) ordering which is probably what the user want to
    ///   achieve with SeqCst.
    /// - Revisit the other uses of this function. They probably don't require sequential
    ///   consistency. E.g. see https://en.cppreference.com/w/cpp/named_req/Mutex
    ///
    /// References
    /// - The "scfix" paper, which proposes a memory model called RC11 that fixes SeqCst
    ///   semantics. of C11. https://plv.mpi-sws.org/scfix/
    /// - Some fixes from the "scfix" paper has been incorporated into C/C++20:
    ///   http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0668r5.html
    /// - The "promising semantics" paper, which propose an intuitive semantics of SeqCst fence in
    ///   the absence of SC accesses. https://sf.snu.ac.kr/promise-concurrency/
    pub(crate) fn seq_cst(&mut self) {
        // The previous implementation of sequential consistency was incorrect (though it's correct
        // for `fence(SeqCst)`-only scenario; use `seq_cst_fence` for `fence(SeqCst)`).
        // As a quick fix, just disable it. This may fail to model correct code,
        // but will not silently allow bugs.
    }

    /// Models `fence(SeqCst)`: join the active thread's causality with the
    /// global seq-cst causality in both directions.
    pub(crate) fn seq_cst_fence(&mut self) {
        self.threads[self.active.unwrap()]
            .causality
            .join(&self.seq_cst_causality);
        self.seq_cst_causality
            .join(&self.threads[self.active.unwrap()].causality);
    }

    /// Reset the set for a new execution: recreate the initial thread, make
    /// it active, and clear the seq-cst causality.
    pub(crate) fn clear(&mut self, execution_id: execution::Id) {
        self.iteration_span = tracing::Span::current();
        self.threads.clear();
        self.threads
            .push(Thread::new(Id::new(execution_id, 0), &self.iteration_span));

        self.execution_id = execution_id;
        self.active = Some(0);
        self.seq_cst_causality = VersionVec::new();
    }

    /// Iterate over every thread paired with its `Id`.
    pub(crate) fn iter(&self) -> impl ExactSizeIterator<Item = (Id, &Thread)> + '_ {
        let execution_id = self.execution_id;
        self.threads
            .iter()
            .enumerate()
            .map(move |(id, thread)| (Id::new(execution_id, id), thread))
    }

    /// Mutably iterate over every thread paired with its `Id`.
    pub(crate) fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = (Id, &mut Thread)> + '_ {
        let execution_id = self.execution_id;
        self.threads
            .iter_mut()
            .enumerate()
            .map(move |(id, thread)| (Id::new(execution_id, id), thread))
    }

    /// Split the set of threads into the active thread and an iterator of all
    /// other threads.
    pub(crate) fn split_active(&mut self) -> (&mut Thread, impl Iterator<Item = &mut Thread>) {
        let active = self.active.unwrap();
        // [..active] / [active..]
        let (one, two) = self.threads.split_at_mut(active);
        // [active] / [active + 1..]
        let (active, two) = two.split_at_mut(1);

        let iter = one.iter_mut().chain(two.iter_mut());

        (&mut active[0], iter)
    }

    /// Look up the active thread's value for `key`, if one was initialized.
    pub(crate) fn local<T: 'static>(
        &mut self,
        key: &'static crate::thread::LocalKey<T>,
    ) -> Option<Result<&T, AccessError>> {
        self.active_mut()
            .locals
            .get(&LocalKeyId::new(key))
            .map(|local_value| local_value.get())
    }

    /// Initialize the active thread's value for `key`.
    ///
    /// Panics if the key was already initialized for this thread.
    pub(crate) fn local_init<T: 'static>(
        &mut self,
        key: &'static crate::thread::LocalKey<T>,
        value: T,
    ) {
        assert!(self
            .active_mut()
            .locals
            .insert(LocalKeyId::new(key), LocalValue::new(value))
            .is_none())
    }
}
412
413impl ops::Index<Id> for Set {
414    type Output = Thread;
415
416    fn index(&self, index: Id) -> &Thread {
417        &self.threads[index.id]
418    }
419}
420
421impl ops::IndexMut<Id> for Set {
422    fn index_mut(&mut self, index: Id) -> &mut Thread {
423        &mut self.threads[index.id]
424    }
425}
426
427impl Id {
428    pub(crate) fn new(execution_id: execution::Id, id: usize) -> Id {
429        Id { execution_id, id }
430    }
431
432    pub(crate) fn as_usize(self) -> usize {
433        self.id
434    }
435}
436
437impl From<Id> for usize {
438    fn from(src: Id) -> usize {
439        src.id
440    }
441}
442
443impl fmt::Display for Id {
444    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
445        self.id.fmt(fmt)
446    }
447}
448
449impl fmt::Debug for Id {
450    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
451        write!(fmt, "Id({})", self.id)
452    }
453}
454
455impl LocalKeyId {
456    fn new<T>(key: &'static crate::thread::LocalKey<T>) -> Self {
457        Self(key as *const _ as usize)
458    }
459}
460
461impl LocalValue {
462    fn new<T: 'static>(value: T) -> Self {
463        Self(Some(Box::new(value)))
464    }
465
466    fn get<T: 'static>(&self) -> Result<&T, AccessError> {
467        self.0
468            .as_ref()
469            .ok_or(AccessError { _private: () })
470            .map(|val| {
471                val.downcast_ref::<T>()
472                    .expect("local value must downcast to expected type")
473            })
474    }
475}
476
/// An error returned by [`LocalKey::try_with`](struct.LocalKey.html#method.try_with).
///
/// Indicates that the thread-local value has already been destroyed.
pub struct AccessError {
    _private: (),
}

impl fmt::Debug for AccessError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("AccessError").finish()
    }
}

impl fmt::Display for AccessError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `str`'s `Display` impl delegates to `Formatter::pad`, so this is
        // equivalent to `fmt::Display::fmt("already destroyed", f)` and
        // still honors width/alignment flags.
        f.pad("already destroyed")
    }
}