use super::messages::EnqueueJob;
use crate::jobs::{JobError, JobId, JobSchedule};
use acton_reactive::prelude::*;
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledJobEntry {
pub id: JobId,
pub job_type: String,
pub payload: Vec<u8>,
pub schedule: JobSchedule,
pub priority: i32,
pub max_retries: u32,
pub timeout: Duration,
pub next_execution: DateTime<Utc>,
pub execution_count: u64,
pub enabled: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ScheduledJobMessage {
RegisterScheduledJob {
job_type: String,
payload: Vec<u8>,
schedule: JobSchedule,
priority: i32,
max_retries: u32,
timeout: Duration,
},
UnregisterScheduledJob {
id: JobId,
},
SetScheduledJobEnabled {
id: JobId,
enabled: bool,
},
ProcessScheduledJobs,
GetScheduledJobs,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ScheduledJobResponse {
JobRegistered {
id: JobId,
},
JobUnregistered,
EnabledUpdated,
ScheduledJobs(Vec<ScheduledJobEntry>),
}
#[derive(Debug, Clone)]
pub struct ScheduledJobAgent {
scheduled_jobs: Arc<RwLock<HashMap<JobId, ScheduledJobEntry>>>,
job_agent_handle: Option<AgentHandle>,
}
impl Default for ScheduledJobAgent {
fn default() -> Self {
Self::new()
}
}
impl ScheduledJobAgent {
#[must_use]
pub fn new() -> Self {
Self {
scheduled_jobs: Arc::new(RwLock::new(HashMap::new())),
job_agent_handle: None,
}
}
pub async fn spawn(
runtime: &mut AgentRuntime,
job_agent_handle: AgentHandle,
) -> anyhow::Result<AgentHandle> {
let agent_config =
AgentConfig::new(Ern::with_root("scheduled_job_manager")?, None, None)?;
let mut builder = runtime
.new_agent_with_config::<Self>(agent_config)
.await;
builder.model.job_agent_handle = Some(job_agent_handle);
builder.mutate_on::<ScheduledJobMessage>(|agent, envelope| {
let msg = envelope.message().clone();
let reply_envelope = envelope.reply_envelope();
match msg {
ScheduledJobMessage::RegisterScheduledJob {
job_type,
payload,
schedule,
priority,
max_retries,
timeout,
} => {
let id = JobId::new();
let next_execution = schedule
.next_execution(Utc::now())
.unwrap_or_else(Utc::now);
let entry = ScheduledJobEntry {
id,
job_type,
payload,
schedule,
priority,
max_retries,
timeout,
next_execution,
execution_count: 0,
enabled: true,
};
agent.model.scheduled_jobs.write().insert(id, entry);
info!("Registered scheduled job: {}", id);
AgentReply::from_async(async move {
let response = ScheduledJobResponse::JobRegistered { id };
let _: () = reply_envelope.send(response).await;
})
}
ScheduledJobMessage::UnregisterScheduledJob { id } => {
agent.model.scheduled_jobs.write().remove(&id);
info!("Unregistered scheduled job: {}", id);
AgentReply::from_async(async move {
let response = ScheduledJobResponse::JobUnregistered;
let _: () = reply_envelope.send(response).await;
})
}
ScheduledJobMessage::SetScheduledJobEnabled { id, enabled } => {
if let Some(entry) = agent.model.scheduled_jobs.write().get_mut(&id) {
entry.enabled = enabled;
info!("Set scheduled job {} enabled={}", id, enabled);
}
AgentReply::from_async(async move {
let response = ScheduledJobResponse::EnabledUpdated;
let _: () = reply_envelope.send(response).await;
})
}
ScheduledJobMessage::ProcessScheduledJobs => {
let scheduled_jobs = agent.model.scheduled_jobs.clone();
let job_handle = agent.model.job_agent_handle.clone();
AgentReply::from_async(async move {
Self::process_scheduled_jobs_async(scheduled_jobs, job_handle).await;
})
}
ScheduledJobMessage::GetScheduledJobs => {
let jobs = agent
.model
.scheduled_jobs
.read()
.values()
.cloned()
.collect();
AgentReply::from_async(async move {
let response = ScheduledJobResponse::ScheduledJobs(jobs);
let _: () = reply_envelope.send(response).await;
})
}
}
});
Ok(builder.start().await)
}
#[allow(clippy::cognitive_complexity)]
async fn process_scheduled_jobs_async(
scheduled_jobs: Arc<RwLock<HashMap<JobId, ScheduledJobEntry>>>,
job_handle: Option<AgentHandle>,
) {
let now = Utc::now();
let mut jobs_to_enqueue = Vec::new();
{
let mut jobs = scheduled_jobs.write();
for entry in jobs.values_mut() {
if !entry.enabled {
continue;
}
if entry.next_execution <= now {
if !entry.schedule.has_more_executions(entry.execution_count) {
debug!("Scheduled job {} has no more executions", entry.id);
entry.enabled = false;
continue;
}
jobs_to_enqueue.push(entry.clone());
entry.execution_count += 1;
if let Some(next) = entry.schedule.next_execution(now) {
entry.next_execution = next;
} else {
entry.enabled = false;
}
}
}
}
if let Some(job_agent) = job_handle {
for entry in jobs_to_enqueue {
debug!("Enqueueing scheduled job: {}", entry.id);
let enqueue_msg = EnqueueJob {
id: JobId::new(), job_type: entry.job_type.clone(),
payload: entry.payload.clone(),
priority: entry.priority,
max_retries: entry.max_retries,
timeout: entry.timeout,
};
job_agent.send(enqueue_msg).await;
debug!("Successfully enqueued scheduled job");
}
} else {
error!("Job agent handle not set - cannot enqueue scheduled jobs");
}
}
}
pub async fn start_scheduler_loop(scheduler_handle: AgentHandle) -> Result<(), JobError> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
debug!("Triggering scheduled job processing");
let msg = ScheduledJobMessage::ProcessScheduledJobs;
scheduler_handle.send(msg).await;
}
});
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_scheduled_job_entry_creation() {
let entry = ScheduledJobEntry {
id: JobId::new(),
job_type: "TestJob".to_string(),
payload: vec![1, 2, 3],
schedule: JobSchedule::after(Duration::from_secs(60)),
priority: 0,
max_retries: 3,
timeout: Duration::from_secs(300),
next_execution: Utc::now(),
execution_count: 0,
enabled: true,
};
assert_eq!(entry.job_type, "TestJob");
assert!(entry.enabled);
assert_eq!(entry.execution_count, 0);
}
#[test]
fn test_scheduled_job_serialization() {
let entry = ScheduledJobEntry {
id: JobId::new(),
job_type: "TestJob".to_string(),
payload: vec![1, 2, 3],
schedule: JobSchedule::every(Duration::from_secs(60)),
priority: 0,
max_retries: 3,
timeout: Duration::from_secs(300),
next_execution: Utc::now(),
execution_count: 5,
enabled: true,
};
let json = serde_json::to_string(&entry).unwrap();
let deserialized: ScheduledJobEntry = serde_json::from_str(&json).unwrap();
assert_eq!(entry.job_type, deserialized.job_type);
assert_eq!(entry.execution_count, deserialized.execution_count);
}
}