use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crossbeam_channel::{Receiver, Sender, bounded, unbounded};
use crate::error::{LaurusError, Result};
use crate::lexical::index::inverted::segment::manager::SegmentManager;
use crate::lexical::index::inverted::segment::merge_engine::MergeEngine;
use crate::lexical::index::inverted::segment::merge_policy::MergePolicy;
use crate::maintenance::deletion::DeletionManager;
/// Kinds of maintenance work the background scheduler can execute.
#[derive(Debug, Clone, PartialEq)]
pub enum TaskType {
    /// Merge the listed segments into a single new segment.
    Merge {
        segment_ids: Vec<String>,
        /// Merge-policy score for this candidate (duplicated on the
        /// surrounding `BackgroundTask.priority` by `submit_merge_task`).
        priority: f64,
    },
    /// Rewrite one segment to reclaim space held by deleted documents.
    Compaction {
        segment_id: String,
        /// Fraction of the segment's documents that are deleted.
        deletion_ratio: f64,
    },
    /// Optimize a set of segments (currently executed as a plain merge —
    /// see `execute_optimization_task`).
    Optimization {
        target_segments: Vec<String>,
        /// Intensity knob; not yet interpreted by the executor.
        optimization_level: u8,
    },
    /// Delete the given files from disk via the segment manager.
    Cleanup { file_paths: Vec<String> },
    /// Refresh cached statistics from the segment and deletion managers.
    StatsUpdate,
}
/// A unit of deferred maintenance work with scheduling and retry metadata.
#[derive(Debug, Clone)]
pub struct BackgroundTask {
    /// Unique id: 16-digit zero-padded hex creation time plus a random suffix.
    pub task_id: String,
    pub task_type: TaskType,
    /// Scheduling priority; higher is more urgent.
    pub priority: f64,
    /// Unix seconds at which the task was created.
    pub created_at: u64,
    /// Unix seconds before which the task should not run.
    pub scheduled_at: u64,
    /// Attempts made so far; compared against `max_retries` by `is_failed`.
    pub retry_count: u8,
    pub max_retries: u8,
    /// Rough runtime estimate in milliseconds (constructor default: 10s).
    pub estimated_duration_ms: u64,
    /// Free-form key/value annotations carried alongside the task.
    pub metadata: Vec<(String, String)>,
}
impl BackgroundTask {
    /// Current wall-clock time as Unix seconds.
    ///
    /// Clamps to 0 instead of panicking if the system clock reads earlier
    /// than the epoch (the original repeated `.unwrap()` in three places).
    fn now_secs() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Creates a task that is immediately ready, with a retry budget of 3
    /// and a fixed 10s duration estimate.
    pub fn new(task_type: TaskType, priority: f64) -> Self {
        let now = Self::now_secs();
        // Hex timestamp prefix keeps ids roughly time-sortable; the random
        // suffix disambiguates tasks created within the same second.
        let task_id = format!("{now:016x}_{}", rand::random::<u32>());
        BackgroundTask {
            task_id,
            task_type,
            priority,
            created_at: now,
            scheduled_at: now,
            retry_count: 0,
            max_retries: 3,
            estimated_duration_ms: 10000,
            metadata: Vec::new(),
        }
    }

    /// True once the wall clock has reached `scheduled_at`.
    pub fn is_ready(&self) -> bool {
        Self::now_secs() >= self.scheduled_at
    }

    /// True when the retry budget has been exhausted.
    pub fn is_failed(&self) -> bool {
        self.retry_count >= self.max_retries
    }

    /// Seconds elapsed since creation; saturates at 0 if the clock moved back.
    pub fn age_seconds(&self) -> u64 {
        Self::now_secs().saturating_sub(self.created_at)
    }
}
/// Lifecycle state of a background task.
#[derive(Debug, Clone, PartialEq)]
pub enum TaskStatus {
    /// Queued but not yet picked up by a worker.
    Pending,
    /// Currently executing on a worker thread.
    Running,
    /// Finished successfully.
    Completed,
    /// Finished with an error; the payload is a human-readable reason.
    Failed(String),
    /// Abandoned before completion.
    Cancelled,
}
/// Outcome of one executed task, delivered on the scheduler's result channel.
#[derive(Debug, Clone)]
pub struct TaskResult {
    pub task_id: String,
    pub status: TaskStatus,
    /// Wall-clock execution time in milliseconds.
    pub execution_time_ms: u64,
    /// Task-specific item count (e.g. segments merged, files deleted).
    pub items_processed: u64,
    /// Task-specific byte count (e.g. pre-merge segment size).
    pub bytes_processed: u64,
    /// Mirrors the reason inside `TaskStatus::Failed`, when present.
    pub error_message: Option<String>,
    /// Free-form key/value annotations (currently always empty).
    pub metadata: Vec<(String, String)>,
}
/// Tuning knobs for the background scheduler.
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    /// Number of task-executing worker threads (health checker is extra).
    pub worker_threads: usize,
    /// Capacity of the bounded task queue; submitters block when it is full.
    pub max_pending_tasks: usize,
    /// Per-task timeout budget in seconds.
    /// NOTE(review): passed to workers but not yet enforced — see the
    /// unused `_timeout` parameter of `execute_task`.
    pub task_timeout_secs: u64,
    /// Interval between auto-merge/auto-compaction checks.
    pub health_check_interval_secs: u64,
    /// Memory budget in MiB (not consulted by the visible code).
    pub max_memory_mb: u64,
    /// Whether to order tasks by priority (not consulted by the visible code).
    pub enable_prioritization: bool,
    /// Whether to batch small tasks (not consulted by the visible code).
    pub enable_batching: bool,
    /// Batch size used when batching is enabled.
    pub batch_size: usize,
}
impl Default for SchedulerConfig {
fn default() -> Self {
SchedulerConfig {
worker_threads: 2,
max_pending_tasks: 100,
task_timeout_secs: 300, health_check_interval_secs: 30,
max_memory_mb: 512,
enable_prioritization: true,
enable_batching: true,
batch_size: 5,
}
}
}
/// Aggregate counters for scheduler activity.
///
/// NOTE(review): only `tasks_submitted`, `tasks_pending`,
/// `tasks_completed`, `tasks_failed`, `tasks_cancelled`,
/// `avg_execution_time_ms`, and `total_bytes_processed` are updated by the
/// visible code; `tasks_running`, `current_memory_usage`, and
/// `active_workers` are never written.
#[derive(Debug, Clone, Default)]
pub struct SchedulerStats {
    pub tasks_submitted: u64,
    pub tasks_pending: u64,
    pub tasks_running: u64,
    pub tasks_completed: u64,
    pub tasks_failed: u64,
    pub tasks_cancelled: u64,
    /// Incremental mean over completed + failed tasks, in milliseconds.
    pub avg_execution_time_ms: f64,
    pub total_bytes_processed: u64,
    pub current_memory_usage: u64,
    pub active_workers: u64,
}
/// Multi-threaded scheduler for segment maintenance (merge, compaction,
/// cleanup, stats refresh).
///
/// Cloned handles share the channels, the `running` flag, and the stats;
/// only the instance that called `start` holds the worker JoinHandles.
#[derive(Debug)]
pub struct BackgroundScheduler {
    config: SchedulerConfig,
    /// Bounded task queue (capacity = `config.max_pending_tasks`).
    task_sender: Sender<BackgroundTask>,
    task_receiver: Receiver<BackgroundTask>,
    /// Unbounded result queue, drained by `get_results`.
    result_sender: Sender<TaskResult>,
    result_receiver: Receiver<TaskResult>,
    segment_manager: Arc<SegmentManager>,
    merge_engine: Arc<MergeEngine>,
    deletion_manager: Arc<DeletionManager>,
    merge_policy: Arc<dyn MergePolicy>,
    /// Shared shutdown flag polled by every spawned thread.
    running: Arc<AtomicBool>,
    stats: Arc<RwLock<SchedulerStats>>,
    /// JoinHandles for workers plus the health checker; emptied by `stop`.
    workers: RwLock<Vec<thread::JoinHandle<()>>>,
    // NOTE(review): task_counter is never read or incremented by the
    // visible code — candidate for removal or wiring into task ids.
    task_counter: Arc<AtomicU64>,
}
impl BackgroundScheduler {
    /// Builds a stopped scheduler; call [`Self::start`] to spawn workers.
    ///
    /// The task channel is bounded by `config.max_pending_tasks` so
    /// submitters see back-pressure; the result channel is unbounded and
    /// drained lazily via [`Self::get_results`].
    pub fn new(
        config: SchedulerConfig,
        segment_manager: Arc<SegmentManager>,
        merge_engine: Arc<MergeEngine>,
        deletion_manager: Arc<DeletionManager>,
        merge_policy: Arc<dyn MergePolicy>,
    ) -> Result<Self> {
        let (task_sender, task_receiver) = bounded(config.max_pending_tasks);
        let (result_sender, result_receiver) = unbounded();
        Ok(BackgroundScheduler {
            config,
            task_sender,
            task_receiver,
            result_sender,
            result_receiver,
            segment_manager,
            merge_engine,
            deletion_manager,
            merge_policy,
            running: Arc::new(AtomicBool::new(false)),
            stats: Arc::new(RwLock::new(SchedulerStats::default())),
            workers: RwLock::new(Vec::new()),
            task_counter: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Spawns the worker pool plus the health-checker thread.
    ///
    /// Uses `compare_exchange` to claim the not-running -> running
    /// transition atomically; the previous separate load/store pair let two
    /// racing `start` calls both pass the check and spawn duplicate pools.
    ///
    /// # Errors
    /// Fails if the scheduler is already running or a thread cannot be
    /// spawned (in which case `running` stays set and any threads already
    /// spawned keep running until `stop`).
    pub fn start(&self) -> Result<()> {
        if self
            .running
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return Err(LaurusError::index("Background scheduler already running"));
        }
        let mut workers = self.workers.write().unwrap();
        for worker_id in 0..self.config.worker_threads {
            let worker = self.spawn_worker(worker_id)?;
            workers.push(worker);
        }
        let health_checker = self.spawn_health_checker()?;
        workers.push(health_checker);
        Ok(())
    }

    /// Signals shutdown and joins every spawned thread.
    ///
    /// NOTE(review): the health checker sleeps a full
    /// `health_check_interval_secs` between flag checks, so `stop` may
    /// block up to that long waiting for it to observe the flag.
    pub fn stop(&self) -> Result<()> {
        self.running.store(false, Ordering::Release);
        let mut workers = self.workers.write().unwrap();
        while let Some(worker) = workers.pop() {
            let _ = worker.join();
        }
        Ok(())
    }

    /// Enqueues a task for background execution.
    ///
    /// # Errors
    /// Fails when the scheduler is not running or the task channel is
    /// disconnected. Blocks while the bounded queue is full.
    pub fn submit_task(&self, task: BackgroundTask) -> Result<()> {
        if !self.running.load(Ordering::Acquire) {
            return Err(LaurusError::index("Background scheduler not running"));
        }
        // Send before updating stats so a failed send cannot leave
        // tasks_submitted/tasks_pending over-counted (the old order bumped
        // both counters and then bailed out on the send error).
        self.task_sender
            .send(task)
            .map_err(|_| LaurusError::index("Failed to submit task to queue"))?;
        let mut stats = self.stats.write().unwrap();
        stats.tasks_submitted += 1;
        stats.tasks_pending += 1;
        Ok(())
    }

    /// Convenience wrapper: submit a merge of `segment_ids` at `priority`.
    pub fn submit_merge_task(&self, segment_ids: Vec<String>, priority: f64) -> Result<()> {
        let task_type = TaskType::Merge {
            segment_ids,
            priority,
        };
        let task = BackgroundTask::new(task_type, priority);
        self.submit_task(task)
    }

    /// Convenience wrapper: submit a compaction whose priority scales with
    /// the deletion ratio (base 5.0, up to +10.0).
    pub fn submit_compaction_task(&self, segment_id: String, deletion_ratio: f64) -> Result<()> {
        let task_type = TaskType::Compaction {
            segment_id,
            deletion_ratio,
        };
        let task = BackgroundTask::new(task_type, 5.0 + (deletion_ratio * 10.0));
        self.submit_task(task)
    }

    /// Asks the merge policy for candidates and submits a merge task per
    /// candidate. No-op when the scheduler is stopped.
    pub fn check_auto_merge(&self) -> Result<()> {
        if !self.running.load(Ordering::Acquire) {
            return Ok(());
        }
        let segments = self.segment_manager.get_segments();
        if self.merge_policy.should_merge(&segments) {
            let candidates = self.merge_policy.select_merges(&segments);
            for candidate in candidates {
                self.submit_merge_task(candidate.segments, candidate.priority)?;
            }
        }
        Ok(())
    }

    /// Submits a compaction task for every segment the deletion manager
    /// flags as a candidate. No-op when the scheduler is stopped.
    pub fn check_auto_compaction(&self) -> Result<()> {
        if !self.running.load(Ordering::Acquire) {
            return Ok(());
        }
        let candidates = self.deletion_manager.get_compaction_candidates();
        for segment_id in candidates {
            let deletion_ratio = self.deletion_manager.get_deletion_ratio(&segment_id);
            self.submit_compaction_task(segment_id, deletion_ratio)?;
        }
        Ok(())
    }

    /// Drains and returns all results currently queued (non-blocking).
    pub fn get_results(&self) -> Vec<TaskResult> {
        let mut results = Vec::new();
        while let Ok(result) = self.result_receiver.try_recv() {
            results.push(result);
        }
        results
    }

    /// Returns a snapshot of the current scheduler statistics.
    pub fn get_stats(&self) -> SchedulerStats {
        self.stats.read().unwrap().clone()
    }

    /// Spawns one worker thread that loops until `running` is cleared,
    /// pulling tasks with a 1s receive timeout so shutdown is observed
    /// promptly, executing them, updating stats, and publishing results.
    fn spawn_worker(&self, worker_id: usize) -> Result<thread::JoinHandle<()>> {
        let task_receiver = self.task_receiver.clone();
        let result_sender = self.result_sender.clone();
        let segment_manager = self.segment_manager.clone();
        let merge_engine = self.merge_engine.clone();
        let deletion_manager = self.deletion_manager.clone();
        let running = Arc::clone(&self.running);
        let stats = Arc::clone(&self.stats);
        let timeout = Duration::from_secs(self.config.task_timeout_secs);
        let handle = thread::Builder::new()
            .name(format!("bg-worker-{worker_id}"))
            .spawn(move || {
                while running.load(Ordering::Acquire) {
                    match task_receiver.recv_timeout(Duration::from_secs(1)) {
                        Ok(task) => {
                            let result = Self::execute_task(
                                task,
                                &segment_manager,
                                &merge_engine,
                                &deletion_manager,
                                timeout,
                            );
                            {
                                let mut stats = stats.write().unwrap();
                                stats.tasks_pending = stats.tasks_pending.saturating_sub(1);
                                match result.status {
                                    TaskStatus::Completed => stats.tasks_completed += 1,
                                    TaskStatus::Failed(_) => stats.tasks_failed += 1,
                                    TaskStatus::Cancelled => stats.tasks_cancelled += 1,
                                    _ => {}
                                }
                                stats.total_bytes_processed += result.bytes_processed;
                                // Incremental mean over completed + failed tasks:
                                // new_avg = (old_avg * (n-1) + sample) / n.
                                let total_completed = stats.tasks_completed + stats.tasks_failed;
                                if total_completed > 0 {
                                    stats.avg_execution_time_ms = (stats.avg_execution_time_ms
                                        * (total_completed - 1) as f64
                                        + result.execution_time_ms as f64)
                                        / total_completed as f64;
                                }
                            }
                            // Result channel is unbounded; a send failure just
                            // means every receiver handle was dropped.
                            let _ = result_sender.send(result);
                        }
                        Err(_) => {
                            // Timeout or disconnect: loop back and re-check
                            // the running flag.
                        }
                    }
                }
            })?;
        Ok(handle)
    }

    /// Spawns the periodic thread that triggers auto-merge and
    /// auto-compaction checks every `health_check_interval_secs`.
    fn spawn_health_checker(&self) -> Result<thread::JoinHandle<()>> {
        let running = Arc::clone(&self.running);
        // A cloned handle shares the channels, stats, and running flag, so
        // the checker submits tasks exactly like an external caller. The
        // previous version also cloned the segment/deletion managers and
        // merge policy into unused bindings that the closure never
        // captured — dead work, removed.
        let scheduler = self.clone();
        let interval = Duration::from_secs(self.config.health_check_interval_secs);
        let handle = thread::Builder::new()
            .name("bg-health-checker".to_string())
            .spawn(move || {
                while running.load(Ordering::Acquire) {
                    let _ = scheduler.check_auto_merge();
                    let _ = scheduler.check_auto_compaction();
                    thread::sleep(interval);
                }
            })?;
        Ok(handle)
    }

    /// Dispatches one task to its type-specific executor and wraps the
    /// outcome (status, counts, timing) into a `TaskResult`.
    ///
    /// NOTE(review): `_timeout` is accepted but not enforced — long-running
    /// tasks are never cancelled. Confirm whether enforcement is planned.
    fn execute_task(
        task: BackgroundTask,
        segment_manager: &SegmentManager,
        merge_engine: &MergeEngine,
        deletion_manager: &DeletionManager,
        _timeout: Duration,
    ) -> TaskResult {
        let start_time = SystemTime::now();
        let task_id = task.task_id.clone();
        let (status, items_processed, bytes_processed, error_message) = match task.task_type {
            TaskType::Merge { segment_ids, .. } => {
                Self::execute_merge_task(&segment_ids, segment_manager, merge_engine)
            }
            TaskType::Compaction { segment_id, .. } => Self::execute_compaction_task(
                &segment_id,
                deletion_manager,
                segment_manager,
                merge_engine,
            ),
            TaskType::Optimization {
                target_segments,
                optimization_level,
            } => Self::execute_optimization_task(
                &target_segments,
                optimization_level,
                segment_manager,
                merge_engine,
            ),
            TaskType::Cleanup { file_paths } => {
                Self::execute_cleanup_task(&file_paths, segment_manager)
            }
            TaskType::StatsUpdate => {
                Self::execute_stats_update_task(segment_manager, deletion_manager)
            }
        };
        let execution_time_ms = start_time.elapsed().unwrap_or_default().as_millis() as u64;
        TaskResult {
            task_id,
            status,
            execution_time_ms,
            items_processed,
            bytes_processed,
            error_message,
            metadata: Vec::new(),
        }
    }

    /// Merges `segment_ids` into a new segment, registers it, then removes
    /// the source segments.
    ///
    /// Returns (status, segments merged, bytes before merge, error).
    fn execute_merge_task(
        segment_ids: &[String],
        segment_manager: &SegmentManager,
        merge_engine: &MergeEngine,
    ) -> (TaskStatus, u64, u64, Option<String>) {
        let segments = segment_manager.get_segments();
        let segments_to_merge: Vec<_> = segments
            .iter()
            .filter(|seg| segment_ids.contains(&seg.segment_info.segment_id))
            .collect();
        if segments_to_merge.is_empty() {
            return (
                TaskStatus::Failed("No segments found to merge".to_string()),
                0,
                0,
                Some("No segments found to merge".to_string()),
            );
        }
        // Ad-hoc candidate: priority/size are irrelevant at this point
        // because the policy already chose the set; Balanced is the default
        // strategy.
        let candidate = crate::lexical::index::inverted::segment::manager::MergeCandidate {
            segments: segment_ids.to_vec(),
            priority: 0.0,
            estimated_size: 0,
            strategy: crate::lexical::index::inverted::segment::manager::MergeStrategy::Balanced,
        };
        // Nanosecond timestamp as a monotonically-increasing-ish generation id.
        let generation = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos() as u64;
        let merge_result = match merge_engine.merge_segments(&candidate, &segments, generation) {
            Ok(result) => result,
            Err(e) => {
                return (
                    TaskStatus::Failed(format!("Merge failed: {}", e)),
                    0,
                    0,
                    Some(format!("Merge failed: {}", e)),
                );
            }
        };
        if let Err(e) = segment_manager.add_segment(
            merge_result.new_segment.segment_info,
            merge_result.file_paths,
        ) {
            return (
                TaskStatus::Failed(format!("Failed to add new segment: {}", e)),
                0,
                0,
                Some(format!("Failed to add new segment: {}", e)),
            );
        }
        // Source removal is best-effort: the merged segment is already
        // registered, so a failed removal leaves stale data, not data loss.
        for segment_id in segment_ids {
            let _ = segment_manager.remove_segment(segment_id);
        }
        (
            TaskStatus::Completed,
            merge_result.stats.segments_merged as u64,
            merge_result.stats.size_before,
            None,
        )
    }

    /// Compacts one segment by rewriting it through a single-segment merge,
    /// which drops deleted documents in the process.
    fn execute_compaction_task(
        segment_id: &str,
        deletion_manager: &DeletionManager,
        segment_manager: &SegmentManager,
        merge_engine: &MergeEngine,
    ) -> (TaskStatus, u64, u64, Option<String>) {
        let _segment = match segment_manager.get_segment(segment_id) {
            Some(seg) => seg,
            None => {
                return (
                    TaskStatus::Failed(format!("Segment {} not found", segment_id)),
                    0,
                    0,
                    Some(format!("Segment {} not found", segment_id)),
                );
            }
        };
        // NOTE(review): the deleted-doc count is fetched but discarded —
        // presumably intended for the result's items_processed; confirm.
        let _deleted_count = deletion_manager.get_deleted_docs(segment_id).len() as u64;
        Self::execute_merge_task(&[segment_id.to_string()], segment_manager, merge_engine)
    }

    /// Optimization is currently implemented as a plain merge of the target
    /// segments; `optimization_level` is not yet interpreted.
    fn execute_optimization_task(
        target_segments: &[String],
        _optimization_level: u8,
        segment_manager: &SegmentManager,
        merge_engine: &MergeEngine,
    ) -> (TaskStatus, u64, u64, Option<String>) {
        Self::execute_merge_task(target_segments, segment_manager, merge_engine)
    }

    /// Deletes the given files via the segment manager.
    ///
    /// Returns the number of paths requested on success (not the number
    /// actually removed).
    fn execute_cleanup_task(
        file_paths: &[String],
        segment_manager: &SegmentManager,
    ) -> (TaskStatus, u64, u64, Option<String>) {
        match segment_manager.delete_files(file_paths) {
            Ok(_) => (TaskStatus::Completed, file_paths.len() as u64, 0, None),
            Err(e) => (
                TaskStatus::Failed(format!("Cleanup failed: {}", e)),
                0,
                0,
                Some(format!("Cleanup failed: {}", e)),
            ),
        }
    }

    /// Placeholder stats refresh: queries both managers' stats endpoints and
    /// discards the values. Always reports success with one item processed.
    fn execute_stats_update_task(
        _segment_manager: &SegmentManager,
        _deletion_manager: &DeletionManager,
    ) -> (TaskStatus, u64, u64, Option<String>) {
        let _stats = _segment_manager.get_stats();
        let _del_stats = _deletion_manager.get_stats();
        (TaskStatus::Completed, 1, 0, None)
    }
}
impl Clone for BackgroundScheduler {
fn clone(&self) -> Self {
BackgroundScheduler {
config: self.config.clone(),
task_sender: self.task_sender.clone(),
task_receiver: self.task_receiver.clone(),
result_sender: self.result_sender.clone(),
result_receiver: self.result_receiver.clone(),
segment_manager: self.segment_manager.clone(),
merge_engine: self.merge_engine.clone(),
deletion_manager: self.deletion_manager.clone(),
merge_policy: Arc::clone(&self.merge_policy),
running: Arc::clone(&self.running),
stats: Arc::clone(&self.stats),
workers: RwLock::new(Vec::new()), task_counter: Arc::clone(&self.task_counter),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Removed a stray `#[allow(dead_code)]` that sat on a `#[test]` fn —
    // test functions are invoked by the harness and are never dead code.
    #[test]
    fn test_background_task_creation() {
        let task_type = TaskType::Merge {
            segment_ids: vec!["seg1".to_string(), "seg2".to_string()],
            priority: 5.0,
        };
        let task = BackgroundTask::new(task_type, 10.0);
        assert_eq!(task.priority, 10.0);
        assert_eq!(task.retry_count, 0);
        assert_eq!(task.max_retries, 3);
        // A freshly created task is scheduled for "now" with a full retry
        // budget, so it is immediately ready and not failed.
        assert!(task.is_ready());
        assert!(!task.is_failed());
    }

    #[test]
    fn test_task_status_checks() {
        let mut task = BackgroundTask::new(TaskType::StatsUpdate, 1.0);
        assert!(task.is_ready());
        // Pushing scheduled_at an hour into the future makes it not ready.
        task.scheduled_at += 3600;
        assert!(!task.is_ready());
        // Exceeding max_retries (default 3) marks the task as failed.
        task.retry_count = 5;
        assert!(task.is_failed());
    }

    #[test]
    fn test_scheduler_config_default() {
        let config = SchedulerConfig::default();
        assert_eq!(config.worker_threads, 2);
        assert_eq!(config.max_pending_tasks, 100);
        assert_eq!(config.task_timeout_secs, 300);
        assert!(config.enable_prioritization);
        assert!(config.enable_batching);
    }
}