use crate::utils::lock::LockRecover;
use uuid::Uuid;
use crate::error::AppError;
use crate::paths::workbenches_dir;
use crate::routine_storage::{remove_routine_dir, write_routine};
use crate::utils::cron::{normalize_schedule, validate_cron};
use crate::utils::time::now_secs;
use super::agents::{available_agents, load_agent_command, AgentLoadError};
use super::cleanup::{
cleanup_expired_workbenches, max_runtime_ceiling_secs, parse_workbench_name, ttl_ceiling_secs,
};
use super::command::{build_routine_command, slugify};
use super::model::{
CleanupResponse, CreateRoutineRequest, Repository, Routine, RoutineListQuery, RoutineResponse,
RoutineSort, RoutineStore, SortOrder, UpdateRoutineRequest,
};
/// Rejects a required text field whose value is empty or whitespace-only.
///
/// `field` is interpolated into the error message so the same check can be
/// reused for any routine attribute.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] when `value` trims to the empty string.
fn reject_blank(field: &str, value: &str) -> Result<(), AppError> {
    match value.trim() {
        "" => Err(AppError::BadRequest(format!(
            "routine {field} must not be empty"
        ))),
        _ => Ok(()),
    }
}
/// Rejects an explicitly supplied duration of zero seconds.
///
/// An absent value (`None`) is accepted; only `Some(0)` is an error.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`], naming `field`, when `value` is `Some(0)`.
fn reject_zero_secs(field: &str, value: Option<u64>) -> Result<(), AppError> {
    match value {
        Some(0) => Err(AppError::BadRequest(format!(
            "routine {field} must be greater than zero"
        ))),
        Some(_) | None => Ok(()),
    }
}
/// Rejects a supplied duration that is strictly greater than `ceiling`.
///
/// An absent value, or one equal to the ceiling, is accepted.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] naming `field`, the offending value and
/// the ceiling when `value` exceeds `ceiling`.
fn reject_over_ceiling(field: &str, value: Option<u64>, ceiling: u64) -> Result<(), AppError> {
    match value {
        Some(secs) if secs > ceiling => Err(AppError::BadRequest(format!(
            "routine {field} {secs} exceeds the ceiling of {ceiling}s derived from this routine's schedule"
        ))),
        _ => Ok(()),
    }
}
/// Builds the sort key used when ordering routines by repository.
///
/// The key is derived from the routine's first repository, lowercased so the
/// ordering is case-insensitive. The leading `bool` is `true` only for
/// routines without any repository, which places them after all others in
/// ascending order.
fn repo_sort_key(routine: &Routine) -> (bool, String) {
    routine.repositories.first().map_or_else(
        || (true, String::new()),
        |primary| (false, primary.repository.to_lowercase()),
    )
}
/// Checks that `agent` names a known agent whose config, if present, loads.
///
/// The name must appear in `available_agents()`. Its command config is then
/// loaded: a missing config is tolerated, while a config that cannot be read
/// or parsed is reported to the caller.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] when the agent is unknown (the message
/// lists the valid names) or when its config is malformed or unreadable.
fn validate_agent(agent: &str) -> Result<(), AppError> {
    let agents = available_agents();
    let is_known = agents.iter().any(|known| known == agent);
    if !is_known {
        let valid = agents.join(", ");
        return Err(AppError::BadRequest(format!(
            "unknown agent \"{agent}\"; valid agents: {}",
            valid
        )));
    }

    let Err(failure) = load_agent_command(agent) else {
        return Ok(());
    };
    let message = match failure {
        // A known agent without a config file is still acceptable.
        AgentLoadError::Missing => return Ok(()),
        AgentLoadError::Parse(err) => format!("agent {agent:?} has a malformed config: {err}"),
        AgentLoadError::Unreadable(err) => {
            format!("agent {agent:?} has an unreadable config: {err}")
        }
    };
    Err(AppError::BadRequest(message))
}
/// Lists routines, filtered and ordered according to `query`.
///
/// The store is snapshotted under its lock and the lock is released before
/// any filtering or sorting happens.
///
/// * `query.repository` — when non-blank after trimming, keeps routines with
///   at least one repository whose name contains it (case-insensitive).
/// * `query.local_only` — when `Some(true)`, keeps routines for which
///   `crate::machine::targets` accepts the current machine.
/// * `query.sort` / `query.order` — sort key and direction; descending order
///   is produced by reversing the ascending result.
/// * `query.include_prompts` — unless `Some(true)`, each routine's prompt is
///   blanked before it is converted into a response.
pub fn svc_list(store: &RoutineStore, query: &RoutineListQuery) -> Vec<RoutineResponse> {
    // The guard is a temporary, so the lock is released at the end of this statement.
    let mut routines: Vec<Routine> = store.lock_recover().values().cloned().collect();

    if let Some(needle) = query
        .repository
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        let needle = needle.to_lowercase();
        routines.retain(|routine| {
            routine
                .repositories
                .iter()
                .any(|repo| repo.repository.to_lowercase().contains(&needle))
        });
    }

    if query.local_only.unwrap_or(false) {
        let me = crate::machine::current_machine();
        routines.retain(|routine| crate::machine::targets(&routine.machines, &me));
    }

    match query.sort {
        RoutineSort::Created => routines.sort_by_key(|routine| routine.created_at),
        RoutineSort::Updated => routines.sort_by_key(|routine| routine.updated_at),
        // These keys allocate a lowercased `String`; caching computes each key
        // once per routine instead of once per comparison. The sort stays stable.
        RoutineSort::Title => routines.sort_by_cached_key(|routine| routine.title.to_lowercase()),
        RoutineSort::Repository => routines.sort_by_cached_key(repo_sort_key),
    }
    if query.order == SortOrder::Desc {
        routines.reverse();
    }

    let include_prompts = query.include_prompts.unwrap_or(false);
    routines
        .into_iter()
        .map(|mut routine| {
            if !include_prompts {
                routine.prompt.clear();
            }
            RoutineResponse::from_routine(routine)
        })
        .collect()
}
/// Fetches a single routine by id.
///
/// The routine is cloned out of the store and the lock is released before
/// the response is built.
///
/// # Errors
///
/// Returns [`AppError::NotFound`] when no routine is stored under `id`.
pub fn svc_get(store: &RoutineStore, id: &str) -> Result<RoutineResponse, AppError> {
    let guard = store.lock_recover();
    let snapshot = guard.get(id).cloned();
    drop(guard);

    match snapshot {
        Some(routine) => Ok(RoutineResponse::from_routine(routine)),
        None => Err(AppError::NotFound),
    }
}
/// Ensures a routine prompt contains something other than whitespace.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] when `prompt` trims to the empty string.
fn validate_prompt(prompt: &str) -> Result<(), AppError> {
    match prompt.trim() {
        "" => Err(AppError::BadRequest("prompt must not be empty".to_string())),
        _ => Ok(()),
    }
}
/// Maximum number of characters allowed in a title once it has been trimmed.
const MAX_TITLE_LEN: usize = 200;

/// Validates a routine title.
///
/// The title must contain at least one ASCII letter or digit, and its trimmed
/// form must be at most [`MAX_TITLE_LEN`] characters long. The alphanumeric
/// check runs first, so it decides the error when both rules are violated.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] describing whichever rule failed.
fn validate_title(title: &str) -> Result<(), AppError> {
    // ASCII alphanumerics are single-byte in UTF-8 and never occur inside a
    // multi-byte sequence, so scanning bytes matches scanning chars here.
    let has_ascii_alnum = title.bytes().any(|byte| byte.is_ascii_alphanumeric());
    if !has_ascii_alnum {
        return Err(AppError::BadRequest(
            "title must contain at least one alphanumeric character".to_string(),
        ));
    }

    let trimmed_len = title.trim().chars().count();
    if trimmed_len > MAX_TITLE_LEN {
        return Err(AppError::BadRequest(format!(
            "title must be at most {MAX_TITLE_LEN} characters"
        )));
    }
    Ok(())
}
/// Validates repository entries and returns them with whitespace trimmed.
///
/// Every entry needs a non-blank `repository`. A `branch` is optional, but
/// when present it must not be blank. Entries are checked in order and the
/// first violation is reported with its index.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] identifying the offending entry and field.
fn validate_repositories(repos: &[Repository]) -> Result<Vec<Repository>, AppError> {
    repos
        .iter()
        .enumerate()
        .map(|(index, entry)| {
            let repository = entry.repository.trim();
            if repository.is_empty() {
                return Err(AppError::BadRequest(format!(
                    "repositories[{index}].repository must not be empty or whitespace-only"
                )));
            }

            let branch = entry.branch.as_deref().map(str::trim);
            if branch.is_some_and(str::is_empty) {
                return Err(AppError::BadRequest(format!(
                    "repositories[{index}].branch must not be empty or whitespace-only when set"
                )));
            }

            Ok(Repository {
                repository: repository.to_string(),
                branch: branch.map(String::from),
            })
        })
        // Collecting into `Result` stops at the first failing entry.
        .collect()
}
/// Validates tags and returns them with surrounding whitespace removed.
///
/// Tags are checked in order; the first blank one is reported with its index.
///
/// # Errors
///
/// Returns [`AppError::BadRequest`] when any tag is empty or whitespace-only.
fn validate_tags(tags: &[String]) -> Result<Vec<String>, AppError> {
    tags.iter()
        .enumerate()
        .map(|(index, raw)| match raw.trim() {
            "" => Err(AppError::BadRequest(format!(
                "tags[{index}] must not be empty or whitespace-only"
            ))),
            tag => Ok(tag.to_string()),
        })
        // Collecting into `Result` stops at the first blank tag.
        .collect()
}
/// Creates a new managed routine from `req`, persists it and registers it in
/// the store.
///
/// The request is validated first (schedule, title, prompt, durations and
/// their schedule-derived ceilings, agent, repositories, tags). The routine
/// is stored with a normalized schedule and with trimmed repositories and
/// tags.
///
/// The slug uniqueness check, the on-disk write and the insertion into the
/// store all happen under a single lock acquisition. Checking and inserting
/// under separate acquisitions would let two concurrent requests with the
/// same slug both pass the check.
///
/// A failed crontab sync after a successful create is logged and does not
/// fail the request.
///
/// # Errors
///
/// * [`AppError::BadRequest`] — a validation rule failed.
/// * [`AppError::Conflict`] — another routine's title yields the same slug.
/// * [`AppError::Internal`] — the routine could not be written to disk; the
///   store is left unchanged in that case.
pub fn svc_create(
    store: &RoutineStore,
    req: CreateRoutineRequest,
) -> Result<RoutineResponse, AppError> {
    validate_cron(&req.schedule)?;
    reject_blank("title", &req.title)?;
    validate_prompt(&req.prompt)?;
    reject_zero_secs("ttl_secs", req.ttl_secs)?;
    reject_zero_secs("max_runtime_secs", req.max_runtime_secs)?;

    // Normalized once: used for the ceilings below and stored on the routine.
    let schedule = normalize_schedule(&req.schedule);
    reject_over_ceiling("ttl_secs", req.ttl_secs, ttl_ceiling_secs(&schedule))?;
    reject_over_ceiling(
        "max_runtime_secs",
        req.max_runtime_secs,
        max_runtime_ceiling_secs(&schedule),
    )?;

    validate_title(&req.title)?;
    validate_agent(&req.agent)?;
    let repositories = validate_repositories(&req.repositories)?;
    let tags = validate_tags(&req.tags)?;

    let slug = slugify(&req.title);
    let now = now_secs();
    let routine = Routine {
        id: Uuid::new_v4().to_string(),
        schedule,
        title: req.title,
        agent: req.agent,
        prompt: req.prompt,
        repositories,
        machines: req.machines,
        enabled: req.enabled,
        source: "managed".to_string(),
        created_at: now,
        updated_at: now,
        last_manual_trigger_at: None,
        last_scheduled_trigger_at: None,
        ttl_secs: req.ttl_secs,
        max_runtime_secs: req.max_runtime_secs,
        tags,
    };

    {
        // One guard for check + write + insert keeps the slug claim atomic.
        let mut lock = store.lock_recover();
        if lock
            .values()
            .any(|existing| slugify(&existing.title) == slug)
        {
            return Err(AppError::Conflict(format!(
                "a routine with the name \"{slug}\" already exists"
            )));
        }
        write_routine(&routine).map_err(|_| AppError::Internal)?;
        lock.insert(routine.id.clone(), routine.clone());
    }

    // The sync receives the store itself, so the guard above must already be released.
    if let Err(err) = crate::sync::routines::sync_routines_to_crontab(store) {
        log::warn!("crontab sync after routine create failed: {err}");
    }
    Ok(RoutineResponse::from_routine(routine))
}
/// Applies a partial update to the routine stored under `id`.
///
/// Every field of `req` that is `Some` replaces the stored value; `None`
/// leaves the stored value untouched. As a consequence `ttl_secs` and
/// `max_runtime_secs` can be set or changed here but not cleared.
///
/// Steps, in order:
/// 1. Validate the supplied fields without touching the store.
/// 2. Under the store lock: look the routine up, check that a new title does
///    not collide (by slug) with a different routine, check the supplied
///    durations against the ceilings of the effective schedule, then mutate
///    the stored routine and bump `updated_at`.
/// 3. After releasing the lock: write the routine to disk, remove the old
///    slug's directory if the slug changed, and sync the crontab.
///
/// A failed crontab sync is logged and does not fail the request.
///
/// # Errors
///
/// * [`AppError::BadRequest`] — a supplied field failed validation.
/// * [`AppError::NotFound`] — no routine is stored under `id`.
/// * [`AppError::Conflict`] — the new title's slug belongs to another routine.
/// * [`AppError::Internal`] — writing the routine or removing the old
///   directory failed. The in-memory store has already been updated by then.
pub fn svc_update(
store: &RoutineStore,
id: &str,
req: UpdateRoutineRequest,
) -> Result<RoutineResponse, AppError> {
// Stateless validation first; none of these checks needs the store.
if let Some(ref sched) = req.schedule {
validate_cron(sched)?;
}
if let Some(ref title) = req.title {
reject_blank("title", title)?;
validate_title(title)?;
}
if let Some(ref prompt) = req.prompt {
validate_prompt(prompt)?;
}
if let Some(ref agent) = req.agent {
validate_agent(agent)?;
}
reject_zero_secs("ttl_secs", req.ttl_secs)?;
reject_zero_secs("max_runtime_secs", req.max_runtime_secs)?;
// Normalized (trimmed) copies are what eventually get stored.
let repositories = match req.repositories {
Some(ref repos) => Some(validate_repositories(repos)?),
None => None,
};
let tags = match req.tags {
Some(ref tags) => Some(validate_tags(tags)?),
None => None,
};
// The lock is held from here until the explicit `drop(lock)` below.
let mut lock = store.lock_recover();
// Remember the current slug so a rename can be detected after the update.
let old_slug = slugify(&lock.get(id).ok_or(AppError::NotFound)?.title);
if let Some(ref new_title) = req.title {
let new_slug = slugify(new_title);
// A title change that keeps the same slug is always allowed; otherwise
// the new slug must not belong to any other routine.
if new_slug != old_slug
&& lock
.values()
.any(|routine| routine.id != id && slugify(&routine.title) == new_slug)
{
return Err(AppError::Conflict(format!(
"a routine with the name \"{new_slug}\" already exists"
)));
}
}
// Ceilings are derived from the schedule the routine will have after this
// update: the new one if supplied, otherwise the stored one.
let effective_schedule = match req.schedule.as_deref() {
Some(schedule) => normalize_schedule(schedule),
None => lock
.get(id)
.expect("id existence checked above, and the lock has been held continuously since")
.schedule
.clone(),
};
// NOTE(review): only durations supplied in this request are checked. A
// schedule-only change does not re-check the stored `ttl_secs` /
// `max_runtime_secs` against the new ceilings — confirm that is intended.
reject_over_ceiling(
"ttl_secs",
req.ttl_secs,
ttl_ceiling_secs(&effective_schedule),
)?;
reject_over_ceiling(
"max_runtime_secs",
req.max_runtime_secs,
max_runtime_ceiling_secs(&effective_schedule),
)?;
let routine = lock
.get_mut(id)
.expect("id existence checked above, and the lock has been held continuously since");
// Apply each supplied field to the stored routine in place.
if let Some(schedule) = req.schedule {
routine.schedule = normalize_schedule(&schedule);
}
if let Some(title) = req.title {
routine.title = title;
}
if let Some(agent) = req.agent {
routine.agent = agent;
}
if let Some(prompt) = req.prompt {
routine.prompt = prompt;
}
if let Some(repositories) = repositories {
routine.repositories = repositories;
}
if let Some(machines) = req.machines {
routine.machines = machines;
}
if let Some(enabled) = req.enabled {
routine.enabled = enabled;
}
if let Some(ttl) = req.ttl_secs {
routine.ttl_secs = Some(ttl);
}
if let Some(max_runtime) = req.max_runtime_secs {
routine.max_runtime_secs = Some(max_runtime);
}
if let Some(tags) = tags {
routine.tags = tags;
}
routine.updated_at = now_secs();
// Work on a private copy so the lock can be released before any disk I/O.
let routine = routine.clone();
drop(lock);
let new_slug = slugify(&routine.title);
// If this write fails, the in-memory store keeps the changes made above.
write_routine(&routine).map_err(|_| AppError::Internal)?;
// On a rename the directory named after the old slug is removed.
// NOTE(review): presumably `write_routine` stores under the new slug — confirm.
if new_slug != old_slug {
remove_routine_dir(&old_slug).map_err(|_| AppError::Internal)?;
}
// Crontab sync is best-effort: a failure is only logged.
if let Err(err) = crate::sync::routines::sync_routines_to_crontab(store) {
log::warn!("crontab sync after routine update failed: {err}");
}
Ok(RoutineResponse::from_routine(routine))
}
/// Deletes the routine stored under `id` and returns what was removed.
///
/// The routine is taken out of the in-memory store first; the directory
/// named after its title's slug is removed afterwards. A failed crontab sync
/// is logged and does not fail the request.
///
/// # Errors
///
/// * [`AppError::NotFound`] — no routine is stored under `id`.
/// * [`AppError::Internal`] — the routine directory could not be removed. The
///   routine has already been removed from the in-memory store by then.
pub fn svc_delete(store: &RoutineStore, id: &str) -> Result<RoutineResponse, AppError> {
    let Some(removed) = store.lock_recover().remove(id) else {
        return Err(AppError::NotFound);
    };

    let slug = slugify(&removed.title);
    if remove_routine_dir(&slug).is_err() {
        return Err(AppError::Internal);
    }

    if let Some(err) = crate::sync::routines::sync_routines_to_crontab(store).err() {
        log::warn!("crontab sync after routine delete failed: {err}");
    }
    Ok(RoutineResponse::from_routine(removed))
}
/// Manually triggers the routine stored under `id`.
///
/// Stamps `last_manual_trigger_at` with the current time in the store,
/// persists the routine, and then starts its command. The store lock is
/// released before the write and the spawn.
///
/// # Errors
///
/// * [`AppError::Locked`] — routines are globally locked.
/// * [`AppError::NotFound`] — no routine is stored under `id`.
/// * [`AppError::Internal`] — the routine could not be written. The in-memory
///   timestamp has already been updated by then, and nothing is spawned.
pub fn svc_trigger(store: &RoutineStore, id: &str) -> Result<Routine, AppError> {
    if crate::global_lock::is_globally_locked() {
        return Err(AppError::Locked("routines are globally locked".into()));
    }

    // Scope the guard so it is gone before any disk I/O or process spawn.
    let routine = {
        let mut guard = store.lock_recover();
        let entry = guard.get_mut(id).ok_or(AppError::NotFound)?;
        entry.last_manual_trigger_at = Some(now_secs());
        entry.clone()
    };

    if write_routine(&routine).is_err() {
        return Err(AppError::Internal);
    }
    spawn_routine_command(&routine);
    Ok(routine)
}
/// Triggers the routine stored under `id` on behalf of the scheduler.
///
/// Unlike `svc_trigger`, this function neither modifies nor persists the
/// routine; it only looks it up and starts its command.
/// NOTE(review): `last_scheduled_trigger_at` is not updated here — confirm it
/// is recorded elsewhere.
///
/// # Errors
///
/// * [`AppError::Locked`] — routines are globally locked.
/// * [`AppError::NotFound`] — no routine is stored under `id`.
pub fn svc_trigger_scheduled(store: &RoutineStore, id: &str) -> Result<Routine, AppError> {
    if crate::global_lock::is_globally_locked() {
        return Err(AppError::Locked("routines are globally locked".into()));
    }

    let guard = store.lock_recover();
    let snapshot = guard.get(id).cloned();
    // Release the lock before spawning anything.
    drop(guard);

    let routine = snapshot.ok_or(AppError::NotFound)?;
    spawn_routine_command(&routine);
    Ok(routine)
}
/// Starts the shell command for `routine`.
///
/// The routine's agent command is loaded, combined with the routine by
/// `build_routine_command`, and the result is run as `sh -lc <command>` via
/// `spawn_and_reap`. If the agent cannot be loaded, a warning is logged and
/// nothing is spawned; the caller is not informed of the failure.
fn spawn_routine_command(routine: &Routine) {
    let agent = match load_agent_command(&routine.agent) {
        Ok(agent) => agent,
        Err(err) => {
            log::warn!(
                "trigger: cannot load agent {:?} ({}) for routine {:?}",
                routine.agent,
                err,
                routine.id
            );
            return;
        }
    };

    let shell_line = build_routine_command(routine, &agent);
    let mut command = std::process::Command::new("sh");
    command.arg("-lc").arg(&shell_line);
    crate::utils::process::spawn_and_reap(command, "routine command");
}
/// Runs `cleanup_expired_workbenches` against `store` and wraps its result in
/// a [`CleanupResponse`] as the `removed` field.
pub fn svc_cleanup(store: &RoutineStore) -> CleanupResponse {
    let removed = cleanup_expired_workbenches(store);
    CleanupResponse { removed }
}
/// Returns the contents of the newest `agent.log` for the routine stored
/// under `id`.
///
/// Workbench directory names are parsed with `parse_workbench_name`; among
/// those whose slug matches the routine's title slug, the one with the
/// largest timestamp is chosen. An empty string is returned when the
/// workbenches directory cannot be listed, when no workbench matches, or when
/// the chosen workbench has no `agent.log`.
///
/// The log is read in a single step rather than checked for existence first,
/// so a workbench removed between the two steps yields an empty log instead
/// of an error. Bytes that are not valid UTF-8 are replaced with U+FFFD
/// rather than failing the whole request.
///
/// # Errors
///
/// * [`AppError::NotFound`] — no routine is stored under `id`.
/// * [`AppError::Internal`] — the log exists but could not be read.
pub fn svc_logs(store: &RoutineStore, id: &str) -> Result<String, AppError> {
    // Only the slug is needed, so derive it without cloning the whole routine.
    let slug = store
        .lock_recover()
        .get(id)
        .map(|routine| slugify(&routine.title))
        .ok_or(AppError::NotFound)?;

    let workbenches = workbenches_dir();
    let mut newest: Option<(u64, String)> = None;
    if let Ok(entries) = std::fs::read_dir(&workbenches) {
        // Unreadable entries are skipped.
        for entry in entries.flatten() {
            let name = entry.file_name().to_string_lossy().into_owned();
            if let Some((dir_slug, ts)) = parse_workbench_name(&name) {
                // Strict `>` keeps the first directory seen when timestamps tie.
                if dir_slug == slug && newest.as_ref().is_none_or(|(newest_ts, _)| ts > *newest_ts)
                {
                    newest = Some((ts, name));
                }
            }
        }
    }

    let Some((_, dir)) = newest else {
        return Ok(String::new());
    };

    let log_path = workbenches.join(dir).join("agent.log");
    match std::fs::read(&log_path) {
        Ok(bytes) => Ok(String::from_utf8_lossy(&bytes).into_owned()),
        // A missing log (never written, or cleaned up meanwhile) is not an error.
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(String::new()),
        Err(_) => Err(AppError::Internal),
    }
}
// Unit tests for this module are loaded from the sibling `service_tests.rs` file.
#[cfg(test)]
#[path = "service_tests.rs"]
mod service_tests;