ave-core 0.11.0

Averiun Ledger core runtime and node API
Documentation
use std::{
    collections::{BTreeMap, BTreeSet},
    sync::Arc,
};

use async_trait::async_trait;
use ave_actors::{
    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
    NotPersistentActor,
};
use ave_common::{
    Namespace, SchemaType, ValueWrapper,
    identity::{DigestIdentifier, HashAlgorithm, PublicKey},
};
use ave_network::ComunicateInfo;
use tracing::{Span, debug, error, info_span, warn};

use crate::{
    Signed,
    governance::role_register::CurrentSchemaRoles,
    helpers::network::service::NetworkSender,
    metrics::try_core_metrics,
    model::common::emit_fail,
    validation::worker::{CurrentWorkerRoles, ValiWorker, ValiWorkerMessage},
};

use super::request::ValidationReq;

/// Actor state for validating network requests that target one schema of one
/// governance.
///
/// Incoming requests are checked against these fields (governance id, schema
/// id, creators, governance version) and, when accepted, a `ValiWorker` child
/// is created from a snapshot of this state.
#[derive(Clone, Debug)]
pub struct ValidationSchema {
    /// Key handed to every spawned worker as `our_key`
    /// (presumably this node's own public key — confirm with the caller).
    pub our_key: Arc<PublicKey>,
    /// Governance this actor serves; requests for any other governance id are
    /// rejected.
    pub governance_id: DigestIdentifier,
    /// Locally known governance version; requests carrying a newer version
    /// are ignored. Replaced by `ValidationSchemaMessage::Update`.
    pub gov_version: u64,
    /// Sequence number forwarded to spawned workers. Replaced by
    /// `ValidationSchemaMessage::Update`.
    pub sn: u64,
    /// Schema this actor serves; requests for any other schema id are
    /// rejected.
    pub schema_id: SchemaType,
    /// Allowed senders and, for each of them, the namespaces they may use.
    /// A request is accepted only if its sender is a key of this map and the
    /// request namespace is contained in that sender's set.
    pub creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
    /// State cloned into every spawned worker as its `init_state`.
    pub init_state: ValueWrapper,
    /// Role data for this schema; the evaluation roles (filtered by the
    /// request namespace) and the evaluation quorum are passed to workers.
    pub current_roles: CurrentSchemaRoles,
    /// Hash algorithm forwarded to spawned workers.
    pub hash: HashAlgorithm,
    /// Network sender handle shared with spawned workers.
    pub network: Arc<NetworkSender>,
}

/// Messages accepted by the `ValidationSchema` actor.
#[derive(Debug, Clone)]
pub enum ValidationSchemaMessage {
    /// A signed validation request received from the network.
    ///
    /// The handler rejects it unless `sender` matches the signer of
    /// `validation_req` and the request passes the governance, schema,
    /// creator/namespace and governance-version checks; otherwise it is
    /// forwarded to a newly created worker together with `info`.
    NetworkRequest {
        /// The signed request to validate (boxed).
        validation_req: Box<Signed<ValidationReq>>,
        /// Communication metadata forwarded untouched to the worker.
        info: ComunicateInfo,
        /// Public key of the peer the request came from.
        sender: PublicKey,
    },
    /// Replaces the mutable part of the actor state
    /// (`creators`, `sn`, `gov_version`, `init_state`, `current_roles`).
    Update {
        /// New creators map (sender key -> allowed namespaces).
        creators: BTreeMap<PublicKey, BTreeSet<Namespace>>,
        /// New sequence number.
        sn: u64,
        /// New local governance version.
        gov_version: u64,
        /// New initial state handed to future workers.
        init_state: ValueWrapper,
        /// New role data for this schema.
        current_roles: CurrentSchemaRoles,
    },
}

// `ValidationSchemaMessage` is the actor's message type (see `Actor::Message`
// below); the `Message` trait is implemented without overriding any item.
impl Message for ValidationSchemaMessage {}

// Implemented with no overridden items.
// NOTE(review): presumably marks this actor's state as not persisted —
// confirm against the `ave_actors` documentation.
impl NotPersistentActor for ValidationSchema {}

#[async_trait]
impl Actor for ValidationSchema {
    /// This actor emits no events.
    type Event = ();
    /// Messages are delivered as `ValidationSchemaMessage`.
    type Message = ValidationSchemaMessage;
    /// Handlers reply with the unit value.
    type Response = ();

    /// Builds the `ValidationSchema` info-level tracing span for the actor
    /// identified by `id`, attaching it to `parent_span` when one is given.
    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
        match parent_span {
            Some(parent) => {
                info_span!(parent: parent, "ValidationSchema", id)
            }
            None => info_span!("ValidationSchema", id),
        }
    }
}

#[async_trait]
impl Handler<Self> for ValidationSchema {
    /// Processes one `ValidationSchemaMessage`.
    ///
    /// * `NetworkRequest` — the request is silently dropped (logged at `warn`
    ///   and counted as `"rejected"`) when any of these checks fails, in this
    ///   order: sender equals signer, governance id matches, schema id
    ///   matches, sender is a registered creator, request namespace is
    ///   allowed for that creator, request governance version is not newer
    ///   than the local one. Otherwise a `ValiWorker` child named after the
    ///   signer is created and the request is forwarded to it.
    /// * `Update` — overwrites `creators`, `gov_version`, `sn`, `init_state`
    ///   and `current_roles` with the received values.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `emit_fail` when the worker child cannot
    /// be created for a reason other than it already existing. Every other
    /// path returns `Ok(())`.
    async fn handle_message(
        &mut self,
        _sender: ActorPath,
        msg: ValidationSchemaMessage,
        ctx: &mut ActorContext<Self>,
    ) -> Result<(), ActorError> {
        match msg {
            ValidationSchemaMessage::NetworkRequest {
                validation_req,
                info,
                sender,
            } => {
                // Records the outcome of this request when metrics are
                // available; a no-op otherwise.
                let observe = |result: &'static str| {
                    if let Some(metrics) = try_core_metrics() {
                        metrics
                            .observe_schema_event("validation_schema", result);
                    }
                };

                // The peer that delivered the request must be its signer.
                if sender != validation_req.signature().signer {
                    observe("rejected");
                    warn!(
                        msg_type = "NetworkRequest",
                        sender = %sender,
                        signer = %validation_req.signature().signer,
                        "Signer and sender are not the same"
                    );
                    return Ok(());
                }

                let governance_id =
                    match validation_req.content().get_governance_id() {
                        Ok(governance_id) => governance_id,
                        Err(e) => {
                            observe("rejected");
                            warn!(
                                msg_type = "NetworkRequest",
                                error = %e,
                                "Failed to get governance_id"
                            );
                            return Ok(());
                        }
                    };

                if self.governance_id != governance_id {
                    observe("rejected");
                    warn!(
                        msg_type = "NetworkRequest",
                        expected_governance_id = %self.governance_id,
                        received_governance_id = %governance_id,
                        "Invalid governance_id"
                    );
                    return Ok(());
                }

                let schema_id = match validation_req.content().get_schema_id() {
                    Ok(schema_id) => schema_id,
                    Err(e) => {
                        observe("rejected");
                        warn!(
                            msg_type = "NetworkRequest",
                            error = %e,
                            "Failed to get schema_id"
                        );
                        return Ok(());
                    }
                };

                if self.schema_id != schema_id {
                    observe("rejected");
                    warn!(
                        msg_type = "NetworkRequest",
                        expected_schema_id = ?self.schema_id,
                        received_schema_id = ?schema_id,
                        "Invalid schema_id"
                    );
                    return Ok(());
                }

                // Resolve the request namespace exactly once: it is needed
                // both for the creator check here and for selecting the
                // evaluation workers below. The borrow of `self.creators` is
                // confined to this block.
                let namespace = {
                    let allowed_namespaces = match self.creators.get(&sender) {
                        Some(allowed_namespaces) => allowed_namespaces,
                        None => {
                            observe("rejected");
                            warn!(
                                msg_type = "NetworkRequest",
                                sender = %sender,
                                "Sender is not a creator"
                            );
                            return Ok(());
                        }
                    };

                    let namespace =
                        match validation_req.content().get_namespace() {
                            Ok(namespace) => namespace,
                            Err(e) => {
                                observe("rejected");
                                warn!(
                                    msg_type = "NetworkRequest",
                                    error = %e,
                                    "Failed to get namespace"
                                );
                                return Ok(());
                            }
                        };

                    if !allowed_namespaces.contains(&namespace) {
                        observe("rejected");
                        warn!(
                            msg_type = "NetworkRequest",
                            sender = %sender,
                            namespace = ?namespace,
                            "Invalid sender namespace"
                        );
                        return Ok(());
                    }

                    namespace
                };

                // A request built against a governance version we have not
                // seen yet cannot be validated with our current data.
                let request_gov_version =
                    validation_req.content().get_gov_version();
                if self.gov_version < request_gov_version {
                    observe("rejected");
                    warn!(
                        msg_type = "NetworkRequest",
                        local_gov_version = self.gov_version,
                        request_gov_version = request_gov_version,
                        governance_id = %self.governance_id,
                        sender = %sender,
                        "Ignoring request with newer governance version; service nodes must update governance through resilience protocols"
                    );
                    return Ok(());
                }

                // One worker per signer: the child is named after the signer,
                // so a second in-flight request from the same signer hits the
                // `Exists` branch below.
                let worker_name =
                    validation_req.signature().signer.to_string();

                let child = ctx
                    .create_child(
                        &worker_name,
                        ValiWorker {
                            init_state: Some(self.init_state.clone()),
                            node_key: sender.clone(),
                            our_key: self.our_key.clone(),
                            governance_id: self.governance_id.clone(),
                            gov_version: self.gov_version,
                            sn: self.sn,
                            current_roles: CurrentWorkerRoles {
                                evaluation: crate::governance::role_register::RoleDataRegister {
                                    // Only evaluators whose namespace covers
                                    // the request namespace take part.
                                    workers: self
                                        .current_roles
                                        .evaluation
                                        .iter()
                                        .filter(|role| {
                                            role.namespace
                                                .is_ancestor_or_equal_of(
                                                    &namespace,
                                                )
                                        })
                                        .map(|role| role.key.clone())
                                        .collect(),
                                    quorum: self.current_roles.evaluation_quorum.clone(),
                                },
                                // No approval roles are passed to the worker.
                                approval: crate::governance::role_register::RoleDataRegister {
                                    workers: std::collections::HashSet::new(),
                                    quorum: crate::governance::model::Quorum::default(),
                                },
                            },
                            hash: self.hash,
                            network: self.network.clone(),
                            stop: true,
                        },
                    )
                    .await;

                let validator_actor = match child {
                    Ok(child) => child,
                    Err(e @ ActorError::Exists { .. }) => {
                        observe("rejected");
                        warn!(
                            msg_type = "NetworkRequest",
                            error = %e,
                            "Validator actor already exists"
                        );
                        return Ok(());
                    }
                    Err(e) => {
                        error!(
                            msg_type = "NetworkRequest",
                            error = %e,
                            "Failed to create validator actor"
                        );
                        return Err(emit_fail(ctx, e).await);
                    }
                };

                // NOTE(review): when `tell` fails the freshly created worker
                // is left registered under the signer's name, so later
                // requests from that signer would presumably be rejected as
                // "already exists" — confirm whether the child should be
                // stopped here.
                if let Err(e) = validator_actor
                    .tell(ValiWorkerMessage::NetworkRequest {
                        validation_req,
                        info,
                        sender: sender.clone(),
                    })
                    .await
                {
                    warn!(
                        msg_type = "NetworkRequest",
                        error = %e,
                        "Failed to send request to validator"
                    );
                } else {
                    observe("delegated");
                    debug!(
                        msg_type = "NetworkRequest",
                        sender = %sender,
                        "Validation request delegated to worker"
                    );
                }
            }
            ValidationSchemaMessage::Update {
                creators,
                sn,
                gov_version,
                init_state,
                current_roles,
            } => {
                if let Some(metrics) = try_core_metrics() {
                    metrics.observe_schema_event("validation_schema", "update");
                }
                self.creators = creators;
                self.gov_version = gov_version;
                self.sn = sn;
                self.init_state = init_state;
                self.current_roles = current_roles;

                debug!(
                    msg_type = "Update",
                    sn = self.sn,
                    gov_version = self.gov_version,
                    "Schema updated successfully"
                );
            }
        };
        Ok(())
    }

    /// Reacts to a fault reported by a child actor: records a `child_fault`
    /// metric, logs the error with this schema's identifiers, forwards the
    /// error through `emit_fail` and tells the runtime to stop the child.
    async fn on_child_fault(
        &mut self,
        error: ActorError,
        ctx: &mut ActorContext<Self>,
    ) -> ChildAction {
        if let Some(metrics) = try_core_metrics() {
            metrics.observe_schema_event("validation_schema", "child_fault");
        }
        error!(
            governance_id = %self.governance_id,
            schema_id = ?self.schema_id,
            gov_version = self.gov_version,
            error = %error,
            "Child fault in validation schema"
        );
        // The value returned by `emit_fail` is intentionally discarded; the
        // child is stopped regardless.
        emit_fail(ctx, error).await;
        ChildAction::Stop
    }
}