subliminal 0.0.4

Base crate for subliminal microservices project
Documentation
pub mod errors;

use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use log::{debug, error, info};

use crate::database::TaskExecutionRecord;
use futures_util::StreamExt;
use google_cloud_pubsub::{
    client::{Client, ClientConfig},
    subscriber::SubscriberConfig,
    subscription::MessageStream,
};
use serde::{Deserialize, Serialize};

use utoipa::{IntoParams, ToSchema};

use errors::MessageQueueError;

/// Update to apply to a task execution record.
///
/// `MessageQueue::push_task_execution_update` serializes this struct to JSON and publishes it
/// on the configured execution-updates topic.
#[derive(ToSchema, Debug, Serialize, Deserialize, IntoParams)]
pub struct UpdateExecutionRecordQuery {
    // ID of the parent task request that the execution belongs to
    pub request_id: String,

    // ID of the execution whose record should be updated
    pub execution_id: String,

    // Number of seconds the execution took to complete
    pub duration_seconds: u64,

    // Whether or not the execution was successful
    pub is_complete: bool,

    // Result of the execution, carried as a string
    pub result: String,

    // Status code of the execution
    pub status: i32,
}

/// Wrapper around the Google Cloud PubSub client to work with Subliminal.
///
/// Constructed with `MessageQueue::new` or through `MessageQueueBuilder::build`.
pub struct MessageQueue {
    /// PubSub client used to look up topics and subscriptions.
    pub pubsub_client: Client,
    /// Topic ID for execution requests, stored exactly as passed to `MessageQueue::new`.
    ///
    /// NOTE(review): no method in this file reads this field; `push_task_execution` publishes
    /// to the topic named inside the message it is given instead.
    pub executions_topic_id: Option<String>,
    /// Message stream opened on the incoming-executions subscription, reused by
    /// `pop_task_execution`. `None` when no subscription ID was supplied.
    pub executions_stream: Option<MessageStream>,
    /// Topic ID that `push_task_execution_update` publishes to. `None` when not configured.
    pub execution_updates_topic_id: Option<String>,
}

/// Builder for the message queue client.
///
/// Collects the arguments of `MessageQueue::new`; `build` forwards them unchanged.
pub struct MessageQueueBuilder {
    /// Google Cloud project ID. The `Default` value is an empty string.
    pub project_id: String,
    /// Subscription ID to pull incoming execution requests from, if any.
    pub incoming_executions_subscription_id: Option<String>,
    /// Topic ID for execution requests, if any.
    pub execution_requests_topic_id: Option<String>,
    /// Topic ID to push execution updates to, if any.
    pub execution_updates_topic_id: Option<String>,
}

/// Representation of an enqueued task execution. This is what the consumers receive from the message queue.
///
/// The whole struct is serialized to JSON when pushed by `MessageQueue::push_task_execution`
/// and deserialized from JSON when popped by `MessageQueue::pop_task_execution`.
#[derive(Debug, Serialize, Deserialize)]
pub struct TaskExecutionEnqueued {
    /// Topic to send the task execution to; `push_task_execution` publishes on this topic.
    pub topic: String,

    /// Task execution record.
    pub record: TaskExecutionRecord,

    /// Serialized task data from the parent task request.
    pub task_data: String,
}

impl Default for MessageQueueBuilder {
    fn default() -> Self {
        MessageQueueBuilder {
            project_id: String::new(),
            incoming_executions_subscription_id: None,
            execution_requests_topic_id: None,
            execution_updates_topic_id: None,
        }
    }
}

impl MessageQueueBuilder {
    /// Creates a new message queue builder with nothing configured.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the Google Cloud project ID.
    pub fn with_project_id(mut self, project_id: &str) -> Self {
        self.project_id = project_id.to_string();
        self
    }

    /// Sets the subscription ID to pull incoming execution requests from.
    pub fn with_incoming_executions_subscription_id(
        mut self,
        incoming_executions_subscription_id: &str,
    ) -> Self {
        self.incoming_executions_subscription_id =
            Some(incoming_executions_subscription_id.to_string());
        self
    }

    /// Sets the topic ID for execution requests.
    ///
    /// Without this setter the `execution_requests_topic_id` field could only be populated by
    /// assigning to it directly, which breaks the fluent builder chain.
    pub fn with_execution_requests_topic_id(mut self, execution_requests_topic_id: &str) -> Self {
        self.execution_requests_topic_id = Some(execution_requests_topic_id.to_string());
        self
    }

    /// Sets the topic ID to push execution updates to.
    pub fn with_execution_updates_topic_id(mut self, execution_updates_topic_id: &str) -> Self {
        self.execution_updates_topic_id = Some(execution_updates_topic_id.to_string());
        self
    }

    /// Builds the message queue client by forwarding the collected settings to
    /// `MessageQueue::new`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `MessageQueue::new` returns.
    pub async fn build(self) -> Result<MessageQueue, MessageQueueError> {
        MessageQueue::new(
            &self.project_id,
            self.incoming_executions_subscription_id,
            self.execution_requests_topic_id,
            self.execution_updates_topic_id,
        )
        .await
    }
}

impl MessageQueue {
    /// Creates a new message queue client.
    pub async fn new(
        project_id: &str,
        incoming_executions_subscription_id: Option<String>,
        execution_requests_topic_id: Option<String>,
        execution_updates_topic_id: Option<String>,
    ) -> Result<MessageQueue, MessageQueueError> {
        let config = ClientConfig {
            project_id: Some(project_id.to_string()),
            ..Default::default()
        }
        .with_auth()
        .await
        .map_err(|e| {
            error!("Error creating PubSub client config: {}", e);
            MessageQueueError::ConfigConstructionError(e.to_string())
        })?;

        // Create the PubSub client from the configuration
        let client = Client::new(config).await.map_err(|e| {
            error!("Error creating PubSub client: {}", e);
            MessageQueueError::ClientConnectionError(e.to_string())
        })?;

        // We want a reusable stream of messages from the subscription, as opposed to creating a new one each time.
        let stream = match incoming_executions_subscription_id.clone() {
            Some(id) => Some(
                client
                    .subscription(id.as_str())
                    .subscribe(Some(SubscriberConfig {
                        ping_interval: std::time::Duration::from_secs(1),
                        ..Default::default()
                    }))
                    .await
                    .map_err(|e| {
                        error!("Error subscribing to task execution queue: {}", e);
                        MessageQueueError::SubscriptionConnectionError(e.to_string())
                    })?,
            ),
            None => None,
        };

        // Return the message queue client
        Ok(MessageQueue {
            pubsub_client: client,
            executions_stream: stream,
            executions_topic_id: execution_requests_topic_id,
            execution_updates_topic_id: execution_updates_topic_id,
        })
    }

    /// Pushes a task execution to the message queue on a particular topic.
    pub async fn push_task_execution(
        &self,
        task_execution: TaskExecutionEnqueued,
    ) -> Result<(), MessageQueueError> {
        // Need a valid topic ID from the queue request
        let topic = self.pubsub_client.topic(&task_execution.topic);
        if !topic.exists(None).await.unwrap() {
            error!("Topic {} does not exist", task_execution.topic);
            return Err(MessageQueueError::NonExistentTopic(
                task_execution.topic.to_string(),
            ));
        }

        // Start publisher on the topic
        let publisher = topic.new_publisher(None);

        // Construct the message to send to PubSub
        let mut msg = Vec::new();
        msg.push(PubsubMessage {
            data: serde_json::to_string(&task_execution).unwrap().into(),
            ..Default::default()
        });

        // Publish the message to PubSub immediately
        if let Err(e) = publisher.publish_immediately(msg, None).await {
            error!("Error publishing message: {}", e);
            return Err(MessageQueueError::FailedToPublish(e.to_string()));
        } else {
            info!("Published message to PubSub");
            Ok(())
        }
    }

    /// Pops a task execution from the message queue.
    pub async fn pop_task_execution(
        &mut self,
    ) -> Result<Option<TaskExecutionEnqueued>, MessageQueueError> {
        info!("Awaiting task execution from queue...");

        // If we don't have a subscription stream configured, return an error
        if let None = self.executions_stream {
            error!("No incoming executions stream found!");
            return Err(MessageQueueError::SubscriptionNotConfiguredError(
                String::from("No incoming executions stream found!"),
            ));
        }

        // Get the next message from the subscription stream
        let message = self.executions_stream.as_mut().unwrap().next().await;
        match message {
            Some(msg) => {
                debug!("Received message: {:?}", msg);
                let message_data = msg.message.data.clone();

                // Decode the message data
                let decoded_data = match String::from_utf8(message_data) {
                    Ok(data) => {
                        debug!("String data: {:#?}", data);
                        data
                    }
                    Err(e) => {
                        error!("Error decoding message data: {}", e);
                        let _ = msg.nack().await;
                        return Err(MessageQueueError::DecodeError(e.to_string()));
                    }
                };

                // Attempt to get a TaskExecutionEnqueued from the message data
                let task_execution_enqueued: TaskExecutionEnqueued =
                    match serde_json::from_str(&decoded_data) {
                        Ok(data) => data,
                        Err(e) => {
                            error!("Error deserializing message data: {}", e);
                            let _ = msg.nack().await;
                            return Err(MessageQueueError::DecodeError(e.to_string()));
                        }
                    };

                // Only ack the message if we were able to deserialize it properly
                info!("Received task execution: {:?}", task_execution_enqueued);
                msg.ack().await.unwrap();
                Ok(Some(task_execution_enqueued))
            }
            None => {
                info!("No messages found in task execution queue");
                Ok(None)
            }
        }
    }

    /// Pushes an update to a task execution record to the message queue.
    pub async fn push_task_execution_update(
        &self,
        update: UpdateExecutionRecordQuery,
    ) -> Result<(), MessageQueueError> {
        // Make sure we have a topic ID configured to push updates to
        if let None = self.execution_updates_topic_id {
            error!("No execution updates topic ID found!");
            return Err(MessageQueueError::SubscriptionNotConfiguredError(
                String::from("No execution updates topic ID found!"),
            ));
        } else {
            let topic_id = self.execution_updates_topic_id.clone().unwrap();
            // Create topic object, ensuring it exists
            let topic = self.pubsub_client.topic(&topic_id);
            if !topic.exists(None).await.unwrap() {
                error!("Topic {} does not exist", topic_id);
                return Err(MessageQueueError::NonExistentTopic(topic_id));
            }

            // Start publisher on the topic
            let publisher = topic.new_publisher(None);

            // Construct the message to send to PubSub
            let mut msg = Vec::new();
            msg.push(PubsubMessage {
                data: serde_json::to_string(&update).unwrap().into(),
                ..Default::default()
            });

            // Publish the message to PubSub immediately
            if let Err(e) = publisher.publish_immediately(msg, None).await {
                error!("Error publishing message: {}", e);
                return Err(MessageQueueError::FailedToPublish(e.to_string()));
            } else {
                Ok(())
            }
        }
    }
}