use std::thread;
use chrono::Utc;
use colored::Colorize;
use inquire::{Confirm, Select, Text};
use uuid::Uuid;
use kto::agent::{self, DeepResearchResult, EnhancedSetupSuggestion};
use kto::config::Config;
use kto::db::Database;
use kto::extract;
use kto::fetch::{self, check_playwright, PageContent, PlaywrightStatus};
use kto::normalize::{hash_content, normalize};
use kto::transforms::{self, Intent, TransformMatch};
use kto::watch::{AgentConfig, Engine, Extraction, Snapshot, Watch};
use kto::error::Result;
use crate::utils::{extract_url, format_interval, get_clipboard_content, parse_interval_str, truncate_str};
use super::platform_detect;
use super::prompt_notification_setup;
/// Confidence cutoff for AI setup suggestions: `display_enhanced_confirmation`
/// treats a suggestion whose `confidence` is below this value as low-confidence.
const CONFIDENCE_THRESHOLD: f32 = 0.7;
/// Reports whether a kto daemon appears to be running.
///
/// Probes, in order, and returns `true` on the first hit:
/// 1. the PID stored in `$HOME/.local/share/kto/daemon.pid` has a `/proc/<pid>` entry,
/// 2. `systemctl --user is-active kto` succeeds and prints `active`,
/// 3. the `pgrep`-based check in `check_daemon_process`.
///
/// When `HOME` is not set, only the `pgrep` check is performed.
pub fn is_daemon_running() -> bool {
    let home_dir = match std::env::var("HOME") {
        Ok(dir) => dir,
        Err(_) => return check_daemon_process(),
    };

    // Probe 1: PID file written under the user's data directory.
    let pid_file = std::path::Path::new(&home_dir).join(".local/share/kto/daemon.pid");
    let pid_is_alive = std::fs::read_to_string(&pid_file)
        .ok()
        .and_then(|raw| raw.trim().parse::<u32>().ok())
        .map(|pid| std::path::Path::new("/proc").join(pid.to_string()).exists())
        .unwrap_or(false);
    if pid_is_alive {
        return true;
    }

    // Probe 2: systemd user unit named "kto".
    let unit_is_active = std::process::Command::new("systemctl")
        .args(["--user", "is-active", "kto"])
        .output()
        .map(|out| {
            out.status.success() && String::from_utf8_lossy(&out.stdout).trim() == "active"
        })
        .unwrap_or(false);
    if unit_is_active {
        return true;
    }

    // Probe 3: fall back to scanning the process list.
    check_daemon_process()
}
/// Looks for a running daemon by invoking `pgrep -f "kto daemon"`.
///
/// Returns `true` only when `pgrep` could be spawned, exited successfully and
/// printed something on stdout; any spawn failure counts as "not running".
fn check_daemon_process() -> bool {
    let probe = std::process::Command::new("pgrep")
        .args(["-f", "kto daemon"])
        .output();
    match probe {
        Ok(out) => out.status.success() && !out.stdout.is_empty(),
        Err(_) => false,
    }
}
/// Creates a new watch from a free-form description, clipboard content, or a shell command.
///
/// Flow, as implemented below:
/// 1. Resolve the input text (clipboard, `description`, or an interactive prompt).
/// 2. With `use_shell`: fetch once through the shell engine, store the watch and a
///    first snapshot, and return.
/// 3. Otherwise extract a URL from the input, try URL transforms, optionally hand off
///    to the deep-research flow, then run either the AI-assisted "enhanced wizard"
///    (dual HTTP/JS fetch) or the plain engine-selection path.
/// 4. Persist the watch and its initial snapshot, print a summary, and optionally
///    prompt for notification setup.
///
/// # Errors
/// Returns `KtoError::ConfigError` when `--yes` has no input source, the clipboard
/// cannot be read, no URL is found in the input, the fetch fails, or a prompt fails.
/// Errors from the database, interval parsing, extraction, compression and config
/// loading/saving are propagated with `?`.
pub fn cmd_new(
description: Option<String>,
name_override: Option<String>,
interval_str: String,
use_js: bool,
use_rss: bool,
use_shell: bool,
use_agent: bool,
agent_instructions: Option<String>,
selector: Option<String>,
clipboard: bool,
tags: Vec<String>,
use_profile: bool,
research: bool,
yes: bool,
) -> Result<()> {
let db = Database::open()?;
let interval = parse_interval_str(&interval_str)?;
// `--yes` suppresses prompts, so the input must come from an argument or the clipboard.
if yes && description.is_none() && !clipboard {
return Err(kto::KtoError::ConfigError(
"--yes requires a description argument or --clipboard".into()
));
}
// Prompts are only shown when not in `--yes` mode, no name was given, and stdin is a TTY.
let interactive = !yes && name_override.is_none() && atty::is(atty::Stream::Stdin);
// Resolve the raw input text: clipboard wins, then the CLI description, then a prompt.
let input = if clipboard {
match get_clipboard_content() {
Some(content) => {
println!(" Read from clipboard: {}", truncate_str(&content, 60));
content
}
None => {
return Err(kto::KtoError::ConfigError(
"Could not read from clipboard. Make sure you have content copied.".into()
));
}
}
} else {
match description {
Some(d) => d,
None if interactive => {
Text::new("What do you want to watch?")
.with_help_message("Enter a URL and optionally describe what to watch for")
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?
}
None => {
return Err(kto::KtoError::ConfigError(
"URL required. Usage: kto new <URL> --name <NAME>".into()
));
}
}
};
// ---- Shell watch path: the whole input is treated as the command; returns early. ----
if use_shell {
let command = input.trim().to_string();
// Default name is "shell:<first word of the command>".
let name = name_override.unwrap_or_else(|| {
let first_word = command.split_whitespace().next().unwrap_or("shell");
format!("shell:{}", first_word)
});
println!("\n Executing: {}", command);
// Fetch through the Shell engine with an empty URL (presumably this runs the command once).
let content = fetch::fetch("", Engine::Shell { command: command.clone() }, &std::collections::HashMap::new())?;
let extracted = content.text.clone().unwrap_or_default();
if extracted.is_empty() {
println!(" Warning: Command produced no output.");
} else {
println!(" Got {} bytes of output.", extracted.len());
}
let mut watch = Watch::new(name.clone(), format!("shell://{}", command));
// Intervals are floored at 10.
watch.interval_secs = interval.max(10);
watch.engine = Engine::Shell { command };
watch.extraction = Extraction::Full;
watch.tags = tags;
if use_agent {
watch.agent_config = Some(AgentConfig {
enabled: true,
prompt_template: None,
instructions: agent_instructions,
});
}
// NOTE(review): this opens a second handle that shadows the `db` opened at the top.
let db = Database::open()?;
db.insert_watch(&watch)?;
let normalized = normalize(&extracted, &watch.normalization);
let hash = hash_content(&normalized);
// Shell output has no HTML, so `raw_html` stays empty.
let snapshot = Snapshot {
id: Uuid::new_v4(),
watch_id: watch.id,
fetched_at: Utc::now(),
raw_html: None, extracted: normalized,
content_hash: hash.clone(),
};
db.insert_snapshot(&snapshot)?;
println!("\n Created shell watch \"{}\"", name);
// NOTE(review): slicing assumes the hash string is at least 8 bytes long — confirm.
println!(" Initial hash: {}", &hash[..8]);
if watch.agent_config.is_some() {
println!(" AI Agent: enabled");
}
if !watch.tags.is_empty() {
println!(" Tags: {}", watch.tags.join(", "));
}
println!(" Checking every {}", format_interval(watch.interval_secs));
if !is_daemon_running() {
println!("\n Run `kto daemon` to start monitoring.");
}
return Ok(());
}
// ---- URL watch path ----
// `url` is mutable because the interactive probe below may swap it for a discovered RSS link.
let mut url = extract_url(&input).ok_or_else(|| {
kto::KtoError::ConfigError(format!(
"No URL found in: '{}'\n Tip: Paste the full URL (e.g., https://example.com/page)",
truncate_str(&input, 50)
))
})?;
// Only look for a URL transform when a non-generic intent was detected and the URL parses.
let detected_intent = Intent::detect(&input);
let transform_match = if detected_intent != Intent::Generic {
if let Ok(parsed_url) = url::Url::parse(&url) {
transforms::match_transform(&parsed_url, detected_intent)
} else {
None
}
} else {
None
};
if let Some(ref transform) = transform_match {
// Confidence >= 0.8: create the watch from the transform straight away.
if transform.confidence >= 0.8 {
return create_watch_from_transform_magical(
&db,
&url,
transform,
name_override,
interval,
tags,
use_profile,
interactive,
yes,
);
// Confidence in [0.5, 0.8): only offered interactively.
// (`interactive` already implies `!yes`, so the extra `!yes` is redundant.)
} else if transform.confidence >= 0.5 && interactive && !yes {
let accepted = display_transform_suggestion(
&url,
transform,
&name_override,
interval,
&tags,
use_profile,
yes,
interactive,
)?;
// `None` means the suggestion was not accepted; fall through to the normal flow.
if let Some((name, final_url, final_engine, final_extraction, final_interval)) = accepted {
return create_watch_from_transform(
&db,
name,
final_url,
final_engine,
final_extraction,
final_interval,
tags,
use_profile,
interactive,
yes,
);
}
}
}
// Keyword heuristic (case-sensitive substring match) for "the user described what to watch for".
let has_intent = input.contains(" for ") || input.contains(" when ") || input.contains(" if ")
|| input.contains("watch for") || input.contains("notify me") || input.contains("alert")
|| input.contains("price") || input.contains("stock") || input.contains("available")
|| input.contains("back in") || input.contains("drop");
let claude_available = agent::claude_version().is_some();
// The enhanced wizard needs a described intent and the Claude CLI, and is skipped when the
// user forced --agent or --rss. (`use_shell` is always false here: that path returned above.)
let use_enhanced_wizard = has_intent && claude_available && !use_agent && !use_rss && !use_shell;
let should_research = research;
// Deep research replaces the rest of this function; if Claude is unavailable the
// request is silently ignored and the normal flow continues.
if should_research && claude_available {
return run_deep_research_flow(
&input,
&url,
name_override,
interval,
tags,
use_profile,
yes,
interactive,
);
}
// Both branches produce: chosen engine, fetched page, extracted text, page title,
// and (enhanced wizard only) the AI setup suggestion.
let (engine, content, extracted, title, enhanced_suggestion) = if use_enhanced_wizard {
println!("\n Analyzing {}...", url);
let (http_content, js_content) = dual_fetch(&url)?;
// Platform knowledge-base analysis is best-effort: errors are discarded via `.ok()`.
let platform_analysis = platform_detect::analyze_url_with_platform_kb(
&url,
detected_intent,
http_content.as_ref(),
js_content.as_ref(),
).ok();
if let Some(ref analysis) = platform_analysis {
if analysis.has_platform() {
if let Some(ref pm) = analysis.platform_match {
println!(" Platform: {} ({:.0}% confidence)", pm.platform_name.cyan(), pm.score * 100.0);
}
}
}
// Extraction failures on either fetch are tolerated (`.ok()`).
let http_extracted = http_content.as_ref()
.and_then(|c| extract::extract(c, &Extraction::Auto).ok());
let js_extracted = js_content.as_ref()
.and_then(|c| extract::extract(c, &Extraction::Auto).ok());
// Title: prefer the JS-rendered page, then the HTTP page, then "Untitled".
let title = js_content.as_ref()
.and_then(|c| extract::extract_title(&c.html))
.or_else(|| http_content.as_ref().and_then(|c| extract::extract_title(&c.html)))
.unwrap_or_else(|| "Untitled".to_string());
println!(" Analyzing with AI (dual fetch)...");
let suggestion = match agent::analyze_for_setup_v2(
&input,
http_extracted.as_deref(),
js_extracted.as_deref(),
) {
Ok(mut s) => {
// Let the platform KB override the AI: if the KB's best strategy is Playwright,
// the AI said JS is not needed, and the KB is more confident, force JS.
if let Some(ref analysis) = platform_analysis {
if let Some(ref best) = analysis.best_strategy {
if matches!(best.engine, Engine::Playwright) && !s.needs_js {
if analysis.confidence > s.confidence {
s.needs_js = true;
s.js_reason = Some(format!(
"KB recommends for {}: {}",
analysis.platform_match.as_ref().map(|p| p.platform_name.as_str()).unwrap_or("platform"),
best.reason
));
}
}
}
}
s
}
Err(e) => {
// AI analysis failed: build a fallback suggestion, enriched by the KB when it has a strategy.
eprintln!(" AI analysis failed: {} (using fallback)", e);
if let Some(ref analysis) = platform_analysis {
if let Some(ref best) = analysis.best_strategy {
let mut fallback = EnhancedSetupSuggestion::fallback(&url, &input);
fallback.needs_js = matches!(best.engine, Engine::Playwright);
fallback.js_reason = Some(format!("KB: {}", best.reason));
fallback.confidence = analysis.confidence;
fallback
} else {
EnhancedSetupSuggestion::fallback(&url, &input)
}
} else {
EnhancedSetupSuggestion::fallback(&url, &input)
}
}
};
// Pick the page to keep: JS if required and available, else HTTP, else JS; error if neither.
let (final_engine, final_content) = if suggestion.needs_js && js_content.is_some() {
(Engine::Playwright, js_content.unwrap())
} else if http_content.is_some() {
(Engine::Http, http_content.unwrap())
} else if js_content.is_some() {
(Engine::Playwright, js_content.unwrap())
} else {
return Err(kto::KtoError::ConfigError("Both HTTP and JS fetches failed".into()));
};
// Same preference order for the extracted text; empty string if both extractions failed.
let final_extracted = if suggestion.needs_js && js_extracted.is_some() {
js_extracted.unwrap()
} else {
http_extracted.or(js_extracted).unwrap_or_default()
};
(final_engine, final_content, final_extracted, title, Some(suggestion))
} else {
// ---- Plain path: choose an engine from flags, an interactive probe, or the URL shape. ----
let engine = if use_rss {
if !fetch::detect_rss_url(&url) {
eprintln!(" Note: URL doesn't look like an RSS feed, but --rss was specified.");
eprintln!(" Will attempt to parse as RSS anyway.");
}
Engine::Rss
} else if use_js {
// --js falls back to HTTP when Playwright is not ready.
match check_playwright() {
PlaywrightStatus::Ready => Engine::Playwright,
status => {
eprintln!(" Warning: Playwright not ready. {}", status.install_instructions());
eprintln!(" Falling back to HTTP fetch.");
Engine::Http
}
}
} else if interactive {
println!("\n Analyzing {}...", url);
match fetch::probe_url(&url) {
Ok(probe) => {
if let Some(ref msg) = probe.message {
println!(" {}", msg);
}
if probe.suggested_engine == Engine::Rss {
println!(" Using RSS engine.");
Engine::Rss
}
else if let Some(ref rss_link) = probe.rss_url {
// A failed prompt is treated as "no" (`unwrap_or(false)`).
let use_rss = Confirm::new(&format!("RSS feed found at {}. Use that instead?", rss_link))
.with_default(true)
.prompt()
.unwrap_or(false);
if use_rss {
println!(" Switching to RSS feed.");
// From here on the watch targets the discovered feed URL.
url = rss_link.clone();
Engine::Rss
} else {
probe.suggested_engine
}
}
else if probe.suggested_engine == Engine::Playwright {
match check_playwright() {
PlaywrightStatus::Ready => {
let use_js = Confirm::new("Enable JavaScript rendering?")
.with_default(true)
.prompt()
.unwrap_or(false);
if use_js { Engine::Playwright } else { Engine::Http }
}
status => {
println!(" JavaScript rendering recommended but not available.");
println!(" {}", status.install_instructions());
Engine::Http
}
}
} else {
probe.suggested_engine
}
}
Err(e) => {
// Probe failed: fall back to the URL-shape heuristic.
eprintln!(" Could not analyze page: {}", e);
if fetch::detect_rss_url(&url) {
println!(" URL looks like RSS feed, using RSS engine.");
Engine::Rss
} else {
Engine::Http
}
}
}
} else if fetch::detect_rss_url(&url) {
println!("\n Detected RSS feed URL, using RSS engine.");
Engine::Rss
} else {
Engine::Http
};
let engine_label = match &engine {
Engine::Playwright => " (with JS)".to_string(),
Engine::Rss => " (as RSS feed)".to_string(),
Engine::Http => "".to_string(),
Engine::Shell { .. } => " (shell command)".to_string(),
};
println!(" Fetching {}{}...", url, engine_label);
// Fetch errors are rewritten into a friendlier message before being returned.
let content = match fetch::fetch(&url, engine.clone(), &std::collections::HashMap::new()) {
Ok(c) => c,
Err(e) => {
let msg = platform_detect::friendly_error_message(&e.to_string(), &url);
return Err(kto::KtoError::ConfigError(msg));
}
};
// Extraction: an explicit selector wins; otherwise RSS for feeds and Auto for everything else.
let extraction = match (&selector, &engine) {
(Some(ref sel), _) => Extraction::Selector { selector: sel.clone() },
(None, Engine::Rss) => Extraction::Rss,
(None, _) => Extraction::Auto,
};
let mut extracted = extract::extract(&content, &extraction)?;
let title = extract::extract_title(&content.html)
.unwrap_or_else(|| "Untitled".to_string());
// If plain HTTP yielded fewer than 200 bytes of text (and --js was not given), retry with
// Playwright when it is ready and keep whichever result is longer.
let (final_engine, final_content) = if extracted.len() < 200 && !use_js && engine == Engine::Http {
if check_playwright().is_ready() {
println!(" Site needs visual rendering. Switching to browser mode...");
match fetch::fetch(&url, Engine::Playwright, &std::collections::HashMap::new()) {
Ok(js_content) => {
let js_extracted = extract::extract(&js_content, &extraction)
.unwrap_or_else(|_| extracted.clone());
if js_extracted.len() > extracted.len() {
extracted = js_extracted;
(Engine::Playwright, js_content)
} else {
(engine, content)
}
}
// A failed Playwright retry silently keeps the HTTP result.
Err(_) => (engine, content),
}
} else {
println!("\n Note: Page may need JavaScript. Run `kto init` to enable browser mode.");
(engine, content)
}
} else {
(engine, content)
};
(final_engine, final_content, extracted, title, None)
};
// Recompute the extraction strategy against the engine that was finally chosen.
let extraction = match (&selector, &engine) {
(Some(ref sel), _) => Extraction::Selector { selector: sel.clone() },
(None, Engine::Rss) => Extraction::Rss,
(None, _) => Extraction::Auto,
};
// Settle the final watch settings, either via the enhanced confirmation or the plain prompts.
let (name, final_url, final_interval, final_agent_enabled, final_agent_instructions, final_extraction, final_engine) =
if let Some(ref suggestion) = enhanced_suggestion {
match display_enhanced_confirmation(
&url,
suggestion,
&extraction,
engine.clone(),
&name_override,
interval,
yes,
) {
Ok(result) => result,
// The confirmation uses this error variant to request the deep-research flow instead.
Err(kto::KtoError::RetryWithDeepResearch) => {
return run_deep_research_flow(
&input,
&url,
name_override,
interval,
tags,
use_profile,
yes,
interactive,
);
}
Err(e) => return Err(e),
}
} else {
if !yes {
let preview: String = extracted.chars().take(200).collect();
println!("\n Title: {}", title);
println!(" Content preview: {}...\n", preview.trim());
}
// Name: explicit override, else prompt (defaulting to the page title), else the title itself.
let name = match name_override {
Some(n) => n,
None if interactive => {
Text::new("Name for this watch?")
.with_default(&title)
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?
}
None => title.clone(),
};
// Agent settings: --agent forces it on; interactive mode asks what to watch for.
let (agent_enabled, final_instructions) = if use_agent {
(true, agent_instructions.clone())
} else if interactive {
println!();
let intent_options = vec![
"Price changes (sales, drops, increases)",
"Back in stock / availability",
"New content or updates",
"Any changes (notify on all)",
"Custom (I'll describe it)",
];
let choice = Select::new("What do you want to watch for?", intent_options)
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
// The match arms must stay in sync with the literal option strings above.
let (agent_needed, instructions) = match choice {
"Price changes (sales, drops, increases)" => {
(true, Some("Alert when price changes. Include old and new price.".to_string()))
}
"Back in stock / availability" => {
(true, Some("Alert when item becomes available or goes out of stock.".to_string()))
}
"New content or updates" => {
(true, Some("Alert when new content is added. Summarize what's new.".to_string()))
}
"Any changes (notify on all)" => {
(false, None)
}
"Custom (I'll describe it)" => {
let custom_intent = Text::new("Describe what changes matter:")
.with_help_message("e.g., 'price drops below $50', 'new job postings'")
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
// An empty description means "no filtering".
if custom_intent.trim().is_empty() {
(false, None)
} else {
(true, Some(custom_intent.trim().to_string()))
}
}
_ => (false, None),
};
// Without the Claude CLI the agent cannot run, so degrade to notifying on every change.
if agent_needed && !claude_available {
println!(" Note: Smart filtering requires Claude CLI.");
println!(" Install: curl -fsSL https://claude.ai/install.sh | bash");
println!(" Will notify on all changes for now.");
(false, None)
} else {
(agent_needed, instructions)
}
} else {
// Non-interactive without --agent: the agent stays off. The instructions are still carried
// through, but they only feed the '$' check below and are dropped when the watch is built.
(false, agent_instructions.clone())
};
(name, url.clone(), interval, agent_enabled, final_instructions, extraction.clone(), engine)
};
// Sanity check for shell-mangled prices: a bare number without '$' suggests bash expanded "$170".
if let Some(ref instructions) = final_agent_instructions {
if instructions.contains('$') {
println!(" Note: Instructions contain '$' - if using prices, this looks correct.");
} else if instructions.chars().any(|c| c.is_ascii_digit()) {
let has_bare_number = instructions.split_whitespace().any(|word| {
word.chars().all(|c| c.is_ascii_digit() || c == '.')
&& word.parse::<f64>().is_ok()
});
// (`!instructions.contains('$')` is always true in this branch.)
if has_bare_number && !instructions.contains('$') {
println!(" Warning: Instructions contain numbers without '$' symbol.");
println!(" If you meant a price (e.g., $170), the '$' may have been");
println!(" eaten by bash. Use single quotes: --agent-instructions 'price < $170'");
}
}
}
// Build and persist the watch.
let mut watch = Watch::new(name.clone(), final_url.clone());
// Intervals are floored at 10.
watch.interval_secs = final_interval.max(10);
watch.engine = final_engine;
watch.extraction = final_extraction;
watch.tags = tags;
watch.use_profile = use_profile;
if final_agent_enabled {
watch.agent_config = Some(AgentConfig {
enabled: true,
prompt_template: None,
instructions: final_agent_instructions,
});
}
db.insert_watch(&watch)?;
// Store the initial snapshot as the baseline for later comparisons.
// NOTE(review): `extracted`/`content` come from the fetch of `url` done above; if the
// confirmation step returned a different `final_url`, the baseline is still that earlier
// fetch — confirm this is intended.
let normalized = normalize(&extracted, &watch.normalization);
let hash = hash_content(&normalized);
let snapshot = Snapshot {
id: Uuid::new_v4(),
watch_id: watch.id,
fetched_at: Utc::now(),
// Raw HTML is stored zstd-compressed (level 3).
raw_html: Some(zstd::encode_all(content.html.as_bytes(), 3)?),
extracted: normalized,
content_hash: hash.clone(),
};
db.insert_snapshot(&snapshot)?;
// Print the creation summary.
let has_agent = watch.agent_config.is_some();
let agent_instructions = watch.agent_config.as_ref().and_then(|c| c.instructions.as_deref());
let intent_description = platform_detect::describe_watch_intent(
&watch.engine,
has_agent,
agent_instructions,
);
let success_msg = platform_detect::format_watch_created(
&name,
&intent_description,
watch.interval_secs,
has_agent,
);
println!("{}", success_msg);
if !watch.tags.is_empty() {
println!(" Tags: {}", watch.tags.join(", "));
}
// Offer notification setup only when no default target exists and prompting is allowed.
let mut config = Config::load()?;
if config.default_notify.is_none() && interactive && !yes {
println!();
if let Some(target) = prompt_notification_setup()? {
config.default_notify = Some(target);
config.save()?;
println!(" Notification settings saved.");
}
}
if !is_daemon_running() {
println!("\n Run `kto daemon` to start monitoring.");
}
Ok(())
}
/// Lists configured watches, optionally filtered by tag, as JSON or as text.
///
/// * `verbose` - print a multi-line entry per watch instead of one compact row.
/// * `tag_filter` - when set, keep only watches with a tag matching it
///   (ASCII case-insensitive).
/// * `json` - print the (filtered) watches as pretty JSON and return.
///
/// Text styling (bold name, dimmed URL, coloured badges) is applied only when
/// stdout is a terminal; every styled field goes through the same `use_color` gate.
///
/// # Errors
/// Propagates database and JSON serialization errors.
pub fn cmd_list(verbose: bool, tag_filter: Option<String>, json: bool) -> Result<()> {
    let db = Database::open()?;
    let mut watches = db.list_watches()?;
    if let Some(ref tag) = tag_filter {
        watches.retain(|w| w.tags.iter().any(|t| t.eq_ignore_ascii_case(tag)));
    }

    if json {
        println!("{}", serde_json::to_string_pretty(&watches)?);
        return Ok(());
    }

    if watches.is_empty() {
        match tag_filter {
            Some(tag) => println!("No watches found with tag '{}'.", tag),
            None => println!("No watches configured. Run `kto new` to create one."),
        }
        return Ok(());
    }

    let use_color = atty::is(atty::Stream::Stdout);
    println!("\nWatches:\n");

    if verbose {
        for watch in watches {
            let status = match (watch.enabled, use_color) {
                (true, true) => "active".green().to_string(),
                (true, false) => "active".to_string(),
                (false, true) => "paused".yellow().to_string(),
                (false, false) => "paused".to_string(),
            };
            // Gate the bold name on `use_color`, like every other styled field here.
            let name_label = if use_color {
                watch.name.bold().to_string()
            } else {
                watch.name.clone()
            };
            println!(" {} ({})", name_label, &watch.id.to_string()[..8]);
            println!(" URL: {}", watch.url);
            println!(" Status: {}, every {}", status, format_interval(watch.interval_secs));
            println!(" Engine: {:?}", watch.engine);
            if watch.agent_config.is_some() {
                println!(" AI Agent: enabled");
            }
            if !watch.tags.is_empty() {
                println!(" Tags: {}", watch.tags.join(", "));
            }
            println!();
        }
    } else {
        // Column width: longest name, capped at 30 (20 is only used for an empty list,
        // which cannot happen here because of the early return above).
        let max_name_len = watches.iter().map(|w| w.name.len()).max().unwrap_or(20).min(30);
        for watch in watches {
            let status_indicator = match (watch.enabled, use_color) {
                (true, true) => "●".green().to_string(),
                (true, false) => "[active]".to_string(),
                (false, true) => "○".yellow().to_string(),
                (false, false) => "[paused]".to_string(),
            };
            let engine_badge = if watch.engine == Engine::Rss {
                if use_color { " RSS".magenta().to_string() } else { " [RSS]".to_string() }
            } else {
                String::new()
            };
            let ai_badge = if watch.agent_config.is_some() {
                if use_color { " AI".cyan().to_string() } else { " [AI]".to_string() }
            } else {
                String::new()
            };
            let name = truncate_str(&watch.name, max_name_len);
            let padded_name = format!("{:width$}", name, width = max_name_len);
            let name_label = if use_color { padded_name.bold().to_string() } else { padded_name };
            let url = truncate_str(&watch.url, 50);
            // Gate the dimmed URL on `use_color` as well.
            let url_label = if use_color { url.dimmed().to_string() } else { url.to_string() };
            let interval = format_interval(watch.interval_secs);
            println!(
                " {} {}{}{} {} ({})",
                status_indicator, name_label, engine_badge, ai_badge, url_label, interval
            );
        }
    }

    println!();
    Ok(())
}
/// Prints the details of one watch together with its five most recent changes.
///
/// With `json` set, emits a pretty-printed object with the keys `watch` and
/// `recent_changes` instead of the text report.
///
/// # Errors
/// Returns `KtoError::WatchNotFound` when `id_or_name` matches nothing; database
/// and serialization errors are propagated.
pub fn cmd_show(id_or_name: &str, json: bool) -> Result<()> {
    let store = Database::open()?;
    let watch = match store.get_watch(id_or_name)? {
        Some(found) => found,
        None => return Err(kto::KtoError::WatchNotFound(id_or_name.to_string())),
    };
    let history = store.get_recent_changes(&watch.id, 5)?;

    if json {
        let payload = serde_json::json!({
            "watch": watch,
            "recent_changes": history
        });
        println!("{}", serde_json::to_string_pretty(&payload)?);
        return Ok(());
    }

    let state_label = match watch.enabled {
        true => "active",
        false => "paused",
    };
    println!("\nWatch: {}\n", watch.name);
    println!(" ID: {}", watch.id);
    println!(" URL: {}", watch.url);
    println!(" Status: {}", state_label);
    println!(" Interval: {}", format_interval(watch.interval_secs));
    println!(" Engine: {:?}", watch.engine);

    if let Some(agent) = watch.agent_config.as_ref() {
        let agent_label = if agent.enabled { "enabled" } else { "disabled" };
        println!(" AI Agent: {}", agent_label);
        if let Some(text) = agent.instructions.as_ref() {
            println!(" Instructions: {}", text);
        }
    }
    if watch.use_profile {
        println!(" Profile: enabled");
    }
    println!(" Created: {}", watch.created_at.format("%Y-%m-%d %H:%M"));

    if !history.is_empty() {
        println!("\n Recent changes:");
        for entry in history {
            let delivery = match entry.notified {
                true => "notified",
                false => "not notified",
            };
            println!(" {} - {}", entry.detected_at.format("%Y-%m-%d %H:%M"), delivery);
        }
    }
    Ok(())
}
/// Edits an existing watch, either from CLI flags or through an interactive menu.
///
/// * If any `new_*` argument is set, those changes are applied, the watch is saved
///   once, and the list of applied changes is printed.
/// * With no flags and stdin attached to a terminal, the current settings are shown
///   and a menu loop runs until "Done" is chosen; the watch is saved after the loop.
///   A failing prompt returns early, so edits made in that session are not saved.
/// * Otherwise a hint about the available modes is printed and nothing changes.
///
/// `new_notify` accepts `none` or `clear` (case-insensitive) to remove the per-watch
/// target; any other value is handed to `super::parse_notify_string`.
///
/// # Errors
/// Returns `KtoError::WatchNotFound` when `id_or_name` matches nothing and
/// `KtoError::ConfigError` when a prompt fails. Errors from interval parsing (flag
/// mode), notify-string parsing and the database are propagated.
pub fn cmd_edit(
    id_or_name: &str,
    new_name: Option<String>,
    new_interval: Option<String>,
    new_enabled: Option<bool>,
    new_agent: Option<bool>,
    new_agent_instructions: Option<String>,
    new_selector: Option<String>,
    new_notify: Option<String>,
    new_use_profile: Option<bool>,
) -> Result<()> {
    use inquire::Select;
    let db = Database::open()?;
    let mut watch = db.get_watch(id_or_name)?
        .ok_or_else(|| kto::KtoError::WatchNotFound(id_or_name.to_string()))?;
    let has_flags = new_name.is_some() || new_interval.is_some() || new_enabled.is_some()
        || new_agent.is_some() || new_agent_instructions.is_some() || new_selector.is_some()
        || new_notify.is_some() || new_use_profile.is_some();

    if has_flags {
        // ---- Flag mode: apply each provided flag and record a human-readable summary. ----
        let mut changes = Vec::new();
        if let Some(name) = new_name {
            watch.name = name.clone();
            changes.push(format!("name -> {}", name));
        }
        if let Some(ref interval_str) = new_interval {
            let interval = parse_interval_str(interval_str)?;
            watch.interval_secs = interval;
            changes.push(format!("interval -> {}", format_interval(interval)));
        }
        if let Some(enabled) = new_enabled {
            watch.enabled = enabled;
            changes.push(format!("enabled -> {}", enabled));
        }
        if let Some(agent) = new_agent {
            if agent {
                // Enabling creates a blank agent config when none exists yet.
                if watch.agent_config.is_none() {
                    watch.agent_config = Some(AgentConfig {
                        enabled: true,
                        prompt_template: None,
                        instructions: None,
                    });
                } else if let Some(ref mut config) = watch.agent_config {
                    config.enabled = true;
                }
                changes.push("agent -> enabled".to_string());
            } else {
                // Disabling keeps the existing config (and its instructions) in place.
                if let Some(ref mut config) = watch.agent_config {
                    config.enabled = false;
                }
                changes.push("agent -> disabled".to_string());
            }
        }
        if let Some(instructions) = new_agent_instructions {
            // Setting instructions on a watch without an agent config also enables the agent.
            if watch.agent_config.is_none() {
                watch.agent_config = Some(AgentConfig {
                    enabled: true,
                    prompt_template: None,
                    instructions: Some(instructions.clone()),
                });
            } else if let Some(ref mut config) = watch.agent_config {
                config.instructions = Some(instructions.clone());
            }
            changes.push(format!("agent_instructions -> {}", instructions));
        }
        if let Some(selector) = new_selector {
            watch.extraction = Extraction::Selector { selector: selector.clone() };
            changes.push(format!("selector -> {}", selector));
        }
        if let Some(notify_str) = new_notify {
            let lowered = notify_str.to_lowercase();
            if lowered == "none" || lowered == "clear" {
                watch.notify_target = None;
                changes.push("notify -> cleared (will use global default)".to_string());
            } else {
                // Fixed: the argument had been corrupted to `¬ify_str` (a decoded `&not`).
                let target = super::parse_notify_string(&notify_str)?;
                let description = super::describe_notify_target(&target);
                watch.notify_target = Some(target);
                changes.push(format!("notify -> {}", description));
            }
        }
        if let Some(profile) = new_use_profile {
            watch.use_profile = profile;
            changes.push(format!("use_profile -> {}", profile));
        }
        db.update_watch(&watch)?;
        println!("\nUpdated watch '{}':", watch.name);
        for change in changes {
            println!(" {}", change);
        }
    } else if atty::is(atty::Stream::Stdin) {
        // ---- Interactive mode: show the current state, then loop over a menu. ----
        println!("\nEditing watch: {}\n", watch.name);
        println!(" Current settings:");
        println!(" Name: {}", watch.name);
        println!(" URL: {}", watch.url);
        println!(" Interval: {}", format_interval(watch.interval_secs));
        println!(" Status: {}", if watch.enabled { "active" } else { "paused" });
        println!(" Engine: {:?}", watch.engine);
        if let Some(ref config) = watch.agent_config {
            println!(" AI Agent: {}", if config.enabled { "enabled" } else { "disabled" });
            if let Some(ref inst) = config.instructions {
                println!(" Instructions: {}", inst);
            }
        } else {
            println!(" AI Agent: not configured");
        }
        println!();
        loop {
            // The match arms below must stay in sync with these literal labels.
            let options = vec![
                "Change name",
                "Change interval",
                "Toggle pause/resume",
                "Toggle AI agent",
                "Set agent instructions",
                "Done",
            ];
            let choice = Select::new("What would you like to change?", options)
                .prompt()
                .map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
            match choice {
                "Change name" => {
                    let new = Text::new("New name:")
                        .with_default(&watch.name)
                        .prompt()
                        .map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
                    watch.name = new;
                    println!(" Name updated.");
                }
                "Change interval" => {
                    let current = format_interval(watch.interval_secs);
                    // Fixed: the argument had been corrupted to `¤t` (a decoded `&curren`).
                    let new = Text::new("New interval (e.g., 5m, 1h, 30s):")
                        .with_default(&current)
                        .prompt()
                        .map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
                    // Unlike flag mode, an unparsable interval only prints a hint here.
                    if let Ok(secs) = parse_interval_str(&new) {
                        watch.interval_secs = secs;
                        println!(" Interval updated to {}.", format_interval(secs));
                    } else {
                        println!(" Invalid interval format. Use 30s, 5m, 1h, etc.");
                    }
                }
                "Toggle pause/resume" => {
                    watch.enabled = !watch.enabled;
                    println!(" Watch {}.", if watch.enabled { "resumed" } else { "paused" });
                }
                "Toggle AI agent" => {
                    if let Some(ref mut config) = watch.agent_config {
                        config.enabled = !config.enabled;
                        println!(" AI agent {}.", if config.enabled { "enabled" } else { "disabled" });
                    } else {
                        watch.agent_config = Some(AgentConfig {
                            enabled: true,
                            prompt_template: None,
                            instructions: None,
                        });
                        println!(" AI agent enabled.");
                    }
                }
                "Set agent instructions" => {
                    let current = watch.agent_config.as_ref()
                        .and_then(|c| c.instructions.as_deref())
                        .unwrap_or("");
                    let new = Text::new("Agent instructions:")
                        .with_default(current)
                        .with_help_message("What should the AI focus on when analyzing changes?")
                        .prompt()
                        .map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
                    // An empty answer clears the instructions.
                    if watch.agent_config.is_none() {
                        watch.agent_config = Some(AgentConfig {
                            enabled: true,
                            prompt_template: None,
                            instructions: if new.is_empty() { None } else { Some(new) },
                        });
                    } else if let Some(ref mut config) = watch.agent_config {
                        config.instructions = if new.is_empty() { None } else { Some(new) };
                    }
                    println!(" Instructions updated.");
                }
                "Done" => break,
                _ => {}
            }
        }
        db.update_watch(&watch)?;
        println!("\nWatch '{}' updated.", watch.name);
    } else {
        println!("No flags provided and not running interactively.");
        println!("Use flags like --interval 300 or run in a terminal for interactive mode.");
    }
    Ok(())
}
/// Pauses the watch identified by `id_or_name` by clearing its `enabled` flag.
///
/// # Errors
/// Returns `KtoError::WatchNotFound` when nothing matches; database errors are propagated.
pub fn cmd_pause(id_or_name: &str) -> Result<()> {
    let store = Database::open()?;
    let mut target = match store.get_watch(id_or_name)? {
        Some(found) => found,
        None => return Err(kto::KtoError::WatchNotFound(id_or_name.to_string())),
    };

    target.enabled = false;
    store.update_watch(&target)?;

    println!("Paused watch: {}", target.name);
    Ok(())
}
/// Resumes the watch identified by `id_or_name` by setting its `enabled` flag.
///
/// # Errors
/// Returns `KtoError::WatchNotFound` when nothing matches; database errors are propagated.
pub fn cmd_resume(id_or_name: &str) -> Result<()> {
    let store = Database::open()?;
    let mut target = match store.get_watch(id_or_name)? {
        Some(found) => found,
        None => return Err(kto::KtoError::WatchNotFound(id_or_name.to_string())),
    };

    target.enabled = true;
    store.update_watch(&target)?;

    println!("Resumed watch: {}", target.name);
    Ok(())
}
/// Deletes the watch identified by `id_or_name`.
///
/// Unless `skip_confirm` is set, asks for confirmation first (default answer: no)
/// and prints "Cancelled." without deleting when the user declines.
///
/// # Errors
/// Returns `KtoError::WatchNotFound` when nothing matches and `KtoError::ConfigError`
/// when the confirmation prompt fails; database errors are propagated.
pub fn cmd_delete(id_or_name: &str, skip_confirm: bool) -> Result<()> {
    let store = Database::open()?;
    let doomed = match store.get_watch(id_or_name)? {
        Some(found) => found,
        None => return Err(kto::KtoError::WatchNotFound(id_or_name.to_string())),
    };

    let approved = if skip_confirm {
        true
    } else {
        let question = format!("Delete watch '{}'?", doomed.name);
        Confirm::new(&question)
            .with_default(false)
            .prompt()
            .map_err(|e| kto::KtoError::ConfigError(e.to_string()))?
    };
    if !approved {
        println!("Cancelled.");
        return Ok(());
    }

    store.delete_watch(&doomed.id)?;
    println!("Deleted watch: {}", doomed.name);
    Ok(())
}
/// Fetches `url` with both the HTTP and the JS engine by delegating to
/// `dual_fetch_with_hint` with the HTTP fetch enabled.
///
/// Returns `(http_content, js_content)`.
fn dual_fetch(url: &str) -> Result<(Option<PageContent>, Option<PageContent>)> {
    let skip_http = false;
    dual_fetch_with_hint(url, skip_http)
}
/// Fetches `url` over plain HTTP and through Playwright on separate threads.
///
/// * `skip_http` - when `true`, no HTTP fetch is started.
/// * The Playwright fetch is started only when `check_playwright()` reports ready.
///
/// Returns `(http_content, js_content)`; a side is `None` when it was not attempted
/// or its fetch returned an error. A one-line status summary is printed, using
/// `✓` for success, `✗` for a failed attempt and `–` for "not attempted".
///
/// # Errors
/// Returns `KtoError::ConfigError` when either fetch thread panicked.
fn dual_fetch_with_hint(url: &str, skip_http: bool) -> Result<(Option<PageContent>, Option<PageContent>)> {
    // Start the HTTP worker first so it runs while Playwright readiness is checked.
    let http_worker = if skip_http {
        None
    } else {
        let target = url.to_string();
        Some(thread::spawn(move || {
            fetch::fetch(&target, Engine::Http, &std::collections::HashMap::new())
        }))
    };

    let playwright_available = check_playwright().is_ready();
    let js_worker = if playwright_available {
        let target = url.to_string();
        Some(thread::spawn(move || {
            fetch::fetch(&target, Engine::Playwright, &std::collections::HashMap::new())
        }))
    } else {
        None
    };

    // Join in the same order as spawning; fetch errors collapse to `None`, panics become errors.
    let http_content = match http_worker {
        Some(worker) => worker
            .join()
            .map_err(|_| kto::KtoError::ConfigError("HTTP fetch thread panicked".into()))?
            .ok(),
        None => None,
    };
    let js_content = match js_worker {
        Some(worker) => worker
            .join()
            .map_err(|_| kto::KtoError::ConfigError("Playwright fetch thread panicked".into()))?
            .ok(),
        None => None,
    };

    let http_status = match (skip_http, http_content.is_some()) {
        (true, _) => "–",
        (false, true) => "✓",
        (false, false) => "✗",
    };
    let js_status = match (js_content.is_some(), playwright_available) {
        (true, _) => "✓",
        (false, true) => "✗",
        (false, false) => "–",
    };
    println!(" Fetched: HTTP {} | JS {}", http_status, js_status);

    Ok((http_content, js_content))
}
fn display_enhanced_confirmation(
url: &str,
suggestion: &EnhancedSetupSuggestion,
default_extraction: &Extraction,
default_engine: Engine,
name_override: &Option<String>,
_default_interval: u64,
yes: bool,
) -> Result<(String, String, u64, bool, Option<String>, Extraction, Engine)> {
let low_confidence = suggestion.confidence < CONFIDENCE_THRESHOLD;
if !yes {
println!();
println!(" {}", "Analysis Results".bold().underline());
println!();
if let Some(ref status) = suggestion.current_status {
println!(" Status: {}", status.cyan());
}
let engine_text = if suggestion.needs_js {
let reason = suggestion.js_reason.as_ref().map(|r| format!(" ({})", r)).unwrap_or_default();
format!("{}{}", "JavaScript required".yellow(), reason)
} else {
"HTTP".to_string()
};
println!(" Engine: {}", engine_text);
if !suggestion.variants.is_empty() {
println!();
let more = if suggestion.variants.len() > 5 {
format!(" (+{} more)", suggestion.variants.len() - 5)
} else {
String::new()
};
println!(" Variants:{}", more);
for (i, variant) in suggestion.variants.iter().take(5).enumerate() {
let status_str = variant.status.as_deref().unwrap_or("?");
let is_match = suggestion.intent_match.as_ref().map(|m| m.variant_index == i).unwrap_or(false);
let marker = if is_match { " ← intent".yellow().to_string() } else { "".to_string() };
println!(" {}. {} - {}{}", i + 1, variant.name, status_str, marker);
}
}
println!();
println!(" Suggested:");
println!(" Name: {}", suggestion.name);
println!(" Interval: {}", format_interval(suggestion.interval_secs));
if let Some(ref instructions) = suggestion.agent_instructions {
let display_instructions = truncate_str(instructions, 60);
println!(" AI: \"{}\"", display_instructions);
}
if low_confidence && !suggestion.uncertainty_reasons.is_empty() {
println!();
println!(" {} Low confidence ({:.0}%):", "⚠".yellow(), suggestion.confidence * 100.0);
for reason in &suggestion.uncertainty_reasons {
println!(" • {}", reason);
}
}
println!();
}
let claude_available = agent::claude_version().is_some();
let final_url = if let Some(ref intent_match) = suggestion.intent_match {
if let Some(variant) = suggestion.variants.get(intent_match.variant_index) {
if let Some(ref url_hint) = variant.url_hint {
construct_variant_url(url, url_hint)
} else {
url.to_string()
}
} else {
url.to_string()
}
} else {
url.to_string()
};
if final_url != url && !yes {
println!(" Using variant URL: {}", final_url.cyan());
println!();
}
if yes {
let name = name_override.clone().unwrap_or_else(|| suggestion.name.clone());
let engine = if suggestion.needs_js { Engine::Playwright } else { default_engine };
let extraction = suggestion.selector_hint.as_ref()
.map(|sel| Extraction::Selector { selector: sel.clone() })
.unwrap_or_else(|| default_extraction.clone());
return Ok((
name,
final_url,
suggestion.interval_secs,
suggestion.agent_enabled,
suggestion.agent_instructions.clone(),
extraction,
engine,
));
}
let mut choices = vec!["Create Watch"];
if low_confidence && claude_available {
choices.insert(0, "Run Deep Research");
}
if !suggestion.variants.is_empty() && suggestion.variants.len() > 1 {
choices.push("Select Different Variant");
}
choices.push("Customize");
choices.push("Cancel");
let choice = Select::new("What would you like to do?", choices)
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
match choice {
"Run Deep Research" => {
return Err(kto::KtoError::RetryWithDeepResearch);
}
"Create Watch" => {
let name = name_override.clone().unwrap_or_else(|| suggestion.name.clone());
let engine = if suggestion.needs_js { Engine::Playwright } else { default_engine };
let extraction = suggestion.selector_hint.as_ref()
.map(|sel| Extraction::Selector { selector: sel.clone() })
.unwrap_or_else(|| default_extraction.clone());
Ok((
name,
final_url,
suggestion.interval_secs,
suggestion.agent_enabled,
suggestion.agent_instructions.clone(),
extraction,
engine,
))
}
"Select Different Variant" => {
let variant_names: Vec<String> = suggestion.variants.iter()
.enumerate()
.map(|(i, v)| {
let status = v.status.as_deref().unwrap_or("unknown");
format!("{}. {} - {}", i + 1, v.name, status)
})
.collect();
let selected = Select::new("Which variant do you want to monitor?", variant_names)
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
let selected_idx = selected.split('.').next()
.and_then(|s| s.trim().parse::<usize>().ok())
.map(|n| n - 1)
.unwrap_or(0);
let selected_variant = &suggestion.variants[selected_idx];
let variant_url = if let Some(ref hint) = selected_variant.url_hint {
construct_variant_url(url, hint)
} else {
url.to_string()
};
let name = name_override.clone().unwrap_or_else(|| {
format!("{} {}", suggestion.name, selected_variant.name)
});
let instructions = Some(format!(
"Monitor {} variant. Alert when status changes from '{}'",
selected_variant.name,
selected_variant.status.as_deref().unwrap_or("current")
));
let engine = if suggestion.needs_js { Engine::Playwright } else { default_engine };
let extraction = suggestion.selector_hint.as_ref()
.map(|sel| Extraction::Selector { selector: sel.clone() })
.unwrap_or_else(|| default_extraction.clone());
println!(" Selected variant: {}", selected_variant.name);
if variant_url != url {
println!(" Using URL: {}", variant_url.cyan());
}
Ok((
name,
variant_url,
suggestion.interval_secs,
true,
instructions,
extraction,
engine,
))
}
"Customize" => {
let name = Text::new("Name for this watch?")
.with_default(&name_override.clone().unwrap_or_else(|| suggestion.name.clone()))
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
let interval_str = Text::new("Check interval (e.g., 5m, 1h)?")
.with_default(&format_interval(suggestion.interval_secs))
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
let custom_interval = crate::utils::parse_interval_str(&interval_str)
.unwrap_or(suggestion.interval_secs);
let use_ai = Confirm::new("Enable AI analysis?")
.with_default(suggestion.agent_enabled)
.prompt()
.unwrap_or(suggestion.agent_enabled);
let instructions = if use_ai {
let inst = Text::new("What should AI watch for?")
.with_default(suggestion.agent_instructions.as_deref().unwrap_or(""))
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
if inst.is_empty() { None } else { Some(inst) }
} else {
None
};
let use_js = if suggestion.needs_js {
Confirm::new("Use JavaScript rendering (recommended)?")
.with_default(true)
.prompt()
.unwrap_or(true)
} else {
Confirm::new("Use JavaScript rendering?")
.with_default(false)
.prompt()
.unwrap_or(false)
};
let engine = if use_js { Engine::Playwright } else { Engine::Http };
let extraction = suggestion.selector_hint.as_ref()
.map(|sel| Extraction::Selector { selector: sel.clone() })
.unwrap_or_else(|| default_extraction.clone());
Ok((
name,
final_url,
custom_interval,
use_ai,
instructions,
extraction,
engine,
))
}
"Cancel" | _ => {
Err(kto::KtoError::ConfigError("Watch creation cancelled".into()))
}
}
}
/// Merges a variant hint into the query string of `base_url`.
///
/// `url_hint` is normally a query fragment such as `color=red&size=m`.
/// Surrounding whitespace and any leading `?` / `&` are stripped first, so
/// hints written as `?color=red` are handled the same way. An empty hint
/// leaves `base_url` untouched.
///
/// * When the hint contains `=`, it is split on `&`; every `key=value`
///   segment replaces any existing parameter with the same key and is
///   appended at the end of the query. Segments without `=` are ignored.
/// * Otherwise the hint is appended verbatim to the existing query.
///
/// If `base_url` cannot be parsed as a URL, the hint is appended with plain
/// string concatenation (`?` or `&` depending on whether a `?` is present).
fn construct_variant_url(base_url: &str, url_hint: &str) -> String {
    // Hints may arrive already prefixed with a separator; keeping it would
    // turn "?color" into a (percent-encoded) parameter name.
    let hint = url_hint
        .trim()
        .trim_start_matches(|c: char| c == '?' || c == '&');
    if hint.is_empty() {
        return base_url.to_string();
    }

    let mut parsed = match url::Url::parse(base_url) {
        Ok(parsed) => parsed,
        Err(_) => {
            // Best-effort fallback for strings the URL parser rejects.
            let separator = if base_url.contains('?') { '&' } else { '?' };
            return format!("{}{}{}", base_url, separator, hint);
        }
    };

    if hint.contains('=') {
        // Work on an owned copy of the current pairs so the query only has
        // to be rebuilt once, after all hint parameters were merged.
        let mut pairs: Vec<(String, String)> = parsed
            .query_pairs()
            .map(|(k, v)| (k.into_owned(), v.into_owned()))
            .collect();

        for param in hint.split('&') {
            if let Some((key, value)) = param.split_once('=') {
                // Replace semantics: drop earlier occurrences of this key.
                pairs.retain(|(existing, _)| existing.as_str() != key);
                pairs.push((key.to_string(), value.to_string()));
            }
        }

        parsed.set_query(None);
        for (key, value) in &pairs {
            parsed.query_pairs_mut().append_pair(key, value);
        }
    } else {
        let query = match parsed.query() {
            Some(existing) => format!("{}&{}", existing, hint),
            None => hint.to_string(),
        };
        parsed.set_query(Some(&query));
    }

    parsed.to_string()
}
/// Presents a matched URL transform to the user and returns the watch
/// settings derived from it.
///
/// Returns `Ok(Some((name, url, engine, extraction, interval)))` when the
/// transform is accepted, `Ok(None)` when the caller should continue with
/// the original URL (non-interactive without `yes`, or the user chose the
/// original URL), and an error when the user cancels or a prompt fails.
///
/// With `yes` set, the transform is accepted without prompting. `_tags` and
/// `_use_profile` are accepted but not used here.
fn display_transform_suggestion(
    original_url: &str,
    transform: &TransformMatch,
    name_override: &Option<String>,
    default_interval: u64,
    _tags: &[String],
    _use_profile: bool,
    yes: bool,
    interactive: bool,
) -> Result<Option<(String, String, Engine, Extraction, u64)>> {
    let feed_based = transform.engine == Engine::Rss;
    // RSS transforms use feed extraction; everything else uses auto-detection.
    let extraction_for_engine = || {
        if feed_based {
            Extraction::Rss
        } else {
            Extraction::Auto
        }
    };
    let target_url = transform.url.as_str();
    let fallback_name = generate_name_from_url(&transform.url);

    // Non-prompting path: take the transform as-is.
    if yes {
        let name = match name_override {
            Some(given) => given.clone(),
            None => fallback_name,
        };
        let extraction = extraction_for_engine();
        println!("\n Found: {}", transform.description);
        return Ok(Some((
            name,
            target_url.to_string(),
            transform.engine.clone(),
            extraction,
            default_interval,
        )));
    }

    if !interactive {
        return Ok(None);
    }

    println!();
    let platform_name = detect_platform_name(transform.url.host_str());
    println!(" ✨ {} detected", platform_name.cyan());
    println!();
    println!(" Found: {}", transform.description.green());
    println!();
    if feed_based {
        println!(" Will notify you when new items are published.");
    } else {
        println!(" Will monitor the page for changes.");
    }
    println!();

    let options = vec!["Accept (recommended)", "Use original URL instead", "Cancel"];
    let picked = Select::new("How would you like to proceed?", options)
        .prompt()
        .map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;

    if picked == "Accept (recommended)" {
        // Only ask for a name when none was supplied up front.
        let name = if let Some(given) = name_override {
            given.clone()
        } else {
            Text::new("Name for this watch?")
                .with_default(&fallback_name)
                .prompt()
                .map_err(|e| kto::KtoError::ConfigError(e.to_string()))?
        };
        let extraction = extraction_for_engine();
        return Ok(Some((
            name,
            target_url.to_string(),
            transform.engine.clone(),
            extraction,
            default_interval,
        )));
    }

    if picked == "Use original URL instead" {
        println!(" Using original URL: {}", original_url);
        return Ok(None);
    }

    // "Cancel" and anything unexpected abort the creation.
    Err(kto::KtoError::ConfigError("Watch creation cancelled".into()))
}
/// Creates a watch from already-decided transform settings.
///
/// Fetches `url` with `engine`, extracts content, stores the watch and an
/// initial snapshot in `db`, and prints a confirmation. When running
/// interactively without `yes` and no default notification target is
/// configured, the user is offered notification setup. Finally prints a hint
/// if the daemon does not appear to be running.
///
/// Any fetch, extraction, compression, database or config error is returned
/// to the caller.
fn create_watch_from_transform(
    db: &Database,
    name: String,
    url: String,
    engine: Engine,
    extraction: Extraction,
    interval: u64,
    tags: Vec<String>,
    use_profile: bool,
    interactive: bool,
    yes: bool,
) -> Result<()> {
    println!("\n Fetching {}...", url);
    let no_headers = std::collections::HashMap::new();
    let page = fetch::fetch(&url, engine.clone(), &no_headers)?;
    let text = extract::extract(&page, &extraction)?;

    let mut watch = Watch::new(name.clone(), url);
    // Intervals below 10 are raised to 10.
    watch.interval_secs = std::cmp::max(interval, 10);
    watch.engine = engine;
    watch.extraction = extraction;
    watch.tags = tags;
    watch.use_profile = use_profile;
    db.insert_watch(&watch)?;

    // Baseline snapshot so later checks have something to diff against.
    let normalized = normalize(&text, &watch.normalization);
    let content_hash = hash_content(&normalized);
    db.insert_snapshot(&Snapshot {
        id: Uuid::new_v4(),
        watch_id: watch.id,
        fetched_at: Utc::now(),
        raw_html: Some(zstd::encode_all(page.html.as_bytes(), 3)?),
        extracted: normalized,
        content_hash,
    })?;

    let intent_description = platform_detect::describe_watch_intent(&watch.engine, false, None);
    println!(
        "{}",
        platform_detect::format_watch_created(
            &name,
            &intent_description,
            watch.interval_secs,
            false,
        )
    );
    if !watch.tags.is_empty() {
        println!(" Tags: {}", watch.tags.join(", "));
    }

    let may_prompt = interactive && !yes;
    let mut config = Config::load()?;
    if may_prompt && config.default_notify.is_none() {
        println!();
        if let Some(target) = super::prompt_notification_setup()? {
            config.default_notify = Some(target);
            config.save()?;
            println!(" Notification settings saved.");
        }
    }

    if !is_daemon_running() {
        println!("\n Run `kto daemon` to start monitoring.");
    }
    Ok(())
}
/// Creates a watch straight from a matched transform, without asking the
/// user to confirm the transform itself.
///
/// Fetches the transformed URL (fetch errors are rewritten through
/// `platform_detect::friendly_error_message`), prints a platform preview,
/// stores the watch plus its initial snapshot, and prints a confirmation.
/// For RSS transforms the preview includes the first title-like line of the
/// extracted feed, if any. Notification setup is offered when running
/// interactively without `yes` and no default target is configured.
fn create_watch_from_transform_magical(
    db: &Database,
    original_url: &str,
    transform: &TransformMatch,
    name_override: Option<String>,
    default_interval: u64,
    tags: Vec<String>,
    use_profile: bool,
    interactive: bool,
    yes: bool,
) -> Result<()> {
    let target_url = transform.url.as_str();
    let engine = transform.engine.clone();
    let is_feed = engine == Engine::Rss;

    // Show only the first three '/'-separated pieces (scheme and host for a
    // typical absolute URL).
    let site_root = original_url.split('/').take(3).collect::<Vec<_>>().join("/");
    println!("\n Analyzing {}...", site_root);

    let page = fetch::fetch(target_url, engine.clone(), &std::collections::HashMap::new())
        .map_err(|e| {
            kto::KtoError::ConfigError(platform_detect::friendly_error_message(
                &e.to_string(),
                original_url,
            ))
        })?;

    let extraction = if is_feed {
        Extraction::Rss
    } else {
        Extraction::Auto
    };
    let text = extract::extract(&page, &extraction)?;
    let name = name_override.unwrap_or_else(|| generate_name_from_url(&transform.url));
    let latest_item = if is_feed {
        extract_first_rss_item(&text)
    } else {
        None
    };

    println!();
    let platform = detect_platform_name(transform.url.host_str());
    println!(
        "{}",
        platform_detect::format_known_platform_preview(
            original_url,
            &platform,
            transform.description,
            latest_item.as_deref(),
        )
    );

    let mut watch = Watch::new(name.clone(), target_url.to_string());
    // Intervals below 10 are raised to 10.
    watch.interval_secs = std::cmp::max(default_interval, 10);
    watch.engine = engine;
    watch.extraction = extraction;
    watch.tags = tags;
    watch.use_profile = use_profile;
    db.insert_watch(&watch)?;

    // Baseline snapshot for future comparisons.
    let normalized = normalize(&text, &watch.normalization);
    let content_hash = hash_content(&normalized);
    db.insert_snapshot(&Snapshot {
        id: Uuid::new_v4(),
        watch_id: watch.id,
        fetched_at: Utc::now(),
        raw_html: Some(zstd::encode_all(page.html.as_bytes(), 3)?),
        extracted: normalized,
        content_hash,
    })?;

    let intent_description = platform_detect::describe_watch_intent(&watch.engine, false, None);
    println!(
        "{}",
        platform_detect::format_watch_created(
            &name,
            &intent_description,
            watch.interval_secs,
            false,
        )
    );

    let may_prompt = interactive && !yes;
    let mut config = Config::load()?;
    if may_prompt && config.default_notify.is_none() {
        println!();
        if let Some(target) = super::prompt_notification_setup()? {
            config.default_notify = Some(target);
            config.save()?;
            println!(" Notification settings saved.");
        }
    }

    if !is_daemon_running() {
        println!("\n Run `kto daemon` to start monitoring.");
    }
    Ok(())
}
/// Returns the first line of extracted feed text that looks like an item
/// title.
///
/// A line qualifies when, after trimming whitespace, it is non-empty, does
/// not start with `-` or `[`, and is between 6 and 99 characters long.
/// Length is measured in characters rather than bytes so that non-ASCII
/// titles are judged by their visible length.
///
/// Returns `None` when no line qualifies.
fn extract_first_rss_item(rss_content: &str) -> Option<String> {
    rss_content
        .lines()
        .map(str::trim)
        .find(|line| {
            if line.is_empty() || line.starts_with('-') || line.starts_with('[') {
                return false;
            }
            // Counting stops at 100: anything that long is rejected anyway.
            let char_count = line.chars().take(100).count();
            char_count > 5 && char_count < 100
        })
        .map(|line| line.to_string())
}
/// Maps a URL host to a human-readable platform label.
///
/// Known hosts are matched exactly; any host containing `reddit.com` is
/// labelled "Reddit". Unknown hosts are returned unchanged, and a missing
/// host yields "Site".
fn detect_platform_name(host: Option<&str>) -> String {
    // Exact host → label pairs.
    const KNOWN_HOSTS: [(&str, &str); 8] = [
        ("github.com", "GitHub Repository"),
        ("gitlab.com", "GitLab Project"),
        ("codeberg.org", "Codeberg Repository"),
        ("news.ycombinator.com", "Hacker News"),
        ("pypi.org", "PyPI Package"),
        ("crates.io", "Crates.io Package"),
        ("hub.docker.com", "Docker Hub"),
        ("www.npmjs.com", "npm Package"),
    ];

    let host = match host {
        Some(h) => h,
        None => return "Site".to_string(),
    };

    if let Some(entry) = KNOWN_HOSTS.iter().find(|entry| entry.0 == host) {
        return entry.1.to_string();
    }
    // Substring match so subdomains such as old.reddit.com are covered too.
    if host.contains("reddit.com") {
        return "Reddit".to_string();
    }
    host.to_string()
}
/// Derives a default watch name from a URL.
///
/// * github.com / gitlab.com / codeberg.org with at least two path
///   segments → `owner/repo`
/// * hosts containing `reddit.com` with a path starting `r/<name>` →
///   `r/<name>`
/// * news.ycombinator.com → `Hacker News`
/// * pypi.org with a path starting `project/<name>` → `PyPI: <name>`
///
/// Anything else falls back to the host, or `Watch` when the URL has no
/// host.
fn generate_name_from_url(url: &url::Url) -> String {
    let host = match url.host_str() {
        Some(h) => h,
        None => return "Watch".to_string(),
    };
    // Non-empty path segments; leading/trailing slashes produce no entries.
    let parts: Vec<&str> = url.path().split('/').filter(|p| !p.is_empty()).collect();
    let is_forge = matches!(host, "github.com" | "gitlab.com" | "codeberg.org");

    match parts.as_slice() {
        [owner, repo, ..] if is_forge => format!("{}/{}", owner, repo),
        ["r", subreddit, ..] if host.contains("reddit.com") => format!("r/{}", subreddit),
        _ if host == "news.ycombinator.com" => "Hacker News".to_string(),
        ["project", package, ..] if host == "pypi.org" => format!("PyPI: {}", package),
        _ => host.to_string(),
    }
}
/// Runs the "deep research" watch-creation flow for `url`.
///
/// Steps, in order:
/// 1. Fetch the page via `dual_fetch_with_hint` (the HTTP fetch is skipped
///    when a matching transform rule specifies the Playwright engine).
/// 2. Gather hints from the HTTP response: site type, feeds, JSON-LD.
/// 3. Ask the agent for a recommendation; on failure a fallback result is
///    used and the error is printed to stderr.
/// 4. Print the results and apply any suggested URL modification.
/// 5. Take the recommendation as-is (`yes`) or let the user confirm or
///    customize it (`interactive`); with neither flag an error is returned.
/// 6. Fetch with the final settings, store the watch and its first
///    snapshot, print a summary, and optionally offer notification setup.
fn run_deep_research_flow(
    input: &str,
    url: &str,
    name_override: Option<String>,
    default_interval: u64,
    tags: Vec<String>,
    use_profile: bool,
    yes: bool,
    interactive: bool,
) -> Result<()> {
    let db = Database::open()?;
    println!("\n {} Deep Research Mode", "🔬".bold());
    println!(" Analyzing {}...", url);

    // A transform rule that demands Playwright makes the plain HTTP fetch
    // pointless, so tell the fetcher to skip it.
    let parsed_input_url = url::Url::parse(url).ok();
    let intent = Intent::detect(input);
    let matched_rule = parsed_input_url
        .as_ref()
        .and_then(|parsed| transforms::match_transform(parsed, intent));
    let skip_http = matched_rule
        .as_ref()
        .map_or(false, |rule| rule.engine == Engine::Playwright);
    if skip_http {
        println!(" Note: Transform rule specifies Playwright - skipping HTTP fetch");
    }

    let (http_content, js_content) = dual_fetch_with_hint(url, skip_http)?;
    let http_html = http_content.as_ref().map(|page| page.html.as_str());
    // Extraction failures are tolerated here; the agent simply gets `None`.
    let http_extracted = http_content
        .as_ref()
        .and_then(|page| extract::extract(page, &Extraction::Auto).ok());
    let js_extracted = js_content
        .as_ref()
        .and_then(|page| extract::extract(page, &Extraction::Auto).ok());

    let site_type = http_html.and_then(|html| fetch::detect_site_type(url, html));
    if let Some(kind) = site_type.as_ref() {
        println!(" Detected: {}", kind.cyan());
    }

    let discovered_feeds = match http_html {
        Some(html) => {
            println!(" Discovering feeds...");
            let found = fetch::discover_feeds(url, html);
            if !found.is_empty() {
                println!(" Found {} feed(s)", found.len());
            }
            found
        }
        None => vec![],
    };

    let jsonld_data = http_html.and_then(|html| extract::extract_raw_jsonld(html));
    if jsonld_data.is_some() {
        println!(" Found JSON-LD structured data");
    }

    println!(" Analyzing with AI (this may take a moment)...");
    let research_result = agent::deep_research_analysis(
        url,
        input,
        http_extracted.as_deref(),
        js_extracted.as_deref(),
        &discovered_feeds,
        jsonld_data.as_deref(),
        site_type.as_deref(),
    )
    .unwrap_or_else(|e| {
        eprintln!(" Research failed: {} (using fallback)", e);
        DeepResearchResult::fallback(url, input)
    });
    display_research_results(&research_result);

    let modified_url = apply_url_modifications(url, &research_result);
    if modified_url != url {
        println!(" URL modified: {}", modified_url.cyan());
    }

    // Without --yes or an interactive terminal there is nobody to confirm.
    if !yes && !interactive {
        return Err(kto::KtoError::ConfigError(
            "Deep research requires interactive mode or --yes flag".into()
        ));
    }

    let (name, final_url, final_engine, final_extraction, final_interval, agent_enabled, agent_instructions) =
        if yes {
            let name = match name_override {
                Some(given) => given,
                None => match url::Url::parse(&modified_url) {
                    Ok(parsed) => generate_name_from_url(&parsed),
                    Err(_) => "Watch".to_string(),
                },
            };
            let engine = research_result.engine.to_engine();
            // A "selector" strategy without a selector falls back to auto.
            let extraction = match (
                research_result.extraction.strategy.as_str(),
                research_result.extraction.selector.as_ref(),
            ) {
                ("selector", Some(sel)) => Extraction::Selector { selector: sel.clone() },
                ("rss", _) => Extraction::Rss,
                ("json_ld", _) => Extraction::JsonLd { types: None },
                _ => Extraction::Auto,
            };
            (
                name,
                modified_url.clone(),
                engine,
                extraction,
                research_result.interval_secs,
                research_result.agent_instructions.is_some(),
                research_result.agent_instructions.clone(),
            )
        } else {
            confirm_research_results(
                &modified_url,
                &research_result,
                &name_override,
                default_interval,
            )?
        };

    println!("\n Fetching with {:?} engine...", final_engine);
    let page = fetch::fetch(&final_url, final_engine.clone(), &std::collections::HashMap::new())?;
    let text = extract::extract(&page, &final_extraction)?;

    let mut watch = Watch::new(name.clone(), final_url);
    // Intervals below 10 are raised to 10.
    watch.interval_secs = std::cmp::max(final_interval, 10);
    watch.engine = final_engine;
    watch.extraction = final_extraction;
    watch.tags = tags;
    watch.use_profile = use_profile;
    if agent_enabled {
        watch.agent_config = Some(AgentConfig {
            enabled: true,
            prompt_template: None,
            instructions: agent_instructions,
        });
    }
    db.insert_watch(&watch)?;

    // Baseline snapshot for future comparisons.
    let normalized = normalize(&text, &watch.normalization);
    let hash = hash_content(&normalized);
    db.insert_snapshot(&Snapshot {
        id: Uuid::new_v4(),
        watch_id: watch.id,
        fetched_at: Utc::now(),
        raw_html: Some(zstd::encode_all(page.html.as_bytes(), 3)?),
        extracted: normalized,
        content_hash: hash.clone(),
    })?;

    println!("\n Created watch \"{}\"", name);
    println!(" Initial hash: {}", &hash[..8]);
    println!(" Engine: {:?}", watch.engine);
    if watch.agent_config.is_some() {
        println!(" AI Agent: enabled");
    }
    if watch.use_profile {
        println!(" Profile: enabled");
    }
    if !watch.tags.is_empty() {
        println!(" Tags: {}", watch.tags.join(", "));
    }
    println!(" Checking every {}", format_interval(watch.interval_secs));

    let may_prompt = interactive && !yes;
    let mut config = Config::load()?;
    if may_prompt && config.default_notify.is_none() {
        println!();
        if let Some(target) = super::prompt_notification_setup()? {
            config.default_notify = Some(target);
            config.save()?;
            println!(" Notification settings saved.");
        }
    }

    if !is_daemon_running() {
        println!("\n Run `kto daemon` to start monitoring.");
    }
    Ok(())
}
fn apply_url_modifications(url: &str, result: &DeepResearchResult) -> String {
if let Some(ref mods) = result.url_modifications {
if let Some(ref variant) = mods.variant_param {
if !variant.is_empty() {
if url.contains('?') {
return format!("{}&variant={}", url, variant);
} else {
return format!("{}?variant={}", url, variant);
}
}
}
}
url.to_string()
}
/// Prints a human-readable report of a deep research result to stdout.
///
/// Sections are printed in a fixed order and each optional section is
/// omitted when its data is absent or empty: summary, web research,
/// discovered feeds, recommended approach, stable selectors, key insights,
/// AI instructions, and finally the confidence (green from 80%, yellow from
/// 50%, red below).
fn display_research_results(result: &DeepResearchResult) {
    println!();
    println!(" {}", "Deep Research Results".bold().underline());
    println!();
    println!(" {}", result.summary);

    if let Some(web) = result.web_research.as_ref() {
        println!();
        println!(" {}:", "Web Research".bold());
        if !web.queries_made.is_empty() {
            println!(" Searched: {}", web.queries_made.join(", "));
        }
        for finding in web.relevant_findings.iter() {
            println!(" • {}", finding);
        }
        if !web.api_endpoints.is_empty() {
            println!();
            println!(" Discovered APIs:");
            for endpoint in web.api_endpoints.iter() {
                let auth_note = match endpoint.requires_auth {
                    true => " (auth required)",
                    false => "",
                };
                println!(" {} - {}{}", endpoint.url_pattern, endpoint.description, auth_note);
            }
        }
        if !web.community_tips.is_empty() {
            println!();
            println!(" Community Tips:");
            for tip in web.community_tips.iter() {
                println!(" • {}", tip);
            }
        }
    }

    if !result.discovered_feeds.is_empty() {
        println!();
        println!(" {}:", "Discovered Feeds".bold());
        for feed in result.discovered_feeds.iter() {
            // Flag feeds that match what the user asked for.
            let marker = match feed.matches_intent {
                true => " ← matches intent".green().to_string(),
                false => String::new(),
            };
            println!(" {} ({}, via {}){}", feed.url, feed.feed_type, feed.discovery_method, marker);
        }
    }

    println!();
    println!(" {}:", "Recommended Approach".bold());
    println!(" Engine: {}", result.engine.engine_type.cyan());
    println!(" {}", result.engine.reason.dimmed());
    println!(" Extraction: {}", result.extraction.strategy.cyan());
    if let Some(selector) = result.extraction.selector.as_ref() {
        println!(" Selector: {}", selector);
    }
    println!(" {}", result.extraction.reason.dimmed());
    if let Some(mods) = result.url_modifications.as_ref() {
        if let Some(variant) = mods.variant_param.as_ref() {
            println!(" URL Mod: variant={}", variant.cyan());
            println!(" {}", mods.reason.dimmed());
        }
    }

    if !result.selectors.is_empty() {
        println!();
        println!(" Stable Selectors:");
        for candidate in result.selectors.iter() {
            let stability = format!("{:.0}%", candidate.stability_score * 100.0);
            println!(" {} ({})", candidate.selector, stability.dimmed());
            println!(" {}", candidate.description.dimmed());
        }
    }

    if !result.insights.is_empty() {
        println!();
        println!(" {}:", "Key Insights".bold());
        for insight in result.insights.iter() {
            println!(" • {}", insight);
        }
    }

    if let Some(instructions) = result.agent_instructions.as_ref() {
        println!();
        println!(" AI Instructions: \"{}\"", truncate_str(instructions, 60));
    }

    println!();
    // Format the percentage once, then pick the colour by threshold.
    let percent = format!("{:.0}%", result.confidence * 100.0);
    let confidence_color = if result.confidence >= 0.8 {
        percent.green()
    } else if result.confidence >= 0.5 {
        percent.yellow()
    } else {
        percent.red()
    };
    println!(" Confidence: {}", confidence_color);
    println!();
}
fn confirm_research_results(
url: &str,
result: &DeepResearchResult,
name_override: &Option<String>,
_default_interval: u64,
) -> Result<(String, String, Engine, Extraction, u64, bool, Option<String>)> {
let matching_feed = result.discovered_feeds.iter().find(|f| f.matches_intent);
let mut choices = vec!["Accept recommendations", "Customize"];
if matching_feed.is_some() {
choices.insert(1, "Use discovered feed");
}
choices.push("Cancel");
let choice = Select::new("What would you like to do?", choices)
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
match choice {
"Accept recommendations" => {
let name = match name_override {
Some(n) => n.clone(),
None => {
let default_name = if let Ok(parsed) = url::Url::parse(url) {
generate_name_from_url(&parsed)
} else {
"Watch".to_string()
};
Text::new("Name for this watch?")
.with_default(&default_name)
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?
}
};
let engine = result.engine.to_engine();
let extraction = match result.extraction.strategy.as_str() {
"selector" => {
if let Some(ref sel) = result.extraction.selector {
Extraction::Selector { selector: sel.clone() }
} else {
Extraction::Auto
}
}
"rss" => Extraction::Rss,
"json_ld" => Extraction::JsonLd { types: None },
_ => Extraction::Auto,
};
Ok((
name,
url.to_string(),
engine,
extraction,
result.interval_secs,
result.agent_instructions.is_some(),
result.agent_instructions.clone(),
))
}
"Use discovered feed" => {
let feed = matching_feed.expect("Feed should exist");
let name = match name_override {
Some(n) => n.clone(),
None => {
let default_name = feed.title.clone().unwrap_or_else(|| {
if let Ok(parsed) = url::Url::parse(&feed.url) {
generate_name_from_url(&parsed)
} else {
"Watch".to_string()
}
});
Text::new("Name for this watch?")
.with_default(&default_name)
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?
}
};
println!(" Using feed: {}", feed.url.cyan());
Ok((
name,
feed.url.clone(),
Engine::Rss,
Extraction::Rss,
result.interval_secs,
false, None,
))
}
"Customize" => {
let name = Text::new("Name for this watch?")
.with_default(&name_override.clone().unwrap_or_else(|| {
if let Ok(parsed) = url::Url::parse(url) {
generate_name_from_url(&parsed)
} else {
"Watch".to_string()
}
}))
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
let engine_choices = vec!["HTTP", "JavaScript (Playwright)", "RSS"];
let default_engine_idx = match result.engine.engine_type.as_str() {
"playwright" | "js" => 1,
"rss" => 2,
_ => 0,
};
let engine_choice = Select::new("Engine:", engine_choices)
.with_starting_cursor(default_engine_idx)
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
let engine = match engine_choice {
"JavaScript (Playwright)" => Engine::Playwright,
"RSS" => Engine::Rss,
_ => Engine::Http,
};
let extraction = if engine == Engine::Rss {
Extraction::Rss
} else {
let extraction_choices = vec!["Auto", "CSS Selector", "JSON-LD"];
let extraction_choice = Select::new("Extraction:", extraction_choices)
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
match extraction_choice {
"CSS Selector" => {
let default_sel = result.extraction.selector.as_deref()
.or_else(|| result.selectors.first().map(|s| s.selector.as_str()))
.unwrap_or("");
let sel = Text::new("CSS Selector:")
.with_default(default_sel)
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
Extraction::Selector { selector: sel }
}
"JSON-LD" => Extraction::JsonLd { types: None },
_ => Extraction::Auto,
}
};
let interval_str = Text::new("Check interval (e.g., 5m, 1h)?")
.with_default(&format_interval(result.interval_secs))
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
let interval = crate::utils::parse_interval_str(&interval_str)
.unwrap_or(result.interval_secs);
let use_ai = Confirm::new("Enable AI analysis?")
.with_default(result.agent_instructions.is_some())
.prompt()
.unwrap_or(false);
let instructions = if use_ai {
let inst = Text::new("AI instructions:")
.with_default(result.agent_instructions.as_deref().unwrap_or(""))
.prompt()
.map_err(|e| kto::KtoError::ConfigError(e.to_string()))?;
if inst.is_empty() { None } else { Some(inst) }
} else {
None
};
Ok((name, url.to_string(), engine, extraction, interval, use_ai, instructions))
}
"Cancel" | _ => {
Err(kto::KtoError::ConfigError("Watch creation cancelled".into()))
}
}
}