use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use crate::northbound::types::*;
/// Mutex-guarded, in-memory store of tasks and their results.
///
/// Each collection sits behind its own [`Mutex`], so the individual maps and
/// the queue are locked independently of one another.
pub struct LocalTaskCache {
    /// Tasks that have been added but have no recorded result yet, keyed by
    /// task id.
    pending_tasks: Mutex<HashMap<String, SimpleTaskRequest>>,
    /// Results recorded for tasks, keyed by task id.
    completed_results: Mutex<HashMap<String, SimpleTaskResponse>>,
    /// Queue of outstanding synchronisation work; new items are appended at
    /// the back.
    sync_queue: Mutex<VecDeque<SyncItem>>,
    /// Seconds since the Unix epoch at which this cache was constructed.
    start_time: u64,
}
/// One entry of the cache's synchronisation queue.
#[derive(Clone, Debug)]
pub struct SyncItem {
    /// Id of the task this entry refers to.
    pub task_id: String,
    /// Kind of synchronisation work to perform for the task.
    pub operation: SyncOperation,
    /// Seconds since the Unix epoch at which the entry was queued.
    pub timestamp: u64,
}
/// Kind of work carried by a queued synchronisation entry.
#[derive(Clone, Debug)]
pub enum SyncOperation {
    /// Queued whenever a completed result is recorded in the cache.
    UploadResult,
    /// NOTE(review): not constructed in this file; presumably requests that a
    /// task be fetched -- confirm against the consumer of the queue.
    DownloadTask,
    /// NOTE(review): not constructed in this file; presumably announces a
    /// status change -- confirm against the consumer of the queue.
    UpdateStatus,
}
impl LocalTaskCache {
    /// Returns the current wall-clock time as whole seconds since the Unix
    /// epoch.
    ///
    /// Falls back to `0` instead of panicking when the system clock reports a
    /// time earlier than the epoch.
    fn unix_now_secs() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs())
            .unwrap_or(0)
    }

    /// Locks `mutex`, translating a poisoned lock into the string error that
    /// every fallible method of this type reports.
    fn lock_guarded<T>(mutex: &Mutex<T>) -> Result<std::sync::MutexGuard<'_, T>, String> {
        mutex
            .lock()
            .map_err(|_| "Failed to acquire lock".to_string())
    }

    /// Converts a collection length to `u32`, saturating at `u32::MAX`
    /// rather than silently truncating the upper bits.
    fn len_as_u32(len: usize) -> u32 {
        len.min(u32::MAX as usize) as u32
    }

    /// Creates an empty cache and records the construction time, which is
    /// later used to report uptime.
    pub fn new() -> Self {
        Self {
            pending_tasks: Mutex::new(HashMap::new()),
            completed_results: Mutex::new(HashMap::new()),
            sync_queue: Mutex::new(VecDeque::new()),
            start_time: Self::unix_now_secs(),
        }
    }

    /// Stores `task` as pending under its own `task_id`, replacing any
    /// pending task that already uses that id.
    ///
    /// # Errors
    ///
    /// Returns `"Failed to acquire lock"` if the pending-task mutex is
    /// poisoned.
    pub fn add_pending_task(&self, task: SimpleTaskRequest) -> Result<(), String> {
        let task_id = task.task_id.clone();
        Self::lock_guarded(&self.pending_tasks)?.insert(task_id.clone(), task);
        tracing::info!("Added pending task: {}", task_id);
        Ok(())
    }

    /// Records `result`, removes the matching pending task (if any) and
    /// queues an [`SyncOperation::UploadResult`] entry for it.
    ///
    /// The three collections are locked one after another, never at the same
    /// time, so the update is not atomic across them.
    ///
    /// # Errors
    ///
    /// Returns `"Failed to acquire lock"` if any of the mutexes is poisoned.
    /// Steps that ran before the failing one are not rolled back.
    pub fn add_completed_result(&self, result: SimpleTaskResponse) -> Result<(), String> {
        // Only the id is needed after the response moves into the map, so
        // clone that instead of the whole response.
        let task_id = result.task_id.clone();

        Self::lock_guarded(&self.completed_results)?.insert(task_id.clone(), result);
        Self::lock_guarded(&self.pending_tasks)?.remove(&task_id);
        Self::lock_guarded(&self.sync_queue)?.push_back(SyncItem {
            task_id: task_id.clone(),
            operation: SyncOperation::UploadResult,
            timestamp: Self::unix_now_secs(),
        });

        tracing::info!("Added completed result: {}", task_id);
        Ok(())
    }

    /// Looks up the state of `task_id`.
    ///
    /// A recorded result takes precedence and is returned as stored. A task
    /// that is only pending is reported with `TaskStatus::Queued`, no result,
    /// no error and the current time as timestamp. Unknown ids yield `None`.
    ///
    /// A poisoned lock cannot be reported through the `Option` return type;
    /// it is logged and that collection is treated as not containing the
    /// task.
    pub fn get_task_status(&self, task_id: &str) -> Option<SimpleTaskResponse> {
        if let Ok(results) = self.completed_results.lock() {
            if let Some(found) = results.get(task_id) {
                return Some(found.clone());
            }
        } else {
            tracing::warn!(
                "Completed-results lock poisoned while looking up task: {}",
                task_id
            );
        }

        if let Ok(tasks) = self.pending_tasks.lock() {
            if let Some(task) = tasks.get(task_id) {
                return Some(SimpleTaskResponse {
                    task_id: task.task_id.clone(),
                    status: TaskStatus::Queued,
                    result: None,
                    error: None,
                    timestamp: Self::unix_now_secs(),
                });
            }
        } else {
            tracing::warn!(
                "Pending-tasks lock poisoned while looking up task: {}",
                task_id
            );
        }

        None
    }

    /// Looks up every id in `task_ids` via [`Self::get_task_status`] and
    /// returns the responses that were found, in input order. Ids without a
    /// known task are skipped, so the output may be shorter than the input.
    pub fn get_batch_task_status(&self, task_ids: &[String]) -> Vec<SimpleTaskResponse> {
        task_ids
            .iter()
            .filter_map(|task_id| self.get_task_status(task_id))
            .collect()
    }

    /// Returns a copy of the synchronisation queue, front to back, without
    /// removing anything from it.
    ///
    /// # Errors
    ///
    /// Returns `"Failed to acquire lock"` if the queue mutex is poisoned.
    pub fn get_pending_sync_items(&self) -> Result<Vec<SyncItem>, String> {
        let queue = Self::lock_guarded(&self.sync_queue)?;
        Ok(queue.iter().cloned().collect())
    }

    /// Removes every entry from the synchronisation queue.
    ///
    /// # Errors
    ///
    /// Returns `"Failed to acquire lock"` if the queue mutex is poisoned.
    pub fn clear_sync_queue(&self) -> Result<(), String> {
        Self::lock_guarded(&self.sync_queue)?.clear();
        Ok(())
    }

    /// Reports the current size of each collection and the uptime in
    /// seconds.
    ///
    /// The collections are locked one at a time, so the counts are not a
    /// single consistent snapshot. Counts saturate at `u32::MAX`, and uptime
    /// saturates at `0` if the wall clock moved backwards since construction.
    ///
    /// # Errors
    ///
    /// Returns `"Failed to acquire lock"` if any of the mutexes is poisoned.
    pub fn get_stats(&self) -> Result<CacheStats, String> {
        let pending_tasks = Self::len_as_u32(Self::lock_guarded(&self.pending_tasks)?.len());
        let completed_tasks =
            Self::len_as_u32(Self::lock_guarded(&self.completed_results)?.len());
        let sync_queue_size = Self::len_as_u32(Self::lock_guarded(&self.sync_queue)?.len());
        let uptime = Self::unix_now_secs().saturating_sub(self.start_time);

        Ok(CacheStats {
            pending_tasks,
            completed_tasks,
            sync_queue_size,
            uptime,
        })
    }

    /// Applies a status change to the task identified by `task_id`.
    ///
    /// `TaskStatus::Completed` and `TaskStatus::Failed` are terminal: a
    /// response built from the arguments and the current time is handed to
    /// [`Self::add_completed_result`].
    ///
    /// Any other status leaves the cache unchanged, because
    /// `SimpleTaskRequest` has no field that could hold a status; such tasks
    /// keep being reported as `Queued` by [`Self::get_task_status`].
    ///
    /// # Errors
    ///
    /// Returns `"Failed to acquire lock"` if a required mutex is poisoned.
    pub fn update_task_status(
        &self,
        task_id: &str,
        status: TaskStatus,
        result: Option<String>,
        error: Option<String>,
    ) -> Result<(), String> {
        // Decide before `status` moves into the response so no clone is
        // needed.
        let is_terminal = status == TaskStatus::Completed || status == TaskStatus::Failed;

        if is_terminal {
            return self.add_completed_result(SimpleTaskResponse {
                task_id: task_id.to_string(),
                status,
                result,
                error,
                timestamp: Self::unix_now_secs(),
            });
        }

        // Still take the lock so a poisoned mutex is reported to the caller
        // exactly as before, but do not rebuild and re-insert an identical
        // request: that only cloned data without changing the map.
        let tasks = Self::lock_guarded(&self.pending_tasks)?;
        if !tasks.contains_key(task_id) {
            tracing::debug!("Ignoring status update for unknown pending task: {}", task_id);
        }
        Ok(())
    }

    /// Drops every recorded result whose age is `max_age_seconds` or more
    /// and returns how many were removed (saturating at `u32::MAX`).
    ///
    /// Age is computed with saturating subtraction, so a result stamped in
    /// the future counts as age `0` instead of underflowing.
    ///
    /// # Errors
    ///
    /// Returns `"Failed to acquire lock"` if the results mutex is poisoned.
    pub fn cleanup_old_results(&self, max_age_seconds: u64) -> Result<u32, String> {
        let now = Self::unix_now_secs();
        let mut results = Self::lock_guarded(&self.completed_results)?;

        let before = results.len();
        results.retain(|_, entry| now.saturating_sub(entry.timestamp) < max_age_seconds);
        let removed = before - results.len();

        tracing::info!("Cleaned up {} old results", removed);
        Ok(Self::len_as_u32(removed))
    }
}
/// Point-in-time counters describing a task cache.
#[derive(Clone, Debug)]
pub struct CacheStats {
    /// Number of tasks currently held as pending.
    pub pending_tasks: u32,
    /// Number of results currently stored.
    pub completed_tasks: u32,
    /// Number of entries waiting in the synchronisation queue.
    pub sync_queue_size: u32,
    /// Seconds elapsed since the cache was constructed.
    pub uptime: u64,
}
impl Default for LocalTaskCache {
fn default() -> Self {
Self::new()
}
}