1use crate::error::{MetricsError, Result};
10use serde::{Deserialize, Serialize};
11use std::collections::{BTreeMap, HashMap, HashSet};
12use std::hash::{Hash, Hasher};
13use std::net::SocketAddr;
14use std::sync::{Arc, RwLock};
15use std::time::{Duration, Instant, SystemTime};
16
17pub use super::config::{HashFunction, ShardingConfig, ShardingStrategy};
18
/// A single shard: one slice of the key space together with its placement
/// and bookkeeping metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataShard {
    /// Shard identifier; `ShardManager` also uses it as the map key for the shard.
    pub id: String,
    /// Portion of the key space covered by this shard.
    pub range: DataRange,
    /// Node currently acting as primary for the shard.
    pub primary_node: String,
    /// Nodes holding replicas of the shard.
    pub replicas: Vec<String>,
    /// Shard size in bytes, as last reported via `ShardManager::update_shard_stats`.
    pub size_bytes: u64,
    /// Number of keys, as last reported via `ShardManager::update_shard_stats`.
    pub key_count: usize,
    /// Timestamp set when the shard is created and refreshed on every stats update.
    pub last_access: SystemTime,
    /// Current lifecycle state of the shard.
    pub status: ShardStatus,
    /// Migration record attached while the shard is being migrated, otherwise `None`.
    pub migration: Option<ShardMigration>,
}
41
/// Lifecycle state of a [`DataShard`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ShardStatus {
    /// Normal state; assigned when a shard is created and after a migration completes.
    Active,
    /// A migration has been started for the shard and has not completed yet.
    Migrating,
    /// Presumably: the shard is being split in two (not set by the code in this file).
    Splitting,
    /// Presumably: the shard is being merged with another (not set by the code in this file).
    Merging,
    /// Presumably: the shard is not serving traffic (not set by the code in this file).
    Inactive,
    /// The shard is in an error state; the payload carries a description.
    Error(String),
}
58
/// The slice of the key space owned by a shard.
///
/// For the variants that support membership tests, both bounds are inclusive
/// (see `DataRange::contains_key`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DataRange {
    /// Range over 64-bit key hashes.
    Hash { start: u64, end: u64 },
    /// Range over keys compared as strings (lexicographic byte order).
    Key { start: String, end: String },
    /// Range over keys that parse as `f64`.
    Numeric { start: f64, end: f64 },
    /// Range over timestamps; keys are parsed as whole seconds since the Unix epoch.
    Time { start: SystemTime, end: SystemTime },
    /// Latitude/longitude bounding box. `contains_key` currently always
    /// returns `false` for this variant.
    Geographic {
        lat_min: f64,
        lat_max: f64,
        lon_min: f64,
        lon_max: f64,
    },
    /// Opaque, caller-defined range. `contains_key` currently always returns
    /// `false` for this variant.
    Custom {
        range_type: String,
        range_data: Vec<u8>,
    },
}
83
84impl DataRange {
85 pub fn contains_key(&self, key: &str) -> bool {
87 match self {
88 DataRange::Hash { start, end } => {
89 let hash = self.hash_key(key);
90 hash >= *start && hash <= *end
91 }
92 DataRange::Key { start, end } => key >= start.as_str() && key <= end.as_str(),
93 DataRange::Numeric { start, end } => {
94 if let Ok(num) = key.parse::<f64>() {
95 num >= *start && num <= *end
96 } else {
97 false
98 }
99 }
100 DataRange::Time { start, end } => {
101 if let Ok(timestamp_str) = key.parse::<u64>() {
103 if let Some(timestamp) =
104 SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(timestamp_str))
105 {
106 timestamp >= *start && timestamp <= *end
107 } else {
108 false
109 }
110 } else {
111 false
112 }
113 }
114 DataRange::Geographic { .. } => {
115 false
118 }
119 DataRange::Custom { .. } => {
120 false
122 }
123 }
124 }
125
126 fn hash_key(&self, key: &str) -> u64 {
128 use std::collections::hash_map::DefaultHasher;
129 let mut hasher = DefaultHasher::new();
130 key.hash(&mut hasher);
131 hasher.finish()
132 }
133
134 pub fn overlaps_with(&self, other: &DataRange) -> bool {
136 match (self, other) {
137 (DataRange::Hash { start: s1, end: e1 }, DataRange::Hash { start: s2, end: e2 }) => {
138 s1 <= e2 && s2 <= e1
139 }
140 (DataRange::Key { start: s1, end: e1 }, DataRange::Key { start: s2, end: e2 }) => {
141 s1 <= e2 && s2 <= e1
142 }
143 (
144 DataRange::Numeric { start: s1, end: e1 },
145 DataRange::Numeric { start: s2, end: e2 },
146 ) => s1 <= e2 && s2 <= e1,
147 (DataRange::Time { start: s1, end: e1 }, DataRange::Time { start: s2, end: e2 }) => {
148 s1 <= e2 && s2 <= e1
149 }
150 _ => false, }
152 }
153
154 pub fn split(&self) -> Result<(DataRange, DataRange)> {
156 match self {
157 DataRange::Hash { start, end } => {
158 let mid = start + (end - start) / 2;
159 Ok((
160 DataRange::Hash {
161 start: *start,
162 end: mid,
163 },
164 DataRange::Hash {
165 start: mid + 1,
166 end: *end,
167 },
168 ))
169 }
170 DataRange::Key { start, end } => {
171 let mid = format!("{}_{}", start, end);
173 Ok((
174 DataRange::Key {
175 start: start.clone(),
176 end: mid.clone(),
177 },
178 DataRange::Key {
179 start: mid,
180 end: end.clone(),
181 },
182 ))
183 }
184 DataRange::Numeric { start, end } => {
185 let mid = start + (end - start) / 2.0;
186 Ok((
187 DataRange::Numeric {
188 start: *start,
189 end: mid,
190 },
191 DataRange::Numeric {
192 start: mid,
193 end: *end,
194 },
195 ))
196 }
197 DataRange::Time { start, end } => {
198 let duration = end
199 .duration_since(*start)
200 .map_err(|_| MetricsError::ShardingError("Invalid time range".to_string()))?;
201 let mid_duration = duration / 2;
202 let mid = *start + mid_duration;
203 Ok((
204 DataRange::Time {
205 start: *start,
206 end: mid,
207 },
208 DataRange::Time {
209 start: mid,
210 end: *end,
211 },
212 ))
213 }
214 _ => Err(MetricsError::ShardingError(
215 "Cannot split this range type".to_string(),
216 )),
217 }
218 }
219}
220
/// Record describing the migration of one shard between two nodes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardMigration {
    /// Migration identifier; `ShardManager` also uses it as the map key for the record.
    pub id: String,
    /// Node that was primary for the shard when the migration was created.
    pub source_node: String,
    /// Node that becomes primary once the migration completes.
    pub target_node: String,
    /// Progress indicator: `0.0` when the migration is created, `1.0` once completed.
    pub progress: f64,
    /// Time at which the migration record was created.
    pub started_at: SystemTime,
    /// Estimated completion time; created as `None` by `ShardManager::migrate_shard`.
    pub estimated_completion: Option<SystemTime>,
    /// Current state of the migration.
    pub status: MigrationStatus,
    /// Error description; created as `None` by `ShardManager::migrate_shard`.
    pub error: Option<String>,
}
241
/// State of a [`ShardMigration`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum MigrationStatus {
    /// Initial state of a freshly created migration record.
    Planned,
    /// The migration has been started; counted as "active" in the sharding stats.
    InProgress,
    /// The migration finished and the shard's primary was switched to the target node.
    Completed,
    /// Presumably: the migration failed (not set by the code in this file).
    Failed,
    /// Presumably: the migration was cancelled (not set by the code in this file).
    Cancelled,
}
256
/// Owns the shard table, the consistent-hash ring and the migration records
/// for one sharding configuration.
#[derive(Debug)]
pub struct ShardManager {
    /// Sharding configuration (strategy, shard count, replication factor, hash function, ...).
    config: ShardingConfig,
    /// All known shards, keyed by shard id.
    shards: Arc<RwLock<HashMap<String, DataShard>>>,
    /// Node-to-shards map; created empty in `new`.
    /// NOTE(review): not populated by any method visible in this file.
    node_assignments: Arc<RwLock<HashMap<String, Vec<String>>>>,
    /// Consistent-hash ring: virtual-node hash -> owning node id, ordered by hash.
    hash_ring: Arc<RwLock<BTreeMap<u64, String>>>,
    /// Migration records, keyed by migration id.
    migrations: Arc<RwLock<HashMap<String, ShardMigration>>>,
    /// Statistics snapshot, initialised to its default value in `new`.
    stats: ShardingStats,
}
273
274impl ShardManager {
275 pub fn new(config: ShardingConfig) -> Self {
277 Self {
278 config,
279 shards: Arc::new(RwLock::new(HashMap::new())),
280 node_assignments: Arc::new(RwLock::new(HashMap::new())),
281 hash_ring: Arc::new(RwLock::new(BTreeMap::new())),
282 migrations: Arc::new(RwLock::new(HashMap::new())),
283 stats: ShardingStats::default(),
284 }
285 }
286
287 pub fn initialize(&mut self, nodes: Vec<String>) -> Result<()> {
289 match self.config.strategy {
290 ShardingStrategy::ConsistentHash => {
291 self.initialize_consistent_hash(nodes)?;
292 }
293 ShardingStrategy::Hash => {
294 self.initialize_hash_sharding(nodes)?;
295 }
296 ShardingStrategy::Range => {
297 self.initialize_range_sharding(nodes)?;
298 }
299 _ => {
300 return Err(MetricsError::ShardingError(
301 "Sharding strategy not implemented".to_string(),
302 ));
303 }
304 }
305
306 Ok(())
307 }
308
309 fn initialize_consistent_hash(&mut self, nodes: Vec<String>) -> Result<()> {
311 let mut hash_ring = self.hash_ring.write().expect("Operation failed");
312 let mut shards = self.shards.write().expect("Operation failed");
313
314 hash_ring.clear();
315 shards.clear();
316
317 for node in &nodes {
319 for i in 0..self.config.virtual_nodes {
320 let virtual_node_key = format!("{}:{}", node, i);
321 let hash = self.hash_string(&virtual_node_key);
322 hash_ring.insert(hash, node.clone());
323 }
324 }
325
326 let mut prev_hash = 0u64;
328 let ring_keys: Vec<u64> = hash_ring.keys().cloned().collect();
329
330 for (i, &hash) in ring_keys.iter().enumerate() {
331 let shard_id = format!("shard_{}", i);
332 let node = hash_ring.get(&hash).expect("Operation failed").clone();
333
334 let shard = DataShard {
335 id: shard_id.clone(),
336 range: DataRange::Hash {
337 start: prev_hash,
338 end: hash,
339 },
340 primary_node: node.clone(),
341 replicas: self.select_replicas(&node, &nodes),
342 size_bytes: 0,
343 key_count: 0,
344 last_access: SystemTime::now(),
345 status: ShardStatus::Active,
346 migration: None,
347 };
348
349 shards.insert(shard_id, shard);
350 prev_hash = hash + 1;
351 }
352
353 Ok(())
354 }
355
356 fn initialize_hash_sharding(&mut self, nodes: Vec<String>) -> Result<()> {
358 let mut shards = self.shards.write().expect("Operation failed");
359 shards.clear();
360
361 let hash_range_size = u64::MAX / self.config.shard_count as u64;
362
363 for i in 0..self.config.shard_count {
364 let shard_id = format!("shard_{}", i);
365 let start_hash = i as u64 * hash_range_size;
366 let end_hash = if i == self.config.shard_count - 1 {
367 u64::MAX
368 } else {
369 (i + 1) as u64 * hash_range_size - 1
370 };
371
372 let node = &nodes[i % nodes.len()];
373
374 let shard = DataShard {
375 id: shard_id.clone(),
376 range: DataRange::Hash {
377 start: start_hash,
378 end: end_hash,
379 },
380 primary_node: node.clone(),
381 replicas: self.select_replicas(node, &nodes),
382 size_bytes: 0,
383 key_count: 0,
384 last_access: SystemTime::now(),
385 status: ShardStatus::Active,
386 migration: None,
387 };
388
389 shards.insert(shard_id, shard);
390 }
391
392 Ok(())
393 }
394
395 fn initialize_range_sharding(&mut self, nodes: Vec<String>) -> Result<()> {
397 let mut shards = self.shards.write().expect("Operation failed");
398 shards.clear();
399
400 for i in 0..self.config.shard_count {
403 let shard_id = format!("shard_{}", i);
404 let start_key = format!("{:04}", i * 1000);
405 let end_key = format!("{:04}", (i + 1) * 1000 - 1);
406
407 let node = &nodes[i % nodes.len()];
408
409 let shard = DataShard {
410 id: shard_id.clone(),
411 range: DataRange::Key {
412 start: start_key,
413 end: end_key,
414 },
415 primary_node: node.clone(),
416 replicas: self.select_replicas(node, &nodes),
417 size_bytes: 0,
418 key_count: 0,
419 last_access: SystemTime::now(),
420 status: ShardStatus::Active,
421 migration: None,
422 };
423
424 shards.insert(shard_id, shard);
425 }
426
427 Ok(())
428 }
429
430 fn select_replicas(&self, primary: &str, all_nodes: &[String]) -> Vec<String> {
432 let mut replicas = Vec::new();
433 let mut count = 0;
434
435 for node in all_nodes {
436 if node != primary && count < self.config.replication_factor - 1 {
437 replicas.push(node.clone());
438 count += 1;
439 }
440 }
441
442 replicas
443 }
444
445 pub fn find_shard(&self, key: &str) -> Result<String> {
447 let shards = self.shards.read().expect("Operation failed");
448
449 for shard in shards.values() {
450 if shard.range.contains_key(key) {
451 return Ok(shard.id.clone());
452 }
453 }
454
455 Err(MetricsError::ShardingError(
456 "No shard found for key".to_string(),
457 ))
458 }
459
460 pub fn get_node_for_key(&self, key: &str) -> Result<String> {
462 match self.config.strategy {
463 ShardingStrategy::ConsistentHash => self.get_node_consistent_hash(key),
464 _ => {
465 let shard_id = self.find_shard(key)?;
466 let shards = self.shards.read().expect("Operation failed");
467 if let Some(shard) = shards.get(&shard_id) {
468 Ok(shard.primary_node.clone())
469 } else {
470 Err(MetricsError::ShardingError("Shard not found".to_string()))
471 }
472 }
473 }
474 }
475
476 fn get_node_consistent_hash(&self, key: &str) -> Result<String> {
478 let hash_ring = self.hash_ring.read().expect("Operation failed");
479 if hash_ring.is_empty() {
480 return Err(MetricsError::ShardingError(
481 "Hash ring is empty".to_string(),
482 ));
483 }
484
485 let key_hash = self.hash_string(key);
486
487 for (&node_hash, node) in hash_ring.range(key_hash..) {
489 if node_hash >= key_hash {
490 return Ok(node.clone());
491 }
492 }
493
494 if let Some((_, node)) = hash_ring.iter().next() {
496 Ok(node.clone())
497 } else {
498 Err(MetricsError::ShardingError(
499 "No nodes in hash ring".to_string(),
500 ))
501 }
502 }
503
504 fn hash_string(&self, s: &str) -> u64 {
506 match self.config.hash_function {
507 HashFunction::Murmur3 | HashFunction::XxHash => {
508 use std::collections::hash_map::DefaultHasher;
510 let mut hasher = DefaultHasher::new();
511 s.hash(&mut hasher);
512 hasher.finish()
513 }
514 HashFunction::Crc32 => {
515 let mut crc = 0xFFFFFFFFu32;
517 for byte in s.bytes() {
518 crc ^= byte as u32;
519 for _ in 0..8 {
520 if crc & 1 != 0 {
521 crc = (crc >> 1) ^ 0xEDB88320;
522 } else {
523 crc >>= 1;
524 }
525 }
526 }
527 (crc ^ 0xFFFFFFFF) as u64
528 }
529 _ => {
530 use std::collections::hash_map::DefaultHasher;
532 let mut hasher = DefaultHasher::new();
533 s.hash(&mut hasher);
534 hasher.finish()
535 }
536 }
537 }
538
539 pub fn add_node(&mut self, node_id: String) -> Result<()> {
541 match self.config.strategy {
542 ShardingStrategy::ConsistentHash => self.add_node_consistent_hash(node_id),
543 _ => {
544 self.rebalance_shards_with_new_node(node_id)
546 }
547 }
548 }
549
550 fn add_node_consistent_hash(&mut self, node_id: String) -> Result<()> {
552 {
553 let mut hash_ring = self.hash_ring.write().expect("Operation failed");
554
555 for i in 0..self.config.virtual_nodes {
557 let virtual_node_key = format!("{}:{}", node_id, i);
558 let hash = self.hash_string(&virtual_node_key);
559 hash_ring.insert(hash, node_id.clone());
560 }
561 } self.trigger_rebalancing()?;
565
566 Ok(())
567 }
568
569 pub fn remove_node(&mut self, node_id: &str) -> Result<()> {
571 match self.config.strategy {
572 ShardingStrategy::ConsistentHash => self.remove_node_consistent_hash(node_id),
573 _ => self.migrate_shards_from_node(node_id),
574 }
575 }
576
577 fn remove_node_consistent_hash(&mut self, node_id: &str) -> Result<()> {
579 {
580 let mut hash_ring = self.hash_ring.write().expect("Operation failed");
581
582 hash_ring.retain(|_, node| node != node_id);
584 } self.migrate_shards_from_node(node_id)?;
588
589 Ok(())
590 }
591
    /// Handles a newly added node for non-consistent-hash strategies.
    ///
    /// Currently this only delegates to `trigger_rebalancing`; the node id is
    /// not used.
    fn rebalance_shards_with_new_node(&mut self, _node_id: String) -> Result<()> {
        self.trigger_rebalancing()
    }
597
598 fn migrate_shards_from_node(&mut self, node_id: &str) -> Result<()> {
600 let shards = self.shards.read().expect("Operation failed");
601 let affected_shards: Vec<_> = shards
602 .values()
603 .filter(|shard| shard.primary_node == node_id)
604 .map(|shard| shard.id.clone())
605 .collect();
606 drop(shards);
607
608 for shard_id in affected_shards {
609 self.migrate_shard(&shard_id, None)?;
610 }
611
612 Ok(())
613 }
614
    /// Requests a rebalance of shards across nodes.
    ///
    /// Currently a stub: it performs no work and always returns `Ok(())`.
    fn trigger_rebalancing(&mut self) -> Result<()> {
        Ok(())
    }
622
623 pub fn migrate_shard(&mut self, shard_id: &str, target_node: Option<String>) -> Result<String> {
625 let migration_id = {
626 let mut shards = self.shards.write().expect("Operation failed");
627 let mut migrations = self.migrations.write().expect("Operation failed");
628
629 let shard = shards
630 .get_mut(shard_id)
631 .ok_or_else(|| MetricsError::ShardingError("Shard not found".to_string()))?;
632
633 if shard.status == ShardStatus::Migrating {
634 return Err(MetricsError::ShardingError(
635 "Shard is already being migrated".to_string(),
636 ));
637 }
638
639 let target = target_node.unwrap_or_else(|| {
641 shard
643 .replicas
644 .first()
645 .cloned()
646 .unwrap_or_else(|| "default_node".to_string())
647 });
648
649 let migration_id = format!(
650 "migration_{}_{}",
651 shard_id,
652 SystemTime::now()
653 .duration_since(std::time::UNIX_EPOCH)
654 .expect("Operation failed")
655 .as_millis()
656 );
657
658 let migration = ShardMigration {
659 id: migration_id.clone(),
660 source_node: shard.primary_node.clone(),
661 target_node: target.clone(),
662 progress: 0.0,
663 started_at: SystemTime::now(),
664 estimated_completion: None,
665 status: MigrationStatus::Planned,
666 error: None,
667 };
668
669 shard.status = ShardStatus::Migrating;
670 shard.migration = Some(migration.clone());
671 migrations.insert(migration_id.clone(), migration);
672
673 migration_id.clone()
674 }; self.start_migration(&migration_id)?;
678
679 Ok(migration_id)
680 }
681
682 fn start_migration(&mut self, migration_id: &str) -> Result<()> {
684 let mut migrations = self.migrations.write().expect("Operation failed");
685
686 if let Some(migration) = migrations.get_mut(migration_id) {
687 migration.status = MigrationStatus::InProgress;
688 }
691
692 Ok(())
693 }
694
695 pub fn complete_migration(&mut self, migration_id: &str) -> Result<()> {
697 let mut migrations = self.migrations.write().expect("Operation failed");
698 let mut shards = self.shards.write().expect("Operation failed");
699
700 let migration = migrations
701 .get_mut(migration_id)
702 .ok_or_else(|| MetricsError::ShardingError("Migration not found".to_string()))?;
703
704 migration.status = MigrationStatus::Completed;
705 migration.progress = 1.0;
706
707 for shard in shards.values_mut() {
709 if let Some(ref shard_migration) = shard.migration {
710 if shard_migration.id == migration_id {
711 shard.primary_node = migration.target_node.clone();
712 shard.status = ShardStatus::Active;
713 shard.migration = None;
714 break;
715 }
716 }
717 }
718
719 Ok(())
720 }
721
722 pub fn get_stats(&self) -> ShardingStats {
724 let shards = self.shards.read().expect("Operation failed");
725 let migrations = self.migrations.read().expect("Operation failed");
726
727 let total_shards = shards.len();
728 let active_migrations = migrations
729 .values()
730 .filter(|m| m.status == MigrationStatus::InProgress)
731 .count();
732
733 let total_size: u64 = shards.values().map(|s| s.size_bytes).sum();
734 let total_keys: usize = shards.values().map(|s| s.key_count).sum();
735
736 ShardingStats {
737 total_shards,
738 active_migrations,
739 total_size_bytes: total_size,
740 total_keys,
741 replication_factor: self.config.replication_factor,
742 last_rebalance: SystemTime::now(), }
744 }
745
746 pub fn list_shards(&self) -> Vec<DataShard> {
748 let shards = self.shards.read().expect("Operation failed");
749 shards.values().cloned().collect()
750 }
751
752 pub fn get_shard(&self, shard_id: &str) -> Option<DataShard> {
754 let shards = self.shards.read().expect("Operation failed");
755 shards.get(shard_id).cloned()
756 }
757
758 pub fn update_shard_stats(
760 &mut self,
761 shard_id: &str,
762 size_bytes: u64,
763 key_count: usize,
764 ) -> Result<()> {
765 let mut shards = self.shards.write().expect("Operation failed");
766
767 if let Some(shard) = shards.get_mut(shard_id) {
768 shard.size_bytes = size_bytes;
769 shard.key_count = key_count;
770 shard.last_access = SystemTime::now();
771 Ok(())
772 } else {
773 Err(MetricsError::ShardingError("Shard not found".to_string()))
774 }
775 }
776}
777
/// Aggregate statistics about the shard table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardingStats {
    /// Number of shards in the table.
    pub total_shards: usize,
    /// Number of migrations currently in state `InProgress`.
    pub active_migrations: usize,
    /// Sum of `size_bytes` over all shards.
    pub total_size_bytes: u64,
    /// Sum of `key_count` over all shards.
    pub total_keys: usize,
    /// Replication factor taken from the sharding configuration.
    pub replication_factor: usize,
    /// Timestamp associated with the most recent rebalance.
    pub last_rebalance: SystemTime,
}
794
impl Default for ShardingStats {
    /// Empty statistics: all counters are zero, the replication factor is 1
    /// and `last_rebalance` is the time at which the value was created.
    fn default() -> Self {
        Self {
            total_shards: 0,
            active_migrations: 0,
            total_size_bytes: 0,
            total_keys: 0,
            replication_factor: 1,
            // There is no "never" value for `SystemTime`, so creation time is used.
            last_rebalance: SystemTime::now(),
        }
    }
}
807
#[cfg(test)]
mod tests {
    use super::*;

    /// Membership checks for the hash, key and numeric range variants.
    #[test]
    fn test_data_range_contains_key() {
        // The hash of an arbitrary key is unknown, so use ranges whose answer
        // does not depend on it: the full hash space and an inverted range.
        let full_hash_range = DataRange::Hash {
            start: 0,
            end: u64::MAX,
        };
        assert!(full_hash_range.contains_key("test"));

        let inverted_hash_range = DataRange::Hash { start: 1, end: 0 };
        assert!(!inverted_hash_range.contains_key("test"));

        let key_range = DataRange::Key {
            start: "a".to_string(),
            end: "z".to_string(),
        };
        assert!(key_range.contains_key("m"));
        assert!(!key_range.contains_key("z1"));

        let numeric_range = DataRange::Numeric {
            start: 10.0,
            end: 20.0,
        };
        assert!(numeric_range.contains_key("15"));
        assert!(!numeric_range.contains_key("25"));
        assert!(!numeric_range.contains_key("not-a-number"));
    }

    /// Splitting a hash range yields two adjacent, non-overlapping halves.
    #[test]
    fn test_data_range_split() {
        let hash_range = DataRange::Hash {
            start: 1000,
            end: 2000,
        };
        let (left, right) = hash_range.split().expect("Operation failed");

        if let (DataRange::Hash { start: s1, end: e1 }, DataRange::Hash { start: s2, end: e2 }) =
            (left, right)
        {
            assert_eq!(s1, 1000);
            assert_eq!(e2, 2000);
            assert_eq!(e1 + 1, s2);
        } else {
            panic!("Unexpected range types after split");
        }
    }

    /// A freshly created manager has no shards.
    #[test]
    fn test_shard_manager_creation() {
        let config = ShardingConfig::default();
        let manager = ShardManager::new(config);
        assert_eq!(manager.list_shards().len(), 0);
    }

    /// Hash sharding creates exactly `shard_count` shards with the expected replicas.
    #[test]
    fn test_shard_manager_initialization() {
        let config = ShardingConfig {
            strategy: ShardingStrategy::Hash,
            shard_count: 4,
            replication_factor: 2,
            hash_function: HashFunction::Murmur3,
            virtual_nodes: 256,
            dynamic_resharding: true,
            migration_threshold: 0.8,
        };

        let mut manager = ShardManager::new(config);
        let nodes = vec![
            "node1".to_string(),
            "node2".to_string(),
            "node3".to_string(),
        ];

        manager.initialize(nodes).expect("Operation failed");

        let shards = manager.list_shards();
        assert_eq!(shards.len(), 4);

        // Replication factor 2 means one replica, and never the primary itself.
        for shard in &shards {
            assert_eq!(shard.status, ShardStatus::Active);
            assert_eq!(shard.replicas.len(), 1);
            assert!(!shard.replicas.contains(&shard.primary_node));
        }
    }

    /// Every key resolves to a shard and a node under hash sharding.
    #[test]
    fn test_find_shard() {
        let config = ShardingConfig {
            strategy: ShardingStrategy::Hash,
            shard_count: 2,
            replication_factor: 1,
            hash_function: HashFunction::Murmur3,
            virtual_nodes: 256,
            dynamic_resharding: true,
            migration_threshold: 0.8,
        };

        let mut manager = ShardManager::new(config);
        let nodes = vec!["node1".to_string(), "node2".to_string()];

        manager.initialize(nodes.clone()).expect("Operation failed");

        let shard_id = manager.find_shard("test_key").expect("Operation failed");
        assert!(manager.get_shard(&shard_id).is_some());

        let node = manager
            .get_node_for_key("test_key")
            .expect("Operation failed");
        assert!(nodes.contains(&node));
    }

    /// A migration moves the shard through `Migrating` back to `Active` on the target.
    #[test]
    fn test_shard_migration() {
        let config = ShardingConfig::default();
        let mut manager = ShardManager::new(config);
        let nodes = vec!["node1".to_string(), "node2".to_string()];

        manager.initialize(nodes).expect("Operation failed");
        let shards = manager.list_shards();

        if let Some(shard) = shards.first() {
            let migration_id = manager
                .migrate_shard(&shard.id, Some("node2".to_string()))
                .expect("Operation failed");

            let migrating = manager.get_shard(&shard.id).expect("Operation failed");
            assert_eq!(migrating.status, ShardStatus::Migrating);
            assert!(migrating.migration.is_some());

            // A shard that is already migrating cannot be migrated again.
            assert!(manager.migrate_shard(&shard.id, None).is_err());

            manager
                .complete_migration(&migration_id)
                .expect("Operation failed");

            let migrated = manager.get_shard(&shard.id).expect("Operation failed");
            assert_eq!(migrated.status, ShardStatus::Active);
            assert_eq!(migrated.primary_node, "node2");
            assert!(migrated.migration.is_none());
        }
    }

    /// Nodes can be added to and removed from a consistent-hash cluster.
    #[test]
    fn test_consistent_hash_node_operations() {
        let config = ShardingConfig {
            strategy: ShardingStrategy::ConsistentHash,
            shard_count: 4,
            replication_factor: 2,
            hash_function: HashFunction::Murmur3,
            virtual_nodes: 4,
            dynamic_resharding: true,
            migration_threshold: 0.8,
        };

        let mut manager = ShardManager::new(config);
        let nodes = vec!["node1".to_string(), "node2".to_string()];

        manager.initialize(nodes.clone()).expect("Operation failed");

        // Ring lookups always resolve to one of the registered nodes.
        let owner = manager
            .get_node_for_key("some_key")
            .expect("Operation failed");
        assert!(nodes.contains(&owner));

        manager
            .add_node("node3".to_string())
            .expect("Operation failed");

        manager.remove_node("node1").expect("Operation failed");

        // After removal the ring must never hand out the removed node.
        let owner_after_removal = manager
            .get_node_for_key("some_key")
            .expect("Operation failed");
        assert_ne!(owner_after_removal, "node1");
    }
}