// Copyright 2023 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use super::FlowCtrl;

use crate::node::{
    core::NodeContext, flow_ctrl::cmds::Cmd, membership::Membership, node_starter::CmdChannel,
    MyNode,
};

use sn_interface::{messaging::system::NodeMsg, types::log_markers::LogMarker};

use std::{collections::BTreeSet, sync::Arc, time::Duration};
use tokio::{sync::RwLock, time::Instant};

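// Intervals governing how often each periodic check may fire. A check is only
// enqueued from `perform_periodic_checks` once its interval has elapsed since
// that check last ran.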
const PROBE_INTERVAL: Duration = Duration::from_secs(300);
const RELOCATION_TIMEOUT_SECS: Duration = Duration::from_secs(60);
const MISSING_VOTE_INTERVAL: Duration = Duration::from_secs(5);
const MISSING_DKG_MSG_INTERVAL: Duration = Duration::from_secs(5);
// const SECTION_PROBE_INTERVAL: Duration = Duration::from_secs(300);
const FAULT_CHECK_INTERVAL: Duration = Duration::from_secs(5);
// 30 adult nodes are checked per minute, so each node should be queried ~10x in 10 mins,
// which should hopefully trigger a fault if we're not getting responses back.
// const ADULT_HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(2);
// const ELDER_HEALTH_CHECK_INTERVAL: Duration = Duration::from_secs(3);

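/// Timestamps recording when each periodic check last ran, used to decide
/// which checks are currently due.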
pub(super) struct PeriodicChecksTimestamps {
    last_probe: Instant,
    // last_section_probe: Instant,
    // last_adult_health_check: Instant,
    // last_elder_health_check: Instant,
    last_vote_check: Instant,
    last_dkg_msg_check: Instant,
    last_fault_check: Instant,
    last_relocation_retry_check: Instant,
}

impl PeriodicChecksTimestamps {
    pub(super) fn now() -> Self {
        Self {
            last_probe: Instant::now(),
            // last_section_probe: Instant::now(),
            // last_adult_health_check: Instant::now(),
            // last_elder_health_check: Instant::now(),
            last_vote_check: Instant::now(),
            last_dkg_msg_check: Instant::now(),
            last_fault_check: Instant::now(),
            last_relocation_retry_check: Instant::now(),
        }
    }

    /// Check whether any of the periodic check intervals have elapsed since that check last ran
    pub(crate) fn something_expired(&self) -> bool {
        self.last_vote_check.elapsed() > MISSING_VOTE_INTERVAL
            || self.last_probe.elapsed() > PROBE_INTERVAL
            || self.last_dkg_msg_check.elapsed() > MISSING_DKG_MSG_INTERVAL
            || self.last_fault_check.elapsed() > FAULT_CHECK_INTERVAL
            || self.last_relocation_retry_check.elapsed() > RELOCATION_TIMEOUT_SECS
    }
}

impl FlowCtrl {
    /// Generate and fire commands for all types of periodic checks
    pub(super) async fn perform_periodic_checks(&mut self) {
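        // Cheap pre-check: if no interval has elapsed yet, skip taking the node read lock.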
        if !self.timestamps.something_expired() {
            return;
        }

        let context = self.node.read().await.context();

        if !context.is_elder {
            // self.enqueue_cmds_for_adult_periodic_checks(context).await;

            // we've pushed what we have as an adult and processed incoming msgs
            // and cmds... so we can return already
            return;
        }

        self.enqueue_cmds_for_elder_periodic_checks(&context).await;
    }

    // /// Periodic tasks run for adults only
    // async fn enqueue_cmds_for_adult_periodic_checks(&mut self, context: &NodeContext) {
    //     let mut cmds = vec![];

    //     // if we've passed enough time, section probe
    //     if self.timestamps.last_section_probe.elapsed() > SECTION_PROBE_INTERVAL {
    //         self.timestamps.last_section_probe = Instant::now();
    //         cmds.push(Self::probe_the_section(context).await);
    //     }

    //     for cmd in cmds {
    //         if let Err(error) = self.cmd_sender_channel.send((cmd, vec![])).await {
    //             error!("Error queuing adult periodic check: {error:?}");
    //         }
    //     }
    // }

    /// Periodic tasks run for elders only
    async fn enqueue_cmds_for_elder_periodic_checks(&mut self, context: &NodeContext) {
        let now = Instant::now();
        let mut cmds = vec![];
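        // Each check that is due updates its timestamp and pushes its cmds here; every cmd
        // collected is sent over the cmd channel at the end (the DKG check is the exception,
        // sending its own cmds from a spawned task).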

        if self.timestamps.last_probe.elapsed() > PROBE_INTERVAL {
            self.timestamps.last_probe = now;
            if let Some(cmd) = Self::probe_the_network(context).await {
                cmds.push(cmd);
            }
        }

        // TODO: reevaluate both the adult and elder health checks. Does it make sense to mock a client req?
        // How do we handle the need for a bidi stream (if using the whole client flow)?

        // if self.timestamps.last_adult_health_check.elapsed() > ADULT_HEALTH_CHECK_INTERVAL {
        //     debug!(" ----> adult health periodics sgtart");
        //     self.timestamps.last_adult_health_check = now;
        //     let health_cmds = match Self::perform_health_checks(self.node.clone()).await {
        //         Ok(cmds) => cmds,
        //         Err(error) => {
        //             error!("Error handling client msg to perform health check: {error:?}");
        //             vec![]
        //         }
        //     };
        //     cmds.extend(health_cmds);
        //     debug!(" ----> adult health periodics done");
        // }

        // // The above health check only queries for chunks
        // // here we specifically ask for AE prob msgs and manually
        // // track faults
        // if self.timestamps.last_elder_health_check.elapsed() > ELDER_HEALTH_CHECK_INTERVAL {
        //     self.timestamps.last_elder_health_check = now;
        //     for cmd in Self::health_check_elders_in_section(context).await {
        //         cmds.push(cmd);
        //     }
        // }

        if self.timestamps.last_vote_check.elapsed() > MISSING_VOTE_INTERVAL {
            self.timestamps.last_vote_check = now;
            let read_locked_node = self.node.read().await;

            debug!(" ----> vote periodics start");
            for cmd in self
                .check_for_missed_votes(context, &read_locked_node.membership)
                .await
            {
                cmds.push(cmd);
            }
            debug!(" ----> vote periodics done");
        }

        if self.timestamps.last_dkg_msg_check.elapsed() > MISSING_DKG_MSG_INTERVAL {
            debug!(" ----> dkg msg periodics start");
            self.timestamps.last_dkg_msg_check = now;
            Self::check_for_missed_dkg_messages(self.node.clone(), self.cmd_sender_channel.clone())
                .await;
            debug!(" ----> dkg msg periodics done");
        }

        if self.timestamps.last_fault_check.elapsed() > FAULT_CHECK_INTERVAL {
            self.timestamps.last_fault_check = now;
            cmds.extend(self.vote_out_faulty_nodes().await);
        }

        // This check keeps relocation retrying if it times out.
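        // While we hold a relocation proof but are not yet a member of our section, periodically
        // resend `TryJoin` to the elders; once we show up as a member, the proof is dropped.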
        if let Some(proof) = &context.relocation_proof {
            if !context.network_knowledge.is_section_member(&context.name) {
                if self.timestamps.last_relocation_retry_check.elapsed() > RELOCATION_TIMEOUT_SECS {
                    self.timestamps.last_relocation_retry_check = Instant::now();
                    cmds.push(MyNode::send_msg_to_our_elders_await_responses(
                        context.clone(),
                        NodeMsg::TryJoin(Some(proof.clone())),
                    ));
                }
            } else {
                trace!("{}", LogMarker::RelocateEnd);
                debug!("We've joined a section, dropping the relocation proof.");
                let mut node = self.node.write().await;
                debug!("[NODE WRITE]: handling relocation periodic check write gottt...");
                node.relocation_proof = None;
            }
        }

        for cmd in cmds {
            if let Err(error) = self.cmd_sender_channel.send((cmd, vec![])).await {
                error!("Error queuing std periodic check: {error:?}");
            }
        }
    }

    // /// Initiates and generates all the subsequent Cmds to perform a healthcheck
    // async fn perform_health_checks(node: Arc<RwLock<MyNode>>) -> Result<Vec<Cmd>> {
    //     info!("Starting to check the section's health");
    //     let node = node.read().await;
    //     let our_prefix = node.network_knowledge.prefix();

    //     // random chunk addr will be sent to relevant nodes in the section.
    //     let chunk_addr = xor_name::rand::random();

    //     let chunk_addr = our_prefix.substituted_in(chunk_addr);

    //     let msg = ClientMsg::Query(DataQuery {
    //         variant: DataQueryVariant::GetChunk(ChunkAddress(chunk_addr)),
    //         adult_index: 0,
    //     });

    //     let msg_id = MsgId::new();
    //     let our_info = node.info();
    //     let origin = our_info.peer();

    //     let auth = auth(&node, &msg)?;

    //     // generate the cmds, and ensure we go through fault tracking
    //     node.handle_valid_client_msg(
    //         msg_id,
    //         msg,
    //         auth,
    //         origin,
    //     )
    //     .await
    // }

    /// Generates a probe msg, which goes to up to three random sections in order to
    /// passively maintain network knowledge over time.
    async fn probe_the_network(context: &NodeContext) -> Option<Cmd> {
        let prefix = context.network_knowledge.prefix();

        // Send a probe message if we are an elder
        // but don't bother if we're the first section
        if !prefix.is_empty() {
            info!("Probing network");
            match MyNode::generate_probe_msg(context) {
                Ok(cmd) => Some(cmd),
                Err(error) => {
                    error!("Could not generate probe msg: {error:?}");
                    None
                }
            }
        } else {
            None
        }
    }

    // /// Generates a probe msg, which goes to our elders in order to
    // /// passively maintain network knowledge over time
    // async fn probe_the_section(context: &NodeContext) -> Cmd {
    //     // Send a probe message to an elder
    //     info!("Starting to probe section");
    //     MyNode::generate_section_probe_msg(context)
    // }

    // /// Generates a probe msg, which goes to all section elders in order to
    // /// passively maintain network knowledge over time and track faults.
    // /// Tracking faults while awaiting a response.
    // async fn health_check_elders_in_section(context: &NodeContext) -> Vec<Cmd> {
    //     let mut cmds = vec![];

    //     // Send a probe message to an elder
    //     debug!("Going to health check elders");

    //     let elders = context.network_knowledge.elders();
    //     for elder in elders {
    //         // we track a knowledge issue
    //         // which is countered when an AE-Update is received
    //         cmds.push(Cmd::TrackNodeIssue {
    //             name: elder.name(),
    //             issue: sn_fault_detection::IssueType::AeProbeMsg,
    //         });
    //     }

    //     // Send a probe message to an elder
    //     cmds.push(MyNode::generate_section_probe_msg(context));

    //     cmds
    // }

    /// Checks the interval since the last membership vote was received in the current generation
    async fn check_for_missed_votes(
        &self,
        context: &NodeContext,
        membership_context: &Option<Membership>,
    ) -> Vec<Cmd> {
        info!("Checking for missed votes");
        let mut cmds = vec![];
        if let Some(membership) = &membership_context {
            let last_received_vote_time = membership.last_received_vote_time();

            if let Some(time) = last_received_vote_time {
                // we want to resend the prev vote
                if time.elapsed() >= MISSING_VOTE_INTERVAL {
                    debug!("Vote consensus appears stalled...");
                    if let Some(cmd) =
                        MyNode::membership_gossip_votes(context, membership_context).await
                    {
                        trace!("Vote resending cmd: {cmd:?}");

                        cmds.push(cmd);
                    }
                    // we may also be behind, so let's request AE in case that is the case!
                    let msg = NodeMsg::MembershipAE(membership.generation());
                    cmds.push(MyNode::send_msg_to_our_elders(context, msg));
                }
            }
        }
        cmds
    }

    /// Checks the interval since the last DKG message was received
    async fn check_for_missed_dkg_messages(node: Arc<RwLock<MyNode>>, cmd_channel: CmdChannel) {
        info!("Checking for DKG missed messages");

        // DKG checks can be long running, move off thread to unblock the main loop
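        // The returned join handle is not awaited, so the check runs detached in the background.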
        let _handle = tokio::task::spawn(async move {
            debug!("[NODE READ]: dkg msg lock attempt");
            let node = node.read().await;
            debug!("[NODE READ]: dkg msg lock got");

            let dkg_voter = &node.dkg_voter;

            let last_received_dkg_message = dkg_voter.last_received_dkg_message();

            if let Some(time) = last_received_dkg_message {
                if time.elapsed() >= MISSING_DKG_MSG_INTERVAL {
                    let cmds = node.dkg_gossip_msgs();
                    if !cmds.is_empty() {
                        debug!("Dkg msg resending cmd, as Dkg voting appears stalled...");
                    }

                    for cmd in cmds {
                        if let Err(error) = cmd_channel.send((cmd, vec![])).await {
                            error!("Error sending DKG gossip msgs {error:?}");
                        }
                    }
                }
            }
        });
    }

    async fn vote_out_faulty_nodes(&mut self) -> Vec<Cmd> {
        info!("Voting out faulty nodes");
        let mut cmds = vec![];
        let faulty_nodes = self.get_faulty_node_names().await;

        if !faulty_nodes.is_empty() {
            debug!("{:?} : {faulty_nodes:?}", LogMarker::ProposeOffline);
            let mut fault_set = BTreeSet::new();
            for node in faulty_nodes {
                let _prev = fault_set.insert(node);
            }
            cmds.push(Cmd::ProposeVoteNodesOffline(fault_set))
        }

        cmds
    }
}
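
// A rough sketch of how these checks are expected to be driven (illustrative only;
// the actual `FlowCtrl` event loop lives elsewhere in this module):
//
// loop {
//     // ...handle incoming msgs and cmds...
//     flow_ctrl.perform_periodic_checks().await;
// }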

// fn auth(node: &MyNode, msg: &ClientMsg) -> Result<AuthorityProof<ClientAuth>> {
//     let keypair = node.keypair.clone();
//     let payload = WireMsg::serialize_msg_payload(&msg)?;
//     let signature = keypair.sign(&payload);

//     let auth = ClientAuth {
//         public_key: PublicKey::Ed25519(keypair.public),
//         signature: Signature::Ed25519(signature),
//     };

//     Ok(AuthorityProof::verify(auth, payload)?)
// }