Skip to main content

agent_orchestrator/task_repository/
mod.rs

1#![cfg_attr(
2    not(test),
3    deny(clippy::panic, clippy::unwrap_used, clippy::expect_used)
4)]
5
6mod command_run;
7pub(crate) mod items;
8mod queries;
9mod state;
10mod trait_def;
11mod types;
12mod write_ops;
13
14#[cfg(test)]
15mod tests;
16
17pub use command_run::NewCommandRun;
18pub use trait_def::{
19    CommandRunRepository, EventRepository, TaskGraphRepository, TaskItemMutRepository,
20    TaskItemQueryRepository, TaskQueryRepository, TaskRepository, TaskStateRepository,
21};
22pub use types::{
23    DbEventRecord, NewTaskGraphRun, NewTaskGraphSnapshot, TaskLogRunRow, TaskRepositoryConn,
24    TaskRepositorySource, TaskRuntimeRow,
25};
26pub use write_ops::{CompletedRunRecord, InflightRunRecord};
27
28use crate::async_database::{AsyncDatabase, flatten_err};
29use crate::dto::{CommandRunDto, EventDto, TaskGraphDebugBundle, TaskItemDto};
30use anyhow::Result;
31use std::sync::Arc;
32
33/// Tuple returned by detail queries: items, runs, events, and graph bundles.
34pub type TaskDetailRows = (
35    Vec<TaskItemDto>,
36    Vec<CommandRunDto>,
37    Vec<EventDto>,
38    Vec<TaskGraphDebugBundle>,
39);
40
41/// Synchronous SQLite-backed implementation of [`TaskRepository`].
42pub struct SqliteTaskRepository {
43    source: types::TaskRepositorySource,
44}
45
46impl SqliteTaskRepository {
47    /// Creates a repository backed by the given connection source.
48    pub fn new<T>(source: T) -> Self
49    where
50        T: Into<types::TaskRepositorySource>,
51    {
52        Self {
53            source: source.into(),
54        }
55    }
56
57    fn connection(&self) -> Result<types::TaskRepositoryConn> {
58        self.source.connection()
59    }
60}
61
62// ── TaskQueryRepository ─────────────────────────────────────────────
63
64impl TaskQueryRepository for SqliteTaskRepository {
65    fn resolve_task_id(&self, task_id_or_prefix: &str) -> Result<String> {
66        let conn = self.connection()?;
67        queries::resolve_task_id(&conn, task_id_or_prefix)
68    }
69
70    fn load_task_summary(&self, task_id: &str) -> Result<crate::dto::TaskSummary> {
71        let conn = self.connection()?;
72        queries::load_task_summary(&conn, task_id)
73    }
74
75    fn load_task_detail_rows(&self, task_id: &str) -> Result<TaskDetailRows> {
76        let conn = self.connection()?;
77        queries::load_task_detail_rows(&conn, task_id)
78    }
79
80    fn load_task_item_counts(&self, task_id: &str) -> Result<(i64, i64, i64)> {
81        let conn = self.connection()?;
82        queries::load_task_item_counts(&conn, task_id)
83    }
84
85    fn list_task_ids_ordered_by_created_desc(&self) -> Result<Vec<String>> {
86        let conn = self.connection()?;
87        queries::list_task_ids_ordered_by_created_desc(&conn)
88    }
89
90    fn find_latest_resumable_task_id(&self, include_pending: bool) -> Result<Option<String>> {
91        let conn = self.connection()?;
92        queries::find_latest_resumable_task_id(&conn, include_pending)
93    }
94
95    fn load_task_runtime_row(&self, task_id: &str) -> Result<TaskRuntimeRow> {
96        let conn = self.connection()?;
97        queries::load_task_runtime_row(&conn, task_id)
98    }
99
100    fn load_task_status(&self, task_id: &str) -> Result<Option<String>> {
101        let conn = self.connection()?;
102        queries::load_task_status(&conn, task_id)
103    }
104
105    fn load_task_name(&self, task_id: &str) -> Result<Option<String>> {
106        let conn = self.connection()?;
107        queries::load_task_name(&conn, task_id)
108    }
109}
110
111// ── TaskItemQueryRepository ──���──────────────────────────────────────
112
113impl TaskItemQueryRepository for SqliteTaskRepository {
114    fn first_task_item_id(&self, task_id: &str) -> Result<Option<String>> {
115        let conn = self.connection()?;
116        queries::first_task_item_id(&conn, task_id)
117    }
118
119    fn list_task_items_for_cycle(&self, task_id: &str) -> Result<Vec<crate::dto::TaskItemRow>> {
120        let conn = self.connection()?;
121        queries::list_task_items_for_cycle(&conn, task_id)
122    }
123
124    fn count_unresolved_items(&self, task_id: &str) -> Result<i64> {
125        let conn = self.connection()?;
126        queries::count_unresolved_items(&conn, task_id)
127    }
128
129    fn count_stale_pending_items(&self, task_id: &str) -> Result<i64> {
130        let conn = self.connection()?;
131        queries::count_stale_pending_items(&conn, task_id)
132    }
133
134    fn count_recent_heartbeats_for_items(
135        &self,
136        task_id: &str,
137        item_ids: &[String],
138        cutoff_ts: &str,
139    ) -> Result<i64> {
140        let conn = self.connection()?;
141        write_ops::count_recent_heartbeats_for_items(&conn, task_id, item_ids, cutoff_ts)
142    }
143}
144
145// ── TaskStateRepository ──────────────────────────────────��──────────
146
147impl TaskStateRepository for SqliteTaskRepository {
148    fn set_task_status(&self, task_id: &str, status: &str, set_completed: bool) -> Result<()> {
149        let conn = self.connection()?;
150        state::set_task_status(&conn, task_id, status, set_completed)
151    }
152
153    fn prepare_task_for_start_batch(&self, task_id: &str) -> Result<()> {
154        let conn = self.connection()?;
155        state::prepare_task_for_start_batch(&conn, task_id)
156    }
157
158    fn update_task_cycle_state(
159        &self,
160        task_id: &str,
161        current_cycle: u32,
162        init_done: bool,
163    ) -> Result<()> {
164        let conn = self.connection()?;
165        state::update_task_cycle_state(&conn, task_id, current_cycle, init_done)
166    }
167
168    fn update_task_pipeline_vars(&self, task_id: &str, pipeline_vars_json: &str) -> Result<()> {
169        let conn = self.connection()?;
170        write_ops::update_task_pipeline_vars(&conn, task_id, pipeline_vars_json)
171    }
172
173    fn delete_task_and_collect_log_paths(&self, task_id: &str) -> Result<Vec<String>> {
174        let conn = self.connection()?;
175        items::delete_task_and_collect_log_paths(&conn, task_id)
176    }
177}
178
179// ── TaskItemMutRepository ───────────────────────────────────────────
180
181impl TaskItemMutRepository for SqliteTaskRepository {
182    fn mark_task_item_running(&self, task_item_id: &str) -> Result<()> {
183        let conn = self.connection()?;
184        items::mark_task_item_running(&conn, task_item_id)
185    }
186
187    fn set_task_item_terminal_status(&self, task_item_id: &str, status: &str) -> Result<()> {
188        let conn = self.connection()?;
189        items::set_task_item_terminal_status(&conn, task_item_id, status)
190    }
191
192    fn update_task_item_status(&self, task_item_id: &str, status: &str) -> Result<()> {
193        let conn = self.connection()?;
194        items::update_task_item_status(&conn, task_item_id, status)
195    }
196
197    fn update_task_item_pipeline_vars(
198        &self,
199        task_item_id: &str,
200        pipeline_vars_json: &str,
201    ) -> Result<()> {
202        let conn = self.connection()?;
203        items::update_task_item_pipeline_vars(&conn, task_item_id, pipeline_vars_json)
204    }
205
206    fn update_task_item_tickets(
207        &self,
208        task_item_id: &str,
209        ticket_files_json: &str,
210        ticket_content_json: &str,
211    ) -> Result<()> {
212        let conn = self.connection()?;
213        write_ops::update_task_item_tickets(
214            &conn,
215            task_item_id,
216            ticket_files_json,
217            ticket_content_json,
218        )
219    }
220}
221
222// ─�� CommandRunRepository ────────────────────────────────────────────
223
224impl CommandRunRepository for SqliteTaskRepository {
225    fn insert_command_run(&self, run: &NewCommandRun) -> Result<()> {
226        let conn = self.connection()?;
227        items::insert_command_run(&conn, run)
228    }
229
230    fn update_command_run(&self, run: &NewCommandRun) -> Result<()> {
231        let conn = self.connection()?;
232        write_ops::update_command_run(&conn, run)
233    }
234
235    fn update_command_run_with_events(
236        &self,
237        run: &NewCommandRun,
238        events: &[DbEventRecord],
239    ) -> Result<()> {
240        let conn = self.connection()?;
241        write_ops::update_command_run_with_events(&conn, run, events)
242    }
243
244    fn persist_phase_result_with_events(
245        &self,
246        run: &NewCommandRun,
247        events: &[DbEventRecord],
248    ) -> Result<()> {
249        let conn = self.connection()?;
250        write_ops::persist_phase_result_with_events(&conn, run, events)
251    }
252
253    fn update_command_run_pid(&self, run_id: &str, pid: i64) -> Result<()> {
254        let conn = self.connection()?;
255        write_ops::update_command_run_pid(&conn, run_id, pid)
256    }
257
258    fn list_task_log_runs(&self, task_id: &str, limit: usize) -> Result<Vec<TaskLogRunRow>> {
259        let conn = self.connection()?;
260        queries::list_task_log_runs(&conn, task_id, limit)
261    }
262
263    fn find_active_child_pids(&self, task_id: &str) -> Result<Vec<i64>> {
264        let conn = self.connection()?;
265        write_ops::find_active_child_pids(&conn, task_id)
266    }
267
268    fn find_inflight_command_runs_for_task(&self, task_id: &str) -> Result<Vec<InflightRunRecord>> {
269        let conn = self.connection()?;
270        write_ops::find_inflight_command_runs_for_task(&conn, task_id)
271    }
272
273    fn find_completed_runs_for_pending_items(
274        &self,
275        task_id: &str,
276    ) -> Result<Vec<write_ops::CompletedRunRecord>> {
277        let conn = self.connection()?;
278        write_ops::find_completed_runs_for_pending_items(&conn, task_id)
279    }
280}
281
282// ── EventRepository ─────────────────────────────────────��───────────
283
284impl EventRepository for SqliteTaskRepository {
285    fn insert_event(&self, event: &DbEventRecord) -> Result<()> {
286        let conn = self.connection()?;
287        write_ops::insert_event(&conn, event)
288    }
289}
290
291// ── TaskGraphRepository ─────��───────────────────────────────────────
292
293impl TaskGraphRepository for SqliteTaskRepository {
294    fn insert_task_graph_run(&self, run: &NewTaskGraphRun) -> Result<()> {
295        let conn = self.connection()?;
296        queries::insert_task_graph_run(&conn, run)
297    }
298
299    fn update_task_graph_run_status(&self, graph_run_id: &str, status: &str) -> Result<()> {
300        let conn = self.connection()?;
301        queries::update_task_graph_run_status(&conn, graph_run_id, status)
302    }
303
304    fn insert_task_graph_snapshot(&self, snapshot: &NewTaskGraphSnapshot) -> Result<()> {
305        let conn = self.connection()?;
306        queries::insert_task_graph_snapshot(&conn, snapshot)
307    }
308
309    fn load_task_graph_debug_bundles(
310        &self,
311        task_id: &str,
312    ) -> Result<Vec<crate::dto::TaskGraphDebugBundle>> {
313        let conn = self.connection()?;
314        queries::load_task_graph_debug_bundles(&conn, task_id)
315    }
316}
317
318/// Async wrapper around [`SqliteTaskRepository`] built on [`AsyncDatabase`].
319pub struct AsyncSqliteTaskRepository {
320    async_db: Arc<AsyncDatabase>,
321}
322
323impl AsyncSqliteTaskRepository {
324    /// Creates a new async repository wrapper.
325    pub fn new(async_db: Arc<AsyncDatabase>) -> Self {
326        Self { async_db }
327    }
328
329    // ── Read operations (use reader) ──
330
331    /// Resolves a full task identifier from an ID prefix.
332    pub async fn resolve_task_id(&self, prefix: &str) -> Result<String> {
333        let prefix = prefix.to_owned();
334        self.async_db
335            .reader()
336            .call(move |conn| {
337                queries::resolve_task_id(conn, &prefix)
338                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
339            })
340            .await
341            .map_err(flatten_err)
342    }
343
344    /// Loads a summary row for a task.
345    pub async fn load_task_summary(&self, task_id: &str) -> Result<crate::dto::TaskSummary> {
346        let task_id = task_id.to_owned();
347        self.async_db
348            .reader()
349            .call(move |conn| {
350                queries::load_task_summary(conn, &task_id)
351                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
352            })
353            .await
354            .map_err(flatten_err)
355    }
356
357    /// Loads the full detail bundle for a task.
358    pub async fn load_task_detail_rows(&self, task_id: &str) -> Result<TaskDetailRows> {
359        let task_id = task_id.to_owned();
360        self.async_db
361            .reader()
362            .call(move |conn| {
363                queries::load_task_detail_rows(conn, &task_id)
364                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
365            })
366            .await
367            .map_err(flatten_err)
368    }
369
370    /// Loads `(total, resolved, unresolved)` item counts for a task.
371    pub async fn load_task_item_counts(&self, task_id: &str) -> Result<(i64, i64, i64)> {
372        let task_id = task_id.to_owned();
373        self.async_db
374            .reader()
375            .call(move |conn| {
376                queries::load_task_item_counts(conn, &task_id)
377                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
378            })
379            .await
380            .map_err(flatten_err)
381    }
382
383    /// Lists task identifiers ordered by creation time descending.
384    pub async fn list_task_ids_ordered_by_created_desc(&self) -> Result<Vec<String>> {
385        self.async_db
386            .reader()
387            .call(move |conn| {
388                queries::list_task_ids_ordered_by_created_desc(conn)
389                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
390            })
391            .await
392            .map_err(flatten_err)
393    }
394
395    /// Returns the latest resumable task, optionally including pending tasks.
396    pub async fn find_latest_resumable_task_id(
397        &self,
398        include_pending: bool,
399    ) -> Result<Option<String>> {
400        self.async_db
401            .reader()
402            .call(move |conn| {
403                queries::find_latest_resumable_task_id(conn, include_pending)
404                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
405            })
406            .await
407            .map_err(flatten_err)
408    }
409
410    /// Loads execution state required to resume a task.
411    pub async fn load_task_runtime_row(&self, task_id: &str) -> Result<TaskRuntimeRow> {
412        let task_id = task_id.to_owned();
413        self.async_db
414            .reader()
415            .call(move |conn| {
416                queries::load_task_runtime_row(conn, &task_id)
417                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
418            })
419            .await
420            .map_err(flatten_err)
421    }
422
423    /// Returns the first task-item identifier for a task, if present.
424    pub async fn first_task_item_id(&self, task_id: &str) -> Result<Option<String>> {
425        let task_id = task_id.to_owned();
426        self.async_db
427            .reader()
428            .call(move |conn| {
429                queries::first_task_item_id(conn, &task_id)
430                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
431            })
432            .await
433            .map_err(flatten_err)
434    }
435
436    /// Counts unresolved task items.
437    pub async fn count_unresolved_items(&self, task_id: &str) -> Result<i64> {
438        let task_id = task_id.to_owned();
439        self.async_db
440            .reader()
441            .call(move |conn| {
442                queries::count_unresolved_items(conn, &task_id)
443                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
444            })
445            .await
446            .map_err(flatten_err)
447    }
448
449    /// Lists task items participating in the current cycle.
450    pub async fn list_task_items_for_cycle(
451        &self,
452        task_id: &str,
453    ) -> Result<Vec<crate::dto::TaskItemRow>> {
454        let task_id = task_id.to_owned();
455        self.async_db
456            .reader()
457            .call(move |conn| {
458                queries::list_task_items_for_cycle(conn, &task_id)
459                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
460            })
461            .await
462            .map_err(flatten_err)
463    }
464
465    /// Loads the current task status string.
466    pub async fn load_task_status(&self, task_id: &str) -> Result<Option<String>> {
467        let task_id = task_id.to_owned();
468        self.async_db
469            .reader()
470            .call(move |conn| {
471                queries::load_task_status(conn, &task_id)
472                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
473            })
474            .await
475            .map_err(flatten_err)
476    }
477
478    /// Loads the human-readable task name.
479    pub async fn load_task_name(&self, task_id: &str) -> Result<Option<String>> {
480        let task_id = task_id.to_owned();
481        self.async_db
482            .reader()
483            .call(move |conn| {
484                queries::load_task_name(conn, &task_id)
485                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
486            })
487            .await
488            .map_err(flatten_err)
489    }
490
491    /// Lists recent command runs used for log inspection.
492    pub async fn list_task_log_runs(
493        &self,
494        task_id: &str,
495        limit: usize,
496    ) -> Result<Vec<TaskLogRunRow>> {
497        let task_id = task_id.to_owned();
498        self.async_db
499            .reader()
500            .call(move |conn| {
501                queries::list_task_log_runs(conn, &task_id, limit)
502                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
503            })
504            .await
505            .map_err(flatten_err)
506    }
507
508    /// Loads graph-planning debug bundles for a task.
509    pub async fn load_task_graph_debug_bundles(
510        &self,
511        task_id: &str,
512    ) -> Result<Vec<crate::dto::TaskGraphDebugBundle>> {
513        let task_id = task_id.to_owned();
514        self.async_db
515            .reader()
516            .call(move |conn| {
517                queries::load_task_graph_debug_bundles(conn, &task_id)
518                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
519            })
520            .await
521            .map_err(flatten_err)
522    }
523
524    // ── Write operations (use writer) ──
525
526    /// Updates a task status and optionally marks completion.
527    pub async fn set_task_status(
528        &self,
529        task_id: &str,
530        status: &str,
531        set_completed: bool,
532    ) -> Result<()> {
533        let task_id = task_id.to_owned();
534        let status = status.to_owned();
535        self.async_db
536            .writer()
537            .call(move |conn| {
538                state::set_task_status(conn, &task_id, &status, set_completed)
539                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
540            })
541            .await
542            .map_err(flatten_err)
543    }
544
545    /// Resets unresolved items back to pending without changing the task status.
546    /// Called before enqueuing a task so the worker can re-process them.
547    pub async fn reset_unresolved_items(&self, task_id: &str) -> Result<()> {
548        let task_id = task_id.to_owned();
549        self.async_db
550            .writer()
551            .call(move |conn| {
552                state::reset_unresolved_items(conn, &task_id)
553                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
554            })
555            .await
556            .map_err(flatten_err)
557    }
558
559    /// Resets a task into a fresh batch-start state.
560    pub async fn prepare_task_for_start_batch(&self, task_id: &str) -> Result<()> {
561        let task_id = task_id.to_owned();
562        self.async_db
563            .writer()
564            .call(move |conn| {
565                state::prepare_task_for_start_batch(conn, &task_id)
566                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
567            })
568            .await
569            .map_err(flatten_err)
570    }
571
572    /// Persists cycle counters and `init_once` state.
573    pub async fn update_task_cycle_state(
574        &self,
575        task_id: &str,
576        current_cycle: u32,
577        init_done: bool,
578    ) -> Result<()> {
579        let task_id = task_id.to_owned();
580        self.async_db
581            .writer()
582            .call(move |conn| {
583                state::update_task_cycle_state(conn, &task_id, current_cycle, init_done)
584                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
585            })
586            .await
587            .map_err(flatten_err)
588    }
589
590    /// Marks a task item as running.
591    pub async fn mark_task_item_running(&self, task_item_id: &str) -> Result<()> {
592        let task_item_id = task_item_id.to_owned();
593        self.async_db
594            .writer()
595            .call(move |conn| {
596                items::mark_task_item_running(conn, &task_item_id)
597                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
598            })
599            .await
600            .map_err(flatten_err)
601    }
602
603    /// Sets a terminal status for a task item.
604    pub async fn set_task_item_terminal_status(
605        &self,
606        task_item_id: &str,
607        status: &str,
608    ) -> Result<()> {
609        let task_item_id = task_item_id.to_owned();
610        let status = status.to_owned();
611        self.async_db
612            .writer()
613            .call(move |conn| {
614                items::set_task_item_terminal_status(conn, &task_item_id, &status)
615                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
616            })
617            .await
618            .map_err(flatten_err)
619    }
620
621    /// Updates a task item to an arbitrary status.
622    pub async fn update_task_item_status(&self, task_item_id: &str, status: &str) -> Result<()> {
623        let task_item_id = task_item_id.to_owned();
624        let status = status.to_owned();
625        self.async_db
626            .writer()
627            .call(move |conn| {
628                items::update_task_item_status(conn, &task_item_id, &status)
629                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
630            })
631            .await
632            .map_err(flatten_err)
633    }
634
635    /// Persists accumulated pipeline variables back to the task item's dynamic_vars column.
636    pub async fn update_task_item_pipeline_vars(
637        &self,
638        task_item_id: &str,
639        pipeline_vars_json: &str,
640    ) -> Result<()> {
641        let task_item_id = task_item_id.to_owned();
642        let pipeline_vars_json = pipeline_vars_json.to_owned();
643        self.async_db
644            .writer()
645            .call(move |conn| {
646                items::update_task_item_pipeline_vars(conn, &task_item_id, &pipeline_vars_json)
647                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
648            })
649            .await
650            .map_err(flatten_err)
651    }
652
653    /// Deletes a task and returns log paths that should be removed.
654    pub async fn delete_task_and_collect_log_paths(&self, task_id: &str) -> Result<Vec<String>> {
655        let task_id = task_id.to_owned();
656        self.async_db
657            .writer()
658            .call(move |conn| {
659                items::delete_task_and_collect_log_paths(conn, &task_id)
660                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
661            })
662            .await
663            .map_err(flatten_err)
664    }
665
666    /// Inserts a command-run record.
667    pub async fn insert_command_run(&self, run: NewCommandRun) -> Result<()> {
668        self.async_db
669            .writer()
670            .call(move |conn| {
671                items::insert_command_run(conn, &run)
672                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
673            })
674            .await
675            .map_err(flatten_err)
676    }
677
678    /// Inserts an event record.
679    pub async fn insert_event(&self, event: DbEventRecord) -> Result<()> {
680        self.async_db
681            .writer()
682            .call(move |conn| {
683                write_ops::insert_event(conn, &event)
684                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
685            })
686            .await
687            .map_err(flatten_err)
688    }
689
690    /// Updates an existing command-run record.
691    pub async fn update_command_run(&self, run: NewCommandRun) -> Result<()> {
692        self.async_db
693            .writer()
694            .call(move |conn| {
695                write_ops::update_command_run(conn, &run)
696                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
697            })
698            .await
699            .map_err(flatten_err)
700    }
701
702    /// Updates a command run and appends emitted events.
703    pub async fn update_command_run_with_events(
704        &self,
705        run: NewCommandRun,
706        events: Vec<DbEventRecord>,
707    ) -> Result<()> {
708        self.async_db
709            .writer()
710            .call(move |conn| {
711                write_ops::update_command_run_with_events(conn, &run, &events)
712                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
713            })
714            .await
715            .map_err(flatten_err)
716    }
717
718    /// Persists a completed phase result together with emitted events.
719    pub async fn persist_phase_result_with_events(
720        &self,
721        run: NewCommandRun,
722        events: Vec<DbEventRecord>,
723    ) -> Result<()> {
724        self.async_db
725            .writer()
726            .call(move |conn| {
727                write_ops::persist_phase_result_with_events(conn, &run, &events)
728                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
729            })
730            .await
731            .map_err(flatten_err)
732    }
733
734    /// Updates the PID associated with a running command.
735    pub async fn update_command_run_pid(&self, run_id: &str, pid: i64) -> Result<()> {
736        let run_id = run_id.to_owned();
737        self.async_db
738            .writer()
739            .call(move |conn| {
740                write_ops::update_command_run_pid(conn, &run_id, pid)
741                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
742            })
743            .await
744            .map_err(flatten_err)
745    }
746
747    /// Returns active child PIDs for a task.
748    pub async fn find_active_child_pids(&self, task_id: &str) -> Result<Vec<i64>> {
749        let task_id = task_id.to_owned();
750        self.async_db
751            .reader()
752            .call(move |conn| {
753                write_ops::find_active_child_pids(conn, &task_id)
754                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
755            })
756            .await
757            .map_err(flatten_err)
758    }
759
760    /// Returns in-flight command runs for a task (FR-038).
761    pub async fn find_inflight_command_runs_for_task(
762        &self,
763        task_id: &str,
764    ) -> Result<Vec<InflightRunRecord>> {
765        let task_id = task_id.to_owned();
766        self.async_db
767            .reader()
768            .call(move |conn| {
769                write_ops::find_inflight_command_runs_for_task(conn, &task_id)
770                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
771            })
772            .await
773            .map_err(flatten_err)
774    }
775
776    /// Returns completed runs whose parent items are still `pending` (FR-038).
777    pub async fn find_completed_runs_for_pending_items(
778        &self,
779        task_id: &str,
780    ) -> Result<Vec<write_ops::CompletedRunRecord>> {
781        let task_id = task_id.to_owned();
782        self.async_db
783            .reader()
784            .call(move |conn| {
785                write_ops::find_completed_runs_for_pending_items(conn, &task_id)
786                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
787            })
788            .await
789            .map_err(flatten_err)
790    }
791
792    /// Counts stale pending items (FR-038).
793    pub async fn count_stale_pending_items(&self, task_id: &str) -> Result<i64> {
794        let task_id = task_id.to_owned();
795        self.async_db
796            .reader()
797            .call(move |conn| {
798                queries::count_stale_pending_items(conn, &task_id)
799                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
800            })
801            .await
802            .map_err(flatten_err)
803    }
804
805    /// FR-052: Counts recent heartbeat events for specified item IDs since cutoff.
806    pub async fn count_recent_heartbeats_for_items(
807        &self,
808        task_id: &str,
809        item_ids: &[String],
810        cutoff_ts: &str,
811    ) -> Result<i64> {
812        let task_id = task_id.to_owned();
813        let item_ids = item_ids.to_vec();
814        let cutoff_ts = cutoff_ts.to_owned();
815        self.async_db
816            .reader()
817            .call(move |conn| {
818                write_ops::count_recent_heartbeats_for_items(conn, &task_id, &item_ids, &cutoff_ts)
819                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
820            })
821            .await
822            .map_err(flatten_err)
823    }
824
825    /// Persists the serialized pipeline-variable map for a task.
826    pub async fn update_task_pipeline_vars(
827        &self,
828        task_id: &str,
829        pipeline_vars_json: &str,
830    ) -> Result<()> {
831        let task_id = task_id.to_owned();
832        let pipeline_vars_json = pipeline_vars_json.to_owned();
833        self.async_db
834            .writer()
835            .call(move |conn| {
836                write_ops::update_task_pipeline_vars(conn, &task_id, &pipeline_vars_json)
837                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
838            })
839            .await
840            .map_err(flatten_err)
841    }
842
843    /// Persists active ticket paths and preview content for a task item.
844    pub async fn update_task_item_tickets(
845        &self,
846        task_item_id: &str,
847        ticket_files_json: &str,
848        ticket_content_json: &str,
849    ) -> Result<()> {
850        let task_item_id = task_item_id.to_owned();
851        let ticket_files_json = ticket_files_json.to_owned();
852        let ticket_content_json = ticket_content_json.to_owned();
853        self.async_db
854            .writer()
855            .call(move |conn| {
856                write_ops::update_task_item_tickets(
857                    conn,
858                    &task_item_id,
859                    &ticket_files_json,
860                    &ticket_content_json,
861                )
862                .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
863            })
864            .await
865            .map_err(flatten_err)
866    }
867
868    /// Inserts a task-graph planning run.
869    pub async fn insert_task_graph_run(&self, run: NewTaskGraphRun) -> Result<()> {
870        self.async_db
871            .writer()
872            .call(move |conn| {
873                queries::insert_task_graph_run(conn, &run)
874                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
875            })
876            .await
877            .map_err(flatten_err)
878    }
879
880    /// Updates the status of a task-graph planning run.
881    pub async fn update_task_graph_run_status(
882        &self,
883        graph_run_id: &str,
884        status: &str,
885    ) -> Result<()> {
886        let graph_run_id = graph_run_id.to_owned();
887        let status = status.to_owned();
888        self.async_db
889            .writer()
890            .call(move |conn| {
891                queries::update_task_graph_run_status(conn, &graph_run_id, &status)
892                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
893            })
894            .await
895            .map_err(flatten_err)
896    }
897
898    /// Blanket-pause all running tasks and reset their items to pending.
899    /// Used during daemon shutdown before exec() to prevent orphaned state.
900    pub async fn pause_all_running_tasks_and_items(&self) -> Result<usize> {
901        self.async_db
902            .writer()
903            .call(move |conn| {
904                state::pause_all_running_tasks_and_items(conn)
905                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
906            })
907            .await
908            .map_err(flatten_err)
909    }
910
911    /// Pauses only tasks in `restart_pending` status and resets their running items.
912    pub async fn pause_restart_pending_tasks_and_items(&self) -> Result<usize> {
913        self.async_db
914            .writer()
915            .call(move |conn| {
916                state::pause_restart_pending_tasks_and_items(conn)
917                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
918            })
919            .await
920            .map_err(flatten_err)
921    }
922
923    /// Recovers all orphaned running items across all tasks.
924    pub async fn recover_orphaned_running_items(&self) -> Result<Vec<(String, Vec<String>)>> {
925        self.async_db
926            .writer()
927            .call(move |conn| {
928                state::recover_orphaned_running_items(conn)
929                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
930            })
931            .await
932            .map_err(flatten_err)
933    }
934
935    /// Recovers orphaned running items for a single task.
936    pub async fn recover_orphaned_running_items_for_task(
937        &self,
938        task_id: &str,
939    ) -> Result<Vec<String>> {
940        let task_id = task_id.to_owned();
941        self.async_db
942            .writer()
943            .call(move |conn| {
944                state::recover_orphaned_running_items_for_task(conn, &task_id)
945                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
946            })
947            .await
948            .map_err(flatten_err)
949    }
950
951    /// Recovers stalled running items older than the given threshold.
952    ///
953    /// Tasks in `exclude_task_ids` are skipped (they have active workers).
954    pub async fn recover_stalled_running_items(
955        &self,
956        stall_threshold_secs: u64,
957        exclude_task_ids: std::collections::HashSet<String>,
958    ) -> Result<Vec<(String, Vec<String>)>> {
959        self.async_db
960            .writer()
961            .call(move |conn| {
962                state::recover_stalled_running_items(conn, stall_threshold_secs, &exclude_task_ids)
963                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
964            })
965            .await
966            .map_err(flatten_err)
967    }
968
969    /// Persists one task-graph snapshot payload.
970    pub async fn insert_task_graph_snapshot(&self, snapshot: NewTaskGraphSnapshot) -> Result<()> {
971        self.async_db
972            .writer()
973            .call(move |conn| {
974                queries::insert_task_graph_snapshot(conn, &snapshot)
975                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
976            })
977            .await
978            .map_err(flatten_err)
979    }
980}
981
982#[cfg(test)]
983mod async_wrapper_tests {
984    use super::*;
985    use crate::dto::CreateTaskPayload;
986    use crate::task_ops::create_task_impl;
987    use crate::test_utils::TestState;
988
989    fn seed_task(fixture: &mut TestState) -> (std::sync::Arc<crate::state::InnerState>, String) {
990        let state = fixture.build();
991        let qa_file = state
992            .data_dir
993            .join("workspace/default/docs/qa/async-repo.md");
994        std::fs::write(&qa_file, "# async repo\n").expect("seed qa file");
995        let created = create_task_impl(
996            &state,
997            CreateTaskPayload {
998                name: Some("async-repo".to_string()),
999                goal: Some("async-repo-goal".to_string()),
1000                ..CreateTaskPayload::default()
1001            },
1002        )
1003        .expect("create task");
1004        (state, created.id)
1005    }
1006
1007    fn first_item_id(state: &crate::state::InnerState, task_id: &str) -> String {
1008        let conn = crate::db::open_conn(&state.db_path).expect("open sqlite");
1009        conn.query_row(
1010            "SELECT id FROM task_items WHERE task_id = ?1 ORDER BY order_no LIMIT 1",
1011            rusqlite::params![task_id],
1012            |row| row.get(0),
1013        )
1014        .expect("load item id")
1015    }
1016
1017    #[tokio::test]
1018    async fn async_repository_read_wrappers_round_trip() {
1019        let mut fixture = TestState::new();
1020        let (state, task_id) = seed_task(&mut fixture);
1021        let repo = &state.task_repo;
1022
1023        let resolved = repo
1024            .resolve_task_id(&task_id[..8])
1025            .await
1026            .expect("resolve task id");
1027        assert_eq!(resolved, task_id);
1028
1029        let summary = repo
1030            .load_task_summary(&task_id)
1031            .await
1032            .expect("load summary");
1033        assert_eq!(summary.name, "async-repo");
1034
1035        let detail = repo
1036            .load_task_detail_rows(&task_id)
1037            .await
1038            .expect("load detail rows");
1039        assert!(!detail.0.is_empty());
1040
1041        let counts = repo
1042            .load_task_item_counts(&task_id)
1043            .await
1044            .expect("load item counts");
1045        assert!(counts.0 >= 1);
1046
1047        let ids = repo
1048            .list_task_ids_ordered_by_created_desc()
1049            .await
1050            .expect("list task ids");
1051        assert_eq!(ids[0], task_id);
1052
1053        let resumable = repo
1054            .find_latest_resumable_task_id(true)
1055            .await
1056            .expect("find latest resumable");
1057        assert_eq!(resumable.as_deref(), Some(task_id.as_str()));
1058
1059        let runtime = repo
1060            .load_task_runtime_row(&task_id)
1061            .await
1062            .expect("load runtime row");
1063        assert_eq!(runtime.goal, "async-repo-goal");
1064
1065        let item_id = repo
1066            .first_task_item_id(&task_id)
1067            .await
1068            .expect("first item id query");
1069        assert!(item_id.is_some());
1070
1071        let unresolved = repo
1072            .count_unresolved_items(&task_id)
1073            .await
1074            .expect("count unresolved");
1075        assert_eq!(unresolved, 0);
1076
1077        let items = repo
1078            .list_task_items_for_cycle(&task_id)
1079            .await
1080            .expect("list items for cycle");
1081        assert!(!items.is_empty());
1082
1083        let status = repo.load_task_status(&task_id).await.expect("load status");
1084        assert_eq!(status.as_deref(), Some("created"));
1085
1086        let name = repo.load_task_name(&task_id).await.expect("load task name");
1087        assert_eq!(name.as_deref(), Some("async-repo"));
1088
1089        let runs = repo
1090            .list_task_log_runs(&task_id, 10)
1091            .await
1092            .expect("list log runs");
1093        assert!(runs.is_empty());
1094    }
1095
1096    #[tokio::test]
1097    async fn async_repository_write_wrappers_update_task_and_item_state() {
1098        let mut fixture = TestState::new();
1099        let (state, task_id) = seed_task(&mut fixture);
1100        let repo = &state.task_repo;
1101        let item_id = first_item_id(&state, &task_id);
1102
1103        repo.mark_task_item_running(&item_id)
1104            .await
1105            .expect("mark item running");
1106        repo.update_task_item_status(&item_id, "qa_failed")
1107            .await
1108            .expect("update item status");
1109        repo.set_task_item_terminal_status(&item_id, "completed")
1110            .await
1111            .expect("set terminal status");
1112        repo.set_task_status(&task_id, "running", false)
1113            .await
1114            .expect("set task status");
1115        repo.update_task_cycle_state(&task_id, 3, true)
1116            .await
1117            .expect("update task cycle state");
1118
1119        let conn = crate::db::open_conn(&state.db_path).expect("open sqlite");
1120        let task_row: (String, i64, i64) = conn
1121            .query_row(
1122                "SELECT status, current_cycle, init_done FROM tasks WHERE id = ?1",
1123                rusqlite::params![task_id],
1124                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1125            )
1126            .expect("load task row");
1127        assert_eq!(task_row.0, "running");
1128        assert_eq!(task_row.1, 3);
1129        assert_eq!(task_row.2, 1);
1130
1131        let item_status: String = conn
1132            .query_row(
1133                "SELECT status FROM task_items WHERE id = ?1",
1134                rusqlite::params![item_id],
1135                |row| row.get(0),
1136            )
1137            .expect("load item status");
1138        assert_eq!(item_status, "completed");
1139
1140        repo.set_task_status(&task_id, "paused", false)
1141            .await
1142            .expect("pause task before prepare");
1143        repo.prepare_task_for_start_batch(&task_id)
1144            .await
1145            .expect("prepare task for start");
1146        let prepared_status: String = conn
1147            .query_row(
1148                "SELECT status FROM tasks WHERE id = ?1",
1149                rusqlite::params![task_id],
1150                |row| row.get(0),
1151            )
1152            .expect("reload task status");
1153        assert_eq!(prepared_status, "running");
1154    }
1155
1156    #[tokio::test]
1157    async fn async_repository_insert_and_delete_command_runs() {
1158        let mut fixture = TestState::new();
1159        let (state, task_id) = seed_task(&mut fixture);
1160        let repo = &state.task_repo;
1161        let item_id = first_item_id(&state, &task_id);
1162        let stdout_path = state.data_dir.join("logs/async-wrapper-stdout.log");
1163        let stderr_path = state.data_dir.join("logs/async-wrapper-stderr.log");
1164        std::fs::create_dir_all(
1165            stdout_path
1166                .parent()
1167                .expect("stdout parent directory should exist"),
1168        )
1169        .expect("create logs dir");
1170        std::fs::write(&stdout_path, "stdout").expect("write stdout");
1171        std::fs::write(&stderr_path, "stderr").expect("write stderr");
1172
1173        repo.insert_command_run(NewCommandRun {
1174            id: "async-run-1".to_string(),
1175            task_item_id: item_id,
1176            phase: "qa".to_string(),
1177            command: "echo repo".to_string(),
1178            command_template: None,
1179            cwd: state.data_dir.display().to_string(),
1180            workspace_id: "default".to_string(),
1181            agent_id: "echo".to_string(),
1182            exit_code: 0,
1183            stdout_path: stdout_path.display().to_string(),
1184            stderr_path: stderr_path.display().to_string(),
1185            started_at: crate::config_load::now_ts(),
1186            ended_at: crate::config_load::now_ts(),
1187            interrupted: 0,
1188            output_json: "{}".to_string(),
1189            artifacts_json: "[]".to_string(),
1190            confidence: Some(1.0),
1191            quality_score: Some(1.0),
1192            validation_status: "passed".to_string(),
1193            session_id: None,
1194            machine_output_source: "stdout".to_string(),
1195            output_json_path: None,
1196            command_rule_index: None,
1197        })
1198        .await
1199        .expect("insert command run");
1200
1201        let runs = repo
1202            .list_task_log_runs(&task_id, 10)
1203            .await
1204            .expect("list runs after insert");
1205        assert_eq!(runs.len(), 1);
1206
1207        let paths = repo
1208            .delete_task_and_collect_log_paths(&task_id)
1209            .await
1210            .expect("delete task and collect log paths");
1211        assert_eq!(paths.len(), 2);
1212        assert!(
1213            paths
1214                .iter()
1215                .any(|path| path.ends_with("async-wrapper-stdout.log"))
1216        );
1217    }
1218
1219    #[test]
1220    fn sqlite_repository_graph_debug_wrappers_round_trip() {
1221        let mut fixture = TestState::new();
1222        let (state, task_id) = seed_task(&mut fixture);
1223        let repo =
1224            SqliteTaskRepository::new(types::TaskRepositorySource::from(state.db_path.clone()));
1225
1226        repo.insert_task_graph_run(&NewTaskGraphRun {
1227            graph_run_id: "sync-graph-run".to_string(),
1228            task_id: task_id.clone(),
1229            cycle: 4,
1230            mode: "dynamic_dag".to_string(),
1231            source: "adaptive_planner".to_string(),
1232            status: "materialized".to_string(),
1233            fallback_mode: Some("static_segment".to_string()),
1234            planner_failure_class: None,
1235            planner_failure_message: None,
1236            entry_node_id: Some("qa".to_string()),
1237            node_count: 2,
1238            edge_count: 1,
1239        })
1240        .expect("insert graph run");
1241        repo.update_task_graph_run_status("sync-graph-run", "completed")
1242            .expect("update graph run");
1243        repo.insert_task_graph_snapshot(&NewTaskGraphSnapshot {
1244            graph_run_id: "sync-graph-run".to_string(),
1245            task_id: task_id.clone(),
1246            snapshot_kind: "effective_graph".to_string(),
1247            payload_json: "{\"entry\":\"qa\"}".to_string(),
1248        })
1249        .expect("insert graph snapshot");
1250
1251        let bundles = repo
1252            .load_task_graph_debug_bundles(&task_id)
1253            .expect("load graph bundles");
1254        assert_eq!(bundles.len(), 1);
1255        assert_eq!(bundles[0].graph_run_id, "sync-graph-run");
1256        assert_eq!(bundles[0].status, "completed");
1257        assert_eq!(bundles[0].effective_graph_json, "{\"entry\":\"qa\"}");
1258    }
1259
1260    #[tokio::test]
1261    async fn async_repository_graph_debug_wrappers_round_trip() {
1262        let mut fixture = TestState::new();
1263        let (state, task_id) = seed_task(&mut fixture);
1264        let repo = &state.task_repo;
1265
1266        repo.insert_task_graph_run(NewTaskGraphRun {
1267            graph_run_id: "async-graph-run".to_string(),
1268            task_id: task_id.clone(),
1269            cycle: 5,
1270            mode: "dynamic_dag".to_string(),
1271            source: "adaptive_planner".to_string(),
1272            status: "materialized".to_string(),
1273            fallback_mode: Some("deterministic_dag".to_string()),
1274            planner_failure_class: Some("invalid_json".to_string()),
1275            planner_failure_message: Some("planner output broken".to_string()),
1276            entry_node_id: Some("fix".to_string()),
1277            node_count: 3,
1278            edge_count: 2,
1279        })
1280        .await
1281        .expect("insert graph run");
1282        repo.update_task_graph_run_status("async-graph-run", "completed")
1283            .await
1284            .expect("update graph run");
1285        repo.insert_task_graph_snapshot(NewTaskGraphSnapshot {
1286            graph_run_id: "async-graph-run".to_string(),
1287            task_id: task_id.clone(),
1288            snapshot_kind: "effective_graph".to_string(),
1289            payload_json: "{\"entry\":\"fix\"}".to_string(),
1290        })
1291        .await
1292        .expect("insert graph snapshot");
1293
1294        let bundles = repo
1295            .load_task_graph_debug_bundles(&task_id)
1296            .await
1297            .expect("load graph bundles");
1298        assert_eq!(bundles.len(), 1);
1299        assert_eq!(bundles[0].graph_run_id, "async-graph-run");
1300        assert_eq!(
1301            bundles[0].fallback_mode.as_deref(),
1302            Some("deterministic_dag")
1303        );
1304        assert_eq!(bundles[0].effective_graph_json, "{\"entry\":\"fix\"}");
1305    }
1306}