use super::common::*;
use crate::context::ParsedFile;
use pecto_core::model::*;
pub fn extract(file: &ParsedFile) -> Option<Capability> {
let root = file.tree.root_node();
let source = file.source.as_bytes();
let full_text = &file.source;
if !full_text.contains("@app.task")
&& !full_text.contains("@shared_task")
&& !full_text.contains("@periodic_task")
&& !full_text.contains("@celery")
&& !full_text.contains("crontab")
&& !full_text.contains("add_job")
&& !full_text.contains("APScheduler")
&& !full_text.contains("BlockingScheduler")
&& !full_text.contains("BackgroundScheduler")
&& !full_text.contains("AsyncIOScheduler")
{
return None;
}
let mut tasks = Vec::new();
for i in 0..root.named_child_count() {
let node = root.named_child(i).unwrap();
if node.kind() == "decorated_definition" {
let decorators = collect_decorators(&node, source);
let inner = match get_inner_definition(&node) {
Some(n) if n.kind() == "function_definition" => n,
_ => continue,
};
let is_task = decorators.iter().any(|d| {
d.name == "task"
|| d.name == "shared_task"
|| d.name == "periodic_task"
|| d.full_name.contains("celery")
|| d.full_name.contains("app.task")
});
if !is_task {
continue;
}
let func_name = get_def_name(&inner, source);
let schedule = decorators
.iter()
.find(|d| d.name == "periodic_task")
.and_then(|d| {
d.args
.iter()
.find(|a| a.contains("crontab") || a.contains("schedule"))
.cloned()
})
.unwrap_or_else(|| "celery-task".to_string());
tasks.push(ScheduledTask {
name: func_name,
schedule: clean_string_literal(&schedule),
description: Some("Celery task".to_string()),
});
}
}
if full_text.contains("add_job") {
extract_apscheduler_jobs(full_text, &mut tasks);
}
if tasks.is_empty() {
return None;
}
let file_stem = file
.path
.rsplit('/')
.next()
.unwrap_or(&file.path)
.trim_end_matches(".py");
let capability_name = to_kebab_case(file_stem);
let mut capability = Capability::new(capability_name, file.path.clone());
capability.scheduled_tasks = tasks;
Some(capability)
}
fn extract_apscheduler_jobs(source: &str, tasks: &mut Vec<ScheduledTask>) {
let mut blocks = Vec::new();
let mut current_block = String::new();
let mut paren_depth = 0i32;
let mut in_add_job = false;
for line in source.lines() {
let trimmed = line.trim();
if !in_add_job {
if trimmed.contains("add_job(") {
in_add_job = true;
current_block.clear();
for ch in trimmed.chars() {
if ch == '(' {
paren_depth += 1;
} else if ch == ')' {
paren_depth -= 1;
}
}
current_block.push_str(trimmed);
if paren_depth <= 0 {
blocks.push(current_block.clone());
in_add_job = false;
paren_depth = 0;
}
}
} else {
for ch in trimmed.chars() {
if ch == '(' {
paren_depth += 1;
} else if ch == ')' {
paren_depth -= 1;
}
}
current_block.push(' ');
current_block.push_str(trimmed);
if paren_depth <= 0 {
blocks.push(current_block.clone());
in_add_job = false;
paren_depth = 0;
}
}
}
for block in &blocks {
let after_add_job = match block.split("add_job(").nth(1) {
Some(s) => s,
None => continue,
};
let inner = strip_outer_closing_paren(after_add_job.trim_end());
let args: Vec<&str> = inner.split(',').map(|s| s.trim()).collect();
if args.is_empty() {
continue;
}
let func_ref = args[0].trim();
let task_name = func_ref
.rsplit('.')
.next()
.unwrap_or(func_ref)
.trim_matches('"')
.trim_matches('\'')
.to_string();
let display_name = args
.iter()
.find_map(|a| extract_simple_kwarg(a, "id"))
.map(|v| v.trim_matches('"').trim_matches('\'').to_string())
.unwrap_or(task_name);
let mut schedule = "apscheduler-job".to_string();
let full_block = block.to_lowercase();
if full_block.contains("intervaltrigger")
|| full_block.contains("'interval'")
|| full_block.contains("\"interval\"")
{
for arg in &args {
if let Some(v) = extract_simple_kwarg(arg, "minutes") {
if v.chars().all(|c| c.is_ascii_digit()) {
schedule = format!("every {}min", v);
} else {
schedule = "interval".to_string();
}
break;
} else if let Some(v) = extract_simple_kwarg(arg, "hours") {
if v.chars().all(|c| c.is_ascii_digit()) {
schedule = format!("every {}h", v);
} else {
schedule = "interval".to_string();
}
break;
} else if let Some(v) = extract_simple_kwarg(arg, "seconds") {
if v.chars().all(|c| c.is_ascii_digit()) {
schedule = format!("every {}s", v);
} else {
schedule = "interval".to_string();
}
break;
}
}
} else if full_block.contains("crontrigger")
|| full_block.contains("'cron'")
|| full_block.contains("\"cron\"")
{
schedule = "cron".to_string();
}
tasks.push(ScheduledTask {
name: display_name,
schedule,
description: Some("APScheduler job".to_string()),
});
}
}
fn strip_outer_closing_paren(s: &str) -> &str {
if let Some(pos) = s.rfind(')') {
let mut depth = 1i32; for (i, ch) in s.char_indices() {
match ch {
'(' => depth += 1,
')' => {
depth -= 1;
if depth == 0 {
return &s[..i];
}
}
_ => {}
}
}
&s[..pos]
} else {
s
}
}
fn extract_simple_kwarg<'a>(text: &'a str, key: &str) -> Option<&'a str> {
let pattern = format!("{}=", key);
let start = text.find(&pattern)? + pattern.len();
let remaining = &text[start..];
let end = remaining.find([',', ')', ' ']).unwrap_or(remaining.len());
let val = remaining[..end].trim();
if val.is_empty() { None } else { Some(val) }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::context::ParsedFile;
fn parse_file(source: &str, path: &str) -> ParsedFile {
ParsedFile::parse(source.to_string(), path.to_string()).unwrap()
}
#[test]
fn test_celery_task() {
let source = r#"
from celery import shared_task
@shared_task
def send_email(to: str, subject: str):
mail.send(to, subject)
@shared_task
def process_order(order_id: int):
Order.process(order_id)
"#;
let file = parse_file(source, "tasks/email_tasks.py");
let capability = extract(&file).unwrap();
assert_eq!(capability.scheduled_tasks.len(), 2);
assert_eq!(capability.scheduled_tasks[0].name, "send_email");
assert_eq!(capability.scheduled_tasks[1].name, "process_order");
}
#[test]
fn test_no_tasks() {
let source = r#"
def regular_function():
pass
"#;
let file = parse_file(source, "utils.py");
assert!(extract(&file).is_none());
}
#[test]
fn test_apscheduler_multiline() {
let source = r#"
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
scheduler = BlockingScheduler()
scheduler.add_job(
analyst.run_cycle,
trigger=IntervalTrigger(minutes=30),
id="agent_cycle",
name="Agent Cycle",
max_instances=1,
)
scheduler.add_job(
portfolio.snapshot,
trigger=IntervalTrigger(hours=1),
id="portfolio_snapshot",
name="Portfolio Snapshot",
)
"#;
let file = parse_file(source, "scheduler.py");
let capability = extract(&file).unwrap();
assert_eq!(capability.scheduled_tasks.len(), 2);
assert_eq!(capability.scheduled_tasks[0].name, "agent_cycle");
assert_eq!(capability.scheduled_tasks[0].schedule, "every 30min");
assert_eq!(capability.scheduled_tasks[1].name, "portfolio_snapshot");
assert_eq!(capability.scheduled_tasks[1].schedule, "every 1h");
}
#[test]
fn test_apscheduler_compact_no_trailing_comma() {
let source = r#"
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
scheduler = BlockingScheduler()
scheduler.add_job(my_func, IntervalTrigger(hours=2), id="cleanup")
"#;
let file = parse_file(source, "scheduler.py");
let capability = extract(&file).unwrap();
assert_eq!(capability.scheduled_tasks.len(), 1);
assert_eq!(capability.scheduled_tasks[0].name, "cleanup");
assert_eq!(capability.scheduled_tasks[0].schedule, "every 2h");
}
}