1#![cfg_attr(
2 not(test),
3 deny(clippy::panic, clippy::unwrap_used, clippy::expect_used)
4)]
5
6mod command_run;
7pub(crate) mod items;
8mod queries;
9mod state;
10mod trait_def;
11mod types;
12mod write_ops;
13
14#[cfg(test)]
15mod tests;
16
17pub use command_run::NewCommandRun;
18pub use trait_def::TaskRepository;
19pub use types::{
20 DbEventRecord, NewTaskGraphRun, NewTaskGraphSnapshot, TaskLogRunRow, TaskRepositoryConn,
21 TaskRepositorySource, TaskRuntimeRow,
22};
23pub use write_ops::{CompletedRunRecord, InflightRunRecord};
24
25use crate::async_database::{AsyncDatabase, flatten_err};
26use crate::dto::{CommandRunDto, EventDto, TaskGraphDebugBundle, TaskItemDto};
27use anyhow::Result;
28use std::sync::Arc;
29
/// Row bundle returned by `TaskRepository::load_task_detail_rows` for a single
/// task, in positional order:
/// `(task items, command runs, events, graph debug bundles)`.
pub type TaskDetailRows = (
    Vec<TaskItemDto>,
    Vec<CommandRunDto>,
    Vec<EventDto>,
    Vec<TaskGraphDebugBundle>,
);
37
/// Synchronous SQLite-backed implementation of [`TaskRepository`].
///
/// Each operation obtains a connection from `source` and delegates to the
/// helper functions in the `queries`, `state`, `items`, and `write_ops`
/// submodules.
pub struct SqliteTaskRepository {
    // Where connections come from (the test suite constructs one from a
    // database path via `TaskRepositorySource::from`).
    source: types::TaskRepositorySource,
}
42
43impl SqliteTaskRepository {
44 pub fn new<T>(source: T) -> Self
46 where
47 T: Into<types::TaskRepositorySource>,
48 {
49 Self {
50 source: source.into(),
51 }
52 }
53
54 fn connection(&self) -> Result<types::TaskRepositoryConn> {
55 self.source.connection()
56 }
57}
58
59impl TaskRepository for SqliteTaskRepository {
60 fn resolve_task_id(&self, task_id_or_prefix: &str) -> Result<String> {
61 let conn = self.connection()?;
62 queries::resolve_task_id(&conn, task_id_or_prefix)
63 }
64
65 fn load_task_summary(&self, task_id: &str) -> Result<crate::dto::TaskSummary> {
66 let conn = self.connection()?;
67 queries::load_task_summary(&conn, task_id)
68 }
69
70 fn load_task_detail_rows(&self, task_id: &str) -> Result<TaskDetailRows> {
71 let conn = self.connection()?;
72 queries::load_task_detail_rows(&conn, task_id)
73 }
74
75 fn load_task_item_counts(&self, task_id: &str) -> Result<(i64, i64, i64)> {
76 let conn = self.connection()?;
77 queries::load_task_item_counts(&conn, task_id)
78 }
79
80 fn list_task_ids_ordered_by_created_desc(&self) -> Result<Vec<String>> {
81 let conn = self.connection()?;
82 queries::list_task_ids_ordered_by_created_desc(&conn)
83 }
84
85 fn find_latest_resumable_task_id(&self, include_pending: bool) -> Result<Option<String>> {
86 let conn = self.connection()?;
87 queries::find_latest_resumable_task_id(&conn, include_pending)
88 }
89
90 fn load_task_runtime_row(&self, task_id: &str) -> Result<TaskRuntimeRow> {
91 let conn = self.connection()?;
92 queries::load_task_runtime_row(&conn, task_id)
93 }
94
95 fn first_task_item_id(&self, task_id: &str) -> Result<Option<String>> {
96 let conn = self.connection()?;
97 queries::first_task_item_id(&conn, task_id)
98 }
99
100 fn count_unresolved_items(&self, task_id: &str) -> Result<i64> {
101 let conn = self.connection()?;
102 queries::count_unresolved_items(&conn, task_id)
103 }
104
105 fn list_task_items_for_cycle(&self, task_id: &str) -> Result<Vec<crate::dto::TaskItemRow>> {
106 let conn = self.connection()?;
107 queries::list_task_items_for_cycle(&conn, task_id)
108 }
109
110 fn load_task_status(&self, task_id: &str) -> Result<Option<String>> {
111 let conn = self.connection()?;
112 queries::load_task_status(&conn, task_id)
113 }
114
115 fn set_task_status(&self, task_id: &str, status: &str, set_completed: bool) -> Result<()> {
116 let conn = self.connection()?;
117 state::set_task_status(&conn, task_id, status, set_completed)
118 }
119
120 fn prepare_task_for_start_batch(&self, task_id: &str) -> Result<()> {
121 let conn = self.connection()?;
122 state::prepare_task_for_start_batch(&conn, task_id)
123 }
124
125 fn update_task_cycle_state(
126 &self,
127 task_id: &str,
128 current_cycle: u32,
129 init_done: bool,
130 ) -> Result<()> {
131 let conn = self.connection()?;
132 state::update_task_cycle_state(&conn, task_id, current_cycle, init_done)
133 }
134
135 fn mark_task_item_running(&self, task_item_id: &str) -> Result<()> {
136 let conn = self.connection()?;
137 items::mark_task_item_running(&conn, task_item_id)
138 }
139
140 fn set_task_item_terminal_status(&self, task_item_id: &str, status: &str) -> Result<()> {
141 let conn = self.connection()?;
142 items::set_task_item_terminal_status(&conn, task_item_id, status)
143 }
144
145 fn update_task_item_status(&self, task_item_id: &str, status: &str) -> Result<()> {
146 let conn = self.connection()?;
147 items::update_task_item_status(&conn, task_item_id, status)
148 }
149
150 fn update_task_item_pipeline_vars(
151 &self,
152 task_item_id: &str,
153 pipeline_vars_json: &str,
154 ) -> Result<()> {
155 let conn = self.connection()?;
156 items::update_task_item_pipeline_vars(&conn, task_item_id, pipeline_vars_json)
157 }
158
159 fn load_task_name(&self, task_id: &str) -> Result<Option<String>> {
160 let conn = self.connection()?;
161 queries::load_task_name(&conn, task_id)
162 }
163
164 fn list_task_log_runs(&self, task_id: &str, limit: usize) -> Result<Vec<TaskLogRunRow>> {
165 let conn = self.connection()?;
166 queries::list_task_log_runs(&conn, task_id, limit)
167 }
168
169 fn insert_task_graph_run(&self, run: &NewTaskGraphRun) -> Result<()> {
170 let conn = self.connection()?;
171 queries::insert_task_graph_run(&conn, run)
172 }
173
174 fn update_task_graph_run_status(&self, graph_run_id: &str, status: &str) -> Result<()> {
175 let conn = self.connection()?;
176 queries::update_task_graph_run_status(&conn, graph_run_id, status)
177 }
178
179 fn insert_task_graph_snapshot(&self, snapshot: &NewTaskGraphSnapshot) -> Result<()> {
180 let conn = self.connection()?;
181 queries::insert_task_graph_snapshot(&conn, snapshot)
182 }
183
184 fn load_task_graph_debug_bundles(
185 &self,
186 task_id: &str,
187 ) -> Result<Vec<crate::dto::TaskGraphDebugBundle>> {
188 let conn = self.connection()?;
189 queries::load_task_graph_debug_bundles(&conn, task_id)
190 }
191
192 fn delete_task_and_collect_log_paths(&self, task_id: &str) -> Result<Vec<String>> {
193 let conn = self.connection()?;
194 items::delete_task_and_collect_log_paths(&conn, task_id)
195 }
196
197 fn insert_command_run(&self, run: &NewCommandRun) -> Result<()> {
198 let conn = self.connection()?;
199 items::insert_command_run(&conn, run)
200 }
201
202 fn insert_event(&self, event: &DbEventRecord) -> Result<()> {
203 let conn = self.connection()?;
204 write_ops::insert_event(&conn, event)
205 }
206
207 fn update_command_run(&self, run: &NewCommandRun) -> Result<()> {
208 let conn = self.connection()?;
209 write_ops::update_command_run(&conn, run)
210 }
211
212 fn update_command_run_with_events(
213 &self,
214 run: &NewCommandRun,
215 events: &[DbEventRecord],
216 ) -> Result<()> {
217 let conn = self.connection()?;
218 write_ops::update_command_run_with_events(&conn, run, events)
219 }
220
221 fn persist_phase_result_with_events(
222 &self,
223 run: &NewCommandRun,
224 events: &[DbEventRecord],
225 ) -> Result<()> {
226 let conn = self.connection()?;
227 write_ops::persist_phase_result_with_events(&conn, run, events)
228 }
229
230 fn update_command_run_pid(&self, run_id: &str, pid: i64) -> Result<()> {
231 let conn = self.connection()?;
232 write_ops::update_command_run_pid(&conn, run_id, pid)
233 }
234
235 fn find_active_child_pids(&self, task_id: &str) -> Result<Vec<i64>> {
236 let conn = self.connection()?;
237 write_ops::find_active_child_pids(&conn, task_id)
238 }
239
240 fn find_inflight_command_runs_for_task(&self, task_id: &str) -> Result<Vec<InflightRunRecord>> {
241 let conn = self.connection()?;
242 write_ops::find_inflight_command_runs_for_task(&conn, task_id)
243 }
244
245 fn find_completed_runs_for_pending_items(
246 &self,
247 task_id: &str,
248 ) -> Result<Vec<write_ops::CompletedRunRecord>> {
249 let conn = self.connection()?;
250 write_ops::find_completed_runs_for_pending_items(&conn, task_id)
251 }
252
253 fn count_stale_pending_items(&self, task_id: &str) -> Result<i64> {
254 let conn = self.connection()?;
255 queries::count_stale_pending_items(&conn, task_id)
256 }
257
258 fn count_recent_heartbeats_for_items(
259 &self,
260 task_id: &str,
261 item_ids: &[String],
262 cutoff_ts: &str,
263 ) -> Result<i64> {
264 let conn = self.connection()?;
265 write_ops::count_recent_heartbeats_for_items(&conn, task_id, item_ids, cutoff_ts)
266 }
267
268 fn update_task_pipeline_vars(&self, task_id: &str, pipeline_vars_json: &str) -> Result<()> {
269 let conn = self.connection()?;
270 write_ops::update_task_pipeline_vars(&conn, task_id, pipeline_vars_json)
271 }
272
273 fn update_task_item_tickets(
274 &self,
275 task_item_id: &str,
276 ticket_files_json: &str,
277 ticket_content_json: &str,
278 ) -> Result<()> {
279 let conn = self.connection()?;
280 write_ops::update_task_item_tickets(
281 &conn,
282 task_item_id,
283 ticket_files_json,
284 ticket_content_json,
285 )
286 }
287}
288
/// Async front-end over the same repository helpers, executing each query on
/// the shared [`AsyncDatabase`] reader or writer connection.
pub struct AsyncSqliteTaskRepository {
    // Shared handle; cloned cheaply via `Arc` by callers that need their own.
    async_db: Arc<AsyncDatabase>,
}
293
294impl AsyncSqliteTaskRepository {
295 pub fn new(async_db: Arc<AsyncDatabase>) -> Self {
297 Self { async_db }
298 }
299
300 pub async fn resolve_task_id(&self, prefix: &str) -> Result<String> {
304 let prefix = prefix.to_owned();
305 self.async_db
306 .reader()
307 .call(move |conn| {
308 queries::resolve_task_id(conn, &prefix)
309 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
310 })
311 .await
312 .map_err(flatten_err)
313 }
314
315 pub async fn load_task_summary(&self, task_id: &str) -> Result<crate::dto::TaskSummary> {
317 let task_id = task_id.to_owned();
318 self.async_db
319 .reader()
320 .call(move |conn| {
321 queries::load_task_summary(conn, &task_id)
322 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
323 })
324 .await
325 .map_err(flatten_err)
326 }
327
328 pub async fn load_task_detail_rows(&self, task_id: &str) -> Result<TaskDetailRows> {
330 let task_id = task_id.to_owned();
331 self.async_db
332 .reader()
333 .call(move |conn| {
334 queries::load_task_detail_rows(conn, &task_id)
335 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
336 })
337 .await
338 .map_err(flatten_err)
339 }
340
341 pub async fn load_task_item_counts(&self, task_id: &str) -> Result<(i64, i64, i64)> {
343 let task_id = task_id.to_owned();
344 self.async_db
345 .reader()
346 .call(move |conn| {
347 queries::load_task_item_counts(conn, &task_id)
348 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
349 })
350 .await
351 .map_err(flatten_err)
352 }
353
354 pub async fn list_task_ids_ordered_by_created_desc(&self) -> Result<Vec<String>> {
356 self.async_db
357 .reader()
358 .call(move |conn| {
359 queries::list_task_ids_ordered_by_created_desc(conn)
360 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
361 })
362 .await
363 .map_err(flatten_err)
364 }
365
366 pub async fn find_latest_resumable_task_id(
368 &self,
369 include_pending: bool,
370 ) -> Result<Option<String>> {
371 self.async_db
372 .reader()
373 .call(move |conn| {
374 queries::find_latest_resumable_task_id(conn, include_pending)
375 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
376 })
377 .await
378 .map_err(flatten_err)
379 }
380
381 pub async fn load_task_runtime_row(&self, task_id: &str) -> Result<TaskRuntimeRow> {
383 let task_id = task_id.to_owned();
384 self.async_db
385 .reader()
386 .call(move |conn| {
387 queries::load_task_runtime_row(conn, &task_id)
388 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
389 })
390 .await
391 .map_err(flatten_err)
392 }
393
394 pub async fn first_task_item_id(&self, task_id: &str) -> Result<Option<String>> {
396 let task_id = task_id.to_owned();
397 self.async_db
398 .reader()
399 .call(move |conn| {
400 queries::first_task_item_id(conn, &task_id)
401 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
402 })
403 .await
404 .map_err(flatten_err)
405 }
406
407 pub async fn count_unresolved_items(&self, task_id: &str) -> Result<i64> {
409 let task_id = task_id.to_owned();
410 self.async_db
411 .reader()
412 .call(move |conn| {
413 queries::count_unresolved_items(conn, &task_id)
414 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
415 })
416 .await
417 .map_err(flatten_err)
418 }
419
420 pub async fn list_task_items_for_cycle(
422 &self,
423 task_id: &str,
424 ) -> Result<Vec<crate::dto::TaskItemRow>> {
425 let task_id = task_id.to_owned();
426 self.async_db
427 .reader()
428 .call(move |conn| {
429 queries::list_task_items_for_cycle(conn, &task_id)
430 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
431 })
432 .await
433 .map_err(flatten_err)
434 }
435
436 pub async fn load_task_status(&self, task_id: &str) -> Result<Option<String>> {
438 let task_id = task_id.to_owned();
439 self.async_db
440 .reader()
441 .call(move |conn| {
442 queries::load_task_status(conn, &task_id)
443 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
444 })
445 .await
446 .map_err(flatten_err)
447 }
448
449 pub async fn load_task_name(&self, task_id: &str) -> Result<Option<String>> {
451 let task_id = task_id.to_owned();
452 self.async_db
453 .reader()
454 .call(move |conn| {
455 queries::load_task_name(conn, &task_id)
456 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
457 })
458 .await
459 .map_err(flatten_err)
460 }
461
462 pub async fn list_task_log_runs(
464 &self,
465 task_id: &str,
466 limit: usize,
467 ) -> Result<Vec<TaskLogRunRow>> {
468 let task_id = task_id.to_owned();
469 self.async_db
470 .reader()
471 .call(move |conn| {
472 queries::list_task_log_runs(conn, &task_id, limit)
473 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
474 })
475 .await
476 .map_err(flatten_err)
477 }
478
479 pub async fn load_task_graph_debug_bundles(
481 &self,
482 task_id: &str,
483 ) -> Result<Vec<crate::dto::TaskGraphDebugBundle>> {
484 let task_id = task_id.to_owned();
485 self.async_db
486 .reader()
487 .call(move |conn| {
488 queries::load_task_graph_debug_bundles(conn, &task_id)
489 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
490 })
491 .await
492 .map_err(flatten_err)
493 }
494
495 pub async fn set_task_status(
499 &self,
500 task_id: &str,
501 status: &str,
502 set_completed: bool,
503 ) -> Result<()> {
504 let task_id = task_id.to_owned();
505 let status = status.to_owned();
506 self.async_db
507 .writer()
508 .call(move |conn| {
509 state::set_task_status(conn, &task_id, &status, set_completed)
510 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
511 })
512 .await
513 .map_err(flatten_err)
514 }
515
516 pub async fn reset_unresolved_items(&self, task_id: &str) -> Result<()> {
519 let task_id = task_id.to_owned();
520 self.async_db
521 .writer()
522 .call(move |conn| {
523 state::reset_unresolved_items(conn, &task_id)
524 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
525 })
526 .await
527 .map_err(flatten_err)
528 }
529
530 pub async fn prepare_task_for_start_batch(&self, task_id: &str) -> Result<()> {
532 let task_id = task_id.to_owned();
533 self.async_db
534 .writer()
535 .call(move |conn| {
536 state::prepare_task_for_start_batch(conn, &task_id)
537 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
538 })
539 .await
540 .map_err(flatten_err)
541 }
542
543 pub async fn update_task_cycle_state(
545 &self,
546 task_id: &str,
547 current_cycle: u32,
548 init_done: bool,
549 ) -> Result<()> {
550 let task_id = task_id.to_owned();
551 self.async_db
552 .writer()
553 .call(move |conn| {
554 state::update_task_cycle_state(conn, &task_id, current_cycle, init_done)
555 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
556 })
557 .await
558 .map_err(flatten_err)
559 }
560
561 pub async fn mark_task_item_running(&self, task_item_id: &str) -> Result<()> {
563 let task_item_id = task_item_id.to_owned();
564 self.async_db
565 .writer()
566 .call(move |conn| {
567 items::mark_task_item_running(conn, &task_item_id)
568 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
569 })
570 .await
571 .map_err(flatten_err)
572 }
573
574 pub async fn set_task_item_terminal_status(
576 &self,
577 task_item_id: &str,
578 status: &str,
579 ) -> Result<()> {
580 let task_item_id = task_item_id.to_owned();
581 let status = status.to_owned();
582 self.async_db
583 .writer()
584 .call(move |conn| {
585 items::set_task_item_terminal_status(conn, &task_item_id, &status)
586 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
587 })
588 .await
589 .map_err(flatten_err)
590 }
591
592 pub async fn update_task_item_status(&self, task_item_id: &str, status: &str) -> Result<()> {
594 let task_item_id = task_item_id.to_owned();
595 let status = status.to_owned();
596 self.async_db
597 .writer()
598 .call(move |conn| {
599 items::update_task_item_status(conn, &task_item_id, &status)
600 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
601 })
602 .await
603 .map_err(flatten_err)
604 }
605
606 pub async fn update_task_item_pipeline_vars(
608 &self,
609 task_item_id: &str,
610 pipeline_vars_json: &str,
611 ) -> Result<()> {
612 let task_item_id = task_item_id.to_owned();
613 let pipeline_vars_json = pipeline_vars_json.to_owned();
614 self.async_db
615 .writer()
616 .call(move |conn| {
617 items::update_task_item_pipeline_vars(conn, &task_item_id, &pipeline_vars_json)
618 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
619 })
620 .await
621 .map_err(flatten_err)
622 }
623
624 pub async fn delete_task_and_collect_log_paths(&self, task_id: &str) -> Result<Vec<String>> {
626 let task_id = task_id.to_owned();
627 self.async_db
628 .writer()
629 .call(move |conn| {
630 items::delete_task_and_collect_log_paths(conn, &task_id)
631 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
632 })
633 .await
634 .map_err(flatten_err)
635 }
636
637 pub async fn insert_command_run(&self, run: NewCommandRun) -> Result<()> {
639 self.async_db
640 .writer()
641 .call(move |conn| {
642 items::insert_command_run(conn, &run)
643 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
644 })
645 .await
646 .map_err(flatten_err)
647 }
648
649 pub async fn insert_event(&self, event: DbEventRecord) -> Result<()> {
651 self.async_db
652 .writer()
653 .call(move |conn| {
654 write_ops::insert_event(conn, &event)
655 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
656 })
657 .await
658 .map_err(flatten_err)
659 }
660
661 pub async fn update_command_run(&self, run: NewCommandRun) -> Result<()> {
663 self.async_db
664 .writer()
665 .call(move |conn| {
666 write_ops::update_command_run(conn, &run)
667 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
668 })
669 .await
670 .map_err(flatten_err)
671 }
672
673 pub async fn update_command_run_with_events(
675 &self,
676 run: NewCommandRun,
677 events: Vec<DbEventRecord>,
678 ) -> Result<()> {
679 self.async_db
680 .writer()
681 .call(move |conn| {
682 write_ops::update_command_run_with_events(conn, &run, &events)
683 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
684 })
685 .await
686 .map_err(flatten_err)
687 }
688
689 pub async fn persist_phase_result_with_events(
691 &self,
692 run: NewCommandRun,
693 events: Vec<DbEventRecord>,
694 ) -> Result<()> {
695 self.async_db
696 .writer()
697 .call(move |conn| {
698 write_ops::persist_phase_result_with_events(conn, &run, &events)
699 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
700 })
701 .await
702 .map_err(flatten_err)
703 }
704
705 pub async fn update_command_run_pid(&self, run_id: &str, pid: i64) -> Result<()> {
707 let run_id = run_id.to_owned();
708 self.async_db
709 .writer()
710 .call(move |conn| {
711 write_ops::update_command_run_pid(conn, &run_id, pid)
712 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
713 })
714 .await
715 .map_err(flatten_err)
716 }
717
718 pub async fn find_active_child_pids(&self, task_id: &str) -> Result<Vec<i64>> {
720 let task_id = task_id.to_owned();
721 self.async_db
722 .reader()
723 .call(move |conn| {
724 write_ops::find_active_child_pids(conn, &task_id)
725 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
726 })
727 .await
728 .map_err(flatten_err)
729 }
730
731 pub async fn find_inflight_command_runs_for_task(
733 &self,
734 task_id: &str,
735 ) -> Result<Vec<InflightRunRecord>> {
736 let task_id = task_id.to_owned();
737 self.async_db
738 .reader()
739 .call(move |conn| {
740 write_ops::find_inflight_command_runs_for_task(conn, &task_id)
741 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
742 })
743 .await
744 .map_err(flatten_err)
745 }
746
747 pub async fn find_completed_runs_for_pending_items(
749 &self,
750 task_id: &str,
751 ) -> Result<Vec<write_ops::CompletedRunRecord>> {
752 let task_id = task_id.to_owned();
753 self.async_db
754 .reader()
755 .call(move |conn| {
756 write_ops::find_completed_runs_for_pending_items(conn, &task_id)
757 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
758 })
759 .await
760 .map_err(flatten_err)
761 }
762
763 pub async fn count_stale_pending_items(&self, task_id: &str) -> Result<i64> {
765 let task_id = task_id.to_owned();
766 self.async_db
767 .reader()
768 .call(move |conn| {
769 queries::count_stale_pending_items(conn, &task_id)
770 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
771 })
772 .await
773 .map_err(flatten_err)
774 }
775
776 pub async fn count_recent_heartbeats_for_items(
778 &self,
779 task_id: &str,
780 item_ids: &[String],
781 cutoff_ts: &str,
782 ) -> Result<i64> {
783 let task_id = task_id.to_owned();
784 let item_ids = item_ids.to_vec();
785 let cutoff_ts = cutoff_ts.to_owned();
786 self.async_db
787 .reader()
788 .call(move |conn| {
789 write_ops::count_recent_heartbeats_for_items(conn, &task_id, &item_ids, &cutoff_ts)
790 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
791 })
792 .await
793 .map_err(flatten_err)
794 }
795
796 pub async fn update_task_pipeline_vars(
798 &self,
799 task_id: &str,
800 pipeline_vars_json: &str,
801 ) -> Result<()> {
802 let task_id = task_id.to_owned();
803 let pipeline_vars_json = pipeline_vars_json.to_owned();
804 self.async_db
805 .writer()
806 .call(move |conn| {
807 write_ops::update_task_pipeline_vars(conn, &task_id, &pipeline_vars_json)
808 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
809 })
810 .await
811 .map_err(flatten_err)
812 }
813
814 pub async fn update_task_item_tickets(
816 &self,
817 task_item_id: &str,
818 ticket_files_json: &str,
819 ticket_content_json: &str,
820 ) -> Result<()> {
821 let task_item_id = task_item_id.to_owned();
822 let ticket_files_json = ticket_files_json.to_owned();
823 let ticket_content_json = ticket_content_json.to_owned();
824 self.async_db
825 .writer()
826 .call(move |conn| {
827 write_ops::update_task_item_tickets(
828 conn,
829 &task_item_id,
830 &ticket_files_json,
831 &ticket_content_json,
832 )
833 .map_err(|err| tokio_rusqlite::Error::Other(err.into()))
834 })
835 .await
836 .map_err(flatten_err)
837 }
838
839 pub async fn insert_task_graph_run(&self, run: NewTaskGraphRun) -> Result<()> {
841 self.async_db
842 .writer()
843 .call(move |conn| {
844 queries::insert_task_graph_run(conn, &run)
845 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
846 })
847 .await
848 .map_err(flatten_err)
849 }
850
851 pub async fn update_task_graph_run_status(
853 &self,
854 graph_run_id: &str,
855 status: &str,
856 ) -> Result<()> {
857 let graph_run_id = graph_run_id.to_owned();
858 let status = status.to_owned();
859 self.async_db
860 .writer()
861 .call(move |conn| {
862 queries::update_task_graph_run_status(conn, &graph_run_id, &status)
863 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
864 })
865 .await
866 .map_err(flatten_err)
867 }
868
869 pub async fn pause_all_running_tasks_and_items(&self) -> Result<usize> {
872 self.async_db
873 .writer()
874 .call(move |conn| {
875 state::pause_all_running_tasks_and_items(conn)
876 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
877 })
878 .await
879 .map_err(flatten_err)
880 }
881
882 pub async fn recover_orphaned_running_items(&self) -> Result<Vec<(String, Vec<String>)>> {
884 self.async_db
885 .writer()
886 .call(move |conn| {
887 state::recover_orphaned_running_items(conn)
888 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
889 })
890 .await
891 .map_err(flatten_err)
892 }
893
894 pub async fn recover_orphaned_running_items_for_task(
896 &self,
897 task_id: &str,
898 ) -> Result<Vec<String>> {
899 let task_id = task_id.to_owned();
900 self.async_db
901 .writer()
902 .call(move |conn| {
903 state::recover_orphaned_running_items_for_task(conn, &task_id)
904 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
905 })
906 .await
907 .map_err(flatten_err)
908 }
909
910 pub async fn recover_stalled_running_items(
912 &self,
913 stall_threshold_secs: u64,
914 ) -> Result<Vec<(String, Vec<String>)>> {
915 self.async_db
916 .writer()
917 .call(move |conn| {
918 state::recover_stalled_running_items(conn, stall_threshold_secs)
919 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
920 })
921 .await
922 .map_err(flatten_err)
923 }
924
925 pub async fn insert_task_graph_snapshot(&self, snapshot: NewTaskGraphSnapshot) -> Result<()> {
927 self.async_db
928 .writer()
929 .call(move |conn| {
930 queries::insert_task_graph_snapshot(conn, &snapshot)
931 .map_err(|e| tokio_rusqlite::Error::Other(e.into()))
932 })
933 .await
934 .map_err(flatten_err)
935 }
936}
937
// Round-trip tests for both the async wrappers and the sync graph-debug
// wrappers, running against a real on-disk SQLite fixture built by TestState.
#[cfg(test)]
mod async_wrapper_tests {
    use super::*;
    use crate::dto::CreateTaskPayload;
    use crate::task_ops::create_task_impl;
    use crate::test_utils::TestState;

    // Builds the fixture state, seeds a QA markdown file, and creates one
    // task named "async-repo"; returns the state plus the new task's id.
    fn seed_task(fixture: &mut TestState) -> (std::sync::Arc<crate::state::InnerState>, String) {
        let state = fixture.build();
        let qa_file = state
            .data_dir
            .join("workspace/default/docs/qa/async-repo.md");
        std::fs::write(&qa_file, "# async repo\n").expect("seed qa file");
        let created = create_task_impl(
            &state,
            CreateTaskPayload {
                name: Some("async-repo".to_string()),
                goal: Some("async-repo-goal".to_string()),
                ..CreateTaskPayload::default()
            },
        )
        .expect("create task");
        (state, created.id)
    }

    // Reads the id of the task's first item (by order_no) straight from
    // SQLite, bypassing the repository under test.
    fn first_item_id(state: &crate::state::InnerState, task_id: &str) -> String {
        let conn = crate::db::open_conn(&state.db_path).expect("open sqlite");
        conn.query_row(
            "SELECT id FROM task_items WHERE task_id = ?1 ORDER BY order_no LIMIT 1",
            rusqlite::params![task_id],
            |row| row.get(0),
        )
        .expect("load item id")
    }

    // Exercises every read-only async wrapper once against the seeded task
    // and checks the values the seed is known to produce.
    #[tokio::test]
    async fn async_repository_read_wrappers_round_trip() {
        let mut fixture = TestState::new();
        let (state, task_id) = seed_task(&mut fixture);
        let repo = &state.task_repo;

        // Prefix resolution: the first 8 chars must resolve to the full id.
        let resolved = repo
            .resolve_task_id(&task_id[..8])
            .await
            .expect("resolve task id");
        assert_eq!(resolved, task_id);

        let summary = repo
            .load_task_summary(&task_id)
            .await
            .expect("load summary");
        assert_eq!(summary.name, "async-repo");

        let detail = repo
            .load_task_detail_rows(&task_id)
            .await
            .expect("load detail rows");
        assert!(!detail.0.is_empty());

        let counts = repo
            .load_task_item_counts(&task_id)
            .await
            .expect("load item counts");
        assert!(counts.0 >= 1);

        let ids = repo
            .list_task_ids_ordered_by_created_desc()
            .await
            .expect("list task ids");
        assert_eq!(ids[0], task_id);

        let resumable = repo
            .find_latest_resumable_task_id(true)
            .await
            .expect("find latest resumable");
        assert_eq!(resumable.as_deref(), Some(task_id.as_str()));

        let runtime = repo
            .load_task_runtime_row(&task_id)
            .await
            .expect("load runtime row");
        assert_eq!(runtime.goal, "async-repo-goal");

        let item_id = repo
            .first_task_item_id(&task_id)
            .await
            .expect("first item id query");
        assert!(item_id.is_some());

        let unresolved = repo
            .count_unresolved_items(&task_id)
            .await
            .expect("count unresolved");
        assert_eq!(unresolved, 0);

        let items = repo
            .list_task_items_for_cycle(&task_id)
            .await
            .expect("list items for cycle");
        assert!(!items.is_empty());

        let status = repo.load_task_status(&task_id).await.expect("load status");
        assert_eq!(status.as_deref(), Some("created"));

        let name = repo.load_task_name(&task_id).await.expect("load task name");
        assert_eq!(name.as_deref(), Some("async-repo"));

        // A freshly seeded task has no command runs yet.
        let runs = repo
            .list_task_log_runs(&task_id, 10)
            .await
            .expect("list log runs");
        assert!(runs.is_empty());
    }

    // Drives the item/task mutation wrappers, then verifies the resulting
    // rows directly in SQLite.
    #[tokio::test]
    async fn async_repository_write_wrappers_update_task_and_item_state() {
        let mut fixture = TestState::new();
        let (state, task_id) = seed_task(&mut fixture);
        let repo = &state.task_repo;
        let item_id = first_item_id(&state, &task_id);

        // Walk the item through running -> qa_failed -> completed.
        repo.mark_task_item_running(&item_id)
            .await
            .expect("mark item running");
        repo.update_task_item_status(&item_id, "qa_failed")
            .await
            .expect("update item status");
        repo.set_task_item_terminal_status(&item_id, "completed")
            .await
            .expect("set terminal status");
        repo.set_task_status(&task_id, "running", false)
            .await
            .expect("set task status");
        repo.update_task_cycle_state(&task_id, 3, true)
            .await
            .expect("update task cycle state");

        let conn = crate::db::open_conn(&state.db_path).expect("open sqlite");
        let task_row: (String, i64, i64) = conn
            .query_row(
                "SELECT status, current_cycle, init_done FROM tasks WHERE id = ?1",
                rusqlite::params![task_id],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
            )
            .expect("load task row");
        assert_eq!(task_row.0, "running");
        assert_eq!(task_row.1, 3);
        assert_eq!(task_row.2, 1);

        let item_status: String = conn
            .query_row(
                "SELECT status FROM task_items WHERE id = ?1",
                rusqlite::params![item_id],
                |row| row.get(0),
            )
            .expect("load item status");
        assert_eq!(item_status, "completed");

        // prepare_task_for_start_batch should flip a paused task back to
        // running.
        repo.set_task_status(&task_id, "paused", false)
            .await
            .expect("pause task before prepare");
        repo.prepare_task_for_start_batch(&task_id)
            .await
            .expect("prepare task for start");
        let prepared_status: String = conn
            .query_row(
                "SELECT status FROM tasks WHERE id = ?1",
                rusqlite::params![task_id],
                |row| row.get(0),
            )
            .expect("reload task status");
        assert_eq!(prepared_status, "running");
    }

    // Inserts a command run with real stdout/stderr log files, then deletes
    // the task and checks both log paths are collected for cleanup.
    #[tokio::test]
    async fn async_repository_insert_and_delete_command_runs() {
        let mut fixture = TestState::new();
        let (state, task_id) = seed_task(&mut fixture);
        let repo = &state.task_repo;
        let item_id = first_item_id(&state, &task_id);
        let stdout_path = state.data_dir.join("logs/async-wrapper-stdout.log");
        let stderr_path = state.data_dir.join("logs/async-wrapper-stderr.log");
        std::fs::create_dir_all(
            stdout_path
                .parent()
                .expect("stdout parent directory should exist"),
        )
        .expect("create logs dir");
        std::fs::write(&stdout_path, "stdout").expect("write stdout");
        std::fs::write(&stderr_path, "stderr").expect("write stderr");

        repo.insert_command_run(NewCommandRun {
            id: "async-run-1".to_string(),
            task_item_id: item_id,
            phase: "qa".to_string(),
            command: "echo repo".to_string(),
            command_template: None,
            cwd: state.data_dir.display().to_string(),
            workspace_id: "default".to_string(),
            agent_id: "echo".to_string(),
            exit_code: 0,
            stdout_path: stdout_path.display().to_string(),
            stderr_path: stderr_path.display().to_string(),
            started_at: crate::config_load::now_ts(),
            ended_at: crate::config_load::now_ts(),
            interrupted: 0,
            output_json: "{}".to_string(),
            artifacts_json: "[]".to_string(),
            confidence: Some(1.0),
            quality_score: Some(1.0),
            validation_status: "passed".to_string(),
            session_id: None,
            machine_output_source: "stdout".to_string(),
            output_json_path: None,
        })
        .await
        .expect("insert command run");

        let runs = repo
            .list_task_log_runs(&task_id, 10)
            .await
            .expect("list runs after insert");
        assert_eq!(runs.len(), 1);

        // Deleting the task must surface both log files for removal.
        let paths = repo
            .delete_task_and_collect_log_paths(&task_id)
            .await
            .expect("delete task and collect log paths");
        assert_eq!(paths.len(), 2);
        assert!(
            paths
                .iter()
                .any(|path| path.ends_with("async-wrapper-stdout.log"))
        );
    }

    // Same graph run/snapshot round trip as below, but through the
    // synchronous SqliteTaskRepository built from a database path.
    #[test]
    fn sqlite_repository_graph_debug_wrappers_round_trip() {
        let mut fixture = TestState::new();
        let (state, task_id) = seed_task(&mut fixture);
        let repo =
            SqliteTaskRepository::new(types::TaskRepositorySource::from(state.db_path.clone()));

        repo.insert_task_graph_run(&NewTaskGraphRun {
            graph_run_id: "sync-graph-run".to_string(),
            task_id: task_id.clone(),
            cycle: 4,
            mode: "dynamic_dag".to_string(),
            source: "adaptive_planner".to_string(),
            status: "materialized".to_string(),
            fallback_mode: Some("static_segment".to_string()),
            planner_failure_class: None,
            planner_failure_message: None,
            entry_node_id: Some("qa".to_string()),
            node_count: 2,
            edge_count: 1,
        })
        .expect("insert graph run");
        repo.update_task_graph_run_status("sync-graph-run", "completed")
            .expect("update graph run");
        repo.insert_task_graph_snapshot(&NewTaskGraphSnapshot {
            graph_run_id: "sync-graph-run".to_string(),
            task_id: task_id.clone(),
            snapshot_kind: "effective_graph".to_string(),
            payload_json: "{\"entry\":\"qa\"}".to_string(),
        })
        .expect("insert graph snapshot");

        let bundles = repo
            .load_task_graph_debug_bundles(&task_id)
            .expect("load graph bundles");
        assert_eq!(bundles.len(), 1);
        assert_eq!(bundles[0].graph_run_id, "sync-graph-run");
        assert_eq!(bundles[0].status, "completed");
        assert_eq!(bundles[0].effective_graph_json, "{\"entry\":\"qa\"}");
    }

    // Async variant: insert a graph run + snapshot, update its status, and
    // read back the joined debug bundle.
    #[tokio::test]
    async fn async_repository_graph_debug_wrappers_round_trip() {
        let mut fixture = TestState::new();
        let (state, task_id) = seed_task(&mut fixture);
        let repo = &state.task_repo;

        repo.insert_task_graph_run(NewTaskGraphRun {
            graph_run_id: "async-graph-run".to_string(),
            task_id: task_id.clone(),
            cycle: 5,
            mode: "dynamic_dag".to_string(),
            source: "adaptive_planner".to_string(),
            status: "materialized".to_string(),
            fallback_mode: Some("deterministic_dag".to_string()),
            planner_failure_class: Some("invalid_json".to_string()),
            planner_failure_message: Some("planner output broken".to_string()),
            entry_node_id: Some("fix".to_string()),
            node_count: 3,
            edge_count: 2,
        })
        .await
        .expect("insert graph run");
        repo.update_task_graph_run_status("async-graph-run", "completed")
            .await
            .expect("update graph run");
        repo.insert_task_graph_snapshot(NewTaskGraphSnapshot {
            graph_run_id: "async-graph-run".to_string(),
            task_id: task_id.clone(),
            snapshot_kind: "effective_graph".to_string(),
            payload_json: "{\"entry\":\"fix\"}".to_string(),
        })
        .await
        .expect("insert graph snapshot");

        let bundles = repo
            .load_task_graph_debug_bundles(&task_id)
            .await
            .expect("load graph bundles");
        assert_eq!(bundles.len(), 1);
        assert_eq!(bundles[0].graph_run_id, "async-graph-run");
        assert_eq!(
            bundles[0].fallback_mode.as_deref(),
            Some("deterministic_dag")
        );
        assert_eq!(bundles[0].effective_graph_json, "{\"entry\":\"fix\"}");
    }
}