use std::{
collections::{HashSet, VecDeque},
fs,
path::PathBuf,
str::FromStr,
sync::{Arc, Mutex},
time::SystemTime,
};
use chrono::{Local, Utc};
use chrono_tz::Tz;
use cron::Schedule;
use serde::{
Deserialize, Serialize,
de::{EnumAccess, IgnoredAny, MapAccess, VariantAccess, Visitor},
};
use tracing::{debug, info, warn};
use crate::{
config::{Config, CronConfig},
error::ProcessManagerError,
runtime,
};
const MAX_EXECUTION_HISTORY: usize = 10;
mod systemtime_serde {
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(time: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let duration = time
.duration_since(UNIX_EPOCH)
.map_err(serde::ser::Error::custom)?;
serializer.serialize_u64(duration.as_secs())
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
where
D: Deserializer<'de>,
{
let secs = u64::deserialize(deserializer)?;
Ok(UNIX_EPOCH + Duration::from_secs(secs))
}
}
mod systemtime_serde_opt {
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(
time: &Option<SystemTime>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match time {
Some(t) => {
let duration = t
.duration_since(UNIX_EPOCH)
.map_err(serde::ser::Error::custom)?;
serializer.serialize_u64(duration.as_secs())
}
None => serializer.serialize_u64(0), }
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<SystemTime>, D::Error>
where
D: Deserializer<'de>,
{
let secs = u64::deserialize(deserializer)?;
if secs == 0 {
Ok(None)
} else {
Ok(Some(UNIX_EPOCH + Duration::from_secs(secs)))
}
}
}
#[derive(Debug, Clone, Serialize)]
pub enum CronExecutionStatus {
Success,
Failed(String),
OverlapError,
}
#[derive(Deserialize)]
#[serde(untagged)]
enum FailedReasonValue {
Plain(String),
Text {
#[serde(rename = "$text")]
value: String,
},
}
impl FailedReasonValue {
fn into_reason(self) -> String {
match self {
Self::Plain(reason) => reason,
Self::Text { value } => value,
}
}
}
impl<'de> Deserialize<'de> for CronExecutionStatus {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct CronExecutionStatusVisitor;
impl<'de> Visitor<'de> for CronExecutionStatusVisitor {
type Value = CronExecutionStatus;
fn expecting(
&self,
formatter: &mut std::fmt::Formatter<'_>,
) -> std::fmt::Result {
formatter.write_str("a cron execution status in enum-tag or text form")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
match value {
"Success" => Ok(CronExecutionStatus::Success),
"OverlapError" => Ok(CronExecutionStatus::OverlapError),
"Failed" => Ok(CronExecutionStatus::Failed("failed".to_string())),
other => Err(E::unknown_variant(
other,
&["Success", "Failed", "OverlapError"],
)),
}
}
fn visit_string<E>(self, value: String) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
self.visit_str(&value)
}
fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
where
A: EnumAccess<'de>,
{
let (variant, access) = data.variant::<String>()?;
match variant.as_str() {
"Success" => {
access.unit_variant()?;
Ok(CronExecutionStatus::Success)
}
"OverlapError" => {
access.unit_variant()?;
Ok(CronExecutionStatus::OverlapError)
}
"Failed" => {
let reason = access.newtype_variant::<FailedReasonValue>()?;
Ok(CronExecutionStatus::Failed(reason.into_reason()))
}
other => Err(serde::de::Error::unknown_variant(
other,
&["Success", "Failed", "OverlapError"],
)),
}
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
let mut text_variant: Option<String> = None;
let mut failed_reason: Option<String> = None;
let mut tagged_variant: Option<CronExecutionStatus> = None;
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"$text" => text_variant = Some(map.next_value::<String>()?),
"$value" => failed_reason = Some(map.next_value::<String>()?),
"Success" => {
let _: IgnoredAny = map.next_value()?;
tagged_variant = Some(CronExecutionStatus::Success);
}
"OverlapError" => {
let _: IgnoredAny = map.next_value()?;
tagged_variant = Some(CronExecutionStatus::OverlapError);
}
"Failed" => {
let value = map.next_value::<FailedReasonValue>()?;
let reason = value.into_reason();
failed_reason = Some(reason.clone());
tagged_variant = Some(CronExecutionStatus::Failed(reason));
}
_ => {
let _: IgnoredAny = map.next_value()?;
}
}
}
if let Some(status) = tagged_variant {
return Ok(status);
}
if let Some(text) = text_variant {
return match text.as_str() {
"Success" => Ok(CronExecutionStatus::Success),
"OverlapError" => Ok(CronExecutionStatus::OverlapError),
"Failed" => Ok(CronExecutionStatus::Failed(
failed_reason.unwrap_or_else(|| "failed".to_string()),
)),
other => Err(serde::de::Error::unknown_variant(
other,
&["Success", "Failed", "OverlapError"],
)),
};
}
if let Some(reason) = failed_reason {
return Ok(CronExecutionStatus::Failed(reason));
}
Err(serde::de::Error::custom(
"missing cron execution status value",
))
}
}
deserializer.deserialize_any(CronExecutionStatusVisitor)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronExecutionRecord {
#[serde(with = "systemtime_serde")]
pub started_at: SystemTime,
#[serde(with = "systemtime_serde_opt")]
pub completed_at: Option<SystemTime>,
pub status: Option<CronExecutionStatus>,
pub exit_code: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub user: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub command: Option<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub metrics: Vec<crate::metrics::MetricSample>,
}
#[derive(Debug, Clone)]
pub struct CronJobState {
pub service_name: String,
pub service_hash: String,
pub schedule: Schedule,
pub last_execution: Option<SystemTime>,
pub next_execution: Option<SystemTime>,
pub currently_running: bool,
pub execution_history: VecDeque<CronExecutionRecord>,
pub timezone: EffectiveTimezone,
pub timezone_label: String,
}
impl CronJobState {
pub fn new(
service_name: String,
service_hash: String,
schedule: Schedule,
timezone: EffectiveTimezone,
timezone_label: String,
persisted: Option<PersistedCronJobState>,
) -> Self {
let next_execution = compute_next_execution(&schedule, timezone);
let mut state = Self {
service_name,
service_hash,
schedule,
last_execution: None,
next_execution,
currently_running: false,
execution_history: VecDeque::with_capacity(MAX_EXECUTION_HISTORY),
timezone,
timezone_label,
};
if let Some(persisted) = persisted {
state.last_execution = persisted.last_execution;
state.execution_history = persisted.execution_history;
while state.execution_history.len() > MAX_EXECUTION_HISTORY {
state.execution_history.pop_front();
}
}
state
}
pub fn add_execution_record(&mut self, record: CronExecutionRecord) {
if self.execution_history.len() >= MAX_EXECUTION_HISTORY {
self.execution_history.pop_front();
}
self.execution_history.push_back(record);
}
pub fn update_next_execution(&mut self) {
self.next_execution = compute_next_execution(&self.schedule, self.timezone);
}
}
#[derive(Clone, Copy, Debug)]
pub enum EffectiveTimezone {
Local,
Utc,
Named(Tz),
}
fn compute_next_execution(
schedule: &Schedule,
tz: EffectiveTimezone,
) -> Option<SystemTime> {
match tz {
EffectiveTimezone::Local => schedule
.upcoming(Local)
.next()
.map(|dt| dt.with_timezone(&Utc).into()),
EffectiveTimezone::Utc => schedule.upcoming(Utc).next().map(|dt| dt.into()),
EffectiveTimezone::Named(tz) => schedule
.upcoming(tz)
.next()
.map(|dt| dt.with_timezone(&Utc).into()),
}
}
#[derive(Clone)]
pub struct CronManager {
jobs: Arc<Mutex<Vec<CronJobState>>>,
state_file: Arc<Mutex<CronStateFile>>,
}
impl Default for CronManager {
fn default() -> Self {
let state_file =
CronStateFile::load().unwrap_or_else(|_| CronStateFile::default());
Self {
jobs: Arc::new(Mutex::new(Vec::new())),
state_file: Arc::new(Mutex::new(state_file)),
}
}
}
impl CronManager {
pub fn new() -> Self {
Self::default()
}
fn build_job_state(
&self,
service_name: &str,
service_hash: &str,
cron_config: &CronConfig,
) -> Result<(CronJobState, bool, String), ProcessManagerError> {
let (effective_timezone, timezone_label) =
resolve_timezone(cron_config, service_name)?;
let (normalized_expression, normalized) =
normalize_cron_expression(&cron_config.expression);
let schedule = Schedule::from_str(&normalized_expression).map_err(|e| {
let error_msg = format!(
"Invalid cron expression '{}': {}",
cron_config.expression, e
);
ProcessManagerError::ServiceStartError {
service: service_name.to_string(),
source: std::io::Error::new(std::io::ErrorKind::InvalidInput, error_msg),
}
})?;
let persisted_state = self
.state_file
.lock()
.ok()
.and_then(|state| state.jobs.get(service_hash).cloned());
let job_state = CronJobState::new(
service_name.to_string(),
service_hash.to_string(),
schedule,
effective_timezone,
timezone_label.clone(),
persisted_state,
);
Ok((job_state, normalized, normalized_expression))
}
pub fn register_job(
&self,
service_name: &str,
service_hash: &str,
cron_config: &CronConfig,
) -> Result<(), ProcessManagerError> {
let (job_state, normalized, normalized_expression) =
self.build_job_state(service_name, service_hash, cron_config)?;
let timezone_label = job_state.timezone_label.clone();
let mut jobs = self.jobs.lock().unwrap();
self.persist_job_state(&job_state);
jobs.push(job_state.clone());
if normalized {
debug!(
"Cron job '{}' expression normalized to '{}'",
service_name, normalized_expression
);
}
if let Some(next_exec) = job_state.next_execution {
let now = SystemTime::now();
let next_dt: chrono::DateTime<Utc> = next_exec.into();
let now_dt: chrono::DateTime<Utc> = now.into();
debug!(
"Cron job '{}' scheduled with timezone {}. Next execution: {} (now: {})",
service_name, timezone_label, next_dt, now_dt
);
} else {
debug!(
"Cron job '{}' scheduled with timezone {} but next_execution is None",
service_name, timezone_label
);
}
info!("Registered cron job for service '{}'", service_name);
Ok(())
}
pub fn sync_from_config(&self, config: &Config) -> Result<(), ProcessManagerError> {
let mut active_jobs = Vec::new();
let mut active_hashes = HashSet::new();
for (service_name, service_config) in &config.services {
if let Some(cron_config) = &service_config.cron {
let service_hash = service_config.compute_hash();
let (job_state, normalized, normalized_expression) =
self.build_job_state(service_name, &service_hash, cron_config)?;
let timezone_label = job_state.timezone_label.clone();
self.persist_job_state(&job_state);
if normalized {
debug!(
"Cron job '{}' expression normalized to '{}'",
service_name, normalized_expression
);
}
if let Some(next_exec) = job_state.next_execution {
let now = SystemTime::now();
let next_dt: chrono::DateTime<Utc> = next_exec.into();
let now_dt: chrono::DateTime<Utc> = now.into();
debug!(
"Cron job '{}' scheduled with timezone {}. Next execution: {} (now: {})",
service_name, timezone_label, next_dt, now_dt
);
} else {
debug!(
"Cron job '{}' scheduled with timezone {} but next_execution is None",
service_name, timezone_label
);
}
active_hashes.insert(service_hash);
info!("Registered cron job for service '{}'", service_name);
active_jobs.push(job_state);
}
}
{
let mut jobs_guard = self.jobs.lock().unwrap();
*jobs_guard = active_jobs;
}
self.prune_inactive_jobs(&active_hashes);
Ok(())
}
pub fn get_due_jobs(&self) -> Vec<String> {
let mut jobs = self.jobs.lock().unwrap();
let now = SystemTime::now();
let mut due_jobs = Vec::new();
for job in jobs.iter_mut() {
if let Some(next_exec) = job.next_execution
&& now >= next_exec
{
let next_dt: chrono::DateTime<Utc> = next_exec.into();
let now_dt: chrono::DateTime<Utc> = now.into();
debug!(
"Cron job '{}' is due (next_exec: {}, now: {})",
job.service_name, next_dt, now_dt
);
if job.currently_running {
warn!(
"Cron job '{}' is scheduled to run but previous execution is still running",
job.service_name
);
let record = CronExecutionRecord {
started_at: now,
completed_at: Some(now),
status: Some(CronExecutionStatus::OverlapError),
exit_code: None,
pid: None,
user: None,
command: None,
metrics: vec![],
};
job.add_execution_record(record);
job.update_next_execution();
self.persist_job_state(job);
} else {
due_jobs.push(job.service_name.clone());
job.currently_running = true;
job.last_execution = Some(now);
let record = CronExecutionRecord {
started_at: now,
completed_at: None,
status: None,
exit_code: None,
pid: None,
user: None,
command: None,
metrics: vec![],
};
job.add_execution_record(record);
job.update_next_execution();
self.persist_job_state(job);
}
}
}
due_jobs
}
pub fn mark_job_completed(
&self,
service_name: &str,
status: CronExecutionStatus,
exit_code: Option<i32>,
metrics: Vec<crate::metrics::MetricSample>,
) {
let mut jobs = self.jobs.lock().unwrap();
if let Some(job) = jobs.iter_mut().find(|j| j.service_name == service_name) {
job.currently_running = false;
if let Some(record) = job.execution_history.back_mut() {
record.completed_at = Some(SystemTime::now());
record.status = Some(status);
record.exit_code = exit_code;
record.metrics = metrics;
}
debug!("Cron job '{}' completed", service_name);
self.persist_job_state(job);
}
}
pub fn annotate_job_execution(
&self,
service_name: &str,
pid: Option<u32>,
user: Option<String>,
command: Option<String>,
) {
let mut jobs = self.jobs.lock().unwrap();
if let Some(job) = jobs.iter_mut().find(|j| j.service_name == service_name)
&& let Some(record) = job.execution_history.back_mut()
{
if pid.is_some() {
record.pid = pid;
}
if user.is_some() {
record.user = user;
}
if command.is_some() {
record.command = command;
}
self.persist_job_state(job);
}
}
pub fn get_all_jobs(&self) -> Vec<CronJobState> {
let jobs = self.jobs.lock().unwrap();
jobs.iter()
.map(|job| CronJobState {
service_name: job.service_name.clone(),
service_hash: job.service_hash.clone(),
schedule: Schedule::from_str(&job.schedule.to_string()).unwrap(),
last_execution: job.last_execution,
next_execution: job.next_execution,
currently_running: job.currently_running,
execution_history: job.execution_history.clone(),
timezone: job.timezone,
timezone_label: job.timezone_label.clone(),
})
.collect()
}
pub fn clear_all_jobs(&self) {
let mut jobs = self.jobs.lock().unwrap();
jobs.clear();
}
pub fn get_last_execution_status(
&self,
service_name: &str,
) -> Option<CronExecutionStatus> {
let jobs = self.jobs.lock().unwrap();
if let Some(job) = jobs.iter().find(|j| j.service_name == service_name) {
job.execution_history
.back()
.and_then(|record| record.status.clone())
} else {
None
}
}
fn prune_inactive_jobs(&self, active_hashes: &HashSet<String>) {
if let Ok(mut state) = self.state_file.lock() {
let original_len = state.jobs.len();
state.jobs.retain(|hash, _| active_hashes.contains(hash));
if state.jobs.len() != original_len
&& let Err(err) = state.save()
{
warn!("Failed to persist pruned cron state: {}", err);
}
}
}
fn persist_job_state(&self, job: &CronJobState) {
if let Ok(mut state) = self.state_file.lock() {
state.jobs.insert(
job.service_hash.clone(),
PersistedCronJobState {
last_execution: job.last_execution,
execution_history: job.execution_history.clone(),
timezone_label: job.timezone_label.clone(),
timezone: match job.timezone {
EffectiveTimezone::Local => None,
EffectiveTimezone::Utc => Some("UTC".to_string()),
EffectiveTimezone::Named(tz) => Some(tz.name().to_string()),
},
},
);
if let Err(err) = state.save() {
warn!(
"Failed to persist cron state for '{}': {}",
job.service_name, err
);
}
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct CronJobEntry {
hash: String,
state: PersistedCronJobState,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CronStateFile {
#[serde(
serialize_with = "serialize_cron_jobs",
deserialize_with = "deserialize_cron_jobs"
)]
jobs: std::collections::BTreeMap<String, PersistedCronJobState>,
}
fn serialize_cron_jobs<S>(
map: &std::collections::BTreeMap<String, PersistedCronJobState>,
s: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeSeq;
let mut seq = s.serialize_seq(Some(map.len()))?;
for (k, v) in map {
seq.serialize_element(&CronJobEntry {
hash: k.clone(),
state: v.clone(),
})?;
}
seq.end()
}
fn deserialize_cron_jobs<'de, D>(
d: D,
) -> Result<std::collections::BTreeMap<String, PersistedCronJobState>, D::Error>
where
D: serde::Deserializer<'de>,
{
let entries: Vec<CronJobEntry> = Vec::deserialize(d)?;
Ok(entries.into_iter().map(|e| (e.hash, e.state)).collect())
}
impl CronStateFile {
fn path() -> PathBuf {
runtime::state_dir().join("cron_state.xml")
}
pub(crate) fn save(&self) -> Result<(), std::io::Error> {
let path = Self::path();
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let data = quick_xml::se::to_string(self).map_err(std::io::Error::other)?;
use std::io::Write;
let mut file = fs::File::create(&path)?;
file.write_all(data.as_bytes())?;
file.sync_all()?;
Ok(())
}
pub fn load() -> Result<Self, std::io::Error> {
let path = Self::path();
if !path.exists() {
return Ok(Self::default());
}
let raw = fs::read_to_string(&path)?;
if raw.trim().is_empty() || raw.trim() == "<CronStateFile/>" {
return Ok(Self::default());
}
match quick_xml::de::from_str(&raw) {
Ok(state) => Ok(state),
Err(err) => {
eprintln!(
"Warning: Failed to deserialize cron state file at {:?}: {}. Using default state.",
path, err
);
Ok(Self::default())
}
}
}
pub fn jobs(&self) -> &std::collections::BTreeMap<String, PersistedCronJobState> {
&self.jobs
}
pub(crate) fn prune_jobs_not_in(&mut self, valid_hashes: &HashSet<String>) -> bool {
let original_len = self.jobs.len();
self.jobs.retain(|hash, _| valid_hashes.contains(hash));
original_len != self.jobs.len()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedCronJobState {
#[serde(with = "systemtime_serde_opt", default)]
pub last_execution: Option<SystemTime>,
#[serde(default)]
pub execution_history: VecDeque<CronExecutionRecord>,
#[serde(default)]
pub timezone_label: String,
#[serde(default)]
pub timezone: Option<String>,
}
impl Default for PersistedCronJobState {
fn default() -> Self {
Self {
last_execution: None,
execution_history: VecDeque::with_capacity(MAX_EXECUTION_HISTORY),
timezone_label: "".to_string(),
timezone: None,
}
}
}
fn normalize_cron_expression(expr: &str) -> (String, bool) {
let parts: Vec<&str> = expr.split_whitespace().collect();
match parts.len() {
5 => (format!("0 {}", parts.join(" ")), true),
_ => (parts.join(" "), false),
}
}
fn resolve_timezone(
cron_config: &CronConfig,
service_name: &str,
) -> Result<(EffectiveTimezone, String), ProcessManagerError> {
if let Some(tz_raw) = cron_config
.timezone
.as_ref()
.map(|tz| tz.trim())
.filter(|tz| !tz.is_empty())
{
if tz_raw.eq_ignore_ascii_case("utc") {
return Ok((EffectiveTimezone::Utc, "UTC".to_string()));
}
if tz_raw.eq_ignore_ascii_case("local") {
let label = format!("local ({})", Local::now().format("%Z%:z"));
return Ok((EffectiveTimezone::Local, label));
}
match tz_raw.parse::<Tz>() {
Ok(tz) => {
let label = tz.name().to_string();
Ok((EffectiveTimezone::Named(tz), label))
}
Err(e) => Err(ProcessManagerError::ServiceStartError {
service: service_name.to_string(),
source: std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("Invalid timezone '{}': {}", tz_raw, e),
),
}),
}
} else {
let label = format!("local ({})", Local::now().format("%Z%:z"));
Ok((EffectiveTimezone::Local, label))
}
}
#[cfg(test)]
mod tests {
use std::{
collections::{HashMap, VecDeque},
fs,
time::{Duration, SystemTime},
};
use super::*;
use crate::config::ServiceConfig;
fn compute_test_hash(cron_config: &CronConfig) -> String {
let service_config = ServiceConfig {
command: "test_command".to_string(),
env: None,
user: None,
group: None,
supplementary_groups: None,
limits: None,
capabilities: None,
isolation: None,
restart_policy: None,
backoff: None,
max_restarts: None,
depends_on: None,
deployment: None,
hooks: None,
cron: Some(cron_config.clone()),
skip: None,
spawn: None,
};
service_config.compute_hash()
}
#[test]
fn test_cron_manager_registration() {
let manager = CronManager::new();
let cron_config = CronConfig {
expression: "0 * * * * *".to_string(),
timezone: Some("UTC".into()),
};
let service_hash = compute_test_hash(&cron_config);
assert!(
manager
.register_job("test_service", &service_hash, &cron_config)
.is_ok()
);
let jobs = manager.get_all_jobs();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].service_name, "test_service");
assert!(matches!(jobs[0].timezone, EffectiveTimezone::Utc));
}
#[test]
fn test_invalid_cron_expression() {
let manager = CronManager::new();
let cron_config = CronConfig {
expression: "invalid cron".to_string(),
timezone: None,
};
let service_hash = compute_test_hash(&cron_config);
assert!(
manager
.register_job("test_service", &service_hash, &cron_config)
.is_err()
);
}
#[test]
fn test_five_field_expression_normalizes() {
let manager = CronManager::new();
let cron_config = CronConfig {
expression: "* * * * *".to_string(),
timezone: None,
};
let service_hash = compute_test_hash(&cron_config);
assert!(
manager
.register_job("test_service", &service_hash, &cron_config)
.is_ok()
);
let jobs = manager.get_all_jobs();
assert!(jobs[0].next_execution.is_some());
}
#[test]
fn persists_execution_history_with_exit_codes() {
let _guard = crate::test_utils::env_lock();
let base = std::env::current_dir()
.expect("current_dir")
.join("target/tmp-home");
fs::create_dir_all(&base).unwrap();
let temp = tempfile::tempdir_in(&base).unwrap();
let home = temp.path();
let original_home = std::env::var("HOME").ok();
unsafe {
std::env::set_var("HOME", home);
}
crate::runtime::init(crate::runtime::RuntimeMode::User);
crate::runtime::set_drop_privileges(false);
let manager = CronManager::new();
let cron_config = CronConfig {
expression: "* * * * * *".to_string(),
timezone: Some("UTC".into()),
};
let service_hash = compute_test_hash(&cron_config);
manager
.register_job("persisted_service", &service_hash, &cron_config)
.unwrap();
{
let mut jobs = manager.jobs.lock().unwrap();
let job = jobs
.iter_mut()
.find(|j| j.service_name == "persisted_service")
.expect("job registered");
job.next_execution = Some(SystemTime::now() - Duration::from_secs(1));
}
let due = manager.get_due_jobs();
assert_eq!(due, vec!["persisted_service".to_string()]);
manager.mark_job_completed(
"persisted_service",
CronExecutionStatus::Success,
Some(0),
vec![],
);
manager.annotate_job_execution(
"persisted_service",
Some(4242),
Some("postgres".to_string()),
Some("/bin/true".to_string()),
);
let service_hash = compute_test_hash(&cron_config);
let state = CronStateFile::load().expect("load cron state");
let persisted = state.jobs().get(&service_hash).expect("persisted cron job");
assert_eq!(persisted.execution_history.len(), 1);
let record = persisted.execution_history.back().unwrap();
assert!(matches!(record.status, Some(CronExecutionStatus::Success)));
assert_eq!(record.exit_code, Some(0));
assert_eq!(record.pid, Some(4242));
assert_eq!(record.user.as_deref(), Some("postgres"));
assert_eq!(record.command.as_deref(), Some("/bin/true"));
match original_home {
Some(val) => unsafe { std::env::set_var("HOME", val) },
None => unsafe { std::env::remove_var("HOME") },
}
crate::runtime::init(crate::runtime::RuntimeMode::User);
crate::runtime::set_drop_privileges(false);
}
fn service_with_cron(expr: &str) -> ServiceConfig {
ServiceConfig {
command: "/bin/true".into(),
env: None,
user: None,
group: None,
supplementary_groups: None,
limits: None,
capabilities: None,
isolation: None,
restart_policy: None,
backoff: None,
max_restarts: None,
depends_on: None,
deployment: None,
hooks: None,
cron: Some(CronConfig {
expression: expr.to_string(),
timezone: None,
}),
skip: None,
spawn: None,
}
}
#[test]
fn sync_from_config_prunes_removed_jobs() {
let _guard = crate::test_utils::env_lock();
let base = std::env::current_dir()
.expect("current_dir")
.join("target/tmp-home");
fs::create_dir_all(&base).unwrap();
let temp = tempfile::tempdir_in(&base).unwrap();
let home = temp.path();
let original_home = std::env::var("HOME").ok();
unsafe {
std::env::set_var("HOME", home);
}
crate::runtime::init_with_test_home(home);
crate::runtime::set_drop_privileges(false);
let manager = CronManager::new();
let mut services_v1 = HashMap::new();
services_v1.insert("job_one".to_string(), service_with_cron("* * * * * *"));
services_v1.insert("job_two".to_string(), service_with_cron("*/2 * * * * *"));
let config_v1 = Config {
version: "1".to_string(),
services: services_v1,
project_dir: None,
env: None,
metrics: crate::config::MetricsConfig::default(),
};
manager.sync_from_config(&config_v1).unwrap();
let mut services_v2 = HashMap::new();
services_v2.insert("job_two".to_string(), service_with_cron("*/2 * * * * *"));
services_v2.insert("job_three".to_string(), service_with_cron("0 */5 * * * *"));
let config_v2 = Config {
version: "1".to_string(),
services: services_v2,
project_dir: None,
env: None,
metrics: crate::config::MetricsConfig::default(),
};
let job_two_hash = service_with_cron("*/2 * * * * *").compute_hash();
let job_three_hash = service_with_cron("0 */5 * * * *").compute_hash();
let job_one_hash = service_with_cron("* * * * * *").compute_hash();
manager.sync_from_config(&config_v2).unwrap();
let job_names: Vec<String> = manager
.get_all_jobs()
.into_iter()
.map(|job| job.service_name)
.collect();
assert_eq!(job_names.len(), 2);
assert!(job_names.contains(&"job_two".to_string()));
assert!(job_names.contains(&"job_three".to_string()));
assert!(!job_names.contains(&"job_one".to_string()));
let state = CronStateFile::load().expect("load cron state");
assert!(state.jobs().contains_key(&job_two_hash));
assert!(state.jobs().contains_key(&job_three_hash));
assert!(!state.jobs().contains_key(&job_one_hash));
match original_home {
Some(val) => unsafe { std::env::set_var("HOME", val) },
None => unsafe { std::env::remove_var("HOME") },
}
crate::runtime::init(crate::runtime::RuntimeMode::User);
crate::runtime::set_drop_privileges(false);
}
#[test]
fn cron_execution_status_accepts_text_compat_shape() {
let status: CronExecutionStatus = serde_json::from_str(r#"{"$text":"Success"}"#)
.expect("deserialize compat text status");
assert!(matches!(status, CronExecutionStatus::Success));
}
#[test]
fn cron_state_deserializes_legacy_text_status_entries() {
let mut state = CronStateFile::default();
let mut history = VecDeque::new();
history.push_back(CronExecutionRecord {
started_at: SystemTime::UNIX_EPOCH + Duration::from_secs(10),
completed_at: Some(SystemTime::UNIX_EPOCH + Duration::from_secs(12)),
status: Some(CronExecutionStatus::Success),
exit_code: Some(0),
pid: None,
user: None,
command: None,
metrics: vec![],
});
state.jobs.insert(
"legacy-hash".to_string(),
PersistedCronJobState {
last_execution: Some(SystemTime::UNIX_EPOCH + Duration::from_secs(10)),
execution_history: history,
timezone_label: "UTC".to_string(),
timezone: Some("UTC".to_string()),
},
);
let xml = quick_xml::se::to_string(&state).expect("serialize cron state");
let parsed: CronStateFile =
quick_xml::de::from_str(&xml).expect("deserialize legacy state");
let record = parsed
.jobs()
.get("legacy-hash")
.and_then(|job| job.execution_history.back())
.expect("legacy record present");
assert!(matches!(record.status, Some(CronExecutionStatus::Success)));
}
}