use crate::error::{EventError, FeatureQueueError};
use crate::producer::RustScouterProducer;
use crate::queue::bus::TaskState;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use crossbeam_queue::ArrayQueue;
use scouter_state::app_state;
use scouter_types::MessageRecord;
use scouter_types::QueueExt;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::RwLock;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, Instrument};
/// Turns a drained batch of queued items into a single publishable [`MessageRecord`].
pub trait FeatureQueue: Send + Sync {
    /// Builds one [`MessageRecord`] out of `batch`.
    ///
    /// # Errors
    /// Returns a [`FeatureQueueError`] when the implementation cannot convert the batch.
    fn create_drift_records_from_batch<T>(
        &self,
        batch: Vec<T>,
    ) -> Result<MessageRecord, FeatureQueueError>
    where
        T: QueueExt;
}
pub trait BackgroundTask: Send + Sync + 'static {
type DataItem: QueueExt + Send + Sync + 'static;
type Processor: FeatureQueue + Send + Sync + 'static;
#[allow(clippy::too_many_arguments)]
fn start_background_task(
&self,
data_queue: Arc<ArrayQueue<Self::DataItem>>,
processor: Arc<Self::Processor>,
producer: RustScouterProducer,
last_publish: Arc<RwLock<DateTime<Utc>>>,
queue_capacity: usize,
identifier: String,
task_state: TaskState,
cancellation_token: CancellationToken,
) -> Result<JoinHandle<()>, EventError> {
let span = info_span!("background_task", task = %identifier);
let future = async move {
debug!("Starting background task for {}", identifier);
task_state.set_background_running(true);
task_state.notify_background_started();
debug!("Background task set to running");
sleep(Duration::from_millis(10)).await;
loop {
tokio::select! {
_ = sleep(Duration::from_secs(2)) => {
debug!("Waking up background task");
let now = Utc::now();
let should_process = {
if let Ok(last) = last_publish.read() {
(now - *last).num_seconds() >= 30
} else {
false
}
};
if should_process {
let mut batch = Vec::with_capacity(queue_capacity);
while let Some(item) = data_queue.pop() {
batch.push(item);
}
if let Ok(mut guard) = last_publish.write() {
*guard = now;
}
if !batch.is_empty() {
match processor.create_drift_records_from_batch(batch) {
Ok(records) => {
if let Err(e) = producer.publish(records).await {
error!("Failed to publish records: {}", e);
} else {
info!("Successfully published records");
}
}
Err(e) => error!("Failed to create drift records: {}", e),
}
}
}
}
_ = cancellation_token.cancelled() => {
info!("Stop signal received, shutting down background task");
task_state.set_background_running(false);
break;
}
else => {
info!("Stop signal received, shutting down background task");
task_state.set_background_running(false);
break;
}
}
}
debug!("Background task finished");
};
let handle = app_state()
.handle()
.spawn(async move { future.instrument(span).await });
Ok(handle)
}
}
/// Shared insert/publish behaviour for a bounded item queue.
///
/// Implementors expose the queue, the batch processor, the producer and the shared
/// `last_publish` timestamp. The default methods provide insertion with backpressure
/// and capacity-triggered publishing.
#[async_trait]
pub trait QueueMethods {
    /// Item type stored in the bounded queue.
    type ItemType: QueueExt + 'static + Clone + Debug;
    /// Processor that converts a drained batch of items into records.
    type FeatureQueue: FeatureQueue + 'static;

    /// Item count at which [`QueueMethods::insert`] triggers a publish.
    fn capacity(&self) -> usize;
    /// Mutable access to the producer used by [`QueueMethods::publish`].
    fn get_producer(&mut self) -> &mut RustScouterProducer;
    /// Shared handle to the bounded item queue.
    fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>>;
    /// Shared handle to the batch processor.
    fn feature_queue(&self) -> Arc<Self::FeatureQueue>;
    /// Shared timestamp written by [`QueueMethods::update_last_publish`].
    fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>>;
    /// Implementor-defined processing check; not called by the default methods here.
    fn should_process(&self, current_count: usize) -> bool;

    /// Stamps `last_publish` with the current time.
    ///
    /// A poisoned lock is recovered rather than skipped, because the stored timestamp
    /// is still valid even if another holder panicked. Always returns `Ok`.
    fn update_last_publish(&mut self) -> Result<(), EventError> {
        let last_publish = self.last_publish();
        *last_publish
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner()) = Utc::now();
        Ok(())
    }

    /// Publishes `records` through the producer returned by `get_producer`.
    async fn publish(&mut self, records: MessageRecord) -> Result<(), EventError> {
        let producer = self.get_producer();
        producer.publish(records).await
    }

    /// Enqueues `item`, then publishes the whole queue once it reaches `capacity()`.
    ///
    /// # Errors
    /// Propagates push failures from [`QueueMethods::insert_with_backpressure`] and
    /// publish failures from [`QueueMethods::try_publish`].
    async fn insert(&mut self, item: Self::ItemType) -> Result<(), EventError> {
        self.insert_with_backpressure(item).await?;
        let queue = self.queue();
        let count = queue.len();
        let capacity = self.capacity();
        if count >= capacity {
            debug!(
                "Queue reached capacity, processing queue, current count: {}, current_capacity: {}",
                count,
                capacity
            );
            self.try_publish(queue).await?;
        }
        Ok(())
    }

    /// Drains `queue` and, if anything was drained, converts and publishes it.
    ///
    /// On a successful publish, `last_publish` is updated. Conversion failures are
    /// logged and swallowed. Publish failures are returned; the drained items are not
    /// re-queued in that case.
    async fn try_publish(
        &mut self,
        queue: Arc<ArrayQueue<Self::ItemType>>,
    ) -> Result<(), EventError> {
        let mut batch = Vec::with_capacity(queue.capacity());
        while let Some(metrics) = queue.pop() {
            batch.push(metrics);
        }
        if !batch.is_empty() {
            let feature_queue = self.feature_queue();
            match feature_queue.create_drift_records_from_batch(batch) {
                Ok(records) => {
                    self.publish(records).await?;
                    self.update_last_publish()?;
                }
                Err(e) => error!("Failed to create drift records: {}", e),
            }
        }
        Ok(())
    }

    /// Implementor-defined flush; presumably publishes anything still queued — confirm
    /// in implementations.
    async fn flush(&mut self) -> Result<(), EventError>;

    /// Pushes `item` onto the queue, retrying while it is full.
    ///
    /// Makes up to 3 push attempts. Between attempts it sleeps with exponential backoff:
    /// 200 ms after the first failure and 400 ms after the second.
    ///
    /// # Errors
    /// Returns [`EventError::QueuePushError`] when every attempt fails.
    async fn insert_with_backpressure(&mut self, item: Self::ItemType) -> Result<(), EventError> {
        let queue = self.queue();
        let max_retries: u32 = 3;
        // `ArrayQueue::push` hands the value back in `Err` when the queue is full, so the
        // same item is re-offered on the next attempt without cloning it.
        let mut pending = item;
        for attempt in 1..=max_retries {
            match queue.push(pending) {
                Ok(()) => return Ok(()),
                Err(rejected) => {
                    if attempt == max_retries {
                        return Err(EventError::QueuePushError);
                    }
                    pending = rejected;
                    sleep(Duration::from_millis(100 * 2_u64.pow(attempt))).await;
                }
            }
        }
        // Only reachable if `max_retries` were 0; kept as a defensive fallback.
        Err(EventError::QueuePushRetryError)
    }
}
pub async fn wait_for_background_task(task_state: &TaskState) -> Result<(), EventError> {
if !task_state.has_background_handle() {
debug!("No background handle to wait for {}", task_state.id);
return Ok(());
}
let notify = task_state
.background_task
.read()
.unwrap()
.startup_notify
.clone();
match timeout(Duration::from_secs(10), notify.notified()).await {
Ok(_) => {
if task_state.is_background_running() {
debug!("Background task started successfully for {}", task_state.id);
Ok(())
} else {
error!(
"Background task notification received but task not running for {}",
task_state.id
);
Err(EventError::BackgroundTaskFailedToStartError)
}
}
Err(_) => {
error!(
"Background task failed to start within timeout for {}",
task_state.id
);
Err(EventError::BackgroundTaskFailedToStartError)
}
}
}
pub async fn wait_for_event_task(task_state: &TaskState) -> Result<(), EventError> {
let notify = task_state.event_task.read().unwrap().startup_notify.clone();
match timeout(Duration::from_secs(10), notify.notified()).await {
Ok(_) => {
if task_state.is_event_running() {
debug!("Event task started successfully");
Ok(())
} else {
error!("Event task notification received but task not running");
Err(EventError::EventTaskFailedToStartError)
}
}
Err(_) => {
error!("Event task failed to start within timeout");
Err(EventError::EventTaskFailedToStartError)
}
}
}