snarkos-utilities 4.6.2

Utilities for a decentralized operating system
Documentation
// Copyright (c) 2019-2026 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(feature = "locktick")]
use locktick::parking_lot::{Mutex, RwLock};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};

use std::sync::{
    Arc,
    atomic::{AtomicBool, Ordering},
};

use tokio::sync::oneshot;

use tracing::{debug, error, trace};

/// Generic trait that can be queried for whether current process should be stopped.
/// This is implemented by `SignalHandler` and `SimpleStoppable`.
pub trait Stoppable: Send + Sync {
    /// Initiates shutdown of the node.
    fn stop(&self);

    /// Returns `true` if the node is (in the process of being) stopped.
    fn is_stopped(&self) -> bool;
}

/// Wrapper around `AtomicBool` that implements the `Stoppable` trait.
///
/// This is useful when no signal or complex shutdown handling is necessary (e.g., in a test environment).
pub struct SimpleStoppable {
    state: AtomicBool,
}

impl SimpleStoppable {
    pub fn new() -> Arc<Self> {
        Arc::new(Self { state: AtomicBool::new(false) })
    }
}

impl Stoppable for SimpleStoppable {
    fn stop(&self) {
        self.state.store(true, Ordering::SeqCst);
    }

    fn is_stopped(&self) -> bool {
        self.state.load(Ordering::SeqCst)
    }
}

/// Helper for signal handling that implements the `Stoppable` trait.
///
/// This struct will set itself to "stopped" as soon as the process receives Ctrl+C.
/// It can also be manually stopped (e.g., when the node encounters a fatal error)
pub struct SignalHandler {
    /// This sender is used to notify a waiting task that the node has been stopped.
    /// If this is `None`, the node is in the process of shutting down.
    stopped_sender: RwLock<Option<oneshot::Sender<()>>>,

    /// This receiver is used to wait for the node to be stopped.
    stopped_receiver: Mutex<Option<oneshot::Receiver<()>>>,
}

impl SignalHandler {
    /// Spawns a background tasks that listens for Ctrl+C and returns `Self`.
    pub fn new() -> Arc<Self> {
        let (stopped_sender, stopped_receiver) = oneshot::channel();
        let obj = Arc::new(Self {
            stopped_sender: RwLock::new(Some(stopped_sender)),
            stopped_receiver: Mutex::new(Some(stopped_receiver)),
        });

        {
            let obj = obj.clone();
            tokio::spawn(async move {
                obj.handle_signals().await;
            });
        }

        obj
    }

    /// Logic for the background task that waits for a signal.
    async fn handle_signals(&self) {
        #[cfg(target_family = "unix")]
        let signal_listener = async move {
            use tokio::signal::unix::{SignalKind, signal};

            // Handle SIGINT, SIGTERM, SIGQUIT, and SIGHUP.
            let mut s_int = signal(SignalKind::interrupt())?;
            let mut s_term = signal(SignalKind::terminate())?;
            let mut s_quit = signal(SignalKind::quit())?;
            let mut s_hup = signal(SignalKind::hangup())?;

            tokio::select!(
                _ = s_int.recv() => trace!("Received SIGINT"),
                _ = s_term.recv() => trace!("Received SIGTERM"),
                _ = s_quit.recv() => trace!("Received SIGQUIT"),
                _ = s_hup.recv() => trace!("Received SIGHUP"),
            );

            std::io::Result::<()>::Ok(())
        };

        #[cfg(not(target_family = "unix"))]
        let signal_listener = async move {
            tokio::signal::ctrl_c().await?;
            std::io::Result::<()>::Ok(())
        };

        // Block until we receive a signal.
        match signal_listener.await {
            Ok(()) => debug!("Received signal, shutting down..."),
            Err(error) => error!("tokio::signal encountered an error: {error}"),
        }

        self.stop();
    }

    /// Waits until the signal handler was invoked or the stopped flag was set some other way.
    ///
    /// Note: This can only be called once, and must not be called concurrently.
    pub async fn wait_for_signals(&self) {
        let Some(receiver) = self.stopped_receiver.lock().take() else {
            panic!("wait_for_signals must be called at most once");
        };

        if let Err(err) = receiver.await {
            error!("wait_for_signals encountered an error: {err}");
        }
    }
}

impl Stoppable for SignalHandler {
    fn stop(&self) {
        if let Some(stopped_sender) = self.stopped_sender.write().take() {
            let _ = stopped_sender.send(());
        }
    }

    fn is_stopped(&self) -> bool {
        self.stopped_sender.read().is_none()
    }
}