1use std::path::{Path, PathBuf};
2
3use chrono::{DateTime, TimeZone, Utc};
4use rusqlite::{params, Connection, OptionalExtension, Row};
5use serde::{de::DeserializeOwned, Serialize};
6use serde_json::Value;
7use uuid::Uuid;
8
9use crate::{
10 error::{AppError, AppResult},
11 types::{
12 ControlContext, ErrorCode, EventRecord, EventType, ExecutionPlan, ExecutionSpec,
13 ResourceLimits, ResourceUsage, RuntimeErrorInfo, SandboxPolicy, SubmitTaskRequest,
14 TaskResourceReservation, TaskStatus,
15 },
16};
17
/// Handle to the SQLite-backed task store.
///
/// Only the database file path is stored; every operation opens its own
/// connection through `connect`.
#[derive(Debug, Clone)]
pub struct Repository {
    /// Location of the SQLite database file.
    db_path: PathBuf,
}
22
/// In-memory form of one row of the `tasks` table, as decoded by
/// `row_to_task_record`.
#[derive(Debug, Clone)]
pub struct TaskRecord {
    /// Primary key of the row.
    pub task_id: String,
    /// Value of the `handle_id` column; `insert_task` stores the task id here.
    pub handle_id: String,
    /// Lifecycle state decoded from the `status` column.
    pub status: TaskStatus,
    /// Decoded from `execution_json`.
    pub execution: ExecutionSpec,
    /// Decoded from `limits_json`.
    pub limits: ResourceLimits,
    /// Decoded from `sandbox_json`.
    pub sandbox: SandboxPolicy,
    /// Decoded from `metadata_json`.
    pub metadata: std::collections::BTreeMap<String, String>,
    /// `created_at_ms`, set when the row is inserted.
    pub created_at: DateTime<Utc>,
    /// `updated_at_ms`, refreshed by the mutating repository methods.
    pub updated_at: DateTime<Utc>,
    /// `started_at_ms`, written by `mark_started`.
    pub started_at: Option<DateTime<Utc>>,
    /// `finished_at_ms`, written when the task is completed or cancelled.
    pub finished_at: Option<DateTime<Utc>>,
    /// `duration_ms`; negative stored values are clamped to zero on read.
    pub duration_ms: Option<u64>,
    /// `shim_pid`, written by `mark_dispatched`.
    pub shim_pid: Option<u32>,
    /// `pid`, written by `mark_started`.
    pub pid: Option<u32>,
    /// `pgid`, written by `mark_started`.
    pub pgid: Option<i32>,
    /// `exit_code`, written by `complete_task`.
    pub exit_code: Option<i32>,
    /// `exit_signal`, written by `complete_task`.
    pub exit_signal: Option<i32>,
    /// `error_code` column decoded into the enum.
    pub error_code: Option<ErrorCode>,
    /// Decoded from `error_json`.
    pub error: Option<RuntimeErrorInfo>,
    /// Decoded from `usage_json`.
    pub usage: Option<ResourceUsage>,
    /// Filesystem locations stored as text columns of the same names.
    pub task_dir: PathBuf,
    pub workspace_dir: PathBuf,
    pub request_path: PathBuf,
    pub result_path: PathBuf,
    pub stdout_path: PathBuf,
    pub stderr_path: PathBuf,
    pub script_path: Option<PathBuf>,
    /// `stdout_max_bytes`; negative stored values are clamped to zero on read.
    pub stdout_max_bytes: u64,
    /// `stderr_max_bytes`; negative stored values are clamped to zero on read.
    pub stderr_max_bytes: u64,
    /// True when the `kill_requested` column is non-zero.
    pub kill_requested: bool,
    /// `kill_requested_at_ms`, written by `set_cancel_requested`.
    pub kill_requested_at: Option<DateTime<Utc>>,
    /// True when the `timeout_triggered` column is non-zero.
    pub timeout_triggered: bool,
    /// Decoded from the `result_json` column.
    pub result_json: Option<Value>,
    /// Decoded from `execution_plan_json`.
    pub execution_plan: Option<ExecutionPlan>,
    /// Decoded from `control_context_json`.
    pub control_context: Option<ControlContext>,
    /// Decoded from `reservation_json`.
    pub reservation: Option<TaskResourceReservation>,
    /// `reserved_at_ms`, written by `reserve_resources`.
    pub reserved_at: Option<DateTime<Utc>>,
    /// `released_at_ms`; `None` while a reservation is still held.
    pub released_at: Option<DateTime<Utc>>,
}
64
/// Input for `Repository::insert_task`: everything needed to create a new
/// row in the `tasks` table.
#[derive(Debug, Clone)]
pub struct NewTaskRecord {
    /// Identifier of the task; also stored as the row's `handle_id`.
    pub task_id: String,
    /// Submitted request; its execution, limits, sandbox and metadata parts
    /// are serialized into separate JSON columns.
    pub request: SubmitTaskRequest,
    /// Filesystem locations persisted as text columns of the same names.
    pub task_dir: PathBuf,
    pub workspace_dir: PathBuf,
    pub request_path: PathBuf,
    pub result_path: PathBuf,
    pub stdout_path: PathBuf,
    pub stderr_path: PathBuf,
    pub script_path: Option<PathBuf>,
    /// Plan persisted as `execution_plan_json` and attached to the `Planned`
    /// event; a `Degraded` event is added when its `degraded` flag is set.
    pub execution_plan: ExecutionPlan,
    /// Optional context persisted as `control_context_json`.
    pub control_context: Option<ControlContext>,
}
79
/// Values written to a task row by `Repository::complete_task`.
#[derive(Debug, Clone)]
pub struct CompletionUpdate {
    /// Status to store; it also selects the event recorded for the completion.
    pub status: TaskStatus,
    /// Stored as both `finished_at_ms` and `updated_at_ms`.
    pub finished_at: DateTime<Utc>,
    /// Stored as `duration_ms`; must fit into an `i64` to be persisted.
    pub duration_ms: Option<u64>,
    /// Stored as `exit_code`.
    pub exit_code: Option<i32>,
    /// Stored as `exit_signal`.
    pub exit_signal: Option<i32>,
    /// Stored as `error_json`; its `code` is also stored as `error_code`.
    pub error: Option<RuntimeErrorInfo>,
    /// Stored as `usage_json`.
    pub usage: Option<ResourceUsage>,
    /// Stored as `result_json`.
    pub result_json: Option<Value>,
}
91
/// Aggregates over the `tasks` table produced by
/// `Repository::metrics_snapshot`.
#[derive(Debug, Clone, Default)]
pub struct MetricsSnapshot {
    /// Number of tasks per stored status string.
    pub by_status: std::collections::BTreeMap<String, u64>,
    /// Number of tasks per stored error code string (rows without an error
    /// code are not counted).
    pub by_error_code: std::collections::BTreeMap<String, u64>,
    /// Every non-NULL `duration_ms` value, clamped to be non-negative.
    pub finished_durations_ms: Vec<u64>,
}
98
99impl TaskRecord {
100 pub fn has_active_reservation(&self) -> bool {
101 self.reservation.is_some() && self.released_at.is_none()
102 }
103}
104
105impl Repository {
106 pub fn new(db_path: impl Into<PathBuf>) -> Self {
107 Self {
108 db_path: db_path.into(),
109 }
110 }
111
112 pub fn db_path(&self) -> &Path {
113 &self.db_path
114 }
115
116 pub fn init(&self) -> AppResult<()> {
117 if let Some(parent) = self.db_path.parent() {
118 std::fs::create_dir_all(parent)?;
119 }
120 let conn = self.connect()?;
121 conn.execute_batch(
122 r#"
123 PRAGMA journal_mode = WAL;
124 PRAGMA foreign_keys = ON;
125
126 CREATE TABLE IF NOT EXISTS tasks (
127 task_id TEXT PRIMARY KEY,
128 handle_id TEXT NOT NULL,
129 status TEXT NOT NULL,
130 execution_json TEXT NOT NULL,
131 limits_json TEXT NOT NULL,
132 sandbox_json TEXT NOT NULL,
133 metadata_json TEXT NOT NULL,
134 created_at_ms INTEGER NOT NULL,
135 updated_at_ms INTEGER NOT NULL,
136 started_at_ms INTEGER NULL,
137 finished_at_ms INTEGER NULL,
138 duration_ms INTEGER NULL,
139 shim_pid INTEGER NULL,
140 pid INTEGER NULL,
141 pgid INTEGER NULL,
142 exit_code INTEGER NULL,
143 exit_signal INTEGER NULL,
144 error_code TEXT NULL,
145 error_json TEXT NULL,
146 usage_json TEXT NULL,
147 task_dir TEXT NOT NULL,
148 workspace_dir TEXT NOT NULL,
149 request_path TEXT NOT NULL,
150 result_path TEXT NOT NULL,
151 stdout_path TEXT NOT NULL,
152 stderr_path TEXT NOT NULL,
153 script_path TEXT NULL,
154 stdout_max_bytes INTEGER NOT NULL,
155 stderr_max_bytes INTEGER NOT NULL,
156 kill_requested INTEGER NOT NULL DEFAULT 0,
157 kill_requested_at_ms INTEGER NULL,
158 timeout_triggered INTEGER NOT NULL DEFAULT 0,
159 result_json TEXT NULL,
160 execution_plan_json TEXT NULL,
161 control_context_json TEXT NULL,
162 reservation_json TEXT NULL,
163 reserved_at_ms INTEGER NULL,
164 released_at_ms INTEGER NULL
165 );
166
167 CREATE TABLE IF NOT EXISTS task_events (
168 seq INTEGER PRIMARY KEY AUTOINCREMENT,
169 task_id TEXT NOT NULL,
170 event_type TEXT NOT NULL,
171 timestamp_ms INTEGER NOT NULL,
172 message TEXT NULL,
173 data_json TEXT NULL,
174 FOREIGN KEY(task_id) REFERENCES tasks(task_id) ON DELETE CASCADE
175 );
176
177 CREATE INDEX IF NOT EXISTS idx_tasks_status_created ON tasks(status, created_at_ms);
178 CREATE INDEX IF NOT EXISTS idx_tasks_finished_at ON tasks(finished_at_ms);
179 CREATE INDEX IF NOT EXISTS idx_task_events_task_id_seq ON task_events(task_id, seq);
180 "#,
181 )?;
182 ensure_task_column(&conn, "execution_plan_json", "TEXT NULL")?;
183 ensure_task_column(&conn, "control_context_json", "TEXT NULL")?;
184 ensure_task_column(&conn, "reservation_json", "TEXT NULL")?;
185 ensure_task_column(&conn, "reserved_at_ms", "INTEGER NULL")?;
186 ensure_task_column(&conn, "released_at_ms", "INTEGER NULL")?;
187 Ok(())
188 }
189
190 pub fn insert_task(&self, new_task: &NewTaskRecord) -> AppResult<()> {
191 let now = Utc::now();
192 let conn = self.connect()?;
193 let tx = conn.unchecked_transaction()?;
194 tx.execute(
195 r#"
196 INSERT INTO tasks (
197 task_id, handle_id, status,
198 execution_json, limits_json, sandbox_json, metadata_json,
199 created_at_ms, updated_at_ms,
200 task_dir, workspace_dir, request_path, result_path, stdout_path, stderr_path, script_path,
201 stdout_max_bytes, stderr_max_bytes, execution_plan_json, control_context_json
202 ) VALUES (
203 ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
204 )
205 "#,
206 params![
207 new_task.task_id,
208 new_task.task_id,
209 encode_status(TaskStatus::Accepted),
210 to_json(&new_task.request.execution)?,
211 to_json(&new_task.request.limits)?,
212 to_json(&new_task.request.sandbox)?,
213 to_json(&new_task.request.metadata)?,
214 now.timestamp_millis(),
215 now.timestamp_millis(),
216 new_task.task_dir.to_string_lossy().to_string(),
217 new_task.workspace_dir.to_string_lossy().to_string(),
218 new_task.request_path.to_string_lossy().to_string(),
219 new_task.result_path.to_string_lossy().to_string(),
220 new_task.stdout_path.to_string_lossy().to_string(),
221 new_task.stderr_path.to_string_lossy().to_string(),
222 new_task
223 .script_path
224 .as_ref()
225 .map(|p| p.to_string_lossy().to_string()),
226 i64::try_from(new_task.request.limits.stdout_max_bytes)
227 .map_err(|_| AppError::InvalidInput("stdout_max_bytes is too large".into()))?,
228 i64::try_from(new_task.request.limits.stderr_max_bytes)
229 .map_err(|_| AppError::InvalidInput("stderr_max_bytes is too large".into()))?,
230 to_json(&new_task.execution_plan)?,
231 new_task.control_context.as_ref().map(to_json).transpose()?,
232 ],
233 )
234 .map_err(|err| {
235 if let rusqlite::Error::SqliteFailure(code, _) = &err {
236 if code.extended_code == rusqlite::ffi::SQLITE_CONSTRAINT_PRIMARYKEY {
237 return AppError::Conflict(format!("task {} already exists", new_task.task_id));
238 }
239 }
240 AppError::Sqlite(err)
241 })?;
242 insert_event_tx(
243 &tx,
244 &new_task.task_id,
245 EventType::Submitted,
246 Some("task submitted"),
247 None,
248 )?;
249 insert_event_tx(
250 &tx,
251 &new_task.task_id,
252 EventType::Accepted,
253 Some("task accepted"),
254 None,
255 )?;
256 insert_event_tx(
257 &tx,
258 &new_task.task_id,
259 EventType::Planned,
260 Some("execution plan resolved"),
261 Some(&serde_json::to_value(&new_task.execution_plan)?),
262 )?;
263 if new_task.execution_plan.degraded {
264 insert_event_tx(
265 &tx,
266 &new_task.task_id,
267 EventType::Degraded,
268 Some("execution plan degraded"),
269 Some(&serde_json::json!({
270 "fallback_reasons": &new_task.execution_plan.fallback_reasons,
271 "effective_sandbox": &new_task.execution_plan.effective_sandbox,
272 })),
273 )?;
274 }
275 tx.commit()?;
276 Ok(())
277 }
278
279 pub fn get_task(&self, task_id: &str) -> AppResult<TaskRecord> {
280 let conn = self.connect()?;
281 let task = conn
282 .query_row(
283 "SELECT * FROM tasks WHERE task_id = ?1",
284 params![task_id],
285 row_to_task_record,
286 )
287 .optional()?;
288 task.ok_or_else(|| AppError::NotFound(task_id.to_string()))
289 }
290
291 pub fn list_events(&self, task_id: &str) -> AppResult<Vec<EventRecord>> {
292 let conn = self.connect()?;
293 let mut stmt = conn.prepare(
294 "SELECT seq, task_id, event_type, timestamp_ms, message, data_json FROM task_events WHERE task_id = ?1 ORDER BY seq ASC",
295 )?;
296 let iter = stmt.query_map(params![task_id], |row| {
297 Ok(EventRecord {
298 seq: row.get(0)?,
299 task_id: row.get(1)?,
300 event_type: decode_event_type(row.get::<_, String>(2)?.as_str())?,
301 timestamp: ts_millis_to_utc(row.get(3)?),
302 message: row.get(4)?,
303 data: opt_json_value(row.get(5)?)?,
304 })
305 })?;
306 let mut events = Vec::new();
307 for item in iter {
308 events.push(item?);
309 }
310 Ok(events)
311 }
312
313 pub fn count_accepted(&self) -> AppResult<u64> {
314 self.count_by_status(TaskStatus::Accepted)
315 }
316
317 pub fn count_running(&self) -> AppResult<u64> {
318 self.count_by_status(TaskStatus::Running)
319 }
320
321 pub fn count_by_status(&self, status: TaskStatus) -> AppResult<u64> {
322 let conn = self.connect()?;
323 let count: i64 = conn.query_row(
324 "SELECT COUNT(*) FROM tasks WHERE status = ?1",
325 params![encode_status(status)],
326 |row| row.get(0),
327 )?;
328 Ok(count.max(0) as u64)
329 }
330
331 pub fn list_accepted(&self, limit: usize) -> AppResult<Vec<TaskRecord>> {
332 let conn = self.connect()?;
333 let mut stmt = conn.prepare(
334 "SELECT * FROM tasks WHERE status = 'accepted' ORDER BY created_at_ms ASC LIMIT ?1",
335 )?;
336 let iter = stmt.query_map(params![limit as i64], row_to_task_record)?;
337 let mut items = Vec::new();
338 for item in iter {
339 items.push(item?);
340 }
341 Ok(items)
342 }
343
344 pub fn list_non_terminal(&self) -> AppResult<Vec<TaskRecord>> {
345 let conn = self.connect()?;
346 let mut stmt = conn.prepare(
347 "SELECT * FROM tasks WHERE status IN ('accepted', 'running') ORDER BY created_at_ms ASC",
348 )?;
349 let iter = stmt.query_map([], row_to_task_record)?;
350 let mut items = Vec::new();
351 for item in iter {
352 items.push(item?);
353 }
354 Ok(items)
355 }
356
357 pub fn list_active_reservations(&self) -> AppResult<Vec<TaskRecord>> {
358 let conn = self.connect()?;
359 let mut stmt = conn.prepare(
360 "SELECT * FROM tasks WHERE reservation_json IS NOT NULL AND released_at_ms IS NULL ORDER BY reserved_at_ms ASC, created_at_ms ASC",
361 )?;
362 let iter = stmt.query_map([], row_to_task_record)?;
363 let mut items = Vec::new();
364 for item in iter {
365 items.push(item?);
366 }
367 Ok(items)
368 }
369
370 pub fn count_accepted_waiting(&self) -> AppResult<u64> {
371 let conn = self.connect()?;
372 let count: i64 = conn.query_row(
373 "SELECT COUNT(*) FROM tasks WHERE status = 'accepted' AND (reservation_json IS NULL OR released_at_ms IS NOT NULL)",
374 [],
375 |row| row.get(0),
376 )?;
377 Ok(count.max(0) as u64)
378 }
379
380 pub fn mark_dispatched(&self, task_id: &str, shim_pid: u32) -> AppResult<()> {
381 let now = Utc::now().timestamp_millis();
382 let conn = self.connect()?;
383 conn.execute(
384 "UPDATE tasks SET status = 'running', shim_pid = ?2, updated_at_ms = ?3 WHERE task_id = ?1 AND status = 'accepted'",
385 params![task_id, i64::from(shim_pid), now],
386 )?;
387 Ok(())
388 }
389
390 pub fn mark_started(
391 &self,
392 task_id: &str,
393 pid: u32,
394 pgid: i32,
395 script_path: Option<&Path>,
396 ) -> AppResult<()> {
397 let now = Utc::now();
398 let conn = self.connect()?;
399 let tx = conn.unchecked_transaction()?;
400 tx.execute(
401 "UPDATE tasks SET status = 'running', pid = ?2, pgid = ?3, started_at_ms = ?4, updated_at_ms = ?4, script_path = COALESCE(?5, script_path) WHERE task_id = ?1",
402 params![
403 task_id,
404 i64::from(pid),
405 pgid,
406 now.timestamp_millis(),
407 script_path.map(|p| p.to_string_lossy().to_string())
408 ],
409 )?;
410 insert_event_tx(&tx, task_id, EventType::Started, Some("task started"), None)?;
411 tx.commit()?;
412 Ok(())
413 }
414
415 pub fn reserve_resources(
416 &self,
417 task_id: &str,
418 reservation: &TaskResourceReservation,
419 message: &str,
420 ) -> AppResult<()> {
421 let now = Utc::now();
422 let conn = self.connect()?;
423 let tx = conn.unchecked_transaction()?;
424 tx.execute(
425 "UPDATE tasks SET reservation_json = ?2, reserved_at_ms = ?3, released_at_ms = NULL, updated_at_ms = ?3 WHERE task_id = ?1",
426 params![
427 task_id,
428 to_json(reservation)?,
429 now.timestamp_millis(),
430 ],
431 )?;
432 insert_event_tx(
433 &tx,
434 task_id,
435 EventType::ResourceReserved,
436 Some(message),
437 Some(&serde_json::to_value(reservation)?),
438 )?;
439 tx.commit()?;
440 Ok(())
441 }
442
443 pub fn release_resources(&self, task_id: &str, message: &str) -> AppResult<()> {
444 let now = Utc::now();
445 let conn = self.connect()?;
446 let tx = conn.unchecked_transaction()?;
447 let reservation_json: Option<String> = tx
448 .query_row(
449 "SELECT reservation_json FROM tasks WHERE task_id = ?1 AND reservation_json IS NOT NULL AND released_at_ms IS NULL",
450 params![task_id],
451 |row| row.get(0),
452 )
453 .optional()?;
454 if let Some(raw) = reservation_json {
455 let reservation: TaskResourceReservation =
456 serde_json::from_str(&raw).map_err(AppError::Json)?;
457 tx.execute(
458 "UPDATE tasks SET released_at_ms = ?2, updated_at_ms = ?2 WHERE task_id = ?1",
459 params![task_id, now.timestamp_millis()],
460 )?;
461 insert_event_tx(
462 &tx,
463 task_id,
464 EventType::ResourceReleased,
465 Some(message),
466 Some(&serde_json::to_value(reservation)?),
467 )?;
468 }
469 tx.commit()?;
470 Ok(())
471 }
472
473 pub fn set_cancel_requested(&self, task_id: &str) -> AppResult<TaskRecord> {
474 let now = Utc::now();
475 let conn = self.connect()?;
476 let tx = conn.unchecked_transaction()?;
477 tx.execute(
478 "UPDATE tasks SET kill_requested = 1, kill_requested_at_ms = ?2, updated_at_ms = ?2 WHERE task_id = ?1",
479 params![task_id, now.timestamp_millis()],
480 )?;
481 insert_event_tx(
482 &tx,
483 task_id,
484 EventType::KillRequested,
485 Some("kill requested"),
486 None,
487 )?;
488 tx.commit()?;
489 self.get_task(task_id)
490 }
491
492 pub fn mark_timeout_triggered(&self, task_id: &str) -> AppResult<()> {
493 let now = Utc::now();
494 let conn = self.connect()?;
495 let tx = conn.unchecked_transaction()?;
496 tx.execute(
497 "UPDATE tasks SET timeout_triggered = 1, updated_at_ms = ?2 WHERE task_id = ?1",
498 params![task_id, now.timestamp_millis()],
499 )?;
500 insert_event_tx(
501 &tx,
502 task_id,
503 EventType::TimeoutTriggered,
504 Some("timeout triggered"),
505 None,
506 )?;
507 tx.commit()?;
508 Ok(())
509 }
510
511 pub fn cancel_accepted_task(&self, task_id: &str, error: RuntimeErrorInfo) -> AppResult<()> {
512 let now = Utc::now();
513 let conn = self.connect()?;
514 let tx = conn.unchecked_transaction()?;
515 let active_reservation_json: Option<String> = tx
516 .query_row(
517 "SELECT reservation_json FROM tasks WHERE task_id = ?1 AND reservation_json IS NOT NULL AND released_at_ms IS NULL",
518 params![task_id],
519 |row| row.get(0),
520 )
521 .optional()?;
522 tx.execute(
523 r#"
524 UPDATE tasks
525 SET status = 'cancelled',
526 updated_at_ms = ?2,
527 finished_at_ms = ?2,
528 released_at_ms = CASE
529 WHEN released_at_ms IS NULL AND reservation_json IS NOT NULL THEN ?2
530 ELSE released_at_ms
531 END,
532 error_code = ?3,
533 error_json = ?4,
534 duration_ms = 0,
535 result_json = ?5
536 WHERE task_id = ?1 AND status = 'accepted'
537 "#,
538 params![
539 task_id,
540 now.timestamp_millis(),
541 encode_error_code(error.code),
542 to_json(&error)?,
543 to_json(&serde_json::json!({
544 "task_id": task_id,
545 "handle_id": task_id,
546 "status": TaskStatus::Cancelled,
547 "finished_at": now,
548 "error": error,
549 }))?,
550 ],
551 )?;
552 if let Some(raw) = active_reservation_json {
553 let reservation: TaskResourceReservation =
554 serde_json::from_str(&raw).map_err(AppError::Json)?;
555 insert_event_tx(
556 &tx,
557 task_id,
558 EventType::ResourceReleased,
559 Some("task resources released"),
560 Some(&serde_json::to_value(reservation)?),
561 )?;
562 }
563 insert_event_tx(
564 &tx,
565 task_id,
566 EventType::Cancelled,
567 Some("task cancelled"),
568 None,
569 )?;
570 tx.commit()?;
571 Ok(())
572 }
573
574 pub fn complete_task(&self, task_id: &str, update: &CompletionUpdate) -> AppResult<()> {
575 let conn = self.connect()?;
576 let tx = conn.unchecked_transaction()?;
577 let active_reservation_json: Option<String> = tx
578 .query_row(
579 "SELECT reservation_json FROM tasks WHERE task_id = ?1 AND reservation_json IS NOT NULL AND released_at_ms IS NULL",
580 params![task_id],
581 |row| row.get(0),
582 )
583 .optional()?;
584 tx.execute(
585 r#"
586 UPDATE tasks
587 SET status = ?2,
588 updated_at_ms = ?3,
589 finished_at_ms = ?3,
590 released_at_ms = CASE
591 WHEN released_at_ms IS NULL AND reservation_json IS NOT NULL THEN ?3
592 ELSE released_at_ms
593 END,
594 duration_ms = ?4,
595 exit_code = ?5,
596 exit_signal = ?6,
597 error_code = ?7,
598 error_json = ?8,
599 usage_json = ?9,
600 result_json = ?10
601 WHERE task_id = ?1
602 "#,
603 params![
604 task_id,
605 encode_status(update.status.clone()),
606 update.finished_at.timestamp_millis(),
607 update
608 .duration_ms
609 .map(i64::try_from)
610 .transpose()
611 .map_err(|_| {
612 AppError::InvalidInput("duration_ms is too large to persist".into())
613 })?,
614 update.exit_code,
615 update.exit_signal,
616 update.error.as_ref().map(|e| encode_error_code(e.code)),
617 update.error.as_ref().map(to_json).transpose()?,
618 update.usage.as_ref().map(to_json).transpose()?,
619 update.result_json.as_ref().map(to_json).transpose()?,
620 ],
621 )?;
622
623 if let Some(raw) = active_reservation_json {
624 let reservation: TaskResourceReservation =
625 serde_json::from_str(&raw).map_err(AppError::Json)?;
626 insert_event_tx(
627 &tx,
628 task_id,
629 EventType::ResourceReleased,
630 Some("task resources released"),
631 Some(&serde_json::to_value(reservation)?),
632 )?;
633 }
634
635 let event_type = match update.status {
636 TaskStatus::Success => EventType::Finished,
637 TaskStatus::Failed => EventType::Failed,
638 TaskStatus::Cancelled => EventType::Cancelled,
639 TaskStatus::Accepted | TaskStatus::Running => EventType::Finished,
640 };
641 let message = match update.status {
642 TaskStatus::Success => Some("task finished"),
643 TaskStatus::Failed => Some("task failed"),
644 TaskStatus::Cancelled => Some("task cancelled"),
645 TaskStatus::Accepted | TaskStatus::Running => Some("task finished"),
646 };
647 insert_event_tx(&tx, task_id, event_type, message, None)?;
648 tx.commit()?;
649 Ok(())
650 }
651
652 pub fn mark_recovered(&self, task_id: &str) -> AppResult<()> {
653 let now = Utc::now();
654 let conn = self.connect()?;
655 let tx = conn.unchecked_transaction()?;
656 tx.execute(
657 "UPDATE tasks SET updated_at_ms = ?2 WHERE task_id = ?1 AND status = 'running'",
658 params![task_id, now.timestamp_millis()],
659 )?;
660 insert_event_tx(
661 &tx,
662 task_id,
663 EventType::Recovered,
664 Some("task recovered"),
665 None,
666 )?;
667 tx.commit()?;
668 Ok(())
669 }
670
671 pub fn mark_recovery_lost(&self, task_id: &str) -> AppResult<()> {
672 let update = CompletionUpdate {
673 status: TaskStatus::Failed,
674 finished_at: Utc::now(),
675 duration_ms: Some(0),
676 exit_code: None,
677 exit_signal: None,
678 error: Some(RuntimeErrorInfo {
679 code: ErrorCode::Internal,
680 message: "recovery_lost".into(),
681 details: None,
682 }),
683 usage: None,
684 result_json: None,
685 };
686 self.complete_task(task_id, &update)
687 }
688
689 pub fn is_cancel_requested(&self, task_id: &str) -> AppResult<bool> {
690 let conn = self.connect()?;
691 let flag: i64 = conn.query_row(
692 "SELECT kill_requested FROM tasks WHERE task_id = ?1",
693 params![task_id],
694 |row| row.get(0),
695 )?;
696 Ok(flag != 0)
697 }
698
699 pub fn list_gc_candidates(&self, finished_before: DateTime<Utc>) -> AppResult<Vec<TaskRecord>> {
700 let conn = self.connect()?;
701 let mut stmt = conn.prepare(
702 "SELECT * FROM tasks WHERE status IN ('success', 'failed', 'cancelled') AND finished_at_ms IS NOT NULL AND finished_at_ms <= ?1 ORDER BY finished_at_ms ASC",
703 )?;
704 let iter = stmt.query_map(
705 params![finished_before.timestamp_millis()],
706 row_to_task_record,
707 )?;
708 let mut items = Vec::new();
709 for item in iter {
710 items.push(item?);
711 }
712 Ok(items)
713 }
714
715 pub fn delete_task(&self, task_id: &str) -> AppResult<()> {
716 let conn = self.connect()?;
717 conn.execute("DELETE FROM tasks WHERE task_id = ?1", params![task_id])?;
718 Ok(())
719 }
720
721 pub fn metrics_snapshot(&self) -> AppResult<MetricsSnapshot> {
722 let conn = self.connect()?;
723 let mut snapshot = MetricsSnapshot::default();
724
725 let mut status_stmt = conn.prepare("SELECT status, COUNT(*) FROM tasks GROUP BY status")?;
726 let status_rows = status_stmt.query_map([], |row| {
727 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
728 })?;
729 for item in status_rows {
730 let (status, count) = item?;
731 snapshot.by_status.insert(status, count.max(0) as u64);
732 }
733
734 let mut err_stmt = conn.prepare(
735 "SELECT error_code, COUNT(*) FROM tasks WHERE error_code IS NOT NULL GROUP BY error_code",
736 )?;
737 let err_rows = err_stmt.query_map([], |row| {
738 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
739 })?;
740 for item in err_rows {
741 let (code, count) = item?;
742 snapshot.by_error_code.insert(code, count.max(0) as u64);
743 }
744
745 let mut duration_stmt =
746 conn.prepare("SELECT duration_ms FROM tasks WHERE duration_ms IS NOT NULL")?;
747 let duration_rows = duration_stmt.query_map([], |row| row.get::<_, i64>(0))?;
748 for item in duration_rows {
749 snapshot.finished_durations_ms.push(item?.max(0) as u64);
750 }
751
752 Ok(snapshot)
753 }
754
755 fn connect(&self) -> AppResult<Connection> {
756 let conn = Connection::open(&self.db_path)?;
757 conn.busy_timeout(std::time::Duration::from_secs(5))?;
758 conn.pragma_update(None, "foreign_keys", "ON")?;
759 conn.pragma_update(None, "journal_mode", "WAL")?;
760 Ok(conn)
761 }
762}
763
764pub fn generate_task_id() -> String {
765 Uuid::new_v4().to_string()
766}
767
768fn ensure_task_column(conn: &Connection, name: &str, definition: &str) -> AppResult<()> {
769 let mut stmt = conn.prepare("PRAGMA table_info(tasks)")?;
770 let columns = stmt.query_map([], |row| row.get::<_, String>(1))?;
771 for column in columns {
772 if column? == name {
773 return Ok(());
774 }
775 }
776 conn.execute(
777 &format!("ALTER TABLE tasks ADD COLUMN {name} {definition}"),
778 [],
779 )?;
780 Ok(())
781}
782
783fn row_to_task_record(row: &Row<'_>) -> rusqlite::Result<TaskRecord> {
784 Ok(TaskRecord {
785 task_id: row.get("task_id")?,
786 handle_id: row.get("handle_id")?,
787 status: decode_status(row.get::<_, String>("status")?.as_str())?,
788 execution: from_json(row.get("execution_json")?)?,
789 limits: from_json(row.get("limits_json")?)?,
790 sandbox: from_json(row.get("sandbox_json")?)?,
791 metadata: from_json(row.get("metadata_json")?)?,
792 created_at: ts_millis_to_utc(row.get("created_at_ms")?),
793 updated_at: ts_millis_to_utc(row.get("updated_at_ms")?),
794 started_at: row
795 .get::<_, Option<i64>>("started_at_ms")?
796 .map(ts_millis_to_utc),
797 finished_at: row
798 .get::<_, Option<i64>>("finished_at_ms")?
799 .map(ts_millis_to_utc),
800 duration_ms: row
801 .get::<_, Option<i64>>("duration_ms")?
802 .map(|value| value.max(0) as u64),
803 shim_pid: row
804 .get::<_, Option<i64>>("shim_pid")?
805 .map(|value| value as u32),
806 pid: row.get::<_, Option<i64>>("pid")?.map(|value| value as u32),
807 pgid: row.get("pgid")?,
808 exit_code: row.get("exit_code")?,
809 exit_signal: row.get("exit_signal")?,
810 error_code: row
811 .get::<_, Option<String>>("error_code")?
812 .map(|value| decode_error_code(value.as_str()))
813 .transpose()?,
814 error: row
815 .get::<_, Option<String>>("error_json")?
816 .map(from_json)
817 .transpose()?,
818 usage: row
819 .get::<_, Option<String>>("usage_json")?
820 .map(from_json)
821 .transpose()?,
822 task_dir: PathBuf::from(row.get::<_, String>("task_dir")?),
823 workspace_dir: PathBuf::from(row.get::<_, String>("workspace_dir")?),
824 request_path: PathBuf::from(row.get::<_, String>("request_path")?),
825 result_path: PathBuf::from(row.get::<_, String>("result_path")?),
826 stdout_path: PathBuf::from(row.get::<_, String>("stdout_path")?),
827 stderr_path: PathBuf::from(row.get::<_, String>("stderr_path")?),
828 script_path: row
829 .get::<_, Option<String>>("script_path")?
830 .map(PathBuf::from),
831 stdout_max_bytes: row.get::<_, i64>("stdout_max_bytes")?.max(0) as u64,
832 stderr_max_bytes: row.get::<_, i64>("stderr_max_bytes")?.max(0) as u64,
833 kill_requested: row.get::<_, i64>("kill_requested")? != 0,
834 kill_requested_at: row
835 .get::<_, Option<i64>>("kill_requested_at_ms")?
836 .map(ts_millis_to_utc),
837 timeout_triggered: row.get::<_, i64>("timeout_triggered")? != 0,
838 result_json: row
839 .get::<_, Option<String>>("result_json")?
840 .map(from_json)
841 .transpose()?,
842 execution_plan: row
843 .get::<_, Option<String>>("execution_plan_json")?
844 .map(from_json)
845 .transpose()?,
846 control_context: row
847 .get::<_, Option<String>>("control_context_json")?
848 .map(from_json)
849 .transpose()?,
850 reservation: row
851 .get::<_, Option<String>>("reservation_json")?
852 .map(from_json)
853 .transpose()?,
854 reserved_at: row
855 .get::<_, Option<i64>>("reserved_at_ms")?
856 .map(ts_millis_to_utc),
857 released_at: row
858 .get::<_, Option<i64>>("released_at_ms")?
859 .map(ts_millis_to_utc),
860 })
861}
862
863fn insert_event_tx(
864 tx: &rusqlite::Transaction<'_>,
865 task_id: &str,
866 event_type: EventType,
867 message: Option<&str>,
868 data: Option<&Value>,
869) -> AppResult<()> {
870 tx.execute(
871 "INSERT INTO task_events (task_id, event_type, timestamp_ms, message, data_json) VALUES (?1, ?2, ?3, ?4, ?5)",
872 params![
873 task_id,
874 encode_event_type(event_type),
875 Utc::now().timestamp_millis(),
876 message,
877 data.map(to_json).transpose()?,
878 ],
879 )?;
880 Ok(())
881}
882
883fn to_json<T: Serialize>(value: &T) -> AppResult<String> {
884 Ok(serde_json::to_string(value)?)
885}
886
887fn from_json<T: DeserializeOwned>(raw: String) -> rusqlite::Result<T> {
888 serde_json::from_str(&raw).map_err(|err| {
889 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(err))
890 })
891}
892
893fn opt_json_value(raw: Option<String>) -> rusqlite::Result<Option<Value>> {
894 raw.map(from_json).transpose()
895}
896
897fn encode_status(status: TaskStatus) -> &'static str {
898 match status {
899 TaskStatus::Accepted => "accepted",
900 TaskStatus::Running => "running",
901 TaskStatus::Success => "success",
902 TaskStatus::Failed => "failed",
903 TaskStatus::Cancelled => "cancelled",
904 }
905}
906
907fn decode_status(value: &str) -> rusqlite::Result<TaskStatus> {
908 match value {
909 "accepted" => Ok(TaskStatus::Accepted),
910 "running" => Ok(TaskStatus::Running),
911 "success" => Ok(TaskStatus::Success),
912 "failed" => Ok(TaskStatus::Failed),
913 "cancelled" => Ok(TaskStatus::Cancelled),
914 other => Err(rusqlite::Error::InvalidColumnType(
915 0,
916 other.into(),
917 rusqlite::types::Type::Text,
918 )),
919 }
920}
921
922fn encode_error_code(code: ErrorCode) -> &'static str {
923 match code {
924 ErrorCode::InvalidInput => "invalid_input",
925 ErrorCode::LaunchFailed => "launch_failed",
926 ErrorCode::Timeout => "timeout",
927 ErrorCode::Cancelled => "cancelled",
928 ErrorCode::MemoryLimitExceeded => "memory_limit_exceeded",
929 ErrorCode::CpuLimitExceeded => "cpu_limit_exceeded",
930 ErrorCode::ResourceLimitExceeded => "resource_limit_exceeded",
931 ErrorCode::SandboxSetupFailed => "sandbox_setup_failed",
932 ErrorCode::ExitNonZero => "exit_nonzero",
933 ErrorCode::UnsupportedCapability => "unsupported_capability",
934 ErrorCode::InsufficientResources => "insufficient_resources",
935 ErrorCode::Internal => "internal",
936 }
937}
938
939fn decode_error_code(value: &str) -> rusqlite::Result<ErrorCode> {
940 match value {
941 "invalid_input" => Ok(ErrorCode::InvalidInput),
942 "launch_failed" => Ok(ErrorCode::LaunchFailed),
943 "timeout" => Ok(ErrorCode::Timeout),
944 "cancelled" => Ok(ErrorCode::Cancelled),
945 "memory_limit_exceeded" => Ok(ErrorCode::MemoryLimitExceeded),
946 "cpu_limit_exceeded" => Ok(ErrorCode::CpuLimitExceeded),
947 "resource_limit_exceeded" => Ok(ErrorCode::ResourceLimitExceeded),
948 "sandbox_setup_failed" => Ok(ErrorCode::SandboxSetupFailed),
949 "exit_nonzero" => Ok(ErrorCode::ExitNonZero),
950 "unsupported_capability" => Ok(ErrorCode::UnsupportedCapability),
951 "insufficient_resources" => Ok(ErrorCode::InsufficientResources),
952 "internal" => Ok(ErrorCode::Internal),
953 other => Err(rusqlite::Error::InvalidColumnType(
954 0,
955 other.into(),
956 rusqlite::types::Type::Text,
957 )),
958 }
959}
960
961fn encode_event_type(event_type: EventType) -> &'static str {
962 match event_type {
963 EventType::Submitted => "submitted",
964 EventType::Accepted => "accepted",
965 EventType::Planned => "planned",
966 EventType::Degraded => "degraded",
967 EventType::ResourceReserved => "resource_reserved",
968 EventType::ResourceReleased => "resource_released",
969 EventType::Started => "started",
970 EventType::KillRequested => "kill_requested",
971 EventType::TimeoutTriggered => "timeout_triggered",
972 EventType::Finished => "finished",
973 EventType::Failed => "failed",
974 EventType::Cancelled => "cancelled",
975 EventType::Recovered => "recovered",
976 }
977}
978
979fn decode_event_type(value: &str) -> rusqlite::Result<EventType> {
980 match value {
981 "submitted" => Ok(EventType::Submitted),
982 "accepted" => Ok(EventType::Accepted),
983 "planned" => Ok(EventType::Planned),
984 "degraded" => Ok(EventType::Degraded),
985 "resource_reserved" => Ok(EventType::ResourceReserved),
986 "resource_released" => Ok(EventType::ResourceReleased),
987 "started" => Ok(EventType::Started),
988 "kill_requested" => Ok(EventType::KillRequested),
989 "timeout_triggered" => Ok(EventType::TimeoutTriggered),
990 "finished" => Ok(EventType::Finished),
991 "failed" => Ok(EventType::Failed),
992 "cancelled" => Ok(EventType::Cancelled),
993 "recovered" => Ok(EventType::Recovered),
994 other => Err(rusqlite::Error::InvalidColumnType(
995 0,
996 other.into(),
997 rusqlite::types::Type::Text,
998 )),
999 }
1000}
1001
1002fn ts_millis_to_utc(value: i64) -> DateTime<Utc> {
1003 Utc.timestamp_millis_opt(value)
1004 .single()
1005 .unwrap_or_else(Utc::now)
1006}