todc_utils/specifications/
etcd.rs

1//! A sequential specification of an [etcd](https://etcd.io/) key-value store.
2use std::fs::File;
3use std::io::{self, BufRead};
4use std::path::Path;
5
6use crate::linearizability::history::{Action, History};
7use crate::specifications::Specification;
8
9type ProcessID = usize;
10
11/// Returns the contents of the file, line by line.
12///
13/// Recipe from: https://doc.rust-lang.org/rust-by-example/std_misc/file/read_lines.html
14fn read_lines<P>(filename: P) -> io::Result<io::Lines<io::BufReader<File>>>
15where
16    P: AsRef<Path>,
17{
18    let file = File::open(filename)?;
19    Ok(io::BufReader::new(file).lines())
20}
21
22/// Returns a history of operations performed on a etcd server being
23/// tested by [Jepsen](https://github.com/jepsen-io/jepsen).
24///
25/// The history is created by parsing logs from Jepsen. See
26/// [here](https://github.com/kaymanb/todc/blob/main/todc-utils/tests/linearizability/etcd/etcd_000.log)
27/// for an example of such a log file.
28pub fn history_from_log(filename: String) -> History<EtcdOperation> {
29    let mut unknowns: Vec<(ProcessID, Action<EtcdOperation>)> = Vec::new();
30    let mut actions: Vec<(ProcessID, Action<EtcdOperation>)> = Vec::new();
31    for line in read_lines(filename).unwrap() {
32        let line = line.unwrap();
33        let words: Vec<&str> = line.split_whitespace().collect();
34        if words.len() < 7 {
35            continue;
36        };
37        if words[1] != "jepsen.util" {
38            continue;
39        };
40        if words[3] == ":nemesis" {
41            continue;
42        };
43
44        let process: usize = words[3].parse().unwrap();
45        // Logs are marked with :info when the success of the operation is unknown. It
46        // suffices to consider a history where all such operations eventually finish,
47        // but at the very end of the history.
48        // See: https://aphyr.com/posts/316-jepsen-etcd-and-consul#writing-a-client
49        if words[4] == ":info" {
50            let (_, call) = actions
51                .iter()
52                .rev()
53                .find(|(pid, _)| *pid == process)
54                .unwrap()
55                .clone();
56            let response = match call {
57                Action::Call(operation) => match operation {
58                    // Reads are a special case, in that they do not affect the state of the
59                    // object. Instead of the operations success being unknown, they can simply
60                    // be treated as having failed, and we expect them to be marked as such in the logs.
61                    Read(_, _) => panic!("Success of read operation cannot be unknown"),
62                    Write(_, value) => Write(Unknown, value),
63                    CompareAndSwap(_, cas) => CompareAndSwap(Unknown, cas),
64                },
65                Action::Response(_) => {
66                    panic!("Expected previous operation by process {process} to be a call")
67                }
68            };
69            unknowns.push((process, Action::Response(response)));
70            continue;
71        }
72
73        let status = EtcdStatus::from_log(words[4]);
74        let operation = EtcdOperation::from_log(&words[4..]);
75        let action = match status {
76            EtcdStatus::Invoke => Action::Call(operation),
77            _ => Action::Response(operation),
78        };
79
80        actions.push((process, action))
81    }
82
83    // Append responses for operations whose status was unknown to the end of the
84    // history.
85    for item in unknowns.into_iter() {
86        actions.push(item);
87    }
88    History::from_actions(actions)
89}
90
91/// The status of an etcd operation.
92#[derive(PartialEq, Eq, Debug, Copy, Clone)]
93pub enum EtcdStatus {
94    Invoke,
95    Okay,
96    Fail,
97    Unknown,
98}
99
100impl EtcdStatus {
101    fn from_log(string: &str) -> Self {
102        if string == ":invoke" {
103            Self::Invoke
104        } else if string == ":ok" {
105            Self::Okay
106        } else if string == ":fail" {
107            Self::Fail
108        } else if string == ":info" {
109            Self::Unknown
110        } else {
111            panic!("Unexpected status: '{string}'")
112        }
113    }
114}
115
116use EtcdStatus::*;
117
118/// An etcd operation containing [`u32`] values.
119#[derive(Debug, Copy, Clone)]
120pub enum EtcdOperation {
121    Read(EtcdStatus, Option<u32>),
122    Write(EtcdStatus, u32),
123    CompareAndSwap(EtcdStatus, (u32, u32)),
124}
125
126impl EtcdOperation {
127    fn from_log(words: &[&str]) -> Self {
128        let status = EtcdStatus::from_log(words[0]);
129        let operation = words[1];
130        if operation == ":read" {
131            let value = if words[2] == "nil" || words[2] == ":timed-out" {
132                None
133            } else {
134                Some(words[2].parse::<u32>().unwrap())
135            };
136            Self::Read(status, value)
137        } else if operation == ":write" {
138            let value = words[2].parse::<u32>().unwrap();
139            Self::Write(status, value)
140        } else if operation == ":cas" {
141            let value = (
142                words[2][1..].parse().unwrap(),
143                words[3][..1].parse().unwrap(),
144            );
145            Self::CompareAndSwap(status, value)
146        } else {
147            panic!("Unexpected operation: '{operation}'")
148        }
149    }
150}
151
152use EtcdOperation::*;
153
154/// A sequential specification of an [etcd](https://etcd.io/) key-value store.
155///
156/// The specification allows for reads, writes, and compare-and-swap (CAS) operations to be
157/// performed on a single shared register containing [`u32`] values. In practice, etcd
158/// stores exposes many such registers, each indexed by unique key.
159pub struct EtcdSpecification;
160
161impl Specification for EtcdSpecification {
162    type State = Option<u32>;
163    type Operation = EtcdOperation;
164
165    fn init() -> Self::State {
166        None
167    }
168
169    fn apply(operation: &Self::Operation, state: &Self::State) -> (bool, Self::State) {
170        match operation {
171            Read(status, value) => match status {
172                Okay => (value == state, *state),
173                Fail => (value != state, *state),
174                _ => panic!("Cannot apply read that has not succeeded or failed"),
175            },
176            Write(status, value) => match status {
177                Invoke => panic!("Cannot apply write that has only been invoked"),
178                Okay => (true, Some(*value)),
179                Fail => (true, *state),
180                // A write whose status is unknown can be assumed to have completed
181                // successfuly. If, in reality, the write failed, then the result
182                // is indistinguishable to a success at the very end of a sequence
183                // of operations.
184                Unknown => (true, Some(*value)),
185            },
186            CompareAndSwap(status, (compare, swap)) => {
187                let success = match state {
188                    Some(value) => compare == value,
189                    None => false,
190                };
191                match status {
192                    Invoke => panic!("Cannot apply CAS that has only been invoked"),
193                    Okay => (success, if success { Some(*swap) } else { *state }),
194                    Fail => (!success, *state),
195                    // A CAS whose status is unkown can be assumed to have completed
196                    // successfuly, for the same reason as explained above for writes.
197                    Unknown => (true, if success { Some(*swap) } else { *state }),
198                }
199            }
200        }
201    }
202}
203
204#[cfg(test)]
205mod test {
206    use super::*;
207
208    type Spec = EtcdSpecification;
209
210    mod init {
211        use super::*;
212
213        #[test]
214        fn initializes_state_to_none() {
215            assert_eq!(Spec::init(), None);
216        }
217    }
218
219    mod apply {
220        use super::*;
221
222        #[test]
223        fn read_does_not_mutate_state() {
224            let (_, new_state) = Spec::apply(&Read(Okay, None), &Spec::init());
225            assert_eq!(new_state, Spec::init());
226        }
227
228        #[test]
229        fn read_of_state_is_valid() {
230            let state = Some(42);
231            let (is_valid, _) = Spec::apply(&Read(Okay, state), &state);
232            assert!(is_valid);
233        }
234
235        #[test]
236        fn read_of_bad_value_is_invalid() {
237            let (is_valid, _) = Spec::apply(&Read(Okay, Some(42)), &None);
238            assert!(!is_valid);
239        }
240
241        #[test]
242        fn write_sets_new_state_to_written_value() {
243            let value = 123;
244            let (_, new_state) = Spec::apply(&Write(Okay, value), &Spec::init());
245            assert_eq!(new_state, Some(value));
246        }
247
248        #[test]
249        fn cas_of_bad_value_is_invalid() {
250            let (is_valid, _) = Spec::apply(&CompareAndSwap(Okay, (1, 2)), &None);
251            assert!(!is_valid);
252        }
253    }
254}