Skip to main content

rover/cli/
task.rs

1//! `rover task <id>` body (also drives `rover batch <id>` via `expect_kind`).
2//!
3//! Pure reader except for `--cancel`, which is a single UPDATE. Opens the
4//! cache database directly and queries `tasks` + `task_events`. No HTTP, no
5//! scheduler, no server-process responsibilities.
6
7use std::io::Write;
8use std::path::Path;
9use std::time::Duration;
10
11use anyhow::{Context, anyhow};
12use jiff::Timestamp;
13
14use crate::config;
15use crate::storage::Db;
16use crate::storage::events::{EventRow, count_by_kind, last_for_task, range_since};
17use crate::storage::tasks::{TaskKind, TaskStatus, get, set_cancellation_requested};
18
19/// Arguments shared by `rover task <id>` and `rover batch <id>`.
20pub struct Args {
21    pub id: String,
22    pub monitor: bool,
23    pub cancel: bool,
24    pub format: OutputFormat,
25    pub from_event: Option<i64>,
26    /// If `Some`, the loaded task's `kind` must match this string or the
27    /// command errors out. `rover batch` sets this to `Some("batch_fetch")`;
28    /// `rover task` leaves it `None`.
29    pub expect_kind: Option<&'static str>,
30}
31
32#[derive(Debug, Clone, Copy)]
33pub enum OutputFormat {
34    Human,
35    Ndjson,
36}
37
38/// Entry point used by `main.rs`. Locks stdout and delegates to
39/// [`run_with_writers`], which is the testable seam.
40pub async fn run(args: Args, config_path: Option<&Path>) -> anyhow::Result<()> {
41    let mut out = std::io::stdout().lock();
42    run_with_writers(args, config_path, &mut out).await
43}
44
45/// Library entry point used by integration tests. Accepts any `Write`
46/// implementation so the test suite can capture output without going through
47/// a subprocess.
48pub async fn run_with_writers<W: Write>(
49    args: Args,
50    config_path: Option<&Path>,
51    out: &mut W,
52) -> anyhow::Result<()> {
53    let _cfg = config::load_resolved(config_path).context("loading config")?;
54    let data_dir = crate::paths::data_dir();
55    let db = Db::open(data_dir.join("rover.db"))
56        .await
57        .context("opening cache database")?;
58
59    let row = get(&db, &args.id)
60        .await?
61        .ok_or_else(|| anyhow!("task {} not found", args.id))?;
62    if let Some(want) = args.expect_kind
63        && row.kind.as_str() != want
64    {
65        return Err(anyhow!(
66            "task {} is kind={}, expected {}",
67            args.id,
68            row.kind.as_str(),
69            want
70        ));
71    }
72
73    if args.cancel {
74        let changed = set_cancellation_requested(&db, &args.id).await?;
75        if changed {
76            writeln!(out, "Cancellation requested for {}.", args.id)?;
77        } else if row.cancellation_requested {
78            writeln!(out, "Cancellation already requested for {}.", args.id)?;
79        } else {
80            writeln!(
81                out,
82                "Task {} is in a terminal state; nothing to cancel.",
83                args.id
84            )?;
85        }
86        return Ok(());
87    }
88
89    if args.monitor {
90        return monitor_loop(&db, &args, out).await;
91    }
92    print_snapshot(&db, &args, &row, out).await
93}
94
95async fn print_snapshot<W: Write>(
96    db: &Db,
97    args: &Args,
98    row: &crate::storage::tasks::TaskRow,
99    out: &mut W,
100) -> anyhow::Result<()> {
101    let liveness = check_liveness(db).await?;
102    let now_ms = Timestamp::now().as_millisecond();
103    let counts = count_by_kind(db, &args.id).await?;
104    let succeeded = counts
105        .iter()
106        .find(|(k, _)| k == "item_done")
107        .map(|(_, n)| *n)
108        .unwrap_or(0);
109    let failed = counts
110        .iter()
111        .find(|(k, _)| k == "item_failed")
112        .map(|(_, n)| *n)
113        .unwrap_or(0);
114    let started = counts
115        .iter()
116        .find(|(k, _)| k == "item_started")
117        .map(|(_, n)| *n)
118        .unwrap_or(0);
119    let total: i64 = if row.kind == TaskKind::BatchFetch {
120        serde_json::from_str::<serde_json::Value>(&row.params_json)
121            .ok()
122            .and_then(|v| {
123                v.get("urls")
124                    .and_then(|u| u.as_array())
125                    .map(|a| a.len() as i64)
126            })
127            .unwrap_or(0)
128    } else {
129        0
130    };
131    let in_flight = (started - succeeded - failed).max(0);
132    let last = last_for_task(db, &args.id).await?;
133
134    match args.format {
135        OutputFormat::Ndjson => {
136            let snap = serde_json::json!({
137                "ts": rfc3339(now_ms),
138                "kind": "snapshot",
139                "task_id": row.id,
140                "task_kind": row.kind.as_str(),
141                "status": row.status.as_str(),
142                "total": total,
143                "succeeded": succeeded,
144                "failed": failed,
145                "in_flight": in_flight,
146                "completed": succeeded + failed,
147                "started_at": rfc3339(row.created_at),
148                "last_event_id": last.as_ref().map(|e| e.id),
149                "eta_s": eta_seconds(succeeded, total, row.created_at, now_ms),
150            });
151            writeln!(out, "{snap}")?;
152        }
153        OutputFormat::Human => {
154            if let Some(warn) = liveness {
155                writeln!(out, "{warn}")?;
156            }
157            if row.kind == TaskKind::BatchFetch {
158                writeln!(out, "Batch {} — {}", row.id, row.status.as_str())?;
159            } else {
160                writeln!(
161                    out,
162                    "Task {} — {} (kind: {})",
163                    row.id,
164                    row.status.as_str(),
165                    row.kind.as_str()
166                )?;
167            }
168            writeln!(out, "Started {}", relative_human(now_ms - row.created_at))?;
169            if row.kind == TaskKind::BatchFetch && total > 0 {
170                let pct = (succeeded + failed) * 100 / total;
171                writeln!(
172                    out,
173                    "Progress: {}/{} ({}%)  ✓ {}  ✗ {}  ⋯ {} in flight",
174                    succeeded + failed,
175                    total,
176                    pct,
177                    succeeded,
178                    failed,
179                    in_flight,
180                )?;
181                if let Some(eta) = eta_seconds(succeeded, total, row.created_at, now_ms) {
182                    writeln!(out, "ETA ~{eta}s")?;
183                }
184            }
185            if let Some(ev) = last.as_ref() {
186                writeln!(
187                    out,
188                    "Last event: {} ({})",
189                    summarise_event(ev),
190                    relative_human(now_ms - ev.ts)
191                )?;
192            }
193            if !row.status.is_terminal() {
194                writeln!(out, "Tip: use `rover task {} --cancel` to stop.", row.id)?;
195            }
196        }
197    }
198    Ok(())
199}
200
201async fn monitor_loop<W: Write>(db: &Db, args: &Args, out: &mut W) -> anyhow::Result<()> {
202    let mut last_seen = args.from_event.unwrap_or(0);
203    loop {
204        let rows = range_since(db, &args.id, last_seen, 1000).await?;
205        for r in &rows {
206            emit_wire_line(r, out)?;
207            last_seen = r.id;
208        }
209        if rows.is_empty() {
210            let row = get(db, &args.id)
211                .await?
212                .ok_or_else(|| anyhow!("task {} disappeared", args.id))?;
213            if row.status.is_terminal() {
214                // Drain one more time in case the terminal event was written
215                // between our SELECT and now.
216                let extras = range_since(db, &args.id, last_seen, 1000).await?;
217                for r in &extras {
218                    emit_wire_line(r, out)?;
219                }
220                return match row.status {
221                    TaskStatus::Completed => Ok(()),
222                    TaskStatus::Failed => Err(anyhow!("task failed")),
223                    TaskStatus::Cancelled => Err(anyhow!("task cancelled")),
224                    _ => Ok(()),
225                };
226            }
227            tokio::time::sleep(Duration::from_millis(200)).await;
228        }
229    }
230}
231
232fn emit_wire_line<W: Write>(r: &EventRow, out: &mut W) -> anyhow::Result<()> {
233    // DB payload is already a JSON object; merge with the envelope keys.
234    let payload: serde_json::Value =
235        serde_json::from_str(&r.payload_json).unwrap_or(serde_json::json!({}));
236    let mut obj = serde_json::Map::new();
237    obj.insert("ts".into(), serde_json::Value::String(rfc3339(r.ts)));
238    obj.insert("kind".into(), serde_json::Value::String(r.kind.clone()));
239    obj.insert("event_id".into(), serde_json::Value::Number(r.id.into()));
240    if let serde_json::Value::Object(map) = payload {
241        for (k, v) in map {
242            obj.entry(k).or_insert(v);
243        }
244    }
245    writeln!(out, "{}", serde_json::Value::Object(obj))?;
246    Ok(())
247}
248
249async fn check_liveness(db: &Db) -> anyhow::Result<Option<String>> {
250    let servers = db.list_servers().await?;
251    let now_s = Timestamp::now().as_second();
252    let warn = match servers.iter().map(|s| s.last_heartbeat).max() {
253        None => Some("⚠ No `rover mcp` process appears to be alive.".to_string()),
254        Some(hb) if now_s - hb > 30 => Some(format!(
255            "⚠ Task is marked `running` but no `rover mcp` process appears to be alive (last heartbeat {}s ago).",
256            now_s - hb,
257        )),
258        _ => None,
259    };
260    Ok(warn)
261}
262
263fn relative_human(ms: i64) -> String {
264    let s = (ms / 1000).max(0);
265    if s < 60 {
266        format!("{s}s ago")
267    } else if s < 3600 {
268        format!("{}m{}s ago", s / 60, s % 60)
269    } else {
270        format!("{}h{}m ago", s / 3600, (s % 3600) / 60)
271    }
272}
273
274fn rfc3339(ms: i64) -> String {
275    let ts = Timestamp::from_millisecond(ms).unwrap_or_else(|_| Timestamp::now());
276    ts.to_string()
277}
278
279fn eta_seconds(succeeded: i64, total: i64, started_ms: i64, now_ms: i64) -> Option<i64> {
280    if succeeded < 3 || total == 0 {
281        return None;
282    }
283    let elapsed_ms = now_ms - started_ms;
284    if elapsed_ms <= 0 {
285        return None;
286    }
287    let avg_per_item = elapsed_ms as f64 / succeeded as f64;
288    let remaining = (total - succeeded).max(0) as f64;
289    Some(((avg_per_item * remaining) / 1000.0) as i64)
290}
291
292fn summarise_event(r: &EventRow) -> String {
293    let v: serde_json::Value =
294        serde_json::from_str(&r.payload_json).unwrap_or(serde_json::json!({}));
295    if let Some(url) = v.get("url").and_then(|u| u.as_str()) {
296        format!("{} {}", r.kind, url)
297    } else {
298        r.kind.clone()
299    }
300}