Skip to main content

agent_orchestrator/task_repository/
mod.rs

1#![cfg_attr(
2    not(test),
3    deny(clippy::panic, clippy::unwrap_used, clippy::expect_used)
4)]
5
6mod command_run;
7mod items;
8mod queries;
9mod state;
10mod trait_def;
11mod types;
12mod write_ops;
13
14#[cfg(test)]
15mod tests;
16
17pub use command_run::NewCommandRun;
18pub use trait_def::TaskRepository;
19pub use types::{
20    DbEventRecord, NewTaskGraphRun, NewTaskGraphSnapshot, TaskLogRunRow, TaskRepositoryConn,
21    TaskRepositorySource, TaskRuntimeRow,
22};
23pub use write_ops::{CompletedRunRecord, InflightRunRecord};
24
25use crate::async_database::{flatten_err, AsyncDatabase};
26use crate::dto::{CommandRunDto, EventDto, TaskGraphDebugBundle, TaskItemDto};
27use anyhow::Result;
28use std::sync::Arc;
29
30/// Tuple returned by detail queries: items, runs, events, and graph bundles.
31pub type TaskDetailRows = (
32    Vec<TaskItemDto>,
33    Vec<CommandRunDto>,
34    Vec<EventDto>,
35    Vec<TaskGraphDebugBundle>,
36);
37
38/// Synchronous SQLite-backed implementation of [`TaskRepository`].
39pub struct SqliteTaskRepository {
40    source: types::TaskRepositorySource,
41}
42
43impl SqliteTaskRepository {
44    /// Creates a repository backed by the given connection source.
45    pub fn new<T>(source: T) -> Self
46    where
47        T: Into<types::TaskRepositorySource>,
48    {
49        Self {
50            source: source.into(),
51        }
52    }
53
54    fn connection(&self) -> Result<types::TaskRepositoryConn> {
55        self.source.connection()
56    }
57}
58
59impl TaskRepository for SqliteTaskRepository {
60    fn resolve_task_id(&self, task_id_or_prefix: &str) -> Result<String> {
61        let conn = self.connection()?;
62        queries::resolve_task_id(&conn, task_id_or_prefix)
63    }
64
65    fn load_task_summary(&self, task_id: &str) -> Result<crate::dto::TaskSummary> {
66        let conn = self.connection()?;
67        queries::load_task_summary(&conn, task_id)
68    }
69
70    fn load_task_detail_rows(&self, task_id: &str) -> Result<TaskDetailRows> {
71        let conn = self.connection()?;
72        queries::load_task_detail_rows(&conn, task_id)
73    }
74
75    fn load_task_item_counts(&self, task_id: &str) -> Result<(i64, i64, i64)> {
76        let conn = self.connection()?;
77        queries::load_task_item_counts(&conn, task_id)
78    }
79
80    fn list_task_ids_ordered_by_created_desc(&self) -> Result<Vec<String>> {
81        let conn = self.connection()?;
82        queries::list_task_ids_ordered_by_created_desc(&conn)
83    }
84
85    fn find_latest_resumable_task_id(&self, include_pending: bool) -> Result<Option<String>> {
86        let conn = self.connection()?;
87        queries::find_latest_resumable_task_id(&conn, include_pending)
88    }
89
90    fn load_task_runtime_row(&self, task_id: &str) -> Result<TaskRuntimeRow> {
91        let conn = self.connection()?;
92        queries::load_task_runtime_row(&conn, task_id)
93    }
94
95    fn first_task_item_id(&self, task_id: &str) -> Result<Option<String>> {
96        let conn = self.connection()?;
97        queries::first_task_item_id(&conn, task_id)
98    }
99
100    fn count_unresolved_items(&self, task_id: &str) -> Result<i64> {
101        let conn = self.connection()?;
102        queries::count_unresolved_items(&conn, task_id)
103    }
104
105    fn list_task_items_for_cycle(&self, task_id: &str) -> Result<Vec<crate::dto::TaskItemRow>> {
106        let conn = self.connection()?;
107        queries::list_task_items_for_cycle(&conn, task_id)
108    }
109
110    fn load_task_status(&self, task_id: &str) -> Result<Option<String>> {
111        let conn = self.connection()?;
112        queries::load_task_status(&conn, task_id)
113    }
114
115    fn set_task_status(&self, task_id: &str, status: &str, set_completed: bool) -> Result<()> {
116        let conn = self.connection()?;
117        state::set_task_status(&conn, task_id, status, set_completed)
118    }
119
120    fn prepare_task_for_start_batch(&self, task_id: &str) -> Result<()> {
121        let conn = self.connection()?;
122        state::prepare_task_for_start_batch(&conn, task_id)
123    }
124
125    fn update_task_cycle_state(
126        &self,
127        task_id: &str,
128        current_cycle: u32,
129        init_done: bool,
130    ) -> Result<()> {
131        let conn = self.connection()?;
132        state::update_task_cycle_state(&conn, task_id, current_cycle, init_done)
133    }
134
135    fn mark_task_item_running(&self, task_item_id: &str) -> Result<()> {
136        let conn = self.connection()?;
137        items::mark_task_item_running(&conn, task_item_id)
138    }
139
140    fn set_task_item_terminal_status(&self, task_item_id: &str, status: &str) -> Result<()> {
141        let conn = self.connection()?;
142        items::set_task_item_terminal_status(&conn, task_item_id, status)
143    }
144
145    fn update_task_item_status(&self, task_item_id: &str, status: &str) -> Result<()> {
146        let conn = self.connection()?;
147        items::update_task_item_status(&conn, task_item_id, status)
148    }
149
150    fn update_task_item_pipeline_vars(
151        &self,
152        task_item_id: &str,
153        pipeline_vars_json: &str,
154    ) -> Result<()> {
155        let conn = self.connection()?;
156        items::update_task_item_pipeline_vars(&conn, task_item_id, pipeline_vars_json)
157    }
158
159    fn load_task_name(&self, task_id: &str) -> Result<Option<String>> {
160        let conn = self.connection()?;
161        queries::load_task_name(&conn, task_id)
162    }
163
164    fn list_task_log_runs(&self, task_id: &str, limit: usize) -> Result<Vec<TaskLogRunRow>> {
165        let conn = self.connection()?;
166        queries::list_task_log_runs(&conn, task_id, limit)
167    }
168
169    fn insert_task_graph_run(&self, run: &NewTaskGraphRun) -> Result<()> {
170        let conn = self.connection()?;
171        queries::insert_task_graph_run(&conn, run)
172    }
173
174    fn update_task_graph_run_status(&self, graph_run_id: &str, status: &str) -> Result<()> {
175        let conn = self.connection()?;
176        queries::update_task_graph_run_status(&conn, graph_run_id, status)
177    }
178
179    fn insert_task_graph_snapshot(&self, snapshot: &NewTaskGraphSnapshot) -> Result<()> {
180        let conn = self.connection()?;
181        queries::insert_task_graph_snapshot(&conn, snapshot)
182    }
183
184    fn load_task_graph_debug_bundles(
185        &self,
186        task_id: &str,
187    ) -> Result<Vec<crate::dto::TaskGraphDebugBundle>> {
188        let conn = self.connection()?;
189        queries::load_task_graph_debug_bundles(&conn, task_id)
190    }
191
192    fn delete_task_and_collect_log_paths(&self, task_id: &str) -> Result<Vec<String>> {
193        let conn = self.connection()?;
194        items::delete_task_and_collect_log_paths(&conn, task_id)
195    }
196
197    fn insert_command_run(&self, run: &NewCommandRun) -> Result<()> {
198        let conn = self.connection()?;
199        items::insert_command_run(&conn, run)
200    }
201
202    fn insert_event(&self, event: &DbEventRecord) -> Result<()> {
203        let conn = self.connection()?;
204        write_ops::insert_event(&conn, event)
205    }
206
207    fn update_command_run(&self, run: &NewCommandRun) -> Result<()> {
208        let conn = self.connection()?;
209        write_ops::update_command_run(&conn, run)
210    }
211
212    fn update_command_run_with_events(
213        &self,
214        run: &NewCommandRun,
215        events: &[DbEventRecord],
216    ) -> Result<()> {
217        let conn = self.connection()?;
218        write_ops::update_command_run_with_events(&conn, run, events)
219    }
220
221    fn persist_phase_result_with_events(
222        &self,
223        run: &NewCommandRun,
224        events: &[DbEventRecord],
225    ) -> Result<()> {
226        let conn = self.connection()?;
227        write_ops::persist_phase_result_with_events(&conn, run, events)
228    }
229
230    fn update_command_run_pid(&self, run_id: &str, pid: i64) -> Result<()> {
231        let conn = self.connection()?;
232        write_ops::update_command_run_pid(&conn, run_id, pid)
233    }
234
235    fn find_active_child_pids(&self, task_id: &str) -> Result<Vec<i64>> {
236        let conn = self.connection()?;
237        write_ops::find_active_child_pids(&conn, task_id)
238    }
239
240    fn find_inflight_command_runs_for_task(&self, task_id: &str) -> Result<Vec<InflightRunRecord>> {
241        let conn = self.connection()?;
242        write_ops::find_inflight_command_runs_for_task(&conn, task_id)
243    }
244
245    fn find_completed_runs_for_pending_items(
246        &self,
247        task_id: &str,
248    ) -> Result<Vec<write_ops::CompletedRunRecord>> {
249        let conn = self.connection()?;
250        write_ops::find_completed_runs_for_pending_items(&conn, task_id)
251    }
252
253    fn count_stale_pending_items(&self, task_id: &str) -> Result<i64> {
254        let conn = self.connection()?;
255        queries::count_stale_pending_items(&conn, task_id)
256    }
257
258    fn count_recent_heartbeats_for_items(
259        &self,
260        task_id: &str,
261        item_ids: &[String],
262        cutoff_ts: &str,
263    ) -> Result<i64> {
264        let conn = self.connection()?;
265        write_ops::count_recent_heartbeats_for_items(&conn, task_id, item_ids, cutoff_ts)
266    }
267
268    fn update_task_pipeline_vars(&self, task_id: &str, pipeline_vars_json: &str) -> Result<()> {
269        let conn = self.connection()?;
270        write_ops::update_task_pipeline_vars(&conn, task_id, pipeline_vars_json)
271    }
272
273    fn update_task_item_tickets(
274        &self,
275        task_item_id: &str,
276        ticket_files_json: &str,
277        ticket_content_json: &str,
278    ) -> Result<()> {
279        let conn = self.connection()?;
280        write_ops::update_task_item_tickets(
281            &conn,
282            task_item_id,
283            ticket_files_json,
284            ticket_content_json,
285        )
286    }
287}
288
289/// Async wrapper around [`SqliteTaskRepository`] built on [`AsyncDatabase`].
290pub struct AsyncSqliteTaskRepository {
291    async_db: Arc<AsyncDatabase>,
292}
293
294impl AsyncSqliteTaskRepository {
295    /// Creates a new async repository wrapper.
296    pub fn new(async_db: Arc<AsyncDatabase>) -> Self {
297        Self { async_db }
298    }
299
300    // ── Read operations (use reader) ──
301
302    /// Resolves a full task identifier from an ID prefix.
303    pub async fn resolve_task_id(&self, prefix: &str) -> Result<String> {
304        let prefix = prefix.to_owned();
305        self.async_db
306            .reader()
307            .call(move |conn| {
308                queries::resolve_task_id(conn, &prefix)
309                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
310            })
311            .await
312            .map_err(flatten_err)
313    }
314
315    /// Loads a summary row for a task.
316    pub async fn load_task_summary(&self, task_id: &str) -> Result<crate::dto::TaskSummary> {
317        let task_id = task_id.to_owned();
318        self.async_db
319            .reader()
320            .call(move |conn| {
321                queries::load_task_summary(conn, &task_id)
322                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
323            })
324            .await
325            .map_err(flatten_err)
326    }
327
328    /// Loads the full detail bundle for a task.
329    pub async fn load_task_detail_rows(&self, task_id: &str) -> Result<TaskDetailRows> {
330        let task_id = task_id.to_owned();
331        self.async_db
332            .reader()
333            .call(move |conn| {
334                queries::load_task_detail_rows(conn, &task_id)
335                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
336            })
337            .await
338            .map_err(flatten_err)
339    }
340
341    /// Loads `(total, resolved, unresolved)` item counts for a task.
342    pub async fn load_task_item_counts(&self, task_id: &str) -> Result<(i64, i64, i64)> {
343        let task_id = task_id.to_owned();
344        self.async_db
345            .reader()
346            .call(move |conn| {
347                queries::load_task_item_counts(conn, &task_id)
348                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
349            })
350            .await
351            .map_err(flatten_err)
352    }
353
354    /// Lists task identifiers ordered by creation time descending.
355    pub async fn list_task_ids_ordered_by_created_desc(&self) -> Result<Vec<String>> {
356        self.async_db
357            .reader()
358            .call(move |conn| {
359                queries::list_task_ids_ordered_by_created_desc(conn)
360                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
361            })
362            .await
363            .map_err(flatten_err)
364    }
365
366    /// Returns the latest resumable task, optionally including pending tasks.
367    pub async fn find_latest_resumable_task_id(
368        &self,
369        include_pending: bool,
370    ) -> Result<Option<String>> {
371        self.async_db
372            .reader()
373            .call(move |conn| {
374                queries::find_latest_resumable_task_id(conn, include_pending)
375                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
376            })
377            .await
378            .map_err(flatten_err)
379    }
380
381    /// Loads execution state required to resume a task.
382    pub async fn load_task_runtime_row(&self, task_id: &str) -> Result<TaskRuntimeRow> {
383        let task_id = task_id.to_owned();
384        self.async_db
385            .reader()
386            .call(move |conn| {
387                queries::load_task_runtime_row(conn, &task_id)
388                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
389            })
390            .await
391            .map_err(flatten_err)
392    }
393
394    /// Returns the first task-item identifier for a task, if present.
395    pub async fn first_task_item_id(&self, task_id: &str) -> Result<Option<String>> {
396        let task_id = task_id.to_owned();
397        self.async_db
398            .reader()
399            .call(move |conn| {
400                queries::first_task_item_id(conn, &task_id)
401                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
402            })
403            .await
404            .map_err(flatten_err)
405    }
406
407    /// Counts unresolved task items.
408    pub async fn count_unresolved_items(&self, task_id: &str) -> Result<i64> {
409        let task_id = task_id.to_owned();
410        self.async_db
411            .reader()
412            .call(move |conn| {
413                queries::count_unresolved_items(conn, &task_id)
414                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
415            })
416            .await
417            .map_err(flatten_err)
418    }
419
420    /// Lists task items participating in the current cycle.
421    pub async fn list_task_items_for_cycle(
422        &self,
423        task_id: &str,
424    ) -> Result<Vec<crate::dto::TaskItemRow>> {
425        let task_id = task_id.to_owned();
426        self.async_db
427            .reader()
428            .call(move |conn| {
429                queries::list_task_items_for_cycle(conn, &task_id)
430                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
431            })
432            .await
433            .map_err(flatten_err)
434    }
435
436    /// Loads the current task status string.
437    pub async fn load_task_status(&self, task_id: &str) -> Result<Option<String>> {
438        let task_id = task_id.to_owned();
439        self.async_db
440            .reader()
441            .call(move |conn| {
442                queries::load_task_status(conn, &task_id)
443                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
444            })
445            .await
446            .map_err(flatten_err)
447    }
448
449    /// Loads the human-readable task name.
450    pub async fn load_task_name(&self, task_id: &str) -> Result<Option<String>> {
451        let task_id = task_id.to_owned();
452        self.async_db
453            .reader()
454            .call(move |conn| {
455                queries::load_task_name(conn, &task_id)
456                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
457            })
458            .await
459            .map_err(flatten_err)
460    }
461
462    /// Lists recent command runs used for log inspection.
463    pub async fn list_task_log_runs(
464        &self,
465        task_id: &str,
466        limit: usize,
467    ) -> Result<Vec<TaskLogRunRow>> {
468        let task_id = task_id.to_owned();
469        self.async_db
470            .reader()
471            .call(move |conn| {
472                queries::list_task_log_runs(conn, &task_id, limit)
473                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
474            })
475            .await
476            .map_err(flatten_err)
477    }
478
479    /// Loads graph-planning debug bundles for a task.
480    pub async fn load_task_graph_debug_bundles(
481        &self,
482        task_id: &str,
483    ) -> Result<Vec<crate::dto::TaskGraphDebugBundle>> {
484        let task_id = task_id.to_owned();
485        self.async_db
486            .reader()
487            .call(move |conn| {
488                queries::load_task_graph_debug_bundles(conn, &task_id)
489                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
490            })
491            .await
492            .map_err(flatten_err)
493    }
494
495    // ── Write operations (use writer) ──
496
497    /// Updates a task status and optionally marks completion.
498    pub async fn set_task_status(
499        &self,
500        task_id: &str,
501        status: &str,
502        set_completed: bool,
503    ) -> Result<()> {
504        let task_id = task_id.to_owned();
505        let status = status.to_owned();
506        self.async_db
507            .writer()
508            .call(move |conn| {
509                state::set_task_status(conn, &task_id, &status, set_completed)
510                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
511            })
512            .await
513            .map_err(flatten_err)
514    }
515
516    /// Resets unresolved items back to pending without changing the task status.
517    /// Called before enqueuing a task so the worker can re-process them.
518    pub async fn reset_unresolved_items(&self, task_id: &str) -> Result<()> {
519        let task_id = task_id.to_owned();
520        self.async_db
521            .writer()
522            .call(move |conn| {
523                state::reset_unresolved_items(conn, &task_id)
524                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
525            })
526            .await
527            .map_err(flatten_err)
528    }
529
530    /// Resets a task into a fresh batch-start state.
531    pub async fn prepare_task_for_start_batch(&self, task_id: &str) -> Result<()> {
532        let task_id = task_id.to_owned();
533        self.async_db
534            .writer()
535            .call(move |conn| {
536                state::prepare_task_for_start_batch(conn, &task_id)
537                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
538            })
539            .await
540            .map_err(flatten_err)
541    }
542
543    /// Persists cycle counters and `init_once` state.
544    pub async fn update_task_cycle_state(
545        &self,
546        task_id: &str,
547        current_cycle: u32,
548        init_done: bool,
549    ) -> Result<()> {
550        let task_id = task_id.to_owned();
551        self.async_db
552            .writer()
553            .call(move |conn| {
554                state::update_task_cycle_state(conn, &task_id, current_cycle, init_done)
555                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
556            })
557            .await
558            .map_err(flatten_err)
559    }
560
561    /// Marks a task item as running.
562    pub async fn mark_task_item_running(&self, task_item_id: &str) -> Result<()> {
563        let task_item_id = task_item_id.to_owned();
564        self.async_db
565            .writer()
566            .call(move |conn| {
567                items::mark_task_item_running(conn, &task_item_id)
568                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
569            })
570            .await
571            .map_err(flatten_err)
572    }
573
574    /// Sets a terminal status for a task item.
575    pub async fn set_task_item_terminal_status(
576        &self,
577        task_item_id: &str,
578        status: &str,
579    ) -> Result<()> {
580        let task_item_id = task_item_id.to_owned();
581        let status = status.to_owned();
582        self.async_db
583            .writer()
584            .call(move |conn| {
585                items::set_task_item_terminal_status(conn, &task_item_id, &status)
586                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
587            })
588            .await
589            .map_err(flatten_err)
590    }
591
592    /// Updates a task item to an arbitrary status.
593    pub async fn update_task_item_status(&self, task_item_id: &str, status: &str) -> Result<()> {
594        let task_item_id = task_item_id.to_owned();
595        let status = status.to_owned();
596        self.async_db
597            .writer()
598            .call(move |conn| {
599                items::update_task_item_status(conn, &task_item_id, &status)
600                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
601            })
602            .await
603            .map_err(flatten_err)
604    }
605
606    /// Persists accumulated pipeline variables back to the task item's dynamic_vars column.
607    pub async fn update_task_item_pipeline_vars(
608        &self,
609        task_item_id: &str,
610        pipeline_vars_json: &str,
611    ) -> Result<()> {
612        let task_item_id = task_item_id.to_owned();
613        let pipeline_vars_json = pipeline_vars_json.to_owned();
614        self.async_db
615            .writer()
616            .call(move |conn| {
617                items::update_task_item_pipeline_vars(conn, &task_item_id, &pipeline_vars_json)
618                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
619            })
620            .await
621            .map_err(flatten_err)
622    }
623
624    /// Deletes a task and returns log paths that should be removed.
625    pub async fn delete_task_and_collect_log_paths(&self, task_id: &str) -> Result<Vec<String>> {
626        let task_id = task_id.to_owned();
627        self.async_db
628            .writer()
629            .call(move |conn| {
630                items::delete_task_and_collect_log_paths(conn, &task_id)
631                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
632            })
633            .await
634            .map_err(flatten_err)
635    }
636
637    /// Inserts a command-run record.
638    pub async fn insert_command_run(&self, run: NewCommandRun) -> Result<()> {
639        self.async_db
640            .writer()
641            .call(move |conn| {
642                items::insert_command_run(conn, &run)
643                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
644            })
645            .await
646            .map_err(flatten_err)
647    }
648
649    /// Inserts an event record.
650    pub async fn insert_event(&self, event: DbEventRecord) -> Result<()> {
651        self.async_db
652            .writer()
653            .call(move |conn| {
654                write_ops::insert_event(conn, &event)
655                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
656            })
657            .await
658            .map_err(flatten_err)
659    }
660
661    /// Updates an existing command-run record.
662    pub async fn update_command_run(&self, run: NewCommandRun) -> Result<()> {
663        self.async_db
664            .writer()
665            .call(move |conn| {
666                write_ops::update_command_run(conn, &run)
667                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
668            })
669            .await
670            .map_err(flatten_err)
671    }
672
673    /// Updates a command run and appends emitted events.
674    pub async fn update_command_run_with_events(
675        &self,
676        run: NewCommandRun,
677        events: Vec<DbEventRecord>,
678    ) -> Result<()> {
679        self.async_db
680            .writer()
681            .call(move |conn| {
682                write_ops::update_command_run_with_events(conn, &run, &events)
683                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
684            })
685            .await
686            .map_err(flatten_err)
687    }
688
689    /// Persists a completed phase result together with emitted events.
690    pub async fn persist_phase_result_with_events(
691        &self,
692        run: NewCommandRun,
693        events: Vec<DbEventRecord>,
694    ) -> Result<()> {
695        self.async_db
696            .writer()
697            .call(move |conn| {
698                write_ops::persist_phase_result_with_events(conn, &run, &events)
699                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
700            })
701            .await
702            .map_err(flatten_err)
703    }
704
705    /// Updates the PID associated with a running command.
706    pub async fn update_command_run_pid(&self, run_id: &str, pid: i64) -> Result<()> {
707        let run_id = run_id.to_owned();
708        self.async_db
709            .writer()
710            .call(move |conn| {
711                write_ops::update_command_run_pid(conn, &run_id, pid)
712                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
713            })
714            .await
715            .map_err(flatten_err)
716    }
717
718    /// Returns active child PIDs for a task.
719    pub async fn find_active_child_pids(&self, task_id: &str) -> Result<Vec<i64>> {
720        let task_id = task_id.to_owned();
721        self.async_db
722            .reader()
723            .call(move |conn| {
724                write_ops::find_active_child_pids(conn, &task_id)
725                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
726            })
727            .await
728            .map_err(flatten_err)
729    }
730
731    /// Returns in-flight command runs for a task (FR-038).
732    pub async fn find_inflight_command_runs_for_task(
733        &self,
734        task_id: &str,
735    ) -> Result<Vec<InflightRunRecord>> {
736        let task_id = task_id.to_owned();
737        self.async_db
738            .reader()
739            .call(move |conn| {
740                write_ops::find_inflight_command_runs_for_task(conn, &task_id)
741                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
742            })
743            .await
744            .map_err(flatten_err)
745    }
746
747    /// Returns completed runs whose parent items are still `pending` (FR-038).
748    pub async fn find_completed_runs_for_pending_items(
749        &self,
750        task_id: &str,
751    ) -> Result<Vec<write_ops::CompletedRunRecord>> {
752        let task_id = task_id.to_owned();
753        self.async_db
754            .reader()
755            .call(move |conn| {
756                write_ops::find_completed_runs_for_pending_items(conn, &task_id)
757                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
758            })
759            .await
760            .map_err(flatten_err)
761    }
762
763    /// Counts stale pending items (FR-038).
764    pub async fn count_stale_pending_items(&self, task_id: &str) -> Result<i64> {
765        let task_id = task_id.to_owned();
766        self.async_db
767            .reader()
768            .call(move |conn| {
769                queries::count_stale_pending_items(conn, &task_id)
770                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
771            })
772            .await
773            .map_err(flatten_err)
774    }
775
776    /// FR-052: Counts recent heartbeat events for specified item IDs since cutoff.
777    pub async fn count_recent_heartbeats_for_items(
778        &self,
779        task_id: &str,
780        item_ids: &[String],
781        cutoff_ts: &str,
782    ) -> Result<i64> {
783        let task_id = task_id.to_owned();
784        let item_ids = item_ids.to_vec();
785        let cutoff_ts = cutoff_ts.to_owned();
786        self.async_db
787            .reader()
788            .call(move |conn| {
789                write_ops::count_recent_heartbeats_for_items(conn, &task_id, &item_ids, &cutoff_ts)
790                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
791            })
792            .await
793            .map_err(flatten_err)
794    }
795
796    /// Persists the serialized pipeline-variable map for a task.
797    pub async fn update_task_pipeline_vars(
798        &self,
799        task_id: &str,
800        pipeline_vars_json: &str,
801    ) -> Result<()> {
802        let task_id = task_id.to_owned();
803        let pipeline_vars_json = pipeline_vars_json.to_owned();
804        self.async_db
805            .writer()
806            .call(move |conn| {
807                write_ops::update_task_pipeline_vars(conn, &task_id, &pipeline_vars_json)
808                    .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
809            })
810            .await
811            .map_err(flatten_err)
812    }
813
814    /// Persists active ticket paths and preview content for a task item.
815    pub async fn update_task_item_tickets(
816        &self,
817        task_item_id: &str,
818        ticket_files_json: &str,
819        ticket_content_json: &str,
820    ) -> Result<()> {
821        let task_item_id = task_item_id.to_owned();
822        let ticket_files_json = ticket_files_json.to_owned();
823        let ticket_content_json = ticket_content_json.to_owned();
824        self.async_db
825            .writer()
826            .call(move |conn| {
827                write_ops::update_task_item_tickets(
828                    conn,
829                    &task_item_id,
830                    &ticket_files_json,
831                    &ticket_content_json,
832                )
833                .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
834            })
835            .await
836            .map_err(flatten_err)
837    }
838
839    /// Inserts a task-graph planning run.
840    pub async fn insert_task_graph_run(&self, run: NewTaskGraphRun) -> Result<()> {
841        self.async_db
842            .writer()
843            .call(move |conn| {
844                queries::insert_task_graph_run(conn, &run)
845                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
846            })
847            .await
848            .map_err(flatten_err)
849    }
850
851    /// Updates the status of a task-graph planning run.
852    pub async fn update_task_graph_run_status(
853        &self,
854        graph_run_id: &str,
855        status: &str,
856    ) -> Result<()> {
857        let graph_run_id = graph_run_id.to_owned();
858        let status = status.to_owned();
859        self.async_db
860            .writer()
861            .call(move |conn| {
862                queries::update_task_graph_run_status(conn, &graph_run_id, &status)
863                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
864            })
865            .await
866            .map_err(flatten_err)
867    }
868
869    /// Blanket-pause all running tasks and reset their items to pending.
870    /// Used during daemon shutdown before exec() to prevent orphaned state.
871    pub async fn pause_all_running_tasks_and_items(&self) -> Result<usize> {
872        self.async_db
873            .writer()
874            .call(move |conn| {
875                state::pause_all_running_tasks_and_items(conn)
876                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
877            })
878            .await
879            .map_err(flatten_err)
880    }
881
882    /// Recovers all orphaned running items across all tasks.
883    pub async fn recover_orphaned_running_items(&self) -> Result<Vec<(String, Vec<String>)>> {
884        self.async_db
885            .writer()
886            .call(move |conn| {
887                state::recover_orphaned_running_items(conn)
888                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
889            })
890            .await
891            .map_err(flatten_err)
892    }
893
894    /// Recovers orphaned running items for a single task.
895    pub async fn recover_orphaned_running_items_for_task(
896        &self,
897        task_id: &str,
898    ) -> Result<Vec<String>> {
899        let task_id = task_id.to_owned();
900        self.async_db
901            .writer()
902            .call(move |conn| {
903                state::recover_orphaned_running_items_for_task(conn, &task_id)
904                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
905            })
906            .await
907            .map_err(flatten_err)
908    }
909
910    /// Recovers stalled running items older than the given threshold.
911    pub async fn recover_stalled_running_items(
912        &self,
913        stall_threshold_secs: u64,
914    ) -> Result<Vec<(String, Vec<String>)>> {
915        self.async_db
916            .writer()
917            .call(move |conn| {
918                state::recover_stalled_running_items(conn, stall_threshold_secs)
919                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
920            })
921            .await
922            .map_err(flatten_err)
923    }
924
925    /// Persists one task-graph snapshot payload.
926    pub async fn insert_task_graph_snapshot(&self, snapshot: NewTaskGraphSnapshot) -> Result<()> {
927        self.async_db
928            .writer()
929            .call(move |conn| {
930                queries::insert_task_graph_snapshot(conn, &snapshot)
931                    .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
932            })
933            .await
934            .map_err(flatten_err)
935    }
936}
937
938#[cfg(test)]
939mod async_wrapper_tests {
940    use super::*;
941    use crate::dto::CreateTaskPayload;
942    use crate::task_ops::create_task_impl;
943    use crate::test_utils::TestState;
944
945    fn seed_task(fixture: &mut TestState) -> (std::sync::Arc<crate::state::InnerState>, String) {
946        let state = fixture.build();
947        let qa_file = state
948            .data_dir
949            .join("workspace/default/docs/qa/async-repo.md");
950        std::fs::write(&qa_file, "# async repo\n").expect("seed qa file");
951        let created = create_task_impl(
952            &state,
953            CreateTaskPayload {
954                name: Some("async-repo".to_string()),
955                goal: Some("async-repo-goal".to_string()),
956                ..CreateTaskPayload::default()
957            },
958        )
959        .expect("create task");
960        (state, created.id)
961    }
962
963    fn first_item_id(state: &crate::state::InnerState, task_id: &str) -> String {
964        let conn = crate::db::open_conn(&state.db_path).expect("open sqlite");
965        conn.query_row(
966            "SELECT id FROM task_items WHERE task_id = ?1 ORDER BY order_no LIMIT 1",
967            rusqlite::params![task_id],
968            |row| row.get(0),
969        )
970        .expect("load item id")
971    }
972
973    #[tokio::test]
974    async fn async_repository_read_wrappers_round_trip() {
975        let mut fixture = TestState::new();
976        let (state, task_id) = seed_task(&mut fixture);
977        let repo = &state.task_repo;
978
979        let resolved = repo
980            .resolve_task_id(&task_id[..8])
981            .await
982            .expect("resolve task id");
983        assert_eq!(resolved, task_id);
984
985        let summary = repo
986            .load_task_summary(&task_id)
987            .await
988            .expect("load summary");
989        assert_eq!(summary.name, "async-repo");
990
991        let detail = repo
992            .load_task_detail_rows(&task_id)
993            .await
994            .expect("load detail rows");
995        assert!(!detail.0.is_empty());
996
997        let counts = repo
998            .load_task_item_counts(&task_id)
999            .await
1000            .expect("load item counts");
1001        assert!(counts.0 >= 1);
1002
1003        let ids = repo
1004            .list_task_ids_ordered_by_created_desc()
1005            .await
1006            .expect("list task ids");
1007        assert_eq!(ids[0], task_id);
1008
1009        let resumable = repo
1010            .find_latest_resumable_task_id(true)
1011            .await
1012            .expect("find latest resumable");
1013        assert_eq!(resumable.as_deref(), Some(task_id.as_str()));
1014
1015        let runtime = repo
1016            .load_task_runtime_row(&task_id)
1017            .await
1018            .expect("load runtime row");
1019        assert_eq!(runtime.goal, "async-repo-goal");
1020
1021        let item_id = repo
1022            .first_task_item_id(&task_id)
1023            .await
1024            .expect("first item id query");
1025        assert!(item_id.is_some());
1026
1027        let unresolved = repo
1028            .count_unresolved_items(&task_id)
1029            .await
1030            .expect("count unresolved");
1031        assert_eq!(unresolved, 0);
1032
1033        let items = repo
1034            .list_task_items_for_cycle(&task_id)
1035            .await
1036            .expect("list items for cycle");
1037        assert!(!items.is_empty());
1038
1039        let status = repo.load_task_status(&task_id).await.expect("load status");
1040        assert_eq!(status.as_deref(), Some("created"));
1041
1042        let name = repo.load_task_name(&task_id).await.expect("load task name");
1043        assert_eq!(name.as_deref(), Some("async-repo"));
1044
1045        let runs = repo
1046            .list_task_log_runs(&task_id, 10)
1047            .await
1048            .expect("list log runs");
1049        assert!(runs.is_empty());
1050    }
1051
1052    #[tokio::test]
1053    async fn async_repository_write_wrappers_update_task_and_item_state() {
1054        let mut fixture = TestState::new();
1055        let (state, task_id) = seed_task(&mut fixture);
1056        let repo = &state.task_repo;
1057        let item_id = first_item_id(&state, &task_id);
1058
1059        repo.mark_task_item_running(&item_id)
1060            .await
1061            .expect("mark item running");
1062        repo.update_task_item_status(&item_id, "qa_failed")
1063            .await
1064            .expect("update item status");
1065        repo.set_task_item_terminal_status(&item_id, "completed")
1066            .await
1067            .expect("set terminal status");
1068        repo.set_task_status(&task_id, "running", false)
1069            .await
1070            .expect("set task status");
1071        repo.update_task_cycle_state(&task_id, 3, true)
1072            .await
1073            .expect("update task cycle state");
1074
1075        let conn = crate::db::open_conn(&state.db_path).expect("open sqlite");
1076        let task_row: (String, i64, i64) = conn
1077            .query_row(
1078                "SELECT status, current_cycle, init_done FROM tasks WHERE id = ?1",
1079                rusqlite::params![task_id],
1080                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1081            )
1082            .expect("load task row");
1083        assert_eq!(task_row.0, "running");
1084        assert_eq!(task_row.1, 3);
1085        assert_eq!(task_row.2, 1);
1086
1087        let item_status: String = conn
1088            .query_row(
1089                "SELECT status FROM task_items WHERE id = ?1",
1090                rusqlite::params![item_id],
1091                |row| row.get(0),
1092            )
1093            .expect("load item status");
1094        assert_eq!(item_status, "completed");
1095
1096        repo.set_task_status(&task_id, "paused", false)
1097            .await
1098            .expect("pause task before prepare");
1099        repo.prepare_task_for_start_batch(&task_id)
1100            .await
1101            .expect("prepare task for start");
1102        let prepared_status: String = conn
1103            .query_row(
1104                "SELECT status FROM tasks WHERE id = ?1",
1105                rusqlite::params![task_id],
1106                |row| row.get(0),
1107            )
1108            .expect("reload task status");
1109        assert_eq!(prepared_status, "running");
1110    }
1111
1112    #[tokio::test]
1113    async fn async_repository_insert_and_delete_command_runs() {
1114        let mut fixture = TestState::new();
1115        let (state, task_id) = seed_task(&mut fixture);
1116        let repo = &state.task_repo;
1117        let item_id = first_item_id(&state, &task_id);
1118        let stdout_path = state.data_dir.join("logs/async-wrapper-stdout.log");
1119        let stderr_path = state.data_dir.join("logs/async-wrapper-stderr.log");
1120        std::fs::create_dir_all(
1121            stdout_path
1122                .parent()
1123                .expect("stdout parent directory should exist"),
1124        )
1125        .expect("create logs dir");
1126        std::fs::write(&stdout_path, "stdout").expect("write stdout");
1127        std::fs::write(&stderr_path, "stderr").expect("write stderr");
1128
1129        repo.insert_command_run(NewCommandRun {
1130            id: "async-run-1".to_string(),
1131            task_item_id: item_id,
1132            phase: "qa".to_string(),
1133            command: "echo repo".to_string(),
1134            command_template: None,
1135            cwd: state.data_dir.display().to_string(),
1136            workspace_id: "default".to_string(),
1137            agent_id: "echo".to_string(),
1138            exit_code: 0,
1139            stdout_path: stdout_path.display().to_string(),
1140            stderr_path: stderr_path.display().to_string(),
1141            started_at: crate::config_load::now_ts(),
1142            ended_at: crate::config_load::now_ts(),
1143            interrupted: 0,
1144            output_json: "{}".to_string(),
1145            artifacts_json: "[]".to_string(),
1146            confidence: Some(1.0),
1147            quality_score: Some(1.0),
1148            validation_status: "passed".to_string(),
1149            session_id: None,
1150            machine_output_source: "stdout".to_string(),
1151            output_json_path: None,
1152        })
1153        .await
1154        .expect("insert command run");
1155
1156        let runs = repo
1157            .list_task_log_runs(&task_id, 10)
1158            .await
1159            .expect("list runs after insert");
1160        assert_eq!(runs.len(), 1);
1161
1162        let paths = repo
1163            .delete_task_and_collect_log_paths(&task_id)
1164            .await
1165            .expect("delete task and collect log paths");
1166        assert_eq!(paths.len(), 2);
1167        assert!(paths
1168            .iter()
1169            .any(|path| path.ends_with("async-wrapper-stdout.log")));
1170    }
1171
1172    #[test]
1173    fn sqlite_repository_graph_debug_wrappers_round_trip() {
1174        let mut fixture = TestState::new();
1175        let (state, task_id) = seed_task(&mut fixture);
1176        let repo =
1177            SqliteTaskRepository::new(types::TaskRepositorySource::from(state.db_path.clone()));
1178
1179        repo.insert_task_graph_run(&NewTaskGraphRun {
1180            graph_run_id: "sync-graph-run".to_string(),
1181            task_id: task_id.clone(),
1182            cycle: 4,
1183            mode: "dynamic_dag".to_string(),
1184            source: "adaptive_planner".to_string(),
1185            status: "materialized".to_string(),
1186            fallback_mode: Some("static_segment".to_string()),
1187            planner_failure_class: None,
1188            planner_failure_message: None,
1189            entry_node_id: Some("qa".to_string()),
1190            node_count: 2,
1191            edge_count: 1,
1192        })
1193        .expect("insert graph run");
1194        repo.update_task_graph_run_status("sync-graph-run", "completed")
1195            .expect("update graph run");
1196        repo.insert_task_graph_snapshot(&NewTaskGraphSnapshot {
1197            graph_run_id: "sync-graph-run".to_string(),
1198            task_id: task_id.clone(),
1199            snapshot_kind: "effective_graph".to_string(),
1200            payload_json: "{\"entry\":\"qa\"}".to_string(),
1201        })
1202        .expect("insert graph snapshot");
1203
1204        let bundles = repo
1205            .load_task_graph_debug_bundles(&task_id)
1206            .expect("load graph bundles");
1207        assert_eq!(bundles.len(), 1);
1208        assert_eq!(bundles[0].graph_run_id, "sync-graph-run");
1209        assert_eq!(bundles[0].status, "completed");
1210        assert_eq!(bundles[0].effective_graph_json, "{\"entry\":\"qa\"}");
1211    }
1212
1213    #[tokio::test]
1214    async fn async_repository_graph_debug_wrappers_round_trip() {
1215        let mut fixture = TestState::new();
1216        let (state, task_id) = seed_task(&mut fixture);
1217        let repo = &state.task_repo;
1218
1219        repo.insert_task_graph_run(NewTaskGraphRun {
1220            graph_run_id: "async-graph-run".to_string(),
1221            task_id: task_id.clone(),
1222            cycle: 5,
1223            mode: "dynamic_dag".to_string(),
1224            source: "adaptive_planner".to_string(),
1225            status: "materialized".to_string(),
1226            fallback_mode: Some("deterministic_dag".to_string()),
1227            planner_failure_class: Some("invalid_json".to_string()),
1228            planner_failure_message: Some("planner output broken".to_string()),
1229            entry_node_id: Some("fix".to_string()),
1230            node_count: 3,
1231            edge_count: 2,
1232        })
1233        .await
1234        .expect("insert graph run");
1235        repo.update_task_graph_run_status("async-graph-run", "completed")
1236            .await
1237            .expect("update graph run");
1238        repo.insert_task_graph_snapshot(NewTaskGraphSnapshot {
1239            graph_run_id: "async-graph-run".to_string(),
1240            task_id: task_id.clone(),
1241            snapshot_kind: "effective_graph".to_string(),
1242            payload_json: "{\"entry\":\"fix\"}".to_string(),
1243        })
1244        .await
1245        .expect("insert graph snapshot");
1246
1247        let bundles = repo
1248            .load_task_graph_debug_bundles(&task_id)
1249            .await
1250            .expect("load graph bundles");
1251        assert_eq!(bundles.len(), 1);
1252        assert_eq!(bundles[0].graph_run_id, "async-graph-run");
1253        assert_eq!(
1254            bundles[0].fallback_mode.as_deref(),
1255            Some("deterministic_dag")
1256        );
1257        assert_eq!(bundles[0].effective_graph_json, "{\"entry\":\"fix\"}");
1258    }
1259}