/// No-op stand-in for `debug_delay`.
///
/// Compiled only when the `fault_injection` feature is disabled; with the
/// feature enabled, the implementation from the `debug_delay` module is
/// re-exported under the same name instead. The returned future does no work
/// and completes on its first poll.
#[cfg(not(feature = "fault_injection"))]
#[doc(hidden)]
pub async fn debug_delay() {
    // Intentionally empty.
}
4
5mod bitset;
6#[cfg(feature = "fault_injection")]
7mod debug_delay;
8mod network;
9mod paxos;
10mod serialization;
11mod simulator;
12mod udp_net;
13mod versioned_storage;
14
15pub use {
16 network::Net,
17 paxos::{Client, Server},
18 simulator::simulate,
19};
20
21use std::{io, net::ToSocketAddrs};
22
23#[cfg(feature = "fault_injection")]
24pub use debug_delay::debug_delay;
25
/// An optional byte value tagged with a `u64` ballot.
///
/// The derived ordering compares `ballot` first and only then `value`, so the
/// field order below is significant. The default is ballot `0` with no value.
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct VersionedValue {
    /// Ballot associated with `value`.
    // NOTE(review): presumably the Paxos ballot under which the value was
    // accepted -- confirm against the `paxos` module.
    pub ballot: u64,
    /// The stored bytes, or `None` when no value is present.
    pub value: Option<Vec<u8>>,
}
32
33impl std::ops::Deref for VersionedValue {
34 type Target = Option<Vec<u8>>;
35
36 fn deref(&self) -> &Option<Vec<u8>> {
37 &self.value
38 }
39}
40
41impl std::ops::DerefMut for VersionedValue {
42 fn deref_mut(&mut self) -> &mut Option<Vec<u8>> {
43 &mut self.value
44 }
45}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
48enum Message {
49 Request(Request),
50 Response(Response),
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54enum Request {
55 Ping,
57 Prepare { ballot: u64, key: Vec<u8> },
59 Accept { key: Vec<u8>, value: VersionedValue },
61}
62
63#[derive(Debug, Clone, PartialEq, Eq)]
64enum Response {
65 Pong,
67 Promise {
69 success: Result<VersionedValue, u64>,
70 },
71 Accepted {
73 success: Result<(), u64>,
74 },
75}
76
77impl Response {
78 fn to_promise(self) -> Result<VersionedValue, u64> {
79 if let Response::Promise { success } = self {
80 success
81 } else {
82 panic!("called to_promise on {:?}", self);
83 }
84 }
85
86 fn to_accepted(self) -> Result<(), u64> {
87 if let Response::Accepted { success } = self {
88 success
89 } else {
90 panic!("called to_promise on {:?}", self);
91 }
92 }
93
94 fn is_pong(self) -> bool {
95 if let Response::Pong = self {
96 true
97 } else {
98 false
99 }
100 }
101
102 fn is_success(&self) -> bool {
103 match self {
104 Response::Pong => true,
105 Response::Accepted { success } => success.is_ok(),
106 Response::Promise { success } => success.is_ok(),
107 }
108 }
109}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
112struct Envelope {
113 uuid: uuid::Uuid,
114 message: Message,
115}
116pub fn start_udp_client<
117 A: ToSocketAddrs + std::fmt::Display,
118 B: ToSocketAddrs + std::fmt::Display,
119>(
120 listen_addr: A,
121 servers: &[B],
122 timeout: std::time::Duration,
123) -> std::io::Result<Client> {
124 let mut known_servers = vec![];
125
126 for server in servers {
127 let mut addrs_iter = server.to_socket_addrs()?;
128 if let Some(addr) = addrs_iter.next() {
130 known_servers.push(addr);
131 } else {
132 return Err(io::Error::new(
133 io::ErrorKind::AddrNotAvailable,
134 format!("the address {} could not be resolved", server),
135 ));
136 }
137 }
138
139 let (process_task, net) = Net::new_udp(listen_addr, timeout)?;
140
141 let processor = smol::spawn(process_task);
142
143 Ok(Client {
144 known_servers,
145 net,
146 cache: Default::default(),
147 processor: Some(processor),
148 })
149}
150
151pub fn start_udp_server<
152 A: ToSocketAddrs + std::fmt::Display,
153 P: AsRef<std::path::Path>,
154>(
155 listen_addr: A,
156 storage_directory: P,
157 timeout: std::time::Duration,
158) -> std::io::Result<Server> {
159 let db = match sled::open(&storage_directory) {
160 Ok(db) => versioned_storage::VersionedStorage { db },
161 Err(e) => {
162 return Err(io::Error::new(
163 io::ErrorKind::Other,
164 format!(
165 "failed to open database at {:?}: {:?}",
166 storage_directory.as_ref(),
167 e
168 ),
169 ))
170 }
171 };
172
173 let (process_task, net) = Net::new_udp(listen_addr, timeout)?;
174
175 let processor = smol::spawn(process_task);
176
177 Ok(Server {
178 net,
179 db,
180 processor: Some(processor),
181 promises: Default::default(),
182 })
183}