cognee_database/pipelines/
noop.rs1use std::collections::HashMap;
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use uuid::Uuid;
13
14use crate::pipelines::repository::{
15 PipelineRunRepository, PipelineRunRow, PipelineRunWithAttributionRow,
16};
17use crate::types::{DatabaseError, PipelineRun, PipelineRunStatus};
18
/// Pipeline-run repository that holds no state.
///
/// This is a zero-sized unit type, so every value is interchangeable: it is
/// `Copy`, all instances compare equal, and all instances hash identically.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
pub struct NoopPipelineRunRepository;
27
28impl NoopPipelineRunRepository {
29 pub const fn new() -> Self {
31 Self
32 }
33
34 pub fn arc() -> Arc<dyn PipelineRunRepository> {
36 Arc::new(Self)
37 }
38}
39
40#[async_trait]
41impl PipelineRunRepository for NoopPipelineRunRepository {
42 async fn log_pipeline_run(
43 &self,
44 pipeline_run_id: Uuid,
45 _pipeline_id: Uuid,
46 _pipeline_name: &str,
47 _dataset_id: Option<Uuid>,
48 _status: PipelineRunStatus,
49 _run_info: Option<serde_json::Value>,
50 ) -> Result<Uuid, DatabaseError> {
51 Ok(pipeline_run_id)
56 }
57
58 async fn latest_status(
59 &self,
60 _dataset_ids: &[Uuid],
61 _pipeline_name: &str,
62 ) -> Result<HashMap<Uuid, PipelineRunStatus>, DatabaseError> {
63 Ok(HashMap::new())
64 }
65
66 async fn list_recent(
67 &self,
68 _dataset_id: Option<Uuid>,
69 _limit: u32,
70 ) -> Result<Vec<PipelineRunRow>, DatabaseError> {
71 Ok(Vec::new())
72 }
73
74 async fn list_recent_with_attribution(
75 &self,
76 _dataset_id: Option<Uuid>,
77 _limit: u32,
78 ) -> Result<Vec<PipelineRunWithAttributionRow>, DatabaseError> {
79 Ok(Vec::new())
80 }
81
82 async fn reset_orphans(&self, _reason: &str) -> Result<u64, DatabaseError> {
83 Ok(0)
84 }
85
86 async fn set_payload_field(
87 &self,
88 _run_id: Uuid,
89 _key: &str,
90 _value: serde_json::Value,
91 ) -> Result<(), DatabaseError> {
92 Ok(())
93 }
94
95 async fn get_payload(
96 &self,
97 _run_id: Uuid,
98 ) -> Result<serde_json::Map<String, serde_json::Value>, DatabaseError> {
99 Ok(serde_json::Map::new())
100 }
101
102 async fn get_pipeline_run(
103 &self,
104 _pipeline_run_id: Uuid,
105 ) -> Result<Option<PipelineRun>, DatabaseError> {
106 Ok(None)
107 }
108
109 async fn get_pipeline_run_by_dataset(
110 &self,
111 _dataset_id: Uuid,
112 _pipeline_name: &str,
113 ) -> Result<Option<PipelineRun>, DatabaseError> {
114 Ok(None)
115 }
116
117 async fn get_pipeline_runs_by_dataset(
118 &self,
119 _dataset_id: Uuid,
120 ) -> Result<Vec<PipelineRun>, DatabaseError> {
121 Ok(Vec::new())
122 }
123}
124
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    reason = "test code — panics are acceptable failures"
)]
mod tests {
    use super::*;

    /// Exercises every trait method on the no-op repository: the write path
    /// echoes the run id, and every read path yields an empty / zero / `None`
    /// value without erroring.
    #[tokio::test]
    async fn writes_echo_run_id_reads_return_empty() {
        let repo = NoopPipelineRunRepository::new();
        let run_id = Uuid::new_v4();
        let echoed = repo
            .log_pipeline_run(
                run_id,
                Uuid::new_v4(),
                "test_pipeline",
                None,
                PipelineRunStatus::Initiated,
                None,
            )
            .await
            .expect("log_pipeline_run on noop never fails");
        assert_eq!(echoed, run_id);

        assert!(
            repo.latest_status(&[], "p")
                .await
                .expect("latest_status")
                .is_empty()
        );
        assert!(
            repo.list_recent(None, 10)
                .await
                .expect("list_recent")
                .is_empty()
        );
        assert!(
            repo.list_recent_with_attribution(None, 10)
                .await
                .expect("list_recent_with_attribution")
                .is_empty()
        );
        assert_eq!(repo.reset_orphans("test").await.expect("reset_orphans"), 0);
        assert!(
            repo.get_pipeline_run(Uuid::new_v4())
                .await
                .expect("get_pipeline_run")
                .is_none()
        );

        // A payload write succeeds but is discarded: the payload stays empty.
        repo.set_payload_field(run_id, "key", serde_json::Value::Null)
            .await
            .expect("set_payload_field");
        assert!(
            repo.get_payload(run_id)
                .await
                .expect("get_payload")
                .is_empty()
        );

        assert!(
            repo.get_pipeline_run_by_dataset(Uuid::new_v4(), "p")
                .await
                .expect("get_pipeline_run_by_dataset")
                .is_none()
        );
        assert!(
            repo.get_pipeline_runs_by_dataset(Uuid::new_v4())
                .await
                .expect("get_pipeline_runs_by_dataset")
                .is_empty()
        );
    }
}