use crate::channels::ChannelFactory;
use crate::config::Config;
use crate::db::CronJobRepository;
use crate::db::CronJobRunRepository;
use crate::db::models::{CronJob, CronJobRun};
use crate::services::{ServiceContext, SessionService};
use chrono::Utc;
use cron::Schedule;
use std::str::FromStr;
use std::sync::Arc;
use uuid::Uuid;
const CRON_SESSION_NAME: &str = "Cron";
fn is_active_profile(job_profile: Option<&str>, active: Option<&str>) -> bool {
match job_profile {
None => true,
Some(stamped) => stamped == active.unwrap_or("default"),
}
}
pub const REBUILD_JOB_NAME: &str = "__opencrabs_rebuild__";
pub async fn schedule_background_rebuild(
pool: crate::db::Pool,
session_id: Uuid,
deliver_to: Option<String>,
) -> anyhow::Result<()> {
let repo = CronJobRepository::new(pool);
if let Ok(existing) = repo.list_all().await {
for j in existing.iter().filter(|j| j.name == REBUILD_JOB_NAME) {
let _ = repo.delete(&j.id.to_string()).await;
}
}
let now = Utc::now();
let job = CronJob {
id: Uuid::new_v4(),
name: REBUILD_JOB_NAME.to_string(),
cron_expr: "* * * * *".to_string(),
timezone: "UTC".to_string(),
prompt: session_id.to_string(),
provider: None,
model: None,
thinking: "off".to_string(),
auto_approve: true,
deliver_to,
deliver_api_key: None,
enabled: true,
last_run_at: None,
next_run_at: None,
created_at: now,
updated_at: now,
profile_name: Some(crate::config::profile::current_profile_name()),
};
repo.insert(&job).await?;
tracing::info!("Background rebuild queued for session {session_id}");
Ok(())
}
async fn run_rebuild_job(job: &CronJob, ctx: &ServiceContext) -> anyhow::Result<()> {
use crate::brain::SelfUpdater;
let repo = CronJobRepository::new(ctx.pool());
if let Err(e) = repo.delete(&job.id.to_string()).await {
tracing::error!("rebuild job: failed to delete self: {e}");
}
let session_id = Uuid::parse_str(job.prompt.trim()).unwrap_or_else(|_| Uuid::nil());
tracing::info!("Background rebuild starting (will resume session {session_id})");
let updater =
SelfUpdater::auto_detect().map_err(|e| anyhow::anyhow!("rebuild: auto_detect: {e}"))?;
match updater
.build_streaming(|line| tracing::debug!("rebuild: {line}"))
.await
{
Ok(built_path) => {
tracing::info!(
"Background rebuild succeeded: {} — reloading",
built_path.display()
);
deliver_rebuild_status(
job,
"✅ Rebuilt from source — reloading into the new binary now.",
)
.await;
if let Err(e) = SelfUpdater::restart_into(&built_path, session_id) {
tracing::error!("Background rebuild restart failed: {e}");
return Err(anyhow::anyhow!("rebuild restart failed: {e}"));
}
Ok(()) }
Err(out) => {
tracing::error!("Background rebuild failed: {out}");
deliver_rebuild_status(job, &format!("⚠️ Background rebuild failed:\n{out}")).await;
Ok(())
}
}
}
async fn deliver_rebuild_status(job: &CronJob, msg: &str) {
if let Some(ref deliver_to) = job.deliver_to {
for target in deliver_to
.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
{
deliver_result(target, &job.name, msg, job.deliver_api_key.as_deref()).await;
}
}
}
pub struct CronScheduler {
repo: CronJobRepository,
run_repo: CronJobRunRepository,
factory: Arc<ChannelFactory>,
service_context: ServiceContext,
cron_session_id: Option<Uuid>,
}
impl CronScheduler {
pub fn new(
repo: CronJobRepository,
run_repo: CronJobRunRepository,
factory: Arc<ChannelFactory>,
service_context: ServiceContext,
) -> Self {
Self {
repo,
run_repo,
factory,
service_context,
cron_session_id: None,
}
}
pub fn spawn(self) -> tokio::task::JoinHandle<()> {
tokio::spawn(self.run())
}
pub async fn run(mut self) {
match self.resolve_or_create_cron_session().await {
Ok(id) => {
self.cron_session_id = Some(id);
tracing::info!(
"Cron scheduler started — polling every 60s, cron session: {}",
id
);
}
Err(e) => {
tracing::error!("Cron scheduler failed to create session: {e}");
}
}
loop {
if let Err(e) = self.tick().await {
tracing::error!("Cron scheduler tick error: {e}");
}
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
}
async fn resolve_or_create_cron_session(&self) -> anyhow::Result<Uuid> {
use crate::db::repository::SessionListOptions;
let session_svc = SessionService::new(self.service_context.clone());
let sessions = session_svc
.list_sessions(SessionListOptions {
include_archived: false,
limit: None,
offset: 0,
query: None,
})
.await?;
if let Some(existing) = sessions
.iter()
.find(|s| s.title.as_deref().is_some_and(|n| n == CRON_SESSION_NAME))
{
return Ok(existing.id);
}
let config = Config::load()?;
let provider = config.cron.default_provider.clone();
let model = config.cron.default_model.clone();
let session = session_svc
.create_session_with_provider(Some(CRON_SESSION_NAME.to_string()), provider, model)
.await?;
Ok(session.id)
}
async fn tick(&self) -> anyhow::Result<()> {
let jobs = self.repo.list_enabled().await?;
let now = Utc::now();
for job in &jobs {
if self.is_due(job, now) {
tracing::info!("Cron job '{}' ({}) is due — executing", job.name, job.id);
let next_run = self.next_run_after(job, now);
let next_run_str = next_run.map(|dt| dt.to_rfc3339());
self.repo
.update_last_run(&job.id.to_string(), next_run_str.as_deref())
.await?;
let Some(cron_sid) = self.cron_session_id else {
tracing::error!(
"Cron job '{}' — no cron session available, skipping",
job.name
);
continue;
};
let job = job.clone();
let factory = self.factory.clone();
let ctx = self.service_context.clone();
let run_repo = self.run_repo.clone();
tokio::spawn(async move {
let profile = job.profile_name.as_deref();
let active = crate::config::profile::active_profile().unwrap_or("default");
let needs_scope = profile.is_some() && profile != Some(active);
let result = if needs_scope {
crate::config::profile::with_profile_home_async(profile, async {
tracing::info!(
"Cron job '{}' — task-local profile home set to {:?}",
job.name,
crate::config::opencrabs_home()
);
execute_job(&job, &factory, &ctx, cron_sid, &run_repo).await
})
.await
} else {
execute_job(&job, &factory, &ctx, cron_sid, &run_repo).await
};
if let Err(e) = result {
tracing::error!("Cron job '{}' failed: {e}", job.name);
}
});
}
}
Ok(())
}
fn is_due(&self, job: &CronJob, now: chrono::DateTime<Utc>) -> bool {
match &job.next_run_at {
Some(next) => *next <= now,
None => {
let cron_str = format!("0 {}", job.cron_expr);
if let Ok(schedule) = Schedule::from_str(&cron_str) {
if let Some(next) = schedule.upcoming(Utc).next() {
let diff = next - now;
diff.num_seconds() <= 60
} else {
false
}
} else {
tracing::warn!(
"Invalid cron expression for job '{}': {}",
job.name,
job.cron_expr
);
false
}
}
}
}
fn next_run_after(
&self,
job: &CronJob,
after: chrono::DateTime<Utc>,
) -> Option<chrono::DateTime<Utc>> {
let cron_str = format!("0 {}", job.cron_expr);
Schedule::from_str(&cron_str)
.ok()
.and_then(|s| s.after(&after).next())
}
}
async fn resolve_job_agent(
job: &CronJob,
factory: &ChannelFactory,
ctx: &ServiceContext,
) -> anyhow::Result<(Config, Arc<crate::brain::agent::AgentService>)> {
let current = crate::config::profile::current_profile_name();
if is_active_profile(job.profile_name.as_deref(), Some(¤t)) {
return Ok((Config::load()?, factory.create_agent_service().await));
}
let profile = job.profile_name.as_deref();
tracing::info!(
"Cron job '{}' belongs to profile {:?} (current profile {:?}); \
running under its own profile context",
job.name,
profile,
current
);
let (config, brain, home) = crate::config::profile::with_profile_home(profile, || {
let config = Config::load()?;
let home = crate::config::opencrabs_home();
let brain = crate::brain::prompt_builder::BrainLoader::new(home.clone())
.build_core_brain(None, None);
anyhow::Ok((config, brain, home))
})?;
let provider = crate::brain::provider::create_provider(&config).await?;
let mut builder = crate::brain::agent::AgentService::new(provider, ctx.clone(), &config)
.await
.with_system_brain(brain)
.with_working_directory(home.clone())
.with_brain_path(home);
if let Some(registry) = factory.tool_registry() {
builder = builder.with_tool_registry(registry);
}
Ok((config, Arc::new(builder)))
}
async fn execute_job(
job: &CronJob,
factory: &ChannelFactory,
ctx: &ServiceContext,
cron_session_id: Uuid,
run_repo: &CronJobRunRepository,
) -> anyhow::Result<()> {
if job.name == REBUILD_JOB_NAME {
return run_rebuild_job(job, ctx).await;
}
let (config, agent) = resolve_job_agent(job, factory, ctx).await?;
let effective_provider = job
.provider
.clone()
.or_else(|| config.cron.default_provider.clone());
let effective_model = job
.model
.clone()
.or_else(|| config.cron.default_model.clone());
if let Some(ref provider_name) = effective_provider
&& let Some(ref model) = effective_model
{
match crate::brain::provider::create_provider_by_name(&config, provider_name).await {
Ok(provider) => {
let supported = provider.supported_models();
if !supported.is_empty() && !supported.iter().any(|m| m == model) {
tracing::error!(
"Cron job '{}' — model '{}' is NOT supported by provider '{}' \
(supported: {}). SKIPPING job — fix cron config. \
Either set a valid model or remove the model override to use \
the provider's default ('{}').",
job.name,
model,
provider_name,
supported.join(", "),
provider.default_model(),
);
let run = CronJobRun::new_running(
job.id,
job.name.clone(),
effective_provider.clone(),
effective_model.clone(),
);
let run_id = run.id.to_string();
if let Err(e) = run_repo.insert(&run).await {
tracing::error!("Failed to insert cron run record: {e}");
}
let err_msg = format!(
"model '{}' not supported by provider '{}' — cron config invalid",
model, provider_name
);
if let Err(db_err) = run_repo.complete_error(&run_id, &err_msg).await {
tracing::error!("Failed to save cron run error to DB: {db_err}");
}
return Ok(());
}
}
Err(e) => {
tracing::warn!(
"Cron job '{}' — cannot pre-validate model (provider '{}' creation \
failed: {e}) — proceeding with default validation",
job.name,
provider_name
);
}
}
}
let run = CronJobRun::new_running(
job.id,
job.name.clone(),
effective_provider.clone(),
effective_model.clone(),
);
let run_id = run.id.to_string();
if let Err(e) = run_repo.insert(&run).await {
tracing::error!("Failed to insert cron run record: {e}");
}
let session_id = cron_session_id;
tracing::info!(
"Cron job '{}' — using cron session {}",
job.name,
session_id
);
if let Some(ref provider_name) = effective_provider {
match crate::brain::provider::create_provider_by_name(&config, provider_name).await {
Ok(provider) => {
tracing::info!(
"Cron job '{}' — using provider '{}'",
job.name,
provider_name
);
agent.swap_provider_for_session(
cron_session_id,
provider.clone(),
provider.default_model().to_string(),
);
}
Err(e) => {
tracing::warn!(
"Cron job '{}' — failed to create provider '{}': {e}, using system default",
job.name,
provider_name
);
}
}
}
let result = agent
.send_message_with_tools_and_callback(
session_id,
job.prompt.clone(),
effective_model,
None, Some(Arc::new(|_| {
Box::pin(async { Ok((true, false)) })
})),
None, None, "cron",
None,
)
.await;
match result {
Ok(response) => {
let clean = crate::utils::sanitize::strip_llm_artifacts(&response.content);
tracing::info!(
"Cron job '{}' completed — {} tokens, ${:.6}",
job.name,
response.usage.input_tokens + response.usage.output_tokens,
response.cost
);
if let Err(e) = run_repo
.complete_success(
&run_id,
&clean,
response.usage.input_tokens as i64,
response.usage.output_tokens as i64,
response.cost,
)
.await
{
tracing::error!("Failed to save cron run result to DB: {e}");
}
if let Some(ref deliver_to) = job.deliver_to {
for target in deliver_to
.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
{
deliver_result(target, &job.name, &clean, job.deliver_api_key.as_deref()).await;
}
}
}
Err(e) => {
tracing::error!("Cron job '{}' agent error: {e}", job.name);
let error_msg = format!("{e}");
if let Err(db_err) = run_repo.complete_error(&run_id, &error_msg).await {
tracing::error!("Failed to save cron run error to DB: {db_err}");
}
if let Some(ref deliver_to) = job.deliver_to {
let msg = format!("Cron job '{}' failed: {e}", job.name);
for target in deliver_to
.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
{
deliver_result(target, &job.name, &msg, job.deliver_api_key.as_deref()).await;
}
}
}
}
Ok(())
}
async fn deliver_result(deliver_to: &str, job_name: &str, content: &str, api_key: Option<&str>) {
if deliver_to.starts_with("http://") || deliver_to.starts_with("https://") {
deliver_http(deliver_to, job_name, content, api_key).await;
return;
}
let parts: Vec<&str> = deliver_to.splitn(2, ':').collect();
if parts.len() != 2 {
tracing::warn!(
"Invalid deliver_to format '{}' for job '{}' — expected 'channel:id' or HTTP URL",
deliver_to,
job_name
);
return;
}
let (channel, target_id) = (parts[0], parts[1]);
let max_len = 4000;
let msg = if content.len() > max_len {
format!(
"{}...\n\n(truncated — full output in session)",
&content[..max_len]
)
} else {
content.to_string()
};
let delivery_msg = format!("⏰ **Cron: {job_name}**\n\n{msg}");
match channel {
"telegram" => {
#[cfg(feature = "telegram")]
{
tracing::info!("Delivering cron result to Telegram chat {target_id}");
deliver_telegram(target_id, &delivery_msg).await;
}
#[cfg(not(feature = "telegram"))]
{
tracing::warn!("Telegram feature not enabled — cannot deliver cron result");
}
}
"discord" => {
tracing::info!("Delivering cron result to Discord channel {target_id}");
tracing::warn!("Discord cron delivery not yet wired — result logged only");
}
"slack" => {
tracing::info!("Delivering cron result to Slack channel {target_id}");
tracing::warn!("Slack cron delivery not yet wired — result logged only");
}
other => {
tracing::warn!("Unknown delivery channel '{other}' for job '{job_name}'");
}
}
}
async fn deliver_http(url: &str, job_name: &str, content: &str, api_key: Option<&str>) {
let client = reqwest::Client::new();
let body = serde_json::json!({
"job_name": job_name,
"content": content,
"timestamp": chrono::Utc::now().to_rfc3339(),
});
let mut request = client.post(url).json(&body);
if let Some(key) = api_key {
request = request.header("Authorization", format!("Bearer {key}"));
}
match request.send().await {
Ok(resp) if resp.status().is_success() => {
tracing::info!("Cron result for '{job_name}' delivered to {url}");
}
Ok(resp) => {
tracing::warn!(
"HTTP delivery to {url} failed ({}): {:?}",
resp.status(),
resp.text().await.unwrap_or_default()
);
}
Err(e) => {
tracing::error!("HTTP delivery to {url} error: {e}");
}
}
}
#[cfg(feature = "telegram")]
async fn deliver_telegram(chat_id: &str, message: &str) {
let brain_path = crate::brain::BrainLoader::resolve_path();
let keys_path = brain_path.join("keys.toml");
let token = if let Ok(content) = std::fs::read_to_string(&keys_path) {
content.parse::<toml::Table>().ok().and_then(|t| {
t.get("channels")?
.as_table()?
.get("telegram")?
.as_table()?
.get("token")?
.as_str()
.map(String::from)
})
} else {
None
};
let Some(token) = token else {
tracing::warn!("No Telegram bot token found in keys.toml — cannot deliver cron result");
return;
};
let url = format!("https://api.telegram.org/bot{}/sendMessage", token);
let client = reqwest::Client::new();
let body = serde_json::json!({
"chat_id": chat_id,
"text": message,
"parse_mode": "Markdown"
});
match client.post(&url).json(&body).send().await {
Ok(resp) if resp.status().is_success() => {
tracing::info!("Cron result delivered to Telegram chat {chat_id}");
}
Ok(resp) => {
tracing::warn!(
"Telegram delivery failed ({}): {:?}",
resp.status(),
resp.text().await.unwrap_or_default()
);
}
Err(e) => {
tracing::error!("Telegram delivery HTTP error: {e}");
}
}
}