1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
use crate::cnf::NORMAL_FETCH_SIZE;
use crate::dbs::node::Node;
use crate::err::Error;
use crate::kvs::Datastore;
use crate::kvs::Live;
use crate::kvs::LockType::*;
use crate::kvs::TransactionType::*;
use crate::sql::statements::LiveStatement;
use std::time::Duration;
const TARGET: &str = "surrealdb::core::kvs::node";
impl Datastore {
	/// Inserts a node for the first time into the cluster.
	///
	/// This function should be run at server or database startup.
	///
	/// This function ensures that this node is entered into the clister
	/// membership entries. This function must be run at server or database
	/// startup, in order to write the initial entry and timestamp to storage.
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn insert_node(&self, id: uuid::Uuid) -> Result<(), Error> {
		// Log when this method is run
		trace!(target: TARGET, "Inserting node in the cluster");
		// Refresh system usage metrics
		crate::sys::refresh().await;
		// Open transaction and set node data
		let txn = self.transaction(Write, Optimistic).await?;
		let key = crate::key::root::nd::Nd::new(id);
		let now = self.clock_now().await;
		let val = Node::new(id, now, false);
		match run!(txn, txn.put(key, val, None).await) {
			Err(Error::TxKeyAlreadyExists) => Err(Error::ClAlreadyExists {
				value: id.to_string(),
			}),
			other => other,
		}
	}
	/// Updates an already existing node in the cluster.
	///
	/// This function should be run periodically at a regular interval.
	///
	/// This function updates the entry for this node with an up-to-date
	/// timestamp. This ensures that the node is not marked as expired by any
	/// garbage collection tasks, preventing any data cleanup for this node.
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn update_node(&self, id: uuid::Uuid) -> Result<(), Error> {
		// Log when this method is run
		trace!(target: TARGET, "Updating node in the cluster");
		// Refresh system usage metrics
		crate::sys::refresh().await;
		// Open transaction and set node data
		let txn = self.transaction(Write, Optimistic).await?;
		let key = crate::key::root::nd::new(id);
		let now = self.clock_now().await;
		let val = Node::new(id, now, false);
		run!(txn, txn.replace(key, val).await)
	}
	/// Deletes a node from the cluster.
	///
	/// This function should be run when a node is shutting down.
	///
	/// This function marks the node as archived, ready for garbage collection.
	/// Later on when garbage collection is running the live queries assigned
	/// to this node will be removed, along with the node itself.
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn delete_node(&self, id: uuid::Uuid) -> Result<(), Error> {
		// Log when this method is run
		trace!(target: TARGET, "Archiving node in the cluster");
		// Open transaction and set node data
		let txn = self.transaction(Write, Optimistic).await?;
		let key = crate::key::root::nd::new(id);
		let val = catch!(txn, txn.get_node(id).await);
		let val = val.as_ref().archive();
		run!(txn, txn.replace(key, val).await)
	}
	/// Expires nodes which have timedout from the cluster.
	///
	/// This function should be run periodically at an interval.
	///
	/// This function marks the node as archived, ready for garbage collection.
	/// Later on when garbage collection is running the live queries assigned
	/// to this node will be removed, along with the node itself.
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn expire_nodes(&self) -> Result<(), Error> {
		// Log when this method is run
		trace!(target: TARGET, "Archiving expired nodes in the cluster");
		// Fetch all of the inactive nodes
		let inactive = {
			let txn = self.transaction(Read, Optimistic).await?;
			let nds = catch!(txn, txn.all_nodes().await);
			let now = self.clock_now().await;
			catch!(txn, txn.cancel().await);
			// Filter the inactive nodes
			nds.iter()
				.filter_map(|n| {
					// Check that the node is active and has expired
					match n.is_active() && n.hb < now - Duration::from_secs(30) {
						true => Some(n.to_owned()),
						false => None,
					}
				})
				.collect::<Vec<_>>()
		};
		// Check if there are inactive nodes
		if !inactive.is_empty() {
			// Open a writeable transaction
			let txn = self.transaction(Write, Optimistic).await?;
			// Archive the inactive nodes
			for nd in inactive.iter() {
				// Log the live query scanning
				trace!(target: TARGET, id = %nd.id, "Archiving node in the cluster");
				// Mark the node as archived
				let val = nd.archive();
				// Get the key for the node entry
				let key = crate::key::root::nd::new(nd.id);
				// Update the node entry
				catch!(txn, txn.replace(key, val).await);
			}
			// Commit the changes
			catch!(txn, txn.commit().await);
		}
		// Everything was successful
		Ok(())
	}
	/// Removes and cleans up nodes which are no longer in this cluster.
	///
	/// This function should be run periodically at an interval.
	///
	/// This function clears up all nodes which have been marked as archived.
	/// When a matching node is found, all node queries, and table queries are
	/// garbage collected, before the node itself is completely deleted.
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn remove_nodes(&self) -> Result<(), Error> {
		// Log when this method is run
		trace!(target: TARGET, "Cleaning up archived nodes in the cluster");
		// Fetch all of the archived nodes
		let archived = {
			let txn = self.transaction(Read, Optimistic).await?;
			let nds = catch!(txn, txn.all_nodes().await);
			catch!(txn, txn.cancel().await);
			// Filter the archived nodes
			nds.iter().filter_map(Node::archived).collect::<Vec<_>>()
		};
		// Loop over the archived nodes
		for id in archived.iter() {
			// Open a writeable transaction
			let txn = self.transaction(Write, Optimistic).await?;
			{
				// Log the live query scanning
				trace!(target: TARGET, id = %id, "Deleting live queries for node");
				// Scan the live queries for this node
				let beg = crate::key::node::lq::prefix(*id);
				let end = crate::key::node::lq::suffix(*id);
				let mut next = Some(beg..end);
				while let Some(rng) = next {
					// Pause and yield execution
					yield_now!();
					// Fetch the next batch of keys and values
					let max = *NORMAL_FETCH_SIZE;
					let res = catch!(txn, txn.batch_keys_vals(rng, max, None).await);
					next = res.next;
					for (k, v) in res.result.iter() {
						// Decode the data for this live query
						let val: Live = v.into();
						// Get the key for this node live query
						let nlq = catch!(txn, crate::key::node::lq::Lq::decode(k));
						// Check that the node for this query is archived
						if archived.contains(&nlq.nd) {
							// Get the key for this table live query
							let tlq = crate::key::table::lq::new(&val.ns, &val.db, &val.tb, nlq.lq);
							// Delete the table live query
							catch!(txn, txn.clr(tlq).await);
							// Delete the node live query
							catch!(txn, txn.clr(nlq).await);
						}
					}
				}
			}
			{
				// Log the node deletion
				trace!(target: TARGET, id = %id, "Deleting node from the cluster");
				// Get the key for the node entry
				let key = crate::key::root::nd::new(*id);
				// Delete the cluster node entry
				catch!(txn, txn.clr(key).await);
			}
			// Commit the changes
			catch!(txn, txn.commit().await);
		}
		// Everything was successful
		Ok(())
	}
	/// Clean up all other miscellaneous data.
	///
	/// This function should be run periodically at an interval.
	///
	/// This function clears up all data which might have been missed from
	/// previous cleanup runs, or when previous runs failed. This function
	/// currently deletes all live queries, for nodes which no longer exist
	/// in the cluster, from all namespaces, databases, and tables. It uses
	/// a number of transactions in order to prevent failure of large or
	/// long-running transactions on distributed storage engines.
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn garbage_collect(&self) -> Result<(), Error> {
		// Log the node deletion
		trace!(target: TARGET, "Garbage collecting all miscellaneous data");
		// Fetch archived nodes
		let archived = {
			let txn = self.transaction(Read, Optimistic).await?;
			let nds = catch!(txn, txn.all_nodes().await);
			// Filter the archived nodes
			nds.iter().filter_map(Node::archived).collect::<Vec<_>>()
		};
		// Fetch all namespaces
		let nss = {
			let txn = self.transaction(Read, Optimistic).await?;
			catch!(txn, txn.all_ns().await)
		};
		// Loop over all namespaces
		for ns in nss.iter() {
			// Log the namespace
			trace!(target: TARGET, "Garbage collecting data in namespace {}", ns.name);
			// Fetch all databases
			let dbs = {
				let txn = self.transaction(Read, Optimistic).await?;
				catch!(txn, txn.all_db(&ns.name).await)
			};
			// Loop over all databases
			for db in dbs.iter() {
				// Log the namespace
				trace!(target: TARGET, "Garbage collecting data in database {}/{}", ns.name, db.name);
				// Fetch all tables
				let tbs = {
					let txn = self.transaction(Read, Optimistic).await?;
					catch!(txn, txn.all_tb(&ns.name, &db.name, None).await)
				};
				// Loop over all tables
				for tb in tbs.iter() {
					// Log the namespace
					trace!(target: TARGET, "Garbage collecting data in table {}/{}/{}", ns.name, db.name, tb.name);
					// Iterate over the table live queries
					let txn = self.transaction(Write, Optimistic).await?;
					let beg = crate::key::table::lq::prefix(&ns.name, &db.name, &tb.name);
					let end = crate::key::table::lq::suffix(&ns.name, &db.name, &tb.name);
					let mut next = Some(beg..end);
					while let Some(rng) = next {
						// Pause and yield execution
						yield_now!();
						// Fetch the next batch of keys and values
						let max = *NORMAL_FETCH_SIZE;
						let res = catch!(txn, txn.batch_keys_vals(rng, max, None).await);
						next = res.next;
						for (k, v) in res.result.iter() {
							// Decode the LIVE query statement
							let stm: LiveStatement = v.into();
							// Get the node id and the live query id
							let (nid, lid) = (stm.node.0, stm.id.0);
							// Check that the node for this query is archived
							if archived.contains(&stm.node) {
								// Get the key for this node live query
								let tlq = catch!(txn, crate::key::table::lq::Lq::decode(k));
								// Get the key for this table live query
								let nlq = crate::key::node::lq::new(nid, lid);
								// Delete the node live query
								catch!(txn, txn.clr(nlq).await);
								// Delete the table live query
								catch!(txn, txn.clr(tlq).await);
							}
						}
					}
					// Commit the changes
					txn.commit().await?;
				}
			}
		}
		// All ok
		Ok(())
	}
	/// Clean up the live queries for a disconnected connection.
	///
	/// This function should be run when a WebSocket disconnects.
	///
	/// This function clears up the live queries on the current node, which
	/// are specified by uique live query UUIDs. This is necessary when a
	/// WebSocket disconnects, and any associated live queries need to be
	/// cleaned up and removed.
	#[instrument(err, level = "trace", target = "surrealdb::core::kvs::node", skip(self))]
	pub async fn delete_queries(&self, ids: Vec<uuid::Uuid>) -> Result<(), Error> {
		// Log the node deletion
		trace!(target: TARGET, "Deleting live queries for a connection");
		// Fetch expired nodes
		let txn = self.transaction(Write, Optimistic).await?;
		// Loop over the live query unique ids
		for id in ids.into_iter() {
			// Get the key for this node live query
			let nlq = crate::key::node::lq::new(self.id(), id);
			// Fetch the LIVE meta data node entry
			if let Some(val) = catch!(txn, txn.get(nlq, None).await) {
				// Decode the data for this live query
				let lq: Live = val.into();
				// Get the key for this node live query
				let nlq = crate::key::node::lq::new(self.id(), id);
				// Get the key for this table live query
				let tlq = crate::key::table::lq::new(&lq.ns, &lq.db, &lq.tb, id);
				// Delete the table live query
				catch!(txn, txn.clr(tlq).await);
				// Delete the node live query
				catch!(txn, txn.clr(nlq).await);
			}
		}
		// Commit the changes
		txn.commit().await?;
		// All ok
		Ok(())
	}
}