Skip to main content

cognee_core/
exec_status.rs

1use async_trait::async_trait;
2use uuid::Uuid;
3
4use crate::task::TaskError;
5/// Per-data-item status tracking for incremental pipeline execution.
6///
7/// The executor queries [`is_completed`](ExecStatusManager::is_completed) before
8/// processing each item and calls [`mark_completed`](ExecStatusManager::mark_completed)
9/// / [`mark_failed`](ExecStatusManager::mark_failed) afterwards.  This enables
10/// safe resume after partial failures and prevents re-processing on re-runs.
11///
12/// Separate from [`PipelineWatcher`](crate::pipeline::PipelineWatcher) by design:
13/// the watcher is a write-only observer, while this trait is bidirectional (the
14/// executor reads `is_completed` to decide whether to skip).
15#[async_trait]
16pub trait ExecStatusManager: Send + Sync {
17    /// Returns `true` if this item was already successfully processed
18    /// for the given `(pipeline_name, dataset_id)` combination.
19    async fn is_completed(
20        &self,
21        data_id: &str,
22        pipeline_name: &str,
23        dataset_id: Option<Uuid>,
24    ) -> Result<bool, TaskError>;
25
26    /// Mark the item as successfully completed.
27    async fn mark_completed(
28        &self,
29        data_id: &str,
30        pipeline_name: &str,
31        dataset_id: Option<Uuid>,
32    ) -> Result<(), TaskError>;
33
34    /// Mark the item as failed (used for diagnostics / resume).
35    async fn mark_failed(
36        &self,
37        data_id: &str,
38        pipeline_name: &str,
39        dataset_id: Option<Uuid>,
40        error: &str,
41    ) -> Result<(), TaskError>;
42
43    /// Record provenance metadata for a processed data point.
44    ///
45    /// Called by the executor after each task succeeds (point 9 — provenance
46    /// stamping).  `node_set` is an opaque label identifying the set of graph
47    /// nodes produced by the task.
48    async fn stamp_provenance(
49        &self,
50        data_id: &str,
51        pipeline_name: &str,
52        task_name: &str,
53        user_id: Option<Uuid>,
54        node_set: Option<&str>,
55    ) -> Result<(), TaskError>;
56}
/// No-op implementation used when incremental loading is disabled.
///
/// `is_completed` always returns `false` (process everything), all writes are
/// silent successes.
///
/// The type carries no state, so it derives `Debug`, `Clone`, `Copy` and
/// `Default`: it can be logged, duplicated freely and constructed with
/// `NoopExecStatusManager::default()` as well as by its literal name.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopExecStatusManager;
62
63#[async_trait]
64impl ExecStatusManager for NoopExecStatusManager {
65    async fn is_completed(
66        &self,
67        _data_id: &str,
68        _pipeline_name: &str,
69        _dataset_id: Option<Uuid>,
70    ) -> Result<bool, TaskError> {
71        Ok(false)
72    }
73
74    async fn mark_completed(
75        &self,
76        _data_id: &str,
77        _pipeline_name: &str,
78        _dataset_id: Option<Uuid>,
79    ) -> Result<(), TaskError> {
80        Ok(())
81    }
82
83    async fn mark_failed(
84        &self,
85        _data_id: &str,
86        _pipeline_name: &str,
87        _dataset_id: Option<Uuid>,
88        _error: &str,
89    ) -> Result<(), TaskError> {
90        Ok(())
91    }
92
93    async fn stamp_provenance(
94        &self,
95        _data_id: &str,
96        _pipeline_name: &str,
97        _task_name: &str,
98        _user_id: Option<Uuid>,
99        _node_set: Option<&str>,
100    ) -> Result<(), TaskError> {
101        Ok(())
102    }
103}