use std::collections::HashMap;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use super::schedule::{ScheduleSink, Scheduler};
use crate::agent_api::{Agent, AgentSession, SessionOptions};
use crate::config::{AgentDir, ScheduleSpec};
use crate::error::Result;
struct SessionScheduleSink {
sessions: HashMap<String, Arc<AgentSession>>,
}
#[async_trait::async_trait]
impl ScheduleSink for SessionScheduleSink {
async fn fire(&self, spec: &ScheduleSpec) {
let Some(session) = self.sessions.get(&spec.name) else {
tracing::warn!(schedule = %spec.name, "no session for schedule; skipping fire");
return;
};
match session.send(&spec.prompt, None).await {
Ok(_) => tracing::info!(schedule = %spec.name, "scheduled turn completed"),
Err(e) => {
tracing::warn!(schedule = %spec.name, error = %e, "scheduled turn failed")
}
}
}
}
pub async fn serve_agent_dir(
agent: &Agent,
agent_dir: &AgentDir,
workspace: impl Into<String> + Clone,
extra: Option<SessionOptions>,
cancel: CancellationToken,
) -> Result<()> {
let extra = extra.unwrap_or_default();
let mut sessions = HashMap::new();
for spec in agent_dir.schedules.iter().filter(|s| s.enabled) {
let session = build_session(
agent,
agent_dir,
format!("schedule:{}", spec.name),
workspace.clone(),
&extra,
)
.await?;
sessions.insert(spec.name.clone(), Arc::new(session));
}
let scheduler = Scheduler::new(agent_dir.schedules.clone())?;
let sink: Arc<dyn ScheduleSink> = Arc::new(SessionScheduleSink { sessions });
scheduler.run(sink, cancel).await;
Ok(())
}
async fn build_session(
agent: &Agent,
agent_dir: &AgentDir,
session_id: String,
workspace: impl Into<String>,
extra: &SessionOptions,
) -> Result<AgentSession> {
let mut opts = extra.clone();
if opts.prompt_slots.is_none() {
opts.prompt_slots = Some(agent_dir.prompt_slots.clone());
}
opts.skill_dirs
.extend(agent_dir.config.skill_dirs.iter().cloned());
opts.session_id = Some(session_id.clone());
let resume = match &opts.session_store {
Some(store) => store.exists(&session_id).await?,
None => false,
};
let session = if resume {
agent.resume_session(&session_id, opts)?
} else {
agent.session(workspace, Some(opts))?
};
super::tools::install_agent_dir_tools(&session, &agent_dir.tools).await?;
Ok(session)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::CodeConfig;
use crate::prompts::SystemPromptSlots;
fn test_agent_config() -> CodeConfig {
let acl = r#"
default_model = "anthropic/claude-sonnet-4-20250514"
providers "anthropic" {
api_key = "test-key"
models "claude-sonnet-4-20250514" { name = "Claude Sonnet 4" }
}
"#;
CodeConfig::from_acl(acl).unwrap()
}
fn agent_dir_with(schedules: Vec<ScheduleSpec>) -> AgentDir {
AgentDir {
dir: std::path::PathBuf::from("/tmp/serve-test-agent"),
config: CodeConfig::default(),
prompt_slots: SystemPromptSlots {
role: Some("a scheduled test agent".to_string()),
..Default::default()
},
schedules,
tools: vec![],
}
}
#[tokio::test]
async fn serve_with_no_schedules_returns_immediately() {
let agent = Agent::from_config(test_agent_config()).await.unwrap();
let dir = agent_dir_with(vec![]);
let cancel = CancellationToken::new();
serve_agent_dir(&agent, &dir, "/tmp/ws".to_string(), None, cancel)
.await
.unwrap();
}
#[tokio::test]
async fn serve_builds_per_schedule_session_and_stops_on_cancel() {
let agent = Agent::from_config(test_agent_config()).await.unwrap();
let dir = agent_dir_with(vec![ScheduleSpec {
name: "tick".to_string(),
cron: "* * * * *".to_string(),
prompt: "do the scheduled work".to_string(),
enabled: true,
}]);
let cancel = CancellationToken::new();
cancel.cancel();
serve_agent_dir(&agent, &dir, "/tmp/ws".to_string(), None, cancel)
.await
.unwrap();
}
fn tick_dir() -> AgentDir {
agent_dir_with(vec![ScheduleSpec {
name: "tick".to_string(),
cron: "0 9 * * *".to_string(),
prompt: "do the scheduled work".to_string(),
enabled: true,
}])
}
#[tokio::test(flavor = "multi_thread")]
async fn rehydrates_existing_schedule_session_from_store() {
use crate::llm::Message;
let store: Arc<dyn crate::store::SessionStore> =
Arc::new(crate::store::MemorySessionStore::new());
let agent = Agent::from_config(test_agent_config()).await.unwrap();
let seed_opts = SessionOptions::new()
.with_session_store(store.clone())
.with_session_id("schedule:tick");
agent
.session("/tmp/ws", Some(seed_opts))
.unwrap()
.save()
.await
.unwrap();
let mut data = store.load("schedule:tick").await.unwrap().unwrap();
data.messages = vec![Message::user("prior turn")];
store.save(&data).await.unwrap();
let dir = tick_dir();
let extra = SessionOptions::new().with_session_store(store.clone());
let session = build_session(
&agent,
&dir,
"schedule:tick".to_string(),
"/tmp/ws".to_string(),
&extra,
)
.await
.unwrap();
assert_eq!(session.session_id(), "schedule:tick");
let history = session.history();
assert_eq!(history.len(), 1, "prior context must rehydrate on boot");
assert_eq!(history[0].text(), "prior turn");
}
#[tokio::test(flavor = "multi_thread")]
async fn fresh_schedule_session_when_store_has_no_prior() {
let store: Arc<dyn crate::store::SessionStore> =
Arc::new(crate::store::MemorySessionStore::new());
let agent = Agent::from_config(test_agent_config()).await.unwrap();
let dir = tick_dir();
let extra = SessionOptions::new().with_session_store(store.clone());
let session = build_session(
&agent,
&dir,
"schedule:tick".to_string(),
"/tmp/ws".to_string(),
&extra,
)
.await
.unwrap();
assert_eq!(session.session_id(), "schedule:tick");
assert!(
session.history().is_empty(),
"no prior session → fresh empty history"
);
}
#[tokio::test]
async fn fresh_schedule_session_when_no_store_configured() {
let agent = Agent::from_config(test_agent_config()).await.unwrap();
let dir = tick_dir();
let session = build_session(
&agent,
&dir,
"schedule:tick".to_string(),
"/tmp/ws".to_string(),
&SessionOptions::new(),
)
.await
.unwrap();
assert_eq!(session.session_id(), "schedule:tick");
assert!(session.history().is_empty());
}
}