ave-core 0.8.0

Averiun Ledger core runtime and node API
Documentation
use std::sync::Arc;

use async_trait::async_trait;
use ave_actors::{
    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
    NotPersistentActor,
};
use ave_common::{
    Namespace,
    identity::{DigestIdentifier, PublicKey},
};
use serde::{Deserialize, Serialize};
use tracing::{Span, debug, error, info_span, warn};

use crate::{
    distribution::{Distribution, DistributionMessage, DistributionType},
    governance::model::WitnessesData,
    helpers::network::service::NetworkSender,
    model::common::{
        emit_fail,
        node::i_can_send_last_ledger,
        subject::{get_gov, get_last_ledger_event},
    },
};

pub struct ManualDistribution {
    our_key: Arc<PublicKey>,
}

impl ManualDistribution {
    pub const fn new(our_key: Arc<PublicKey>) -> Self {
        Self { our_key }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ManualDistributionMessage {
    Update(DigestIdentifier),
}

impl Message for ManualDistributionMessage {}

impl NotPersistentActor for ManualDistribution {}

#[async_trait]
impl Actor for ManualDistribution {
    type Message = ManualDistributionMessage;
    type Event = ();
    type Response = ();

    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
        parent_span.map_or_else(
            || info_span!("ManualDistribution"),
            |parent_span| info_span!(parent: parent_span, "ManualDistribution"),
        )
    }
}

#[async_trait]
impl Handler<Self> for ManualDistribution {
    async fn handle_message(
        &mut self,
        _sender: ActorPath,
        msg: ManualDistributionMessage,
        ctx: &mut ave_actors::ActorContext<Self>,
    ) -> Result<(), ActorError> {
        match msg {
            ManualDistributionMessage::Update(subject_id) => {
                let data = i_can_send_last_ledger(ctx, &subject_id)
                    .await
                    .map_err(|e| {
                        error!(
                            msg_type = "Update",
                            subject_id = %subject_id,
                            error = %e,
                            "Failed to check if we can send last ledger"
                        );
                        e
                    })?;

                let Some(data) = data else {
                    warn!(
                        msg_type = "Update",
                        subject_id = %subject_id,
                        "Not the owner of the subject nor rejected transfer"
                    );
                    return Err(ActorError::Functional {
                        description: "Not the owner of the subject, nor have I refused the transfer".to_owned(),
                    });
                };

                let ledger = get_last_ledger_event(ctx, &subject_id)
                    .await
                    .map_err(|e| {
                        error!(
                            msg_type = "Update",
                            subject_id = %subject_id,
                            error = %e,
                            "Failed to get last ledger event"
                        );
                        e
                    })?;

                let Some(ledger) = ledger else {
                    error!(
                        msg_type = "Update",
                        subject_id = %subject_id,
                        "No ledger event found for subject"
                    );
                    return Err(ActorError::Functional {
                        description: "Cannot obtain last ledger event"
                            .to_string(),
                    });
                };

                let governance_id =
                    data.get_governance_id().as_ref().map_or_else(
                        || subject_id.clone(),
                        |governance_id| governance_id.clone(),
                    );

                let gov = get_gov(ctx, &governance_id).await.map_err(|e| {
                    error!(
                        msg_type = "Update",
                        subject_id = %subject_id,
                        governance_id = %governance_id,
                        error = %e,
                        "Failed to get governance"
                    );
                    e
                })?;

                let schema_id = data.get_schema_id();

                let is_gov = schema_id.is_gov();
                let witnesses_data = if is_gov {
                    WitnessesData::Gov
                } else {
                    WitnessesData::Schema {
                        creator: (*self.our_key).clone(),
                        schema_id: schema_id.clone(),
                        namespace: Namespace::from(data.get_namespace()),
                    }
                };

                let mut witnesses =
                    gov.get_witnesses(witnesses_data).map_err(|e| {
                        error!(
                            msg_type = "Update",
                            subject_id = %subject_id,
                            is_gov = is_gov,
                            error = %e,
                            "Failed to get witnesses from governance"
                        );
                        ActorError::Functional {
                            description: e.to_string(),
                        }
                    })?;

                witnesses.remove(&*self.our_key);
                if witnesses.is_empty() {
                    warn!(
                        msg_type = "Update",
                        subject_id = %subject_id,
                        "No witnesses available for manual distribution"
                    );
                    return Err(ActorError::Functional {
                        description: "No witnesses available to manually send the last ledger event".to_string()
                    });
                }

                let witnesses_count = witnesses.len();

                let Some(network) = ctx
                    .system()
                    .get_helper::<Arc<NetworkSender>>("network")
                    .await
                else {
                    error!(
                        msg_type = "Update",
                        subject_id = %subject_id,
                        "Network helper not found"
                    );
                    return Err(ActorError::Helper {
                        name: "network".to_owned(),
                        reason: "Not found".to_owned(),
                    });
                };

                let distribution = Distribution::new(
                    network,
                    DistributionType::Manual,
                    DigestIdentifier::default(),
                );

                let distribution_actor = ctx.create_child(&subject_id.to_string(), distribution).await.map_err(|e| {
                    warn!(
                        msg_type = "Update",
                        subject_id = %subject_id,
                        error = %e,
                        "Manual distribution already in progress"
                    );
                    ActorError::Functional {
                        description: "Manual distribution already in progress for this subject".to_owned()
                    }
                })?;

                if let Err(e) = distribution_actor
                    .tell(DistributionMessage::Create {
                        witnesses: witnesses.clone(),
                        ledger: Box::new(ledger),
                    })
                    .await
                {
                    error!(
                        msg_type = "Update",
                        subject_id = %subject_id,
                        witnesses_count = witnesses_count,
                        error = %e,
                        "Failed to start manual distribution"
                    );
                    return Err(ActorError::Functional {
                        description: format!(
                            "Failed to start manual distribution: {}",
                            e
                        ),
                    });
                };

                debug!(
                    msg_type = "Update",
                    subject_id = %subject_id,
                    witnesses_count = witnesses_count,
                    is_gov = is_gov,
                    "Manual distribution started successfully"
                );

                Ok(())
            }
        }
    }

    async fn on_child_fault(
        &mut self,
        error: ActorError,
        ctx: &mut ActorContext<Self>,
    ) -> ChildAction {
        error!(
            error = %error,
            "Child actor fault in manual distribution"
        );
        emit_fail(ctx, error).await;
        ChildAction::Stop
    }
}