// cognee_core/exec_status.rs
use async_trait::async_trait;
use uuid::Uuid;

use crate::task::TaskError;
5/// Per-data-item status tracking for incremental pipeline execution.
6///
7/// The executor queries [`is_completed`](ExecStatusManager::is_completed) before
8/// processing each item and calls [`mark_completed`](ExecStatusManager::mark_completed)
9/// / [`mark_failed`](ExecStatusManager::mark_failed) afterwards. This enables
10/// safe resume after partial failures and prevents re-processing on re-runs.
11///
12/// Separate from [`PipelineWatcher`](crate::pipeline::PipelineWatcher) by design:
13/// the watcher is a write-only observer, while this trait is bidirectional (the
14/// executor reads `is_completed` to decide whether to skip).
15#[async_trait]
16pub trait ExecStatusManager: Send + Sync {
17 /// Returns `true` if this item was already successfully processed
18 /// for the given `(pipeline_name, dataset_id)` combination.
19 async fn is_completed(
20 &self,
21 data_id: &str,
22 pipeline_name: &str,
23 dataset_id: Option<Uuid>,
24 ) -> Result<bool, TaskError>;
25
26 /// Mark the item as successfully completed.
27 async fn mark_completed(
28 &self,
29 data_id: &str,
30 pipeline_name: &str,
31 dataset_id: Option<Uuid>,
32 ) -> Result<(), TaskError>;
33
34 /// Mark the item as failed (used for diagnostics / resume).
35 async fn mark_failed(
36 &self,
37 data_id: &str,
38 pipeline_name: &str,
39 dataset_id: Option<Uuid>,
40 error: &str,
41 ) -> Result<(), TaskError>;
42
43 /// Record provenance metadata for a processed data point.
44 ///
45 /// Called by the executor after each task succeeds (point 9 — provenance
46 /// stamping). `node_set` is an opaque label identifying the set of graph
47 /// nodes produced by the task.
48 async fn stamp_provenance(
49 &self,
50 data_id: &str,
51 pipeline_name: &str,
52 task_name: &str,
53 user_id: Option<Uuid>,
54 node_set: Option<&str>,
55 ) -> Result<(), TaskError>;
56}
/// No-op implementation used when incremental loading is disabled.
///
/// `is_completed` always returns `false` (process everything), all writes are
/// silent successes.
///
/// This is a stateless unit struct, so it is `Copy` and can be obtained via
/// [`Default`] as well as by naming it directly.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopExecStatusManager;
63#[async_trait]
64impl ExecStatusManager for NoopExecStatusManager {
65 async fn is_completed(
66 &self,
67 _data_id: &str,
68 _pipeline_name: &str,
69 _dataset_id: Option<Uuid>,
70 ) -> Result<bool, TaskError> {
71 Ok(false)
72 }
73
74 async fn mark_completed(
75 &self,
76 _data_id: &str,
77 _pipeline_name: &str,
78 _dataset_id: Option<Uuid>,
79 ) -> Result<(), TaskError> {
80 Ok(())
81 }
82
83 async fn mark_failed(
84 &self,
85 _data_id: &str,
86 _pipeline_name: &str,
87 _dataset_id: Option<Uuid>,
88 _error: &str,
89 ) -> Result<(), TaskError> {
90 Ok(())
91 }
92
93 async fn stamp_provenance(
94 &self,
95 _data_id: &str,
96 _pipeline_name: &str,
97 _task_name: &str,
98 _user_id: Option<Uuid>,
99 _node_set: Option<&str>,
100 ) -> Result<(), TaskError> {
101 Ok(())
102 }
103}