use crate::config::ScheduleConfig;
use crate::error::{HeraldError, Result};
use crate::generator::GeneratedTweet;
use chrono::{DateTime, Utc, Timelike, Duration};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use tracing::{debug, info};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledTweet {
pub id: String,
pub content: String,
pub scheduled_for: DateTime<Utc>,
pub event_id: Option<String>,
pub status: ScheduleStatus,
pub created_at: DateTime<Utc>,
pub posted_id: Option<String>,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ScheduleStatus {
Pending,
Posting,
Posted,
Failed,
Cancelled,
}
pub struct Scheduler {
config: ScheduleConfig,
queue_path: PathBuf,
}
impl Scheduler {
pub fn new(config: ScheduleConfig) -> Result<Self> {
let queue_path = config.queue_file.clone().unwrap_or_else(|| {
directories::ProjectDirs::from("io", "moltenlabs", "herald")
.map(|d| d.data_dir().join("queue.json"))
.unwrap_or_else(|| PathBuf::from("herald_queue.json"))
});
if let Some(parent) = queue_path.parent() {
std::fs::create_dir_all(parent)?;
}
Ok(Self { config, queue_path })
}
pub fn schedule(&self, tweet: &GeneratedTweet, time: Option<DateTime<Utc>>) -> Result<ScheduledTweet> {
let scheduled_time = time.unwrap_or_else(|| self.next_available_slot());
let scheduled = ScheduledTweet {
id: uuid::Uuid::new_v4().to_string(),
content: tweet.content.clone(),
scheduled_for: scheduled_time,
event_id: Some(tweet.event_id.clone()),
status: ScheduleStatus::Pending,
created_at: Utc::now(),
posted_id: None,
error: None,
};
let mut queue = self.load_queue()?;
queue.push(scheduled.clone());
self.save_queue(&queue)?;
info!("Scheduled tweet for {}", scheduled_time);
Ok(scheduled)
}
pub fn schedule_text(&self, content: &str, time: Option<DateTime<Utc>>) -> Result<ScheduledTweet> {
let scheduled_time = time.unwrap_or_else(|| self.next_available_slot());
let scheduled = ScheduledTweet {
id: uuid::Uuid::new_v4().to_string(),
content: content.to_string(),
scheduled_for: scheduled_time,
event_id: None,
status: ScheduleStatus::Pending,
created_at: Utc::now(),
posted_id: None,
error: None,
};
let mut queue = self.load_queue()?;
queue.push(scheduled.clone());
self.save_queue(&queue)?;
Ok(scheduled)
}
pub fn next_available_slot(&self) -> DateTime<Utc> {
let queue = self.load_queue().unwrap_or_default();
let now = Utc::now();
let last_scheduled = queue
.iter()
.filter(|t| t.status == ScheduleStatus::Pending)
.map(|t| t.scheduled_for)
.max();
let min_gap = Duration::hours(self.config.min_hours_between as i64);
let earliest = match last_scheduled {
Some(last) => {
let with_gap = last + min_gap;
if with_gap > now { with_gap } else { now }
}
None => now,
};
self.find_next_preferred_time(earliest)
}
fn find_next_preferred_time(&self, after: DateTime<Utc>) -> DateTime<Utc> {
if self.config.preferred_times.is_empty() {
return after + Duration::hours(1);
}
let today_times: Vec<(u32, u32)> = self.config.preferred_times
.iter()
.filter_map(|t| {
let parts: Vec<&str> = t.split(':').collect();
if parts.len() == 2 {
Some((
parts[0].parse().ok()?,
parts[1].parse().ok()?,
))
} else {
None
}
})
.collect();
let after_hour = after.hour();
let after_minute = after.minute();
for &(hour, minute) in &today_times {
if hour > after_hour || (hour == after_hour && minute > after_minute) {
return after
.with_hour(hour).unwrap()
.with_minute(minute).unwrap()
.with_second(0).unwrap();
}
}
if let Some(&(hour, minute)) = today_times.first() {
return (after + Duration::days(1))
.with_hour(hour).unwrap()
.with_minute(minute).unwrap()
.with_second(0).unwrap();
}
after + Duration::hours(1)
}
pub fn pending(&self) -> Result<Vec<ScheduledTweet>> {
let queue = self.load_queue()?;
Ok(queue
.into_iter()
.filter(|t| t.status == ScheduleStatus::Pending)
.collect())
}
pub fn due(&self) -> Result<Vec<ScheduledTweet>> {
let now = Utc::now();
let queue = self.load_queue()?;
Ok(queue
.into_iter()
.filter(|t| t.status == ScheduleStatus::Pending && t.scheduled_for <= now)
.collect())
}
pub fn mark_posted(&self, id: &str, posted_id: &str) -> Result<()> {
let mut queue = self.load_queue()?;
if let Some(tweet) = queue.iter_mut().find(|t| t.id == id) {
tweet.status = ScheduleStatus::Posted;
tweet.posted_id = Some(posted_id.to_string());
}
self.save_queue(&queue)
}
pub fn mark_failed(&self, id: &str, error: &str) -> Result<()> {
let mut queue = self.load_queue()?;
if let Some(tweet) = queue.iter_mut().find(|t| t.id == id) {
tweet.status = ScheduleStatus::Failed;
tweet.error = Some(error.to_string());
}
self.save_queue(&queue)
}
pub fn cancel(&self, id: &str) -> Result<()> {
let mut queue = self.load_queue()?;
if let Some(tweet) = queue.iter_mut().find(|t| t.id == id) {
tweet.status = ScheduleStatus::Cancelled;
}
self.save_queue(&queue)
}
pub fn reschedule(&self, id: &str, new_time: DateTime<Utc>) -> Result<()> {
let mut queue = self.load_queue()?;
if let Some(tweet) = queue.iter_mut().find(|t| t.id == id) {
tweet.scheduled_for = new_time;
tweet.status = ScheduleStatus::Pending;
}
self.save_queue(&queue)
}
pub fn cleanup(&self) -> Result<usize> {
let mut queue = self.load_queue()?;
let before = queue.len();
queue.retain(|t| t.status == ScheduleStatus::Pending);
self.save_queue(&queue)?;
Ok(before - queue.len())
}
pub fn stats(&self) -> Result<QueueStats> {
let queue = self.load_queue()?;
Ok(QueueStats {
pending: queue.iter().filter(|t| t.status == ScheduleStatus::Pending).count(),
posted: queue.iter().filter(|t| t.status == ScheduleStatus::Posted).count(),
failed: queue.iter().filter(|t| t.status == ScheduleStatus::Failed).count(),
cancelled: queue.iter().filter(|t| t.status == ScheduleStatus::Cancelled).count(),
})
}
fn load_queue(&self) -> Result<Vec<ScheduledTweet>> {
if !self.queue_path.exists() {
return Ok(Vec::new());
}
let content = std::fs::read_to_string(&self.queue_path)?;
if content.trim().is_empty() {
return Ok(Vec::new());
}
serde_json::from_str(&content).map_err(|e| {
HeraldError::Schedule(format!("Failed to parse queue: {}", e))
})
}
fn save_queue(&self, queue: &[ScheduledTweet]) -> Result<()> {
let content = serde_json::to_string_pretty(queue)?;
std::fs::write(&self.queue_path, content)?;
debug!("Saved {} tweets to queue", queue.len());
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueStats {
pub pending: usize,
pub posted: usize,
pub failed: usize,
pub cancelled: usize,
}
impl QueueStats {
pub fn total(&self) -> usize {
self.pending + self.posted + self.failed + self.cancelled
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::{tempdir, TempDir};
fn test_scheduler() -> (TempDir, Scheduler) {
let dir = tempdir().unwrap();
let config = ScheduleConfig {
queue_file: Some(dir.path().join("test_queue.json")),
..Default::default()
};
let scheduler = Scheduler::new(config).unwrap();
(dir, scheduler)
}
#[test]
fn test_schedule_text() {
let (_dir, scheduler) = test_scheduler();
let scheduled = scheduler.schedule_text("Hello world!", None).unwrap();
assert_eq!(scheduled.content, "Hello world!");
assert_eq!(scheduled.status, ScheduleStatus::Pending);
}
#[test]
fn test_pending() {
let (_dir, scheduler) = test_scheduler();
scheduler.schedule_text("Tweet 1", None).unwrap();
scheduler.schedule_text("Tweet 2", None).unwrap();
let pending = scheduler.pending().unwrap();
assert_eq!(pending.len(), 2);
}
#[test]
fn test_mark_posted() {
let (_dir, scheduler) = test_scheduler();
let scheduled = scheduler.schedule_text("Hello!", None).unwrap();
scheduler.mark_posted(&scheduled.id, "twitter-123").unwrap();
let queue = scheduler.load_queue().unwrap();
let tweet = queue.iter().find(|t| t.id == scheduled.id).unwrap();
assert_eq!(tweet.status, ScheduleStatus::Posted);
assert_eq!(tweet.posted_id, Some("twitter-123".to_string()));
}
#[test]
fn test_cancel() {
let (_dir, scheduler) = test_scheduler();
let scheduled = scheduler.schedule_text("Cancel me!", None).unwrap();
scheduler.cancel(&scheduled.id).unwrap();
let pending = scheduler.pending().unwrap();
assert!(pending.is_empty());
}
#[test]
fn test_stats() {
let (_dir, scheduler) = test_scheduler();
scheduler.schedule_text("Tweet 1", None).unwrap();
let s2 = scheduler.schedule_text("Tweet 2", None).unwrap();
scheduler.mark_posted(&s2.id, "123").unwrap();
let stats = scheduler.stats().unwrap();
assert_eq!(stats.pending, 1);
assert_eq!(stats.posted, 1);
assert_eq!(stats.total(), 2);
}
#[test]
fn test_cleanup() {
let (_dir, scheduler) = test_scheduler();
let s1 = scheduler.schedule_text("Posted", None).unwrap();
scheduler.schedule_text("Pending", None).unwrap();
scheduler.mark_posted(&s1.id, "123").unwrap();
let removed = scheduler.cleanup().unwrap();
assert_eq!(removed, 1);
let stats = scheduler.stats().unwrap();
assert_eq!(stats.total(), 1);
}
#[test]
fn test_schedule_status_serialization() {
let status = ScheduleStatus::Pending;
let json = serde_json::to_string(&status).unwrap();
assert_eq!(json, "\"pending\"");
let parsed: ScheduleStatus = serde_json::from_str(&json).unwrap();
assert_eq!(parsed, ScheduleStatus::Pending);
}
}