stateright/semantics/
linearizability.rs

1//! Private module for selective re-export. See [`LinearizabilityTester`].
2
3use crate::semantics::{ConsistencyTester, SequentialSpec};
4use std::collections::{btree_map, BTreeMap, VecDeque};
5use std::fmt::Debug;
6
7// This implementation is based on `SequentialConsistencyTester` and will be
8// easier to follow if you are already familiar with that code. The key
9// difference is that upon starting an operation, this tester also records
10// the index of the last operation completed by every other thread, and those
11// same indices are also preserved if/when the operation completes. That data
12// allows the tester to reject histories that violate "real time" ordering.
13
14/// This tester captures a potentially concurrent history of operations and
15/// validates that it adheres to a [`SequentialSpec`] based on the
16/// [linearizability] consistency model. This model requires that operations be
17/// applied atomically and that sequenced (non-concurrent) operations are
18/// applied in order.
19///
20/// If you're not sure whether to pick this or [`SequentialConsistencyTester`], favor
21/// `LinearizabilityTester`.
22///
23/// # Linearizability
24///
25/// Unlike with [sequential consistency], all sequenced (non-concurrent) operations must respect a
26/// happens-before relationship, even across threads, which ensures that histories respect "real
27/// time" ordering (defined more precisely below).  Anomalies are prevented because threads all
28/// agree on the viable order of operations.  For example, the later read by Thread 2 must read the
29/// value of Thread 1's write (rather than a differing earlier value) since those two operations
30/// are not concurrent:
31///
32/// ```text
33///           -----------Time------------------------------>
34/// Thread 1: [write invoked... and returns]
35/// Thread 2:                                 [read invoked... and returns]
36/// ```
37///
38/// While "real time" is a common way to phrase an implicit total ordering on non-concurrent events
39/// spanning threads, a more precise way to think about this is that prior to Thread 2 starting its
40/// read, Thread 1 is capable of communicating with Thread 2 indicating that the write finished.
41/// This perspective avoids introducing the notion of a shared global time, which is often a
42/// misleading perspective when it comes to distributed systems (or even modern physics in
43/// general).
44///
45/// The [`SequentialSpec`] will imply additional ordering constraints based on semantics specific
46/// to each operation. For example, a value cannot be popped off a stack before it is pushed. It is
47/// then the responsibility of this tester to establish whether a valid total ordering of events
48/// exists under these constraints.
49///
50/// See also: [`SequentialConsistencyTester`].
51///
52/// [linearizability]: https://en.wikipedia.org/wiki/Linearizability
53/// [sequential consistency]: https://en.wikipedia.org/wiki/Sequential_consistency
54/// [`SequentialConsistencyTester`]: crate::semantics::SequentialConsistencyTester
55#[derive(Clone, Debug, Eq, Hash, PartialEq)]
56#[allow(clippy::type_complexity)]
57pub struct LinearizabilityTester<ThreadId, RefObj: SequentialSpec> {
58    init_ref_obj: RefObj,
59    history_by_thread: BTreeMap<ThreadId, VecDeque<Complete<ThreadId, RefObj::Op, RefObj::Ret>>>,
60    in_flight_by_thread: BTreeMap<ThreadId, InFlight<ThreadId, RefObj::Op>>,
61    is_valid_history: bool,
62}
63
64type LastCompletedOpMap<ThreadId> = BTreeMap<ThreadId, usize>;
65type Complete<ThreadId, Op, Ret> = (LastCompletedOpMap<ThreadId>, Op, Ret);
66type InFlight<ThreadId, Op> = (LastCompletedOpMap<ThreadId>, Op);
67
68#[allow(clippy::len_without_is_empty)] // no use case for an emptiness check
69impl<T: Ord, RefObj: SequentialSpec> LinearizabilityTester<T, RefObj> {
70    /// Constructs a [`LinearizabilityTester`].
71    pub fn new(init_ref_obj: RefObj) -> Self {
72        Self {
73            init_ref_obj,
74            history_by_thread: Default::default(),
75            in_flight_by_thread: Default::default(),
76            is_valid_history: true,
77        }
78    }
79
80    /// Indicates the aggregate number of operations completed or in flight
81    /// across all threads.
82    pub fn len(&self) -> usize {
83        let mut len = self.in_flight_by_thread.len();
84        for history in self.history_by_thread.values() {
85            len += history.len();
86        }
87        len
88    }
89}
90
91impl<T, RefObj> ConsistencyTester<T, RefObj> for LinearizabilityTester<T, RefObj>
92where
93    T: Copy + Debug + Ord,
94    RefObj: Clone + SequentialSpec,
95    RefObj::Op: Clone + Debug,
96    RefObj::Ret: Clone + Debug + PartialEq,
97{
98    /// Indicates that a thread invoked an operation. Returns `Ok(...)` if the
99    /// history is valid, even if it is not lineariable.
100    ///
101    /// See [`LinearizabilityTester::serialized_history`].
102    fn on_invoke(&mut self, thread_id: T, op: RefObj::Op) -> Result<&mut Self, String> {
103        if !self.is_valid_history {
104            return Err("Earlier history was invalid.".to_string());
105        }
106        let in_flight_elem = self.in_flight_by_thread.entry(thread_id);
107        if let btree_map::Entry::Occupied(occupied_op_entry) = in_flight_elem {
108            self.is_valid_history = false;
109            let (_, op) = occupied_op_entry.get();
110            return Err(format!(
111                    "Thread already has an operation in flight. thread_id={:?}, op={:?}, history_by_thread={:?}",
112                    thread_id, op, self.history_by_thread));
113        };
114        let last_completed = self
115            .history_by_thread
116            .iter()
117            .filter_map(|(id, cs)| {
118                // collect last completed op index for every other thread
119                if id == &thread_id || cs.is_empty() {
120                    None
121                } else {
122                    Some((*id, cs.len() - 1))
123                }
124            })
125            .collect::<BTreeMap<_, _>>();
126        in_flight_elem.or_insert((last_completed, op));
127        self.history_by_thread.entry(thread_id).or_default(); // `serialize` requires entry
128        Ok(self)
129    }
130
131    /// Indicates that a thread's earlier operation invocation returned. Returns
132    /// `Ok(...)` if the history is valid, even if it is not linearizable.
133    ///
134    /// See [`LinearizabilityTester::serialized_history`].
135    fn on_return(&mut self, thread_id: T, ret: RefObj::Ret) -> Result<&mut Self, String> {
136        if !self.is_valid_history {
137            return Err("Earlier history was invalid.".to_string());
138        }
139        let (completed, op) = match self.in_flight_by_thread.remove(&thread_id) {
140            None => {
141                self.is_valid_history = false;
142                return Err(format!(
143                    "There is no in-flight invocation for this thread ID. \
144                     thread_id={:?}, unexpected_return={:?}, history={:?}",
145                    thread_id,
146                    ret,
147                    self.history_by_thread.entry(thread_id).or_default()
148                ));
149            }
150            Some(x) => x,
151        };
152        self.history_by_thread
153            .entry(thread_id)
154            .or_default()
155            .push_back((completed, op, ret));
156        Ok(self)
157    }
158
159    /// Indicates whether the recorded history is linearizable.
160    fn is_consistent(&self) -> bool {
161        self.serialized_history().is_some()
162    }
163}
164
165impl<T, RefObj> LinearizabilityTester<T, RefObj>
166where
167    T: Copy + Debug + Ord,
168    RefObj: Clone + SequentialSpec,
169    RefObj::Op: Clone + Debug,
170    RefObj::Ret: Clone + Debug + PartialEq,
171{
172    /// Attempts to serialize the recorded partially ordered operation history
173    /// into a total order that is consistent with a reference object's
174    /// operational semantics.
175    pub fn serialized_history(&self) -> Option<Vec<(RefObj::Op, RefObj::Ret)>> {
176        if !self.is_valid_history {
177            return None;
178        }
179        let history_by_thread = self
180            .history_by_thread
181            .iter()
182            .map(|(t, cs)| (*t, cs.clone().into_iter().enumerate().collect()))
183            .collect();
184        Self::serialize(
185            Vec::new(),
186            &self.init_ref_obj,
187            &history_by_thread,
188            &self.in_flight_by_thread,
189        )
190    }
191
192    #[allow(clippy::type_complexity)]
193    fn serialize(
194        valid_history: Vec<(RefObj::Op, RefObj::Ret)>, // total order
195        ref_obj: &RefObj,
196        remaining_history_by_thread: &BTreeMap<
197            T,
198            VecDeque<(usize, Complete<T, RefObj::Op, RefObj::Ret>)>,
199        >, // partial order
200        in_flight_by_thread: &BTreeMap<T, InFlight<T, RefObj::Op>>,
201    ) -> Option<Vec<(RefObj::Op, RefObj::Ret)>> {
202        // Return collected total order when there is no remaining partial order to interleave.
203        let done = remaining_history_by_thread
204            .iter()
205            .all(|(_id, h)| h.is_empty());
206        if done {
207            return Some(valid_history);
208        }
209
210        // Otherwise try remaining interleavings.
211        for (thread_id, remaining_history) in remaining_history_by_thread.iter() {
212            let mut remaining_history_by_thread =
213                std::borrow::Cow::Borrowed(remaining_history_by_thread);
214            let mut in_flight_by_thread = std::borrow::Cow::Borrowed(in_flight_by_thread);
215            let (ref_obj, valid_history) = if remaining_history.is_empty() {
216                // Case 1: No remaining history to interleave. Maybe in-flight.
217                if !in_flight_by_thread.contains_key(thread_id) {
218                    continue;
219                }
220                let (cs, op) = in_flight_by_thread.to_mut().remove(thread_id).unwrap(); // `contains_key` above
221                let violation = cs.iter().any(|(peer_id, min_peer_time)| {
222                    // Ensure all pre-req operations were completed by peers
223                    if let Some(ops) = remaining_history_by_thread.get(peer_id) {
224                        if let Some((next_peer_time, _)) = ops.iter().next() {
225                            if next_peer_time <= min_peer_time {
226                                return true;
227                            }
228                        }
229                    }
230                    false
231                });
232                if violation {
233                    continue;
234                }
235                let mut ref_obj = ref_obj.clone();
236                let ret = ref_obj.invoke(&op);
237                let mut valid_history = valid_history.clone();
238                valid_history.push((op, ret));
239                (ref_obj, valid_history)
240            } else {
241                // Case 2: Has remaining history to interleave.
242                let (_t, (cs, op, ret)) = remaining_history_by_thread
243                    .to_mut()
244                    .get_mut(thread_id)
245                    .unwrap() // iterator returned this thread ID
246                    .pop_front()
247                    .unwrap(); // `!is_empty()` above
248                let violation = cs.iter().any(|(peer_id, min_peer_time)| {
249                    // Ensure all pre-req operations were completed by peers
250                    if let Some(ops) = remaining_history_by_thread.get(peer_id) {
251                        if let Some((next_peer_time, _)) = ops.iter().next() {
252                            if next_peer_time <= min_peer_time {
253                                return true;
254                            }
255                        }
256                    }
257                    false
258                });
259                if violation {
260                    continue;
261                }
262                let mut ref_obj = ref_obj.clone();
263                if !ref_obj.is_valid_step(&op, &ret) {
264                    continue;
265                }
266                let mut valid_history = valid_history.clone();
267                valid_history.push((op, ret));
268                (ref_obj, valid_history)
269            };
270            if let Some(valid_history) = Self::serialize(
271                valid_history,
272                &ref_obj,
273                &remaining_history_by_thread,
274                &in_flight_by_thread,
275            ) {
276                return Some(valid_history);
277            }
278        }
279        None
280    }
281}
282
283impl<T: Ord, RefObj> Default for LinearizabilityTester<T, RefObj>
284where
285    RefObj: Default + SequentialSpec,
286{
287    fn default() -> Self {
288        Self::new(RefObj::default())
289    }
290}
291
292impl<T, RefObj> serde::Serialize for LinearizabilityTester<T, RefObj>
293where
294    RefObj: serde::Serialize + SequentialSpec,
295    RefObj::Op: serde::Serialize,
296    RefObj::Ret: serde::Serialize,
297    T: Ord + serde::Serialize,
298{
299    fn serialize<Ser: serde::Serializer>(&self, ser: Ser) -> Result<Ser::Ok, Ser::Error> {
300        use serde::ser::SerializeStruct;
301        let mut out = ser.serialize_struct("LinearizabilityTester", 4)?;
302        out.serialize_field("init_ref_obj", &self.init_ref_obj)?;
303        out.serialize_field("history_by_thread", &self.history_by_thread)?;
304        out.serialize_field("in_flight_by_thread", &self.in_flight_by_thread)?;
305        out.serialize_field("is_valid_history", &self.is_valid_history)?;
306        out.end()
307    }
308}
309
310#[cfg(test)]
311mod test {
312    use super::*;
313    use crate::semantics::register::*;
314    use crate::semantics::vec::*;
315
316    #[test]
317    fn rejects_invalid_history() -> Result<(), String> {
318        assert_eq!(
319            LinearizabilityTester::new(Register('A'))
320                .on_invoke(99, RegisterOp::Write('B'))?
321                .on_invoke(99, RegisterOp::Write('C')),
322            Err("Thread already has an operation in flight. thread_id=99, op=Write('B'), history_by_thread={99: []}".to_string()));
323        assert_eq!(
324            LinearizabilityTester::new(Register('A'))
325                .on_invret(99, RegisterOp::Write('B'), RegisterRet::WriteOk)?
326                .on_invret(99, RegisterOp::Write('C'), RegisterRet::WriteOk)?
327                .on_return(99, RegisterRet::WriteOk),
328            Err("There is no in-flight invocation for this thread ID. \
329                 thread_id=99, \
330                 unexpected_return=WriteOk, \
331                 history=[({}, Write('B'), WriteOk), ({}, Write('C'), WriteOk)]"
332                .to_string())
333        );
334        Ok(())
335    }
336
337    #[test]
338    fn identifies_linearizable_register_history() -> Result<(), String> {
339        assert_eq!(
340            LinearizabilityTester::new(Register('A'))
341                .on_invoke(0, RegisterOp::Write('B'))?
342                .on_invret(1, RegisterOp::Read, RegisterRet::ReadOk('A'))?
343                .serialized_history(),
344            Some(vec![(RegisterOp::Read, RegisterRet::ReadOk('A')),])
345        );
346        assert_eq!(
347            LinearizabilityTester::new(Register('A'))
348                .on_invoke(0, RegisterOp::Read)?
349                .on_invoke(1, RegisterOp::Write('B'))?
350                .on_return(0, RegisterRet::ReadOk('B'))?
351                .serialized_history(),
352            Some(vec![
353                (RegisterOp::Write('B'), RegisterRet::WriteOk),
354                (RegisterOp::Read, RegisterRet::ReadOk('B')),
355            ])
356        );
357        Ok(())
358    }
359
360    #[test]
361    fn identifies_unlinearizable_register_history() -> Result<(), String> {
362        assert_eq!(
363            LinearizabilityTester::new(Register('A'))
364                .on_invret(0, RegisterOp::Read, RegisterRet::ReadOk('B'))?
365                .serialized_history(),
366            None
367        );
368        assert_eq!(
369            LinearizabilityTester::new(Register('A'))
370                .on_invret(0, RegisterOp::Read, RegisterRet::ReadOk('B'))?
371                .on_invoke(1, RegisterOp::Write('B'))?
372                .serialized_history(),
373            None // SC but not lineariable
374        );
375        Ok(())
376    }
377
378    #[test]
379    fn identifies_linearizable_vec_history() -> Result<(), String> {
380        assert_eq!(
381            LinearizabilityTester::new(Vec::new())
382                .on_invoke(0, VecOp::Push(10))?
383                .serialized_history(),
384            Some(vec![])
385        );
386        assert_eq!(
387            LinearizabilityTester::new(Vec::new())
388                .on_invoke(0, VecOp::Push(10))?
389                .on_invret(1, VecOp::Pop, VecRet::PopOk(None))?
390                .serialized_history(),
391            Some(vec![(VecOp::Pop, VecRet::PopOk(None)),])
392        );
393        assert_eq!(
394            LinearizabilityTester::new(Vec::new())
395                .on_invoke(0, VecOp::Push(10))?
396                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(10)))?
397                .serialized_history(),
398            Some(vec![
399                (VecOp::Push(10), VecRet::PushOk),
400                (VecOp::Pop, VecRet::PopOk(Some(10))),
401            ])
402        );
403        assert_eq!(
404            LinearizabilityTester::new(Vec::new())
405                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
406                .on_invoke(0, VecOp::Push(20))?
407                .on_invret(1, VecOp::Len, VecRet::LenOk(1))?
408                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(20)))?
409                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(10)))?
410                .serialized_history(),
411            Some(vec![
412                (VecOp::Push(10), VecRet::PushOk),
413                (VecOp::Len, VecRet::LenOk(1)),
414                (VecOp::Push(20), VecRet::PushOk),
415                (VecOp::Pop, VecRet::PopOk(Some(20))),
416                (VecOp::Pop, VecRet::PopOk(Some(10))),
417            ])
418        );
419        assert_eq!(
420            LinearizabilityTester::new(Vec::new())
421                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
422                .on_invoke(0, VecOp::Push(20))?
423                .on_invret(1, VecOp::Len, VecRet::LenOk(1))?
424                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(10)))?
425                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(20)))?
426                .serialized_history(),
427            Some(vec![
428                (VecOp::Push(10), VecRet::PushOk),
429                (VecOp::Len, VecRet::LenOk(1)),
430                (VecOp::Pop, VecRet::PopOk(Some(10))),
431                (VecOp::Push(20), VecRet::PushOk),
432                (VecOp::Pop, VecRet::PopOk(Some(20))),
433            ])
434        );
435        assert_eq!(
436            LinearizabilityTester::new(Vec::new())
437                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
438                .on_invoke(0, VecOp::Push(20))?
439                .on_invret(1, VecOp::Len, VecRet::LenOk(2))?
440                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(20)))?
441                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(10)))?
442                .serialized_history(),
443            Some(vec![
444                (VecOp::Push(10), VecRet::PushOk),
445                (VecOp::Push(20), VecRet::PushOk),
446                (VecOp::Len, VecRet::LenOk(2)),
447                (VecOp::Pop, VecRet::PopOk(Some(20))),
448                (VecOp::Pop, VecRet::PopOk(Some(10))),
449            ])
450        );
451        assert_eq!(
452            LinearizabilityTester::new(Vec::new())
453                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
454                .on_invoke(1, VecOp::Len)?
455                .on_invoke(0, VecOp::Push(20))?
456                .on_return(1, VecRet::LenOk(1))?
457                .serialized_history(),
458            Some(vec![
459                (VecOp::Push(10), VecRet::PushOk),
460                (VecOp::Len, VecRet::LenOk(1)),
461            ])
462        );
463        assert_eq!(
464            LinearizabilityTester::new(Vec::new())
465                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
466                .on_invoke(1, VecOp::Len)?
467                .on_invoke(0, VecOp::Push(20))?
468                .on_return(1, VecRet::LenOk(2))?
469                .serialized_history(),
470            Some(vec![
471                (VecOp::Push(10), VecRet::PushOk),
472                (VecOp::Push(20), VecRet::PushOk),
473                (VecOp::Len, VecRet::LenOk(2)),
474            ])
475        );
476        Ok(())
477    }
478
479    #[test]
480    fn identifies_unlinearizable_vec_history() -> Result<(), String> {
481        assert_eq!(
482            LinearizabilityTester::new(Vec::new())
483                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
484                .on_invret(1, VecOp::Pop, VecRet::PopOk(None))?
485                .serialized_history(),
486            None // SC but not lineariable
487        );
488        assert_eq!(
489            LinearizabilityTester::new(Vec::new())
490                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
491                .on_invoke(1, VecOp::Len)?
492                .on_invoke(0, VecOp::Push(20))?
493                .on_return(1, VecRet::LenOk(0))?
494                .serialized_history(),
495            None
496        );
497        assert_eq!(
498            LinearizabilityTester::new(Vec::new())
499                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
500                .on_invoke(0, VecOp::Push(20))?
501                .on_invret(1, VecOp::Len, VecRet::LenOk(2))?
502                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(10)))?
503                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(20)))?
504                .serialized_history(),
505            None
506        );
507        Ok(())
508    }
509}