1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
use std::collections::HashMap;

use crate::{
    prepare_command,
    resp::{
        cmd, BulkString, CommandArgs, FromSingleValueArray, FromValue, HashMapExt, IntoArgs,
        KeyValueArgOrCollection, SingleArgOrCollection, Value,
    },
    Error, PreparedCommand, Result,
};

/// A group of Redis commands related to [`Cluster Management`](https://redis.io/docs/management/scaling/)
///
/// Each method builds the corresponding command with `cmd(..)` and wraps it
/// with `prepare_command`, together with the type the reply is converted to.
///
/// # See Also
/// [Redis Cluster Management commands](https://redis.io/commands/?group=cluster)
/// [Redis cluster specification](https://redis.io/docs/reference/cluster-spec/)
pub trait ClusterCommands {
    /// When a cluster client receives an -ASK redirect,
    /// the ASKING command is sent to the target node followed by the command which was redirected.
    /// This is normally done automatically by cluster clients.
    ///
    /// # See Also
    /// [<https://redis.io/commands/asking/>](https://redis.io/commands/asking/)
    #[must_use]
    fn asking(&mut self) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
    {
        prepare_command(self, cmd("ASKING"))
    }

    /// This command is useful in order to modify a node's view of the cluster configuration.
    ///
    /// Specifically it assigns a set of hash slots to the node receiving the command.
    /// If the command is successful, the node will map the specified hash slots to itself,
    /// and will start broadcasting the new configuration.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-addslots/>](https://redis.io/commands/cluster-addslots/)
    #[must_use]
    fn cluster_addslots<S>(&mut self, slots: S) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
        S: SingleArgOrCollection<u16>,
    {
        prepare_command(self, cmd("CLUSTER").arg("ADDSLOTS").arg(slots))
    }

    /// This command is similar to the [`cluster_addslots`](crate::ClusterCommands::cluster_addslots)
    /// command in that they both assign hash slots to nodes.
    ///
    /// The difference between the two commands is that [`cluster_addslots`](crate::ClusterCommands::cluster_addslots)
    /// takes a list of slots to assign to the node, while this command takes a list of slot ranges
    /// (specified by a tuple containing start and end slots) to assign to the node.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-addslotsrange/>](https://redis.io/commands/cluster-addslotsrange/)
    #[must_use]
    fn cluster_addslotsrange<S>(&mut self, slots: S) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
        S: KeyValueArgOrCollection<u16, u16>,
    {
        prepare_command(self, cmd("CLUSTER").arg("ADDSLOTSRANGE").arg(slots))
    }

    /// Advances the cluster config epoch.
    ///
    /// # Return
    /// * `Bumped` if the epoch was incremented, or
    /// * `Still` if the node already has the greatest config epoch in the cluster.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-bumpepoch/>](https://redis.io/commands/cluster-bumpepoch/)
    #[must_use]
    fn cluster_bumpepoch(&mut self) -> PreparedCommand<Self, ClusterBumpEpochResult>
    where
        Self: Sized,
    {
        prepare_command(self, cmd("CLUSTER").arg("BUMPEPOCH"))
    }

    /// The command returns the number of failure reports for the specified node.
    ///
    /// # Return
    /// The number of active failure reports for the node.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-count-failure-reports/>](https://redis.io/commands/cluster-count-failure-reports/)
    #[must_use]
    fn cluster_count_failure_reports<I>(&mut self, node_id: I) -> PreparedCommand<Self, usize>
    where
        Self: Sized,
        I: Into<BulkString>,
    {
        prepare_command(
            self,
            cmd("CLUSTER").arg("COUNT-FAILURE-REPORTS").arg(node_id),
        )
    }

    /// Returns the number of keys in the specified Redis Cluster hash slot.
    ///
    /// # Return
    /// The number of keys in the specified hash slot, or an error if the hash slot is invalid.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-countkeysinslot/>](https://redis.io/commands/cluster-countkeysinslot/)
    #[must_use]
    fn cluster_countkeysinslot(&mut self, slot: usize) -> PreparedCommand<Self, usize>
    where
        Self: Sized,
    {
        prepare_command(self, cmd("CLUSTER").arg("COUNTKEYSINSLOT").arg(slot))
    }

    /// In Redis Cluster, each node keeps track of which master is serving a particular hash slot.
    /// This command asks a particular Redis Cluster node to forget which master
    /// is serving the hash slots specified as arguments.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-delslots/>](https://redis.io/commands/cluster-delslots/)
    #[must_use]
    fn cluster_delslots<S>(&mut self, slots: S) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
        S: SingleArgOrCollection<u16>,
    {
        prepare_command(self, cmd("CLUSTER").arg("DELSLOTS").arg(slots))
    }

    /// This command is similar to the [`cluster_delslots`](crate::ClusterCommands::cluster_delslots)
    /// command in that they both remove hash slots from the node.
    ///
    /// The difference is that [`cluster_delslots`](crate::ClusterCommands::cluster_delslots)
    /// takes a list of hash slots to remove from the node,
    /// while this command takes a list of slot ranges
    /// (specified by a tuple containing start and end slots) to remove from the node.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-delslotsrange/>](https://redis.io/commands/cluster-delslotsrange/)
    #[must_use]
    fn cluster_delslotsrange<S>(&mut self, slots: S) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
        S: KeyValueArgOrCollection<u16, u16>,
    {
        prepare_command(self, cmd("CLUSTER").arg("DELSLOTSRANGE").arg(slots))
    }

    /// This command, that can only be sent to a Redis Cluster replica node,
    /// forces the replica to start a manual failover of its master instance.
    ///
    /// # Errors
    /// An error can occur if the operation cannot be executed,
    /// for example if we are talking with a node which is already a master.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-failover/>](https://redis.io/commands/cluster-failover/)
    #[must_use]
    fn cluster_failover(&mut self, option: ClusterFailoverOption) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
    {
        prepare_command(self, cmd("CLUSTER").arg("FAILOVER").arg(option))
    }

    /// Deletes all slots from a node.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-flushslots/>](https://redis.io/commands/cluster-flushslots/)
    #[must_use]
    fn cluster_flushslots(&mut self) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
    {
        prepare_command(self, cmd("CLUSTER").arg("FLUSHSLOTS"))
    }

    /// The command is used in order to remove a node, specified via its node ID,
    /// from the set of known nodes of the Redis Cluster node receiving the command.
    /// In other words the specified node is removed from the nodes table of the node receiving the command.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-forget/>](https://redis.io/commands/cluster-forget/)
    #[must_use]
    fn cluster_forget<I>(&mut self, node_id: I) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
        I: Into<BulkString>,
    {
        prepare_command(self, cmd("CLUSTER").arg("FORGET").arg(node_id))
    }

    /// The command returns an array of keys names stored in
    /// the contacted node and hashing to the specified hash slot.
    ///
    /// The maximum number of keys to return is specified via the count argument,
    /// so that it is possible for the user of this API to batch-process keys.
    ///
    /// NOTE(review): the result type is `()`, so the key names described above
    /// are not exposed through this signature — confirm whether this is intended.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-getkeysinslot/>](https://redis.io/commands/cluster-getkeysinslot/)
    #[must_use]
    fn cluster_getkeysinslot(&mut self, slot: u16, count: usize) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
    {
        prepare_command(
            self,
            cmd("CLUSTER").arg("GETKEYSINSLOT").arg(slot).arg(count),
        )
    }

    /// This command provides [`info`](crate::ServerCommands::info) style information about Redis Cluster vital parameters.
    ///
    /// The `slot` and `count` parameters are kept only for source compatibility:
    /// `CLUSTER INFO` takes no arguments, so they are ignored and never sent.
    ///
    /// # Return
    /// The Cluster information
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-info/>](https://redis.io/commands/cluster-info/)
    #[must_use]
    fn cluster_info(&mut self, slot: u16, count: usize) -> PreparedCommand<Self, ClusterInfo>
    where
        Self: Sized,
    {
        // Sending extra arguments makes the server reject the command,
        // so the legacy parameters are deliberately dropped here.
        let _ = (slot, count);
        prepare_command(self, cmd("CLUSTER").arg("INFO"))
    }

    /// Returns an integer identifying the hash slot the specified key hashes to.
    ///
    /// # Return
    /// The hash slot number.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-keyslot/>](https://redis.io/commands/cluster-keyslot/)
    #[must_use]
    fn cluster_keyslot<K>(&mut self, key: K) -> PreparedCommand<Self, u16>
    where
        Self: Sized,
        K: Into<BulkString>,
    {
        prepare_command(self, cmd("CLUSTER").arg("KEYSLOT").arg(key))
    }

    /// Each node in a Redis Cluster maintains a pair of long-lived TCP link with each peer in the cluster:
    /// - One for sending outbound messages towards the peer
    /// - and one for receiving inbound messages from the peer.
    ///
    /// This command outputs information of all such peer links as an array,
    /// where each array element is a struct that contains attributes and their values for an individual link.
    ///
    /// NOTE(review): the result type is `Vec<I>` with `I: FromSingleValueArray<ClusterLinkInfo>`,
    /// whereas `cluster_shards` returns its `FromSingleValueArray` parameter directly —
    /// confirm that the extra `Vec` layer is intended.
    ///
    /// # Return
    /// An array of structs where each struct contains various attributes and their values of a cluster link.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-links/>](https://redis.io/commands/cluster-links/)
    #[must_use]
    fn cluster_links<I>(&mut self) -> PreparedCommand<Self, Vec<I>>
    where
        Self: Sized,
        I: FromSingleValueArray<ClusterLinkInfo>,
    {
        prepare_command(self, cmd("CLUSTER").arg("LINKS"))
    }

    /// This command is used in order to connect different Redis nodes with cluster support enabled, into a working cluster.
    ///
    /// The `ip` and `port` arguments are sent first, followed by the optional
    /// `cluster_bus_port` argument.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-meet/>](https://redis.io/commands/cluster-meet/)
    #[must_use]
    fn cluster_meet<IP>(
        &mut self,
        ip: IP,
        port: u16,
        cluster_bus_port: Option<u16>,
    ) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
        IP: Into<BulkString>,
    {
        prepare_command(
            self,
            cmd("CLUSTER")
                .arg("MEET")
                .arg(ip)
                .arg(port)
                .arg(cluster_bus_port),
        )
    }

    /// This command returns the unique, auto-generated identifier that is associated with the connected cluster node.
    ///
    /// # Return
    /// The node id.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-myid/>](https://redis.io/commands/cluster-myid/)
    #[must_use]
    fn cluster_myid<N>(&mut self) -> PreparedCommand<Self, N>
    where
        Self: Sized,
        N: FromValue,
    {
        prepare_command(self, cmd("CLUSTER").arg("MYID"))
    }

    /// Each node in a Redis Cluster has its view of the current cluster configuration,
    /// given by the set of known nodes, the state of the connection we have with such nodes,
    /// their flags, properties and assigned slots, and so forth.
    ///
    /// This command provides all this information, that is, the current cluster configuration of the node we are contacting,
    /// in a serialization format which happens to be exactly the same as the one used by Redis Cluster itself
    /// in order to store on disk the cluster state (however the on disk cluster state has a few additional info appended at the end).
    ///
    /// # Return
    /// The serialized cluster configuration.
    /// The output of the command is just a space-separated CSV string, where each line represents a node in the cluster.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-nodes/>](https://redis.io/commands/cluster-nodes/)
    #[must_use]
    fn cluster_nodes<R>(&mut self) -> PreparedCommand<Self, R>
    where
        Self: Sized,
        R: FromValue,
    {
        prepare_command(self, cmd("CLUSTER").arg("NODES"))
    }

    /// The command provides a list of replica nodes replicating from the specified master node.
    ///
    /// # Return
    /// The command returns data in the same format as [`cluster_nodes`](crate::ClusterCommands::cluster_nodes).
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-replicas/>](https://redis.io/commands/cluster-replicas/)
    #[must_use]
    fn cluster_replicas<I, R>(&mut self, node_id: I) -> PreparedCommand<Self, R>
    where
        Self: Sized,
        I: Into<BulkString>,
        R: FromValue,
    {
        prepare_command(self, cmd("CLUSTER").arg("REPLICAS").arg(node_id))
    }

    /// The command reconfigures a node as a replica of the specified master.
    /// If the node receiving the command is an empty master, as a side effect of the command, the node role is changed from master to replica.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-replicate/>](https://redis.io/commands/cluster-replicate/)
    #[must_use]
    fn cluster_replicate<I>(&mut self, node_id: I) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
        I: Into<BulkString>,
    {
        prepare_command(self, cmd("CLUSTER").arg("REPLICATE").arg(node_id))
    }

    /// Reset a Redis Cluster node, in a more or less drastic way depending on the reset type, that can be hard or soft.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-reset/>](https://redis.io/commands/cluster-reset/)
    #[must_use]
    fn cluster_reset(&mut self, reset_type: ClusterResetType) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
    {
        prepare_command(self, cmd("CLUSTER").arg("RESET").arg(reset_type))
    }

    /// Forces a node to save the nodes.conf configuration on disk.
    /// Before returning, the command calls `fsync(2)` in order to make sure the configuration is flushed on the computer disk.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-saveconfig/>](https://redis.io/commands/cluster-saveconfig/)
    #[must_use]
    fn cluster_saveconfig(&mut self) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
    {
        prepare_command(self, cmd("CLUSTER").arg("SAVECONFIG"))
    }

    /// This command sets a specific config epoch in a fresh node.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-set-config-epoch/>](https://redis.io/commands/cluster-set-config-epoch/)
    #[must_use]
    fn cluster_set_config_epoch(&mut self, config_epoch: u64) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
    {
        prepare_command(
            self,
            cmd("CLUSTER").arg("SET-CONFIG-EPOCH").arg(config_epoch),
        )
    }

    /// This command is responsible for changing the state of a hash slot in the receiving node in different ways.
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-setslot/>](https://redis.io/commands/cluster-setslot/)
    #[must_use]
    fn cluster_setslot(
        &mut self,
        slot: u16,
        subcommand: ClusterSetSlotSubCommand,
    ) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
    {
        prepare_command(
            self,
            cmd("CLUSTER").arg("SETSLOT").arg(slot).arg(subcommand),
        )
    }

    /// This command returns details about the shards of the cluster.
    ///
    /// # Return
    /// A list of shard information for each shard (slot ranges & shard nodes)
    ///
    /// # See Also
    /// [<https://redis.io/commands/cluster-shards/>](https://redis.io/commands/cluster-shards/)
    #[must_use]
    fn cluster_shards<S>(&mut self) -> PreparedCommand<Self, S>
    where
        Self: Sized,
        S: FromSingleValueArray<ClusterShardResult>,
    {
        prepare_command(self, cmd("CLUSTER").arg("SHARDS"))
    }

    /// Enables read queries for a connection to a Redis Cluster replica node.
    ///
    /// # See Also
    /// [<https://redis.io/commands/readonly/>](https://redis.io/commands/readonly/)
    #[must_use]
    fn readonly(&mut self) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
    {
        prepare_command(self, cmd("READONLY"))
    }

    /// Disables read queries for a connection to a Redis Cluster replica node.
    ///
    /// # See Also
    /// [<https://redis.io/commands/readwrite/>](https://redis.io/commands/readwrite/)
    #[must_use]
    fn readwrite(&mut self) -> PreparedCommand<Self, ()>
    where
        Self: Sized,
    {
        prepare_command(self, cmd("READWRITE"))
    }
}

/// Result for the [`cluster_bumpepoch`](crate::ClusterCommands::cluster_bumpepoch) command
///
/// The standard comparison and formatting traits are derived so that results
/// can be logged and asserted on by callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterBumpEpochResult {
    /// if the epoch was incremented
    Bumped,
    /// if the node already has the greatest config epoch in the cluster.
    Still,
}

impl FromValue for ClusterBumpEpochResult {
    /// Converts the textual reply of `CLUSTER BUMPEPOCH` into a [`ClusterBumpEpochResult`].
    ///
    /// Only the leading status word of the reply is inspected, so a reply that
    /// carries additional data after it (for instance `BUMPED 3`) is accepted
    /// just like the bare `BUMPED` / `STILL` forms.
    ///
    /// # Errors
    /// Returns `Error::Client` when the status word is neither `BUMPED` nor `STILL`,
    /// and propagates any error raised while converting `value` to a `String`.
    fn from_value(value: Value) -> Result<Self> {
        let result: String = value.into()?;
        // An empty reply yields `None` here and falls through to the error arm.
        match result.split_whitespace().next() {
            Some("BUMPED") => Ok(Self::Bumped),
            Some("STILL") => Ok(Self::Still),
            _ => Err(Error::Client(
                "Unexpected result for command 'CLUSTER BUMPEPOCH'".to_owned(),
            )),
        }
    }
}

/// Options for the [`cluster_failover`](crate::ClusterCommands::cluster_failover) command
///
/// The standard comparison and formatting traits are derived so that options
/// can be copied, compared and logged by callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterFailoverOption {
    /// No option
    Default,
    /// FORCE option: manual failover when the master is down
    Force,
    /// TAKEOVER option: manual failover without cluster consensus
    Takeover,
}

impl Default for ClusterFailoverOption {
    fn default() -> Self {
        Self::Default
    }
}

impl IntoArgs for ClusterFailoverOption {
    /// Appends the keyword matching the option to `args`.
    ///
    /// The `Default` variant appends nothing and returns `args` untouched.
    fn into_args(self, args: CommandArgs) -> CommandArgs {
        let keyword = match self {
            // No option: leave the argument list as it is.
            Self::Default => return args,
            Self::Force => "FORCE",
            Self::Takeover => "TAKEOVER",
        };
        args.arg(keyword)
    }
}

/// Cluster state used in the `cluster_state` field of [crate::ClusterInfo]
///
/// The standard comparison and formatting traits are derived so that the state
/// can be copied, compared and logged by callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterState {
    /// State is `ok` if the node is able to receive queries.
    Ok,
    /// `fail` if there is at least one hash slot which is unbound (no node associated),
    /// in error state (node serving it is flagged with FAIL flag),
    /// or if the majority of masters can't be reached by this node.
    Fail,
}

impl FromValue for ClusterState {
    /// Maps the strings `ok` and `fail` to the matching [`ClusterState`] variant.
    ///
    /// # Errors
    /// Returns `Error::Client` for any other string, and propagates any error
    /// raised while converting `value` to a `String`.
    fn from_value(value: Value) -> Result<Self> {
        let state: String = value.into()?;
        match state.as_str() {
            "ok" => Ok(Self::Ok),
            "fail" => Ok(Self::Fail),
            _ => Err(Error::Client("Unexpected ClusterState result".to_owned())),
        }
    }
}

/// Result for the [`cluster_info`](crate::ClusterCommands::cluster_info) command
///
/// Each field is filled from the reply entry bearing the same name; the
/// `auth_req` / `auth_ack` fields are read from the `auth-req` / `auth-ack` entries.
pub struct ClusterInfo {
    /// State is ok if the node is able to receive queries.
    /// fail if there is at least one hash slot which is unbound (no node associated),
    /// in error state (node serving it is flagged with FAIL flag),
    /// or if the majority of masters can't be reached by this node.
    pub cluster_state: ClusterState,

    /// Number of slots which are associated to some node (not unbound).
    /// This number should be 16384 for the node to work properly,
    /// which means that each hash slot should be mapped to a node.
    pub cluster_slots_assigned: usize,

    /// Number of hash slots mapping to a node not in FAIL or PFAIL state.
    pub cluster_slots_ok: usize,

    /// Number of hash slots mapping to a node in PFAIL state.
    /// Note that those hash slots still work correctly,
    /// as long as the PFAIL state is not promoted to FAIL by the failure detection algorithm.
    /// PFAIL only means that we are currently not able to talk with the node,
    /// but may be just a transient error.
    pub cluster_slots_pfail: usize,

    /// Number of hash slots mapping to a node in FAIL state.
    /// If this number is not zero the node is not able to serve queries
    /// unless cluster-require-full-coverage is set to no in the configuration.
    pub cluster_slots_fail: usize,

    /// The total number of known nodes in the cluster,
    /// including nodes in HANDSHAKE state that may not currently be proper members of the cluster.
    pub cluster_known_nodes: usize,

    /// The number of master nodes serving at least one hash slot in the cluster.
    pub cluster_size: usize,

    /// The local Current Epoch variable.
    /// This is used in order to create unique increasing version numbers during fail overs.
    pub cluster_current_epoch: usize,

    /// The Config Epoch of the node we are talking with.
    /// This is the current configuration version assigned to this node.
    pub cluster_my_epoch: u64,

    /// Number of messages sent via the cluster node-to-node binary bus.
    pub cluster_stats_messages_sent: usize,

    /// Number of messages received via the cluster node-to-node binary bus.
    pub cluster_stats_messages_received: usize,

    /// Accumulated count of cluster links freed due to exceeding the `cluster-link-sendbuf-limit` configuration.
    pub total_cluster_links_buffer_limit_exceeded: usize,

    /// Cluster bus PING sent (not to be confused with the client command [`ping`](crate::ConnectionCommands::ping)).
    pub cluster_stats_messages_ping_sent: usize,

    /// Cluster bus PING received (not to be confused with the client command [`ping`](crate::ConnectionCommands::ping)).
    pub cluster_stats_messages_ping_received: usize,

    /// PONG sent (reply to PING).
    pub cluster_stats_messages_pong_sent: usize,

    /// PONG received (reply to PING).
    pub cluster_stats_messages_pong_received: usize,

    /// MEET sent: handshake message sent to a new node,
    /// either through gossip or [`cluster_meet`](crate::ClusterCommands::cluster_meet).
    pub cluster_stats_messages_meet_sent: usize,

    /// MEET received: handshake message sent to a new node,
    /// either through gossip or [`cluster_meet`](crate::ClusterCommands::cluster_meet).
    pub cluster_stats_messages_meet_received: usize,

    /// FAIL sent: mark node xxx as failing.
    pub cluster_stats_messages_fail_sent: usize,

    /// FAIL received: mark node xxx as failing.
    pub cluster_stats_messages_fail_received: usize,

    /// PUBLISH sent: Pub/Sub Publish propagation, see [`Pubsub`](https://redis.io/topics/pubsub#pubsub).
    pub cluster_stats_messages_publish_sent: usize,

    /// PUBLISH received: Pub/Sub Publish propagation, see [`Pubsub`](https://redis.io/topics/pubsub#pubsub).
    pub cluster_stats_messages_publish_received: usize,

    /// AUTH-REQ sent: replica initiated leader election to replace its master.
    pub cluster_stats_messages_auth_req_sent: usize,

    /// AUTH-REQ received: replica initiated leader election to replace its master.
    pub cluster_stats_messages_auth_req_received: usize,

    /// AUTH-ACK sent: message indicating a vote during leader election.
    pub cluster_stats_messages_auth_ack_sent: usize,

    /// AUTH-ACK received: message indicating a vote during leader election.
    pub cluster_stats_messages_auth_ack_received: usize,

    /// UPDATE sent: another node slots configuration.
    pub cluster_stats_messages_update_sent: usize,

    /// UPDATE received: another node slots configuration.
    pub cluster_stats_messages_update_received: usize,

    /// MFSTART sent: pause clients for manual failover.
    pub cluster_stats_messages_mfstart_sent: usize,

    /// MFSTART received: pause clients for manual failover.
    pub cluster_stats_messages_mfstart_received: usize,

    /// MODULE sent: module cluster API message.
    pub cluster_stats_messages_module_sent: usize,

    /// MODULE received: module cluster API message.
    pub cluster_stats_messages_module_received: usize,

    /// PUBLISHSHARD sent: Pub/Sub Publish shard propagation, see [`Sharded Pubsub`](https://redis.io/topics/pubsub#sharded-pubsub).
    pub cluster_stats_messages_publishshard_sent: usize,

    /// PUBLISHSHARD received: Pub/Sub Publish shard propagation, see [`Sharded Pubsub`](https://redis.io/topics/pubsub#sharded-pubsub).
    pub cluster_stats_messages_publishshard_received: usize,
}

impl FromValue for ClusterInfo {
    /// Parses the textual `key:value` reply of `CLUSTER INFO` into a [`ClusterInfo`].
    ///
    /// The reply is split into lines; blank lines and lines starting with `#`
    /// are skipped, and every remaining line must have exactly the form
    /// `key:value`. Each struct field is then taken out of the resulting map
    /// with `remove_or_default` and converted to the field type.
    ///
    /// # Errors
    /// Returns `Error::Client` when a data line is not of the form `key:value`,
    /// and propagates any conversion error raised for `value` or for a field.
    fn from_value(value: Value) -> Result<Self> {
        let lines: String = value.into()?;
        let mut values = lines
            .lines()
            // Keep only data lines: drop blanks (e.g. after the final line
            // terminator) and `#` section headers.
            .filter(|line| !line.is_empty() && !line.starts_with('#'))
            .map(|line| {
                let mut parts = line.split(':');
                // Exactly two parts are expected; a missing or extra `:` is an error.
                match (parts.next(), parts.next(), parts.next()) {
                    (Some(key), Some(value), None) => Ok((
                        key.to_owned(),
                        Value::BulkString(BulkString::Binary(value.as_bytes().to_vec())),
                    )),
                    _ => Err(Error::Client(
                        "Unexpected result for cluster_info".to_owned(),
                    )),
                }
            })
            .collect::<Result<HashMap<String, Value>>>()?;

        Ok(Self {
            cluster_state: values.remove_or_default("cluster_state").into()?,
            cluster_slots_assigned: values.remove_or_default("cluster_slots_assigned").into()?,
            cluster_slots_ok: values.remove_or_default("cluster_slots_ok").into()?,
            cluster_slots_pfail: values.remove_or_default("cluster_slots_pfail").into()?,
            cluster_slots_fail: values.remove_or_default("cluster_slots_fail").into()?,
            cluster_known_nodes: values.remove_or_default("cluster_known_nodes").into()?,
            cluster_size: values.remove_or_default("cluster_size").into()?,
            cluster_current_epoch: values.remove_or_default("cluster_current_epoch").into()?,
            cluster_my_epoch: values.remove_or_default("cluster_my_epoch").into()?,
            cluster_stats_messages_sent: values
                .remove_or_default("cluster_stats_messages_sent")
                .into()?,
            cluster_stats_messages_received: values
                .remove_or_default("cluster_stats_messages_received")
                .into()?,
            total_cluster_links_buffer_limit_exceeded: values
                .remove_or_default("total_cluster_links_buffer_limit_exceeded")
                .into()?,
            cluster_stats_messages_ping_sent: values
                .remove_or_default("cluster_stats_messages_ping_sent")
                .into()?,
            cluster_stats_messages_ping_received: values
                .remove_or_default("cluster_stats_messages_ping_received")
                .into()?,
            cluster_stats_messages_pong_sent: values
                .remove_or_default("cluster_stats_messages_pong_sent")
                .into()?,
            cluster_stats_messages_pong_received: values
                .remove_or_default("cluster_stats_messages_pong_received")
                .into()?,
            cluster_stats_messages_meet_sent: values
                .remove_or_default("cluster_stats_messages_meet_sent")
                .into()?,
            cluster_stats_messages_meet_received: values
                .remove_or_default("cluster_stats_messages_meet_received")
                .into()?,
            cluster_stats_messages_fail_sent: values
                .remove_or_default("cluster_stats_messages_fail_sent")
                .into()?,
            cluster_stats_messages_fail_received: values
                .remove_or_default("cluster_stats_messages_fail_received")
                .into()?,
            cluster_stats_messages_publish_sent: values
                .remove_or_default("cluster_stats_messages_publish_sent")
                .into()?,
            cluster_stats_messages_publish_received: values
                .remove_or_default("cluster_stats_messages_publish_received")
                .into()?,
            // The auth-req / auth-ack keys use a dash in the reply, unlike the field names.
            cluster_stats_messages_auth_req_sent: values
                .remove_or_default("cluster_stats_messages_auth-req_sent")
                .into()?,
            cluster_stats_messages_auth_req_received: values
                .remove_or_default("cluster_stats_messages_auth-req_received")
                .into()?,
            cluster_stats_messages_auth_ack_sent: values
                .remove_or_default("cluster_stats_messages_auth-ack_sent")
                .into()?,
            cluster_stats_messages_auth_ack_received: values
                .remove_or_default("cluster_stats_messages_auth-ack_received")
                .into()?,
            cluster_stats_messages_update_sent: values
                .remove_or_default("cluster_stats_messages_update_sent")
                .into()?,
            cluster_stats_messages_update_received: values
                .remove_or_default("cluster_stats_messages_update_received")
                .into()?,
            cluster_stats_messages_mfstart_sent: values
                .remove_or_default("cluster_stats_messages_mfstart_sent")
                .into()?,
            cluster_stats_messages_mfstart_received: values
                .remove_or_default("cluster_stats_messages_mfstart_received")
                .into()?,
            cluster_stats_messages_module_sent: values
                .remove_or_default("cluster_stats_messages_module_sent")
                .into()?,
            cluster_stats_messages_module_received: values
                .remove_or_default("cluster_stats_messages_module_received")
                .into()?,
            cluster_stats_messages_publishshard_sent: values
                .remove_or_default("cluster_stats_messages_publishshard_sent")
                .into()?,
            cluster_stats_messages_publishshard_received: values
                .remove_or_default("cluster_stats_messages_publishshard_received")
                .into()?,
        })
    }
}

/// Direction of a cluster link: either established by the local node to the peer,
/// or accepted by the local node from the peer.
#[derive(Debug)]
pub enum ClusterLinkDirection {
    /// The link was established by the local node to the peer (reported as `to`).
    To,
    /// The link was accepted by the local node from the peer (reported as `from`).
    From,
}

impl FromValue for ClusterLinkDirection {
    fn from_value(value: Value) -> Result<Self> {
        match value.into::<String>()?.as_str() {
            "to" => Ok(ClusterLinkDirection::To),
            "from" => Ok(ClusterLinkDirection::From),
            _ => Err(Error::Client(
                "Unexpected ClusterLinkDirection result".to_owned(),
            )),
        }
    }
}

/// Result for the [`cluster_links`](crate::ClusterCommands::cluster_links) command.
///
/// Each field is read from the key of the same name in the reply
/// (with `-` in place of `_`, e.g. `create-time`, `send-buffer-used`).
pub struct ClusterLinkInfo {
    /// Whether this link was established by the local node to the peer,
    /// or accepted by the local node from the peer.
    pub direction: ClusterLinkDirection,
    /// The node id of the peer.
    pub node: String,
    /// Creation time of the link. (In the case of a `to` link,
    /// this is the time when the TCP link is created by the local node,
    /// not the time when it is actually established.)
    pub create_time: u64,
    /// Events currently registered for the link.
    /// `r` means readable event, `w` means writable event.
    pub events: String,
    /// Allocated size of the link's send buffer,
    /// which is used to buffer outgoing messages toward the peer.
    pub send_buffer_allocated: usize,
    /// Size of the portion of the link's send buffer that is currently holding data (messages).
    pub send_buffer_used: usize,
}

impl FromValue for ClusterLinkInfo {
    fn from_value(value: Value) -> Result<Self> {
        let mut values: HashMap<String, Value> = value.into()?;

        Ok(Self {
            direction: values.remove_or_default("direction").into()?,
            node: values.remove_or_default("node").into()?,
            create_time: values.remove_or_default("create-time").into()?,
            events: values.remove_or_default("events").into()?,
            send_buffer_allocated: values.remove_or_default("send-buffer-allocated").into()?,
            send_buffer_used: values.remove_or_default("send-buffer-used").into()?,
        })
    }
}

/// Type of [`cluster reset`](crate::ClusterCommands::cluster_reset)
#[derive(Debug)]
pub enum ClusterResetType {
    /// Sent to the server as the `HARD` option.
    Hard,
    /// Sent to the server as the `SOFT` option.
    Soft,
}

impl IntoArgs for ClusterResetType {
    /// Appends the keyword matching the reset type (`HARD` or `SOFT`) to `args`.
    fn into_args(self, args: CommandArgs) -> CommandArgs {
        let keyword = match self {
            Self::Hard => "HARD",
            Self::Soft => "SOFT",
        };

        args.arg(keyword)
    }
}

/// Subcommand for the [`cluster_setslot`](crate::ClusterCommands::cluster_setslot) command.
#[derive(Debug)]
pub enum ClusterSetSlotSubCommand {
    /// Set a hash slot in importing state.
    Importing {
        /// Node id sent after the `IMPORTING` keyword.
        node_id: String,
    },
    /// Set a hash slot in migrating state.
    Migrating {
        /// Node id sent after the `MIGRATING` keyword.
        node_id: String,
    },
    /// Bind the hash slot to a different node.
    Node {
        /// Node id sent after the `NODE` keyword.
        node_id: String,
    },
    /// Clear any importing / migrating state from hash slot.
    Stable,
}

impl IntoArgs for ClusterSetSlotSubCommand {
    /// Appends the subcommand keyword to `args`, followed by the node id
    /// for the variants that carry one (`Stable` carries none).
    fn into_args(self, args: CommandArgs) -> CommandArgs {
        // Split the variant into its keyword and its optional node id.
        let (keyword, node_id) = match self {
            Self::Importing { node_id } => ("IMPORTING", Some(node_id)),
            Self::Migrating { node_id } => ("MIGRATING", Some(node_id)),
            Self::Node { node_id } => ("NODE", Some(node_id)),
            Self::Stable => ("STABLE", None),
        };

        let args = args.arg(keyword);
        match node_id {
            Some(id) => args.arg(id),
            None => args,
        }
    }
}

/// Result for the [`cluster_shards`](crate::ClusterCommands::cluster_shards) command.
#[derive(Debug)]
pub struct ClusterShardResult {
    /// Pairs of slot numbers read from the `slots` entry of the reply.
    /// NOTE(review): presumably each pair is a (start, end) slot range - confirm
    /// against the Redis `CLUSTER SHARDS` documentation.
    pub slots: Vec<(u16, u16)>,
    /// Nodes read from the `nodes` entry of the reply.
    pub nodes: Vec<ClusterNodeResult>,
}

impl FromValue for ClusterShardResult {
    fn from_value(value: Value) -> Result<Self> {
        let mut values: HashMap<String, Value> = value.into()?;

        Ok(Self {
            slots: values.remove_with_result("slots")?.into()?,
            nodes: values.remove_with_result("nodes")?.into()?,
        })
    }
}

/// Cluster node result for the [`cluster_shards`](crate::ClusterCommands::cluster_shards) command.
///
/// Describes a single node listed in [`ClusterShardResult::nodes`].
#[derive(Debug)]
pub struct ClusterNodeResult {
    /// The unique node id for this particular node.
    pub id: String,

    /// The preferred endpoint to reach the node.
    pub endpoint: String,

    /// The IP address to send requests to for this node.
    pub ip: String,

    /// The TCP (non-TLS) port of the node.
    /// At least one of `port` or `tls-port` will be present.
    pub port: Option<u16>,

    /// The announced hostname to send requests to for this node.
    pub hostname: Option<String>,

    /// The TLS port of the node.
    /// At least one of `port` or `tls-port` will be present.
    pub tls_port: Option<u16>,

    /// The replication role of this node.
    pub role: String,

    /// The replication offset of this node.
    /// This information can be used to send commands to the most up to date replicas.
    pub replication_offset: usize,

    /// Either `online`, `failed`, or `loading`.
    /// This information should be used to determine which nodes should be sent traffic.
    /// The `loading` health state should be used to know that a node is not currently
    /// eligible to serve traffic, but may be eligible in the future.
    pub health: ClusterHealthStatus,
}

impl FromValue for ClusterNodeResult {
    fn from_value(value: Value) -> Result<Self> {
        let mut values: HashMap<String, Value> = value.into()?;

        Ok(Self {
            id: values.remove_with_result("id")?.into()?,
            endpoint: values.remove_with_result("endpoint")?.into()?,
            ip: values.remove_with_result("ip")?.into()?,
            port: values.remove_or_default("port").into()?,
            hostname: values.remove_or_default("hostname").into()?,
            tls_port: values.remove_or_default("tls-port").into()?,
            role: values.remove_with_result("role")?.into()?,
            replication_offset: values.remove_with_result("replication-offset")?.into()?,
            health: values.remove_with_result("health")?.into()?,
        })
    }
}

/// Cluster health status for the [`cluster_shards`](crate::ClusterCommands::cluster_shards) command.
#[derive(Debug)]
pub enum ClusterHealthStatus {
    /// Parsed from the `online` health string.
    Online,
    /// Parsed from the `failed` health string.
    Failed,
    /// Parsed from the `loading` health string.
    Loading,
}

impl FromValue for ClusterHealthStatus {
    fn from_value(value: Value) -> Result<Self> {
        match value.into::<String>()?.as_str() {
            "online" => Ok(Self::Online),
            "failed" => Ok(Self::Failed),
            "loading" => Ok(Self::Loading),
            _ => Err(Error::Client(
                "Unexpected result for ClusterHealthStatus".to_owned(),
            )),
        }
    }
}