1use crate::semantics::{ConsistencyTester, SequentialSpec};
4use std::collections::{btree_map, BTreeMap, VecDeque};
5use std::fmt::Debug;
6
/// Records a concurrent history of operation invocations and returns, keyed by thread, and
/// checks whether that history can be serialized against a [`SequentialSpec`] reference object.
///
/// Once an invalid event is recorded (a second invocation for a thread that already has one in
/// flight, or a return with no matching invocation) the tester is marked invalid: subsequent
/// `on_invoke`/`on_return` calls return an error and `serialized_history` returns `None`.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[allow(clippy::type_complexity)]
pub struct LinearizabilityTester<ThreadId, RefObj: SequentialSpec> {
    /// Reference object in the state from which every candidate serialization starts.
    init_ref_obj: RefObj,
    /// Completed operations for each thread, in the order in which they returned.
    history_by_thread: BTreeMap<ThreadId, VecDeque<Complete<ThreadId, RefObj::Op, RefObj::Ret>>>,
    /// The operation (at most one per thread) that has been invoked but has not yet returned.
    in_flight_by_thread: BTreeMap<ThreadId, InFlight<ThreadId, RefObj::Op>>,
    /// `false` once an invalid invocation or return has been recorded.
    is_valid_history: bool,
}
63
/// For each *other* thread that had completed at least one operation when an operation was
/// invoked: the index (within that thread's history) of its most recently completed operation.
type LastCompletedOpMap<ThreadId> = BTreeMap<ThreadId, usize>;
/// A completed operation: the peer operations that finished before it was invoked, the
/// operation itself, and the value it returned.
type Complete<ThreadId, Op, Ret> = (LastCompletedOpMap<ThreadId>, Op, Ret);
/// An operation that has been invoked but has not returned: the peer operations that finished
/// before it was invoked, and the operation itself.
type InFlight<ThreadId, Op> = (LastCompletedOpMap<ThreadId>, Op);
67
68#[allow(clippy::len_without_is_empty)] impl<T: Ord, RefObj: SequentialSpec> LinearizabilityTester<T, RefObj> {
70 pub fn new(init_ref_obj: RefObj) -> Self {
72 Self {
73 init_ref_obj,
74 history_by_thread: Default::default(),
75 in_flight_by_thread: Default::default(),
76 is_valid_history: true,
77 }
78 }
79
80 pub fn len(&self) -> usize {
83 let mut len = self.in_flight_by_thread.len();
84 for history in self.history_by_thread.values() {
85 len += history.len();
86 }
87 len
88 }
89}
90
91impl<T, RefObj> ConsistencyTester<T, RefObj> for LinearizabilityTester<T, RefObj>
92where
93 T: Copy + Debug + Ord,
94 RefObj: Clone + SequentialSpec,
95 RefObj::Op: Clone + Debug,
96 RefObj::Ret: Clone + Debug + PartialEq,
97{
98 fn on_invoke(&mut self, thread_id: T, op: RefObj::Op) -> Result<&mut Self, String> {
103 if !self.is_valid_history {
104 return Err("Earlier history was invalid.".to_string());
105 }
106 let in_flight_elem = self.in_flight_by_thread.entry(thread_id);
107 if let btree_map::Entry::Occupied(occupied_op_entry) = in_flight_elem {
108 self.is_valid_history = false;
109 let (_, op) = occupied_op_entry.get();
110 return Err(format!(
111 "Thread already has an operation in flight. thread_id={:?}, op={:?}, history_by_thread={:?}",
112 thread_id, op, self.history_by_thread));
113 };
114 let last_completed = self
115 .history_by_thread
116 .iter()
117 .filter_map(|(id, cs)| {
118 if id == &thread_id || cs.is_empty() {
120 None
121 } else {
122 Some((*id, cs.len() - 1))
123 }
124 })
125 .collect::<BTreeMap<_, _>>();
126 in_flight_elem.or_insert((last_completed, op));
127 self.history_by_thread.entry(thread_id).or_default(); Ok(self)
129 }
130
131 fn on_return(&mut self, thread_id: T, ret: RefObj::Ret) -> Result<&mut Self, String> {
136 if !self.is_valid_history {
137 return Err("Earlier history was invalid.".to_string());
138 }
139 let (completed, op) = match self.in_flight_by_thread.remove(&thread_id) {
140 None => {
141 self.is_valid_history = false;
142 return Err(format!(
143 "There is no in-flight invocation for this thread ID. \
144 thread_id={:?}, unexpected_return={:?}, history={:?}",
145 thread_id,
146 ret,
147 self.history_by_thread.entry(thread_id).or_default()
148 ));
149 }
150 Some(x) => x,
151 };
152 self.history_by_thread
153 .entry(thread_id)
154 .or_default()
155 .push_back((completed, op, ret));
156 Ok(self)
157 }
158
159 fn is_consistent(&self) -> bool {
161 self.serialized_history().is_some()
162 }
163}
164
165impl<T, RefObj> LinearizabilityTester<T, RefObj>
166where
167 T: Copy + Debug + Ord,
168 RefObj: Clone + SequentialSpec,
169 RefObj::Op: Clone + Debug,
170 RefObj::Ret: Clone + Debug + PartialEq,
171{
172 pub fn serialized_history(&self) -> Option<Vec<(RefObj::Op, RefObj::Ret)>> {
176 if !self.is_valid_history {
177 return None;
178 }
179 let history_by_thread = self
180 .history_by_thread
181 .iter()
182 .map(|(t, cs)| (*t, cs.clone().into_iter().enumerate().collect()))
183 .collect();
184 Self::serialize(
185 Vec::new(),
186 &self.init_ref_obj,
187 &history_by_thread,
188 &self.in_flight_by_thread,
189 )
190 }
191
192 #[allow(clippy::type_complexity)]
193 fn serialize(
194 valid_history: Vec<(RefObj::Op, RefObj::Ret)>, ref_obj: &RefObj,
196 remaining_history_by_thread: &BTreeMap<
197 T,
198 VecDeque<(usize, Complete<T, RefObj::Op, RefObj::Ret>)>,
199 >, in_flight_by_thread: &BTreeMap<T, InFlight<T, RefObj::Op>>,
201 ) -> Option<Vec<(RefObj::Op, RefObj::Ret)>> {
202 let done = remaining_history_by_thread
204 .iter()
205 .all(|(_id, h)| h.is_empty());
206 if done {
207 return Some(valid_history);
208 }
209
210 for (thread_id, remaining_history) in remaining_history_by_thread.iter() {
212 let mut remaining_history_by_thread =
213 std::borrow::Cow::Borrowed(remaining_history_by_thread);
214 let mut in_flight_by_thread = std::borrow::Cow::Borrowed(in_flight_by_thread);
215 let (ref_obj, valid_history) = if remaining_history.is_empty() {
216 if !in_flight_by_thread.contains_key(thread_id) {
218 continue;
219 }
220 let (cs, op) = in_flight_by_thread.to_mut().remove(thread_id).unwrap(); let violation = cs.iter().any(|(peer_id, min_peer_time)| {
222 if let Some(ops) = remaining_history_by_thread.get(peer_id) {
224 if let Some((next_peer_time, _)) = ops.iter().next() {
225 if next_peer_time <= min_peer_time {
226 return true;
227 }
228 }
229 }
230 false
231 });
232 if violation {
233 continue;
234 }
235 let mut ref_obj = ref_obj.clone();
236 let ret = ref_obj.invoke(&op);
237 let mut valid_history = valid_history.clone();
238 valid_history.push((op, ret));
239 (ref_obj, valid_history)
240 } else {
241 let (_t, (cs, op, ret)) = remaining_history_by_thread
243 .to_mut()
244 .get_mut(thread_id)
245 .unwrap() .pop_front()
247 .unwrap(); let violation = cs.iter().any(|(peer_id, min_peer_time)| {
249 if let Some(ops) = remaining_history_by_thread.get(peer_id) {
251 if let Some((next_peer_time, _)) = ops.iter().next() {
252 if next_peer_time <= min_peer_time {
253 return true;
254 }
255 }
256 }
257 false
258 });
259 if violation {
260 continue;
261 }
262 let mut ref_obj = ref_obj.clone();
263 if !ref_obj.is_valid_step(&op, &ret) {
264 continue;
265 }
266 let mut valid_history = valid_history.clone();
267 valid_history.push((op, ret));
268 (ref_obj, valid_history)
269 };
270 if let Some(valid_history) = Self::serialize(
271 valid_history,
272 &ref_obj,
273 &remaining_history_by_thread,
274 &in_flight_by_thread,
275 ) {
276 return Some(valid_history);
277 }
278 }
279 None
280 }
281}
282
283impl<T: Ord, RefObj> Default for LinearizabilityTester<T, RefObj>
284where
285 RefObj: Default + SequentialSpec,
286{
287 fn default() -> Self {
288 Self::new(RefObj::default())
289 }
290}
291
292impl<T, RefObj> serde::Serialize for LinearizabilityTester<T, RefObj>
293where
294 RefObj: serde::Serialize + SequentialSpec,
295 RefObj::Op: serde::Serialize,
296 RefObj::Ret: serde::Serialize,
297 T: Ord + serde::Serialize,
298{
299 fn serialize<Ser: serde::Serializer>(&self, ser: Ser) -> Result<Ser::Ok, Ser::Error> {
300 use serde::ser::SerializeStruct;
301 let mut out = ser.serialize_struct("LinearizabilityTester", 4)?;
302 out.serialize_field("init_ref_obj", &self.init_ref_obj)?;
303 out.serialize_field("history_by_thread", &self.history_by_thread)?;
304 out.serialize_field("in_flight_by_thread", &self.in_flight_by_thread)?;
305 out.serialize_field("is_valid_history", &self.is_valid_history)?;
306 out.end()
307 }
308}
309
// Unit tests for `LinearizabilityTester`, driven by the `Register` and `Vec` reference objects.
// NOTE(review): `on_invret` is defined outside this file; it presumably records an invocation
// immediately followed by its return -- confirm against the `ConsistencyTester` trait.
#[cfg(test)]
mod test {
    use super::*;
    use crate::semantics::register::*;
    use crate::semantics::vec::*;

    /// Malformed event sequences are rejected with a descriptive error.
    #[test]
    fn rejects_invalid_history() -> Result<(), String> {
        // A second invocation while the first is still in flight is rejected; the message
        // reports the operation that was already in flight.
        assert_eq!(
            LinearizabilityTester::new(Register('A'))
                .on_invoke(99, RegisterOp::Write('B'))?
                .on_invoke(99, RegisterOp::Write('C')),
            Err("Thread already has an operation in flight. thread_id=99, op=Write('B'), history_by_thread={99: []}".to_string()));
        // A return without a matching invocation is rejected; the message lists the thread's
        // completed operations.
        assert_eq!(
            LinearizabilityTester::new(Register('A'))
                .on_invret(99, RegisterOp::Write('B'), RegisterRet::WriteOk)?
                .on_invret(99, RegisterOp::Write('C'), RegisterRet::WriteOk)?
                .on_return(99, RegisterRet::WriteOk),
            Err("There is no in-flight invocation for this thread ID. \
                 thread_id=99, \
                 unexpected_return=WriteOk, \
                 history=[({}, Write('B'), WriteOk), ({}, Write('C'), WriteOk)]"
                .to_string())
        );
        Ok(())
    }

    /// Register histories that admit a total order yield that order.
    #[test]
    fn identifies_linearizable_register_history() -> Result<(), String> {
        // The write never returns and the read observes the initial value, so the expected
        // order contains the read alone.
        assert_eq!(
            LinearizabilityTester::new(Register('A'))
                .on_invoke(0, RegisterOp::Write('B'))?
                .on_invret(1, RegisterOp::Read, RegisterRet::ReadOk('A'))?
                .serialized_history(),
            Some(vec![(RegisterOp::Read, RegisterRet::ReadOk('A')),])
        );
        // The read overlaps an in-flight write and observes the written value, so the expected
        // order places the write before the read.
        assert_eq!(
            LinearizabilityTester::new(Register('A'))
                .on_invoke(0, RegisterOp::Read)?
                .on_invoke(1, RegisterOp::Write('B'))?
                .on_return(0, RegisterRet::ReadOk('B'))?
                .serialized_history(),
            Some(vec![
                (RegisterOp::Write('B'), RegisterRet::WriteOk),
                (RegisterOp::Read, RegisterRet::ReadOk('B')),
            ])
        );
        Ok(())
    }

    /// Register histories with no valid total order yield `None`.
    #[test]
    fn identifies_unlinearizable_register_history() -> Result<(), String> {
        // The read returns a value that was never written.
        assert_eq!(
            LinearizabilityTester::new(Register('A'))
                .on_invret(0, RegisterOp::Read, RegisterRet::ReadOk('B'))?
                .serialized_history(),
            None
        );
        // The write is invoked only after the read has already returned, so it cannot be
        // ordered before the read to explain the value read.
        assert_eq!(
            LinearizabilityTester::new(Register('A'))
                .on_invret(0, RegisterOp::Read, RegisterRet::ReadOk('B'))?
                .on_invoke(1, RegisterOp::Write('B'))?
                .serialized_history(),
            None
        );
        Ok(())
    }

    /// Vec histories that admit a total order yield that order.
    #[test]
    fn identifies_linearizable_vec_history() -> Result<(), String> {
        // Only an in-flight push: the expected order is empty.
        assert_eq!(
            LinearizabilityTester::new(Vec::new())
                .on_invoke(0, VecOp::Push(10))?
                .serialized_history(),
            Some(vec![])
        );
        // The pop finds nothing, so the in-flight push is expected to be left out.
        assert_eq!(
            LinearizabilityTester::new(Vec::new())
                .on_invoke(0, VecOp::Push(10))?
                .on_invret(1, VecOp::Pop, VecRet::PopOk(None))?
                .serialized_history(),
            Some(vec![(VecOp::Pop, VecRet::PopOk(None)),])
        );
        // The pop returns the pushed value, so the in-flight push is expected before the pop.
        assert_eq!(
            LinearizabilityTester::new(Vec::new())
                .on_invoke(0, VecOp::Push(10))?
                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(10)))?
                .serialized_history(),
            Some(vec![
                (VecOp::Push(10), VecRet::PushOk),
                (VecOp::Pop, VecRet::PopOk(Some(10))),
            ])
        );
        // Push(20) is in flight throughout; Len sees 1 and the pops return 20 then 10, so the
        // push is expected between Len and the first pop.
        assert_eq!(
            LinearizabilityTester::new(Vec::new())
                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
                .on_invoke(0, VecOp::Push(20))?
                .on_invret(1, VecOp::Len, VecRet::LenOk(1))?
                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(20)))?
                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(10)))?
                .serialized_history(),
            Some(vec![
                (VecOp::Push(10), VecRet::PushOk),
                (VecOp::Len, VecRet::LenOk(1)),
                (VecOp::Push(20), VecRet::PushOk),
                (VecOp::Pop, VecRet::PopOk(Some(20))),
                (VecOp::Pop, VecRet::PopOk(Some(10))),
            ])
        );
        // Same events, but the pops return 10 then 20, so the in-flight push is expected
        // between the two pops.
        assert_eq!(
            LinearizabilityTester::new(Vec::new())
                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
                .on_invoke(0, VecOp::Push(20))?
                .on_invret(1, VecOp::Len, VecRet::LenOk(1))?
                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(10)))?
                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(20)))?
                .serialized_history(),
            Some(vec![
                (VecOp::Push(10), VecRet::PushOk),
                (VecOp::Len, VecRet::LenOk(1)),
                (VecOp::Pop, VecRet::PopOk(Some(10))),
                (VecOp::Push(20), VecRet::PushOk),
                (VecOp::Pop, VecRet::PopOk(Some(20))),
            ])
        );
        // Len sees 2, so the in-flight push is expected before Len.
        assert_eq!(
            LinearizabilityTester::new(Vec::new())
                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
                .on_invoke(0, VecOp::Push(20))?
                .on_invret(1, VecOp::Len, VecRet::LenOk(2))?
                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(20)))?
                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(10)))?
                .serialized_history(),
            Some(vec![
                (VecOp::Push(10), VecRet::PushOk),
                (VecOp::Push(20), VecRet::PushOk),
                (VecOp::Len, VecRet::LenOk(2)),
                (VecOp::Pop, VecRet::PopOk(Some(20))),
                (VecOp::Pop, VecRet::PopOk(Some(10))),
            ])
        );
        // Len overlaps the in-flight Push(20) and returns 1; the push is expected to be left
        // out of the order.
        assert_eq!(
            LinearizabilityTester::new(Vec::new())
                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
                .on_invoke(1, VecOp::Len)?
                .on_invoke(0, VecOp::Push(20))?
                .on_return(1, VecRet::LenOk(1))?
                .serialized_history(),
            Some(vec![
                (VecOp::Push(10), VecRet::PushOk),
                (VecOp::Len, VecRet::LenOk(1)),
            ])
        );
        // Len overlaps the in-flight Push(20) and returns 2; the push is expected before Len.
        assert_eq!(
            LinearizabilityTester::new(Vec::new())
                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
                .on_invoke(1, VecOp::Len)?
                .on_invoke(0, VecOp::Push(20))?
                .on_return(1, VecRet::LenOk(2))?
                .serialized_history(),
            Some(vec![
                (VecOp::Push(10), VecRet::PushOk),
                (VecOp::Push(20), VecRet::PushOk),
                (VecOp::Len, VecRet::LenOk(2)),
            ])
        );
        Ok(())
    }

    /// Vec histories with no valid total order yield `None`.
    #[test]
    fn identifies_unlinearizable_vec_history() -> Result<(), String> {
        // The pop is invoked after the push completed yet returns nothing.
        assert_eq!(
            LinearizabilityTester::new(Vec::new())
                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
                .on_invret(1, VecOp::Pop, VecRet::PopOk(None))?
                .serialized_history(),
            None
        );
        // Len is invoked after Push(10) completed yet returns 0.
        assert_eq!(
            LinearizabilityTester::new(Vec::new())
                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
                .on_invoke(1, VecOp::Len)?
                .on_invoke(0, VecOp::Push(20))?
                .on_return(1, VecRet::LenOk(0))?
                .serialized_history(),
            None
        );
        // Len returns 2, which requires Push(20) to have taken effect, but the first pop then
        // returns 10 rather than 20.
        assert_eq!(
            LinearizabilityTester::new(Vec::new())
                .on_invret(0, VecOp::Push(10), VecRet::PushOk)?
                .on_invoke(0, VecOp::Push(20))?
                .on_invret(1, VecOp::Len, VecRet::LenOk(2))?
                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(10)))?
                .on_invret(1, VecOp::Pop, VecRet::PopOk(Some(20)))?
                .serialized_history(),
            None
        );
        Ok(())
    }
}