pub mod errors;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use log::{debug, error, info};
use crate::database::TaskExecutionRecord;
use futures_util::StreamExt;
use google_cloud_pubsub::{
client::{Client, ClientConfig},
subscriber::SubscriberConfig,
subscription::MessageStream,
};
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};
use errors::MessageQueueError;
// Update describing the progress/outcome of a task execution.
//
// `MessageQueue::push_task_execution_update` serializes this struct to JSON and
// publishes it to the configured execution-updates topic. The utoipa derives
// (`ToSchema`, `IntoParams`) also expose it to the generated OpenAPI description.
//
// NOTE: plain `//` comments are used on purpose. utoipa copies `///` doc comments
// into the generated schema descriptions, so doc comments here would change the
// emitted OpenAPI document.
#[derive(ToSchema, Debug, Serialize, Deserialize, IntoParams)]
pub struct UpdateExecutionRecordQuery {
// Presumably identifies the request that triggered the execution — confirm with the producer.
pub request_id: String,
// Presumably identifies the execution record being updated — confirm with the consumer.
pub execution_id: String,
// Duration as an unsigned count; the name indicates seconds.
pub duration_seconds: u64,
// Whether the execution is complete.
pub is_complete: bool,
// Result payload as a string; its format is not defined in this file.
pub result: String,
// Numeric status; the meaning of individual values is not defined in this file.
pub status: i32,
}
/// Handle to the Pub/Sub resources used to exchange task executions and
/// execution updates. Constructed by `MessageQueue::new` (or via
/// `MessageQueueBuilder::build`).
pub struct MessageQueue {
/// Pub/Sub client created in `MessageQueue::new`; used to look up topics when publishing.
pub pubsub_client: Client,
/// Value of the `execution_requests_topic_id` argument given to `MessageQueue::new`.
/// Not read by any method in this file.
pub executions_topic_id: Option<String>,
/// Message stream of the incoming-executions subscription. `None` when no
/// subscription id was supplied to `MessageQueue::new`; `pop_task_execution`
/// returns an error in that case.
pub executions_stream: Option<MessageStream>,
/// Topic that `push_task_execution_update` publishes to. When `None`, that
/// method returns an error.
pub execution_updates_topic_id: Option<String>,
}
/// Collects the arguments of `MessageQueue::new` so they can be supplied
/// incrementally; `build` forwards them unchanged.
pub struct MessageQueueBuilder {
/// Google Cloud project id handed to the Pub/Sub client configuration.
/// Defaults to an empty string.
pub project_id: String,
/// Optional id of the subscription to receive task executions from.
pub incoming_executions_subscription_id: Option<String>,
/// Optional id of the execution-requests topic; stored on the built
/// `MessageQueue` as `executions_topic_id`.
pub execution_requests_topic_id: Option<String>,
/// Optional id of the topic execution updates are published to.
pub execution_updates_topic_id: Option<String>,
}
/// Message body exchanged over the task-execution queue.
///
/// `MessageQueue::push_task_execution` serializes it to JSON before publishing
/// and `MessageQueue::pop_task_execution` deserializes it from the received
/// message data.
#[derive(Debug, Serialize, Deserialize)]
pub struct TaskExecutionEnqueued {
/// Id of the Pub/Sub topic the message is published to by `push_task_execution`.
pub topic: String,
/// Execution record (`crate::database::TaskExecutionRecord`) carried with the message.
pub record: TaskExecutionRecord,
/// Task payload as a string; its format is not defined in this file.
pub task_data: String,
}
impl Default for MessageQueueBuilder {
fn default() -> Self {
MessageQueueBuilder {
project_id: String::new(),
incoming_executions_subscription_id: None,
execution_requests_topic_id: None,
execution_updates_topic_id: None,
}
}
}
impl MessageQueueBuilder {
    /// Creates a builder with the default (empty) configuration.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the Google Cloud project id used for the Pub/Sub client.
    pub fn with_project_id(mut self, project_id: &str) -> Self {
        self.project_id = project_id.to_string();
        self
    }

    /// Sets the subscription from which task executions are received.
    pub fn with_incoming_executions_subscription_id(
        mut self,
        incoming_executions_subscription_id: &str,
    ) -> Self {
        self.incoming_executions_subscription_id =
            Some(incoming_executions_subscription_id.to_string());
        self
    }

    /// Sets the execution-requests topic id.
    ///
    /// `build` has always forwarded this field to `MessageQueue::new`; this
    /// setter makes it configurable through the fluent API like the others.
    pub fn with_execution_requests_topic_id(mut self, execution_requests_topic_id: &str) -> Self {
        self.execution_requests_topic_id = Some(execution_requests_topic_id.to_string());
        self
    }

    /// Sets the topic to which execution updates are published.
    pub fn with_execution_updates_topic_id(mut self, execution_updates_topic_id: &str) -> Self {
        self.execution_updates_topic_id = Some(execution_updates_topic_id.to_string());
        self
    }

    /// Consumes the builder and constructs the [`MessageQueue`].
    ///
    /// # Errors
    ///
    /// Returns whatever [`MessageQueue::new`] returns for the collected settings.
    pub async fn build(self) -> Result<MessageQueue, MessageQueueError> {
        MessageQueue::new(
            &self.project_id,
            self.incoming_executions_subscription_id,
            self.execution_requests_topic_id,
            self.execution_updates_topic_id,
        )
        .await
    }
}
impl MessageQueue {
pub async fn new(
project_id: &str,
incoming_executions_subscription_id: Option<String>,
execution_requests_topic_id: Option<String>,
execution_updates_topic_id: Option<String>,
) -> Result<MessageQueue, MessageQueueError> {
let config = ClientConfig {
project_id: Some(project_id.to_string()),
..Default::default()
}
.with_auth()
.await
.map_err(|e| {
error!("Error creating PubSub client config: {}", e);
MessageQueueError::ConfigConstructionError(e.to_string())
})?;
let client = Client::new(config).await.map_err(|e| {
error!("Error creating PubSub client: {}", e);
MessageQueueError::ClientConnectionError(e.to_string())
})?;
let stream = match incoming_executions_subscription_id.clone() {
Some(id) => Some(
client
.subscription(id.as_str())
.subscribe(Some(SubscriberConfig {
ping_interval: std::time::Duration::from_secs(1),
..Default::default()
}))
.await
.map_err(|e| {
error!("Error subscribing to task execution queue: {}", e);
MessageQueueError::SubscriptionConnectionError(e.to_string())
})?,
),
None => None,
};
Ok(MessageQueue {
pubsub_client: client,
executions_stream: stream,
executions_topic_id: execution_requests_topic_id,
execution_updates_topic_id: execution_updates_topic_id,
})
}
pub async fn push_task_execution(
&self,
task_execution: TaskExecutionEnqueued,
) -> Result<(), MessageQueueError> {
let topic = self.pubsub_client.topic(&task_execution.topic);
if !topic.exists(None).await.unwrap() {
error!("Topic {} does not exist", task_execution.topic);
return Err(MessageQueueError::NonExistentTopic(
task_execution.topic.to_string(),
));
}
let publisher = topic.new_publisher(None);
let mut msg = Vec::new();
msg.push(PubsubMessage {
data: serde_json::to_string(&task_execution).unwrap().into(),
..Default::default()
});
if let Err(e) = publisher.publish_immediately(msg, None).await {
error!("Error publishing message: {}", e);
return Err(MessageQueueError::FailedToPublish(e.to_string()));
} else {
info!("Published message to PubSub");
Ok(())
}
}
pub async fn pop_task_execution(
&mut self,
) -> Result<Option<TaskExecutionEnqueued>, MessageQueueError> {
info!("Awaiting task execution from queue...");
if let None = self.executions_stream {
error!("No incoming executions stream found!");
return Err(MessageQueueError::SubscriptionNotConfiguredError(
String::from("No incoming executions stream found!"),
));
}
let message = self.executions_stream.as_mut().unwrap().next().await;
match message {
Some(msg) => {
debug!("Received message: {:?}", msg);
let message_data = msg.message.data.clone();
let decoded_data = match String::from_utf8(message_data) {
Ok(data) => {
debug!("String data: {:#?}", data);
data
}
Err(e) => {
error!("Error decoding message data: {}", e);
let _ = msg.nack().await;
return Err(MessageQueueError::DecodeError(e.to_string()));
}
};
let task_execution_enqueued: TaskExecutionEnqueued =
match serde_json::from_str(&decoded_data) {
Ok(data) => data,
Err(e) => {
error!("Error deserializing message data: {}", e);
let _ = msg.nack().await;
return Err(MessageQueueError::DecodeError(e.to_string()));
}
};
info!("Received task execution: {:?}", task_execution_enqueued);
msg.ack().await.unwrap();
Ok(Some(task_execution_enqueued))
}
None => {
info!("No messages found in task execution queue");
Ok(None)
}
}
}
pub async fn push_task_execution_update(
&self,
update: UpdateExecutionRecordQuery,
) -> Result<(), MessageQueueError> {
if let None = self.execution_updates_topic_id {
error!("No execution updates topic ID found!");
return Err(MessageQueueError::SubscriptionNotConfiguredError(
String::from("No execution updates topic ID found!"),
));
} else {
let topic_id = self.execution_updates_topic_id.clone().unwrap();
let topic = self.pubsub_client.topic(&topic_id);
if !topic.exists(None).await.unwrap() {
error!("Topic {} does not exist", topic_id);
return Err(MessageQueueError::NonExistentTopic(topic_id));
}
let publisher = topic.new_publisher(None);
let mut msg = Vec::new();
msg.push(PubsubMessage {
data: serde_json::to_string(&update).unwrap().into(),
..Default::default()
});
if let Err(e) = publisher.publish_immediately(msg, None).await {
error!("Error publishing message: {}", e);
return Err(MessageQueueError::FailedToPublish(e.to_string()));
} else {
Ok(())
}
}
}
}