Skip to main content

execgo_runtime/
repo.rs

1use std::path::{Path, PathBuf};
2
3use chrono::{DateTime, TimeZone, Utc};
4use rusqlite::{params, Connection, OptionalExtension, Row};
5use serde::{de::DeserializeOwned, Serialize};
6use serde_json::Value;
7use uuid::Uuid;
8
9use crate::{
10    error::{AppError, AppResult},
11    types::{
12        ControlContext, ErrorCode, EventRecord, EventType, ExecutionPlan, ExecutionSpec,
13        ResourceLimits, ResourceUsage, RuntimeErrorInfo, SandboxPolicy, SubmitTaskRequest,
14        TaskResourceReservation, TaskStatus,
15    },
16};
17
/// SQLite-backed store for tasks and their event log.
///
/// Only the database path is held here; each repository method opens its own
/// connection via `Repository::connect`.
#[derive(Debug, Clone)]
pub struct Repository {
    /// Location of the SQLite database file.
    db_path: PathBuf,
}
22
23#[derive(Debug, Clone)]
24pub struct TaskRecord {
25    pub task_id: String,
26    pub handle_id: String,
27    pub status: TaskStatus,
28    pub execution: ExecutionSpec,
29    pub limits: ResourceLimits,
30    pub sandbox: SandboxPolicy,
31    pub metadata: std::collections::BTreeMap<String, String>,
32    pub created_at: DateTime<Utc>,
33    pub updated_at: DateTime<Utc>,
34    pub started_at: Option<DateTime<Utc>>,
35    pub finished_at: Option<DateTime<Utc>>,
36    pub duration_ms: Option<u64>,
37    pub shim_pid: Option<u32>,
38    pub pid: Option<u32>,
39    pub pgid: Option<i32>,
40    pub exit_code: Option<i32>,
41    pub exit_signal: Option<i32>,
42    pub error_code: Option<ErrorCode>,
43    pub error: Option<RuntimeErrorInfo>,
44    pub usage: Option<ResourceUsage>,
45    pub task_dir: PathBuf,
46    pub workspace_dir: PathBuf,
47    pub request_path: PathBuf,
48    pub result_path: PathBuf,
49    pub stdout_path: PathBuf,
50    pub stderr_path: PathBuf,
51    pub script_path: Option<PathBuf>,
52    pub stdout_max_bytes: u64,
53    pub stderr_max_bytes: u64,
54    pub kill_requested: bool,
55    pub kill_requested_at: Option<DateTime<Utc>>,
56    pub timeout_triggered: bool,
57    pub result_json: Option<Value>,
58    pub execution_plan: Option<ExecutionPlan>,
59    pub control_context: Option<ControlContext>,
60    pub reservation: Option<TaskResourceReservation>,
61    pub reserved_at: Option<DateTime<Utc>>,
62    pub released_at: Option<DateTime<Utc>>,
63}
64
/// Everything needed to persist a freshly submitted task via `Repository::insert_task`.
#[derive(Debug, Clone)]
pub struct NewTaskRecord {
    /// Id for the new row; also used as the handle id.
    pub task_id: String,
    /// Original submit request (execution spec, limits, sandbox, metadata).
    pub request: SubmitTaskRequest,
    pub task_dir: PathBuf,
    pub workspace_dir: PathBuf,
    pub request_path: PathBuf,
    pub result_path: PathBuf,
    pub stdout_path: PathBuf,
    pub stderr_path: PathBuf,
    pub script_path: Option<PathBuf>,
    /// Resolved plan; recorded in the `Planned` event and, when degraded, a `Degraded` event.
    pub execution_plan: ExecutionPlan,
    pub control_context: Option<ControlContext>,
}
79
/// Terminal-state data written by `Repository::complete_task`.
#[derive(Debug, Clone)]
pub struct CompletionUpdate {
    /// Final status; selects which completion event is recorded.
    pub status: TaskStatus,
    /// Stored as both `finished_at_ms` and `updated_at_ms`.
    pub finished_at: DateTime<Utc>,
    /// Must fit in an `i64` to be persisted.
    pub duration_ms: Option<u64>,
    pub exit_code: Option<i32>,
    pub exit_signal: Option<i32>,
    /// When present, its code is also stored in the `error_code` column.
    pub error: Option<RuntimeErrorInfo>,
    pub usage: Option<ResourceUsage>,
    pub result_json: Option<Value>,
}
91
/// Aggregate task counters produced by `Repository::metrics_snapshot`.
#[derive(Debug, Clone, Default)]
pub struct MetricsSnapshot {
    /// Number of tasks per stored status string.
    pub by_status: std::collections::BTreeMap<String, u64>,
    /// Number of tasks per stored error code (tasks without an error are excluded).
    pub by_error_code: std::collections::BTreeMap<String, u64>,
    /// `duration_ms` of every task that has one recorded.
    pub finished_durations_ms: Vec<u64>,
}
98
99impl TaskRecord {
100    pub fn has_active_reservation(&self) -> bool {
101        self.reservation.is_some() && self.released_at.is_none()
102    }
103}
104
105impl Repository {
106    pub fn new(db_path: impl Into<PathBuf>) -> Self {
107        Self {
108            db_path: db_path.into(),
109        }
110    }
111
112    pub fn db_path(&self) -> &Path {
113        &self.db_path
114    }
115
116    pub fn init(&self) -> AppResult<()> {
117        if let Some(parent) = self.db_path.parent() {
118            std::fs::create_dir_all(parent)?;
119        }
120        let conn = self.connect()?;
121        conn.execute_batch(
122            r#"
123            PRAGMA journal_mode = WAL;
124            PRAGMA foreign_keys = ON;
125
126            CREATE TABLE IF NOT EXISTS tasks (
127                task_id TEXT PRIMARY KEY,
128                handle_id TEXT NOT NULL,
129                status TEXT NOT NULL,
130                execution_json TEXT NOT NULL,
131                limits_json TEXT NOT NULL,
132                sandbox_json TEXT NOT NULL,
133                metadata_json TEXT NOT NULL,
134                created_at_ms INTEGER NOT NULL,
135                updated_at_ms INTEGER NOT NULL,
136                started_at_ms INTEGER NULL,
137                finished_at_ms INTEGER NULL,
138                duration_ms INTEGER NULL,
139                shim_pid INTEGER NULL,
140                pid INTEGER NULL,
141                pgid INTEGER NULL,
142                exit_code INTEGER NULL,
143                exit_signal INTEGER NULL,
144                error_code TEXT NULL,
145                error_json TEXT NULL,
146                usage_json TEXT NULL,
147                task_dir TEXT NOT NULL,
148                workspace_dir TEXT NOT NULL,
149                request_path TEXT NOT NULL,
150                result_path TEXT NOT NULL,
151                stdout_path TEXT NOT NULL,
152                stderr_path TEXT NOT NULL,
153                script_path TEXT NULL,
154                stdout_max_bytes INTEGER NOT NULL,
155                stderr_max_bytes INTEGER NOT NULL,
156                kill_requested INTEGER NOT NULL DEFAULT 0,
157                kill_requested_at_ms INTEGER NULL,
158                timeout_triggered INTEGER NOT NULL DEFAULT 0,
159                result_json TEXT NULL,
160                execution_plan_json TEXT NULL,
161                control_context_json TEXT NULL,
162                reservation_json TEXT NULL,
163                reserved_at_ms INTEGER NULL,
164                released_at_ms INTEGER NULL
165            );
166
167            CREATE TABLE IF NOT EXISTS task_events (
168                seq INTEGER PRIMARY KEY AUTOINCREMENT,
169                task_id TEXT NOT NULL,
170                event_type TEXT NOT NULL,
171                timestamp_ms INTEGER NOT NULL,
172                message TEXT NULL,
173                data_json TEXT NULL,
174                FOREIGN KEY(task_id) REFERENCES tasks(task_id) ON DELETE CASCADE
175            );
176
177            CREATE INDEX IF NOT EXISTS idx_tasks_status_created ON tasks(status, created_at_ms);
178            CREATE INDEX IF NOT EXISTS idx_tasks_finished_at ON tasks(finished_at_ms);
179            CREATE INDEX IF NOT EXISTS idx_task_events_task_id_seq ON task_events(task_id, seq);
180            "#,
181        )?;
182        ensure_task_column(&conn, "execution_plan_json", "TEXT NULL")?;
183        ensure_task_column(&conn, "control_context_json", "TEXT NULL")?;
184        ensure_task_column(&conn, "reservation_json", "TEXT NULL")?;
185        ensure_task_column(&conn, "reserved_at_ms", "INTEGER NULL")?;
186        ensure_task_column(&conn, "released_at_ms", "INTEGER NULL")?;
187        Ok(())
188    }
189
190    pub fn insert_task(&self, new_task: &NewTaskRecord) -> AppResult<()> {
191        let now = Utc::now();
192        let conn = self.connect()?;
193        let tx = conn.unchecked_transaction()?;
194        tx.execute(
195            r#"
196            INSERT INTO tasks (
197                task_id, handle_id, status,
198                execution_json, limits_json, sandbox_json, metadata_json,
199                created_at_ms, updated_at_ms,
200                task_dir, workspace_dir, request_path, result_path, stdout_path, stderr_path, script_path,
201                stdout_max_bytes, stderr_max_bytes, execution_plan_json, control_context_json
202            ) VALUES (
203                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
204            )
205            "#,
206            params![
207                new_task.task_id,
208                new_task.task_id,
209                encode_status(TaskStatus::Accepted),
210                to_json(&new_task.request.execution)?,
211                to_json(&new_task.request.limits)?,
212                to_json(&new_task.request.sandbox)?,
213                to_json(&new_task.request.metadata)?,
214                now.timestamp_millis(),
215                now.timestamp_millis(),
216                new_task.task_dir.to_string_lossy().to_string(),
217                new_task.workspace_dir.to_string_lossy().to_string(),
218                new_task.request_path.to_string_lossy().to_string(),
219                new_task.result_path.to_string_lossy().to_string(),
220                new_task.stdout_path.to_string_lossy().to_string(),
221                new_task.stderr_path.to_string_lossy().to_string(),
222                new_task
223                    .script_path
224                    .as_ref()
225                    .map(|p| p.to_string_lossy().to_string()),
226                i64::try_from(new_task.request.limits.stdout_max_bytes)
227                    .map_err(|_| AppError::InvalidInput("stdout_max_bytes is too large".into()))?,
228                i64::try_from(new_task.request.limits.stderr_max_bytes)
229                    .map_err(|_| AppError::InvalidInput("stderr_max_bytes is too large".into()))?,
230                to_json(&new_task.execution_plan)?,
231                new_task.control_context.as_ref().map(to_json).transpose()?,
232            ],
233        )
234        .map_err(|err| {
235            if let rusqlite::Error::SqliteFailure(code, _) = &err {
236                if code.extended_code == rusqlite::ffi::SQLITE_CONSTRAINT_PRIMARYKEY {
237                    return AppError::Conflict(format!("task {} already exists", new_task.task_id));
238                }
239            }
240            AppError::Sqlite(err)
241        })?;
242        insert_event_tx(
243            &tx,
244            &new_task.task_id,
245            EventType::Submitted,
246            Some("task submitted"),
247            None,
248        )?;
249        insert_event_tx(
250            &tx,
251            &new_task.task_id,
252            EventType::Accepted,
253            Some("task accepted"),
254            None,
255        )?;
256        insert_event_tx(
257            &tx,
258            &new_task.task_id,
259            EventType::Planned,
260            Some("execution plan resolved"),
261            Some(&serde_json::to_value(&new_task.execution_plan)?),
262        )?;
263        if new_task.execution_plan.degraded {
264            insert_event_tx(
265                &tx,
266                &new_task.task_id,
267                EventType::Degraded,
268                Some("execution plan degraded"),
269                Some(&serde_json::json!({
270                    "fallback_reasons": &new_task.execution_plan.fallback_reasons,
271                    "effective_sandbox": &new_task.execution_plan.effective_sandbox,
272                })),
273            )?;
274        }
275        tx.commit()?;
276        Ok(())
277    }
278
279    pub fn get_task(&self, task_id: &str) -> AppResult<TaskRecord> {
280        let conn = self.connect()?;
281        let task = conn
282            .query_row(
283                "SELECT * FROM tasks WHERE task_id = ?1",
284                params![task_id],
285                row_to_task_record,
286            )
287            .optional()?;
288        task.ok_or_else(|| AppError::NotFound(task_id.to_string()))
289    }
290
291    pub fn list_events(&self, task_id: &str) -> AppResult<Vec<EventRecord>> {
292        let conn = self.connect()?;
293        let mut stmt = conn.prepare(
294            "SELECT seq, task_id, event_type, timestamp_ms, message, data_json FROM task_events WHERE task_id = ?1 ORDER BY seq ASC",
295        )?;
296        let iter = stmt.query_map(params![task_id], |row| {
297            Ok(EventRecord {
298                seq: row.get(0)?,
299                task_id: row.get(1)?,
300                event_type: decode_event_type(row.get::<_, String>(2)?.as_str())?,
301                timestamp: ts_millis_to_utc(row.get(3)?),
302                message: row.get(4)?,
303                data: opt_json_value(row.get(5)?)?,
304            })
305        })?;
306        let mut events = Vec::new();
307        for item in iter {
308            events.push(item?);
309        }
310        Ok(events)
311    }
312
313    pub fn count_accepted(&self) -> AppResult<u64> {
314        self.count_by_status(TaskStatus::Accepted)
315    }
316
317    pub fn count_running(&self) -> AppResult<u64> {
318        self.count_by_status(TaskStatus::Running)
319    }
320
321    pub fn count_by_status(&self, status: TaskStatus) -> AppResult<u64> {
322        let conn = self.connect()?;
323        let count: i64 = conn.query_row(
324            "SELECT COUNT(*) FROM tasks WHERE status = ?1",
325            params![encode_status(status)],
326            |row| row.get(0),
327        )?;
328        Ok(count.max(0) as u64)
329    }
330
331    pub fn list_accepted(&self, limit: usize) -> AppResult<Vec<TaskRecord>> {
332        let conn = self.connect()?;
333        let mut stmt = conn.prepare(
334            "SELECT * FROM tasks WHERE status = 'accepted' ORDER BY created_at_ms ASC LIMIT ?1",
335        )?;
336        let iter = stmt.query_map(params![limit as i64], row_to_task_record)?;
337        let mut items = Vec::new();
338        for item in iter {
339            items.push(item?);
340        }
341        Ok(items)
342    }
343
344    pub fn list_non_terminal(&self) -> AppResult<Vec<TaskRecord>> {
345        let conn = self.connect()?;
346        let mut stmt = conn.prepare(
347            "SELECT * FROM tasks WHERE status IN ('accepted', 'running') ORDER BY created_at_ms ASC",
348        )?;
349        let iter = stmt.query_map([], row_to_task_record)?;
350        let mut items = Vec::new();
351        for item in iter {
352            items.push(item?);
353        }
354        Ok(items)
355    }
356
357    pub fn list_active_reservations(&self) -> AppResult<Vec<TaskRecord>> {
358        let conn = self.connect()?;
359        let mut stmt = conn.prepare(
360            "SELECT * FROM tasks WHERE reservation_json IS NOT NULL AND released_at_ms IS NULL ORDER BY reserved_at_ms ASC, created_at_ms ASC",
361        )?;
362        let iter = stmt.query_map([], row_to_task_record)?;
363        let mut items = Vec::new();
364        for item in iter {
365            items.push(item?);
366        }
367        Ok(items)
368    }
369
370    pub fn count_accepted_waiting(&self) -> AppResult<u64> {
371        let conn = self.connect()?;
372        let count: i64 = conn.query_row(
373            "SELECT COUNT(*) FROM tasks WHERE status = 'accepted' AND (reservation_json IS NULL OR released_at_ms IS NOT NULL)",
374            [],
375            |row| row.get(0),
376        )?;
377        Ok(count.max(0) as u64)
378    }
379
380    pub fn mark_dispatched(&self, task_id: &str, shim_pid: u32) -> AppResult<()> {
381        let now = Utc::now().timestamp_millis();
382        let conn = self.connect()?;
383        conn.execute(
384            "UPDATE tasks SET status = 'running', shim_pid = ?2, updated_at_ms = ?3 WHERE task_id = ?1 AND status = 'accepted'",
385            params![task_id, i64::from(shim_pid), now],
386        )?;
387        Ok(())
388    }
389
390    pub fn mark_started(
391        &self,
392        task_id: &str,
393        pid: u32,
394        pgid: i32,
395        script_path: Option<&Path>,
396    ) -> AppResult<()> {
397        let now = Utc::now();
398        let conn = self.connect()?;
399        let tx = conn.unchecked_transaction()?;
400        tx.execute(
401            "UPDATE tasks SET status = 'running', pid = ?2, pgid = ?3, started_at_ms = ?4, updated_at_ms = ?4, script_path = COALESCE(?5, script_path) WHERE task_id = ?1",
402            params![
403                task_id,
404                i64::from(pid),
405                pgid,
406                now.timestamp_millis(),
407                script_path.map(|p| p.to_string_lossy().to_string())
408            ],
409        )?;
410        insert_event_tx(&tx, task_id, EventType::Started, Some("task started"), None)?;
411        tx.commit()?;
412        Ok(())
413    }
414
415    pub fn reserve_resources(
416        &self,
417        task_id: &str,
418        reservation: &TaskResourceReservation,
419        message: &str,
420    ) -> AppResult<()> {
421        let now = Utc::now();
422        let conn = self.connect()?;
423        let tx = conn.unchecked_transaction()?;
424        tx.execute(
425            "UPDATE tasks SET reservation_json = ?2, reserved_at_ms = ?3, released_at_ms = NULL, updated_at_ms = ?3 WHERE task_id = ?1",
426            params![
427                task_id,
428                to_json(reservation)?,
429                now.timestamp_millis(),
430            ],
431        )?;
432        insert_event_tx(
433            &tx,
434            task_id,
435            EventType::ResourceReserved,
436            Some(message),
437            Some(&serde_json::to_value(reservation)?),
438        )?;
439        tx.commit()?;
440        Ok(())
441    }
442
443    pub fn release_resources(&self, task_id: &str, message: &str) -> AppResult<()> {
444        let now = Utc::now();
445        let conn = self.connect()?;
446        let tx = conn.unchecked_transaction()?;
447        let reservation_json: Option<String> = tx
448            .query_row(
449                "SELECT reservation_json FROM tasks WHERE task_id = ?1 AND reservation_json IS NOT NULL AND released_at_ms IS NULL",
450                params![task_id],
451                |row| row.get(0),
452            )
453            .optional()?;
454        if let Some(raw) = reservation_json {
455            let reservation: TaskResourceReservation =
456                serde_json::from_str(&raw).map_err(AppError::Json)?;
457            tx.execute(
458                "UPDATE tasks SET released_at_ms = ?2, updated_at_ms = ?2 WHERE task_id = ?1",
459                params![task_id, now.timestamp_millis()],
460            )?;
461            insert_event_tx(
462                &tx,
463                task_id,
464                EventType::ResourceReleased,
465                Some(message),
466                Some(&serde_json::to_value(reservation)?),
467            )?;
468        }
469        tx.commit()?;
470        Ok(())
471    }
472
473    pub fn set_cancel_requested(&self, task_id: &str) -> AppResult<TaskRecord> {
474        let now = Utc::now();
475        let conn = self.connect()?;
476        let tx = conn.unchecked_transaction()?;
477        tx.execute(
478            "UPDATE tasks SET kill_requested = 1, kill_requested_at_ms = ?2, updated_at_ms = ?2 WHERE task_id = ?1",
479            params![task_id, now.timestamp_millis()],
480        )?;
481        insert_event_tx(
482            &tx,
483            task_id,
484            EventType::KillRequested,
485            Some("kill requested"),
486            None,
487        )?;
488        tx.commit()?;
489        self.get_task(task_id)
490    }
491
492    pub fn mark_timeout_triggered(&self, task_id: &str) -> AppResult<()> {
493        let now = Utc::now();
494        let conn = self.connect()?;
495        let tx = conn.unchecked_transaction()?;
496        tx.execute(
497            "UPDATE tasks SET timeout_triggered = 1, updated_at_ms = ?2 WHERE task_id = ?1",
498            params![task_id, now.timestamp_millis()],
499        )?;
500        insert_event_tx(
501            &tx,
502            task_id,
503            EventType::TimeoutTriggered,
504            Some("timeout triggered"),
505            None,
506        )?;
507        tx.commit()?;
508        Ok(())
509    }
510
511    pub fn cancel_accepted_task(&self, task_id: &str, error: RuntimeErrorInfo) -> AppResult<()> {
512        let now = Utc::now();
513        let conn = self.connect()?;
514        let tx = conn.unchecked_transaction()?;
515        let active_reservation_json: Option<String> = tx
516            .query_row(
517                "SELECT reservation_json FROM tasks WHERE task_id = ?1 AND reservation_json IS NOT NULL AND released_at_ms IS NULL",
518                params![task_id],
519                |row| row.get(0),
520            )
521            .optional()?;
522        tx.execute(
523            r#"
524            UPDATE tasks
525            SET status = 'cancelled',
526                updated_at_ms = ?2,
527                finished_at_ms = ?2,
528                released_at_ms = CASE
529                    WHEN released_at_ms IS NULL AND reservation_json IS NOT NULL THEN ?2
530                    ELSE released_at_ms
531                END,
532                error_code = ?3,
533                error_json = ?4,
534                duration_ms = 0,
535                result_json = ?5
536            WHERE task_id = ?1 AND status = 'accepted'
537            "#,
538            params![
539                task_id,
540                now.timestamp_millis(),
541                encode_error_code(error.code),
542                to_json(&error)?,
543                to_json(&serde_json::json!({
544                        "task_id": task_id,
545                        "handle_id": task_id,
546                        "status": TaskStatus::Cancelled,
547                        "finished_at": now,
548                    "error": error,
549                }))?,
550            ],
551        )?;
552        if let Some(raw) = active_reservation_json {
553            let reservation: TaskResourceReservation =
554                serde_json::from_str(&raw).map_err(AppError::Json)?;
555            insert_event_tx(
556                &tx,
557                task_id,
558                EventType::ResourceReleased,
559                Some("task resources released"),
560                Some(&serde_json::to_value(reservation)?),
561            )?;
562        }
563        insert_event_tx(
564            &tx,
565            task_id,
566            EventType::Cancelled,
567            Some("task cancelled"),
568            None,
569        )?;
570        tx.commit()?;
571        Ok(())
572    }
573
574    pub fn complete_task(&self, task_id: &str, update: &CompletionUpdate) -> AppResult<()> {
575        let conn = self.connect()?;
576        let tx = conn.unchecked_transaction()?;
577        let active_reservation_json: Option<String> = tx
578            .query_row(
579                "SELECT reservation_json FROM tasks WHERE task_id = ?1 AND reservation_json IS NOT NULL AND released_at_ms IS NULL",
580                params![task_id],
581                |row| row.get(0),
582            )
583            .optional()?;
584        tx.execute(
585            r#"
586            UPDATE tasks
587            SET status = ?2,
588                updated_at_ms = ?3,
589                finished_at_ms = ?3,
590                released_at_ms = CASE
591                    WHEN released_at_ms IS NULL AND reservation_json IS NOT NULL THEN ?3
592                    ELSE released_at_ms
593                END,
594                duration_ms = ?4,
595                exit_code = ?5,
596                exit_signal = ?6,
597                error_code = ?7,
598                error_json = ?8,
599                usage_json = ?9,
600                result_json = ?10
601            WHERE task_id = ?1
602            "#,
603            params![
604                task_id,
605                encode_status(update.status.clone()),
606                update.finished_at.timestamp_millis(),
607                update
608                    .duration_ms
609                    .map(i64::try_from)
610                    .transpose()
611                    .map_err(|_| {
612                        AppError::InvalidInput("duration_ms is too large to persist".into())
613                    })?,
614                update.exit_code,
615                update.exit_signal,
616                update.error.as_ref().map(|e| encode_error_code(e.code)),
617                update.error.as_ref().map(to_json).transpose()?,
618                update.usage.as_ref().map(to_json).transpose()?,
619                update.result_json.as_ref().map(to_json).transpose()?,
620            ],
621        )?;
622
623        if let Some(raw) = active_reservation_json {
624            let reservation: TaskResourceReservation =
625                serde_json::from_str(&raw).map_err(AppError::Json)?;
626            insert_event_tx(
627                &tx,
628                task_id,
629                EventType::ResourceReleased,
630                Some("task resources released"),
631                Some(&serde_json::to_value(reservation)?),
632            )?;
633        }
634
635        let event_type = match update.status {
636            TaskStatus::Success => EventType::Finished,
637            TaskStatus::Failed => EventType::Failed,
638            TaskStatus::Cancelled => EventType::Cancelled,
639            TaskStatus::Accepted | TaskStatus::Running => EventType::Finished,
640        };
641        let message = match update.status {
642            TaskStatus::Success => Some("task finished"),
643            TaskStatus::Failed => Some("task failed"),
644            TaskStatus::Cancelled => Some("task cancelled"),
645            TaskStatus::Accepted | TaskStatus::Running => Some("task finished"),
646        };
647        insert_event_tx(&tx, task_id, event_type, message, None)?;
648        tx.commit()?;
649        Ok(())
650    }
651
652    pub fn mark_recovered(&self, task_id: &str) -> AppResult<()> {
653        let now = Utc::now();
654        let conn = self.connect()?;
655        let tx = conn.unchecked_transaction()?;
656        tx.execute(
657            "UPDATE tasks SET updated_at_ms = ?2 WHERE task_id = ?1 AND status = 'running'",
658            params![task_id, now.timestamp_millis()],
659        )?;
660        insert_event_tx(
661            &tx,
662            task_id,
663            EventType::Recovered,
664            Some("task recovered"),
665            None,
666        )?;
667        tx.commit()?;
668        Ok(())
669    }
670
671    pub fn mark_recovery_lost(&self, task_id: &str) -> AppResult<()> {
672        let update = CompletionUpdate {
673            status: TaskStatus::Failed,
674            finished_at: Utc::now(),
675            duration_ms: Some(0),
676            exit_code: None,
677            exit_signal: None,
678            error: Some(RuntimeErrorInfo {
679                code: ErrorCode::Internal,
680                message: "recovery_lost".into(),
681                details: None,
682            }),
683            usage: None,
684            result_json: None,
685        };
686        self.complete_task(task_id, &update)
687    }
688
689    pub fn is_cancel_requested(&self, task_id: &str) -> AppResult<bool> {
690        let conn = self.connect()?;
691        let flag: i64 = conn.query_row(
692            "SELECT kill_requested FROM tasks WHERE task_id = ?1",
693            params![task_id],
694            |row| row.get(0),
695        )?;
696        Ok(flag != 0)
697    }
698
699    pub fn list_gc_candidates(&self, finished_before: DateTime<Utc>) -> AppResult<Vec<TaskRecord>> {
700        let conn = self.connect()?;
701        let mut stmt = conn.prepare(
702            "SELECT * FROM tasks WHERE status IN ('success', 'failed', 'cancelled') AND finished_at_ms IS NOT NULL AND finished_at_ms <= ?1 ORDER BY finished_at_ms ASC",
703        )?;
704        let iter = stmt.query_map(
705            params![finished_before.timestamp_millis()],
706            row_to_task_record,
707        )?;
708        let mut items = Vec::new();
709        for item in iter {
710            items.push(item?);
711        }
712        Ok(items)
713    }
714
715    pub fn delete_task(&self, task_id: &str) -> AppResult<()> {
716        let conn = self.connect()?;
717        conn.execute("DELETE FROM tasks WHERE task_id = ?1", params![task_id])?;
718        Ok(())
719    }
720
721    pub fn metrics_snapshot(&self) -> AppResult<MetricsSnapshot> {
722        let conn = self.connect()?;
723        let mut snapshot = MetricsSnapshot::default();
724
725        let mut status_stmt = conn.prepare("SELECT status, COUNT(*) FROM tasks GROUP BY status")?;
726        let status_rows = status_stmt.query_map([], |row| {
727            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
728        })?;
729        for item in status_rows {
730            let (status, count) = item?;
731            snapshot.by_status.insert(status, count.max(0) as u64);
732        }
733
734        let mut err_stmt = conn.prepare(
735            "SELECT error_code, COUNT(*) FROM tasks WHERE error_code IS NOT NULL GROUP BY error_code",
736        )?;
737        let err_rows = err_stmt.query_map([], |row| {
738            Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
739        })?;
740        for item in err_rows {
741            let (code, count) = item?;
742            snapshot.by_error_code.insert(code, count.max(0) as u64);
743        }
744
745        let mut duration_stmt =
746            conn.prepare("SELECT duration_ms FROM tasks WHERE duration_ms IS NOT NULL")?;
747        let duration_rows = duration_stmt.query_map([], |row| row.get::<_, i64>(0))?;
748        for item in duration_rows {
749            snapshot.finished_durations_ms.push(item?.max(0) as u64);
750        }
751
752        Ok(snapshot)
753    }
754
755    fn connect(&self) -> AppResult<Connection> {
756        let conn = Connection::open(&self.db_path)?;
757        conn.busy_timeout(std::time::Duration::from_secs(5))?;
758        conn.pragma_update(None, "foreign_keys", "ON")?;
759        conn.pragma_update(None, "journal_mode", "WAL")?;
760        Ok(conn)
761    }
762}
763
764pub fn generate_task_id() -> String {
765    Uuid::new_v4().to_string()
766}
767
768fn ensure_task_column(conn: &Connection, name: &str, definition: &str) -> AppResult<()> {
769    let mut stmt = conn.prepare("PRAGMA table_info(tasks)")?;
770    let columns = stmt.query_map([], |row| row.get::<_, String>(1))?;
771    for column in columns {
772        if column? == name {
773            return Ok(());
774        }
775    }
776    conn.execute(
777        &format!("ALTER TABLE tasks ADD COLUMN {name} {definition}"),
778        [],
779    )?;
780    Ok(())
781}
782
783fn row_to_task_record(row: &Row<'_>) -> rusqlite::Result<TaskRecord> {
784    Ok(TaskRecord {
785        task_id: row.get("task_id")?,
786        handle_id: row.get("handle_id")?,
787        status: decode_status(row.get::<_, String>("status")?.as_str())?,
788        execution: from_json(row.get("execution_json")?)?,
789        limits: from_json(row.get("limits_json")?)?,
790        sandbox: from_json(row.get("sandbox_json")?)?,
791        metadata: from_json(row.get("metadata_json")?)?,
792        created_at: ts_millis_to_utc(row.get("created_at_ms")?),
793        updated_at: ts_millis_to_utc(row.get("updated_at_ms")?),
794        started_at: row
795            .get::<_, Option<i64>>("started_at_ms")?
796            .map(ts_millis_to_utc),
797        finished_at: row
798            .get::<_, Option<i64>>("finished_at_ms")?
799            .map(ts_millis_to_utc),
800        duration_ms: row
801            .get::<_, Option<i64>>("duration_ms")?
802            .map(|value| value.max(0) as u64),
803        shim_pid: row
804            .get::<_, Option<i64>>("shim_pid")?
805            .map(|value| value as u32),
806        pid: row.get::<_, Option<i64>>("pid")?.map(|value| value as u32),
807        pgid: row.get("pgid")?,
808        exit_code: row.get("exit_code")?,
809        exit_signal: row.get("exit_signal")?,
810        error_code: row
811            .get::<_, Option<String>>("error_code")?
812            .map(|value| decode_error_code(value.as_str()))
813            .transpose()?,
814        error: row
815            .get::<_, Option<String>>("error_json")?
816            .map(from_json)
817            .transpose()?,
818        usage: row
819            .get::<_, Option<String>>("usage_json")?
820            .map(from_json)
821            .transpose()?,
822        task_dir: PathBuf::from(row.get::<_, String>("task_dir")?),
823        workspace_dir: PathBuf::from(row.get::<_, String>("workspace_dir")?),
824        request_path: PathBuf::from(row.get::<_, String>("request_path")?),
825        result_path: PathBuf::from(row.get::<_, String>("result_path")?),
826        stdout_path: PathBuf::from(row.get::<_, String>("stdout_path")?),
827        stderr_path: PathBuf::from(row.get::<_, String>("stderr_path")?),
828        script_path: row
829            .get::<_, Option<String>>("script_path")?
830            .map(PathBuf::from),
831        stdout_max_bytes: row.get::<_, i64>("stdout_max_bytes")?.max(0) as u64,
832        stderr_max_bytes: row.get::<_, i64>("stderr_max_bytes")?.max(0) as u64,
833        kill_requested: row.get::<_, i64>("kill_requested")? != 0,
834        kill_requested_at: row
835            .get::<_, Option<i64>>("kill_requested_at_ms")?
836            .map(ts_millis_to_utc),
837        timeout_triggered: row.get::<_, i64>("timeout_triggered")? != 0,
838        result_json: row
839            .get::<_, Option<String>>("result_json")?
840            .map(from_json)
841            .transpose()?,
842        execution_plan: row
843            .get::<_, Option<String>>("execution_plan_json")?
844            .map(from_json)
845            .transpose()?,
846        control_context: row
847            .get::<_, Option<String>>("control_context_json")?
848            .map(from_json)
849            .transpose()?,
850        reservation: row
851            .get::<_, Option<String>>("reservation_json")?
852            .map(from_json)
853            .transpose()?,
854        reserved_at: row
855            .get::<_, Option<i64>>("reserved_at_ms")?
856            .map(ts_millis_to_utc),
857        released_at: row
858            .get::<_, Option<i64>>("released_at_ms")?
859            .map(ts_millis_to_utc),
860    })
861}
862
863fn insert_event_tx(
864    tx: &rusqlite::Transaction<'_>,
865    task_id: &str,
866    event_type: EventType,
867    message: Option<&str>,
868    data: Option<&Value>,
869) -> AppResult<()> {
870    tx.execute(
871        "INSERT INTO task_events (task_id, event_type, timestamp_ms, message, data_json) VALUES (?1, ?2, ?3, ?4, ?5)",
872        params![
873            task_id,
874            encode_event_type(event_type),
875            Utc::now().timestamp_millis(),
876            message,
877            data.map(to_json).transpose()?,
878        ],
879    )?;
880    Ok(())
881}
882
883fn to_json<T: Serialize>(value: &T) -> AppResult<String> {
884    Ok(serde_json::to_string(value)?)
885}
886
887fn from_json<T: DeserializeOwned>(raw: String) -> rusqlite::Result<T> {
888    serde_json::from_str(&raw).map_err(|err| {
889        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(err))
890    })
891}
892
893fn opt_json_value(raw: Option<String>) -> rusqlite::Result<Option<Value>> {
894    raw.map(from_json).transpose()
895}
896
897fn encode_status(status: TaskStatus) -> &'static str {
898    match status {
899        TaskStatus::Accepted => "accepted",
900        TaskStatus::Running => "running",
901        TaskStatus::Success => "success",
902        TaskStatus::Failed => "failed",
903        TaskStatus::Cancelled => "cancelled",
904    }
905}
906
907fn decode_status(value: &str) -> rusqlite::Result<TaskStatus> {
908    match value {
909        "accepted" => Ok(TaskStatus::Accepted),
910        "running" => Ok(TaskStatus::Running),
911        "success" => Ok(TaskStatus::Success),
912        "failed" => Ok(TaskStatus::Failed),
913        "cancelled" => Ok(TaskStatus::Cancelled),
914        other => Err(rusqlite::Error::InvalidColumnType(
915            0,
916            other.into(),
917            rusqlite::types::Type::Text,
918        )),
919    }
920}
921
922fn encode_error_code(code: ErrorCode) -> &'static str {
923    match code {
924        ErrorCode::InvalidInput => "invalid_input",
925        ErrorCode::LaunchFailed => "launch_failed",
926        ErrorCode::Timeout => "timeout",
927        ErrorCode::Cancelled => "cancelled",
928        ErrorCode::MemoryLimitExceeded => "memory_limit_exceeded",
929        ErrorCode::CpuLimitExceeded => "cpu_limit_exceeded",
930        ErrorCode::ResourceLimitExceeded => "resource_limit_exceeded",
931        ErrorCode::SandboxSetupFailed => "sandbox_setup_failed",
932        ErrorCode::ExitNonZero => "exit_nonzero",
933        ErrorCode::UnsupportedCapability => "unsupported_capability",
934        ErrorCode::InsufficientResources => "insufficient_resources",
935        ErrorCode::Internal => "internal",
936    }
937}
938
939fn decode_error_code(value: &str) -> rusqlite::Result<ErrorCode> {
940    match value {
941        "invalid_input" => Ok(ErrorCode::InvalidInput),
942        "launch_failed" => Ok(ErrorCode::LaunchFailed),
943        "timeout" => Ok(ErrorCode::Timeout),
944        "cancelled" => Ok(ErrorCode::Cancelled),
945        "memory_limit_exceeded" => Ok(ErrorCode::MemoryLimitExceeded),
946        "cpu_limit_exceeded" => Ok(ErrorCode::CpuLimitExceeded),
947        "resource_limit_exceeded" => Ok(ErrorCode::ResourceLimitExceeded),
948        "sandbox_setup_failed" => Ok(ErrorCode::SandboxSetupFailed),
949        "exit_nonzero" => Ok(ErrorCode::ExitNonZero),
950        "unsupported_capability" => Ok(ErrorCode::UnsupportedCapability),
951        "insufficient_resources" => Ok(ErrorCode::InsufficientResources),
952        "internal" => Ok(ErrorCode::Internal),
953        other => Err(rusqlite::Error::InvalidColumnType(
954            0,
955            other.into(),
956            rusqlite::types::Type::Text,
957        )),
958    }
959}
960
961fn encode_event_type(event_type: EventType) -> &'static str {
962    match event_type {
963        EventType::Submitted => "submitted",
964        EventType::Accepted => "accepted",
965        EventType::Planned => "planned",
966        EventType::Degraded => "degraded",
967        EventType::ResourceReserved => "resource_reserved",
968        EventType::ResourceReleased => "resource_released",
969        EventType::Started => "started",
970        EventType::KillRequested => "kill_requested",
971        EventType::TimeoutTriggered => "timeout_triggered",
972        EventType::Finished => "finished",
973        EventType::Failed => "failed",
974        EventType::Cancelled => "cancelled",
975        EventType::Recovered => "recovered",
976    }
977}
978
979fn decode_event_type(value: &str) -> rusqlite::Result<EventType> {
980    match value {
981        "submitted" => Ok(EventType::Submitted),
982        "accepted" => Ok(EventType::Accepted),
983        "planned" => Ok(EventType::Planned),
984        "degraded" => Ok(EventType::Degraded),
985        "resource_reserved" => Ok(EventType::ResourceReserved),
986        "resource_released" => Ok(EventType::ResourceReleased),
987        "started" => Ok(EventType::Started),
988        "kill_requested" => Ok(EventType::KillRequested),
989        "timeout_triggered" => Ok(EventType::TimeoutTriggered),
990        "finished" => Ok(EventType::Finished),
991        "failed" => Ok(EventType::Failed),
992        "cancelled" => Ok(EventType::Cancelled),
993        "recovered" => Ok(EventType::Recovered),
994        other => Err(rusqlite::Error::InvalidColumnType(
995            0,
996            other.into(),
997            rusqlite::types::Type::Text,
998        )),
999    }
1000}
1001
1002fn ts_millis_to_utc(value: i64) -> DateTime<Utc> {
1003    Utc.timestamp_millis_opt(value)
1004        .single()
1005        .unwrap_or_else(Utc::now)
1006}