caspaxos_kv/
lib.rs

1#[doc(hidden)]
2#[cfg(not(feature = "fault_injection"))]
3pub async fn debug_delay() {}
4
5mod bitset;
6#[cfg(feature = "fault_injection")]
7mod debug_delay;
8mod network;
9mod paxos;
10mod serialization;
11mod simulator;
12mod udp_net;
13mod versioned_storage;
14
15pub use {
16    network::Net,
17    paxos::{Client, Server},
18    simulator::simulate,
19};
20
21use std::{io, net::ToSocketAddrs};
22
23#[cfg(feature = "fault_injection")]
24pub use debug_delay::debug_delay;
25
26/// A possibly present value with an associated version number.
27#[derive(Default, Debug, Clone, Eq, PartialEq, PartialOrd, Ord)]
28pub struct VersionedValue {
29    pub ballot: u64,
30    pub value: Option<Vec<u8>>,
31}
32
33impl std::ops::Deref for VersionedValue {
34    type Target = Option<Vec<u8>>;
35
36    fn deref(&self) -> &Option<Vec<u8>> {
37        &self.value
38    }
39}
40
41impl std::ops::DerefMut for VersionedValue {
42    fn deref_mut(&mut self) -> &mut Option<Vec<u8>> {
43        &mut self.value
44    }
45}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
48enum Message {
49    Request(Request),
50    Response(Response),
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54enum Request {
55    // discriminant 1
56    Ping,
57    // discriminant 2
58    Prepare { ballot: u64, key: Vec<u8> },
59    // discriminant 3
60    Accept { key: Vec<u8>, value: VersionedValue },
61}
62
63#[derive(Debug, Clone, PartialEq, Eq)]
64enum Response {
65    // discriminant 4
66    Pong,
67    // discriminant 5
68    Promise {
69        success: Result<VersionedValue, u64>,
70    },
71    // discriminant 6
72    Accepted {
73        success: Result<(), u64>,
74    },
75}
76
77impl Response {
78    fn to_promise(self) -> Result<VersionedValue, u64> {
79        if let Response::Promise { success } = self {
80            success
81        } else {
82            panic!("called to_promise on {:?}", self);
83        }
84    }
85
86    fn to_accepted(self) -> Result<(), u64> {
87        if let Response::Accepted { success } = self {
88            success
89        } else {
90            panic!("called to_promise on {:?}", self);
91        }
92    }
93
94    fn is_pong(self) -> bool {
95        if let Response::Pong = self {
96            true
97        } else {
98            false
99        }
100    }
101
102    fn is_success(&self) -> bool {
103        match self {
104            Response::Pong => true,
105            Response::Accepted { success } => success.is_ok(),
106            Response::Promise { success } => success.is_ok(),
107        }
108    }
109}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
112struct Envelope {
113    uuid: uuid::Uuid,
114    message: Message,
115}
116pub fn start_udp_client<
117    A: ToSocketAddrs + std::fmt::Display,
118    B: ToSocketAddrs + std::fmt::Display,
119>(
120    listen_addr: A,
121    servers: &[B],
122    timeout: std::time::Duration,
123) -> std::io::Result<Client> {
124    let mut known_servers = vec![];
125
126    for server in servers {
127        let mut addrs_iter = server.to_socket_addrs()?;
128        // NB we only use the first address. this is buggy.
129        if let Some(addr) = addrs_iter.next() {
130            known_servers.push(addr);
131        } else {
132            return Err(io::Error::new(
133                io::ErrorKind::AddrNotAvailable,
134                format!("the address {} could not be resolved", server),
135            ));
136        }
137    }
138
139    let (process_task, net) = Net::new_udp(listen_addr, timeout)?;
140
141    let processor = smol::spawn(process_task);
142
143    Ok(Client {
144        known_servers,
145        net,
146        cache: Default::default(),
147        processor: Some(processor),
148    })
149}
150
151pub fn start_udp_server<
152    A: ToSocketAddrs + std::fmt::Display,
153    P: AsRef<std::path::Path>,
154>(
155    listen_addr: A,
156    storage_directory: P,
157    timeout: std::time::Duration,
158) -> std::io::Result<Server> {
159    let db = match sled::open(&storage_directory) {
160        Ok(db) => versioned_storage::VersionedStorage { db },
161        Err(e) => {
162            return Err(io::Error::new(
163                io::ErrorKind::Other,
164                format!(
165                    "failed to open database at {:?}: {:?}",
166                    storage_directory.as_ref(),
167                    e
168                ),
169            ))
170        }
171    };
172
173    let (process_task, net) = Net::new_udp(listen_addr, timeout)?;
174
175    let processor = smol::spawn(process_task);
176
177    Ok(Server {
178        net,
179        db,
180        processor: Some(processor),
181        promises: Default::default(),
182    })
183}