todc_utils/linearizability/
history.rs

1//! A sequence of operations applied to a shared object.
2use std::collections::VecDeque;
3use std::iter::repeat_with;
4use std::ops::{Index, IndexMut};
5
/// An identifier for an [`Entry`].
///
/// [`History::from_actions`] gives each entry the position of its action in
/// the input sequence as its identifier.
pub type EntryId = usize;
8
/// A process identifier.
///
/// [`History::from_actions`] uses process identifiers as indices into a table
/// with one slot for every value up to the largest identifier it is given, so
/// small, densely packed identifiers keep that table small.
pub type ProcessId = usize;
11
/// One half of an operation performed on a shared object: either its
/// invocation or its completion.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Action<T> {
    /// Marks the point at which an operation begins.
    Call(T),
    /// Marks the point at which an operation ends.
    Response(T),
}
20
21/// An entry in a history that represents the call to an operation.
22#[derive(PartialEq, Eq, Clone, Debug)]
23pub struct CallEntry<T> {
24    /// The identifier for this [`CallEntry`].
25    pub id: EntryId,
26    /// The operation being called.
27    pub operation: T,
28    /// The identifier of the [`ResponseEntry`] that stores the response to this
29    /// operation.
30    pub response: EntryId,
31}
32
33/// An entry in a history that represents the response from an operation.
34#[derive(PartialEq, Eq, Clone, Debug)]
35pub struct ResponseEntry<T> {
36    /// The identifier for this [`ResponseEntry`].
37    pub id: EntryId,
38    /// The operation being responded to.
39    pub operation: T,
40}
41
42/// An entry in a history.
43#[derive(PartialEq, Eq, Clone, Debug)]
44pub enum Entry<T> {
45    Call(CallEntry<T>),
46    Response(ResponseEntry<T>),
47}
48
49impl<T> Entry<T> {
50    /// Returns a unique identifier for this [`Entry`].
51    pub fn id(&self) -> EntryId {
52        match self {
53            Entry::Call(entry) => entry.id,
54            Entry::Response(entry) => entry.id,
55        }
56    }
57}
58
/// A sequence of operations applied to a shared object.
///
/// A history is a sequence of operations that have been applied to a shared
/// object. Each [`Entry`] in the history indicates either a call to or response
/// from an operation performed by a specific process. It is possible for
/// operations from different processes to be performed concurrently, which
/// is modeled in a history by interleaving the call and response entries
/// for those operations.
///
/// # Examples
///
/// Consider a history of operations performed on a shared register, where
/// processes can write values and read values that have been written. In
/// the following example process `P0` performs a write, after which process
/// `P1` performs a read.
///
/// ```
/// use std::matches;
/// use todc_utils::{History, Action::{Call, Response}};
/// use todc_utils::linearizability::history::Entry;
/// use todc_utils::specifications::register::RegisterOperation::{Read, Write};
///
/// // P0 |--------|            Write("Hello, World!")
/// // P1            |--------| Read("Hello, World!")
/// let actions = vec![
///     (0, Call(Write("Hello, World!"))),
///     (0, Response(Write("Hello, World!"))),
///     (1, Call(Read(None))),
///     (1, Response(Read(Some("Hello, World!")))),
/// ];
///
/// let history = History::from_actions(actions);
/// assert!(matches!(&history[0], Entry::Call(_)));
/// ```
///
/// In the next example processes `P0`, `P1`, and `P2` each perform
/// a write while `P3` performs three reads. Notice that the reads performed by
/// `P3` occur concurrently with the writes performed by other processes.
///
/// ```
/// # use std::matches;
/// # use todc_utils::{History, Action::{Call, Response}};
/// # use todc_utils::linearizability::history::Entry;
/// # use todc_utils::specifications::register::RegisterOperation::{Read, Write};
/// // P0 |--------------------| Write(0)
/// // P1 |--------------------| Write(1)
/// // P2 |--------------------| Write(2)
/// // P3   |--|                 Read(2)
/// // P3          |--|          Read(1)
/// // P3                 |--|   Read(0)
/// let actions = vec![
///     (0, Call(Write(0))),
///     (1, Call(Write(1))),
///     (2, Call(Write(2))),
///     (3, Call(Read(None))),
///     (3, Response(Read(Some(2)))),
///     (3, Call(Read(None))),
///     (3, Response(Read(Some(1)))),
///     (3, Call(Read(None))),
///     (3, Response(Read(Some(0)))),
///     (0, Response(Write(0))),
///     (1, Response(Write(1))),
///     (2, Response(Write(2))),
/// ];
///
/// let history = History::from_actions(actions);
/// assert!(matches!(&history[0], Entry::Call(_)));
/// ```
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct History<T> {
    // The entries currently in the history, in order.
    pub(super) entries: Vec<Entry<T>>,
    // Indexed by entry id. When an entry is removed from this history, the
    // position it was removed from is recorded here so that it can later be
    // re-inserted at that same position. The slot is `None` for entries that
    // have not been removed (or have already been re-inserted).
    removed_from: Vec<Option<EntryId>>,
}
133
134impl<T> History<T> {
135    /// Creates a history from a sequence of actions.
136    ///
137    /// # Panics
138    ///
139    /// Panics if `actions` is empty.
140    ///
141    /// Panics if the resulting history would be incomplete. That is, if there is some
142    /// `Call` action that does not have a corresponding `Response`.
143    ///
144    /// ```should_panic
145    /// # use std::matches;
146    /// # use todc_utils::{History, Action::{Call, Response}};
147    /// # use todc_utils::specifications::register::RegisterOperation::{Write};
148    /// let incomplete_actions = vec![
149    ///     (0, Call(Write("Hello"))),
150    ///     (1, Call(Write("World"))),
151    ///     (0, Response(Write("Hello"))),
152    ///     // <-- Missing response to the call by process 1!
153    /// ];
154    ///
155    /// let history = History::from_actions(incomplete_actions);
156    /// ```
157    pub fn from_actions(actions: Vec<(ProcessId, Action<T>)>) -> Self {
158        let (processes, actions): (Vec<ProcessId>, Vec<Action<T>>) = actions.into_iter().unzip();
159
160        let num_processes = processes.iter().max().unwrap();
161        let mut calls: Vec<VecDeque<usize>> = repeat_with(VecDeque::new)
162            .take(*num_processes + 1)
163            .collect();
164        let mut responses = calls.clone();
165        for (i, process) in processes.iter().enumerate() {
166            match &actions[i] {
167                Action::Call(_) => calls[*process].push_back(i),
168                Action::Response(_) => responses[*process].push_back(i),
169            }
170        }
171
172        Self {
173            entries: actions
174                .into_iter()
175                .enumerate()
176                .map(|(i, action)| match action {
177                    Action::Call(operation) => Entry::Call(CallEntry {
178                        id: i,
179                        operation,
180                        response: responses[processes[i]].pop_front().unwrap(),
181                    }),
182                    Action::Response(operation) => {
183                        Entry::Response(ResponseEntry { id: i, operation })
184                    }
185                })
186                .collect(),
187            removed_from: repeat_with(|| None).take(processes.len()).collect(),
188        }
189    }
190
191    // TODO: This operation is very expensive. Implementing History as a doubly-linked list could
192    // greatly improve performance.
193    pub(super) fn index_of_id(&self, id: EntryId) -> usize {
194        self.iter().position(|e| e.id() == id).unwrap()
195    }
196
197    /// # Panics
198    ///
199    /// Panics if input entry was not previously removed from the history.
200    fn insert(&mut self, entry: Entry<T>) -> usize {
201        match self.removed_from[entry.id()].take() {
202            Some(index) => {
203                self.entries.insert(index, entry);
204                index
205            }
206            None => panic!(
207                "Index that entry {} was removed from is unknown",
208                entry.id()
209            ),
210        }
211    }
212
213    pub(super) fn is_empty(&self) -> bool {
214        self.entries.is_empty()
215    }
216
217    pub(super) fn iter(&self) -> impl Iterator<Item = &Entry<T>> {
218        self.entries.iter()
219    }
220
221    pub(super) fn len(&self) -> usize {
222        self.entries.len()
223    }
224
225    pub(super) fn lift(&mut self, i: usize) -> (Entry<T>, Entry<T>) {
226        match self.remove(i) {
227            Entry::Response(_) => panic!("Cannot lift a response entry out of the history"),
228            Entry::Call(call) => {
229                let response = self.remove(self.index_of_id(call.response));
230                (Entry::Call(call), response)
231            }
232        }
233    }
234
235    fn remove(&mut self, i: usize) -> Entry<T> {
236        let entry = self.entries.remove(i);
237        self.removed_from[entry.id()] = Some(i);
238        entry
239    }
240
241    pub(super) fn unlift(&mut self, call: Entry<T>, response: Entry<T>) -> (usize, usize) {
242        let response_index = self.insert(response);
243        let call_index = self.insert(call);
244        (call_index, response_index)
245    }
246}
247
248impl<T> Index<usize> for History<T> {
249    type Output = Entry<T>;
250
251    fn index(&self, i: usize) -> &Self::Output {
252        self.entries.index(i)
253    }
254}
255
256impl<T> IndexMut<usize> for History<T> {
257    fn index_mut(&mut self, i: usize) -> &mut Self::Output {
258        self.entries.index_mut(i)
259    }
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use Action::{Call, Response};

    /// Returns the operation carried by `entry`, whichever variant it is.
    fn operation_of<T>(entry: &Entry<T>) -> &T {
        match entry {
            Entry::Call(call) => &call.operation,
            Entry::Response(response) => &response.operation,
        }
    }

    /// Collects the operations of all entries currently in `history`, in order.
    ///
    /// Comparing the collected sequence against an expected one checks both the
    /// contents and the number of entries, so missing or extra entries are caught.
    fn operations(history: &History<&'static str>) -> Vec<&'static str> {
        history.iter().map(|entry| *operation_of(entry)).collect()
    }

    /// Asserts that every call entry in `history` is linked to a response entry
    /// that carries the same operation.
    fn assert_calls_link_to_matching_responses(history: &History<&'static str>) {
        for entry in history.iter() {
            if let Entry::Call(call) = entry {
                match &history[history.index_of_id(call.response)] {
                    Entry::Response(response) => assert_eq!(call.operation, response.operation),
                    Entry::Call(_) => panic!("Call entry was linked to another call entry"),
                }
            }
        }
    }

    mod from_actions {
        use super::*;

        #[test]
        fn creates_sequential_ids() {
            let history = History::from_actions(vec![
                (0, Call("a")),
                (0, Response("a")),
                (0, Call("b")),
                (0, Response("b")),
            ]);
            assert_eq!(history.len(), 4);
            for (i, entry) in history.iter().enumerate() {
                assert_eq!(entry.id(), i);
            }
        }

        #[test]
        fn links_actions_of_multiple_processes() {
            let history = History::from_actions(vec![
                (0, Call("a")),
                (1, Call("b")),
                (2, Call("c")),
                (0, Response("a")),
                (1, Response("b")),
                (2, Response("c")),
                (0, Call("e")),
                (1, Call("f")),
                (2, Call("g")),
                (0, Response("e")),
                (1, Response("f")),
                (2, Response("g")),
            ]);
            assert_calls_link_to_matching_responses(&history);
        }

        #[test]
        fn links_actions_of_single_process() {
            let history = History::from_actions(vec![
                (0, Call("a")),
                (0, Response("a")),
                (0, Call("b")),
                (0, Response("b")),
                (0, Call("c")),
                (0, Response("c")),
            ]);
            assert_calls_link_to_matching_responses(&history);
        }

        #[test]
        #[should_panic]
        fn panics_if_actions_are_empty() {
            let _ = History::<&str>::from_actions(Vec::new());
        }

        #[test]
        #[should_panic]
        fn panics_if_call_has_no_response() {
            let _ = History::from_actions(vec![
                (0, Call("a")),
                (1, Call("b")),
                (0, Response("a")),
            ]);
        }
    }

    mod insert {
        use super::*;

        #[test]
        fn is_inverse_of_remove() {
            let history = History::from_actions(vec![
                (0, Call("a")),
                (1, Call("b")),
                (2, Call("c")),
                (0, Response("a")),
                (1, Response("b")),
                (2, Response("c")),
            ]);
            let mut copy = history.clone();
            let entry = copy.remove(1);
            copy.insert(entry);
            // Compare the histories as a whole so that a missing or extra entry
            // cannot go unnoticed.
            assert_eq!(copy, history);
        }
    }

    mod lift {
        use super::*;

        #[test]
        fn removes_call_and_response_entries() {
            let mut history = History::from_actions(vec![
                (0, Call("a")),
                (1, Call("b")),
                (2, Call("c")),
                (0, Response("a")),
                (1, Response("b")),
                (2, Response("c")),
            ]);
            history.lift(0);
            assert_eq!(operations(&history), ["b", "c", "b", "c"]);
        }
    }

    mod remove {
        use super::*;

        #[test]
        fn removes_only_requested_entry() {
            let mut history = History::from_actions(vec![
                (0, Call("a")),
                (1, Call("b")),
                (0, Response("a")),
                (1, Response("b")),
            ]);
            match history.remove(1) {
                Entry::Call(call) => assert_eq!(call.operation, "b"),
                Entry::Response(_) => panic!("Removed incorrect entry"),
            }
            assert_eq!(operations(&history), ["a", "a", "b"]);
        }
    }

    mod unlift {
        use super::*;

        #[test]
        fn is_inverse_of_lift() {
            let mut history = History::from_actions(vec![
                (0, Call("a")),
                (1, Call("b")),
                (0, Response("a")),
                (1, Response("b")),
            ]);
            let copy = history.clone();
            let (call, response) = history.lift(0);
            history.unlift(call, response);
            assert_eq!(history, copy);
        }
    }
}