1#![recursion_limit = "2048"]
3#![deny(clippy::shadow_unrelated)]
4#![cfg_attr(feature = "track-lock-location", feature(async_fn_track_caller))]
9#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
23
24pub mod api;
27pub mod application;
28pub mod macros;
29pub mod prelude;
30pub mod protocol;
31pub mod state;
32pub mod util_types;
33
34#[cfg(test)]
35#[cfg_attr(coverage_nightly, coverage(off))]
36pub mod tests;
37
38#[cfg_attr(coverage_nightly, coverage(off))]
39pub mod bench_helpers;
40
41use std::env;
42use std::path::PathBuf;
43
44use anyhow::Context;
45use anyhow::Result;
46use application::config::cli_args;
47use chrono::DateTime;
48use chrono::Local;
49use chrono::Utc;
50use futures::future;
51use futures::Future;
52use futures::StreamExt;
53use itertools::Itertools;
54use prelude::tasm_lib;
55use prelude::triton_vm;
56use prelude::twenty_first;
57use protocol::consensus::block::Block;
58use protocol::peer::handshake_data::HandshakeData;
59use state::wallet::wallet_file::WalletFileContext;
60use state::GlobalState;
61use tarpc::server;
62use tarpc::server::incoming::Incoming;
63use tarpc::server::Channel;
64use tarpc::tokio_serde::formats::*;
65use tokio::net::TcpListener;
66use tokio::sync::broadcast;
67use tokio::sync::mpsc;
68use tokio::time::Instant;
69use tracing::debug;
70use tracing::info;
71use triton_vm::prelude::BFieldElement;
72
73use crate::application::config::data_directory::DataDirectory;
74use crate::application::json_rpc::server::http::RpcServer;
75use crate::application::locks::tokio as sync_tokio;
76use crate::application::loops::channel::MainToMiner;
77use crate::application::loops::channel::MainToPeerTask;
78use crate::application::loops::channel::MinerToMain;
79use crate::application::loops::channel::PeerTaskToMain;
80use crate::application::loops::channel::RPCServerToMain;
81use crate::application::loops::connect_to_peers::call_peer;
82use crate::application::loops::main_loop::MainLoopHandler;
83use crate::application::rpc::server::RPC;
84use crate::state::archival_state::ArchivalState;
85use crate::state::wallet::wallet_state::WalletState;
86use crate::state::GlobalStateLock;
87
/// Process exit code `0`.
// NOTE(review): presumably returned on a clean shutdown; the use site is outside this section.
pub const SUCCESS_EXIT_CODE: i32 = 0;
/// Process exit code `159`.
// NOTE(review): the name suggests it signals a failed block composition; confirm at the use site.
pub const COMPOSITION_FAILED_EXIT_CODE: i32 = 159;

/// 15-byte magic string.
// NOTE(review): presumably the request half of a peer handshake; confirm against the connection code.
pub const MAGIC_STRING_REQUEST: &[u8; 15] = b"7B8AB7FC438F411";
/// 15-byte magic string, same length as [`MAGIC_STRING_REQUEST`].
// NOTE(review): presumably the expected answer to `MAGIC_STRING_REQUEST`; confirm against the connection code.
pub const MAGIC_STRING_RESPONSE: &[u8; 15] = b"Hello Neptune!\n";
// Capacity of both peer channels created in `initialize`: the main-to-peer broadcast channel
// and the peer-to-main mpsc channel.
const PEER_CHANNEL_CAPACITY: usize = 1000;
// Capacity of the miner-to-main and main-to-miner mpsc channels created in `initialize`.
const MINER_CHANNEL_CAPACITY: usize = 10;
// Capacity of the RPC-server-to-main mpsc channel created in `initialize`.
const RPC_CHANNEL_CAPACITY: usize = 1000;
// Crate version, taken from `CARGO_PKG_VERSION` at compile time.
const VERSION: &str = env!("CARGO_PKG_VERSION");
98
// Compile-time guard: when the target has 16-bit pointers, the `compile_error!` below is
// compiled in and the build fails. On every other target the block is empty.
const _MIN_PTR_WIDTH: () = {
    #[cfg(target_pointer_width = "16")]
    compile_error!("This crate requires a target pointer width of at least 32 bits.");
};
105
106pub async fn initialize(cli_args: cli_args::Args) -> Result<MainLoopHandler> {
107 async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
108 tokio::spawn(fut);
109 }
110
111 if cli_args.mine() && !cli_args.network.performs_automated_mining() {
113 anyhow::bail!("Automatic mining is not supported for network {}. Try again without --compose or --guess flags.", cli_args.network);
114 }
115
116 info!("Starting neptune-core node on {}.", cli_args.network);
117
118 let data_directory = DataDirectory::get(cli_args.data_dir.clone(), cli_args.network)?;
120 DataDirectory::create_dir_if_not_exists(&data_directory.root_dir_path()).await?;
121 info!("Data directory is {}", data_directory);
122
123 let (rpc_server_to_main_tx, rpc_server_to_main_rx) =
124 mpsc::channel::<RPCServerToMain>(RPC_CHANNEL_CAPACITY);
125 let genesis = Block::genesis(cli_args.network);
126 let global_state =
127 GlobalState::try_new(data_directory.clone(), genesis, cli_args.clone()).await?;
128 let mut global_state_lock =
129 GlobalStateLock::from_global_state(global_state, rpc_server_to_main_tx.clone());
130
131 let (main_to_peer_broadcast_tx, _main_to_peer_broadcast_rx) =
133 broadcast::channel::<MainToPeerTask>(PEER_CHANNEL_CAPACITY);
134
135 let (peer_task_to_main_tx, peer_task_to_main_rx) =
137 mpsc::channel::<PeerTaskToMain>(PEER_CHANNEL_CAPACITY);
138
139 if let Some(block_import_directory) =
140 global_state_lock.cli().import_blocks_from_directory.clone()
141 {
142 info!(
143 "Importing blocks from directory \"{}\"",
144 block_import_directory.to_string_lossy()
145 );
146
147 let flush_period = global_state_lock.cli().import_block_flush_period;
148 let validate_blocks = !global_state_lock.cli().disable_validation_in_block_import;
149 let num_blocks_read = global_state_lock
150 .lock_guard_mut()
151 .await
152 .import_blocks_from_directory(&block_import_directory, flush_period, validate_blocks)
153 .await?;
154 info!("Successfully imported {num_blocks_read} blocks.");
155 }
156
157 if !cli_args.triton_vm_env_vars.is_empty() {
158 info!(
159 "Triton VM environment variables set to: {}",
160 cli_args.triton_vm_env_vars
161 );
162 }
163
164 if !cli_args.whitelisted_composers.is_empty() {
165 info!(
166 "Whitelisted composers:\n{}",
167 cli_args.whitelisted_composers.iter().join("\n")
168 );
169 }
170
171 info!("Checking if we need to restore UTXOs");
173 global_state_lock
174 .lock_guard_mut()
175 .await
176 .restore_monitored_utxos_from_recovery_data()
177 .await?;
178 info!("UTXO restoration check complete");
179
180 let incoming_peer_listener = if let Some(incoming_peer_listener) = cli_args.own_listen_port() {
182 let ret = TcpListener::bind((cli_args.peer_listen_addr, incoming_peer_listener))
183 .await
184 .with_context(|| format!("Failed to bind to local TCP port {}:{}. Is an instance of this program already running?", cli_args.peer_listen_addr, incoming_peer_listener))?;
185 info!("Now listening for incoming peer-connections");
186 ret
187 } else {
188 info!("Not accepting incoming peer-connections");
189 TcpListener::bind("127.0.0.1:0").await?
190 };
191
192 let own_handshake_data: HandshakeData =
194 global_state_lock.lock_guard().await.get_own_handshakedata();
195 info!(
196 "Most known canonical block has height {}",
197 own_handshake_data.tip_header.height
198 );
199 let mut task_join_handles = vec![];
200 for peer_address in global_state_lock.cli().peers.clone() {
201 let peer_state_var = global_state_lock.clone(); let main_to_peer_broadcast_rx_clone: broadcast::Receiver<MainToPeerTask> =
203 main_to_peer_broadcast_tx.subscribe();
204 let peer_task_to_main_tx_clone: mpsc::Sender<PeerTaskToMain> = peer_task_to_main_tx.clone();
205 let peer_join_handle = tokio::task::spawn(async move {
206 call_peer(
207 peer_address,
208 peer_state_var.clone(),
209 main_to_peer_broadcast_rx_clone,
210 peer_task_to_main_tx_clone,
211 own_handshake_data,
212 1, )
214 .await;
215 });
216 task_join_handles.push(peer_join_handle);
217 }
218 debug!("Made outgoing connections to peers");
219
220 let (miner_to_main_tx, miner_to_main_rx) = mpsc::channel::<MinerToMain>(MINER_CHANNEL_CAPACITY);
222 let (main_to_miner_tx, main_to_miner_rx) = mpsc::channel::<MainToMiner>(MINER_CHANNEL_CAPACITY);
223 let miner_state_lock = global_state_lock.clone(); if global_state_lock.cli().mine() {
225 let miner_join_handle = tokio::task::spawn(async move {
226 application::loops::mine_loop::mine(
227 main_to_miner_rx,
228 miner_to_main_tx,
229 miner_state_lock,
230 )
231 .await
232 .expect("Error in mining task");
233 });
234 task_join_handles.push(miner_join_handle);
235 info!("Started mining task");
236 }
237
238 let mut rpc_listener = tarpc::serde_transport::tcp::listen(
241 format!("127.0.0.1:{}", global_state_lock.cli().rpc_port),
242 Json::default,
243 )
244 .await?;
245 rpc_listener.config_mut().max_frame_length(usize::MAX);
246
247 let rpc_state_lock = global_state_lock.clone();
248
249 let valid_tokens: Vec<application::rpc::auth::Token> = vec![
251 crate::application::rpc::auth::Cookie::try_new(&data_directory)
252 .await?
253 .into(),
254 ];
255
256 let rpc_join_handle = tokio::spawn(async move {
257 rpc_listener
258 .filter_map(|r| future::ready(r.ok()))
260 .map(server::BaseChannel::with_defaults)
261 .max_channels_per_key(5, |t| t.transport().peer_addr().unwrap().ip())
263 .map(move |channel| {
266 let server = application::rpc::server::NeptuneRPCServer::new(
267 rpc_state_lock.clone(),
268 rpc_server_to_main_tx.clone(),
269 data_directory.clone(),
270 valid_tokens.clone(),
271 );
272
273 channel.execute(server.serve()).for_each(spawn)
274 })
275 .buffer_unordered(10)
277 .for_each(|_| async {})
278 .await;
279 });
280 task_join_handles.push(rpc_join_handle);
281 info!("Started RPC server");
282
283 if let Some(addr) = global_state_lock.cli().listen_rpc {
284 let listener = TcpListener::bind(addr).await.unwrap();
285 let json_rpc_state_lock = global_state_lock.clone();
286
287 let json_rpc_join_handle = tokio::spawn(async move {
288 let rpc_server = RpcServer::new(json_rpc_state_lock);
289 rpc_server.serve(listener).await;
290 });
291 task_join_handles.push(json_rpc_join_handle);
292
293 info!("Started HTTP-JSON RPC server on {}.", addr);
294 }
295
296 Ok(MainLoopHandler::new(
298 incoming_peer_listener,
299 global_state_lock,
300 main_to_peer_broadcast_tx,
301 peer_task_to_main_tx,
302 main_to_miner_tx,
303 peer_task_to_main_rx,
304 miner_to_main_rx,
305 rpc_server_to_main_rx,
306 task_join_handles,
307 ))
308}
309
/// Calls `f` once and measures how long the call took.
///
/// Returns the closure's output together with the elapsed time in (fractional) seconds,
/// measured with the `Instant` type imported at file level.
pub fn time_fn_call<O>(f: impl FnOnce() -> O) -> (O, f64) {
    let start = Instant::now();
    let output = f();
    // `Duration::as_secs_f64` is the standard-library form of "whole seconds plus
    // nanoseconds / 1e9"; `ScopeDurationLogger`'s `Drop` impl converts the same way.
    let total_time = start.elapsed().as_secs_f64();
    (output, total_time)
}
318
/// Awaits the future `f` and measures how long it took to complete.
///
/// Returns the future's output together with the elapsed time in (fractional) seconds,
/// measured with the `Instant` type imported at file level. The clock starts when this
/// function's own future is first polled, not when `f` was created.
pub async fn time_fn_call_async<F, O>(f: F) -> (O, f64)
where
    F: std::future::Future<Output = O>,
{
    let start = Instant::now();
    let output = f.await;
    // `Duration::as_secs_f64` is the standard-library form of "whole seconds plus
    // nanoseconds / 1e9"; `ScopeDurationLogger`'s `Drop` impl converts the same way.
    let total_time = start.elapsed().as_secs_f64();
    (output, total_time)
}
330
331pub fn utc_timestamp_to_localtime<T>(timestamp: T) -> Option<DateTime<Local>>
341where
342 T: TryInto<i64>,
343 <T as TryInto<i64>>::Error: std::fmt::Debug,
344{
345 let millis: i64 = timestamp.try_into().ok()?;
346 let utc = DateTime::<Utc>::from_timestamp_millis(millis)?;
347 Some(utc.with_timezone(&Local))
348}
349
350#[cfg(feature = "log-lock_events")]
/// Returns the numeric part of the current thread's [`std::thread::ThreadId`].
///
/// The id is obtained by rendering the `ThreadId` with `{:?}` and keeping only the ASCII
/// digits, so it depends on that debug rendering containing the number.
///
/// # Panics
///
/// Panics if the collected digits do not parse as a `u64` (for example, if the debug
/// rendering contains no digits).
pub(crate) fn current_thread_id() -> u64 {
    let rendered = format!("{:?}", std::thread::current().id());

    // Keep only the digits of the rendering, e.g. the `7` of `ThreadId(7)`.
    let digits: String = rendered.chars().filter(char::is_ascii_digit).collect();

    digits.parse::<u64>().unwrap()
}
369
370pub(crate) fn log_tokio_lock_event_cb(lock_event: sync_tokio::LockEvent) {
375 #[cfg(feature = "log-lock_events")]
376 log_tokio_lock_event(&lock_event);
377
378 match lock_event.acquisition() {
379 #[cfg(feature = "log-slow-read-lock")]
380 sync_tokio::LockAcquisition::Read => log_slow_locks(&lock_event, "read"),
381 #[cfg(feature = "log-slow-write-lock")]
382 sync_tokio::LockAcquisition::Write => log_slow_locks(&lock_event, "write"),
383
384 _ => {}
385 }
386}
387
/// Emits one `trace`-level record describing `lock_event`.
///
/// The record names the event type, the lock (name and type), the acquisition kind, the
/// current OS thread (id and name) and the current tokio task id. When the event carries the
/// information, it also includes the acquirer's location, the time spent waiting for the
/// acquisition, and, for `Release` events, how long ago the lock was acquired.
#[cfg(feature = "log-lock_events")]
pub(crate) fn log_tokio_lock_event(lock_event: &sync_tokio::LockEvent) {
    use std::ops::Sub;

    // `try_id` yields `None` when there is no current tokio task.
    let tokio_id = tokio::task::try_id()
        .map(|id| format!("{}", id))
        .unwrap_or_else(|| "?".to_string());

    // Each optional fragment is an empty string when its data is missing.
    let location_str = lock_event
        .location()
        .map(|l| format!("\n\t|-- acquirer: {}", l))
        .unwrap_or_default();

    let waited_for_acquire_str =
        if let (Some(t), Some(a)) = (lock_event.try_acquire_at(), lock_event.acquire_at()) {
            format!(
                "\n\t|-- waited for acquire: {} secs",
                a.sub(t).as_secs_f32()
            )
        } else {
            String::default()
        };

    // The hold time is only reported for `Release` events.
    let is_release = matches!(lock_event, sync_tokio::LockEvent::Release { .. });
    let held_str = lock_event
        .acquire_at()
        .filter(|_| is_release)
        .map(|t| format!("\n\t|-- held: {} secs", t.elapsed().as_secs_f32()))
        .unwrap_or_default();

    let info = lock_event.info();

    tracing::trace!(
        ?lock_event,
        "{} tokio lock `{}` of type `{}` for `{}` by\n\t|-- thread {}, (`{}`)\n\t|-- tokio task {}{}{}{}\n\t|--",
        lock_event.event_type_name(),
        info.name().unwrap_or("?"),
        info.lock_type(),
        lock_event.acquisition(),
        current_thread_id(),
        std::thread::current().name().unwrap_or("?"),
        tokio_id,
        location_str,
        waited_for_acquire_str,
        held_str,
    );
}
436
/// Warns about locks that were slow to acquire or that have been held for a long time.
///
/// Two independent checks run for each `event`; `read_or_write` is used in the log text and,
/// upper-cased, in the names of the environment variables that configure the thresholds:
///
/// 1. For `Acquire` events that carry both timestamps and a location, the time between the
///    acquisition attempt and the acquisition is compared against
///    `LOG_SLOW_<KIND>_LOCK_ACQUIRE_THRESHOLD`.
/// 2. For any event that carries an acquisition timestamp and a location, the time elapsed
///    since the acquisition is compared against `LOG_SLOW_<KIND>_LOCK_THRESHOLD`.
///
/// Both thresholds are in seconds and default to `0.1`. An unset or unparsable variable
/// falls back to that default rather than panicking inside a logging helper.
#[cfg(any(feature = "log-slow-read-lock", feature = "log-slow-write-lock"))]
pub(crate) fn log_slow_locks(event: &sync_tokio::LockEvent, read_or_write: &str) {
    use std::ops::Sub;

    let kind_upper = read_or_write.to_uppercase();

    // Check 1: how long did it take to acquire the lock?
    if matches!(event, sync_tokio::LockEvent::Acquire { .. }) {
        if let (Some(try_acquire_at), Some(acquire_at), Some(location)) =
            (event.try_acquire_at(), event.acquire_at(), event.location())
        {
            let waited_secs = acquire_at.sub(try_acquire_at).as_secs_f32();
            let max_duration_secs =
                slow_lock_threshold_secs(&format!("LOG_SLOW_{kind_upper}_LOCK_ACQUIRE_THRESHOLD"));

            if waited_secs > max_duration_secs {
                // This duration is the wait for the lock, not the time it was held, so the
                // message says so.
                tracing::warn!(
                    "{}-lock took {} seconds to acquire. (exceeds max: {} secs) location: {}",
                    read_or_write,
                    waited_secs,
                    max_duration_secs,
                    location
                );
            }
        }
    }

    // Check 2: how long ago was the lock acquired? This is not restricted to a particular
    // event type; right after an acquisition the elapsed time is simply tiny.
    if let (Some(acquired_at), Some(location)) = (event.acquire_at(), event.location()) {
        let held_secs = acquired_at.elapsed().as_secs_f32();
        let max_duration_secs =
            slow_lock_threshold_secs(&format!("LOG_SLOW_{kind_upper}_LOCK_THRESHOLD"));

        if held_secs > max_duration_secs {
            tracing::warn!(
                "{}-lock held for {} seconds. (exceeds max: {} secs) location: {}",
                read_or_write,
                held_secs,
                max_duration_secs,
                location
            );
        }
    }
}

/// Reads a slow-lock threshold (in seconds) from `env_var`, defaulting to `0.1` when the
/// variable is unset or does not parse as an `f32`.
#[cfg(any(feature = "log-slow-read-lock", feature = "log-slow-write-lock"))]
fn slow_lock_threshold_secs(env_var: &str) -> f32 {
    const DEFAULT_THRESHOLD_SECS: f32 = 0.1;

    std::env::var(env_var)
        .ok()
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(DEFAULT_THRESHOLD_SECS)
}
485
// `log_tokio_lock_event_cb` typed as a `LockCallbackFn`, so it can be passed wherever a lock
// callback is expected.
// NOTE(review): the places where locks are constructed with this callback are outside this
// section.
const LOG_TOKIO_LOCK_EVENT_CB: sync_tokio::LockCallbackFn = log_tokio_lock_event_cb;
487
/// Measures how long a scope takes; the type's `Drop` impl logs the result.
///
/// Create a logger at the top of the scope to be measured and keep it alive until the end.
/// When it is dropped, the elapsed time since construction is logged at `debug` level,
/// either always (no threshold) or only when it reaches the configured threshold in seconds.
/// The log line includes `description` and the source location of the constructor call.
#[derive(Debug, Clone)]
pub struct ScopeDurationLogger<'a> {
    /// Moment the logger was created.
    start: Instant,
    /// Caller-supplied text naming the measured scope.
    description: &'a str,
    /// Threshold in seconds; `None` means "always log".
    log_slow_fn_threshold: Option<f64>,
    /// Source location of the call that created the logger.
    location: &'static std::panic::Location<'static>,
}
impl<'a> ScopeDurationLogger<'a> {
    /// Creates a logger with an optional threshold in seconds.
    ///
    /// `None` logs the duration unconditionally; `Some(t)` logs only durations of at least `t`.
    #[track_caller]
    pub fn new(description: &'a str, log_slow_fn_threshold: Option<f64>) -> Self {
        Self {
            start: Instant::now(),
            description,
            log_slow_fn_threshold,
            // With `#[track_caller]` on every constructor, this is the location of the
            // outermost constructor call.
            location: std::panic::Location::caller(),
        }
    }

    /// Creates a logger that only logs durations of at least `log_slow_fn_threshold` seconds.
    #[track_caller]
    pub fn new_with_threshold(description: &'a str, log_slow_fn_threshold: f64) -> Self {
        Self::new(description, Some(log_slow_fn_threshold))
    }

    /// Creates a logger whose threshold comes from the `LOG_SLOW_SCOPE_THRESHOLD` environment
    /// variable (seconds).
    ///
    /// Falls back to `0.001` seconds when the variable is unset or does not parse as an
    /// `f64`, so a malformed value cannot panic the measured code path.
    #[track_caller]
    pub fn new_default_threshold(description: &'a str) -> Self {
        let threshold = env::var("LOG_SLOW_SCOPE_THRESHOLD")
            .ok()
            .and_then(|raw| raw.parse().ok())
            .unwrap_or(0.001);

        Self::new_with_threshold(description, threshold)
    }

    /// Creates a logger that logs the duration unconditionally.
    #[track_caller]
    pub fn new_without_threshold(description: &'a str) -> Self {
        Self::new(description, None)
    }
}
539
540impl Drop for ScopeDurationLogger<'_> {
541 fn drop(&mut self) {
542 let elapsed = self.start.elapsed();
543 let duration = elapsed.as_secs_f64();
544
545 if let Some(threshold) = self.log_slow_fn_threshold {
546 if duration >= threshold {
547 let msg = format!(
548 "executed {} in {} secs. exceeds slow fn threshold of {} secs. location: {}",
549 self.description, duration, threshold, self.location,
550 );
551
552 tracing::debug!("{}", msg);
553 }
554 } else {
555 let msg = format!(
556 "executed {} in {} secs. location: {}",
557 self.description, duration, self.location,
558 );
559
560 tracing::debug!("{}", msg);
561 }
562 }
563}
564
/// Recursively copies the directory `source` and everything below it into `destination`.
///
/// `destination` (including missing parents) is created if necessary. Sub-directories are
/// descended into; every other entry is copied with [`std::fs::copy`].
///
/// # Errors
///
/// Returns an error of kind [`std::io::ErrorKind::NotADirectory`] when `source` is not a
/// directory, which includes the case where it does not exist. Any I/O error from creating
/// directories, listing `source`, or copying a file is returned as-is and aborts the copy at
/// that point.
pub(crate) fn copy_dir_recursive(source: &PathBuf, destination: &PathBuf) -> std::io::Result<()> {
    if !source.is_dir() {
        let not_a_directory = std::io::Error::new(
            std::io::ErrorKind::NotADirectory,
            format!("not a directory: {}", source.display()),
        );
        return Err(not_a_directory);
    }

    std::fs::create_dir_all(destination)?;

    std::fs::read_dir(source)?.try_for_each(|listing| {
        let child = listing?;
        let child_source = child.path();
        let child_destination = destination.join(child.file_name());

        // `Path::is_dir` follows symbolic links, so a link to a directory is descended into.
        if child_source.is_dir() {
            copy_dir_recursive(&child_source, &child_destination)
        } else {
            std::fs::copy(&child_source, &child_destination).map(|_bytes_copied| ())
        }
    })
}