taple-core 0.3.3

TAPLE Protocol reference implementation
Documentation
use borsh::{BorshDeserialize, BorshSerialize};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;

#[cfg(feature = "approval")]
use crate::approval::ApprovalResponses;
use crate::evaluator::EvaluatorMessage;
#[cfg(feature = "evaluation")]
use crate::evaluator::EvaluatorResponse;
use crate::validation::ValidationCommand;
#[cfg(feature = "validation")]
use crate::validation::ValidationResponse;
use crate::{approval::ApprovalMessages, Notification};
use crate::{
    commons::channel::{ChannelData, MpscChannel, SenderEnd},
    distribution::{error::DistributionErrorResponses, DistributionMessagesNew},
    event::{EventCommand, EventResponse},
    ledger::{LedgerCommand, LedgerResponse},
    message::{MessageContent, TaskCommandContent},
    signature::Signed,
};

mod error;
use error::ProtocolErrors;

#[derive(Debug, Clone, Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
pub enum TapleMessages {
    DistributionMessage(DistributionMessagesNew),
    EvaluationMessage(EvaluatorMessage),
    ValidationMessage(ValidationCommand),
    EventMessage(EventCommand),
    ApprovalMessages(ApprovalMessages),
    LedgerMessages(LedgerCommand),
}

impl TaskCommandContent for TapleMessages {}

pub struct ProtocolManager {
    input: MpscChannel<Signed<MessageContent<TapleMessages>>, ()>,
    distribution_sx: SenderEnd<DistributionMessagesNew, Result<(), DistributionErrorResponses>>,
    #[cfg(feature = "evaluation")]
    evaluation_sx: SenderEnd<EvaluatorMessage, EvaluatorResponse>,
    #[cfg(feature = "validation")]
    validation_sx: SenderEnd<ValidationCommand, ValidationResponse>,
    event_sx: SenderEnd<EventCommand, EventResponse>,
    #[cfg(feature = "approval")]
    approval_sx: SenderEnd<ApprovalMessages, ApprovalResponses>,
    ledger_sx: SenderEnd<LedgerCommand, LedgerResponse>,
    token: CancellationToken,
    notification_tx: tokio::sync::mpsc::Sender<Notification>,
}

impl ProtocolManager {
    pub fn new(
        input: MpscChannel<Signed<MessageContent<TapleMessages>>, ()>,
        distribution_sx: SenderEnd<DistributionMessagesNew, Result<(), DistributionErrorResponses>>,
        #[cfg(feature = "evaluation")] evaluation_sx: SenderEnd<
            EvaluatorMessage,
            EvaluatorResponse,
        >,
        #[cfg(feature = "validation")] validation_sx: SenderEnd<
            ValidationCommand,
            ValidationResponse,
        >,
        event_sx: SenderEnd<EventCommand, EventResponse>,
        #[cfg(feature = "approval")] approval_sx: SenderEnd<ApprovalMessages, ApprovalResponses>,
        ledger_sx: SenderEnd<LedgerCommand, LedgerResponse>,
        token: CancellationToken,
        notification_tx: tokio::sync::mpsc::Sender<Notification>,
    ) -> Self {
        Self {
            input,
            distribution_sx,
            #[cfg(feature = "evaluation")]
            evaluation_sx,
            #[cfg(feature = "validation")]
            validation_sx,
            event_sx,
            #[cfg(feature = "approval")]
            approval_sx,
            ledger_sx,
            token,
            notification_tx,
        }
    }

    pub async fn run(mut self) {
        loop {
            tokio::select! {
                command = self.input.receive() => {
                    match command {
                        Some(command) => {
                            let result = self.process_command(command).await;
                            if result.is_err() {
                                log::error!("Protocol Manager: {}", result.unwrap_err());
                                break;
                            }
                        }
                        None => {
                            break;
                        },
                    }
                },
                _ = self.token.cancelled() => {
                    log::debug!("Shutdown received");
                    break;
                }
            }
        }
        self.token.cancel();
        log::info!("Ended");
    }

    #[allow(unused_variables)]
    async fn process_command(
        &self,
        command: ChannelData<Signed<MessageContent<TapleMessages>>, ()>,
    ) -> Result<(), ProtocolErrors> {
        let message = match command {
            ChannelData::AskData(_data) => {
                return Err(ProtocolErrors::AskCommandDetected);
            }
            ChannelData::TellData(data) => {
                let data = data.get();
                data
            }
        };
        let msg = message.content.content;
        let sender = message.content.sender_id;
        match msg {
            TapleMessages::DistributionMessage(data) => {
                self.distribution_sx
                    .tell(data)
                    .await
                    .map_err(|_| ProtocolErrors::ChannelClosed)?;
            }
            TapleMessages::EventMessage(data) => self
                .event_sx
                .tell(data)
                .await
                .map_err(|_| ProtocolErrors::ChannelClosed)?,
            TapleMessages::EvaluationMessage(data) => {
                #[cfg(feature = "evaluation")]
                {
                    let evaluation_command = match data {
                        EvaluatorMessage::EvaluationEvent { .. } => {
                            log::error!("Evaluation Event Received in protocol manager");
                            return Ok(());
                        }
                        EvaluatorMessage::AskForEvaluation(evaluation_request) => {
                            EvaluatorMessage::EvaluationEvent {
                                evaluation_request,
                                sender,
                            }
                        }
                    };
                    return Ok(self
                        .evaluation_sx
                        .tell(evaluation_command)
                        .await
                        .map_err(|_| ProtocolErrors::ChannelClosed)?);
                }
                #[cfg(not(feature = "evaluation"))]
                log::trace!("Evaluation Message received. Current node is not able to evaluate");
            }
            TapleMessages::ValidationMessage(data) => {
                #[cfg(feature = "validation")]
                {
                    let validation_command = match data {
                        ValidationCommand::ValidationEvent { .. } => {
                            log::error!("Validation Event Received in protocol manager");
                            return Ok(());
                        }
                        ValidationCommand::AskForValidation(validation_event) => {
                            ValidationCommand::ValidationEvent {
                                validation_event,
                                sender,
                            }
                        }
                    };
                    return Ok(self
                        .validation_sx
                        .tell(validation_command)
                        .await
                        .map_err(|_| ProtocolErrors::ChannelClosed)?);
                }
                #[cfg(not(feature = "validation"))]
                log::trace!("Validation Message received. Current node is not able to validate");
            }
            TapleMessages::ApprovalMessages(data) => {
                #[cfg(feature = "approval")]
                {
                    let approval_command = match data {
                        ApprovalMessages::RequestApproval(approval) => {
                            ApprovalMessages::RequestApprovalWithSender { approval, sender }
                        }
                        ApprovalMessages::RequestApprovalWithSender { .. } => {
                            log::error!(
                                "Request Approval with Sender Received in protocol manager"
                            );
                            return Ok(());
                        }
                        _ => data,
                    };
                    return Ok(self
                        .approval_sx
                        .tell(approval_command)
                        .await
                        .map_err(|_| ProtocolErrors::ChannelClosed)?);
                }
                #[cfg(not(feature = "approval"))]
                log::trace!("Approval Message received. Current node is not able to aprove");
            }
            TapleMessages::LedgerMessages(data) => self
                .ledger_sx
                .tell(data)
                .await
                .map_err(|_| ProtocolErrors::ChannelClosed)?,
        }
        Ok(())
    }
}