1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::net::{IpAddr, Ipv4Addr};
3use std::sync::Arc;
4use std::time::Duration;
5
6use freqfs::DirLock;
7use futures::TryFutureExt;
8use gethostname::gethostname;
9use log::{info, trace, warn};
10use mdns_sd::{ServiceDaemon, ServiceEvent, ServiceInfo};
11
12#[cfg(feature = "service")]
13use tc_state::chain::Recover;
14use tc_state::CacheBlock;
15use tc_transact::{fs, TxnId};
16use tc_value::{Host, Link, Protocol};
17use tcgeneric::NetworkTime;
18
19use crate::aes256::Key as Aes256Key;
20use crate::client::Client;
21use crate::kernel::{Kernel, Schema};
22use crate::server::Server;
23use crate::txn::TxnServer;
24use crate::{RPCClient, DEFAULT_MAX_RETRIES, DEFAULT_PORT, DEFAULT_TTL, SERVICE_TYPE};
25
26pub struct Broadcast {
27 hostname: Option<String>,
28 daemon: ServiceDaemon,
29 peers: HashMap<String, (HashSet<IpAddr>, u16)>,
30}
31
32impl Broadcast {
33 pub fn new() -> Self {
34 Self {
35 hostname: None,
36 daemon: ServiceDaemon::new().expect("mDNS daemon"),
37 peers: HashMap::new(),
38 }
39 }
40
41 pub fn hostname(&mut self) -> &str {
42 if self.hostname.is_none() {
43 self.hostname = gethostname().into_string().expect("hostname").into();
44 }
45
46 self.hostname.as_ref().expect("hostname")
47 }
48
49 pub fn set_hostname(mut self, hostname: String) -> Self {
50 self.hostname = Some(hostname);
51 self
52 }
53
54 pub fn peers(&self, protocol: Protocol) -> BTreeSet<Host> {
55 let mut peers = BTreeSet::new();
56
57 for (name, (ip_addrs, port)) in &self.peers {
58 if ip_addrs.len() > 1 {
59 info!("host {name} provided multiple IP addresses");
60 }
61
62 if let Some(ip_addr) = ip_addrs.into_iter().next() {
63 peers.insert(Host::from((protocol, (*ip_addr).into(), *port)));
64 } else {
65 warn!("host {name} provided an empty address list");
66 }
67 }
68
69 peers
70 }
71
72 pub async fn discover(&mut self) -> mdns_sd::Result<()> {
73 let receiver = self.daemon.browse(SERVICE_TYPE)?;
74
75 let mut search_started = false;
76
77 loop {
78 match receiver.recv_async().await {
79 Ok(event) => match event {
80 ServiceEvent::SearchStarted(_params) if search_started => {
81 trace!("mDNS discovered {} peers", self.peers.len());
82 break Ok(());
83 }
84 ServiceEvent::SearchStarted(params) => {
85 trace!("searching for peers of {params}");
86 search_started = true;
87 }
88 ServiceEvent::ServiceFound(name, addr) => {
89 trace!("discovered peer of {name} at {addr}")
90 }
91 ServiceEvent::ServiceResolved(info) => {
92 let full_name = info.get_fullname();
93 let addresses = info.get_addresses().clone();
94 let port = info.get_port();
95
96 self.peers.insert(full_name.to_string(), (addresses, port));
97
98 trace!("resolved peer: {full_name}")
99 }
100 other => trace!("ignoring mDNS event: {:?}", other),
101 },
102 Err(cause) => trace!("mDNS error: {cause}"),
103 }
104 }
105 }
106
107 pub async fn make_discoverable(&mut self, host: &Host) -> mdns_sd::Result<()> {
108 let hostname = if self.hostname().ends_with(".local") {
109 self.hostname().to_string()
110 } else {
111 format!("{}.local.", self.hostname())
112 };
113
114 let address = host.address().as_ip().expect("IP address");
115
116 let my_service = ServiceInfo::new(
117 SERVICE_TYPE,
118 "one",
119 &hostname,
120 &address,
121 host.port().unwrap_or(DEFAULT_PORT),
122 HashMap::<String, String>::default(),
123 )?;
124
125 info!("registering mDNS service for {} at {}", host, hostname);
126
127 self.daemon.register(my_service)?;
128
129 Ok(())
130 }
131}
132
133pub struct Replicator {
134 kernel: Arc<Kernel>,
135 txn_server: TxnServer,
136 peers: BTreeSet<Host>,
137 max_retries: u8,
138}
139
140impl Replicator {
141 pub fn max_retries(&self) -> u8 {
142 self.max_retries
143 }
144
145 pub fn set_max_retries(mut self, max_retries: u8) -> Self {
146 self.max_retries = max_retries;
147 self
148 }
149
150 pub fn peers(&self) -> &BTreeSet<Host> {
151 &self.peers
152 }
153
154 pub fn with_peers(mut self, peers: impl IntoIterator<Item = Host>) -> Self {
155 self.peers.extend(peers);
156 self
157 }
158
159 pub async fn replicate_and_join(&self) -> bool {
160 let mut i = 1;
161 let joined = loop {
162 match self
163 .kernel
164 .replicate_and_join(&self.txn_server, &self.peers)
165 .await
166 {
167 Ok(()) => {
168 break true;
169 }
170 Err(progress) => {
171 if progress {
172 i = 0
173 } else if i < self.max_retries {
174 i += 1
175 } else {
176 break false;
177 }
178 }
179 }
180 };
181
182 joined
183 }
184}
185
186impl<'a> From<&'a Server> for Replicator {
187 fn from(server: &'a Server) -> Self {
188 Self {
189 kernel: server.kernel(),
190 txn_server: server.txn_server(),
191 peers: BTreeSet::new(),
192 max_retries: DEFAULT_MAX_RETRIES,
193 }
194 }
195}
196
197pub struct Builder {
220 protocol: Protocol,
221 address: IpAddr,
222 port: u16,
223 request_ttl: Duration,
224 rpc_client: Arc<dyn RPCClient>,
225 data_dir: DirLock<CacheBlock>,
226 workspace: DirLock<CacheBlock>,
227 lead: Option<Host>,
228 owner: Option<Link>,
229 group: Option<Link>,
230 keys: HashSet<Aes256Key>,
231 secure: bool,
232}
233
234impl Builder {
235 pub fn load(
236 data_dir: DirLock<CacheBlock>,
237 workspace: DirLock<CacheBlock>,
238 rpc_client: Arc<dyn RPCClient>,
239 ) -> Self {
240 Self {
241 protocol: Protocol::default(),
242 address: Ipv4Addr::LOCALHOST.into(),
243 port: DEFAULT_PORT,
244 request_ttl: DEFAULT_TTL,
245 rpc_client,
246 data_dir,
247 workspace,
248 lead: None,
249 owner: None,
250 group: None,
251 keys: HashSet::new(),
252 secure: true,
253 }
254 }
255
256 pub async fn build(mut self) -> Server {
257 let host = self.host();
258 let lead = self.lead.unwrap_or_else(|| host.clone());
259
260 if self.secure {
261 if self.owner.is_none() {
262 panic!("a server without an owner cannot be secure--specify an owner or disable security");
263 } else if self.group.is_none() {
264 self.group = self.owner.clone();
265 }
266 }
267
268 let txn_id = TxnId::new(NetworkTime::now());
269
270 let data_dir = fs::Dir::load(txn_id, self.data_dir)
271 .await
272 .expect("data dir");
273
274 let schema = Schema::new(lead, host.clone(), self.owner, self.group, self.keys);
275
276 let kernel: Arc<Kernel> = fs::Persist::load_or_create(txn_id, schema, data_dir)
277 .map_ok(Arc::new)
278 .await
279 .expect("kernel");
280
281 kernel.commit(txn_id).await;
282 kernel.finalize(&txn_id).await;
283
284 info!("committed kernel");
285
286 let client = Client::new(host, kernel.clone(), self.rpc_client);
287 let txn_server = TxnServer::create(client, self.workspace, self.request_ttl);
288
289 #[cfg(feature = "service")]
290 {
291 let txn = txn_server
292 .create_txn(NetworkTime::now())
293 .expect("transaction context");
294
295 kernel.recover(&txn).await.expect("recover service state");
296 }
297
298 Server::new(kernel, txn_server).expect("server")
299 }
300
301 pub fn detect_address(mut self) -> Self {
302 self.address = local_ip_address::local_ip().expect("local IP address");
303 self
304 }
305
306 pub fn host(&mut self) -> Host {
307 Host::from((self.protocol, self.address.into(), self.port))
308 }
309
310 pub fn set_port(mut self, port: u16) -> Self {
311 self.port = port;
312 self
313 }
314
315 pub fn set_group(mut self, group: Link) -> Self {
316 self.group = Some(group);
317 self
318 }
319
320 pub fn set_owner(mut self, owner: Link) -> Self {
321 self.owner = Some(owner);
322 self
323 }
324
325 pub fn set_secure(mut self, secure: bool) -> Self {
326 self.secure = secure;
327 self
328 }
329
330 pub fn with_keys<Keys: IntoIterator<Item = Aes256Key>>(mut self, keys: Keys) -> Self {
331 self.keys.extend(keys);
332 self
333 }
334}