// veilid-core 0.5.3
//
// Core library used to create a Veilid node and operate it as part of an application.
// Documentation
use super::*;

impl_veilid_log_facility!("rpc");

/// Sender-address information produced by `RPCProcessor::rpc_call_status`.
///
/// Both fields stay `None` unless the status question went to a direct destination
/// with an unsafe safety selection and the answer carried sender info.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Default)]
pub struct StatusResult {
    /// Sender info taken from the status answer, i.e. what the answering node
    /// reported back about us.
    pub opt_sender_info: Option<SenderInfo>,
    /// Value returned by `report_sender_info` on the target node; only filled in
    /// when the question was actually sent directly.
    /// NOTE(review): presumably the sender info that was recorded before this
    /// report - confirm against `report_sender_info`.
    pub opt_previous_sender_info: Option<SenderInfo>,
}

impl RPCProcessor {
    /// Send a StatusQ RPC question to `dest` and wait for the StatusA answer.
    ///
    /// The question can be sent via relays or routes, but less information is
    /// exchanged when a route is involved:
    ///
    /// * sender side: an unsafe destination carries our node status, a safety
    ///   routed one carries nothing
    /// * receiver side: a direct question is answered with node status and sender
    ///   info, a safety routed one with node status only, a private routed one
    ///   with nothing
    ///
    /// On success the answer wraps a `StatusResult`. Its fields are only filled in
    /// for a direct destination with an unsafe safety selection whose answer
    /// included sender info; in that case a `SocketAddressChangeEvent` is also
    /// posted on the event bus.
    ///
    /// Non-value outcomes are returned as `Ok(NetworkResult::...)`: no routing
    /// domain for the target, a reply timeout, or a reply that is not a status
    /// answer.
    ///
    /// # Errors
    ///
    /// Fails with the "not started up" try-again error when the startup lock can
    /// not be entered, and propagates errors from building the respond-to,
    /// sending the question and waiting for the reply.
    #[cfg_attr(feature = "instrument", instrument(level = "trace", target = "rpc", skip(self), ret, err(level=Level::DEBUG)))]
    pub async fn rpc_call_status(
        &self,
        dest: Destination,
    ) -> RPCNetworkResult<Answer<StatusResult>> {
        // The guard stays alive until the end of this function
        let _guard = self
            .startup_context
            .startup_lock
            .enter()
            .map_err(RPCError::map_try_again("not started up"))?;

        // Work out which routing domain we talk over and which node status we send along
        let routing_table = self.routing_table();
        let (opt_target_nr, routing_domain, node_status) =
            match dest.get_unsafe_routing_info(&routing_table) {
                // Unsafe routing with a known routing domain: include our node status
                Some(UnsafeRoutingInfo {
                    opt_node,
                    opt_routing_domain: Some(domain),
                }) => {
                    let our_status = self.network_manager().generate_node_status(domain);
                    (opt_node, domain, Some(our_status))
                }
                // Unsafe routing without a routing domain: we leave before 'question()'
                // is ever called, so this counts as a send failure and is recorded
                // on the node when there is one
                Some(UnsafeRoutingInfo {
                    opt_node,
                    opt_routing_domain: None,
                }) => {
                    let send_ts = Timestamp::now_non_decreasing();
                    if let Some(node) = &opt_node {
                        self.record_send_failure(
                            RPCKind::Question,
                            send_ts,
                            node.clone(),
                            None,
                            None,
                        );
                    }
                    return Ok(NetworkResult::no_connection_other(
                        "no routing domain for target",
                    ));
                }
                // Safety route: no node status is exchanged and everything is in the
                // PublicInternet routing domain
                None => (None, RoutingDomain::PublicInternet, None),
            };

        // Assemble the status question
        // NOTE(review): network_result_try! presumably returns early from this function
        // on any non-value network result - confirm against the macro definition
        let status_q = RPCOperationStatusQ::new(node_status);
        let respond_to = network_result_try!(self.get_destination_respond_to(&dest).await?);
        let question = RPCQuestion::new(
            respond_to,
            RPCQuestionDetail::StatusQ(Box::new(status_q)),
        );

        let debug_string = format!("Status => {}", dest);

        // Send the question
        let pending_reply =
            network_result_try!(self.question(dest.clone(), question, None, None).await?);

        // Keep how the question was actually sent; it decides below whether the
        // sender info gets reported to the target node
        let send_data_method = pending_reply.context.send_data_result.clone();

        // Wait for the reply; a timeout is handed back as a network result
        let TimeoutOr::Value((msg, answer_context)) =
            self.wait_for_reply(pending_reply, debug_string).await?
        else {
            return Ok(NetworkResult::Timeout);
        };

        // Unwrap the reply down to the status answer
        let (_, _, kind) = msg.operation.destructure();
        let RPCOperationKind::Answer(answer) = kind else {
            return Ok(NetworkResult::invalid_message("not an answer"));
        };
        let RPCAnswerDetail::StatusA(status_a) = answer.destructure() else {
            return Ok(NetworkResult::invalid_message("not a status answer"));
        };
        let (a_node_status, sender_info) = status_a.destructure();

        // When we know the target node and it sent its node status, store that status
        // in the routing table under the routing domain we used
        if let (Some(target_nr), Some(answered_status)) = (opt_target_nr, a_node_status) {
            target_nr.update_node_status(routing_domain, answered_status.clone());
        }

        // Report sender_info IP addresses to network manager
        // Don't need to validate these addresses for the current routing domain
        // the address itself is irrelevant, and the remote node can lie anyway
        let (opt_sender_info, opt_previous_sender_info) = match dest {
            Destination::Direct {
                node: target,
                safety_selection,
            } => match (safety_selection, sender_info) {
                (SafetySelection::Unsafe(_), Some(sender_info)) => {
                    // A directly requested status that really went out directly, and
                    // not over a relay, tells us what our IP address appears as.
                    // If this changes, we'd want to know about that to reset the
                    // networking stack
                    let previous = if send_data_method.is_direct() {
                        target.report_sender_info(
                            routing_domain,
                            send_data_method.unique_flow().flow.protocol_type(),
                            send_data_method.unique_flow().flow.address_type(),
                            sender_info,
                        )
                    } else {
                        None
                    };

                    // Report ping status results to network manager
                    let change_event = SocketAddressChangeEvent {
                        routing_domain,
                        socket_address: sender_info.socket_address,
                        old_socket_address: previous.map(|s| s.socket_address),
                        flow: send_data_method.unique_flow().flow,
                        reporting_peer: target.unfiltered(),
                    };
                    if let Err(e) = self.event_bus().post(change_event) {
                        veilid_log!(self debug "Failed to post event: {}", e);
                    }

                    (Some(sender_info), previous)
                }
                // Safe selection, or no sender info in the answer: nothing to report
                _ => (None, None),
            },
            // sender info is irrelevant over relays and routes
            Destination::Relay {
                relay_di: _,
                node: _,
            }
            | Destination::PrivateRoute {
                private_route: _,
                safety_selection: _,
            } => (None, None),
        };

        Ok(NetworkResult::value(Answer::new(
            answer_context,
            StatusResult {
                opt_sender_info,
                opt_previous_sender_info,
            },
        )))
    }

    ////////////////////////////////////////////////////////////////////////////////////////////////

    #[cfg_attr(feature = "instrument", instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err))]
    pub(super) async fn process_status_q(&self, msg: Message) -> RPCNetworkResult<()> {
        // Get the question
        let kind = msg.operation.kind().clone();
        let status_q = match kind {
            RPCOperationKind::Question(q) => match q.destructure() {
                (_, RPCQuestionDetail::StatusQ(q)) => q,
                _ => panic!("not a status question"),
            },
            _ => panic!("not a question"),
        };
        let q_node_status = status_q.destructure();

        let (node_status, sender_info) = match &msg.header.detail {
            RPCMessageHeaderDetail::Direct(detail) => {
                let flow = detail.flow;
                let routing_domain = detail.routing_domain;

                // Ensure the node status from the question is the kind for the routing domain we received the request in
                if let Some(q_node_status) = q_node_status {
                    // update node status for the requesting node to our routing table
                    if let Some(sender_nr) = msg.opt_sender_nr.clone() {
                        // Update latest node status in routing table for the statusq sender
                        sender_nr.update_node_status(routing_domain, q_node_status.clone());
                    }
                }

                // Get the peer address in the returned sender info
                let sender_info = SenderInfo {
                    socket_address: *flow.remote_address(),
                };

                // Make status answer
                let node_status = self.network_manager().generate_node_status(routing_domain);
                (Some(node_status), Some(sender_info))
            }
            RPCMessageHeaderDetail::SafetyRouted(_) => {
                // Make status answer
                let node_status = self
                    .network_manager()
                    .generate_node_status(RoutingDomain::PublicInternet);
                (Some(node_status), None)
            }
            RPCMessageHeaderDetail::PrivateRouted(_) => (None, None),
        };

        // Make status answer
        let status_a = RPCOperationStatusA::new(node_status, sender_info);

        // Send status answer
        self.answer(
            msg,
            RPCAnswer::new(RPCAnswerDetail::StatusA(Box::new(status_a))),
            None,
        )
        .await
    }
}