1#![cfg_attr(
2 not(test),
3 deny(clippy::panic, clippy::unwrap_used, clippy::expect_used)
4)]
5
6mod command_run;
7pub(crate) mod items;
8mod queries;
9mod state;
10mod trait_def;
11mod types;
12mod write_ops;
13
14#[cfg(test)]
15mod tests;
16
17pub use command_run::NewCommandRun;
18pub use trait_def::{
19 CommandRunRepository, EventRepository, TaskGraphRepository, TaskItemMutRepository,
20 TaskItemQueryRepository, TaskQueryRepository, TaskRepository, TaskStateRepository,
21};
22pub use types::{
23 DbEventRecord, NewTaskGraphRun, NewTaskGraphSnapshot, TaskLogRunRow, TaskRepositoryConn,
24 TaskRepositorySource, TaskRuntimeRow,
25};
26pub use write_ops::{CompletedRunRecord, InflightRunRecord};
27
28use crate::async_database::{AsyncDatabase, flatten_err};
29use crate::dto::{CommandRunDto, EventDto, TaskGraphDebugBundle, TaskItemDto};
30use anyhow::Result;
31use std::sync::Arc;
32
/// Row bundle returned by `load_task_detail_rows`, in order: the task's
/// items, their command runs, recorded events, and task-graph debug bundles.
pub type TaskDetailRows = (
    Vec<TaskItemDto>,
    Vec<CommandRunDto>,
    Vec<EventDto>,
    Vec<TaskGraphDebugBundle>,
);
40
/// Synchronous SQLite-backed task repository.
///
/// Every trait method obtains a connection from `source` and delegates to a
/// free function in `queries`, `state`, `items`, or `write_ops`.
pub struct SqliteTaskRepository {
    // Where connections come from (e.g. a database path or pool — see
    // `types::TaskRepositorySource`).
    source: types::TaskRepositorySource,
}
45
46impl SqliteTaskRepository {
47 pub fn new<T>(source: T) -> Self
49 where
50 T: Into<types::TaskRepositorySource>,
51 {
52 Self {
53 source: source.into(),
54 }
55 }
56
57 fn connection(&self) -> Result<types::TaskRepositoryConn> {
58 self.source.connection()
59 }
60}
61
62impl TaskQueryRepository for SqliteTaskRepository {
65 fn resolve_task_id(&self, task_id_or_prefix: &str) -> Result<String> {
66 let conn = self.connection()?;
67 queries::resolve_task_id(&conn, task_id_or_prefix)
68 }
69
70 fn load_task_summary(&self, task_id: &str) -> Result<crate::dto::TaskSummary> {
71 let conn = self.connection()?;
72 queries::load_task_summary(&conn, task_id)
73 }
74
75 fn load_task_detail_rows(&self, task_id: &str) -> Result<TaskDetailRows> {
76 let conn = self.connection()?;
77 queries::load_task_detail_rows(&conn, task_id)
78 }
79
80 fn load_task_item_counts(&self, task_id: &str) -> Result<(i64, i64, i64)> {
81 let conn = self.connection()?;
82 queries::load_task_item_counts(&conn, task_id)
83 }
84
85 fn list_task_ids_ordered_by_created_desc(&self) -> Result<Vec<String>> {
86 let conn = self.connection()?;
87 queries::list_task_ids_ordered_by_created_desc(&conn)
88 }
89
90 fn find_latest_resumable_task_id(&self, include_pending: bool) -> Result<Option<String>> {
91 let conn = self.connection()?;
92 queries::find_latest_resumable_task_id(&conn, include_pending)
93 }
94
95 fn load_task_runtime_row(&self, task_id: &str) -> Result<TaskRuntimeRow> {
96 let conn = self.connection()?;
97 queries::load_task_runtime_row(&conn, task_id)
98 }
99
100 fn load_task_status(&self, task_id: &str) -> Result<Option<String>> {
101 let conn = self.connection()?;
102 queries::load_task_status(&conn, task_id)
103 }
104
105 fn load_task_name(&self, task_id: &str) -> Result<Option<String>> {
106 let conn = self.connection()?;
107 queries::load_task_name(&conn, task_id)
108 }
109}
110
111impl TaskItemQueryRepository for SqliteTaskRepository {
114 fn first_task_item_id(&self, task_id: &str) -> Result<Option<String>> {
115 let conn = self.connection()?;
116 queries::first_task_item_id(&conn, task_id)
117 }
118
119 fn list_task_items_for_cycle(&self, task_id: &str) -> Result<Vec<crate::dto::TaskItemRow>> {
120 let conn = self.connection()?;
121 queries::list_task_items_for_cycle(&conn, task_id)
122 }
123
124 fn count_unresolved_items(&self, task_id: &str) -> Result<i64> {
125 let conn = self.connection()?;
126 queries::count_unresolved_items(&conn, task_id)
127 }
128
129 fn count_stale_pending_items(&self, task_id: &str) -> Result<i64> {
130 let conn = self.connection()?;
131 queries::count_stale_pending_items(&conn, task_id)
132 }
133
134 fn count_recent_heartbeats_for_items(
135 &self,
136 task_id: &str,
137 item_ids: &[String],
138 cutoff_ts: &str,
139 ) -> Result<i64> {
140 let conn = self.connection()?;
141 write_ops::count_recent_heartbeats_for_items(&conn, task_id, item_ids, cutoff_ts)
142 }
143}
144
145impl TaskStateRepository for SqliteTaskRepository {
148 fn set_task_status(&self, task_id: &str, status: &str, set_completed: bool) -> Result<()> {
149 let conn = self.connection()?;
150 state::set_task_status(&conn, task_id, status, set_completed)
151 }
152
153 fn prepare_task_for_start_batch(&self, task_id: &str) -> Result<()> {
154 let conn = self.connection()?;
155 state::prepare_task_for_start_batch(&conn, task_id)
156 }
157
158 fn update_task_cycle_state(
159 &self,
160 task_id: &str,
161 current_cycle: u32,
162 init_done: bool,
163 ) -> Result<()> {
164 let conn = self.connection()?;
165 state::update_task_cycle_state(&conn, task_id, current_cycle, init_done)
166 }
167
168 fn update_task_pipeline_vars(&self, task_id: &str, pipeline_vars_json: &str) -> Result<()> {
169 let conn = self.connection()?;
170 write_ops::update_task_pipeline_vars(&conn, task_id, pipeline_vars_json)
171 }
172
173 fn delete_task_and_collect_log_paths(&self, task_id: &str) -> Result<Vec<String>> {
174 let conn = self.connection()?;
175 items::delete_task_and_collect_log_paths(&conn, task_id)
176 }
177}
178
179impl TaskItemMutRepository for SqliteTaskRepository {
182 fn mark_task_item_running(&self, task_item_id: &str) -> Result<()> {
183 let conn = self.connection()?;
184 items::mark_task_item_running(&conn, task_item_id)
185 }
186
187 fn set_task_item_terminal_status(&self, task_item_id: &str, status: &str) -> Result<()> {
188 let conn = self.connection()?;
189 items::set_task_item_terminal_status(&conn, task_item_id, status)
190 }
191
192 fn update_task_item_status(&self, task_item_id: &str, status: &str) -> Result<()> {
193 let conn = self.connection()?;
194 items::update_task_item_status(&conn, task_item_id, status)
195 }
196
197 fn update_task_item_pipeline_vars(
198 &self,
199 task_item_id: &str,
200 pipeline_vars_json: &str,
201 ) -> Result<()> {
202 let conn = self.connection()?;
203 items::update_task_item_pipeline_vars(&conn, task_item_id, pipeline_vars_json)
204 }
205
206 fn update_task_item_tickets(
207 &self,
208 task_item_id: &str,
209 ticket_files_json: &str,
210 ticket_content_json: &str,
211 ) -> Result<()> {
212 let conn = self.connection()?;
213 write_ops::update_task_item_tickets(
214 &conn,
215 task_item_id,
216 ticket_files_json,
217 ticket_content_json,
218 )
219 }
220}
221
222impl CommandRunRepository for SqliteTaskRepository {
225 fn insert_command_run(&self, run: &NewCommandRun) -> Result<()> {
226 let conn = self.connection()?;
227 items::insert_command_run(&conn, run)
228 }
229
230 fn update_command_run(&self, run: &NewCommandRun) -> Result<()> {
231 let conn = self.connection()?;
232 write_ops::update_command_run(&conn, run)
233 }
234
235 fn update_command_run_with_events(
236 &self,
237 run: &NewCommandRun,
238 events: &[DbEventRecord],
239 ) -> Result<()> {
240 let conn = self.connection()?;
241 write_ops::update_command_run_with_events(&conn, run, events)
242 }
243
244 fn persist_phase_result_with_events(
245 &self,
246 run: &NewCommandRun,
247 events: &[DbEventRecord],
248 ) -> Result<()> {
249 let conn = self.connection()?;
250 write_ops::persist_phase_result_with_events(&conn, run, events)
251 }
252
253 fn update_command_run_pid(&self, run_id: &str, pid: i64) -> Result<()> {
254 let conn = self.connection()?;
255 write_ops::update_command_run_pid(&conn, run_id, pid)
256 }
257
258 fn list_task_log_runs(&self, task_id: &str, limit: usize) -> Result<Vec<TaskLogRunRow>> {
259 let conn = self.connection()?;
260 queries::list_task_log_runs(&conn, task_id, limit)
261 }
262
263 fn find_active_child_pids(&self, task_id: &str) -> Result<Vec<i64>> {
264 let conn = self.connection()?;
265 write_ops::find_active_child_pids(&conn, task_id)
266 }
267
268 fn find_inflight_command_runs_for_task(&self, task_id: &str) -> Result<Vec<InflightRunRecord>> {
269 let conn = self.connection()?;
270 write_ops::find_inflight_command_runs_for_task(&conn, task_id)
271 }
272
273 fn find_completed_runs_for_pending_items(
274 &self,
275 task_id: &str,
276 ) -> Result<Vec<write_ops::CompletedRunRecord>> {
277 let conn = self.connection()?;
278 write_ops::find_completed_runs_for_pending_items(&conn, task_id)
279 }
280}
281
282impl EventRepository for SqliteTaskRepository {
285 fn insert_event(&self, event: &DbEventRecord) -> Result<()> {
286 let conn = self.connection()?;
287 write_ops::insert_event(&conn, event)
288 }
289}
290
291impl TaskGraphRepository for SqliteTaskRepository {
294 fn insert_task_graph_run(&self, run: &NewTaskGraphRun) -> Result<()> {
295 let conn = self.connection()?;
296 queries::insert_task_graph_run(&conn, run)
297 }
298
299 fn update_task_graph_run_status(&self, graph_run_id: &str, status: &str) -> Result<()> {
300 let conn = self.connection()?;
301 queries::update_task_graph_run_status(&conn, graph_run_id, status)
302 }
303
304 fn insert_task_graph_snapshot(&self, snapshot: &NewTaskGraphSnapshot) -> Result<()> {
305 let conn = self.connection()?;
306 queries::insert_task_graph_snapshot(&conn, snapshot)
307 }
308
309 fn load_task_graph_debug_bundles(
310 &self,
311 task_id: &str,
312 ) -> Result<Vec<crate::dto::TaskGraphDebugBundle>> {
313 let conn = self.connection()?;
314 queries::load_task_graph_debug_bundles(&conn, task_id)
315 }
316}
317
/// Async counterpart of `SqliteTaskRepository`: each method clones its string
/// arguments, runs the matching `queries`/`state`/`items`/`write_ops` function
/// on the shared `AsyncDatabase`'s reader or writer connection, and converts
/// errors back to `anyhow` via `flatten_err`.
pub struct AsyncSqliteTaskRepository {
    // Shared handle providing the reader/writer connections used by every
    // wrapper method.
    async_db: Arc<AsyncDatabase>,
}
322
323impl AsyncSqliteTaskRepository {
324 pub fn new(async_db: Arc<AsyncDatabase>) -> Self {
326 Self { async_db }
327 }
328
329 pub async fn resolve_task_id(&self, prefix: &str) -> Result<String> {
333 let prefix = prefix.to_owned();
334 self.async_db
335 .reader()
336 .call(move |conn| {
337 queries::resolve_task_id(conn, &prefix)
338 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
339 })
340 .await
341 .map_err(flatten_err)
342 }
343
344 pub async fn load_task_summary(&self, task_id: &str) -> Result<crate::dto::TaskSummary> {
346 let task_id = task_id.to_owned();
347 self.async_db
348 .reader()
349 .call(move |conn| {
350 queries::load_task_summary(conn, &task_id)
351 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
352 })
353 .await
354 .map_err(flatten_err)
355 }
356
357 pub async fn load_task_detail_rows(&self, task_id: &str) -> Result<TaskDetailRows> {
359 let task_id = task_id.to_owned();
360 self.async_db
361 .reader()
362 .call(move |conn| {
363 queries::load_task_detail_rows(conn, &task_id)
364 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
365 })
366 .await
367 .map_err(flatten_err)
368 }
369
370 pub async fn load_task_item_counts(&self, task_id: &str) -> Result<(i64, i64, i64)> {
372 let task_id = task_id.to_owned();
373 self.async_db
374 .reader()
375 .call(move |conn| {
376 queries::load_task_item_counts(conn, &task_id)
377 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
378 })
379 .await
380 .map_err(flatten_err)
381 }
382
383 pub async fn list_task_ids_ordered_by_created_desc(&self) -> Result<Vec<String>> {
385 self.async_db
386 .reader()
387 .call(move |conn| {
388 queries::list_task_ids_ordered_by_created_desc(conn)
389 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
390 })
391 .await
392 .map_err(flatten_err)
393 }
394
395 pub async fn find_latest_resumable_task_id(
397 &self,
398 include_pending: bool,
399 ) -> Result<Option<String>> {
400 self.async_db
401 .reader()
402 .call(move |conn| {
403 queries::find_latest_resumable_task_id(conn, include_pending)
404 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
405 })
406 .await
407 .map_err(flatten_err)
408 }
409
410 pub async fn load_task_runtime_row(&self, task_id: &str) -> Result<TaskRuntimeRow> {
412 let task_id = task_id.to_owned();
413 self.async_db
414 .reader()
415 .call(move |conn| {
416 queries::load_task_runtime_row(conn, &task_id)
417 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
418 })
419 .await
420 .map_err(flatten_err)
421 }
422
423 pub async fn first_task_item_id(&self, task_id: &str) -> Result<Option<String>> {
425 let task_id = task_id.to_owned();
426 self.async_db
427 .reader()
428 .call(move |conn| {
429 queries::first_task_item_id(conn, &task_id)
430 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
431 })
432 .await
433 .map_err(flatten_err)
434 }
435
436 pub async fn count_unresolved_items(&self, task_id: &str) -> Result<i64> {
438 let task_id = task_id.to_owned();
439 self.async_db
440 .reader()
441 .call(move |conn| {
442 queries::count_unresolved_items(conn, &task_id)
443 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
444 })
445 .await
446 .map_err(flatten_err)
447 }
448
449 pub async fn list_task_items_for_cycle(
451 &self,
452 task_id: &str,
453 ) -> Result<Vec<crate::dto::TaskItemRow>> {
454 let task_id = task_id.to_owned();
455 self.async_db
456 .reader()
457 .call(move |conn| {
458 queries::list_task_items_for_cycle(conn, &task_id)
459 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
460 })
461 .await
462 .map_err(flatten_err)
463 }
464
465 pub async fn load_task_status(&self, task_id: &str) -> Result<Option<String>> {
467 let task_id = task_id.to_owned();
468 self.async_db
469 .reader()
470 .call(move |conn| {
471 queries::load_task_status(conn, &task_id)
472 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
473 })
474 .await
475 .map_err(flatten_err)
476 }
477
478 pub async fn load_task_name(&self, task_id: &str) -> Result<Option<String>> {
480 let task_id = task_id.to_owned();
481 self.async_db
482 .reader()
483 .call(move |conn| {
484 queries::load_task_name(conn, &task_id)
485 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
486 })
487 .await
488 .map_err(flatten_err)
489 }
490
491 pub async fn list_task_log_runs(
493 &self,
494 task_id: &str,
495 limit: usize,
496 ) -> Result<Vec<TaskLogRunRow>> {
497 let task_id = task_id.to_owned();
498 self.async_db
499 .reader()
500 .call(move |conn| {
501 queries::list_task_log_runs(conn, &task_id, limit)
502 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
503 })
504 .await
505 .map_err(flatten_err)
506 }
507
508 pub async fn load_task_graph_debug_bundles(
510 &self,
511 task_id: &str,
512 ) -> Result<Vec<crate::dto::TaskGraphDebugBundle>> {
513 let task_id = task_id.to_owned();
514 self.async_db
515 .reader()
516 .call(move |conn| {
517 queries::load_task_graph_debug_bundles(conn, &task_id)
518 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
519 })
520 .await
521 .map_err(flatten_err)
522 }
523
524 pub async fn set_task_status(
528 &self,
529 task_id: &str,
530 status: &str,
531 set_completed: bool,
532 ) -> Result<()> {
533 let task_id = task_id.to_owned();
534 let status = status.to_owned();
535 self.async_db
536 .writer()
537 .call(move |conn| {
538 state::set_task_status(conn, &task_id, &status, set_completed)
539 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
540 })
541 .await
542 .map_err(flatten_err)
543 }
544
545 pub async fn reset_unresolved_items(&self, task_id: &str) -> Result<()> {
548 let task_id = task_id.to_owned();
549 self.async_db
550 .writer()
551 .call(move |conn| {
552 state::reset_unresolved_items(conn, &task_id)
553 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
554 })
555 .await
556 .map_err(flatten_err)
557 }
558
559 pub async fn prepare_task_for_start_batch(&self, task_id: &str) -> Result<()> {
561 let task_id = task_id.to_owned();
562 self.async_db
563 .writer()
564 .call(move |conn| {
565 state::prepare_task_for_start_batch(conn, &task_id)
566 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
567 })
568 .await
569 .map_err(flatten_err)
570 }
571
572 pub async fn update_task_cycle_state(
574 &self,
575 task_id: &str,
576 current_cycle: u32,
577 init_done: bool,
578 ) -> Result<()> {
579 let task_id = task_id.to_owned();
580 self.async_db
581 .writer()
582 .call(move |conn| {
583 state::update_task_cycle_state(conn, &task_id, current_cycle, init_done)
584 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
585 })
586 .await
587 .map_err(flatten_err)
588 }
589
590 pub async fn mark_task_item_running(&self, task_item_id: &str) -> Result<()> {
592 let task_item_id = task_item_id.to_owned();
593 self.async_db
594 .writer()
595 .call(move |conn| {
596 items::mark_task_item_running(conn, &task_item_id)
597 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
598 })
599 .await
600 .map_err(flatten_err)
601 }
602
603 pub async fn set_task_item_terminal_status(
605 &self,
606 task_item_id: &str,
607 status: &str,
608 ) -> Result<()> {
609 let task_item_id = task_item_id.to_owned();
610 let status = status.to_owned();
611 self.async_db
612 .writer()
613 .call(move |conn| {
614 items::set_task_item_terminal_status(conn, &task_item_id, &status)
615 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
616 })
617 .await
618 .map_err(flatten_err)
619 }
620
621 pub async fn update_task_item_status(&self, task_item_id: &str, status: &str) -> Result<()> {
623 let task_item_id = task_item_id.to_owned();
624 let status = status.to_owned();
625 self.async_db
626 .writer()
627 .call(move |conn| {
628 items::update_task_item_status(conn, &task_item_id, &status)
629 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
630 })
631 .await
632 .map_err(flatten_err)
633 }
634
635 pub async fn update_task_item_pipeline_vars(
637 &self,
638 task_item_id: &str,
639 pipeline_vars_json: &str,
640 ) -> Result<()> {
641 let task_item_id = task_item_id.to_owned();
642 let pipeline_vars_json = pipeline_vars_json.to_owned();
643 self.async_db
644 .writer()
645 .call(move |conn| {
646 items::update_task_item_pipeline_vars(conn, &task_item_id, &pipeline_vars_json)
647 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
648 })
649 .await
650 .map_err(flatten_err)
651 }
652
653 pub async fn delete_task_and_collect_log_paths(&self, task_id: &str) -> Result<Vec<String>> {
655 let task_id = task_id.to_owned();
656 self.async_db
657 .writer()
658 .call(move |conn| {
659 items::delete_task_and_collect_log_paths(conn, &task_id)
660 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
661 })
662 .await
663 .map_err(flatten_err)
664 }
665
666 pub async fn insert_command_run(&self, run: NewCommandRun) -> Result<()> {
668 self.async_db
669 .writer()
670 .call(move |conn| {
671 items::insert_command_run(conn, &run)
672 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
673 })
674 .await
675 .map_err(flatten_err)
676 }
677
678 pub async fn insert_event(&self, event: DbEventRecord) -> Result<()> {
680 self.async_db
681 .writer()
682 .call(move |conn| {
683 write_ops::insert_event(conn, &event)
684 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
685 })
686 .await
687 .map_err(flatten_err)
688 }
689
690 pub async fn update_command_run(&self, run: NewCommandRun) -> Result<()> {
692 self.async_db
693 .writer()
694 .call(move |conn| {
695 write_ops::update_command_run(conn, &run)
696 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
697 })
698 .await
699 .map_err(flatten_err)
700 }
701
702 pub async fn update_command_run_with_events(
704 &self,
705 run: NewCommandRun,
706 events: Vec<DbEventRecord>,
707 ) -> Result<()> {
708 self.async_db
709 .writer()
710 .call(move |conn| {
711 write_ops::update_command_run_with_events(conn, &run, &events)
712 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
713 })
714 .await
715 .map_err(flatten_err)
716 }
717
718 pub async fn persist_phase_result_with_events(
720 &self,
721 run: NewCommandRun,
722 events: Vec<DbEventRecord>,
723 ) -> Result<()> {
724 self.async_db
725 .writer()
726 .call(move |conn| {
727 write_ops::persist_phase_result_with_events(conn, &run, &events)
728 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
729 })
730 .await
731 .map_err(flatten_err)
732 }
733
734 pub async fn update_command_run_pid(&self, run_id: &str, pid: i64) -> Result<()> {
736 let run_id = run_id.to_owned();
737 self.async_db
738 .writer()
739 .call(move |conn| {
740 write_ops::update_command_run_pid(conn, &run_id, pid)
741 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
742 })
743 .await
744 .map_err(flatten_err)
745 }
746
747 pub async fn find_active_child_pids(&self, task_id: &str) -> Result<Vec<i64>> {
749 let task_id = task_id.to_owned();
750 self.async_db
751 .reader()
752 .call(move |conn| {
753 write_ops::find_active_child_pids(conn, &task_id)
754 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
755 })
756 .await
757 .map_err(flatten_err)
758 }
759
760 pub async fn find_inflight_command_runs_for_task(
762 &self,
763 task_id: &str,
764 ) -> Result<Vec<InflightRunRecord>> {
765 let task_id = task_id.to_owned();
766 self.async_db
767 .reader()
768 .call(move |conn| {
769 write_ops::find_inflight_command_runs_for_task(conn, &task_id)
770 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
771 })
772 .await
773 .map_err(flatten_err)
774 }
775
776 pub async fn find_completed_runs_for_pending_items(
778 &self,
779 task_id: &str,
780 ) -> Result<Vec<write_ops::CompletedRunRecord>> {
781 let task_id = task_id.to_owned();
782 self.async_db
783 .reader()
784 .call(move |conn| {
785 write_ops::find_completed_runs_for_pending_items(conn, &task_id)
786 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
787 })
788 .await
789 .map_err(flatten_err)
790 }
791
792 pub async fn count_stale_pending_items(&self, task_id: &str) -> Result<i64> {
794 let task_id = task_id.to_owned();
795 self.async_db
796 .reader()
797 .call(move |conn| {
798 queries::count_stale_pending_items(conn, &task_id)
799 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
800 })
801 .await
802 .map_err(flatten_err)
803 }
804
805 pub async fn count_recent_heartbeats_for_items(
807 &self,
808 task_id: &str,
809 item_ids: &[String],
810 cutoff_ts: &str,
811 ) -> Result<i64> {
812 let task_id = task_id.to_owned();
813 let item_ids = item_ids.to_vec();
814 let cutoff_ts = cutoff_ts.to_owned();
815 self.async_db
816 .reader()
817 .call(move |conn| {
818 write_ops::count_recent_heartbeats_for_items(conn, &task_id, &item_ids, &cutoff_ts)
819 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
820 })
821 .await
822 .map_err(flatten_err)
823 }
824
825 pub async fn update_task_pipeline_vars(
827 &self,
828 task_id: &str,
829 pipeline_vars_json: &str,
830 ) -> Result<()> {
831 let task_id = task_id.to_owned();
832 let pipeline_vars_json = pipeline_vars_json.to_owned();
833 self.async_db
834 .writer()
835 .call(move |conn| {
836 write_ops::update_task_pipeline_vars(conn, &task_id, &pipeline_vars_json)
837 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
838 })
839 .await
840 .map_err(flatten_err)
841 }
842
843 pub async fn update_task_item_tickets(
845 &self,
846 task_item_id: &str,
847 ticket_files_json: &str,
848 ticket_content_json: &str,
849 ) -> Result<()> {
850 let task_item_id = task_item_id.to_owned();
851 let ticket_files_json = ticket_files_json.to_owned();
852 let ticket_content_json = ticket_content_json.to_owned();
853 self.async_db
854 .writer()
855 .call(move |conn| {
856 write_ops::update_task_item_tickets(
857 conn,
858 &task_item_id,
859 &ticket_files_json,
860 &ticket_content_json,
861 )
862 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
863 })
864 .await
865 .map_err(flatten_err)
866 }
867
868 pub async fn insert_task_graph_run(&self, run: NewTaskGraphRun) -> Result<()> {
870 self.async_db
871 .writer()
872 .call(move |conn| {
873 queries::insert_task_graph_run(conn, &run)
874 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
875 })
876 .await
877 .map_err(flatten_err)
878 }
879
880 pub async fn update_task_graph_run_status(
882 &self,
883 graph_run_id: &str,
884 status: &str,
885 ) -> Result<()> {
886 let graph_run_id = graph_run_id.to_owned();
887 let status = status.to_owned();
888 self.async_db
889 .writer()
890 .call(move |conn| {
891 queries::update_task_graph_run_status(conn, &graph_run_id, &status)
892 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
893 })
894 .await
895 .map_err(flatten_err)
896 }
897
898 pub async fn pause_all_running_tasks_and_items(&self) -> Result<usize> {
901 self.async_db
902 .writer()
903 .call(move |conn| {
904 state::pause_all_running_tasks_and_items(conn)
905 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
906 })
907 .await
908 .map_err(flatten_err)
909 }
910
911 pub async fn pause_restart_pending_tasks_and_items(&self) -> Result<usize> {
913 self.async_db
914 .writer()
915 .call(move |conn| {
916 state::pause_restart_pending_tasks_and_items(conn)
917 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
918 })
919 .await
920 .map_err(flatten_err)
921 }
922
923 pub async fn recover_orphaned_running_items(&self) -> Result<Vec<(String, Vec<String>)>> {
925 self.async_db
926 .writer()
927 .call(move |conn| {
928 state::recover_orphaned_running_items(conn)
929 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
930 })
931 .await
932 .map_err(flatten_err)
933 }
934
935 pub async fn recover_orphaned_running_items_for_task(
937 &self,
938 task_id: &str,
939 ) -> Result<Vec<String>> {
940 let task_id = task_id.to_owned();
941 self.async_db
942 .writer()
943 .call(move |conn| {
944 state::recover_orphaned_running_items_for_task(conn, &task_id)
945 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
946 })
947 .await
948 .map_err(flatten_err)
949 }
950
951 pub async fn recover_stalled_running_items(
955 &self,
956 stall_threshold_secs: u64,
957 exclude_task_ids: std::collections::HashSet<String>,
958 ) -> Result<Vec<(String, Vec<String>)>> {
959 self.async_db
960 .writer()
961 .call(move |conn| {
962 state::recover_stalled_running_items(conn, stall_threshold_secs, &exclude_task_ids)
963 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
964 })
965 .await
966 .map_err(flatten_err)
967 }
968
969 pub async fn insert_task_graph_snapshot(&self, snapshot: NewTaskGraphSnapshot) -> Result<()> {
971 self.async_db
972 .writer()
973 .call(move |conn| {
974 queries::insert_task_graph_snapshot(conn, &snapshot)
975 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
976 })
977 .await
978 .map_err(flatten_err)
979 }
980}
981
982#[cfg(test)]
983mod async_wrapper_tests {
984 use super::*;
985 use crate::dto::CreateTaskPayload;
986 use crate::task_ops::create_task_impl;
987 use crate::test_utils::TestState;
988
989 fn seed_task(fixture: &mut TestState) -> (std::sync::Arc<crate::state::InnerState>, String) {
990 let state = fixture.build();
991 let qa_file = state
992 .data_dir
993 .join("workspace/default/docs/qa/async-repo.md");
994 std::fs::write(&qa_file, "# async repo\n").expect("seed qa file");
995 let created = create_task_impl(
996 &state,
997 CreateTaskPayload {
998 name: Some("async-repo".to_string()),
999 goal: Some("async-repo-goal".to_string()),
1000 ..CreateTaskPayload::default()
1001 },
1002 )
1003 .expect("create task");
1004 (state, created.id)
1005 }
1006
1007 fn first_item_id(state: &crate::state::InnerState, task_id: &str) -> String {
1008 let conn = crate::db::open_conn(&state.db_path).expect("open sqlite");
1009 conn.query_row(
1010 "SELECT id FROM task_items WHERE task_id = ?1 ORDER BY order_no LIMIT 1",
1011 rusqlite::params![task_id],
1012 |row| row.get(0),
1013 )
1014 .expect("load item id")
1015 }
1016
1017 #[tokio::test]
1018 async fn async_repository_read_wrappers_round_trip() {
1019 let mut fixture = TestState::new();
1020 let (state, task_id) = seed_task(&mut fixture);
1021 let repo = &state.task_repo;
1022
1023 let resolved = repo
1024 .resolve_task_id(&task_id[..8])
1025 .await
1026 .expect("resolve task id");
1027 assert_eq!(resolved, task_id);
1028
1029 let summary = repo
1030 .load_task_summary(&task_id)
1031 .await
1032 .expect("load summary");
1033 assert_eq!(summary.name, "async-repo");
1034
1035 let detail = repo
1036 .load_task_detail_rows(&task_id)
1037 .await
1038 .expect("load detail rows");
1039 assert!(!detail.0.is_empty());
1040
1041 let counts = repo
1042 .load_task_item_counts(&task_id)
1043 .await
1044 .expect("load item counts");
1045 assert!(counts.0 >= 1);
1046
1047 let ids = repo
1048 .list_task_ids_ordered_by_created_desc()
1049 .await
1050 .expect("list task ids");
1051 assert_eq!(ids[0], task_id);
1052
1053 let resumable = repo
1054 .find_latest_resumable_task_id(true)
1055 .await
1056 .expect("find latest resumable");
1057 assert_eq!(resumable.as_deref(), Some(task_id.as_str()));
1058
1059 let runtime = repo
1060 .load_task_runtime_row(&task_id)
1061 .await
1062 .expect("load runtime row");
1063 assert_eq!(runtime.goal, "async-repo-goal");
1064
1065 let item_id = repo
1066 .first_task_item_id(&task_id)
1067 .await
1068 .expect("first item id query");
1069 assert!(item_id.is_some());
1070
1071 let unresolved = repo
1072 .count_unresolved_items(&task_id)
1073 .await
1074 .expect("count unresolved");
1075 assert_eq!(unresolved, 0);
1076
1077 let items = repo
1078 .list_task_items_for_cycle(&task_id)
1079 .await
1080 .expect("list items for cycle");
1081 assert!(!items.is_empty());
1082
1083 let status = repo.load_task_status(&task_id).await.expect("load status");
1084 assert_eq!(status.as_deref(), Some("created"));
1085
1086 let name = repo.load_task_name(&task_id).await.expect("load task name");
1087 assert_eq!(name.as_deref(), Some("async-repo"));
1088
1089 let runs = repo
1090 .list_task_log_runs(&task_id, 10)
1091 .await
1092 .expect("list log runs");
1093 assert!(runs.is_empty());
1094 }
1095
1096 #[tokio::test]
1097 async fn async_repository_write_wrappers_update_task_and_item_state() {
1098 let mut fixture = TestState::new();
1099 let (state, task_id) = seed_task(&mut fixture);
1100 let repo = &state.task_repo;
1101 let item_id = first_item_id(&state, &task_id);
1102
1103 repo.mark_task_item_running(&item_id)
1104 .await
1105 .expect("mark item running");
1106 repo.update_task_item_status(&item_id, "qa_failed")
1107 .await
1108 .expect("update item status");
1109 repo.set_task_item_terminal_status(&item_id, "completed")
1110 .await
1111 .expect("set terminal status");
1112 repo.set_task_status(&task_id, "running", false)
1113 .await
1114 .expect("set task status");
1115 repo.update_task_cycle_state(&task_id, 3, true)
1116 .await
1117 .expect("update task cycle state");
1118
1119 let conn = crate::db::open_conn(&state.db_path).expect("open sqlite");
1120 let task_row: (String, i64, i64) = conn
1121 .query_row(
1122 "SELECT status, current_cycle, init_done FROM tasks WHERE id = ?1",
1123 rusqlite::params![task_id],
1124 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1125 )
1126 .expect("load task row");
1127 assert_eq!(task_row.0, "running");
1128 assert_eq!(task_row.1, 3);
1129 assert_eq!(task_row.2, 1);
1130
1131 let item_status: String = conn
1132 .query_row(
1133 "SELECT status FROM task_items WHERE id = ?1",
1134 rusqlite::params![item_id],
1135 |row| row.get(0),
1136 )
1137 .expect("load item status");
1138 assert_eq!(item_status, "completed");
1139
1140 repo.set_task_status(&task_id, "paused", false)
1141 .await
1142 .expect("pause task before prepare");
1143 repo.prepare_task_for_start_batch(&task_id)
1144 .await
1145 .expect("prepare task for start");
1146 let prepared_status: String = conn
1147 .query_row(
1148 "SELECT status FROM tasks WHERE id = ?1",
1149 rusqlite::params![task_id],
1150 |row| row.get(0),
1151 )
1152 .expect("reload task status");
1153 assert_eq!(prepared_status, "running");
1154 }
1155
1156 #[tokio::test]
1157 async fn async_repository_insert_and_delete_command_runs() {
1158 let mut fixture = TestState::new();
1159 let (state, task_id) = seed_task(&mut fixture);
1160 let repo = &state.task_repo;
1161 let item_id = first_item_id(&state, &task_id);
1162 let stdout_path = state.data_dir.join("logs/async-wrapper-stdout.log");
1163 let stderr_path = state.data_dir.join("logs/async-wrapper-stderr.log");
1164 std::fs::create_dir_all(
1165 stdout_path
1166 .parent()
1167 .expect("stdout parent directory should exist"),
1168 )
1169 .expect("create logs dir");
1170 std::fs::write(&stdout_path, "stdout").expect("write stdout");
1171 std::fs::write(&stderr_path, "stderr").expect("write stderr");
1172
1173 repo.insert_command_run(NewCommandRun {
1174 id: "async-run-1".to_string(),
1175 task_item_id: item_id,
1176 phase: "qa".to_string(),
1177 command: "echo repo".to_string(),
1178 command_template: None,
1179 cwd: state.data_dir.display().to_string(),
1180 workspace_id: "default".to_string(),
1181 agent_id: "echo".to_string(),
1182 exit_code: 0,
1183 stdout_path: stdout_path.display().to_string(),
1184 stderr_path: stderr_path.display().to_string(),
1185 started_at: crate::config_load::now_ts(),
1186 ended_at: crate::config_load::now_ts(),
1187 interrupted: 0,
1188 output_json: "{}".to_string(),
1189 artifacts_json: "[]".to_string(),
1190 confidence: Some(1.0),
1191 quality_score: Some(1.0),
1192 validation_status: "passed".to_string(),
1193 session_id: None,
1194 machine_output_source: "stdout".to_string(),
1195 output_json_path: None,
1196 command_rule_index: None,
1197 })
1198 .await
1199 .expect("insert command run");
1200
1201 let runs = repo
1202 .list_task_log_runs(&task_id, 10)
1203 .await
1204 .expect("list runs after insert");
1205 assert_eq!(runs.len(), 1);
1206
1207 let paths = repo
1208 .delete_task_and_collect_log_paths(&task_id)
1209 .await
1210 .expect("delete task and collect log paths");
1211 assert_eq!(paths.len(), 2);
1212 assert!(
1213 paths
1214 .iter()
1215 .any(|path| path.ends_with("async-wrapper-stdout.log"))
1216 );
1217 }
1218
1219 #[test]
1220 fn sqlite_repository_graph_debug_wrappers_round_trip() {
1221 let mut fixture = TestState::new();
1222 let (state, task_id) = seed_task(&mut fixture);
1223 let repo =
1224 SqliteTaskRepository::new(types::TaskRepositorySource::from(state.db_path.clone()));
1225
1226 repo.insert_task_graph_run(&NewTaskGraphRun {
1227 graph_run_id: "sync-graph-run".to_string(),
1228 task_id: task_id.clone(),
1229 cycle: 4,
1230 mode: "dynamic_dag".to_string(),
1231 source: "adaptive_planner".to_string(),
1232 status: "materialized".to_string(),
1233 fallback_mode: Some("static_segment".to_string()),
1234 planner_failure_class: None,
1235 planner_failure_message: None,
1236 entry_node_id: Some("qa".to_string()),
1237 node_count: 2,
1238 edge_count: 1,
1239 })
1240 .expect("insert graph run");
1241 repo.update_task_graph_run_status("sync-graph-run", "completed")
1242 .expect("update graph run");
1243 repo.insert_task_graph_snapshot(&NewTaskGraphSnapshot {
1244 graph_run_id: "sync-graph-run".to_string(),
1245 task_id: task_id.clone(),
1246 snapshot_kind: "effective_graph".to_string(),
1247 payload_json: "{\"entry\":\"qa\"}".to_string(),
1248 })
1249 .expect("insert graph snapshot");
1250
1251 let bundles = repo
1252 .load_task_graph_debug_bundles(&task_id)
1253 .expect("load graph bundles");
1254 assert_eq!(bundles.len(), 1);
1255 assert_eq!(bundles[0].graph_run_id, "sync-graph-run");
1256 assert_eq!(bundles[0].status, "completed");
1257 assert_eq!(bundles[0].effective_graph_json, "{\"entry\":\"qa\"}");
1258 }
1259
1260 #[tokio::test]
1261 async fn async_repository_graph_debug_wrappers_round_trip() {
1262 let mut fixture = TestState::new();
1263 let (state, task_id) = seed_task(&mut fixture);
1264 let repo = &state.task_repo;
1265
1266 repo.insert_task_graph_run(NewTaskGraphRun {
1267 graph_run_id: "async-graph-run".to_string(),
1268 task_id: task_id.clone(),
1269 cycle: 5,
1270 mode: "dynamic_dag".to_string(),
1271 source: "adaptive_planner".to_string(),
1272 status: "materialized".to_string(),
1273 fallback_mode: Some("deterministic_dag".to_string()),
1274 planner_failure_class: Some("invalid_json".to_string()),
1275 planner_failure_message: Some("planner output broken".to_string()),
1276 entry_node_id: Some("fix".to_string()),
1277 node_count: 3,
1278 edge_count: 2,
1279 })
1280 .await
1281 .expect("insert graph run");
1282 repo.update_task_graph_run_status("async-graph-run", "completed")
1283 .await
1284 .expect("update graph run");
1285 repo.insert_task_graph_snapshot(NewTaskGraphSnapshot {
1286 graph_run_id: "async-graph-run".to_string(),
1287 task_id: task_id.clone(),
1288 snapshot_kind: "effective_graph".to_string(),
1289 payload_json: "{\"entry\":\"fix\"}".to_string(),
1290 })
1291 .await
1292 .expect("insert graph snapshot");
1293
1294 let bundles = repo
1295 .load_task_graph_debug_bundles(&task_id)
1296 .await
1297 .expect("load graph bundles");
1298 assert_eq!(bundles.len(), 1);
1299 assert_eq!(bundles[0].graph_run_id, "async-graph-run");
1300 assert_eq!(
1301 bundles[0].fallback_mode.as_deref(),
1302 Some("deterministic_dag")
1303 );
1304 assert_eq!(bundles[0].effective_graph_json, "{\"entry\":\"fix\"}");
1305 }
1306}