Skip to main content

chord_dht/core/
node.rs

1use std::{
2	collections::{HashMap},
3	sync::{Arc, RwLock}
4};
5use rand::{Rng, SeedableRng};
6use tarpc::{
7	context,
8	tokio_serde::formats::Bincode,
9	server::Channel,
10	serde::Serialize,
11	serde::Deserialize
12};
13use futures::{future, prelude::*};
14use log::{info, warn, debug};
15use super::{
16	ring::*,
17	config::*,
18	data_store::*,
19	error::{
20		*,
21		DhtError::*
22	}
23};
24use crate::{rpc::*, server::ServerManager};
25use super::calculate_hash;
26
27// Data part of the node
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct Node {
30	pub id: Digest,
31	pub addr: String
32}
33
34impl std::fmt::Display for Node {
35	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36		write!(f, "Node({}, {})", self.id, self.addr)
37	}
38}
39
40#[derive(Clone)]
41pub struct NodeServer {
42	node: Node,
43	store: DataStore,
44	config: Config,
45	predecessor: Arc<RwLock<Option<Node>>>,
46	// The first entry is maintained by successor_list[0]
47	finger_table: Arc<RwLock<Vec<Node>>>,
48	// Maintain (fault_tolerance + 1) successors for recovery
49	successor_list: Arc<RwLock<Vec<Node>>>,
50	// connection to remote nodes
51	connection_map: Arc<RwLock<HashMap<Digest, NodeServiceClient>>>
52}
53
54impl NodeServer {
55	pub fn new(node: Node, config: Config) -> Self {
56		assert!(config.replication_factor != 0, "replication_factor equal to 0");
57		assert!(config.replication_factor <= config.fault_tolerance + 1, "replication_factor greater than fault_tolerance + 1");
58
59		// init a ring with only one node
60		// (see second part of n.join in Figure 6)
61		let finger_table = vec![node.clone(); NUM_BITS];
62		let successor_list = vec![node.clone(); config.fault_tolerance as usize + 1];
63
64		NodeServer {
65			node: node.clone(),
66			store: DataStore::new(),
67			config: config,
68			predecessor: Arc::new(RwLock::new(Some(node.clone()))),
69			finger_table: Arc::new(RwLock::new(finger_table)),
70			successor_list: Arc::new(RwLock::new(successor_list)),
71			connection_map: Arc::new(RwLock::new(HashMap::new()))
72		}
73	}
74
75	pub fn get_successor(&self) -> Node {
76		self.successor_list.read().unwrap()[0].clone()
77	}
78
79	pub fn get_successor_list(&self) -> Vec<Node> {
80		self.successor_list.read().unwrap().clone()
81	}
82
83	pub fn set_successor_list(&self, succ_list: Vec<Node>) {
84		*self.successor_list.write().unwrap() = succ_list;
85	}
86
87	pub fn get_predecessor(&self) -> Option<Node> {
88		self.predecessor.read().unwrap().clone()
89	}
90
91	pub fn set_predecessor(&self, node: Option<Node>) {
92		*self.predecessor.write().unwrap() = node;
93	}
94
95	/// Start the server
96	/// Returns if the listener starts
97	pub async fn start(&mut self, join_node: Option<Node>) -> DhtResult<ServerManager> {
98		// channel used to shutdown (true means shutdown)
99		let (tx, rx) = tokio::sync::watch::channel(false);
100
101		// Listen locally first
102		let mut listener = tarpc::serde_transport::tcp::listen(&self.node.addr, Bincode::default).await?;
103		let server = self.clone();
104		let mut listener_rx = rx.clone();
105		// Listen for rpc call
106		let listener_handle = tokio::spawn(async move {
107			listener.config_mut().max_frame_length(usize::MAX);
108			let listener_fut = listener
109				.filter_map(|r| future::ready(r.ok()))
110				.map(tarpc::server::BaseChannel::with_defaults)
111				.map(|channel| async {
112					// Clone a new server to share the data in Arc
113					channel.execute(server.clone().serve()).await;
114				})
115				.buffer_unordered(server.config.max_connections as usize)
116				.for_each(|_| async {});
117
118			// listener_fut.await;
119			debug!("{}: listening", server.node);
120			
121			tokio::select! {
122				_ = listener_fut => {
123					warn!("{}: listener terminated", server.node);
124				},
125				_ = listener_rx.changed() => {
126					debug!("{}: listener stopped gracefully", server.node);
127				}
128			};
129		});
130
131		// Join node after server starts
132		if let Some(n) = join_node.as_ref() {
133			match self.join(&n).await {
134				Ok(_) => (),
135				Err(e) => {
136					return Err(JoinFailure {
137						node: n.clone(),
138						message: e.to_string()
139					});
140				}
141			};
142		}
143
144		// Periodically stabilize
145		let mut server = self.clone();
146		let mut stabilize_rx = rx.clone();
147		let stabilize_interval = self.config.stabilize_interval;
148		let stabilize_handle = tokio::spawn(async move {
149			if stabilize_interval > 0 {
150				let mut interval = tokio::time::interval(
151					tokio::time::Duration::from_millis(stabilize_interval)
152				);
153
154				tokio::select! {
155					_ = async {
156						interval.tick().await;
157						server.stabilize().await;
158					} => (),
159					_ = stabilize_rx.changed() => {
160						debug!("{}: stabilize task stopped gracefully", server.node);
161					}
162				};
163			}
164		});
165
166		// Periodically refresh finger table
167		let mut server = self.clone();
168		let mut fix_finger_rx = rx.clone();
169		let fix_finger_interval = self.config.fix_finger_interval;
170		let fix_finger_handle = tokio::spawn(async move {
171			if fix_finger_interval > 0 {
172				let mut interval = tokio::time::interval(
173					tokio::time::Duration::from_millis(fix_finger_interval)
174				);
175				// StdRng can be sent across threads
176				let mut rng = rand::prelude::StdRng::from_entropy();
177
178				tokio::select! {
179					_ = async {
180						interval.tick().await;
181						let index = rng.gen_range(1..NUM_BITS);
182						server.fix_finger(index).await;
183					} => (),
184					_ = fix_finger_rx.changed() => {
185						debug!("{}: fix_finger task stopped gracefully", server.node);
186					}
187				};
188			}
189		});
190
191		info!("{}: listening at {}", self.node, self.node.addr);
192		// An aggregated handle for all tasks
193		let joined_handle = future::join_all(vec![
194			listener_handle,
195			stabilize_handle,
196			fix_finger_handle
197		]);
198
199		Ok(ServerManager {
200			handle: joined_handle,
201			tx: tx
202		})
203	}
204
205	// Calculate start field of finger table (see Table 1)
206	// k in [0, m)
207	pub fn finger_table_start(&self, k: usize) -> u64 {
208		self.node.id.wrapping_add(1 << k)
209	}
210	
211	async fn get_connection(&mut self, node: &Node) -> DhtResult<NodeServiceClient> {
212		// Use block to drop map immediately after use
213		{
214			let map = self.connection_map.read().unwrap();
215			if let Some(c) = map.get(&node.id) {
216				// client can be cloned with lost cost
217				return Ok(c.clone());
218			}
219		}
220		{
221			debug!("{}: connecting to {}", self.node, node);
222			let c = crate::client::setup_client(&node.addr).await?;
223			debug!("{}: connected to {}", self.node, node);
224			let mut map = self.connection_map.write().unwrap();
225			map.insert(node.id, c.clone());
226			return Ok(c);
227		}
228	}
229	
230	/// Remove broken connections
231	pub fn remove_connection(&self, node: &Node) {
232		let mut map = self.connection_map.write().unwrap();
233		map.remove(&node.id);
234	}
235
236	// Figure 7: n.join
237	pub async fn join(&mut self, node: &Node) -> DhtResult<()> {
238		debug!("{}: joining {}", self.node, node);
239		self.set_predecessor(None);
240		let ctx = context::current();
241		let n = self.get_connection(node).await?;
242		let succ_list = n.find_successor_list_rpc(ctx, self.node.id).await?;
243		self.set_successor_list(succ_list);
244		debug!("{}: joined {}", self.node, node);
245		Ok(())
246	}
247
248	// Figure 7: n.stabilize
249	pub async fn stabilize(&mut self) {
250		let ctx = context::current();
251
252		let successor_list = self.get_successor_list();
253		for mut succ in successor_list.into_iter() {
254			let mut n = match self.get_connection(&succ).await {
255				Ok(v) => v,
256				Err(e) => {
257					warn!("{}: failed to connect to {}: {}", self.node, succ, e);
258					// Try next successor
259					continue;
260				}
261			};
262
263			match n.get_predecessor_rpc(ctx).await {
264				Ok(pred) => {
265					// Update successors normally
266					let x = match pred {
267						Some(v) => v,
268						None => {
269							warn!("{}: empty predecessor of successor {}", self.node, succ);
270							return;
271						}
272					};
273					if in_range(x.id, self.node.id, succ.id) {
274						// update connection because succ change
275						n = match self.get_connection(&x).await {
276							Ok(v) => v,
277							Err(e) => {
278								warn!("{}: failed to connect to {}: {}", self.node, succ, e);
279								// Try next successor
280								continue;
281							}
282						};
283						// update succ
284						succ = x;
285					}
286
287					// Get succ_list from new node
288					// only update list if success
289					if let Ok(mut new_succ_list) = n.get_successor_list_rpc(ctx).await {
290						new_succ_list.pop();
291						new_succ_list.insert(0, succ);
292						self.set_successor_list(new_succ_list);
293						// ignore error here because it can only be fixed by stabilizing again
294						n.notify_rpc(ctx, self.node.clone()).await.unwrap_or(());
295					}
296
297					return;
298				},
299				Err(e) => {
300					warn!("{}: fail to stabilize: {}", self.node, e);
301					// Fail to connect to succ, remove it and try next
302					self.remove_connection(&succ);
303				}
304			}
305		}
306		panic!("{}: no live successors!", self.node);
307	}
308
309	// Figure 7: n.fix_fingers
310	pub async fn fix_finger(&mut self, index: usize) {
311		match self.find_successor_list(self.finger_table_start(index)).await {
312			Ok(succ) => {
313				let mut table = self.finger_table.write().unwrap();
314				table[index] = succ[0].clone();
315			},
316			Err(e) => {
317				warn!("{}: failed to fix finger: {}", self.node, e);
318			}
319		};
320	}
321
322	// A modified version using successor_list
323	// from figure 4: n.find_successor
324	async fn find_successor_list(&mut self, id: Digest) -> DhtResult<Vec<Node>> {
325		let n = self.find_predecessor(id).await?;
326		let c = self.get_connection(&n).await?;
327		let succ_list = c.get_successor_list_rpc(context::current()).await?;
328		Ok(succ_list)
329	}
330
331	// Figure 4: n.find_predecessor
332	async fn find_predecessor(&mut self, id: Digest) -> DhtResult<Node> {
333		debug!("{}: find_predecessor({})", self.node, id);
334		let mut n = self.node.clone();
335		let mut succ = self.get_successor();
336		let mut conn = self.get_connection(&n).await?;
337		let ctx = context::current();
338
339		// stop when id in (n, succ]
340		while !(in_range(id, n.id, succ.id) || id == succ.id) {
341			debug!("{}: find_predecessor range ({}, {}]", self.node, n.id, succ.id);
342			n = conn.closest_preceding_finger_rpc(ctx, id).await?;
343			conn = self.get_connection(&n).await?;
344			succ = conn.get_successor_rpc(ctx).await?;
345		}
346		debug!("{}: find_predecessor({}) returns {}", self.node, id, n);
347		Ok(n)
348	}
349
350	// Figure 4: n.closest_preceding_finger
351	async fn closest_preceding_finger(&mut self, id: Digest) -> Node {
352		let table = self.finger_table.read().unwrap();
353		for i in (0..NUM_BITS).rev() {
354			let f = if i > 0 {
355				table[i].clone()
356			} else {
357				// table[0] is maintained by successor_list[0]
358				self.get_successor()
359			};
360			if in_range(f.id, self.node.id, id) {
361				return f;
362			};
363		}
364		self.node.clone()
365	}
366
367	// Figure 7: n.notify
368	async fn notify(&mut self, node: Node) {
369		let pred = self.get_predecessor();
370		if let Some(p) = pred {
371			if !in_range(node.id, p.id, self.node.id) {
372				return;
373			}
374		}
375
376		debug!("{}: new predecessor set in notify: {}", self.node, node);
377		self.set_predecessor(Some(node));
378	}
379
380	// Get key on the ring
381	async fn get(&mut self, key: Key) -> DhtResult<Option<Value>> {
382		// Try readiing from local replica first
383		match self.store.get(&key) {
384			Some(v) => return Ok(Some(v)),
385			None => ()
386		};
387
388		// Fetch from the responsible node
389		let id = calculate_hash(&key);
390		let succ_list = self.find_successor_list(id).await?;
391		for succ in succ_list.iter() {
392			let c = self.get_connection(&succ).await?;
393			match c.get_local_rpc(context::current(), key.clone()).await {
394				Ok(value) => return Ok(value),
395				Err(e) => {
396					warn!("{}: fail to get key digest {} from {}: {}", self.node, id, succ, e);
397					// Continue trying next replica
398				}
399			};
400		}
401
402		Err(NoLiveReplica(id))
403	}
404
405	// Set key on the ring
406	async fn set(&mut self, key: Key, value: Option<Value>) -> DhtResult<()> {
407		let id = calculate_hash(&key);
408		let succ_list = self.find_successor_list(id).await?;
409		let c = self.get_connection(&succ_list[0]).await?;
410
411		c.replicate_rpc(context::current(), key, value).await?;
412		Ok(())
413	}
414
415	// Replicate key to (num - 1) successors and itself
416	async fn replicate(&mut self, key: Key, value: Option<Value>) -> DhtResult<()> {
417		// replicate it locally
418		self.store.set(key.clone(), value.clone());
419
420		// replicate data to (replication_factor - 1) nodes
421		let num = (self.config.replication_factor - 1) as usize;
422		if num > 0 {
423			let ctx = context::current();
424			// Must store conn because fut_list borrows them
425			let mut conn_list = Vec::new();
426			let mut fut_list = Vec::new();
427			for i in 0..num {
428				let node = self.successor_list.read().unwrap()[i].clone();
429				let c = self.get_connection(&node).await?;
430				conn_list.push(c);
431			}
432
433			for c in conn_list.iter() {
434				let k = key.clone();
435				let v = value.clone();
436				fut_list.push(c.set_local_rpc(ctx, k, v));
437			}
438
439			// replicate data concurrently
440			future::join_all(fut_list)
441				.await
442				.into_iter()
443				.collect::<Result<Vec<_>, _>>()?;
444		}
445		Ok(())
446	}
447}
448
449#[tarpc::server]
450impl NodeService for NodeServer {
451	async fn get_node_rpc(self, _: context::Context) -> Node {
452		self.node.clone()
453	}
454
455	async fn get_predecessor_rpc(self, _: context::Context) -> Option<Node> {
456		self.get_predecessor()
457	}
458
459	async fn get_successor_rpc(self, _: context::Context) -> Node {
460		self.get_successor()
461	}
462
463	async fn get_successor_list_rpc(self, _: context::Context) -> Vec<Node> {
464		self.get_successor_list()
465	}
466
467	async fn find_successor_list_rpc(mut self, _: context::Context, id: Digest) -> Vec<Node> {
468		loop {
469			for i in 0..(self.config.retry_limit+1) {
470				match self.find_successor_list(id).await {
471					Ok(succ_list) => return succ_list,
472					Err(e) => {
473						warn!("{}: find_successor_list_rpc failed (retry {}): {}", self.node, i, e);
474						tokio::time::sleep(
475							tokio::time::Duration::from_millis(self.config.retry_interval)
476						).await;
477					}
478				};
479			}
480
481			warn!("{}: find_successor_list_rpc retry limit reached", self.node);
482			// call stabilize to update successor_list
483			self.stabilize().await;
484		}
485	}
486
487	async fn find_predecessor_rpc(mut self, _: context::Context, id: Digest) -> Node {
488		loop {
489			for i in 0..(self.config.retry_limit+1) {
490				match self.find_predecessor(id).await {
491					Ok(succ_list) => return succ_list,
492					Err(e) => {
493						warn!("{}: find_predecessor_rpc failed (retry {}): {}", self.node, i, e);
494						tokio::time::sleep(
495							tokio::time::Duration::from_millis(self.config.retry_interval)
496						).await;
497					}
498				};
499			}
500
501			warn!("{}: find_predecessor_rpc retry limit reached", self.node);
502			// call stabilize to update successor_list
503			self.stabilize().await;
504		}
505	}
506
507	async fn closest_preceding_finger_rpc(mut self, _: context::Context, id: Digest) -> Node {
508		self.closest_preceding_finger(id).await
509	}
510
511	async fn notify_rpc(mut self, _: context::Context, node: Node) {
512		self.notify(node).await
513	}
514
515	async fn stabilize_rpc(mut self, _: context::Context) {
516		self.stabilize().await
517	}
518
519	async fn get_local_rpc(self, _: context::Context, key: Key) -> Option<Value> {
520		self.store.get(&key)
521	}
522
523	async fn set_local_rpc(self, _: context::Context, key: Key, value: Option<Value>) {
524		self.store.set(key, value)
525	}
526
527	async fn get_rpc(mut self, _: context::Context, key: Key) -> Option<Value> {
528		loop {
529			for i in 0..(self.config.retry_limit+1) {
530				match self.get(key.clone()).await {
531					Ok(value) => return value,
532					Err(e) => {
533						warn!("{}: get_rpc failed (retry {}): {}", self.node, i, e);
534						tokio::time::sleep(
535							tokio::time::Duration::from_millis(self.config.retry_interval)
536						).await;
537					}
538				};
539			}
540
541			warn!("{}: get_rpc retry limit reached", self.node);
542			// call stabilize to update successor_list
543			self.stabilize().await;
544		}
545	}
546
547	async fn set_rpc(mut self, _: context::Context, key: Key, value: Option<Value>) {
548		loop {
549			for i in 0..(self.config.retry_limit+1) {
550				match self.set(key.clone(), value.clone()).await {
551					Ok(_) => return,
552					Err(e) => {
553						warn!("{}: set_rpc failed (retry {}): {}", self.node, i, e);
554						tokio::time::sleep(
555							tokio::time::Duration::from_millis(self.config.retry_interval)
556						).await;
557					}
558				};
559			}
560
561			warn!("{}: set_rpc retry limit reached", self.node);
562			// call stabilize to update successor_list
563			self.stabilize().await;
564		}
565	}
566
567	async fn replicate_rpc(mut self, _: context::Context, key: Key, value: Option<Value>) {
568		loop {
569			for i in 0..(self.config.retry_limit+1) {
570				match self.replicate(key.clone(), value.clone()).await {
571					Ok(_) => return,
572					Err(e) => {
573						warn!("{}: replicate_rpc failed (retry {}): {}", self.node, i, e);
574						tokio::time::sleep(
575							tokio::time::Duration::from_millis(self.config.retry_interval)
576						).await;
577					}
578				};
579			}
580
581			warn!("{}: replicate_rpc retry limit reached", self.node);
582			// call stabilize to update successor_list
583			self.stabilize().await;
584		}
585	}
586}
587
588
589#[cfg(test)]
590mod tests {
591	use super::*;
592
593	async fn fix_all_fingers(server: &mut NodeServer) {
594		for i in 1..NUM_BITS {
595			server.fix_finger(i).await;
596		}
597	}
598
599	/// Test figure 3b, 5a
600	#[tokio::test]
601	async fn test_node_metadata() -> DhtResult<()> {
602		env_logger::init();
603
604		// Node 0
605		let n0 = Node {
606			addr: "localhost:9800".to_string(),
607			id: 0
608		};
609		// Node 1
610		let n1 = Node {
611			addr: "localhost:9801".to_string(),
612			id: 1
613		};
614		// Node 3
615		let n3 = Node {
616			addr: "localhost:9803".to_string(),
617			id: 3
618		};
619		// Node 6
620		let n6 = Node {
621			addr: "localhost:9806".to_string(),
622			id: 6
623		};
624
625		// Disable auto fix_finger and stabilize
626		let config = Config {
627			fix_finger_interval: 0,
628			stabilize_interval: 0,
629			..Config::default()
630		};
631		let mut s0 = NodeServer::new(n0.clone(), config.clone());
632		let m0 = s0.start(None).await?;
633		s0.stabilize().await;
634		// single-node ring
635		assert_eq!(s0.get_predecessor().unwrap().id, 0);
636		assert_eq!(s0.get_successor().id, 0);
637
638
639		// Node 1 joins node 0
640		let mut s1 = NodeServer::new(n1.clone(), config.clone());
641		let m1 = s1.start(Some(n0.clone())).await?;
642		assert_eq!(s1.get_successor().id, 0);
643
644		// Stabilize c1 first to notify c0
645		s1.stabilize().await;
646		assert_eq!(s0.get_predecessor().unwrap().id, 1);
647		s0.stabilize().await;
648		assert_eq!(s0.get_predecessor().unwrap().id, 1);
649		assert_eq!(s0.get_successor().id, 1);
650		assert_eq!(s1.get_predecessor().unwrap().id, 0);
651		assert_eq!(s1.get_successor().id, 0);
652		
653		// Fix fingers
654		fix_all_fingers(&mut s0).await;
655		{
656			let table = s0.finger_table.read().unwrap();
657			assert_eq!(table[1].id, 0);
658		}
659		fix_all_fingers(&mut s1).await;
660		{
661			let table = s1.finger_table.read().unwrap();
662			assert_eq!(table[1].id, 0);
663			assert_eq!(table[2].id, 0);
664		}
665
666
667		// Node 3 joins node 1
668		let mut s3 = NodeServer::new(n3.clone(), config.clone());
669		let m3 = s3.start(Some(n1.clone())).await?;
670		s3.stabilize().await;
671		s1.stabilize().await;
672		s0.stabilize().await;
673
674		assert_eq!(s3.get_predecessor().unwrap().id, 1);
675		assert_eq!(s1.get_predecessor().unwrap().id, 0);
676		assert_eq!(s0.get_predecessor().unwrap().id, 3);
677
678		// See finger table in Figure 3b
679		fix_all_fingers(&mut s0).await;
680		{
681			let table = s0.finger_table.read().unwrap();
682			assert_eq!(s0.get_successor().id, 1);
683			assert_eq!(table[1].id, 3);
684			assert_eq!(table[2].id, 0);
685		}
686		fix_all_fingers(&mut s1).await;
687		{
688			let table = s1.finger_table.read().unwrap();
689			assert_eq!(s1.get_successor().id, 3);
690			assert_eq!(table[1].id, 3);
691			assert_eq!(table[2].id, 0);
692		}
693		fix_all_fingers(&mut s3).await;
694		{
695			let table = s3.finger_table.read().unwrap();
696			assert_eq!(s3.get_successor().id, 0);
697			assert_eq!(table[1].id, 0);
698			assert_eq!(table[2].id, 0);
699		}
700
701
702		// Node 6 joins node 0
703		let mut s6 = NodeServer::new(n6.clone(), config.clone());
704		let m6 = s6.start(Some(n0.clone())).await?;
705		s6.stabilize().await;
706		s3.stabilize().await;
707		s1.stabilize().await;
708		s0.stabilize().await;
709
710		assert_eq!(s6.get_predecessor().unwrap().id, 3);
711		assert_eq!(s0.get_predecessor().unwrap().id, 6);
712		assert_eq!(s1.get_predecessor().unwrap().id, 0);
713		assert_eq!(s3.get_predecessor().unwrap().id, 1);
714
715		// See finger table in Figure 6a
716		fix_all_fingers(&mut s0).await;
717		{
718			let table = s0.finger_table.read().unwrap();
719			assert_eq!(s0.get_successor().id, 1);
720			assert_eq!(table[1].id, 3);
721			assert_eq!(table[2].id, 6);
722		}
723		fix_all_fingers(&mut s1).await;
724		{
725			let table = s1.finger_table.read().unwrap();
726			assert_eq!(s1.get_successor().id, 3);
727			assert_eq!(table[1].id, 3);
728			assert_eq!(table[2].id, 6);
729		}
730		fix_all_fingers(&mut s3).await;
731		{
732			let table = s3.finger_table.read().unwrap();
733			assert_eq!(s3.get_successor().id, 6);
734			assert_eq!(table[1].id, 6);
735			assert_eq!(table[2].id, 0);
736		}
737		fix_all_fingers(&mut s6).await;
738		{
739			let table = s6.finger_table.read().unwrap();
740			assert_eq!(s6.get_successor().id, 0);
741			assert_eq!(table[1].id, 0);
742			// different from figure 6 because of different NUM_BITS
743			assert_eq!(table[2].id, 0);
744		}
745
746		m0.stop().await?;
747		m1.stop().await?;
748		m3.stop().await?;
749		m6.stop().await?;
750		Ok(())
751	}
752}