neptune_cash/
lib.rs

// recursion limit for macros (e.g. triton_asm!)
#![recursion_limit = "2048"]
#![deny(clippy::shadow_unrelated)]
//
// enables the nightly feature async_fn_track_caller for the crate feature track-lock-location,
// which records the call site (location) at which locks are acquired. This location data is
// used by the lock-logging features, e.g. log-slow-write-lock, which logs a warning when a
// write-lock is held longer than 100 millis.
// to enable: cargo +nightly build --features track-lock-location
#![cfg_attr(feature = "track-lock-location", feature(async_fn_track_caller))]
//
// If code coverage tool `cargo-llvm-cov` is running with the nightly toolchain,
// enable the unstable “coverage” attribute. This allows using the annotation
// `#[coverage(off)]` to explicitly exclude certain parts of the code from
// being considered as “code under test.” Most prominently, the annotation
// should be added to every `#[cfg(test)]` module. Since the “coverage”
// feature is enabled only conditionally, the annotation to use is:
// `#[cfg_attr(coverage_nightly, coverage(off))]`.
//
// See also:
// - https://github.com/Neptune-Crypto/neptune-core/issues/570
// - https://github.com/taiki-e/cargo-llvm-cov#exclude-code-from-coverage
// - https://github.com/rust-lang/rust/issues/84605
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

// danda: making all of these pub for now, so docs are generated.
// later maybe we ought to split some stuff out into re-usable crate(s)...?
pub mod api;
pub mod application;
pub mod macros;
pub mod prelude;
pub mod protocol;
pub mod state;
pub mod util_types;

#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
pub mod tests;

#[cfg_attr(coverage_nightly, coverage(off))]
pub mod bench_helpers;

use std::env;
use std::path::PathBuf;

use anyhow::Context;
use anyhow::Result;
use application::config::cli_args;
use chrono::DateTime;
use chrono::Local;
use chrono::Utc;
use futures::future;
use futures::Future;
use futures::StreamExt;
use itertools::Itertools;
use prelude::tasm_lib;
use prelude::triton_vm;
use prelude::twenty_first;
use protocol::consensus::block::Block;
use protocol::peer::handshake_data::HandshakeData;
use state::wallet::wallet_file::WalletFileContext;
use state::GlobalState;
use tarpc::server;
use tarpc::server::incoming::Incoming;
use tarpc::server::Channel;
use tarpc::tokio_serde::formats::*;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tracing::debug;
use tracing::info;
use triton_vm::prelude::BFieldElement;

use crate::application::config::data_directory::DataDirectory;
use crate::application::json_rpc::server::http::RpcServer;
use crate::application::locks::tokio as sync_tokio;
use crate::application::loops::channel::MainToMiner;
use crate::application::loops::channel::MainToPeerTask;
use crate::application::loops::channel::MinerToMain;
use crate::application::loops::channel::PeerTaskToMain;
use crate::application::loops::channel::RPCServerToMain;
use crate::application::loops::connect_to_peers::call_peer;
use crate::application::loops::main_loop::MainLoopHandler;
use crate::application::rpc::server::RPC;
use crate::state::archival_state::ArchivalState;
use crate::state::wallet::wallet_state::WalletState;
use crate::state::GlobalStateLock;

pub const SUCCESS_EXIT_CODE: i32 = 0;
pub const COMPOSITION_FAILED_EXIT_CODE: i32 = 159;

/// Magic string used in the peer-connection handshake to ensure the other
/// program is Neptune Core.
pub const MAGIC_STRING_REQUEST: &[u8; 15] = b"7B8AB7FC438F411";
pub const MAGIC_STRING_RESPONSE: &[u8; 15] = b"Hello Neptune!\n";
const PEER_CHANNEL_CAPACITY: usize = 1000;
const MINER_CHANNEL_CAPACITY: usize = 10;
const RPC_CHANNEL_CAPACITY: usize = 1000;
const VERSION: &str = env!("CARGO_PKG_VERSION");

/// Causes a compilation failure on targets where a `u32` does not fit within
/// a `usize`, i.e. targets with a pointer width below 32 bits.
const _MIN_PTR_WIDTH: () = {
    #[cfg(target_pointer_width = "16")]
    compile_error!("This crate requires a target pointer width of at least 32 bits.");
};

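/// Initializes a Neptune Core node: sets up the data directory and global
/// state, wires up the peer, miner, and RPC channels and tasks, and returns
/// the [`MainLoopHandler`] that drives the node.
///
/// # Example
///
/// A rough sketch of how a binary might use this (marked `ignore`, so it is
/// not compiled as a doctest; it assumes `cli_args::Args` implements
/// `clap::Parser` and that the returned handler exposes a method that runs
/// the main loop, here called `run()`):
///
/// ```ignore
/// use clap::Parser;
/// use neptune_cash::application::config::cli_args;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     // parse CLI arguments and initialize the node
///     let args = cli_args::Args::parse();
///     let main_loop_handler = neptune_cash::initialize(args).await?;
///
///     // hand control to the main loop until shutdown
///     main_loop_handler.run().await?;
///     Ok(())
/// }
/// ```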
pub async fn initialize(cli_args: cli_args::Args) -> Result<MainLoopHandler> {
    async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
        tokio::spawn(fut);
    }

    // see comment for Network::performs_automated_mining()
    if cli_args.mine() && !cli_args.network.performs_automated_mining() {
        anyhow::bail!("Automatic mining is not supported for network {}.  Try again without --compose or --guess flags.", cli_args.network);
    }

    info!("Starting neptune-core node on {}.", cli_args.network);

    // Get data directory (wallet, block database), create one if none exists
    let data_directory = DataDirectory::get(cli_args.data_dir.clone(), cli_args.network)?;
    DataDirectory::create_dir_if_not_exists(&data_directory.root_dir_path()).await?;
    info!("Data directory is {}", data_directory);

    let (rpc_server_to_main_tx, rpc_server_to_main_rx) =
        mpsc::channel::<RPCServerToMain>(RPC_CHANNEL_CAPACITY);
    let genesis = Block::genesis(cli_args.network);
    let global_state =
        GlobalState::try_new(data_directory.clone(), genesis, cli_args.clone()).await?;
    let mut global_state_lock =
        GlobalStateLock::from_global_state(global_state, rpc_server_to_main_tx.clone());

    // Construct the broadcast channel to communicate from the main task to peer tasks
    let (main_to_peer_broadcast_tx, _main_to_peer_broadcast_rx) =
        broadcast::channel::<MainToPeerTask>(PEER_CHANNEL_CAPACITY);

    // Add the MPSC (multi-producer, single consumer) channel for peer-task-to-main communication
    let (peer_task_to_main_tx, peer_task_to_main_rx) =
        mpsc::channel::<PeerTaskToMain>(PEER_CHANNEL_CAPACITY);

    if let Some(block_import_directory) =
        global_state_lock.cli().import_blocks_from_directory.clone()
    {
        info!(
            "Importing blocks from directory \"{}\"",
            block_import_directory.to_string_lossy()
        );

        let flush_period = global_state_lock.cli().import_block_flush_period;
        let validate_blocks = !global_state_lock.cli().disable_validation_in_block_import;
        let num_blocks_read = global_state_lock
            .lock_guard_mut()
            .await
            .import_blocks_from_directory(&block_import_directory, flush_period, validate_blocks)
            .await?;
        info!("Successfully imported {num_blocks_read} blocks.");
    }

    if !cli_args.triton_vm_env_vars.is_empty() {
        info!(
            "Triton VM environment variables set to: {}",
            cli_args.triton_vm_env_vars
        );
    }

    if !cli_args.whitelisted_composers.is_empty() {
        info!(
            "Whitelisted composers:\n{}",
            cli_args.whitelisted_composers.iter().join("\n")
        );
    }

    // Check if we need to restore the wallet database, and if so, do it.
    info!("Checking if we need to restore UTXOs");
    global_state_lock
        .lock_guard_mut()
        .await
        .restore_monitored_utxos_from_recovery_data()
        .await?;
    info!("UTXO restoration check complete");

    // Bind socket to port on this machine, to handle incoming connections from peers
    let incoming_peer_listener = if let Some(incoming_peer_listener) = cli_args.own_listen_port() {
        let ret = TcpListener::bind((cli_args.peer_listen_addr, incoming_peer_listener))
            .await
            .with_context(|| format!("Failed to bind to local TCP port {}:{}. Is an instance of this program already running?", cli_args.peer_listen_addr, incoming_peer_listener))?;
        info!("Now listening for incoming peer-connections");
        ret
    } else {
        info!("Not accepting incoming peer-connections");
        TcpListener::bind("127.0.0.1:0").await?
    };

    // Connect to peers, and provide each peer task with a thread-safe copy of the state
    let own_handshake_data: HandshakeData =
        global_state_lock.lock_guard().await.get_own_handshakedata();
    info!(
        "Most known canonical block has height {}",
        own_handshake_data.tip_header.height
    );
    let mut task_join_handles = vec![];
    for peer_address in global_state_lock.cli().peers.clone() {
        let peer_state_var = global_state_lock.clone(); // bump arc refcount
        let main_to_peer_broadcast_rx_clone: broadcast::Receiver<MainToPeerTask> =
            main_to_peer_broadcast_tx.subscribe();
        let peer_task_to_main_tx_clone: mpsc::Sender<PeerTaskToMain> = peer_task_to_main_tx.clone();
        let peer_join_handle = tokio::task::spawn(async move {
            call_peer(
                peer_address,
                peer_state_var.clone(),
                main_to_peer_broadcast_rx_clone,
                peer_task_to_main_tx_clone,
                own_handshake_data,
                1, // All outgoing connections have distance 1
            )
            .await;
        });
        task_join_handles.push(peer_join_handle);
    }
    debug!("Made outgoing connections to peers");

    // Start mining tasks if requested
    let (miner_to_main_tx, miner_to_main_rx) = mpsc::channel::<MinerToMain>(MINER_CHANNEL_CAPACITY);
    let (main_to_miner_tx, main_to_miner_rx) = mpsc::channel::<MainToMiner>(MINER_CHANNEL_CAPACITY);
    let miner_state_lock = global_state_lock.clone(); // bump arc refcount.
    if global_state_lock.cli().mine() {
        let miner_join_handle = tokio::task::spawn(async move {
            application::loops::mine_loop::mine(
                main_to_miner_rx,
                miner_to_main_tx,
                miner_state_lock,
            )
            .await
            .expect("Error in mining task");
        });
        task_join_handles.push(miner_join_handle);
        info!("Started mining task");
    }

    // Start the RPC server for CLI requests and more. It's important that this is done as late
    // as possible, so requests do not hang while initialization code runs.
    let mut rpc_listener = tarpc::serde_transport::tcp::listen(
        format!("127.0.0.1:{}", global_state_lock.cli().rpc_port),
        Json::default,
    )
    .await?;
    rpc_listener.config_mut().max_frame_length(usize::MAX);

    let rpc_state_lock = global_state_lock.clone();

    // Each time we start neptune-core a new RPC cookie is generated.
    let valid_tokens: Vec<application::rpc::auth::Token> = vec![
        crate::application::rpc::auth::Cookie::try_new(&data_directory)
            .await?
            .into(),
    ];

    let rpc_join_handle = tokio::spawn(async move {
        rpc_listener
            // Ignore accept errors.
            .filter_map(|r| future::ready(r.ok()))
            .map(server::BaseChannel::with_defaults)
            // Limit channels to 5 per IP: 1 for the dashboard and a few more for CLI interactions.
            .max_channels_per_key(5, |t| t.transport().peer_addr().unwrap().ip())
            // serve is generated by the service attribute. It takes as input any type implementing
            // the generated RPC trait.
            .map(move |channel| {
                let server = application::rpc::server::NeptuneRPCServer::new(
                    rpc_state_lock.clone(),
                    rpc_server_to_main_tx.clone(),
                    data_directory.clone(),
                    valid_tokens.clone(),
                );

                channel.execute(server.serve()).for_each(spawn)
            })
            // Process at most 10 channels concurrently.
            .buffer_unordered(10)
            .for_each(|_| async {})
            .await;
    });
    task_join_handles.push(rpc_join_handle);
    info!("Started RPC server");
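    // For reference, a client reaches this listener with tarpc's matching JSON
    // transport. A rough sketch (the generated client type name is assumed to
    // follow tarpc's `<Trait>Client` convention; the exact RPC methods and the
    // way the cookie token is loaded are not shown here):
    //
    //   let transport = tarpc::serde_transport::tcp::connect(
    //       format!("127.0.0.1:{rpc_port}"),
    //       Json::default,
    //   )
    //   .await?;
    //   let client = RPCClient::new(tarpc::client::Config::default(), transport).spawn();
    //
    // Each call must then present one of `valid_tokens`, i.e. the cookie that
    // was just written to the data directory.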

    if let Some(addr) = global_state_lock.cli().listen_rpc {
        let listener = TcpListener::bind(addr).await.unwrap();
        let json_rpc_state_lock = global_state_lock.clone();

        let json_rpc_join_handle = tokio::spawn(async move {
            let rpc_server = RpcServer::new(json_rpc_state_lock);
            rpc_server.serve(listener).await;
        });
        task_join_handles.push(json_rpc_join_handle);

        info!("Started HTTP-JSON RPC server on {}.", addr);
    }

    // Handle incoming connections, messages from peer tasks, and messages from the mining task
    Ok(MainLoopHandler::new(
        incoming_peer_listener,
        global_state_lock,
        main_to_peer_broadcast_tx,
        peer_task_to_main_tx,
        main_to_miner_tx,
        peer_task_to_main_rx,
        miner_to_main_rx,
        rpc_server_to_main_rx,
        task_join_handles,
    ))
}

/// Time a fn call.  Duration is returned as a float in seconds.
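///
/// # Example
///
/// A minimal sketch (marked `ignore`, so it is not compiled as a doctest):
///
/// ```ignore
/// // time a closure and report how long it took
/// let (sum, secs) = neptune_cash::time_fn_call(|| (1..=1_000_000u64).sum::<u64>());
/// println!("computed {sum} in {secs} seconds");
/// ```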
pub fn time_fn_call<O>(f: impl FnOnce() -> O) -> (O, f64) {
    let start = Instant::now();
    let output = f();
    let elapsed = start.elapsed();
    let total_time = elapsed.as_secs() as f64 + f64::from(elapsed.subsec_nanos()) / 1e9;
    (output, total_time)
}

/// Time an async fn call.  Duration is returned as a float in seconds.
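///
/// # Example
///
/// A minimal sketch (marked `ignore`, so it is not compiled as a doctest;
/// `expensive_query` is a placeholder for any async operation):
///
/// ```ignore
/// // time an async block; the future is awaited inside the helper
/// let (value, secs) = neptune_cash::time_fn_call_async(async { expensive_query().await }).await;
/// println!("got {value:?} after {secs} seconds");
/// ```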
320pub async fn time_fn_call_async<F, O>(f: F) -> (O, f64)
321where
322    F: std::future::Future<Output = O>,
323{
324    let start = Instant::now();
325    let output = f.await;
326    let elapsed = start.elapsed();
327    let total_time = elapsed.as_secs() as f64 + f64::from(elapsed.subsec_nanos()) / 1e9;
328    (output, total_time)
329}
330
331/// Converts a UTC millisecond timestamp (millis since 1970 UTC) into
332/// a `DateTime<Local>`, ie local-time.
333///
334/// # Return Value
335///
336/// Returns `None` if
337///  - the given argument cannot be converted to an `i64`, or
338///  - the given argument, after conversion to `i64`, is out of range (whatever
339///    that means).
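///
/// # Example
///
/// A minimal sketch (marked `ignore`, so it is not compiled as a doctest):
///
/// ```ignore
/// // 2021-02-07 14:26:40 UTC, expressed in milliseconds since the Unix epoch
/// let millis: u64 = 1_612_708_000_000;
/// if let Some(local) = neptune_cash::utc_timestamp_to_localtime(millis) {
///     println!("local time: {}", local.format("%Y-%m-%d %H:%M:%S"));
/// }
/// ```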
pub fn utc_timestamp_to_localtime<T>(timestamp: T) -> Option<DateTime<Local>>
where
    T: TryInto<i64>,
    <T as TryInto<i64>>::Error: std::fmt::Debug,
{
    let millis: i64 = timestamp.try_into().ok()?;
    let utc = DateTime::<Utc>::from_timestamp_millis(millis)?;
    Some(utc.with_timezone(&Local))
}

#[cfg(feature = "log-lock_events")]
pub(crate) fn current_thread_id() -> u64 {
    // workaround: parse thread_id debug output into a u64.
    // (because ThreadId::as_u64() is unstable)
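    // (the Debug form looks like `ThreadId(12)`; the digits are extracted below)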
    let thread_id_dbg: String = format!("{:?}", std::thread::current().id());
    let nums_u8 = &thread_id_dbg
        .chars()
        .filter_map(|c| {
            if c.is_ascii_digit() {
                Some(c as u8)
            } else {
                None
            }
        })
        .collect::<Vec<u8>>();
    let nums = String::from_utf8_lossy(nums_u8).to_string();

    nums.parse::<u64>().unwrap()
}

// This is a callback fn passed to AtomicRw, AtomicMutex
// and called when a lock event occurs.  This way
// we can track which threads+tasks are acquiring
// which locks for reads and/or mutations.
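//
// Roughly, the callback is passed in wherever such a lock is constructed, e.g.
// (constructor name and arguments are illustrative, not the exact API):
//
//   let state = sync_tokio::AtomicRw::from((
//       global_state,
//       Some("GlobalState"),
//       Some(LOG_TOKIO_LOCK_EVENT_CB),
//   ));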
pub(crate) fn log_tokio_lock_event_cb(lock_event: sync_tokio::LockEvent) {
    #[cfg(feature = "log-lock_events")]
    log_tokio_lock_event(&lock_event);

    match lock_event.acquisition() {
        #[cfg(feature = "log-slow-read-lock")]
        sync_tokio::LockAcquisition::Read => log_slow_locks(&lock_event, "read"),
        #[cfg(feature = "log-slow-write-lock")]
        sync_tokio::LockAcquisition::Write => log_slow_locks(&lock_event, "write"),

        _ => {}
    }
}

// notes:
//   1. this feature is very verbose in the logs.
//   2. it's not really needed except when debugging lock acquisitions.
//   3. tracing-tests causes a big mem-leak for tests when combined with this feature.
#[cfg(feature = "log-lock_events")]
pub(crate) fn log_tokio_lock_event(lock_event: &sync_tokio::LockEvent) {
    use std::ops::Sub;

    let tokio_id = match tokio::task::try_id() {
        Some(id) => format!("{}", id),
        None => "?".to_string(),
    };

    let location_str = match lock_event.location() {
        Some(l) => format!("\n\t|-- acquirer: {}", l),
        None => String::default(),
    };
    let waited_for_acquire_str = match (lock_event.try_acquire_at(), lock_event.acquire_at()) {
        (Some(t), Some(a)) => format!(
            "\n\t|-- waited for acquire: {} secs",
            a.sub(t).as_secs_f32()
        ),
        _ => String::default(),
    };
    let held_str = match lock_event.acquire_at() {
        Some(t) if matches!(lock_event, sync_tokio::LockEvent::Release { .. }) => {
            format!("\n\t|-- held: {} secs", t.elapsed().as_secs_f32())
        }
        _ => String::default(),
    };

    let info = lock_event.info();

    tracing::trace!(
            ?lock_event,
            "{} tokio lock `{}` of type `{}` for `{}` by\n\t|-- thread {}, (`{}`)\n\t|-- tokio task {}{}{}{}\n\t|--",
            lock_event.event_type_name(),
            info.name().unwrap_or("?"),
            info.lock_type(),
            lock_event.acquisition(),
            current_thread_id(),
            std::thread::current().name().unwrap_or("?"),
            tokio_id,
            location_str,
            waited_for_acquire_str,
            held_str,
    );
}

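// The warning thresholds used below can be overridden at runtime via
// environment variables, whose names are constructed in the function body:
//
//   LOG_SLOW_READ_LOCK_ACQUIRE_THRESHOLD / LOG_SLOW_WRITE_LOCK_ACQUIRE_THRESHOLD
//     max seconds to wait for acquisition before warning (default 0.1)
//   LOG_SLOW_READ_LOCK_THRESHOLD / LOG_SLOW_WRITE_LOCK_THRESHOLD
//     max seconds a lock may be held before warning (default 0.1)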
#[cfg(any(feature = "log-slow-read-lock", feature = "log-slow-write-lock"))]
pub(crate) fn log_slow_locks(event: &sync_tokio::LockEvent, read_or_write: &str) {
    use std::ops::Sub;
    if matches!(event, sync_tokio::LockEvent::Acquire { .. }) {
        if let (Some(try_acquire_at), Some(acquire_at), Some(location)) =
            (event.try_acquire_at(), event.acquire_at(), event.location())
        {
            let duration = acquire_at.sub(try_acquire_at);
            let env_var = format!(
                "LOG_SLOW_{}_LOCK_ACQUIRE_THRESHOLD",
                read_or_write.to_uppercase()
            );
            let max_duration_secs = match std::env::var(env_var) {
                Ok(t) => t.parse().unwrap(),
                Err(_) => 0.1,
            };

            if duration.as_secs_f32() > max_duration_secs {
                tracing::warn!(
                    "{}-lock acquisition took {} seconds. (exceeds max: {} secs)  location: {}",
                    read_or_write,
                    duration.as_secs_f32(),
                    max_duration_secs,
                    location
                );
            }
        }
    }

    if let (Some(acquired_at), Some(location)) = (event.acquire_at(), event.location()) {
        let duration = acquired_at.elapsed();
        let env_var = format!("LOG_SLOW_{}_LOCK_THRESHOLD", read_or_write.to_uppercase());
        let max_duration_secs = match std::env::var(env_var) {
            Ok(t) => t.parse().unwrap(),
            Err(_) => 0.1,
        };

        if duration.as_secs_f32() > max_duration_secs {
            tracing::warn!(
                "{}-lock held for {} seconds. (exceeds max: {} secs)  location: {}",
                read_or_write,
                duration.as_secs_f32(),
                max_duration_secs,
                location
            );
        }
    }
}

const LOG_TOKIO_LOCK_EVENT_CB: sync_tokio::LockCallbackFn = log_tokio_lock_event_cb;

/// for logging how long a scope takes to execute.
///
/// If an optional threshold value is provided then nothing will be
/// logged unless execution duration exceeds the threshold.
/// In that case a tracing::debug!() is logged.
///
/// If no threshold value is provided then a tracing::debug!()
/// is always logged with the duration.
///
/// for convenience see the macros:
///  crate::macros::log_slow_scope,
///  crate::macros::log_scope_duration,
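///
/// # Example
///
/// A minimal sketch (marked `ignore`, so it is not compiled as a doctest;
/// `validate_block` stands in for whatever work is being timed):
///
/// ```ignore
/// {
///     // a message is logged when `_scope_timer` is dropped, i.e. when the
///     // enclosing scope ends, if the scope took longer than 0.5 secs
///     let _scope_timer =
///         neptune_cash::ScopeDurationLogger::new_with_threshold("block validation", 0.5);
///     validate_block();
/// } // <- duration measured here
/// ```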
#[derive(Debug, Clone)]
pub struct ScopeDurationLogger<'a> {
    start: Instant,
    description: &'a str,
    log_slow_fn_threshold: Option<f64>,
    location: &'static std::panic::Location<'static>,
}
impl<'a> ScopeDurationLogger<'a> {
    #[track_caller]
    pub fn new(description: &'a str, log_slow_fn_threshold: Option<f64>) -> Self {
        Self {
            start: Instant::now(),
            description,
            log_slow_fn_threshold,
            location: std::panic::Location::caller(),
        }
    }

    #[track_caller]
    pub fn new_with_threshold(description: &'a str, log_slow_fn_threshold: f64) -> Self {
        Self::new(description, Some(log_slow_fn_threshold))
    }

    #[track_caller]
    pub fn new_default_threshold(description: &'a str) -> Self {
        Self::new_with_threshold(
            description,
            match env::var("LOG_SLOW_SCOPE_THRESHOLD") {
                Ok(t) => t.parse().unwrap(),
                Err(_) => 0.001,
            },
        )
    }

    #[track_caller]
    pub fn new_without_threshold(description: &'a str) -> Self {
        Self::new(description, None)
    }
}

impl Drop for ScopeDurationLogger<'_> {
    fn drop(&mut self) {
        let elapsed = self.start.elapsed();
        let duration = elapsed.as_secs_f64();

        if let Some(threshold) = self.log_slow_fn_threshold {
            if duration >= threshold {
                let msg = format!(
                    "executed {} in {} secs.  exceeds slow fn threshold of {} secs.  location: {}",
                    self.description, duration, threshold, self.location,
                );

                tracing::debug!("{}", msg);
            }
        } else {
            let msg = format!(
                "executed {} in {} secs.  location: {}",
                self.description, duration, self.location,
            );

            tracing::debug!("{}", msg);
        }
    }
}

/// recursively copy source dir to destination
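///
/// A minimal sketch of its use (marked `ignore`, so it is not compiled as a
/// doctest; the paths are placeholders):
///
/// ```ignore
/// let source = std::path::PathBuf::from("/tmp/neptune-data");
/// let backup = std::path::PathBuf::from("/tmp/neptune-data-backup");
/// // copies the directory tree rooted at `source` into `backup`
/// copy_dir_recursive(&source, &backup)?;
/// ```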
pub(crate) fn copy_dir_recursive(source: &PathBuf, destination: &PathBuf) -> std::io::Result<()> {
    if !source.is_dir() {
        return Err(std::io::Error::new(
            std::io::ErrorKind::NotADirectory,
            format!("not a directory: {}", source.display()),
        ));
    }
    std::fs::create_dir_all(destination)?;
    for entry in std::fs::read_dir(source)? {
        let entry = entry?;
        let src_path = entry.path();
        let dest_path = destination.join(entry.file_name());
        if src_path.is_dir() {
            copy_dir_recursive(&src_path, &dest_path)?;
        } else {
            std::fs::copy(&src_path, &dest_path)?;
        }
    }
    Ok(())
}