// Source: cognee_database/traits/delete_db.rs

1use async_trait::async_trait;
2use cognee_models::{Data, Dataset};
3use sea_orm::DatabaseConnection;
4use uuid::Uuid;
5
6use crate::ops::{data, datasets, graph_storage, pipeline_runs, search_history};
7use crate::types::{DatabaseError, GraphEdge, GraphNode};
8
9#[async_trait]
10pub trait DeleteDb: Send + Sync {
11    async fn get_data(&self, id: Uuid) -> Result<Option<Data>, DatabaseError>;
12    async fn delete_data(&self, id: Uuid) -> Result<(), DatabaseError>;
13    async fn count_data_dataset_links(&self, data_id: Uuid) -> Result<usize, DatabaseError>;
14    async fn list_datasets_for_data(&self, data_id: Uuid) -> Result<Vec<Dataset>, DatabaseError>;
15
16    async fn get_dataset_by_name(
17        &self,
18        name: &str,
19        owner_id: Uuid,
20        tenant_id: Option<Uuid>,
21    ) -> Result<Option<Dataset>, DatabaseError>;
22    async fn get_dataset_data(&self, dataset_id: Uuid) -> Result<Vec<Data>, DatabaseError>;
23    /// Count the number of data items linked to a dataset without loading them.
24    async fn count_dataset_data(&self, dataset_id: Uuid) -> Result<usize, DatabaseError>;
25    async fn list_datasets_by_owner(&self, owner_id: Uuid) -> Result<Vec<Dataset>, DatabaseError>;
26    async fn list_datasets(&self) -> Result<Vec<Dataset>, DatabaseError>;
27    async fn delete_dataset(&self, id: Uuid) -> Result<(), DatabaseError>;
28    async fn detach_data_from_dataset(
29        &self,
30        dataset_id: Uuid,
31        data_id: Uuid,
32    ) -> Result<(), DatabaseError>;
33
34    // ------------------------------------------------------------------
35    // Pipeline cleanup methods
36    // ------------------------------------------------------------------
37
38    /// Delete all `pipeline_runs` rows for a given dataset.
39    ///
40    /// Needed for data-scoped deletion where the dataset itself is not deleted
41    /// (FK cascade does not fire) but the pipeline cache should be invalidated.
42    async fn delete_pipeline_runs_by_dataset(&self, dataset_id: Uuid)
43    -> Result<u64, DatabaseError>;
44
45    /// Clear `pipeline_status` JSON entries keyed by `dataset_id` from all
46    /// `Data` records linked to that dataset.
47    ///
48    /// Must be called while junction rows still exist.
49    async fn clear_pipeline_status_for_dataset(
50        &self,
51        dataset_id: Uuid,
52    ) -> Result<usize, DatabaseError>;
53
54    /// Clear only the `cognify_pipeline` key from the `pipeline_status` JSON
55    /// of a single Data record, removing only the entry keyed by `dataset_id`.
56    async fn clear_cognify_pipeline_status_for_data(
57        &self,
58        data_id: Uuid,
59        dataset_id: Uuid,
60    ) -> Result<(), DatabaseError>;
61
62    // ------------------------------------------------------------------
63    // Graph provenance methods
64    // ------------------------------------------------------------------
65
66    /// Get all provenance node rows for a dataset.
67    async fn get_nodes_by_dataset(&self, dataset_id: Uuid)
68    -> Result<Vec<GraphNode>, DatabaseError>;
69
70    /// Get all provenance edge rows for a dataset.
71    async fn get_edges_by_dataset(&self, dataset_id: Uuid)
72    -> Result<Vec<GraphEdge>, DatabaseError>;
73
74    /// Get nodes belonging to `(data_id, dataset_id)` whose slug is NOT shared
75    /// with other data items in the same dataset. Safe for targeted deletion.
76    async fn get_unique_nodes_for_data(
77        &self,
78        data_id: Uuid,
79        dataset_id: Uuid,
80    ) -> Result<Vec<GraphNode>, DatabaseError>;
81
82    /// Get edges belonging to `(data_id, dataset_id)` whose slug is NOT shared
83    /// with other data items in the same dataset.
84    async fn get_unique_edges_for_data(
85        &self,
86        data_id: Uuid,
87        dataset_id: Uuid,
88    ) -> Result<Vec<GraphEdge>, DatabaseError>;
89
90    /// Delete all provenance node rows for a dataset.
91    async fn delete_provenance_nodes_for_dataset(
92        &self,
93        dataset_id: Uuid,
94    ) -> Result<(), DatabaseError>;
95
96    /// Delete all provenance edge rows for a dataset.
97    async fn delete_provenance_edges_for_dataset(
98        &self,
99        dataset_id: Uuid,
100    ) -> Result<(), DatabaseError>;
101
102    /// Delete provenance node rows for a specific `(data_id, dataset_id)` pair.
103    async fn delete_provenance_nodes_for_data(
104        &self,
105        data_id: Uuid,
106        dataset_id: Uuid,
107    ) -> Result<(), DatabaseError>;
108
109    /// Delete provenance edge rows for a specific `(data_id, dataset_id)` pair.
110    async fn delete_provenance_edges_for_data(
111        &self,
112        data_id: Uuid,
113        dataset_id: Uuid,
114    ) -> Result<(), DatabaseError>;
115
116    /// Count provenance node rows for a specific `(data_id, dataset_id)` pair.
117    async fn get_provenance_node_count_for_data(
118        &self,
119        data_id: Uuid,
120        dataset_id: Uuid,
121    ) -> Result<usize, DatabaseError>;
122
123    /// Count provenance edge rows for a specific `(data_id, dataset_id)` pair.
124    async fn get_provenance_edge_count_for_data(
125        &self,
126        data_id: Uuid,
127        dataset_id: Uuid,
128    ) -> Result<usize, DatabaseError>;
129
130    // ------------------------------------------------------------------
131    // Search history cleanup methods
132    // ------------------------------------------------------------------
133
134    /// Delete all search history (queries + cascaded results) for a user.
135    ///
136    /// Returns the number of deleted query rows.
137    async fn delete_search_history_for_user(&self, user_id: Uuid) -> Result<u64, DatabaseError>;
138
139    /// Delete all search history (queries + cascaded results).
140    ///
141    /// Returns the number of deleted query rows.
142    async fn delete_all_search_history(&self) -> Result<u64, DatabaseError>;
143
144    /// Count search history query rows for a specific user.
145    async fn count_search_history_for_user(&self, user_id: Uuid) -> Result<u64, DatabaseError>;
146
147    /// Count all search history query rows.
148    async fn count_all_search_history(&self) -> Result<u64, DatabaseError>;
149}
150
151#[async_trait]
152impl DeleteDb for DatabaseConnection {
153    async fn get_data(&self, id: Uuid) -> Result<Option<Data>, DatabaseError> {
154        data::get_data(self, id).await
155    }
156
157    async fn delete_data(&self, id: Uuid) -> Result<(), DatabaseError> {
158        data::delete_data(self, id).await
159    }
160
161    async fn count_data_dataset_links(&self, data_id: Uuid) -> Result<usize, DatabaseError> {
162        data::count_data_dataset_links(self, data_id).await
163    }
164
165    async fn list_datasets_for_data(&self, data_id: Uuid) -> Result<Vec<Dataset>, DatabaseError> {
166        data::list_datasets_for_data(self, data_id).await
167    }
168
169    async fn get_dataset_by_name(
170        &self,
171        name: &str,
172        owner_id: Uuid,
173        tenant_id: Option<Uuid>,
174    ) -> Result<Option<Dataset>, DatabaseError> {
175        datasets::get_dataset_by_name(self, name, owner_id, tenant_id).await
176    }
177
178    async fn get_dataset_data(&self, dataset_id: Uuid) -> Result<Vec<Data>, DatabaseError> {
179        datasets::get_dataset_data(self, dataset_id).await
180    }
181
182    async fn count_dataset_data(&self, dataset_id: Uuid) -> Result<usize, DatabaseError> {
183        datasets::count_dataset_data(self, dataset_id).await
184    }
185
186    async fn list_datasets_by_owner(&self, owner_id: Uuid) -> Result<Vec<Dataset>, DatabaseError> {
187        datasets::list_datasets_by_owner(self, owner_id).await
188    }
189
190    async fn list_datasets(&self) -> Result<Vec<Dataset>, DatabaseError> {
191        datasets::list_datasets(self).await
192    }
193
194    async fn delete_dataset(&self, id: Uuid) -> Result<(), DatabaseError> {
195        datasets::delete_dataset(self, id).await
196    }
197
198    async fn detach_data_from_dataset(
199        &self,
200        dataset_id: Uuid,
201        data_id: Uuid,
202    ) -> Result<(), DatabaseError> {
203        datasets::detach_data_from_dataset(self, dataset_id, data_id).await
204    }
205
206    // ------------------------------------------------------------------
207    // Pipeline cleanup
208    // ------------------------------------------------------------------
209
210    async fn delete_pipeline_runs_by_dataset(
211        &self,
212        dataset_id: Uuid,
213    ) -> Result<u64, DatabaseError> {
214        pipeline_runs::delete_pipeline_runs_by_dataset(self, dataset_id).await
215    }
216
217    async fn clear_pipeline_status_for_dataset(
218        &self,
219        dataset_id: Uuid,
220    ) -> Result<usize, DatabaseError> {
221        data::clear_pipeline_status_for_dataset(self, dataset_id).await
222    }
223
224    async fn clear_cognify_pipeline_status_for_data(
225        &self,
226        data_id: Uuid,
227        dataset_id: Uuid,
228    ) -> Result<(), DatabaseError> {
229        data::clear_cognify_pipeline_status_for_data(self, data_id, dataset_id).await
230    }
231
232    // ------------------------------------------------------------------
233    // Graph provenance
234    // ------------------------------------------------------------------
235
236    async fn get_nodes_by_dataset(
237        &self,
238        dataset_id: Uuid,
239    ) -> Result<Vec<GraphNode>, DatabaseError> {
240        graph_storage::get_nodes_by_dataset(self, dataset_id).await
241    }
242
243    async fn get_edges_by_dataset(
244        &self,
245        dataset_id: Uuid,
246    ) -> Result<Vec<GraphEdge>, DatabaseError> {
247        graph_storage::get_edges_by_dataset(self, dataset_id).await
248    }
249
250    async fn get_unique_nodes_for_data(
251        &self,
252        data_id: Uuid,
253        dataset_id: Uuid,
254    ) -> Result<Vec<GraphNode>, DatabaseError> {
255        graph_storage::get_unique_nodes_for_data(self, data_id, dataset_id).await
256    }
257
258    async fn get_unique_edges_for_data(
259        &self,
260        data_id: Uuid,
261        dataset_id: Uuid,
262    ) -> Result<Vec<GraphEdge>, DatabaseError> {
263        graph_storage::get_unique_edges_for_data(self, data_id, dataset_id).await
264    }
265
266    async fn delete_provenance_nodes_for_dataset(
267        &self,
268        dataset_id: Uuid,
269    ) -> Result<(), DatabaseError> {
270        graph_storage::delete_nodes_by_dataset(self, dataset_id).await
271    }
272
273    async fn delete_provenance_edges_for_dataset(
274        &self,
275        dataset_id: Uuid,
276    ) -> Result<(), DatabaseError> {
277        graph_storage::delete_edges_by_dataset(self, dataset_id).await
278    }
279
280    async fn delete_provenance_nodes_for_data(
281        &self,
282        data_id: Uuid,
283        dataset_id: Uuid,
284    ) -> Result<(), DatabaseError> {
285        graph_storage::delete_nodes_for_data(self, data_id, dataset_id).await
286    }
287
288    async fn delete_provenance_edges_for_data(
289        &self,
290        data_id: Uuid,
291        dataset_id: Uuid,
292    ) -> Result<(), DatabaseError> {
293        graph_storage::delete_edges_for_data(self, data_id, dataset_id).await
294    }
295
296    async fn get_provenance_node_count_for_data(
297        &self,
298        data_id: Uuid,
299        dataset_id: Uuid,
300    ) -> Result<usize, DatabaseError> {
301        graph_storage::count_nodes_for_data(self, data_id, dataset_id).await
302    }
303
304    async fn get_provenance_edge_count_for_data(
305        &self,
306        data_id: Uuid,
307        dataset_id: Uuid,
308    ) -> Result<usize, DatabaseError> {
309        graph_storage::count_edges_for_data(self, data_id, dataset_id).await
310    }
311
312    // ------------------------------------------------------------------
313    // Search history cleanup
314    // ------------------------------------------------------------------
315
316    async fn delete_search_history_for_user(&self, user_id: Uuid) -> Result<u64, DatabaseError> {
317        search_history::delete_queries_by_user(self, user_id).await
318    }
319
320    async fn delete_all_search_history(&self) -> Result<u64, DatabaseError> {
321        search_history::delete_all_queries(self).await
322    }
323
324    async fn count_search_history_for_user(&self, user_id: Uuid) -> Result<u64, DatabaseError> {
325        search_history::count_queries_by_user(self, user_id).await
326    }
327
328    async fn count_all_search_history(&self) -> Result<u64, DatabaseError> {
329        search_history::count_all_queries(self).await
330    }
331}