use super::job::{IndexJob, JobState, ResourceMode, SchedulerEvent};
use super::limits::{MAX_JOB_ATTEMPTS, SchedulerConfig};
use super::queue::QueueSet;
use orbok_core::{JobId, JobStatus, OrbokResult, SourceId, now_iso8601};
use orbok_db::Catalog;
use orbok_db::repo::IndexJobRepository;
/// In-memory job scheduler: owns the job queues, the current resource mode,
/// a buffer of events awaiting [`Scheduler::drain_events`], and running
/// totals of completed and permanently failed jobs.
pub struct Scheduler {
    /// Configuration the scheduler was built from. Within this file only
    /// `capacity` is read (once, at construction), so the field is otherwise
    /// unused here — hence the `dead_code` allowance.
    #[allow(dead_code)]
    config: SchedulerConfig,
    /// Job queues, addressed by job kind.
    queues: QueueSet,
    /// Current operating mode; dispatching stops entirely while `Paused`.
    resource_mode: ResourceMode,
    /// Events recorded since the last `drain_events` call.
    events: Vec<SchedulerEvent>,
    /// Number of jobs reported through `complete`.
    completed_count: u64,
    /// Number of jobs that exhausted their attempts in `fail`.
    failed_count: u64,
}
impl Scheduler {
pub fn new(config: SchedulerConfig) -> Self {
Self {
queues: QueueSet::new(&config.capacity),
config,
resource_mode: ResourceMode::default(),
events: Vec::new(),
completed_count: 0,
failed_count: 0,
}
}
pub fn with_defaults() -> Self {
Self::new(SchedulerConfig::default())
}
pub fn notify_user_active(&mut self) {
if self.resource_mode != ResourceMode::Paused {
let changed = self.resource_mode != ResourceMode::UserActive;
self.resource_mode = ResourceMode::UserActive;
if changed {
self.emit(SchedulerEvent::UserActivityDetected);
self.emit(SchedulerEvent::ResourceModeChanged(
ResourceMode::UserActive,
));
}
}
}
pub fn notify_user_idle(&mut self) {
if self.resource_mode == ResourceMode::UserActive {
self.resource_mode = ResourceMode::Normal;
self.emit(SchedulerEvent::ResourceModeChanged(ResourceMode::Normal));
}
}
pub fn resource_mode(&self) -> ResourceMode {
self.resource_mode
}
pub fn pause(&mut self, catalog: &Catalog) -> OrbokResult<()> {
if self.resource_mode == ResourceMode::Paused {
return Ok(());
}
self.resource_mode = ResourceMode::Paused;
self.emit(SchedulerEvent::ResourceModeChanged(ResourceMode::Paused));
let conn = catalog.lock();
conn.execute(
"UPDATE index_jobs SET status = 'paused', updated_at = ?1 WHERE status = 'queued'",
rusqlite::params![now_iso8601()],
)
.map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
Ok(())
}
pub fn resume(&mut self, catalog: &Catalog) -> OrbokResult<()> {
if self.resource_mode != ResourceMode::Paused {
return Ok(());
}
self.resource_mode = ResourceMode::Normal;
self.emit(SchedulerEvent::ResourceModeChanged(ResourceMode::Normal));
let conn = catalog.lock();
conn.execute(
"UPDATE index_jobs SET status = 'queued', updated_at = ?1 WHERE status = 'paused'",
rusqlite::params![now_iso8601()],
)
.map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
Ok(())
}
pub fn cancel_source(&mut self, source_id: &SourceId, catalog: &Catalog) -> OrbokResult<usize> {
let cancelled = self.queues.cancel_source(source_id);
let conn = catalog.lock();
conn.execute(
"UPDATE index_jobs SET status = 'canceled', updated_at = ?1 \
WHERE source_id = ?2 AND status IN ('queued','paused')",
rusqlite::params![now_iso8601(), source_id.as_str()],
)
.map_err(|e| orbok_core::OrbokError::Database(e.to_string()))?;
Ok(cancelled)
}
pub fn enqueue(&mut self, job: IndexJob, catalog: &Catalog) -> OrbokResult<()> {
let queue = self.queues.queue_for(job.kind);
if queue.is_full() {
let kind = queue.kind();
if !queue.backpressure_active {
queue.backpressure_active = true;
self.events
.push(SchedulerEvent::QueueBackpressureApplied(kind));
}
return Err(orbok_core::OrbokError::BackpressureActive {
queue: format!("{kind:?}"),
});
}
if queue.backpressure_active && !queue.is_full() {
queue.backpressure_active = false;
let kind = queue.kind();
self.events
.push(SchedulerEvent::QueueBackpressureReleased(kind));
}
let jobs = IndexJobRepository::new(catalog);
jobs.enqueue_with_priority(
job.kind.as_job_type(),
Some(&job.source_id),
job.file_id.as_ref(),
job.priority.as_i64(),
)?;
let id = job.id.clone();
self.queues.queue_for(job.kind).push(job);
self.emit(SchedulerEvent::JobQueued(id));
Ok(())
}
pub fn tick(&mut self) -> Option<IndexJob> {
if self.resource_mode == ResourceMode::Paused {
return None;
}
let job = self.queues.pop_next(self.resource_mode)?;
let queue = self.queues.queue_for(job.kind);
if queue.backpressure_active && !queue.is_full() {
queue.backpressure_active = false;
let kind = queue.kind();
self.events
.push(SchedulerEvent::QueueBackpressureReleased(kind));
}
self.emit(SchedulerEvent::JobStarted(job.id.clone()));
Some(job)
}
pub fn complete(&mut self, job_id: &JobId, catalog: &Catalog) -> OrbokResult<()> {
let jobs = IndexJobRepository::new(catalog);
jobs.set_status(job_id, JobStatus::Succeeded)?;
self.completed_count += 1;
self.emit(SchedulerEvent::JobCompleted(job_id.clone()));
self.emit_readiness(catalog);
Ok(())
}
pub fn fail(
&mut self,
mut job: IndexJob,
error_kind: &str,
catalog: &Catalog,
) -> OrbokResult<()> {
job.attempt_count += 1;
job.last_error_kind = Some(error_kind.to_string());
if job.attempt_count < MAX_JOB_ATTEMPTS {
tracing::debug!(
job = job.id.as_str(),
attempt = job.attempt_count,
error = error_kind,
"job failed — will retry"
);
job.state = JobState::Pending;
let id = job.id.clone();
let queue = self.queues.queue_for(job.kind);
if !queue.is_full() {
queue.push(job);
}
let jobs = IndexJobRepository::new(catalog);
jobs.set_status(&id, JobStatus::Queued)?;
jobs.increment_attempt(&id, error_kind)?;
} else {
tracing::warn!(
job = job.id.as_str(),
attempts = job.attempt_count,
error = error_kind,
"job permanently failed after max attempts"
);
let jobs = IndexJobRepository::new(catalog);
jobs.set_status(&job.id, JobStatus::Failed)?;
self.failed_count += 1;
self.emit(SchedulerEvent::JobFailed {
id: job.id.clone(),
error_kind: error_kind.to_string(),
});
}
Ok(())
}
pub fn pending_count(&self) -> usize {
self.queues.total_pending()
}
pub fn completed_count(&self) -> u64 {
self.completed_count
}
pub fn failed_count(&self) -> u64 {
self.failed_count
}
pub fn is_idle(&self) -> bool {
self.queues.total_pending() == 0
}
pub fn drain_events(&mut self) -> Vec<SchedulerEvent> {
std::mem::take(&mut self.events)
}
fn emit(&mut self, event: SchedulerEvent) {
self.events.push(event);
}
fn emit_readiness(&mut self, catalog: &Catalog) {
let conn = catalog.lock();
let ready: i64 = conn
.query_row(
"SELECT COUNT(*) FROM files WHERE file_status = 'indexed'",
[],
|r| r.get(0),
)
.unwrap_or(0);
let pending = self.pending_count() as u64;
self.events.push(SchedulerEvent::PartialReadinessChanged {
ready_count: ready as u64,
pending_count: pending,
});
}
}