Skip to main content

cognee_database/pipelines/
noop.rs

1//! In-memory no-op [`PipelineRunRepository`] for embedded uses without a DB.
2//!
3//! Library convenience functions (`cognify`, `memify`, `AddPipeline::add`)
4//! now take an `Arc<dyn PipelineRunRepository>` (gap 08-07). Embedded users
5//! (no SQL database wired) construct `Arc::new(NoopPipelineRunRepository::new())`
6//! to satisfy the parameter without paying for any writes.
7
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use uuid::Uuid;
13
14use crate::pipelines::repository::{
15    PipelineRunRepository, PipelineRunRow, PipelineRunWithAttributionRow,
16};
17use crate::types::{DatabaseError, PipelineRun, PipelineRunStatus};
18
/// A [`PipelineRunRepository`] with no backing store.
///
/// Intended for tests and for embedded library users that have no SQL
/// database available. Nothing handed to a write method is retained: every
/// write reports `Ok(...)`, and every read answers with an empty value
/// (`None`, an empty `Vec`, or an empty `HashMap`/`Map`).
///
/// The type is a field-less unit struct, so copying it is free.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopPipelineRunRepository;
27
28impl NoopPipelineRunRepository {
29    /// Construct a new no-op repository.
30    pub const fn new() -> Self {
31        Self
32    }
33
34    /// Convenience: return as `Arc<dyn PipelineRunRepository>`.
35    pub fn arc() -> Arc<dyn PipelineRunRepository> {
36        Arc::new(Self)
37    }
38}
39
40#[async_trait]
41impl PipelineRunRepository for NoopPipelineRunRepository {
42    async fn log_pipeline_run(
43        &self,
44        pipeline_run_id: Uuid,
45        _pipeline_id: Uuid,
46        _pipeline_name: &str,
47        _dataset_id: Option<Uuid>,
48        _status: PipelineRunStatus,
49        _run_info: Option<serde_json::Value>,
50    ) -> Result<Uuid, DatabaseError> {
51        // Mirror the original http-server `NoOpPipelineRunRepository`
52        // behaviour: echo the caller's `pipeline_run_id` so logical run
53        // identity (used as the slot key in `DefaultPipelineRunRegistry`)
54        // round-trips even when persistence is disabled.
55        Ok(pipeline_run_id)
56    }
57
58    async fn latest_status(
59        &self,
60        _dataset_ids: &[Uuid],
61        _pipeline_name: &str,
62    ) -> Result<HashMap<Uuid, PipelineRunStatus>, DatabaseError> {
63        Ok(HashMap::new())
64    }
65
66    async fn list_recent(
67        &self,
68        _dataset_id: Option<Uuid>,
69        _limit: u32,
70    ) -> Result<Vec<PipelineRunRow>, DatabaseError> {
71        Ok(Vec::new())
72    }
73
74    async fn list_recent_with_attribution(
75        &self,
76        _dataset_id: Option<Uuid>,
77        _limit: u32,
78    ) -> Result<Vec<PipelineRunWithAttributionRow>, DatabaseError> {
79        Ok(Vec::new())
80    }
81
82    async fn reset_orphans(&self, _reason: &str) -> Result<u64, DatabaseError> {
83        Ok(0)
84    }
85
86    async fn set_payload_field(
87        &self,
88        _run_id: Uuid,
89        _key: &str,
90        _value: serde_json::Value,
91    ) -> Result<(), DatabaseError> {
92        Ok(())
93    }
94
95    async fn get_payload(
96        &self,
97        _run_id: Uuid,
98    ) -> Result<serde_json::Map<String, serde_json::Value>, DatabaseError> {
99        Ok(serde_json::Map::new())
100    }
101
102    async fn get_pipeline_run(
103        &self,
104        _pipeline_run_id: Uuid,
105    ) -> Result<Option<PipelineRun>, DatabaseError> {
106        Ok(None)
107    }
108
109    async fn get_pipeline_run_by_dataset(
110        &self,
111        _dataset_id: Uuid,
112        _pipeline_name: &str,
113    ) -> Result<Option<PipelineRun>, DatabaseError> {
114        Ok(None)
115    }
116
117    async fn get_pipeline_runs_by_dataset(
118        &self,
119        _dataset_id: Uuid,
120    ) -> Result<Vec<PipelineRun>, DatabaseError> {
121        Ok(Vec::new())
122    }
123}
124
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    reason = "test code — panics are acceptable failures"
)]
mod tests {
    use super::*;

    /// `log_pipeline_run` echoes the caller's run id, and the list/status
    /// style reads come back empty.
    #[tokio::test]
    async fn writes_echo_run_id_reads_return_empty() {
        let repo = NoopPipelineRunRepository::new();
        let run_id = Uuid::new_v4();
        let echoed = repo
            .log_pipeline_run(
                run_id,
                Uuid::new_v4(),
                "test_pipeline",
                None,
                PipelineRunStatus::Initiated,
                None,
            )
            .await
            .expect("log_pipeline_run on noop never fails");
        assert_eq!(echoed, run_id);

        assert!(
            repo.latest_status(&[], "p")
                .await
                .expect("latest_status")
                .is_empty()
        );
        assert!(
            repo.list_recent(None, 10)
                .await
                .expect("list_recent")
                .is_empty()
        );
        assert!(
            repo.list_recent_with_attribution(None, 10)
                .await
                .expect("list_recent_with_attribution")
                .is_empty()
        );
        assert_eq!(repo.reset_orphans("test").await.expect("reset_orphans"), 0);
        assert!(
            repo.get_pipeline_run(Uuid::new_v4())
                .await
                .expect("get_pipeline_run")
                .is_none()
        );
    }

    /// A payload write succeeds but is not retained: reading the same run's
    /// payload afterwards still yields an empty map.
    #[tokio::test]
    async fn payload_write_is_discarded() {
        let repo = NoopPipelineRunRepository::new();
        let run_id = Uuid::new_v4();

        repo.set_payload_field(run_id, "key", serde_json::Value::Null)
            .await
            .expect("set_payload_field on noop never fails");

        assert!(
            repo.get_payload(run_id)
                .await
                .expect("get_payload")
                .is_empty()
        );
    }

    /// Logging a run against a dataset does not make it visible to any of
    /// the dataset-scoped lookups.
    #[tokio::test]
    async fn dataset_lookups_stay_empty_after_write() {
        let repo = NoopPipelineRunRepository::new();
        let run_id = Uuid::new_v4();
        let dataset_id = Uuid::new_v4();

        repo.log_pipeline_run(
            run_id,
            Uuid::new_v4(),
            "test_pipeline",
            Some(dataset_id),
            PipelineRunStatus::Initiated,
            None,
        )
        .await
        .expect("log_pipeline_run on noop never fails");

        assert!(
            repo.get_pipeline_run(run_id)
                .await
                .expect("get_pipeline_run")
                .is_none()
        );
        assert!(
            repo.get_pipeline_run_by_dataset(dataset_id, "test_pipeline")
                .await
                .expect("get_pipeline_run_by_dataset")
                .is_none()
        );
        assert!(
            repo.get_pipeline_runs_by_dataset(dataset_id)
                .await
                .expect("get_pipeline_runs_by_dataset")
                .is_empty()
        );
        assert!(
            repo.latest_status(&[dataset_id], "test_pipeline")
                .await
                .expect("latest_status")
                .is_empty()
        );
    }

    /// The `arc()` convenience constructor yields a working trait object.
    #[tokio::test]
    async fn arc_constructor_is_usable_as_trait_object() {
        let repo: Arc<dyn PipelineRunRepository> = NoopPipelineRunRepository::arc();
        let run_id = Uuid::new_v4();

        let echoed = repo
            .log_pipeline_run(
                run_id,
                Uuid::new_v4(),
                "test_pipeline",
                None,
                PipelineRunStatus::Initiated,
                None,
            )
            .await
            .expect("log_pipeline_run on noop never fails");
        assert_eq!(echoed, run_id);

        assert!(
            repo.list_recent(None, 1)
                .await
                .expect("list_recent")
                .is_empty()
        );
    }
}