1use std::io::Write;
8use std::path::Path;
9use std::time::Duration;
10
11use anyhow::{Context, anyhow};
12use jiff::Timestamp;
13
14use crate::config;
15use crate::storage::Db;
16use crate::storage::events::{EventRow, count_by_kind, last_for_task, range_since};
17use crate::storage::tasks::{TaskKind, TaskStatus, get, set_cancellation_requested};
18
19pub struct Args {
21 pub id: String,
22 pub monitor: bool,
23 pub cancel: bool,
24 pub format: OutputFormat,
25 pub from_event: Option<i64>,
26 pub expect_kind: Option<&'static str>,
30}
31
/// Rendering selected for a task snapshot.
#[derive(Clone, Copy, Debug)]
pub enum OutputFormat {
    /// Multi-line, human-readable summary.
    Human,
    /// One JSON object on a single line.
    Ndjson,
}
37
38pub async fn run(args: Args, config_path: Option<&Path>) -> anyhow::Result<()> {
41 let mut out = std::io::stdout().lock();
42 run_with_writers(args, config_path, &mut out).await
43}
44
45pub async fn run_with_writers<W: Write>(
49 args: Args,
50 config_path: Option<&Path>,
51 out: &mut W,
52) -> anyhow::Result<()> {
53 let _cfg = config::load_resolved(config_path).context("loading config")?;
54 let data_dir = crate::paths::data_dir();
55 let db = Db::open(data_dir.join("rover.db"))
56 .await
57 .context("opening cache database")?;
58
59 let row = get(&db, &args.id)
60 .await?
61 .ok_or_else(|| anyhow!("task {} not found", args.id))?;
62 if let Some(want) = args.expect_kind
63 && row.kind.as_str() != want
64 {
65 return Err(anyhow!(
66 "task {} is kind={}, expected {}",
67 args.id,
68 row.kind.as_str(),
69 want
70 ));
71 }
72
73 if args.cancel {
74 let changed = set_cancellation_requested(&db, &args.id).await?;
75 if changed {
76 writeln!(out, "Cancellation requested for {}.", args.id)?;
77 } else if row.cancellation_requested {
78 writeln!(out, "Cancellation already requested for {}.", args.id)?;
79 } else {
80 writeln!(
81 out,
82 "Task {} is in a terminal state; nothing to cancel.",
83 args.id
84 )?;
85 }
86 return Ok(());
87 }
88
89 if args.monitor {
90 return monitor_loop(&db, &args, out).await;
91 }
92 print_snapshot(&db, &args, &row, out).await
93}
94
95async fn print_snapshot<W: Write>(
96 db: &Db,
97 args: &Args,
98 row: &crate::storage::tasks::TaskRow,
99 out: &mut W,
100) -> anyhow::Result<()> {
101 let liveness = check_liveness(db).await?;
102 let now_ms = Timestamp::now().as_millisecond();
103 let counts = count_by_kind(db, &args.id).await?;
104 let succeeded = counts
105 .iter()
106 .find(|(k, _)| k == "item_done")
107 .map(|(_, n)| *n)
108 .unwrap_or(0);
109 let failed = counts
110 .iter()
111 .find(|(k, _)| k == "item_failed")
112 .map(|(_, n)| *n)
113 .unwrap_or(0);
114 let started = counts
115 .iter()
116 .find(|(k, _)| k == "item_started")
117 .map(|(_, n)| *n)
118 .unwrap_or(0);
119 let total: i64 = if row.kind == TaskKind::BatchFetch {
120 serde_json::from_str::<serde_json::Value>(&row.params_json)
121 .ok()
122 .and_then(|v| {
123 v.get("urls")
124 .and_then(|u| u.as_array())
125 .map(|a| a.len() as i64)
126 })
127 .unwrap_or(0)
128 } else {
129 0
130 };
131 let in_flight = (started - succeeded - failed).max(0);
132 let last = last_for_task(db, &args.id).await?;
133
134 match args.format {
135 OutputFormat::Ndjson => {
136 let snap = serde_json::json!({
137 "ts": rfc3339(now_ms),
138 "kind": "snapshot",
139 "task_id": row.id,
140 "task_kind": row.kind.as_str(),
141 "status": row.status.as_str(),
142 "total": total,
143 "succeeded": succeeded,
144 "failed": failed,
145 "in_flight": in_flight,
146 "completed": succeeded + failed,
147 "started_at": rfc3339(row.created_at),
148 "last_event_id": last.as_ref().map(|e| e.id),
149 "eta_s": eta_seconds(succeeded, total, row.created_at, now_ms),
150 });
151 writeln!(out, "{snap}")?;
152 }
153 OutputFormat::Human => {
154 if let Some(warn) = liveness {
155 writeln!(out, "{warn}")?;
156 }
157 if row.kind == TaskKind::BatchFetch {
158 writeln!(out, "Batch {} — {}", row.id, row.status.as_str())?;
159 } else {
160 writeln!(
161 out,
162 "Task {} — {} (kind: {})",
163 row.id,
164 row.status.as_str(),
165 row.kind.as_str()
166 )?;
167 }
168 writeln!(out, "Started {}", relative_human(now_ms - row.created_at))?;
169 if row.kind == TaskKind::BatchFetch && total > 0 {
170 let pct = (succeeded + failed) * 100 / total;
171 writeln!(
172 out,
173 "Progress: {}/{} ({}%) ✓ {} ✗ {} ⋯ {} in flight",
174 succeeded + failed,
175 total,
176 pct,
177 succeeded,
178 failed,
179 in_flight,
180 )?;
181 if let Some(eta) = eta_seconds(succeeded, total, row.created_at, now_ms) {
182 writeln!(out, "ETA ~{eta}s")?;
183 }
184 }
185 if let Some(ev) = last.as_ref() {
186 writeln!(
187 out,
188 "Last event: {} ({})",
189 summarise_event(ev),
190 relative_human(now_ms - ev.ts)
191 )?;
192 }
193 if !row.status.is_terminal() {
194 writeln!(out, "Tip: use `rover task {} --cancel` to stop.", row.id)?;
195 }
196 }
197 }
198 Ok(())
199}
200
201async fn monitor_loop<W: Write>(db: &Db, args: &Args, out: &mut W) -> anyhow::Result<()> {
202 let mut last_seen = args.from_event.unwrap_or(0);
203 loop {
204 let rows = range_since(db, &args.id, last_seen, 1000).await?;
205 for r in &rows {
206 emit_wire_line(r, out)?;
207 last_seen = r.id;
208 }
209 if rows.is_empty() {
210 let row = get(db, &args.id)
211 .await?
212 .ok_or_else(|| anyhow!("task {} disappeared", args.id))?;
213 if row.status.is_terminal() {
214 let extras = range_since(db, &args.id, last_seen, 1000).await?;
217 for r in &extras {
218 emit_wire_line(r, out)?;
219 }
220 return match row.status {
221 TaskStatus::Completed => Ok(()),
222 TaskStatus::Failed => Err(anyhow!("task failed")),
223 TaskStatus::Cancelled => Err(anyhow!("task cancelled")),
224 _ => Ok(()),
225 };
226 }
227 tokio::time::sleep(Duration::from_millis(200)).await;
228 }
229 }
230}
231
232fn emit_wire_line<W: Write>(r: &EventRow, out: &mut W) -> anyhow::Result<()> {
233 let payload: serde_json::Value =
235 serde_json::from_str(&r.payload_json).unwrap_or(serde_json::json!({}));
236 let mut obj = serde_json::Map::new();
237 obj.insert("ts".into(), serde_json::Value::String(rfc3339(r.ts)));
238 obj.insert("kind".into(), serde_json::Value::String(r.kind.clone()));
239 obj.insert("event_id".into(), serde_json::Value::Number(r.id.into()));
240 if let serde_json::Value::Object(map) = payload {
241 for (k, v) in map {
242 obj.entry(k).or_insert(v);
243 }
244 }
245 writeln!(out, "{}", serde_json::Value::Object(obj))?;
246 Ok(())
247}
248
249async fn check_liveness(db: &Db) -> anyhow::Result<Option<String>> {
250 let servers = db.list_servers().await?;
251 let now_s = Timestamp::now().as_second();
252 let warn = match servers.iter().map(|s| s.last_heartbeat).max() {
253 None => Some("⚠ No `rover mcp` process appears to be alive.".to_string()),
254 Some(hb) if now_s - hb > 30 => Some(format!(
255 "⚠ Task is marked `running` but no `rover mcp` process appears to be alive (last heartbeat {}s ago).",
256 now_s - hb,
257 )),
258 _ => None,
259 };
260 Ok(warn)
261}
262
/// Renders an elapsed duration, given in milliseconds, as a short "… ago" string.
///
/// Sub-second remainders are truncated and negative durations are treated as
/// zero. The output uses seconds below one minute, minutes and seconds below
/// one hour, and hours and minutes from there on.
fn relative_human(ms: i64) -> String {
    let secs = (ms / 1000).max(0);
    match secs {
        0..=59 => format!("{secs}s ago"),
        60..=3599 => format!("{}m{}s ago", secs / 60, secs % 60),
        _ => format!("{}h{}m ago", secs / 3600, (secs % 3600) / 60),
    }
}
273
274fn rfc3339(ms: i64) -> String {
275 let ts = Timestamp::from_millisecond(ms).unwrap_or_else(|_| Timestamp::now());
276 ts.to_string()
277}
278
/// Estimates the remaining time, in whole seconds, from the average time per
/// succeeded item so far.
///
/// Returns `None` when fewer than three items have succeeded, when `total` is
/// zero, or when no positive time has elapsed between `started_ms` and
/// `now_ms`. Otherwise the average milliseconds per succeeded item is
/// multiplied by the number of items not yet succeeded (never below zero) and
/// truncated to seconds.
fn eta_seconds(succeeded: i64, total: i64, started_ms: i64, now_ms: i64) -> Option<i64> {
    if succeeded < 3 || total == 0 {
        return None;
    }
    let elapsed_ms = now_ms - started_ms;
    (elapsed_ms > 0).then(|| {
        let ms_per_item = elapsed_ms as f64 / succeeded as f64;
        let items_left = (total - succeeded).max(0) as f64;
        (ms_per_item * items_left / 1000.0) as i64
    })
}
291
292fn summarise_event(r: &EventRow) -> String {
293 let v: serde_json::Value =
294 serde_json::from_str(&r.payload_json).unwrap_or(serde_json::json!({}));
295 if let Some(url) = v.get("url").and_then(|u| u.as_str()) {
296 format!("{} {}", r.kind, url)
297 } else {
298 r.kind.clone()
299 }
300}