exonum-node 1.0.0

Node of the Exonum blockchain framework.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
// Copyright 2020 The Exonum Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Exonum node that handles consensus algorithm, interaction with other nodes and external clients.
//!
//! # Overview
//!
//! This crate contains the following APIs:
//!
//! - [`Node`] encapsulates a full-fledged Exonum node
//! - [`NodeChannel`] and [`ShutdownHandle`] allow interacting with the node
//!   (mind that `NodeChannel` is relatively low-level)
//! - Configuration types, "rooted" in [`NodeConfig`], allow configuring aspects
//!   of the `Node` behavior
//!
//! There are also some types / methods excluded from the docs, but they are hidden for a reason:
//! such APIs are considered an implementation detail and are exempt from semantic versioning.
//! (In other words, these APIs may change or be removed in any release without prior warning.)
//!
//! [`Node`]: struct.Node.html
//! [`NodeChannel`]: struct.NodeChannel.html
//! [`ShutdownHandle`]: struct.ShutdownHandle.html
//! [`NodeConfig`]: struct.NodeConfig.html

// spell-checker:ignore cors

#![warn(
    missing_debug_implementations,
    missing_docs,
    unsafe_code,
    bare_trait_objects
)]
#![warn(clippy::pedantic, clippy::nursery)]
#![allow(
    // Next `cast_*` lints don't give alternatives.
    clippy::cast_possible_wrap, clippy::cast_possible_truncation, clippy::cast_sign_loss,
    // Next lints produce too much noise/false positives.
    clippy::module_name_repetitions, clippy::similar_names, clippy::must_use_candidate,
    clippy::pub_enum_variant_names,
    // '... may panic' lints.
    clippy::indexing_slicing,
    // Too much work to fix.
    clippy::missing_errors_doc, clippy::missing_const_for_fn
)]

pub use crate::{
    connect_list::{ConnectInfo, ConnectListConfig},
    plugin::{NodePlugin, PluginApiContext, SharedNodeState},
};

use actix_rt::System;
use anyhow::{ensure, format_err};
use exonum::{
    blockchain::{
        config::GenesisConfig, ApiSender, Blockchain, BlockchainBuilder, BlockchainMut,
        ConsensusConfig, Schema, SendError,
    },
    crypto::{self, Hash, PublicKey},
    helpers::{user_agent, Height, Milliseconds, Round, ValidateInput, ValidatorId},
    keys::Keys,
    merkledb::{Database, ObjectHash},
    messages::{AnyTx, IntoMessage, SignedMessage, Verified},
    runtime::RuntimeInstance,
};
use exonum_api::{
    AllowOrigin, ApiAccess, ApiAggregator, ApiManager, ApiManagerConfig, UpdateEndpoints,
    WebServerConfig,
};
use futures::{
    channel::{mpsc, oneshot},
    prelude::*,
};
use log::{info, trace};
use serde_derive::{Deserialize, Serialize};
use tokio::time::delay_for;

use std::{
    collections::{HashMap, HashSet},
    convert::TryFrom,
    fmt, io,
    net::SocketAddr,
    sync::Arc,
    thread,
    time::{Duration, SystemTime},
};

use crate::{
    connect_list::ConnectList,
    events::{
        noise::HandshakeParams, HandlerPart, InternalEvent, InternalPart, InternalRequest,
        NetworkEvent, NetworkPart, NetworkRequest, SyncSender, TimeoutRequest,
    },
    messages::Connect,
    proposer::{ProposeBlock, StandardProposer},
    schema::NodeSchema,
    state::{RequestData, State},
};

mod basic;
mod connect_list;
mod consensus;
mod events;
mod events_impl;
pub mod helpers;
mod messages;
mod plugin;
pub mod proposer;
mod proto;
mod requests;
#[cfg(test)]
mod sandbox;
mod schema;
mod state;

// Logically private types re-exported for benchmarks.
//
// The module is hidden from the generated documentation; as stated in the crate-level docs,
// hidden APIs are an implementation detail and are exempt from semantic versioning.
#[doc(hidden)]
pub mod _bench_types {
    pub use crate::{
        events::{
            Event, EventHandler, EventOutcome, HandlerPart, InternalPart, InternalRequest,
            NetworkEvent,
        },
        // The peer-to-peer message type is re-exported under a more descriptive alias.
        messages::Message as PeerMessage,
    };
}

/// External messages sent to the node.
///
/// The enum is `#[non_exhaustive]`: code outside of this crate must include a wildcard arm
/// when matching on it, since new variants may be added.
#[derive(Debug)]
#[non_exhaustive]
pub enum ExternalMessage {
    /// Add a new connection described by the provided `ConnectInfo`.
    PeerAdd(ConnectInfo),
    /// Enable or disable the node.
    Enable(bool),
    /// Shutdown the node.
    Shutdown,
}

/// Node timeout types.
///
/// Values of this type are wrapped into a `TimeoutRequest` together with the time at which
/// the timeout should fire (see `NodeHandler::add_timeout`).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) enum NodeTimeout {
    /// Status timeout with the current height.
    Status(Height),
    /// Round timeout. `NodeHandler::add_round_timeout` fills it with the current epoch and round
    /// of the node state.
    Round(Height, Round),
    /// `RequestData` timeout.
    Request(RequestData, Option<PublicKey>),
    /// Propose timeout. `NodeHandler::add_propose_timeout` fills it with the current epoch
    /// and round of the node state.
    Propose(Height, Round),
    /// Update api shared state.
    UpdateApiState,
    /// Exchange peers timeout.
    PeerExchange,
    /// Flush uncommitted transactions into the database.
    FlushPool,
}

/// A helper trait that provides the node with information about the state of the system such
/// as current time or listen address.
///
/// Implementations must be `Debug`, `Send` and `'static`; the node handler stores the provider
/// as a boxed trait object.
pub(crate) trait SystemStateProvider: fmt::Debug + Send + 'static {
    /// Returns the current address that the node listens on.
    fn listen_address(&self) -> SocketAddr;
    /// Returns the current system time.
    fn current_time(&self) -> SystemTime;
}

/// Handler responsible for the consensus algorithm.
///
/// # Stability
///
/// This type and its methods are considered an implementation detail of the Exonum node and are
/// thus exempt from semantic versioning.
pub(crate) struct NodeHandler {
    /// Shared API state.
    pub api_state: SharedNodeState,
    /// Blockchain.
    pub blockchain: BlockchainMut,
    /// Node plugins. The list is empty right after construction via `NodeHandler::new`.
    plugins: Vec<Box<dyn NodePlugin>>,
    /// State of the `NodeHandler`.
    state: State,
    /// System state (source of the current time and the listen address).
    system_state: Box<dyn SystemStateProvider>,
    /// Channel for messages and timeouts.
    channel: NodeSender,
    /// Known peer addresses (copied from `Configuration::peer_discovery`).
    peer_discovery: Vec<String>,
    /// Does this node participate in the consensus?
    /// The initial value is read from the shared API state.
    is_enabled: bool,
    /// Node role.
    node_role: NodeRole,
    /// Configuration file manager.
    config_manager: Option<Box<dyn ConfigManager>>,
    /// Can we speed up Propose with transaction pressure?
    /// Starts as `true` and is reset to `false` once an expedited propose timeout is added.
    allow_expedited_propose: bool,
    /// Block proposer.
    block_proposer: Box<dyn ProposeBlock>,
}

/// HTTP API configuration options.
///
/// `NodeApiConfig::default()` leaves all addresses and CORS options unset.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct NodeApiConfig {
    /// Timeout to update API state. The default value is `10_000`.
    // NOTE(review): the unit is presumably milliseconds, like other timeouts here — confirm.
    pub state_update_timeout: usize,
    /// Listen address for public API endpoints.
    pub public_api_address: Option<SocketAddr>,
    /// Listen address for private API endpoints.
    pub private_api_address: Option<SocketAddr>,
    /// Cross-origin resource sharing ([CORS][cors]) options for responses returned
    /// by public API handlers.
    ///
    /// [cors]: https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS
    pub public_allow_origin: Option<AllowOrigin>,
    /// Cross-origin resource sharing ([CORS][cors]) options for responses returned
    /// by private API handlers.
    ///
    /// [cors]: https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS
    pub private_allow_origin: Option<AllowOrigin>,
    /// HTTP server restart policy. The server is restarted each time the list of endpoints
    /// is updated (e.g., due to a new service initialization).
    ///
    /// If the field is missing during deserialization, `ServerRestartPolicy::default()` is used.
    #[serde(default)]
    pub server_restart: ServerRestartPolicy,
}

impl Default for NodeApiConfig {
    /// Builds the default API configuration: no listen addresses, no CORS options,
    /// a `10_000` state update timeout and the default server restart policy.
    fn default() -> Self {
        let server_restart = ServerRestartPolicy::default();
        let state_update_timeout = 10_000;

        Self {
            state_update_timeout,
            server_restart,
            // Both API servers and their CORS options are disabled unless configured explicitly.
            public_api_address: None,
            public_allow_origin: None,
            private_api_address: None,
            private_allow_origin: None,
        }
    }
}

/// HTTP server restart policy.
///
/// `NodeConfig::validate` requires both fields to be strictly positive.
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
pub struct ServerRestartPolicy {
    /// The number of attempts to restart the HTTP server. The default value is 20.
    pub max_retries: u16,
    /// The interval in milliseconds between attempts of restarting the HTTP server.
    /// The default value is 500.
    pub retry_timeout: u64,
}

impl Default for ServerRestartPolicy {
    /// Returns the default policy: up to 20 restart attempts, 500 ms apart.
    fn default() -> Self {
        let max_retries = 20;
        let retry_timeout = 500;
        Self {
            max_retries,
            retry_timeout,
        }
    }
}

/// P2P network configuration of an Exonum node.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct NetworkConfiguration {
    /// Maximum number of incoming connections established with peers at any given time.
    /// The default value is 128.
    pub max_incoming_connections: usize,
    /// Maximum number of outgoing connections established with peers at any given time.
    /// The default value is 128.
    pub max_outgoing_connections: usize,
    /// Switches on [`TCP_NODELAY`] option. Enabled by default.
    ///
    /// [`TCP_NODELAY`]: https://doc.rust-lang.org/std/net/struct.TcpStream.html#method.set_nodelay
    pub tcp_nodelay: bool,
    /// Allows to set interval between keep-alive TCP probes. If set to `None`, keep-alive probes
    /// will be disabled. The default value is `None`.
    // NOTE(review): the unit of the interval is not visible here (presumably milliseconds) — confirm.
    pub tcp_keep_alive: Option<u64>,
    /// Retry timeout if an outgoing connection to a peer fails. The default value is `15_000`.
    pub tcp_connect_retry_timeout: Milliseconds,
    /// Maximum number of retries when connecting to a peer. The default value is 10.
    pub tcp_connect_max_retries: u64,
}

impl Default for NetworkConfiguration {
    /// Returns the default P2P settings: 128 connections in each direction, `TCP_NODELAY`
    /// enabled, keep-alive probes disabled, and 10 connection retries with a 15 000 ms timeout.
    fn default() -> Self {
        // The same connection limit is applied in both directions.
        let max_connections = 128;

        Self {
            max_incoming_connections: max_connections,
            max_outgoing_connections: max_connections,
            tcp_nodelay: true,
            tcp_keep_alive: None,
            tcp_connect_max_retries: 10,
            tcp_connect_retry_timeout: 15_000,
        }
    }
}

/// Events pool capacities.
///
/// All fields are private, so outside of this crate a value can only be obtained via
/// `Default` or deserialization. `NodeConfig::validate` checks the bounds of
/// `internal_events_capacity` and `network_requests_capacity`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct EventsPoolCapacity {
    /// Maximum number of queued outgoing network messages. The default value is 512.
    network_requests_capacity: usize,
    /// Maximum number of queued incoming network messages. The default value is 512.
    network_events_capacity: usize,
    /// Maximum number of queued internal events. The default value is 128.
    internal_events_capacity: usize,
    /// Maximum number of queued requests from api. The default value is 1024.
    api_requests_capacity: usize,
}

impl Default for EventsPoolCapacity {
    /// Returns the default queue capacities: 512 for each network queue, 128 for internal
    /// events and 1024 for API requests.
    fn default() -> Self {
        // Both network queues (incoming and outgoing) share the same default size.
        let network_capacity = 512;

        Self {
            network_requests_capacity: network_capacity,
            network_events_capacity: network_capacity,
            internal_events_capacity: 128,
            api_requests_capacity: 1024,
        }
    }
}

/// Memory pool configuration parameters.
///
/// The struct is `#[non_exhaustive]`, so it cannot be built with a struct literal outside of
/// this crate; start from `MemoryPoolConfig::default()` instead.
///
/// # Examples
///
/// For most applications, you should use the value returned by `MemoryPoolConfig::default()`, maybe
/// overwriting some fields afterwards:
///
/// ```
/// # use exonum_node::{FlushPoolStrategy, MemoryPoolConfig};
/// let mut pool_config = MemoryPoolConfig::default();
/// // Increase flush pool interval to 100 milliseconds.
/// pool_config.flush_pool_strategy = FlushPoolStrategy::Timeout {
///     timeout: 100,
/// };
/// // Use the config somewhere...
/// ```
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub struct MemoryPoolConfig {
    /// Sets the maximum number of messages that can be buffered on the event loop's
    /// notification channel before a send will fail.
    pub events_pool_capacity: EventsPoolCapacity,

    /// Strategy for flushing the transaction cache to the persistent pool.
    ///
    /// This value influences how fast transactions appear in the persistent pool after they
    /// have been initially processed by the node. With `Never` setting, transactions will not
    /// be persisted at all before appearing in a block, which may negatively influence
    /// some applications (e.g., the `GET transactions` endpoint of the explorer service).
    /// On the other hand of the spectrum, there is `Immediate`, which flushes each transaction
    /// separately after its initial processing. In the middle, there is `Timeout`, which
    /// allows to specify the coherence interval for the pool.
    ///
    /// If the field is missing during deserialization, `FlushPoolStrategy::default()` is used.
    #[serde(default)]
    pub flush_pool_strategy: FlushPoolStrategy,
}

/// Strategy to flush transactions into the pool.
///
/// The default strategy is `Timeout` with a 20 ms timeout. In serialized form the variant
/// is selected by the `type` field, with variant names written in `snake_case`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[non_exhaustive]
pub enum FlushPoolStrategy {
    /// Never flush the transactions to the persistent pool.
    ///
    /// This setting is best for performance, but may harm the availability of some APIs of the node
    /// (e.g., the `GET transactions` endpoint of the explorer service).
    Never,

    /// Flush transactions on the specified timeout in milliseconds.
    ///
    /// The recommended values are order of 20ms.
    Timeout {
        /// Timeout value in milliseconds.
        timeout: Milliseconds,
    },

    /// Flush each transaction after receiving it.
    ///
    /// Beware that this setting can harm performance of the node under high load.
    Immediate,
}

impl Default for FlushPoolStrategy {
    /// Returns the default strategy: flush the pool on a 20 ms timeout.
    fn default() -> Self {
        let timeout = 20;
        Self::Timeout { timeout }
    }
}

/// Configuration for the `Node`.
///
/// The configuration can be checked with `ValidateInput::validate`, which verifies the event
/// pool capacities, the HTTP server restart policy and the consensus configuration.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct NodeConfig {
    /// Initial consensus configuration that will be written in the genesis block.
    pub consensus: ConsensusConfig,
    /// Network listening address.
    pub listen_address: SocketAddr,
    /// Remote Network address used by this node.
    pub external_address: String,
    /// P2P network configuration.
    pub network: NetworkConfiguration,
    /// HTTP API configuration.
    pub api: NodeApiConfig,
    /// Memory pool configuration.
    pub mempool: MemoryPoolConfig,
    /// List of peers the node will connect to on start.
    pub connect_list: ConnectListConfig,
    /// Number of threads allocated for transaction verification.
    pub thread_pool_size: Option<u8>,
}

impl ValidateInput for NodeConfig {
    type Error = anyhow::Error;

    /// Checks the node configuration for obviously invalid values.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    ///
    /// - `internal_events_capacity` is not within `4..65_536`;
    /// - `network_requests_capacity` is not within `1..65_536`;
    /// - `server_restart.max_retries` or `server_restart.retry_timeout` is zero;
    /// - the consensus configuration fails its own validation.
    fn validate(&self) -> Result<(), Self::Error> {
        let capacity = &self.mempool.events_pool_capacity;
        // The reported bound must match the bound that is actually enforced.
        ensure!(
            capacity.internal_events_capacity > 3,
            "internal_events_capacity({}) must be strictly larger than 3",
            capacity.internal_events_capacity
        );
        ensure!(
            capacity.network_requests_capacity > 0,
            "network_requests_capacity({}) must be strictly larger than 0",
            capacity.network_requests_capacity
        );

        let restart_policy = &self.api.server_restart;
        ensure!(
            restart_policy.max_retries > 0,
            "`server_restart.max_retries` must be strictly larger than 0"
        );
        ensure!(
            restart_policy.retry_timeout > 0,
            "`server_restart.retry_timeout` must be strictly larger than 0"
        );

        // Sanity checks for cases of accidental negative overflows.
        let sanity_max = 2_usize.pow(16);
        ensure!(
            capacity.internal_events_capacity < sanity_max,
            "internal_events_capacity({}) must be smaller than {}",
            capacity.internal_events_capacity,
            sanity_max,
        );
        ensure!(
            capacity.network_requests_capacity < sanity_max,
            "network_requests_capacity({}) must be smaller than {}",
            capacity.network_requests_capacity,
            sanity_max,
        );

        // The consensus part of the configuration has its own validation rules.
        self.consensus.validate()
    }
}

/// Configuration for the `NodeHandler`.
///
/// This type is considered an implementation detail of the node handler; it is exempt from
/// semantic versioning.
#[derive(Debug, Clone)]
pub(crate) struct Configuration {
    /// Connection list.
    pub connect_list: ConnectList,
    /// Network configuration.
    pub network: NetworkConfiguration,
    /// Known peer addresses. `NodeHandler::new` copies this list into the handler.
    pub peer_discovery: Vec<String>,
    /// Memory pool configuration.
    pub mempool: MemoryPoolConfig,
    /// Validator keys.
    pub keys: Keys,
}

/// Channel for messages, timeouts and api requests. Consumed by the `NodeHandler` constructor.
///
/// Each field is a separate sender for one kind of request.
#[derive(Debug)]
pub(crate) struct NodeSender {
    /// Internal requests sender (used, among other things, to schedule timeouts).
    pub internal_requests: SyncSender<InternalRequest>,
    /// Network requests sender (used to send messages to peers).
    pub network_requests: SyncSender<NetworkRequest>,
    /// Transactions sender.
    pub transactions: SyncSender<Verified<AnyTx>>,
    /// Api requests sender.
    pub api_requests: SyncSender<ExternalMessage>,
}

/// Node role.
///
/// The role is derived from the optional validator identifier of the node
/// (see `NodeRole::new`); the default role is `Auditor`.
#[derive(Debug, Clone, Copy)]
pub(crate) enum NodeRole {
    /// Validator node with the specified validator identifier.
    Validator(ValidatorId),
    /// Auditor node, i.e., a node without a validator identifier.
    Auditor,
}

impl Default for NodeRole {
    /// Returns the `Auditor` role, the same role `NodeRole::new` produces when
    /// no validator identifier is available.
    fn default() -> Self {
        Self::Auditor
    }
}

impl NodeRole {
    /// Builds a `NodeRole` from an optional validator identifier: `Some(id)` yields
    /// `Validator(id)`, `None` yields `Auditor`.
    pub fn new(validator_id: Option<ValidatorId>) -> Self {
        validator_id.map_or(Self::Auditor, Self::Validator)
    }

    /// Returns `true` if the node is a validator.
    pub fn is_validator(self) -> bool {
        // Both variants are listed explicitly so that adding a new role forces
        // a decision here at compile time.
        match self {
            Self::Validator(_) => true,
            Self::Auditor => false,
        }
    }
}

impl NodeHandler {
    /// Creates `NodeHandler` using specified `Configuration`.
    ///
    /// The constructor reads the last block, the last block skip, the consensus configuration
    /// and the cached peers from the blockchain storage, builds the node `State` from them,
    /// and publishes the node role to the shared API state.
    ///
    /// The list of plugins is initially empty, and expedited proposals are allowed.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        blockchain: BlockchainMut,
        external_address: &str,
        sender: NodeSender,
        system_state: Box<dyn SystemStateProvider>,
        config: Configuration,
        api_state: SharedNodeState,
        config_manager: Option<Box<dyn ConfigManager>>,
        block_proposer: Box<dyn ProposeBlock>,
    ) -> Self {
        // A single snapshot is used for both the core schema and the node schema, so that
        // all initial data is read from the same state of the database.
        let snapshot = blockchain.snapshot();
        let schema = Schema::new(&snapshot);
        let last_block = schema.last_block();
        let last_block_skip = schema.block_skip();
        let consensus_config = schema.consensus_config();
        info!("Creating a node with config: {:#?}", consensus_config);

        // Our own `Connect` message, which is sent to peers when connecting to them.
        let connect = Connect::new(
            external_address,
            system_state.current_time().into(),
            &user_agent(),
        );
        let peers = NodeSchema::new(&snapshot).peers_cache().iter().collect();
        // `config` is moved into the state below, so the peer addresses are copied beforehand.
        let peer_discovery = config.peer_discovery.clone();

        let state = State::new(
            config,
            consensus_config,
            connect,
            peers,
            &last_block,
            last_block_skip.as_ref(),
            system_state.current_time(),
        );

        let validator_id = state.validator_id();
        let node_role = NodeRole::new(validator_id);
        let is_enabled = api_state.is_enabled();
        api_state.set_node_role(node_role);

        Self {
            blockchain,
            api_state,
            plugins: vec![],
            system_state,
            state,
            channel: sender,
            peer_discovery,
            is_enabled,
            node_role,
            config_manager,
            allow_expedited_propose: true,
            block_proposer,
        }
    }

    /// Wraps `message` into a `Verified` container created with the consensus key pair
    /// of this node.
    fn sign_message<T: TryFrom<SignedMessage> + IntoMessage>(&self, message: T) -> Verified<T> {
        let keys = self.state.keys();
        Verified::from_value(message, keys.consensus_pk(), keys.consensus_sk())
    }

    /// Returns a reference to the internal `SharedNodeState`.
    fn api_state(&self) -> &SharedNodeState {
        &self.api_state
    }

    /// Reads the `first_round_timeout` field of the current `ConsensusConfig`.
    fn first_round_timeout(&self) -> Milliseconds {
        let consensus = self.state.consensus_config();
        consensus.first_round_timeout
    }

    /// Computes the round timeout increase.
    ///
    /// The value is not stored in the configuration; it is derived as
    /// `ConsensusConfig::TIMEOUT_LINEAR_INCREASE_PERCENT` percent of the `first_round_timeout`
    /// field from the current `ConsensusConfig`.
    fn round_timeout_increase(&self) -> Milliseconds {
        let first_round_timeout = self.state.consensus_config().first_round_timeout;
        let scaled = first_round_timeout * ConsensusConfig::TIMEOUT_LINEAR_INCREASE_PERCENT;
        scaled / 100
    }

    /// Reads the `status_timeout` field of the current `ConsensusConfig`.
    fn status_timeout(&self) -> Milliseconds {
        let consensus = self.state.consensus_config();
        consensus.status_timeout
    }

    /// Reads the `peers_timeout` field of the current `ConsensusConfig`.
    fn peers_timeout(&self) -> Milliseconds {
        let consensus = self.state.consensus_config();
        consensus.peers_timeout
    }

    /// Reads the minimal propose timeout (`min_propose_timeout`) from the current
    /// `ConsensusConfig`.
    fn min_propose_timeout(&self) -> Milliseconds {
        let consensus = self.state.consensus_config();
        consensus.min_propose_timeout
    }

    /// Reads the maximal propose timeout (`max_propose_timeout`) from the current
    /// `ConsensusConfig`.
    fn max_propose_timeout(&self) -> Milliseconds {
        let consensus = self.state.consensus_config();
        consensus.max_propose_timeout
    }

    /// Reads the threshold (`propose_timeout_threshold` in the current `ConsensusConfig`)
    /// starting from which the minimal propose timeout value is used.
    fn propose_timeout_threshold(&self) -> u32 {
        let consensus = self.state.consensus_config();
        consensus.propose_timeout_threshold
    }

    /// Returns a shared reference to the `State` of the node.
    pub(crate) fn state(&self) -> &State {
        &self.state
    }

    /// Returns a mutable reference to the `State` of the node.
    ///
    /// Only compiled for tests.
    #[cfg(test)]
    pub(crate) fn state_mut(&mut self) -> &mut State {
        &mut self.state
    }

    /// Performs node initialization and starts the consensus process.
    ///
    /// The method:
    ///
    /// 1. sends our `Connect` message to all known peers (peers from the node state plus
    ///    peers from the connect list, excluding the node itself);
    /// 2. resets the epoch start time to the current time;
    /// 3. jumps to the consensus round saved in the node schema;
    /// 4. schedules the basic timeouts and, if the node is the leader of round 1,
    ///    the propose timeout;
    /// 5. replays the consensus messages cached in the node schema.
    pub fn initialize(&mut self) {
        let listen_address = self.system_state.listen_address();
        info!("Start listening address={}", listen_address);

        // Collect the keys into a set so that a peer present in both sources is contacted once.
        let peers: HashSet<_> = {
            let it = self.state.peers().values().map(Verified::author);
            let it = it.chain(
                self.state()
                    .connect_list()
                    .peers()
                    .into_iter()
                    .map(|i| i.public_key),
            );
            // Do not connect to ourselves.
            let it = it.filter(|address| address != &self.state.our_connect_message().author());
            it.collect()
        };

        for key in peers {
            self.connect(key);
            info!("Trying to connect with peer {}", key);
        }

        // Re-initialize epoch start to make it closer to the actual start time of the node.
        // While this is mainly useful for tests, there may be other cases in which the node
        // is not immediately started after construction.
        self.state
            .set_epoch_start_time(self.system_state.current_time());

        let snapshot = self.blockchain.snapshot();
        let schema = NodeSchema::new(&snapshot);
        // Recover previous saved round if any.
        let round = schema.consensus_round();
        self.state.jump_round(round);
        info!("Jump to round {}", round);

        self.add_timeouts();
        if self.state.is_leader() && round == Round(1) {
            // Propose the block after a short delay, which is used to gather initial transactions
            // (if any), connect to peers, etc.
            //
            // Note that this handles the case when the node has already proposed a block
            // at the same `(epoch, round)`; in this case, the node has at least its own `Prevote`,
            // so the `Propose` creation will be skipped in `handle_propose_timeout`.
            //
            // Restricting `round == Round(1)` is in order to prevent proposals with a large delay.
            // (Indeed, the delay in this case will be at least `first_round_timeout`, i.e.,
            // order of 3 seconds.) Heuristically, `round > Round(1)` increases probability that
            // the node's `(epoch, round)` is outdated.
            self.add_propose_timeout();
        }

        // Recover cached consensus messages if any. We do this after main initialization and before
        // the start of event processing.
        let messages = schema.consensus_messages_cache();
        for msg in messages.iter() {
            self.handle_message(msg);
        }
    }

    /// Runs the node's basic timers.
    ///
    /// Schedules the round, status, peer exchange and API state update timeouts, and
    /// lets `maybe_add_flush_pool_timeout` decide whether to schedule the pool flush timeout.
    fn add_timeouts(&mut self) {
        self.add_round_timeout();
        self.add_status_timeout();
        self.add_peer_exchange_timeout();
        self.add_update_api_state_timeout();
        self.maybe_add_flush_pool_timeout();
    }

    /// Queues a network request that delivers `message` to the peer identified by `public_key`.
    fn send_to_peer<T: Into<SignedMessage>>(&mut self, public_key: PublicKey, message: T) {
        let outgoing = NetworkRequest::SendMessage(public_key, message.into());
        self.channel.network_requests.send(outgoing);
    }

    /// Sends `message` to every known peer that is allowed by the connect list.
    fn broadcast<M: Into<SignedMessage>>(&mut self, message: M) {
        // The recipients are collected up front because sending requires a mutable borrow
        // of the handler, while the peers are borrowed from its state.
        let recipients: Vec<PublicKey> = self
            .state
            .peers()
            .iter()
            .filter(|&(pubkey, _)| self.state.connect_list().is_peer_allowed(pubkey))
            .map(|(pubkey, _)| *pubkey)
            .collect();

        let signed = message.into();
        for recipient in recipients {
            self.send_to_peer(recipient, signed.clone());
        }
    }

    /// Initiates a connection to the peer with the given public key by sending it
    /// a copy of our `Connect` message.
    fn connect(&mut self, key: PublicKey) {
        let our_connect = self.state.our_connect_message().clone();
        self.send_to_peer(key, our_connect);
    }

    /// Submits a `TimeoutRequest` for `timeout` at `time` to the internal requests channel.
    fn add_timeout(&mut self, timeout: NodeTimeout, time: SystemTime) {
        self.channel
            .internal_requests
            .send(TimeoutRequest(time, timeout).into());
    }

    /// Registers a request for `data` from `peer` in the node state.
    ///
    /// A request timeout is scheduled only if the state reports the request as new.
    fn request(&mut self, data: RequestData, peer: PublicKey) {
        if self.state.request(data.clone(), peer) {
            self.add_request_timeout(data, None);
        }
    }

    /// Adds `NodeTimeout::Round` timeout to the channel.
    ///
    /// The timeout is tagged with the current epoch and round and is scheduled for
    /// the start time of the round following the current one.
    fn add_round_timeout(&mut self) {
        let epoch = self.state.epoch();
        let round = self.state.round();
        let deadline = self.round_start_time(round.next());
        trace!(
            "ADD ROUND TIMEOUT: time={:?}, height={}, round={}",
            deadline,
            epoch,
            round
        );
        self.add_timeout(NodeTimeout::Round(epoch, round), deadline);
    }

    /// Adds `NodeTimeout::Propose` timeout to the channel.
    ///
    /// The delay counted from the start of the current round is the minimal propose timeout
    /// if `need_faster_propose` holds, and the maximal one otherwise.
    fn add_propose_timeout(&mut self) {
        let delay_ms = if self.need_faster_propose() {
            self.min_propose_timeout()
        } else {
            self.max_propose_timeout()
        };

        let epoch = self.state.epoch();
        let round = self.state.round();
        let deadline = self.round_start_time(round) + Duration::from_millis(delay_ms);

        trace!(
            "ADD PROPOSE TIMEOUT: time={:?}, height={}, round={}",
            deadline,
            epoch,
            round
        );
        self.add_timeout(NodeTimeout::Propose(epoch, round), deadline);
    }

    /// Adds an expedited propose timeout if it is still allowed and `need_faster_propose`
    /// holds. Once the timeout is added, `allow_expedited_propose` is reset so that
    /// subsequent calls do nothing until the flag is set again.
    fn maybe_add_propose_timeout(&mut self) {
        // The flag is checked first, so the pending transactions are not counted needlessly.
        if !self.allow_expedited_propose || !self.need_faster_propose() {
            return;
        }

        info!("Add expedited propose timeout");
        self.add_propose_timeout();
        self.allow_expedited_propose = false;
    }

    /// Checks whether the number of pending transactions (the pool length read from
    /// a fresh blockchain snapshot plus the transaction cache in the node state)
    /// has reached the propose timeout threshold.
    fn need_faster_propose(&self) -> bool {
        let snapshot = self.blockchain.snapshot();
        let pooled = Schema::new(&snapshot).transactions_pool_len();
        let cached = self.state.tx_cache_len() as u64;
        let pending_tx_count = pooled + cached;
        pending_tx_count >= u64::from(self.propose_timeout_threshold())
    }

    /// Adds `NodeTimeout::Status` timeout to the channel.
    ///
    /// The timeout is tagged with the current epoch and fires `status_timeout()`
    /// milliseconds after the current time reported by the system state provider.
    fn add_status_timeout(&mut self) {
        let now = self.system_state.current_time();
        let deadline = now + Duration::from_millis(self.status_timeout());
        let epoch = self.state.epoch();
        self.add_timeout(NodeTimeout::Status(epoch), deadline);
    }

    /// Adds `NodeTimeout::Request` timeout with `RequestData` to the channel.
    ///
    /// The delay is taken from `data.timeout()` and counted from the current time.
    fn add_request_timeout(&mut self, data: RequestData, peer: Option<PublicKey>) {
        trace!("ADD REQUEST TIMEOUT");
        let now = self.system_state.current_time();
        let deadline = now + data.timeout();
        self.add_timeout(NodeTimeout::Request(data, peer), deadline);
    }

    /// Adds `NodeTimeout::PeerExchange` timeout to the channel.
    ///
    /// The timeout fires `peers_timeout()` milliseconds after the current time.
    fn add_peer_exchange_timeout(&mut self) {
        trace!("ADD PEER EXCHANGE TIMEOUT");
        let now = self.system_state.current_time();
        let deadline = now + Duration::from_millis(self.peers_timeout());
        self.add_timeout(NodeTimeout::PeerExchange, deadline);
    }

    /// Adds `NodeTimeout::UpdateApiState` timeout to the channel.
    ///
    /// The delay in milliseconds is read from the shared API state.
    fn add_update_api_state_timeout(&mut self) {
        let now = self.system_state.current_time();
        let delay = Duration::from_millis(self.api_state().state_update_timeout());
        self.add_timeout(NodeTimeout::UpdateApiState, now + delay);
    }

    /// Adds `NodeTimeout::FlushPool` timeout to the channel, provided that the node state
    /// returns a flush pool timeout; does nothing otherwise.
    fn maybe_add_flush_pool_timeout(&mut self) {
        let timeout = match self.state().flush_pool_timeout() {
            Some(timeout) => timeout,
            None => return,
        };
        let deadline = self.system_state.current_time() + timeout;
        self.add_timeout(NodeTimeout::FlushPool, deadline);
    }

    /// Returns the object hash of the last block known to the blockchain.
    fn last_block_hash(&self) -> Hash {
        let last_block = self.blockchain.as_ref().last_block();
        last_block.object_hash()
    }

    /// Returns the number of uncommitted transactions: the blockchain pool size plus
    /// the number of transactions cached in the node state.
    fn uncommitted_txs_count(&self) -> u64 {
        let pooled = self.blockchain.as_ref().pool_size();
        let cached = self.state.tx_cache_len() as u64;
        pooled + cached
    }

    /// Returns start time of the requested round.
    ///
    /// The start time is computed as `H + (r - 1) * t0 + (r - 1)(r - 2) / 2 * dt`, where
    ///
    /// - `H` is the epoch start time taken from the node state
    /// - `t0` is the timeout of `Round(1)`
    /// - `dt` is the timeout increase value
    /// - `r = 1, 2, ...` is the round number
    fn round_start_time(&self, round: Round) -> SystemTime {
        // `elapsed_rounds` is `r - 1`, the number of rounds preceding the requested one.
        let elapsed_rounds: u64 = round.previous().into();
        let base_ms = elapsed_rounds * self.first_round_timeout();
        // `(r - 1)(r - 2) / 2`; the saturating subtraction yields zero for the first round.
        let increase_count = (elapsed_rounds * elapsed_rounds.saturating_sub(1)) / 2;
        let increase_ms = increase_count * self.round_timeout_increase();
        self.state.epoch_start_time() + Duration::from_millis(base_ms + increase_ms)
    }
}

impl fmt::Debug for NodeHandler {
    /// Formats the handler, exposing only the `channel`, `blockchain` and `peer_discovery`
    /// fields; all other fields are left out of the output.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            channel,
            blockchain,
            peer_discovery,
            ..
        } = self;

        formatter
            .debug_struct("NodeHandler")
            .field("channel", channel)
            .field("blockchain", blockchain)
            .field("peer_discovery", peer_discovery)
            .finish()
    }
}

/// Handle allowing to shut down the node.
///
/// The handle is cloneable; each clone wraps its own copy of the underlying sender.
#[derive(Debug, Clone)]
pub struct ShutdownHandle {
    /// Sender of `ExternalMessage`s to the node.
    inner: ApiSender<ExternalMessage>,
}

impl ShutdownHandle {
    /// Shuts down the node by sending it `ExternalMessage::Shutdown`.
    ///
    /// # Return value
    ///
    /// The failure means that the node is already being shut down.
    pub async fn shutdown(self) -> Result<(), SendError> {
        let Self { inner: mut sender } = self;
        sender.send_message(ExternalMessage::Shutdown).await
    }
}

/// Default system state provider implementation which uses `SystemTime::now`
/// to get the current time.
///
/// The wrapped `SocketAddr` is the value reported as the listen address.
#[derive(Debug)]
struct DefaultSystemState(SocketAddr);

impl SystemStateProvider for DefaultSystemState {
    /// Returns the socket address stored in this provider.
    fn listen_address(&self) -> SocketAddr {
        let Self(address) = self;
        *address
    }

    /// Returns the current system time as reported by `SystemTime::now`.
    fn current_time(&self) -> SystemTime {
        SystemTime::now()
    }
}

/// Channel between the node and external event producers / consumers.
///
/// Every field is a `(sender, receiver)` pair produced by `mpsc::channel`; the capacities
/// are taken from `EventsPoolCapacity` in `NodeChannel::new`.
#[derive(Debug)]
pub struct NodeChannel {
    /// Channel for network requests.
    pub(crate) network_requests: (mpsc::Sender<NetworkRequest>, mpsc::Receiver<NetworkRequest>),

    /// Channel for internal requests, such as timeout requests.
    #[doc(hidden)] // public because of the `transactions` benchmark
    pub internal_requests: (
        mpsc::Sender<InternalRequest>,
        mpsc::Receiver<InternalRequest>,
    ),

    /// Channel for transferring API endpoints from producers (e.g., Rust runtime) to the
    /// `ApiManager`.
    endpoints: (
        mpsc::Sender<UpdateEndpoints>,
        mpsc::Receiver<UpdateEndpoints>,
    ),

    /// Channel for externally generated transactions.
    #[doc(hidden)] // public because of the `transactions` benchmark
    pub transactions: (
        mpsc::Sender<Verified<AnyTx>>,
        mpsc::Receiver<Verified<AnyTx>>,
    ),

    /// Channel for API requests (`ExternalMessage`s), including the shutdown request.
    #[doc(hidden)] // public because of the `transactions` benchmark
    pub api_requests: (
        mpsc::Sender<ExternalMessage>,
        mpsc::Receiver<ExternalMessage>,
    ),

    /// Channel for network events.
    #[doc(hidden)] // public because of the `transactions` benchmark
    pub network_events: (mpsc::Sender<NetworkEvent>, mpsc::Receiver<NetworkEvent>),

    /// Channel for internal events.
    #[doc(hidden)] // public because of the `transactions` benchmark
    pub internal_events: (mpsc::Sender<InternalEvent>, mpsc::Receiver<InternalEvent>),
}

/// Interface of the configuration manager usable for updating node configuration on
/// the fly.
///
/// Implementations must be `Send`. A manager is attached to a node via
/// `NodeBuilder::with_config_manager`.
pub trait ConfigManager: Send {
    /// Updates the connect list in the node configuration with the provided value.
    fn store_connect_list(&mut self, connect_list: ConnectListConfig);
}

/// Node capable of processing requests from external clients and participating in the consensus
/// algorithm.
///
/// # Signal Handling
///
/// By default, the node installs several signal hooks. This can be disabled by using
/// [`disable_signals()`] method in `NodeBuilder`.
///
/// On Unix, the node will gracefully shut down on receiving any of SIGINT, SIGTERM
/// or SIGQUIT and will ignore SIGHUP. On Windows, the node will gracefully shut down on
/// receiving `Ctrl + C` break. Graceful shutdown means, among other things, flushing uncommitted
/// transactions into the persistent storage.
///
/// [`disable_signals()`]: struct.NodeBuilder.html#method.disable_signals
#[derive(Debug)]
pub struct Node {
    /// Configuration handed over to the `ApiManager` when the node is run.
    api_manager_config: ApiManagerConfig,
    /// API section of the node configuration.
    api_options: NodeApiConfig,
    /// Network section of the node configuration.
    network_config: NetworkConfiguration,
    /// Handler processing the node events.
    handler: NodeHandler,
    /// Channels connecting the handler with event producers / consumers.
    channel: NodeChannel,
    /// Maximum message length taken from the consensus configuration.
    max_message_len: u32,
    /// Thread pool size copied from the node configuration.
    thread_pool_size: Option<u8>,
    /// Whether the default signal handling is switched off.
    disable_signals: bool,
}

impl Default for NodeChannel {
    /// Creates a channel with the default events pool capacities.
    fn default() -> Self {
        let capacities = EventsPoolCapacity::default();
        Self::new(&capacities)
    }
}

impl NodeChannel {
    /// Creates `NodeChannel` with the given pool capacities.
    ///
    /// The internal events capacity is shared by the internal requests, endpoints and
    /// internal events channels; the API requests capacity is shared by the transactions
    /// and API requests channels.
    pub fn new(buffer_sizes: &EventsPoolCapacity) -> Self {
        let internal_capacity = buffer_sizes.internal_events_capacity;
        let api_capacity = buffer_sizes.api_requests_capacity;

        Self {
            network_requests: mpsc::channel(buffer_sizes.network_requests_capacity),
            internal_requests: mpsc::channel(internal_capacity),
            endpoints: mpsc::channel(internal_capacity),
            transactions: mpsc::channel(api_capacity),
            api_requests: mpsc::channel(api_capacity),
            network_events: mpsc::channel(buffer_sizes.network_events_capacity),
            internal_events: mpsc::channel(internal_capacity),
        }
    }

    /// Returns an `ApiSender` wrapping a clone of the sending half of the transactions channel.
    pub fn api_sender(&self) -> ApiSender {
        let (transactions_tx, _) = &self.transactions;
        ApiSender::new(transactions_tx.clone())
    }

    /// Returns the sender for HTTP endpoints.
    pub fn endpoints_sender(&self) -> mpsc::Sender<UpdateEndpoints> {
        let (endpoints_tx, _) = &self.endpoints;
        endpoints_tx.clone()
    }

    /// Returns the channel for sending timeouts, networks and API requests.
    ///
    /// Each sender of the result wraps a clone of the sending half of the corresponding channel.
    fn node_sender(&self) -> NodeSender {
        let internal_requests = SyncSender::new(self.internal_requests.0.clone());
        let network_requests = SyncSender::new(self.network_requests.0.clone());
        let transactions = SyncSender::new(self.transactions.0.clone());
        let api_requests = SyncSender::new(self.api_requests.0.clone());

        NodeSender {
            internal_requests,
            network_requests,
            transactions,
            api_requests,
        }
    }
}

/// Builder for `Node`.
pub struct NodeBuilder {
    /// Channels of the node being built; created from the mempool configuration.
    channel: NodeChannel,
    /// Builder of the underlying blockchain.
    blockchain_builder: BlockchainBuilder,
    /// Node configuration.
    node_config: NodeConfig,
    /// Keys of the node.
    node_keys: Keys,
    /// Optional configuration manager.
    config_manager: Option<Box<dyn ConfigManager>>,
    /// `Propose` creation logic.
    block_proposer: Box<dyn ProposeBlock>,
    /// Plugins attached to the node.
    plugins: Vec<Box<dyn NodePlugin>>,
    /// Whether the default signal handling is switched off.
    disable_signals: bool,
}

impl fmt::Debug for NodeBuilder {
    /// Formats the builder, exposing only the `channel`, `blockchain_builder` and
    /// `node_config` fields; all other fields are left out of the output.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            channel,
            blockchain_builder,
            node_config,
            ..
        } = self;

        formatter
            .debug_struct("NodeBuilder")
            .field("channel", channel)
            .field("blockchain_builder", blockchain_builder)
            .field("node_config", node_config)
            .finish()
    }
}

impl NodeBuilder {
    /// Instantiates a builder.
    ///
    /// The builder starts with no config manager, no plugins, the standard block proposer
    /// and the default signal handling enabled.
    ///
    /// # Panics
    ///
    /// Panics if validation of `node_config` fails.
    pub fn new(
        database: impl Into<Arc<dyn Database>>,
        node_config: NodeConfig,
        node_keys: Keys,
    ) -> Self {
        node_config
            .validate()
            .expect("Node configuration is inconsistent");

        let channel = NodeChannel::new(&node_config.mempool.events_pool_capacity);
        let service_keys = node_keys.service.clone();
        let api_sender = channel.api_sender();
        let blockchain = Blockchain::new(database, service_keys, api_sender);

        Self {
            channel,
            blockchain_builder: BlockchainBuilder::new(blockchain),
            node_config,
            node_keys,
            config_manager: None,
            plugins: Vec::new(),
            block_proposer: Box::new(StandardProposer),
            disable_signals: false,
        }
    }

    /// Adds a genesis config to use if the blockchain is not initialized yet.
    pub fn with_genesis_config(self, genesis_config: GenesisConfig) -> Self {
        Self {
            blockchain_builder: self.blockchain_builder.with_genesis_config(genesis_config),
            ..self
        }
    }

    /// Adds a runtime to the blockchain.
    pub fn with_runtime<T>(self, runtime: T) -> Self
    where
        T: Into<RuntimeInstance>,
    {
        Self {
            blockchain_builder: self.blockchain_builder.with_runtime(runtime),
            ..self
        }
    }

    /// Adds a runtime which depends on a `NodeChannel` (e.g., to update HTTP API of the node).
    ///
    /// `runtime_fn` is invoked immediately with a reference to the channel of this builder.
    pub fn with_runtime_fn<T, F>(self, runtime_fn: F) -> Self
    where
        T: Into<RuntimeInstance>,
        F: FnOnce(&NodeChannel) -> T,
    {
        let runtime = runtime_fn(&self.channel);
        Self {
            blockchain_builder: self.blockchain_builder.with_runtime(runtime),
            ..self
        }
    }

    /// Adds the configuration manager, replacing the previously set one (if any).
    pub fn with_config_manager<T: ConfigManager + 'static>(self, manager: T) -> Self {
        Self {
            config_manager: Some(Box::new(manager)),
            ..self
        }
    }

    /// Sets custom `Propose` creation logic for the node.
    ///
    /// # Stability and safety
    ///
    /// Using a custom proposer **CAN LEAD TO CONSENSUS FAILURE.** See the [`proposer`] module docs
    /// for more details.
    ///
    /// [`proposer`]: proposer/index.html
    pub fn with_block_proposer<T: ProposeBlock + 'static>(self, proposer: T) -> Self {
        Self {
            block_proposer: Box::new(proposer),
            ..self
        }
    }

    /// Adds a plugin. Plugins are kept in the order in which they were added.
    pub fn with_plugin<T: NodePlugin + 'static>(mut self, plugin: T) -> Self {
        self.plugins.push(Box::new(plugin));
        self
    }

    /// Switches off [default signal handling] for the node.
    /// This is useful to implement more complex signal handling, or one that differs
    /// from the default.
    ///
    /// If you use custom signal handlers, the node may be shut down gracefully using
    /// [`ShutdownHandle`].
    ///
    /// [`ShutdownHandle`]: struct.ShutdownHandle.html
    /// [default signal handling]: struct.Node.html#signal-handling
    pub fn disable_signals(self) -> Self {
        Self {
            disable_signals: true,
            ..self
        }
    }

    /// Converts this builder into a `Node`.
    pub fn build(self) -> Node {
        let Self {
            channel,
            blockchain_builder,
            node_config,
            node_keys,
            config_manager,
            block_proposer,
            plugins,
            disable_signals,
        } = self;

        let mut node = Node::with_blockchain(
            blockchain_builder.build(),
            channel,
            node_config,
            node_keys,
            config_manager,
            plugins,
            block_proposer,
        );
        // `Node::with_blockchain` always starts with signals enabled; apply the builder setting.
        node.disable_signals = disable_signals;
        node
    }
}

impl Node {
    /// Creates a node for the given blockchain and node configuration.
    ///
    /// Besides assembling the `NodeHandler`, this collects API endpoints from the plugins
    /// and prepares the `ApiManagerConfig` with a web server per configured API address.
    /// The returned node has signal handling enabled (`disable_signals == false`).
    fn with_blockchain(
        blockchain: BlockchainMut,
        channel: NodeChannel,
        node_cfg: NodeConfig,
        node_keys: Keys,
        config_manager: Option<Box<dyn ConfigManager>>,
        plugins: Vec<Box<dyn NodePlugin>>,
        block_proposer: Box<dyn ProposeBlock>,
    ) -> Self {
        crypto::init();

        // The addresses are read before `node_cfg.connect_list` is moved into the connect list.
        let peers = node_cfg.connect_list.addresses();
        let config = Configuration {
            connect_list: ConnectList::from_config(node_cfg.connect_list),
            mempool: node_cfg.mempool,
            network: node_cfg.network,
            peer_discovery: peers,
            keys: node_keys,
        };

        let api_state = SharedNodeState::new(node_cfg.api.state_update_timeout as u64);
        let mut api_aggregator = ApiAggregator::new();
        // Plugins get a sender connected to the API requests channel of the node.
        let plugin_api_context = PluginApiContext::new(
            blockchain.as_ref(),
            &api_state,
            ApiSender::new(channel.api_requests.0.clone()),
        );
        // Gather the endpoints provided by each plugin.
        for plugin in &plugins {
            let endpoints = plugin.wire_api(plugin_api_context.clone());
            api_aggregator.extend(endpoints);
        }

        let system_state = Box::new(DefaultSystemState(node_cfg.listen_address));
        let network_config = config.network;
        let api_cfg = node_cfg.api.clone();

        // A web server is registered only for the API addresses present in the configuration.
        let mut servers = HashMap::new();
        if let Some(listen_address) = api_cfg.public_api_address {
            let mut server_config = WebServerConfig::new(listen_address);
            server_config.allow_origin = api_cfg.public_allow_origin.clone();
            servers.insert(ApiAccess::Public, server_config);
        }
        if let Some(listen_address) = api_cfg.private_api_address {
            let mut server_config = WebServerConfig::new(listen_address);
            server_config.allow_origin = api_cfg.private_allow_origin.clone();
            servers.insert(ApiAccess::Private, server_config);
        }

        let restart_policy = node_cfg.api.server_restart;
        let api_runtime_config = ApiManagerConfig::new(servers, api_aggregator)
            .with_retries(restart_policy.retry_timeout, restart_policy.max_retries);

        let mut handler = NodeHandler::new(
            blockchain,
            &node_cfg.external_address,
            channel.node_sender(),
            system_state,
            config,
            api_state,
            config_manager,
            block_proposer,
        );
        handler.plugins = plugins;

        Self {
            api_options: api_cfg,
            handler,
            channel,
            network_config,
            max_message_len: node_cfg.consensus.max_message_len,
            thread_pool_size: node_cfg.thread_pool_size,
            api_manager_config: api_runtime_config,
            // May be overridden by the caller (see `NodeBuilder::build`).
            disable_signals: false,
        }
    }

    /// Initializes the node handler and runs the reactor until it terminates.
    ///
    /// After the reactor has finished, the method waits for `STOP_TIMEOUT` and then returns
    /// the result produced by the reactor.
    async fn run_handler(mut self, handshake_params: HandshakeParams) -> anyhow::Result<()> {
        // Stop timeout sufficient to prevent undefined behavior in RocksDB destructor code
        // (see below).
        const STOP_TIMEOUT: Duration = Duration::from_millis(50);

        self.handler.initialize();
        let res = Reactor::new(self).run(handshake_params).await;

        // Wait for a little bit to prevent undefined behavior with RocksDB, when it is dropped
        // concurrently with the process exiting. By delaying, we give time for
        // the `actix` servers to exit (these servers contain the bulk of `Arc` pointers to RocksDB).
        // The exit happens once the node infrastructure (in particular, the sender of endpoints to
        // `exonum_api::ApiManager`) is dropped, i.e., before this point.
        //
        // We assume that waiting `STOP_TIMEOUT` is acceptable for typical use cases;
        // even if the process does not terminate with the node exit,
        // running a node is generally understood as a long-term task.
        delay_for(STOP_TIMEOUT).await;
        res
    }

    /// Launches a `Node` and optionally creates threads for public and private API handlers,
    /// depending on the provided `NodeConfig`.
    ///
    /// The handshake parameters are assembled from the node state (consensus keys, connect
    /// list and our connect message) and the maximum message length.
    pub async fn run(self) -> anyhow::Result<()> {
        trace!("Running node.");

        // Runs NodeHandler.
        let handshake_params = HandshakeParams::new(
            &self.state().keys().consensus,
            self.state().connect_list(),
            self.state().our_connect_message().clone(),
            self.max_message_len,
        );
        self.run_handler(handshake_params).await
    }

    /// Returns `State` of the node.
    fn state(&self) -> &State {
        self.handler.state()
    }

    /// Returns the blockchain handle, which can be used to read blockchain state and send
    /// transactions to the node.
    pub fn blockchain(&self) -> &Blockchain {
        self.handler.blockchain.as_ref()
    }

    /// Returns a shutdown handle for the node. It is possible to instantiate multiple handles
    /// using this method; only the first call to shutdown the node is guaranteed to succeed
    /// (but this single call is enough to stop the node).
    ///
    /// The handle wraps a clone of the sending half of the API requests channel.
    pub fn shutdown_handle(&self) -> ShutdownHandle {
        ShutdownHandle {
            inner: ApiSender::new(self.channel.api_requests.0.clone()),
        }
    }
}

/// Bundle of the tasks that make up a running node, assembled from a `Node` by `Reactor::new`.
struct Reactor {
    /// Part running the node handler.
    handler_part: HandlerPart<NodeHandler>,
    /// Part serving the network requests and producing the network events.
    network_part: NetworkPart,
    /// Part turning the internal requests into the internal events.
    internal_part: InternalPart,
    /// Receiver of the result of the API task, which runs in a separate thread.
    api_part: oneshot::Receiver<io::Result<()>>,
    /// Handle used to request the node shutdown.
    shutdown_handle: ShutdownHandle,
    // Flag indicating whether the reactor should explicitly handle signals.
    // If there is at least one actix HTTP server, signal handling will be performed by it,
    // so there is no need to perform it explicitly.
    needs_signal_handler: bool,
}

impl Reactor {
    /// Splits the node into the reactor parts and starts the API manager.
    ///
    /// The API manager task is launched right away on a dedicated thread; its result is
    /// delivered through the `api_part` receiver. The remaining parts are only assembled
    /// here and start working in `run()`.
    fn new(node: Node) -> Self {
        let connect_message = node.state().our_connect_message().clone();
        let connect_list = node.state().connect_list();
        let shutdown_handle = node.shutdown_handle();

        let mut api_config = node.api_manager_config;
        api_config.disable_signals = node.disable_signals;
        // Explicit signal handling is necessary only if signals are enabled and there are
        // no HTTP servers configured.
        let needs_signal_handler = !node.disable_signals && api_config.servers.is_empty();
        let api_manager = ApiManager::new(api_config);
        let endpoints = node.channel.endpoints.1;
        let api_task = api_manager.run(endpoints);
        let (api_part_tx, api_part) = oneshot::channel();

        // Creating a separate thread here seems easier than making `Node::run()` return
        // a non-`Send` future (which, e.g., precludes running nodes with `tokio::spawn`).
        thread::spawn(|| {
            let res = System::new("exonum-node").block_on(api_task);
            if let Err(ref err) = res {
                log::error!("Error in actix thread: {}", err);
            }
            // The send fails only if the receiver is gone; the result is of no use then.
            api_part_tx.send(res).ok();
        });

        let (network_tx, network_rx) = node.channel.network_events;
        let internal_requests_rx = node.channel.internal_requests.1;
        let network_part = NetworkPart {
            our_connect_message: connect_message,
            listen_address: node.handler.system_state.listen_address(),
            network_requests: node.channel.network_requests.1,
            network_tx,
            network_config: node.network_config,
            max_message_len: node.max_message_len,
            connect_list,
        };

        let (internal_tx, internal_rx) = node.channel.internal_events;
        let handler_part = HandlerPart {
            handler: node.handler,
            internal_rx,
            network_rx,
            transactions_rx: node.channel.transactions.1,
            api_rx: node.channel.api_requests.1,
        };

        let internal_part = InternalPart {
            internal_tx,
            internal_requests_rx,
        };

        Self {
            handler_part,
            network_part,
            internal_part,
            api_part,
            shutdown_handle,
            needs_signal_handler,
        }
    }

    /// Listens to the same 3 signals as `actix`: SIGINT, SIGTERM, and SIGQUIT.
    ///
    /// The returned future completes once the first of these signals is received.
    /// SIGHUP is consumed by a separately spawned task, which only logs it.
    #[cfg(unix)]
    #[allow(clippy::mut_mut)] // occurs in the `select!` macro
    async fn listen_to_signals() {
        use futures::StreamExt;
        use tokio::signal::unix::{signal, SignalKind};

        let int_listener = tokio::signal::ctrl_c().fuse();
        futures::pin_mut!(int_listener);

        // If setting the signal listener fails, we replace the listener with a stream
        // that never resolves.
        let mut term_listener = signal(SignalKind::terminate())
            .map_or_else(|_| stream::pending().right_stream(), StreamExt::left_stream)
            .fuse();

        let mut quit_listener = signal(SignalKind::quit())
            .map_or_else(|_| stream::pending().right_stream(), StreamExt::left_stream)
            .fuse();

        // We also register a SIGHUP listener which ignores the signal.
        if let Ok(mut hangup_listener) = signal(SignalKind::hangup()) {
            tokio::spawn(async move {
                while let Some(()) = hangup_listener.next().await {
                    log::info!("Received SIGHUP; ignoring");
                }
            });
        }

        // Complete as soon as any of the three listeners fires.
        futures::select! {
            _ = int_listener => (),
            _ = term_listener.next() => (),
            _ = quit_listener.next() => (),
        }
    }

    /// Waits for the `Ctrl + C` signal; an error from the listener is ignored.
    #[cfg(not(unix))]
    async fn listen_to_signals() {
        tokio::signal::ctrl_c().await.ok();
    }

    /// Runs all parts of the reactor until the first of them terminates.
    ///
    /// If the terminated part is not the handler, the shutdown request is sent to
    /// the handler, and the handler is awaited before returning. The result is an error
    /// only if the API task failed or its thread did not deliver a result.
    #[allow(clippy::mut_mut)] // occurs in the `select!` macro
    async fn run(self, handshake_params: HandshakeParams) -> anyhow::Result<()> {
        let internal_task = self.internal_part.run().fuse();
        futures::pin_mut!(internal_task);
        let network_task = self.network_part.run(handshake_params).fuse();
        futures::pin_mut!(network_task);
        let handler_task = self.handler_part.run().fuse();
        futures::pin_mut!(handler_task);
        let mut api_task = self.api_part.fuse();

        if self.needs_signal_handler {
            // Send the shutdown signal once we received a signal.
            let shutdown_handle = self.shutdown_handle.clone();
            let signal_rx = Self::listen_to_signals();

            tokio::spawn(async {
                signal_rx.await;
                shutdown_handle.shutdown().await.ok();
            });
        };

        // The remaining tasks are dropped after the first task is terminated,
        // which should free all associated resources.
        let (res, should_clean_up) = futures::select! {
            () = internal_task => (Ok(()), true),
            () = network_task => (Ok(()), true),
            // The handler has finished on its own, so there is nothing left to clean up.
            () = handler_task => (Ok(()), false),

            res = api_task => {
                let res = match res {
                    // The sender was dropped without sending the result.
                    Err(_) => Err(format_err!("Actix thread panicked")),
                    Ok(res) => res.map_err(From::from),
                };
                (res, true)
            }
        };

        if should_clean_up {
            // Ensure that we run the shutdown code, such as flushing messages to the storage.
            self.shutdown_handle.shutdown().await.ok();
            handler_task.await;
        }

        log::info!("Node terminated with status {:?}", res);
        res
    }
}

/// Generates configurations and keys for a testnet of `count` validator nodes.
///
/// Node `i` listens on `127.0.0.1:(start_port + i)`. All nodes share the same consensus
/// configuration, which lists the generated keys of every node as validator keys, and the same
/// connect list built from these keys and addresses. The remaining configuration sections
/// use default values.
///
/// # Panics
///
/// Panics if the port of any node (`start_port + i` for `i < count`) does not fit into `u16`.
#[doc(hidden)]
pub fn generate_testnet_config(count: u16, start_port: u16) -> Vec<(NodeConfig, Keys)> {
    use exonum::blockchain::ValidatorKeys;

    let keys: Vec<_> = (0..count).map(|_| Keys::random()).collect();
    let validator_keys = keys
        .iter()
        .map(|keys| ValidatorKeys::new(keys.consensus_pk(), keys.service_pk()))
        .collect();
    let consensus = ConsensusConfig::default().with_validator_keys(validator_keys);

    let peers: Vec<String> = (0..count)
        .map(|offset| {
            // Plain addition would wrap around silently in release builds and produce
            // addresses with unrelated ports.
            let port = start_port.checked_add(offset).unwrap_or_else(|| {
                panic!(
                    "Testnet port range does not fit into `u16`: start_port={}, count={}",
                    start_port, count
                )
            });
            format!("127.0.0.1:{}", port)
        })
        .collect();

    keys.into_iter()
        .zip(&peers)
        .map(|(keys, address)| {
            let config = NodeConfig {
                listen_address: address
                    .parse()
                    .expect("loopback address with a numeric port is always valid"),
                external_address: address.clone(),
                network: NetworkConfiguration::default(),
                consensus: consensus.clone(),
                connect_list: ConnectListConfig::from_validator_keys(
                    &consensus.validator_keys,
                    &peers,
                ),
                api: NodeApiConfig::default(),
                mempool: MemoryPoolConfig::default(),
                thread_pool_size: None,
            };
            (config, keys)
        })
        .collect::<Vec<_>>()
}

#[cfg(test)]
mod tests {
    use exonum::merkledb::TemporaryDB;

    use super::*;

    /// The default testnet configuration passes validation in the builder.
    #[test]
    fn test_good_internal_events_config() {
        let mut configs = generate_testnet_config(1, 16_500);
        let (node_cfg, node_keys) = configs.pop().unwrap();
        NodeBuilder::new(TemporaryDB::new(), node_cfg, node_keys);
    }

    /// Zero capacity for internal events is rejected.
    #[test]
    #[should_panic(expected = "internal_events_capacity(0) must be strictly larger than 2")]
    fn test_bad_internal_events_capacity_too_small() {
        let mut configs = generate_testnet_config(1, 16_500);
        let (mut node_cfg, node_keys) = configs.pop().unwrap();
        let capacities = &mut node_cfg.mempool.events_pool_capacity;
        capacities.internal_events_capacity = 0;
        NodeBuilder::new(TemporaryDB::new(), node_cfg, node_keys);
    }

    /// Zero capacity for network requests is rejected.
    #[test]
    #[should_panic(expected = "network_requests_capacity(0) must be strictly larger than 0")]
    fn test_bad_network_requests_capacity_too_small() {
        let configs = generate_testnet_config(1, 16_500);
        let (mut node_cfg, node_keys) = configs.into_iter().next().unwrap();
        let capacities = &mut node_cfg.mempool.events_pool_capacity;
        capacities.network_requests_capacity = 0;
        NodeBuilder::new(TemporaryDB::new(), node_cfg, node_keys);
    }

    /// An accidentally huge capacity for internal events is rejected.
    #[test]
    #[should_panic(expected = "must be smaller than 65536")]
    fn test_bad_internal_events_capacity_too_large() {
        let mut configs = generate_testnet_config(1, 16_500);
        let (mut node_cfg, node_keys) = configs.pop().unwrap();
        let capacities = &mut node_cfg.mempool.events_pool_capacity;
        capacities.internal_events_capacity = usize::max_value();
        NodeBuilder::new(TemporaryDB::new(), node_cfg, node_keys);
    }

    /// An accidentally huge capacity for network requests is rejected.
    #[test]
    #[should_panic(expected = "must be smaller than 65536")]
    fn test_bad_network_requests_capacity_too_large() {
        let configs = generate_testnet_config(1, 16_500);
        let (mut node_cfg, node_keys) = configs.into_iter().next().unwrap();
        let capacities = &mut node_cfg.mempool.events_pool_capacity;
        capacities.network_requests_capacity = usize::max_value();
        NodeBuilder::new(TemporaryDB::new(), node_cfg, node_keys);
    }

    /// The mempool config survives a TOML round trip for different flush strategies, and
    /// a config without an explicit strategy is deserialized as the default one.
    #[test]
    fn flush_pool_strategy_is_serializable() {
        fn assert_roundtrip(config: &MemoryPoolConfig) {
            let serialized = toml::to_string(config).unwrap();
            let restored: MemoryPoolConfig = toml::from_str(&serialized).unwrap();
            assert_eq!(&restored, config);
        }

        let mut mempool_config = MemoryPoolConfig::default();
        assert_roundtrip(&mempool_config);

        mempool_config.flush_pool_strategy = FlushPoolStrategy::Never;
        assert_roundtrip(&mempool_config);

        let config_without_strategy = r#"
            [events_pool_capacity]
            network_requests_capacity = 512
            network_events_capacity = 512
            internal_events_capacity = 128
            api_requests_capacity = 1024
        "#;
        let restored: MemoryPoolConfig = toml::from_str(config_without_strategy).unwrap();
        assert_eq!(restored, MemoryPoolConfig::default());
    }
}