subliminal 0.0.4

Base crate for subliminal microservices project
Documentation
use std::{fmt::Debug, future::Future, panic, pin::Pin};

use log::{error, info};
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::mpsc::{Receiver, Sender};

use crate::message_queue::{MessageQueue, MessageQueueBuilder};

use super::consumer::consume;
use super::dispatcher::TaskDispatcher;
use crate::task::{Task, TaskExecutionData};

struct ExecutionNode;

// Responsible for maintaining the consumers and the task dispatcher
impl ExecutionNode {
    /// Creates a new execution node
    pub async fn spawn(
        task_consumers: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
        n_workers: u32,
        message_queue: MessageQueue,
        task_request_rx: Receiver<TaskExecutionData>,
    ) {
        // Spin up the consumers in individual threads
        info!("Starting execution node...");
        for consumer_closure in task_consumers {
            info!("Spawning consumer...");
            tokio::spawn(consumer_closure);
        }

        // Spin up the dispatcher
        info!("Spawning dispatcher...");
        TaskDispatcher::start(n_workers, task_request_rx, message_queue).await;
    }
}

pub struct ExecutionNodeBuilder {
    // Pull tasks from the message queue and place them into the task channel
    task_consumers: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,

    // This is the channel that the consumers will use to place tasks into the dispatcher
    task_request_tx: Sender<TaskExecutionData>,

    // This is the channel that the dispatcher will use to receive tasks from the consumers
    task_request_rx: Receiver<TaskExecutionData>,

    // This is the number of worker threads that will be spawned
    n_workers: u32,

    // Google Cloud Project ID used to contruct the message queues for the consumers and the dispatcher
    gcp_project_id: String,

    // Google Cloud PubSub topic ID used to contruct the message queue for the dispatcher to push updates to
    execution_updates_topic_id: String,
}

impl ExecutionNodeBuilder {
    /// Creates a new ExecutionNodeBuilder
    pub async fn new(n_workers: u32, project_id: &str, execution_updates_topic_id: &str) -> Self {
        // Set a panic hook so panics in threads exit the main thread as well
        let orig_hook = panic::take_hook();
        panic::set_hook(Box::new(move |panic_info| {
            orig_hook(panic_info);
            std::process::exit(1);
        }));

        // Create the task transfer channel
        let (task_request_tx, task_request_rx) =
            tokio::sync::mpsc::channel::<TaskExecutionData>(n_workers as usize * 2);

        // Return the default builder
        Self {
            task_consumers: Vec::new(),
            n_workers,
            task_request_tx,
            task_request_rx,
            gcp_project_id: project_id.to_string(),
            execution_updates_topic_id: execution_updates_topic_id.to_string(),
        }
    }

    /// Adds a consumer to the ExecutionNodeBuilder
    pub async fn with_consumer<T>(mut self, subscription_id: &str) -> Self
    where
        T: DeserializeOwned + Serialize + Send + Sync + Debug + Clone + Task + 'static,
    {
        // Every consumer gets its own message queue client solely for subscribing to messages
        // from the designated channel (No ability to push updates)
        let consumer_message_queue = match MessageQueueBuilder::new()
            .with_project_id(&self.gcp_project_id)
            .with_incoming_executions_subscription_id(subscription_id)
            .build()
            .await
        {
            Ok(queue) => queue,
            Err(e) => {
                error!("Error creating consumer message queue: {:?}", e);
                std::process::exit(1);
            }
        };

        self.task_consumers.push(Box::pin(consume::<T>(
            consumer_message_queue,
            self.task_request_tx.clone(),
        )));

        self
    }

    /// Builds the ExecutionNode
    pub async fn build(self) {
        // Pull out the fields from the builder
        let Self {
            task_consumers,
            n_workers,
            task_request_rx,
            ..
        } = self;

        // The dispatcher itself gets a MessageQueue that is used solely for pushing task execution updates
        let dispatcher_message_queue = match MessageQueueBuilder::new()
            .with_project_id(&self.gcp_project_id)
            .with_execution_updates_topic_id(&self.execution_updates_topic_id)
            .build()
            .await
        {
            Ok(queue) => queue,
            Err(e) => {
                error!("Error creating message queue for dispatch: {:?}", e);
                std::process::exit(1);
            }
        };

        // Spawn the execution node with the fields from the builder
        ExecutionNode::spawn(
            task_consumers,
            n_workers,
            dispatcher_message_queue,
            task_request_rx,
        )
        .await;
    }
}