use futures::{stream, StreamExt};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::sleep;
use crate::types::{TitoEngine, TitoEvent, TitoTransaction};
use crate::TitoError;
#[derive(Clone)]
pub struct TitoQueue<E: TitoEngine> {
pub engine: E,
}
impl<E: TitoEngine> TitoQueue<E> {
pub async fn pull_partition_range(
&self,
event_type: String,
start_partition: u32,
end_partition: u32,
limit: u32,
_reverse: bool, ) -> Result<Vec<TitoEvent>, TitoError> {
use crate::types::{PARTITION_DIGITS, SEQUENCE_DIGITS, QueueCheckpoint, QueueProgress};
use chrono::Utc;
self.engine
.transaction(|tx| async move {
let mut jobs = Vec::new();
let mut jobs_collected = 0u32;
for partition in start_partition..end_partition {
if jobs_collected >= limit {
break;
}
let checkpoint_key = format!("queue_checkpoint:{}:{:0width$}", event_type, partition, width = PARTITION_DIGITS);
let last_sequence = match tx.get(checkpoint_key.as_bytes()).await
.map_err(|e| TitoError::QueryFailed(format!("Checkpoint read failed: {}", e)))? {
Some(bytes) => {
serde_json::from_slice::<QueueCheckpoint>(&bytes)
.map(|ckpt| ckpt.last_sequence)
.unwrap_or(0)
}
None => 0,
};
let event_start_key = format!(
"event:{}:{:0pwidth$}:{:0swidth$}",
event_type,
partition,
last_sequence + 1,
pwidth = PARTITION_DIGITS,
swidth = SEQUENCE_DIGITS
);
let event_end_key = format!(
"event:{}:{:0pwidth$}:",
event_type,
partition + 1,
pwidth = PARTITION_DIGITS
);
let events = tx.scan(
event_start_key.as_bytes()..event_end_key.as_bytes(),
limit - jobs_collected
)
.await
.map_err(|e| TitoError::QueryFailed(format!("Event scan failed: {}", e)))?;
for (event_key, event_bytes) in events {
if let Ok(event) = serde_json::from_slice::<TitoEvent>(&event_bytes) {
let event_key_str = String::from_utf8_lossy(&event_key);
let parts: Vec<&str> = event_key_str.split(':').collect();
if parts.len() < 4 {
continue;
}
let partition_str = parts[2];
let sequence = parts[3];
let completed_key = format!("queue:{}:COMPLETED:{}:{}", event_type, partition_str, sequence);
let progress_key = format!("queue:{}:PROGRESS:{}:{}", event_type, partition_str, sequence);
let is_completed = tx.get(completed_key.as_bytes()).await
.map_err(|e| TitoError::QueryFailed(format!("Completed check failed: {}", e)))?
.is_some();
let is_in_progress = tx.get(progress_key.as_bytes()).await
.map_err(|e| TitoError::QueryFailed(format!("Progress check failed: {}", e)))?
.is_some();
if !is_completed && !is_in_progress {
let progress = QueueProgress {
retries: event.retries,
updated_at: Utc::now().timestamp(),
};
tx.put(progress_key.as_bytes(), serde_json::to_vec(&progress).unwrap())
.await
.map_err(|e| TitoError::UpdateFailed(format!("Progress put failed: {}", e)))?;
jobs.push(event);
jobs_collected += 1;
if jobs_collected >= limit {
break;
}
}
}
}
}
Ok::<_, TitoError>(jobs)
})
.await
}
pub async fn success_job(&self, job_id: String) -> Result<(), TitoError> {
use crate::types::{QueueCompleted, QueueCheckpoint};
use chrono::Utc;
self.engine
.transaction(|tx| async move {
let parts: Vec<&str> = job_id.split(':').collect();
if parts.len() < 4 {
return Err(TitoError::InvalidInput("Invalid event key format".to_string()));
}
let event_type = parts[1];
let partition_str = parts[2];
let sequence_str = parts[3];
let sequence: i64 = sequence_str.parse()
.map_err(|_| TitoError::InvalidInput("Invalid sequence".to_string()))?;
let progress_key = format!("queue:{}:PROGRESS:{}:{}", event_type, partition_str, sequence_str);
tx.delete(progress_key.as_bytes())
.await
.map_err(|e| TitoError::DeleteFailed(format!("Delete progress failed: {}", e)))?;
let completed_key = format!("queue:{}:COMPLETED:{}:{}", event_type, partition_str, sequence_str);
let completed = QueueCompleted {
updated_at: Utc::now().timestamp(),
};
tx.put(completed_key.as_bytes(), serde_json::to_vec(&completed).unwrap())
.await
.map_err(|e| TitoError::UpdateFailed(format!("Put completed failed: {}", e)))?;
let checkpoint_key = format!("queue_checkpoint:{}:{}", event_type, partition_str);
let current_checkpoint = match tx.get(checkpoint_key.as_bytes()).await
.map_err(|e| TitoError::QueryFailed(format!("Checkpoint read failed: {}", e)))? {
Some(bytes) => {
serde_json::from_slice::<QueueCheckpoint>(&bytes)
.map(|ckpt| ckpt.last_sequence)
.unwrap_or(0)
}
None => 0,
};
if sequence > current_checkpoint {
let new_checkpoint = QueueCheckpoint {
last_sequence: sequence,
};
tx.put(checkpoint_key.as_bytes(), serde_json::to_vec(&new_checkpoint).unwrap())
.await
.map_err(|e| TitoError::UpdateFailed(format!("Checkpoint update failed: {}", e)))?;
}
Ok::<_, TitoError>(())
})
.await?;
Ok(())
}
pub async fn fail_job(&self, job_id: String) -> Result<(), TitoError> {
use crate::types::{QueueProgress, QueueFailed};
use chrono::Utc;
self.engine
.transaction(|tx| async move {
let parts: Vec<&str> = job_id.split(':').collect();
if parts.len() < 4 {
return Err(TitoError::InvalidInput("Invalid event key format".to_string()));
}
let event_type = parts[1];
let partition_str = parts[2];
let sequence_str = parts[3];
let progress_key = format!("queue:{}:PROGRESS:{}:{}", event_type, partition_str, sequence_str);
let progress_data = tx.get(progress_key.as_bytes())
.await
.map_err(|e| TitoError::QueryFailed(format!("Get progress failed: {}", e)))?;
if let Some(progress_bytes) = progress_data {
let mut progress: QueueProgress = serde_json::from_slice(&progress_bytes)
.map_err(|_| TitoError::DeserializationFailed(String::from("Failed to deserialize progress")))?;
const MAX_RETRIES: u32 = 5;
if progress.retries < MAX_RETRIES {
progress.retries += 1;
progress.updated_at = Utc::now().timestamp();
tx.put(progress_key.as_bytes(), serde_json::to_vec(&progress).unwrap())
.await
.map_err(|e| TitoError::UpdateFailed(format!("Update progress failed: {}", e)))?;
} else {
tx.delete(progress_key.as_bytes())
.await
.map_err(|e| TitoError::DeleteFailed(format!("Delete progress failed: {}", e)))?;
let failed_key = format!("queue:{}:FAILED:{}:{}", event_type, partition_str, sequence_str);
let failed = QueueFailed {
retries: progress.retries,
updated_at: Utc::now().timestamp(),
error: Some("Max retries exceeded".to_string()),
};
tx.put(failed_key.as_bytes(), serde_json::to_vec(&failed).unwrap())
.await
.map_err(|e| TitoError::UpdateFailed(format!("Put failed entry: {}", e)))?;
}
}
Ok::<_, TitoError>(())
})
.await?;
Ok(())
}
pub async fn clear(&self) -> Result<(), TitoError> {
Ok(())
}
}
pub async fn run_worker<E: TitoEngine + 'static, H>(
queue: Arc<TitoQueue<E>>,
event_type: String,
handler: H,
partition_config: crate::types::PartitionConfig,
is_leader: Arc<AtomicBool>,
concurrency: u32,
mut shutdown: broadcast::Receiver<()>,
) -> tokio::task::JoinHandle<()>
where
H: Fn(TitoEvent) -> futures::future::BoxFuture<'static, Result<(), TitoError>>
+ Clone
+ Send
+ Sync
+ 'static,
{
tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown.recv() => {
break;
}
_ = async {
let is_leader_val = is_leader.load(Ordering::SeqCst);
if is_leader_val {
match queue.pull_partition_range(
event_type.clone(),
partition_config.start,
partition_config.end,
20,
true
).await {
Ok(jobs) => {
stream::iter(jobs.into_iter().map(|job| {
let queue = Arc::clone(&queue);
let handler = handler.clone();
async move {
let result = handler(job.clone()).await;
let _ = match result {
Ok(_) => queue.success_job(job.key).await,
Err(_) => queue.fail_job(job.key).await,
};
}
}))
.for_each_concurrent(concurrency as usize, |job_future| job_future)
.await;
}
Err(_) => {
sleep(Duration::from_millis(500)).await;
}
}
}
sleep(Duration::from_millis(125)).await;
} => {}
}
}
})
}