use chrono::{DateTime, Utc};
use pollen_store::StoreBackend;
use pollen_types::{Result, TaskDef, TaskInstance};
use std::sync::Arc;
use tracing::debug;
pub struct InstanceGenerator {
store: Arc<StoreBackend>,
}
impl InstanceGenerator {
pub fn new(store: Arc<StoreBackend>) -> Self {
Self { store }
}
pub async fn ensure_instance(&self, task: &TaskDef, scheduled_at: DateTime<Utc>) -> Result<bool> {
let task_id = task.id.clone();
let existing = self.store.read(move |r| {
r.list_instances_for_task(&task_id, 10)
}).await?;
for instance in &existing {
let diff = (instance.scheduled_at - scheduled_at).num_seconds().abs();
if diff < 60 {
return Ok(false);
}
}
let instance = TaskInstance::new(task.id.clone(), scheduled_at);
let instance_clone = instance.clone();
self.store.write(move |w| {
w.insert_instance(&instance_clone)
}).await?;
debug!(
"Generated instance {} for task {} at {}",
instance.id, task.name, scheduled_at
);
Ok(true)
}
pub async fn generate_upcoming(&self, tasks: &[TaskDef], horizon: DateTime<Utc>) -> Result<usize> {
let now = Utc::now();
let mut count = 0;
for task in tasks {
if !task.enabled {
continue;
}
if let Some(next) = super::compute_next_execution(&task.schedule, now) {
if next <= horizon
&& self.ensure_instance(task, next).await? {
count += 1;
}
}
}
Ok(count)
}
}
#[cfg(test)]
mod tests {
use super::*;
use pollen_store::{MemoryStore, StoreBackend};
use pollen_types::Schedule;
use std::time::Duration;
#[tokio::test]
async fn test_generate_instance() {
let store = Arc::new(StoreBackend::Memory(MemoryStore::new()));
let task = TaskDef::new("test", Schedule::interval(Duration::from_secs(60)));
let task_for_insert = task.clone();
store.write(move |w| w.insert_task(&task_for_insert)).await.unwrap();
let generator = InstanceGenerator::new(store.clone());
let scheduled_at = Utc::now() + chrono::Duration::minutes(5);
let created = generator.ensure_instance(&task, scheduled_at).await.unwrap();
assert!(created);
let created2 = generator.ensure_instance(&task, scheduled_at).await.unwrap();
assert!(!created2);
}
}