tc_server/
builder.rs

1use std::collections::{BTreeSet, HashMap, HashSet};
2use std::net::{IpAddr, Ipv4Addr};
3use std::sync::Arc;
4use std::time::Duration;
5
6use freqfs::DirLock;
7use futures::TryFutureExt;
8use gethostname::gethostname;
9use log::{info, trace, warn};
10use mdns_sd::{ServiceDaemon, ServiceEvent, ServiceInfo};
11
12#[cfg(feature = "service")]
13use tc_state::chain::Recover;
14use tc_state::CacheBlock;
15use tc_transact::{fs, TxnId};
16use tc_value::{Host, Link, Protocol};
17use tcgeneric::NetworkTime;
18
19use crate::aes256::Key as Aes256Key;
20use crate::client::Client;
21use crate::kernel::{Kernel, Schema};
22use crate::server::Server;
23use crate::txn::TxnServer;
24use crate::{RPCClient, DEFAULT_MAX_RETRIES, DEFAULT_PORT, DEFAULT_TTL, SERVICE_TYPE};
25
26pub struct Broadcast {
27    hostname: Option<String>,
28    daemon: ServiceDaemon,
29    peers: HashMap<String, (HashSet<IpAddr>, u16)>,
30}
31
32impl Broadcast {
33    pub fn new() -> Self {
34        Self {
35            hostname: None,
36            daemon: ServiceDaemon::new().expect("mDNS daemon"),
37            peers: HashMap::new(),
38        }
39    }
40
41    pub fn hostname(&mut self) -> &str {
42        if self.hostname.is_none() {
43            self.hostname = gethostname().into_string().expect("hostname").into();
44        }
45
46        self.hostname.as_ref().expect("hostname")
47    }
48
49    pub fn set_hostname(mut self, hostname: String) -> Self {
50        self.hostname = Some(hostname);
51        self
52    }
53
54    pub fn peers(&self, protocol: Protocol) -> BTreeSet<Host> {
55        let mut peers = BTreeSet::new();
56
57        for (name, (ip_addrs, port)) in &self.peers {
58            if ip_addrs.len() > 1 {
59                info!("host {name} provided multiple IP addresses");
60            }
61
62            if let Some(ip_addr) = ip_addrs.into_iter().next() {
63                peers.insert(Host::from((protocol, (*ip_addr).into(), *port)));
64            } else {
65                warn!("host {name} provided an empty address list");
66            }
67        }
68
69        peers
70    }
71
72    pub async fn discover(&mut self) -> mdns_sd::Result<()> {
73        let receiver = self.daemon.browse(SERVICE_TYPE)?;
74
75        let mut search_started = false;
76
77        loop {
78            match receiver.recv_async().await {
79                Ok(event) => match event {
80                    ServiceEvent::SearchStarted(_params) if search_started => {
81                        trace!("mDNS discovered {} peers", self.peers.len());
82                        break Ok(());
83                    }
84                    ServiceEvent::SearchStarted(params) => {
85                        trace!("searching for peers of {params}");
86                        search_started = true;
87                    }
88                    ServiceEvent::ServiceFound(name, addr) => {
89                        trace!("discovered peer of {name} at {addr}")
90                    }
91                    ServiceEvent::ServiceResolved(info) => {
92                        let full_name = info.get_fullname();
93                        let addresses = info.get_addresses().clone();
94                        let port = info.get_port();
95
96                        self.peers.insert(full_name.to_string(), (addresses, port));
97
98                        trace!("resolved peer: {full_name}")
99                    }
100                    other => trace!("ignoring mDNS event: {:?}", other),
101                },
102                Err(cause) => trace!("mDNS error: {cause}"),
103            }
104        }
105    }
106
107    pub async fn make_discoverable(&mut self, host: &Host) -> mdns_sd::Result<()> {
108        let hostname = if self.hostname().ends_with(".local") {
109            self.hostname().to_string()
110        } else {
111            format!("{}.local.", self.hostname())
112        };
113
114        let address = host.address().as_ip().expect("IP address");
115
116        let my_service = ServiceInfo::new(
117            SERVICE_TYPE,
118            "one",
119            &hostname,
120            &address,
121            host.port().unwrap_or(DEFAULT_PORT),
122            HashMap::<String, String>::default(),
123        )?;
124
125        info!("registering mDNS service for {} at {}", host, hostname);
126
127        self.daemon.register(my_service)?;
128
129        Ok(())
130    }
131}
132
133pub struct Replicator {
134    kernel: Arc<Kernel>,
135    txn_server: TxnServer,
136    peers: BTreeSet<Host>,
137    max_retries: u8,
138}
139
140impl Replicator {
141    pub fn max_retries(&self) -> u8 {
142        self.max_retries
143    }
144
145    pub fn set_max_retries(mut self, max_retries: u8) -> Self {
146        self.max_retries = max_retries;
147        self
148    }
149
150    pub fn peers(&self) -> &BTreeSet<Host> {
151        &self.peers
152    }
153
154    pub fn with_peers(mut self, peers: impl IntoIterator<Item = Host>) -> Self {
155        self.peers.extend(peers);
156        self
157    }
158
159    pub async fn replicate_and_join(&self) -> bool {
160        let mut i = 1;
161        let joined = loop {
162            match self
163                .kernel
164                .replicate_and_join(&self.txn_server, &self.peers)
165                .await
166            {
167                Ok(()) => {
168                    break true;
169                }
170                Err(progress) => {
171                    if progress {
172                        i = 0
173                    } else if i < self.max_retries {
174                        i += 1
175                    } else {
176                        break false;
177                    }
178                }
179            }
180        };
181
182        joined
183    }
184}
185
186impl<'a> From<&'a Server> for Replicator {
187    fn from(server: &'a Server) -> Self {
188        Self {
189            kernel: server.kernel(),
190            txn_server: server.txn_server(),
191            peers: BTreeSet::new(),
192            max_retries: DEFAULT_MAX_RETRIES,
193        }
194    }
195}
196
197/// A builder struct for a [`Server`].
198///
199/// The expected sequence of events to bootstrap a server is:
200///  1. Initialize a [`freqfs::Cache`]
201///  2. Load the data directory and transactional workspace into the cache
202///  3. Initialize an implementation of [`RPCClient`]
203///  4. Create a new [`Builder`] with the data directory, workspace, and RPC client
204///  5. Register one or more [`crate::aes256::Key`]s to use for symmetric encryption
205///  6. For a secure server, provide links to the user and group authorized to administer the server
206///  7. Load the kernel
207///  8. Start the public interface for the server (e.g. HTTP or HTTPS)
208///  9. Discover peers via mDNS
209///  10. Replicate from the first peer to respond using one of the provided encryption keys
210///    10.1 For each dir, replicate the entries in the dir (not their state, i.e. not recursively)
211///    10.2 Repeat step 10.1 until all directory entries are replicated
212///    10.3 Replicate each chain in each service
213///  11. Send requests to join the replica set authenticated using the hash of the present replica state, all in a single transaction
214///  12. Repeat steps 10-11 until successful
215///  13. Mark the server ready to receive requests from a load balancer
216///  14. Broadcast the server's availability via mDNS
217///
218/// See the `examples` dir for usage examples.
219pub struct Builder {
220    protocol: Protocol,
221    address: IpAddr,
222    port: u16,
223    request_ttl: Duration,
224    rpc_client: Arc<dyn RPCClient>,
225    data_dir: DirLock<CacheBlock>,
226    workspace: DirLock<CacheBlock>,
227    lead: Option<Host>,
228    owner: Option<Link>,
229    group: Option<Link>,
230    keys: HashSet<Aes256Key>,
231    secure: bool,
232}
233
234impl Builder {
235    pub fn load(
236        data_dir: DirLock<CacheBlock>,
237        workspace: DirLock<CacheBlock>,
238        rpc_client: Arc<dyn RPCClient>,
239    ) -> Self {
240        Self {
241            protocol: Protocol::default(),
242            address: Ipv4Addr::LOCALHOST.into(),
243            port: DEFAULT_PORT,
244            request_ttl: DEFAULT_TTL,
245            rpc_client,
246            data_dir,
247            workspace,
248            lead: None,
249            owner: None,
250            group: None,
251            keys: HashSet::new(),
252            secure: true,
253        }
254    }
255
256    pub async fn build(mut self) -> Server {
257        let host = self.host();
258        let lead = self.lead.unwrap_or_else(|| host.clone());
259
260        if self.secure {
261            if self.owner.is_none() {
262                panic!("a server without an owner cannot be secure--specify an owner or disable security");
263            } else if self.group.is_none() {
264                self.group = self.owner.clone();
265            }
266        }
267
268        let txn_id = TxnId::new(NetworkTime::now());
269
270        let data_dir = fs::Dir::load(txn_id, self.data_dir)
271            .await
272            .expect("data dir");
273
274        let schema = Schema::new(lead, host.clone(), self.owner, self.group, self.keys);
275
276        let kernel: Arc<Kernel> = fs::Persist::load_or_create(txn_id, schema, data_dir)
277            .map_ok(Arc::new)
278            .await
279            .expect("kernel");
280
281        kernel.commit(txn_id).await;
282        kernel.finalize(&txn_id).await;
283
284        info!("committed kernel");
285
286        let client = Client::new(host, kernel.clone(), self.rpc_client);
287        let txn_server = TxnServer::create(client, self.workspace, self.request_ttl);
288
289        #[cfg(feature = "service")]
290        {
291            let txn = txn_server
292                .create_txn(NetworkTime::now())
293                .expect("transaction context");
294
295            kernel.recover(&txn).await.expect("recover service state");
296        }
297
298        Server::new(kernel, txn_server).expect("server")
299    }
300
301    pub fn detect_address(mut self) -> Self {
302        self.address = local_ip_address::local_ip().expect("local IP address");
303        self
304    }
305
306    pub fn host(&mut self) -> Host {
307        Host::from((self.protocol, self.address.into(), self.port))
308    }
309
310    pub fn set_port(mut self, port: u16) -> Self {
311        self.port = port;
312        self
313    }
314
315    pub fn set_group(mut self, group: Link) -> Self {
316        self.group = Some(group);
317        self
318    }
319
320    pub fn set_owner(mut self, owner: Link) -> Self {
321        self.owner = Some(owner);
322        self
323    }
324
325    pub fn set_secure(mut self, secure: bool) -> Self {
326        self.secure = secure;
327        self
328    }
329
330    pub fn with_keys<Keys: IntoIterator<Item = Aes256Key>>(mut self, keys: Keys) -> Self {
331        self.keys.extend(keys);
332        self
333    }
334}